Unable to post to ATS service using on-stream example code

I am using the example code that came with the documentation for ADS 3.2.3. I want to poke a value into the ATS service for a test instrument. The Refinitiv example code includes a posting method. However, when I run it I get the following message:
"NakCode":"DeniedBySrc",
"Text":"[400]: Invalid Request",
"Type":"Ack"
The code is almost identical to the example, except that the Service is our service name for ATS and the instrument is not TRI.N.
Changed:
    'Key': {
        'Name': 'TRI.N',
< Can you really contribute and poke a value to TRI.N ??? >
To:
    'Key': {
        'Service': 'DTS',
        'Name': '.AAAATEST'
    },
Best Answer
-
Hello @graham.street
The "NakCode":"DeniedBySrc" and "Text":"[400]: Invalid Request" error messages come from the TREP and ATS servers. The "[400]: Invalid Request" text is generated by the ATS server, and its description is "The format is incorrect so the contribution cannot be processed.".
I did a quick test with ADS 3.2.3's market_price_posting.py example and applied your send_market_price_post() function. The example application can request data from my ATS version 1.4 and can On-Stream post back to it successfully (please see the attached post_result.txt).
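As a rough illustration of how a client could tell an accepted post from a rejection like the one quoted above, the sketch below inspects a parsed Ack message. The field names Type, NakCode and Text come from the error in the question. The function name summarize_ack is hypothetical, and treating an Ack without a NakCode as accepted is an assumption here.

```python
import json


def summarize_ack(message):
    """Return (accepted, detail) for a parsed Ack message dict.

    Assumption: an Ack without a 'NakCode' field means the post was accepted.
    """
    if message.get('Type') != 'Ack':
        raise ValueError('not an Ack message')
    nak_code = message.get('NakCode')
    if nak_code is None:
        return True, 'post accepted'
    # A NakCode is present, so report it together with the server text.
    return False, '{}: {}'.format(nak_code, message.get('Text', ''))


# The rejection reported in the question, parsed from JSON.
rejected = json.loads(
    '{"NakCode":"DeniedBySrc","Text":"[400]: Invalid Request","Type":"Ack"}'
)
print(summarize_ack(rejected))
```

A check like this could be called from the message handler whenever a message of Type "Ack" arrives, so that a rejected post is logged with its reason.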
What is the version of your ATS, ADS and ADH servers?
Can you On-Stream post to the same ATS with other Elektron SDK or TREP APIs?
I did a quick check of the ADH documentation and found the following ADH 3.x behavior regarding the *adh*routename.route*servicename*forwardOnStreamPostMsgKey configuration. Could you please check whether your ADH has this parameter set to True?
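To check the setting without reading the whole file by hand, something like the following sketch might help. It assumes the configuration is plain text with one "name : value" entry per line and "!" starting a comment line. That layout, the function name, and the route name myRoute in the sample are all assumptions for illustration, not taken from the ADH documentation.

```python
def post_key_forwarding_enabled(config_text, route_name, service_name):
    """Return True if forwardOnStreamPostMsgKey is set to True for this route/service.

    Assumes 'name : value' lines and '!' comment lines (hypothetical layout).
    """
    wanted = '*adh*{}.route*{}*forwardOnStreamPostMsgKey'.format(
        route_name, service_name)
    for raw_line in config_text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith('!'):
            continue  # skip blank lines and comments
        name, sep, value = line.partition(':')
        if sep and name.strip() == wanted:
            return value.strip().lower() == 'true'
    return False  # parameter not present


# Hypothetical fragment, not a real configuration file.
sample = """
! hypothetical fragment for illustration only
*adh*myRoute.route*DTS*forwardOnStreamPostMsgKey : True
"""
print(post_key_forwarding_enabled(sample, 'myRoute', 'DTS'))
```

Returning False when the parameter is absent is a choice made for this sketch only; it says nothing about what the ADH itself does by default.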
Answers
-
Hello @graham.street
Is the item '.AAAATEST' already available in your ATS server? Basically, an On-Stream post requires the application to first subscribe to that item from the same service.
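The subscribe-then-post relationship can be sketched as two JSON messages that share one stream ID. This is a minimal sketch that follows the message layout of the Refinitiv posting example. The helper names and the placeholder address and user ID values are hypothetical.

```python
import json


def build_item_request(stream_id, service, name):
    """Item request that opens the stream an On-Stream post will reuse."""
    return {'ID': stream_id, 'Key': {'Service': service, 'Name': name}}


def build_on_stream_post(stream_id, post_id, fields, address, user_id):
    """Post message sent on the SAME stream ID as the open item request."""
    return {
        'ID': stream_id,
        'Type': 'Post',
        'Domain': 'MarketPrice',
        'Ack': True,  # ask the server to acknowledge the post
        'PostID': post_id,
        'PostUserInfo': {'Address': address, 'UserID': user_id},
        'Message': {
            'ID': 0,
            'Type': 'Update',
            'Domain': 'MarketPrice',
            'Fields': fields,
        },
    }


# Placeholder values for illustration only.
request = build_item_request(2, 'DTS', '.AAAATEST')
post = build_on_stream_post(2, 1, {'BID': 44.55, 'ASK': 66.77}, '127.0.0.1', 1234)
print(json.dumps(request))
print(json.dumps(post))
```

The post should be sent only after the item stream with that ID has been opened successfully. The helpers above only build the messages and do not enforce that ordering.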
Could you please share your entire JSON messages? (Please remove your infrastructure IP and password).
You can find more detail regarding how to use the Elektron WebSocket API with ATS from the following articles:
-
Yes, the instrument .AAAATEST exists.
My colleague wrote an off-stream program in Python that was able to poke data into that service and instrument.
The code is pasted below, and you will see that it is just the Refinitiv example code with the service added and the instrument name changed. Incidentally, I changed the service name to one of our ADH cached services and the instrument to one that existed in that ADH cache. The code worked fine for that.
#!/usr/bin/env python
""" Simple example of posting Market Price JSON data using Websockets """

import sys
import time
import getopt
import socket
import json
import websocket
import threading
import os
from threading import Thread, Event

# Global Default Variables
hostname = 'xxxxxxxxx'
port = '15000'
user = 'xxxxxxxxxxxxxxx'
app_id = '256'
position = socket.gethostbyname(socket.gethostname())

# Global Variables
next_post_time = 0
web_socket_app = None
web_socket_open = False
post_id = 1


def process_message(ws, message_json):
    """ Parse at high level and output JSON of message """
    message_type = message_json['Type']

    if message_type == "Refresh":
        if 'Domain' in message_json:
            message_domain = message_json['Domain']
            if message_domain == "Login":
                process_login_response(ws, message_json)
    elif message_type == "Ping":
        pong_json = { 'Type':'Pong' }
        ws.send(json.dumps(pong_json))
        print("SENT:")
        print(json.dumps(pong_json, sort_keys=True, indent=2, separators=(',', ':')))

    # If our TRI stream is now open, we can start sending posts.
    global next_post_time
    if ('ID' in message_json and message_json['ID'] == 2 and next_post_time == 0 and
            (not 'State' in message_json or message_json['State']['Stream'] == "Open" and message_json['State']['Data'] == "Ok")):
        next_post_time = time.time() + 3


def process_login_response(ws, message_json):
    """ Send item request """
    send_market_price_request(ws)


def send_market_price_request(ws):
    """ Create and send simple Market Price request """
    mp_req_json = {
        'ID': 2,
        'Key': {
            'Service': 'DTS',
            'Name': '.AAAATEST'
        },
    }
    ws.send(json.dumps(mp_req_json))
    print("SENT:")
    print(json.dumps(mp_req_json, sort_keys=True, indent=2, separators=(',', ':')))


def send_market_price_post(ws):
    global post_id
    """ Send a post message containing market-price content for TRI.N """
    print("send_market_price_post")
    time.sleep(1)
    mp_post_json = {
        'ID': 2,
        'Type':'Post',
        'Domain':'MarketPrice',
        'Ack':True,
        'PostID':post_id,
        'PostUserInfo': {
            'Address':position,  # Use IP address as the Post User Address.
            'UserID':os.getpid()  # Use process ID as the Post User Id.
        },
        'Message': {
            'ID': 0,
            'Type':'Update',
            'Domain':'MarketPrice',
            'Fields':{'BID': 44.55,'ASK': 66.77}
        }
    }
    time.sleep(1)
    ws.send(json.dumps(mp_post_json))
    print("Sending this :")
    print(json.dumps(mp_post_json))
    print("SENT POST:")
    time.sleep(1)
    print(json.dumps(mp_post_json, sort_keys=True, indent=2, separators=(',', ':')))
    print("SENT POSTEND:")
    post_id += 1


def send_login_request(ws):
    """ Generate a login request from command line data (or defaults) and send """
    login_json = {
        'ID': 1,
        'Domain': 'Login',
        'Key': {
            'Name': '',
            'Elements': {
                'ApplicationId': '',
                'Position': ''
            }
        }
    }

    login_json['Key']['Name'] = user
    login_json['Key']['Elements']['ApplicationId'] = app_id
    login_json['Key']['Elements']['Position'] = position

    ws.send(json.dumps(login_json))
    print("SENT:")
    print(json.dumps(login_json, sort_keys=True, indent=2, separators=(',', ':')))


def on_message(ws, message):
    """ Called when message received, parse message into JSON for processing """
    print("RECEIVED: ")
    message_json = json.loads(message)
    print("on_message")
    print(json.dumps(message_json, sort_keys=True, indent=2, separators=(',', ':')))

    for singleMsg in message_json:
        process_message(ws, singleMsg)


def on_error(ws, error):
    """ Called when websocket error has occurred """
    print("on_error")
    print(error)


def on_close(ws):
    """ Called when websocket is closed """
    global web_socket_open
    print("WebSocket Closed")
    web_socket_open = False


def on_open(ws):
    """ Called when handshake is complete and websocket is open, send login """
    print("on_open")
    print("WebSocket successfully connected!")
    global web_socket_open
    web_socket_open = True
    send_login_request(ws)


if __name__ == "__main__":

    # Get command line parameters
    try:
        opts, args = getopt.getopt(sys.argv[1:], "", ["help", "hostname=", "port=", "app_id=", "user=", "position="])
    except getopt.GetoptError:
        print('Usage: market_price.py [--hostname hostname] [--port port] [--app_id app_id] [--user user] [--position position] [--help]')
        sys.exit(2)
    for opt, arg in opts:
        if opt in ("--help"):
            print('Usage: market_price.py [--hostname hostname] [--port port] [--app_id app_id] [--user user] [--position position] [--help]')
            sys.exit(0)
        elif opt in ("--hostname"):
            hostname = arg
        elif opt in ("--port"):
            port = arg
        elif opt in ("--app_id"):
            app_id = arg
        elif opt in ("--user"):
            user = arg
        elif opt in ("--position"):
            position = arg

    # Start websocket handshake
    ws_address = "ws://{}:{}/WebSocket".format(hostname, port)
    print("Connecting to WebSocket " + ws_address + " ...")
    web_socket_app = websocket.WebSocketApp(ws_address, header=['User-Agent: Python'],
                                            on_message=on_message,
                                            on_error=on_error,
                                            on_close=on_close,
                                            subprotocols=['tr_json2'])
    web_socket_app.on_open = on_open

    # Event loop
    wst = threading.Thread(target=web_socket_app.run_forever)
    wst.start()
    print("just started wst")
    try:
        while True:
            time.sleep(1)
            print("+++++++++++++++++++++++++++++")
            if next_post_time != 0 and time.time() > next_post_time:
                send_market_price_post(web_socket_app)
                next_post_time = time.time() + 3
    except KeyboardInterrupt:
        web_socket_app.close()
-
I added the line *adh*routename.route*servicename*forwardOnStreamPostMsgKey to my cnf file and restarted the ADHs, and the code can now push data to the ATS service. I am using ADH 3.2.3, but it seems this parameter is only documented from 3.3 onwards. Well done Wasin for solving this problem.