"""WebSocket client that logs in, opens market-price streams and posts updates.

The script connects to a WebSocket endpoint using the ``tr_json2``
sub-protocol, sends a login request, requests one stream per RIC and then
periodically posts a fixed set of fields on every stream that the server has
reported as ``Open``/``Ok``.
"""

import json
import os
import socket
import threading
import time

import websocket

# Global Default Variables
hostname = '192.168.5.207'
port = '15000'
user = 'beacon'
app_id = 'beacon'
try:
    position = socket.gethostbyname(socket.gethostname())
except socket.gaierror:
    # The local host name does not resolve on this machine; fall back to the
    # loopback address so the script can still start.
    position = '127.0.0.1'
user_id = os.getpid()

# Global Variables
next_post_time = 0
web_socket_app = None
web_socket_open = False
post_id = 1
# Maps stream ID -> {'ready': bool, 'ric': str}; filled in by main() and
# updated by process_message() when a stream is reported open.
ric_states = {}


def _pretty(message_json):
    """Return *message_json* rendered as sorted, indented JSON text."""
    return json.dumps(message_json, sort_keys=True, indent=2,
                      separators=(',', ':'))


def process_message(ws, message_json):
    """Dispatch one decoded message received from the server.

    * A ``Refresh`` on the ``Login`` domain triggers the item requests.
    * A ``Ping`` is answered with a ``Pong``.
    * Any message whose ``ID`` is a known stream and whose ``State`` reports
      ``Stream == "Open"`` and ``Data == "Ok"`` marks that stream as ready
      for posting.

    Messages without a ``State`` (or with an ``ID`` that is not in
    ``ric_states``, such as the login stream) are left alone instead of
    raising ``KeyError``.
    """
    message_type = message_json.get('Type')

    if message_type == "Refresh":
        if message_json.get('Domain') == "Login":
            process_login_response(ws, message_json)
    elif message_type == "Ping":
        pong_json = {'Type': 'Pong'}
        ws.send(json.dumps(pong_json))
        print("SENT:")
        print(_pretty(pong_json))

    # If the stream is now open for the ric with ID == message_json['ID'],
    # we can start sending posts for that ric.
    state = message_json.get('State')
    ric_state = ric_states.get(message_json.get('ID'))
    if (ric_state is not None
            and isinstance(state, dict)
            and state.get('Stream') == "Open"
            and state.get('Data') == "Ok"):
        ric_state['ready'] = True


def process_login_response(ws, message_json):
    """Send one market-price request for every entry in ``ric_states``.

    *message_json* is the login refresh; it is accepted for symmetry with the
    other handlers but is not inspected.
    """
    for ric_id, ric_state in ric_states.items():
        send_market_price_request(ws, ric_id, ric_state['ric'])


def send_market_price_request(ws, Id, ric):
    """Create and send a simple Market Price request.

    *Id* is the stream ID to use and *ric* the item name; the request is
    addressed to the ``DDS`` service.
    """
    mp_req_json = {
        'ID': Id,
        'Key': {
            'Name': ric,
            'Service': 'DDS',
        },
    }
    ws.send(json.dumps(mp_req_json))


def send_market_price_post(ws, Id, fields):
    """Post an ``Update`` carrying *fields* on the stream with ID *Id*.

    The post asks for an acknowledgement, carries the module-level
    ``post_id`` as its ``PostID`` and increments that counter after the
    message has been sent and echoed to stdout.
    """
    global post_id
    mp_post_json = {
        'ID': Id,
        'Type': 'Post',
        'Domain': 'MarketPrice',
        'Ack': True,
        'PostID': post_id,
        'PostUserInfo': {
            'Address': position,  # Use IP address as the Post User Address.
            'UserID': user_id,  # Use process ID as the Post User Id.
        },
        'Message': {
            'ID': 0,
            'Type': 'Update',
            'Domain': 'MarketPrice',
            'Fields': fields,
        },
    }
    ws.send(json.dumps(mp_post_json))
    print("SENT:")
    print(_pretty(mp_post_json))
    post_id += 1


def send_login_request(ws):
    """Build a login request from the module-level defaults and send it."""
    login_json = {
        'ID': 1,
        'Domain': 'Login',
        'Key': {
            'Name': user,
            'Elements': {
                'ApplicationId': app_id,
                'Position': position,
            },
        },
    }
    ws.send(json.dumps(login_json))
    print("SENT:")
    print(_pretty(login_json))


def on_message(ws, message):
    """Called when a message is received; decode it and process each entry.

    The payload is expected to be a JSON array of messages. A single JSON
    object is tolerated and handled as a one-element array rather than being
    iterated key by key.
    """
    print("RECEIVED: ")
    message_json = json.loads(message)
    print(_pretty(message_json))

    if isinstance(message_json, dict):
        message_json = [message_json]
    for single_msg in message_json:
        process_message(ws, single_msg)


def on_error(ws, error):
    """Called when a websocket error has occurred; print it."""
    print(error)


def on_close(ws, close_status_code=None, close_msg=None):
    """Called when the websocket is closed.

    The two optional parameters make the handler compatible both with
    websocket-client releases that call ``on_close(ws)`` and with those that
    call ``on_close(ws, close_status_code, close_msg)``.
    """
    global web_socket_open
    print("WebSocket Closed")
    web_socket_open = False


def on_open(ws):
    """Called when the handshake is complete; mark the socket open and log in."""
    print("WebSocket successfully connected!")
    global web_socket_open
    web_socket_open = True
    send_login_request(ws)


def main():
    """Connect, then post ``fields`` on every ready stream every 10 seconds.

    The loop ends when the websocket thread terminates or on Ctrl-C; the
    websocket is closed on every exit path so the worker thread cannot keep
    the process alive after a failure in the posting loop.
    """
    global web_socket_app

    rics = ["6YEAR_COP_SWAP=SBCM"]
    post_statistics = {}
    # Stream IDs start from 2; 1 is reserved for login.
    for stream_id, ric in enumerate(rics, start=2):
        post_statistics[ric] = 0
        ric_states[stream_id] = {'ready': False, 'ric': ric}

    # Start websocket handshake
    ws_address = "ws://{}:{}/WebSocket".format(hostname, port)
    print("Connecting to WebSocket " + ws_address + " ...")
    web_socket_app = websocket.WebSocketApp(
        ws_address,
        header=['User-Agent: Python'],
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
        subprotocols=['tr_json2'],
    )
    web_socket_app.on_open = on_open

    # Event loop
    wst = threading.Thread(target=web_socket_app.run_forever)
    wst.start()

    fields = {'FAIR_VALUE': 45.55}
    # Alternative payload:
    # fields = {'BID':45.55,'BIDSIZE':18,'ASK':45.57,'ASKSIZE':19}
    show_statistics_timeline = time.time() + 10
    try:
        # Once the websocket thread has ended nothing can be posted any more,
        # so stop instead of sleeping forever.
        while wst.is_alive():
            time.sleep(10)
            if web_socket_open:
                for ric_id, ric_state in ric_states.items():
                    if ric_state['ready']:
                        send_market_price_post(web_socket_app, ric_id, fields)
                        post_statistics[ric_state['ric']] += 1
            cur_time = time.time()
            if cur_time > show_statistics_timeline:
                show_statistics_timeline = cur_time + 10
                print(post_statistics)
    except KeyboardInterrupt:
        pass
    finally:
        web_socket_app.close()


if __name__ == "__main__":
    main()