Unable to post to ATS service using on-stream example code

I am using the example code that came with the documentation for ADS 3.2.3 . I want to poke a value into the ATS service for a test instrument. The refinitiv example code came with a posting method. However when I run it I am given the following message

"NakCode":"DeniedBySrc",
"Text":"[400]: Invalid Request",
"Type":"Ack"


The code is almost identical except the Service is our service name for ATS and the instrument is not TRI.N

Changed

'Key': {
'Name': 'TRI.N',

< Can you really contribute and poke a value to TRI.N ??? >

To

'Key': {
'Service': 'DTS',
'Name': '.AAAATEST'
},


Best Answer

  • wasin.w
    wasin.w admin
    Answer ✓

    Hello @graham.street

    The "NakCode":"DeniedBySrc", "Text":"[400]: Invalid Request", error messages are sent from the TREP and ATS servers. The "[400]:Invalid Request" message is generated from the ATS server. This error description is "The format is incorrect so the contribution cannot be processed.".


    I did a quick test with ADS 3.2.3's market_price_posting.py example and applied your send_market_price_post() function. The example application can request data from my ATS version 1.4 and can On-Stream post back to my ATS successfully (please see attach).post_result.txt

    What is the version of your ATS, ADS and ADH servers?

    Can you On-Stream post to the same ATS with other Elektron SDK or TREP APIs?

    I did a quick check on ADH document and found the following ADH 3.x behavior regarding the *adh*routename.route*servicename*forwardOnStreamPostMsgKey configuration. Could you please check if your ADH has configured this parameter to True?

    image




Answers

  • Hello @graham.street

    Does the item '.AAAATEST' already available in your ATS server? Basically, the On-Stream post needs to subscribe that item from the same service first.

    Could you please share your entire JSON messages? (Please remove your infrastructure IP and password).

    You can find more detail regarding how to use the Elektron WebSocket API with ATS from the following articles:

  • Yes the instrument .AAAATEST exists.

    My colleague wrote an off-stream program in python that was able to poke in data to that service and instrument..

    The code is pasted below and you will see that it is just the refinitiv example code with the service added and the instrument name changed, Incidentally I changed the service name to one of our ADH cached services and the instrument to one that existed in that ADH cache. The code worked fine for that.




    #!/usr/bin/env python
    """ Simple example of posting Market Price JSON data using Websockets """

    import sys
    import time
    import getopt
    import socket
    import json
    import websocket
    import threading
    import os
    from threading import Thread, Event

    # Global Default Variables
    hostname = 'xxxxxxxxx'
    port = '15000'
    user = 'xxxxxxxxxxxxxxx'
    app_id = '256'
    position = socket.gethostbyname(socket.gethostname())

    # Global Variables
    next_post_time = 0
    web_socket_app = None
    web_socket_open = False
    post_id = 1


    def process_message(ws, message_json):
    """ Parse at high level and output JSON of message """
    message_type = message_json['Type']

    if message_type == "Refresh":
    if 'Domain' in message_json:
    message_domain = message_json['Domain']
    if message_domain == "Login":
    process_login_response(ws, message_json)
    elif message_type == "Ping":
    pong_json = { 'Type':'Pong' }
    ws.send(json.dumps(pong_json))
    print("SENT:")
    print(json.dumps(pong_json, sort_keys=True, indent=2, separators=(',', ':')))

    # If our TRI stream is now open, we can start sending posts.
    global next_post_time
    if ('ID' in message_json and message_json['ID'] == 2 and next_post_time == 0 and
    (not 'State' in message_json or message_json['State']['Stream'] == "Open" and message_json['State']['Data'] == "Ok")):
    next_post_time = time.time() + 3


    def process_login_response(ws, message_json):
    """ Send item request """
    send_market_price_request(ws)


    def send_market_price_request(ws):
    """ Create and send simple Market Price request """
    mp_req_json = {
    'ID': 2,
    'Key': {
    'Service': 'DTS',
    'Name': '.AAAATEST'
    },
    }
    ws.send(json.dumps(mp_req_json))
    print("SENT:")
    print(json.dumps(mp_req_json, sort_keys=True, indent=2, separators=(',', ':')))

    def send_market_price_post(ws):
    global post_id
    """ Send a post message containing market-price content for TRI.N """
    print("send_market_price_post")
    time.sleep(1)
    mp_post_json = {
    'ID': 2,
    'Type':'Post',
    'Domain':'MarketPrice',
    'Ack':True,
    'PostID':post_id,
    'PostUserInfo': {
    'Address':position, # Use IP address as the Post User Address.
    'UserID':os.getpid() # Use process ID as the Post User Id.
    },
    'Message': {
    'ID': 0,
    'Type':'Update',
    'Domain':'MarketPrice',
    'Fields':{'BID': 44.55,'ASK': 66.77}
    }
    }

    time.sleep(1)
    ws.send(json.dumps(mp_post_json))
    print("Sending this :")
    print(json.dumps(mp_post_json))
    print("SENT POST:")
    time.sleep(1)
    print(json.dumps(mp_post_json, sort_keys=True, indent=2, separators=(',', ':')))
    print("SENT POSTEND:")
    post_id += 1

    def send_login_request(ws):
    """ Generate a login request from command line data (or defaults) and send """
    login_json = {
    'ID': 1,
    'Domain': 'Login',
    'Key': {
    'Name': '',
    'Elements': {
    'ApplicationId': '',
    'Position': ''
    }
    }
    }

    login_json['Key']['Name'] = user
    login_json['Key']['Elements']['ApplicationId'] = app_id
    login_json['Key']['Elements']['Position'] = position

    ws.send(json.dumps(login_json))
    print("SENT:")
    print(json.dumps(login_json, sort_keys=True, indent=2, separators=(',', ':')))


    def on_message(ws, message):
    """ Called when message received, parse message into JSON for processing """
    print("RECEIVED: ")
    message_json = json.loads(message)
    print("on_message")
    print(json.dumps(message_json, sort_keys=True, indent=2, separators=(',', ':')))

    for singleMsg in message_json:
    process_message(ws, singleMsg)


    def on_error(ws, error):
    """ Called when websocket error has occurred """
    print("on_error")
    print(error)


    def on_close(ws):
    """ Called when websocket is closed """
    global web_socket_open
    print("WebSocket Closed")
    web_socket_open = False


    def on_open(ws):
    """ Called when handshake is complete and websocket is open, send login """
    print("on_open")
    print("WebSocket successfully connected!")
    global web_socket_open
    web_socket_open = True
    send_login_request(ws)


    if __name__ == "__main__":

    # Get command line parameters
    try:
    opts, args = getopt.getopt(sys.argv[1:], "", ["help", "hostname=", "port=", "app_id=", "user=", "position="])
    except getopt.GetoptError:
    print('Usage: market_price.py [--hostname hostname] [--port port] [--app_id app_id] [--user user] [--position position] [--help]')
    sys.exit(2)
    for opt, arg in opts:
    if opt in ("--help"):
    print('Usage: market_price.py [--hostname hostname] [--port port] [--app_id app_id] [--user user] [--position position] [--help]')
    sys.exit(0)
    elif opt in ("--hostname"):
    hostname = arg
    elif opt in ("--port"):
    port = arg
    elif opt in ("--app_id"):
    app_id = arg
    elif opt in ("--user"):
    user = arg
    elif opt in ("--position"):
    position = arg

    # Start websocket handshake
    ws_address = "ws://{}:{}/WebSocket".format(hostname, port)
    print("Connecting to WebSocket " + ws_address + " ...")
    web_socket_app = websocket.WebSocketApp(ws_address, header=['User-Agent: Python'],
    on_message=on_message,
    on_error=on_error,
    on_close=on_close,
    subprotocols=['tr_json2'])
    web_socket_app.on_open = on_open

    # Event loop
    wst = threading.Thread(target=web_socket_app.run_forever)
    wst.start()
    print("just started wst")
    try:
    while True:
    time.sleep(1)
    print("+++++++++++++++++++++++++++++")
    if next_post_time != 0 and time.time() > next_post_time:
    send_market_price_post(web_socket_app)
    next_post_time = time.time() + 3
    except KeyboardInterrupt:
    web_socket_app.close()

  • I added the line *adh*routename.route*servicename*forwardOnStreamPostMsgKey to my cnf file, restarted the adhs and the code can now push data to the ATS service. I am using ADH 3.2.3 but this line is only documented from 3.3 onwards it seems. Well done Wasin for solving this problem.