#!/usr/bin/env python
# coding: utf-8

# In[11]:


# Uncomment the two pip lines below if you do not have requests and
# websocket-client (version 0.49 and above) installed. They install the
# packages into the current Jupyter kernel.
import sys
# !{sys.executable} -m pip install requests
# !{sys.executable} -m pip install websocket-client

import time
import getopt
import socket
import json
import websocket
import threading
from threading import Thread, Event
import base64
import zlib
import requests


# In[12]:


# EDP Credentials
user = ''
password = ''
client_id = ''


# In[13]:


# EDP connection variables
token = ''
token_refresh = ''
token_api_endpoint = 'https://api.refinitiv.com/auth/oauth2/v1/token'
# Seconds to wait for the token endpoint before giving up, so a stalled
# connection cannot hang the caller forever.
token_request_timeout = 30
# Alternative endpoint:
# hostname = 'apac-1-t2.streaming-pricing-api.refinitiv.com'
hostname = 'ap-southeast-1-aws-1-sm.optimized-pricing-api.refinitiv.net'
port = '443'
app_id = '256'
try:
    position = socket.gethostbyname(socket.gethostname())
except OSError:
    # The local hostname does not resolve on this machine; fall back to
    # loopback instead of failing while the module is being loaded.
    position = '127.0.0.1'
login_id = 1


# In[14]:


# WebSocket connection variables
web_socket_app = None
web_socket_open = False
counter = 0
sleep_for_seconds = 10

# Partially received news envelopes (fragments still being collected) and
# the fully decompressed news JSON messages.
_news_envelopes = []
_news_messages = []


# In[15]:


def _request_token(data):
    """POST ``data`` to the token endpoint and normalise the outcome.

    Returns a dict with the keys 'token', 'refresh' and 'error'. On success
    'token' and 'refresh' are filled in; on any failure (transport error,
    non-JSON body, or a response without 'access_token') 'token' stays empty
    and 'error' carries a description.
    """
    response = {
        'token': '',
        'error': '',
        'refresh': ''
    }

    try:
        r = requests.post(token_api_endpoint, data=data, verify=True,
                          timeout=token_request_timeout)
        auth_json = r.json()
    except (requests.exceptions.RequestException, ValueError) as err:
        # ValueError covers a body that is not valid JSON.
        response['error'] = 'Token request failed: %s' % err
        return response

    if 'access_token' in auth_json:
        response['token'] = auth_json['access_token']
        response['refresh'] = auth_json.get('refresh_token', '')
    else:
        # Not every error payload is guaranteed to carry 'error_description'.
        response['error'] = auth_json.get(
            'error_description',
            auth_json.get('error', 'Unexpected token response'))

    return response


# EDP Token Retrieval
def get_token():
    """Request a new access token using the password grant.

    Returns the dict produced by ``_request_token``.
    """
    print('Retrieve Token')

    data = {
        'client_id': client_id,
        'grant_type': 'password',
        'password': password,
        'scope': 'trapi',
        'takeExclusiveSignOnControl': True,
        'username': user,
    }
    return _request_token(data)


# EDP Token Refresh
def refresh_token(refresh):
    """Exchange the refresh token ``refresh`` for a new access token.

    The token values are deliberately not printed: they are credentials and
    should not end up in console or notebook output.

    Returns the dict produced by ``_request_token``.
    """
    print('Refresh Token')

    data = {
        'client_id': client_id,
        'grant_type': 'refresh_token',
        'refresh_token': refresh,
        'username': user,
    }
    return _request_token(data)


# EDP Login
def login(ws, refresh):
    """Obtain a token, store it in the module globals and optionally re-login.

    ws      -- open WebSocket to send a login request on, or None to only
               fetch the token.
    refresh -- refresh token to use, or None to authenticate with the
               username/password credentials.

    On failure the error is printed and the process exits with status 2.
    """
    global token
    global token_refresh

    # Get token
    if refresh is not None:
        token_result = refresh_token(refresh)
    else:
        token_result = get_token()

    if token_result['token'] != '':
        # Success
        token = token_result['token']
        token_refresh = token_result['refresh']
    else:
        # Failed
        print(token_result['error'])
        sys.exit(2)

    if ws is not None:
        send_login_request(ws, True)


# In[16]:


# MRN variables
mrn_domain = 'NewsTextAnalytics'
mrn_item = 'MRN_STORY'


def send_mrn_request(ws):
    """ Create and send MRN request """
    mrn_req_json = {
        'ID': 2,
        "Domain": mrn_domain,
        'Key': {
            'Name': mrn_item
        }
    }

    ws.send(json.dumps(mrn_req_json))
    print("SENT:")
    print(json.dumps(mrn_req_json, sort_keys=True, indent=2, separators=(',', ':')))


# In[17]:


# Process FieldList, Refresh and Status messages.
def decodeFieldList(fieldList_dict):
    """Print every name/value pair of a field list dictionary."""
    for key, value in fieldList_dict.items():
        print("Name = %s: Value = %s" % (key, value))


def processRefresh(ws, message_json):
    """Print the fields of an incoming Refresh message (if it has any)."""
    print("RECEIVED: Refresh Message")
    decodeFieldList(message_json.get("Fields", {}))


def processStatus(ws, message_json):
    """Print an incoming Status message as formatted JSON."""
    print("RECEIVED: Status Message")
    print(json.dumps(message_json, sort_keys=True, indent=2, separators=(',', ':')))


# In[18]:


def processMRNUpdate(ws, message_json):
    """Process one incoming news Update message.

    The message's FRAGMENT field is base64-decoded. A story may arrive in
    several fragments: the first one (FRAG_NUM 1) carries TOT_SIZE, and
    incomplete stories are parked in ``_news_envelopes`` keyed by GUID until
    the accumulated fragment length equals TOT_SIZE. A complete story is
    decompressed, parsed as JSON, appended to ``_news_messages`` and printed.

    Always returns None; all processing errors are reported on the console.
    """
    # Imported here so the ``except binascii.Error`` clause below can always
    # resolve the name.
    import binascii

    tot_size = 0
    guid = None

    try:
        # Get data for all required fields
        fields_data = message_json["Fields"]
        fragment = base64.b64decode(fields_data["FRAGMENT"])
        frag_num = int(fields_data["FRAG_NUM"])
        guid = fields_data["GUID"]
        mrn_src = fields_data["MRN_SRC"]

        if frag_num > 1:
            # A follow-up fragment: look up the envelope started by the
            # first fragment of the same GUID.
            guid_index = next(
                (index for index, d in enumerate(_news_envelopes)
                 if d["guid"] == guid),
                None)
            envelop = None
            if guid_index is not None:
                envelop = _news_envelopes[guid_index]

            if (envelop is None
                    or envelop["data"]["mrn_src"] != mrn_src
                    or frag_num != envelop["data"]["frag_num"] + 1):
                print("Error: Cannot find fragment for GUID %s with matching FRAG_NUM or MRN_SRC %s" % (
                    guid, mrn_src))
                return None

            print("process multiple fragments for guid %s" % envelop["guid"])

            # Merge the incoming data into the stored envelope.
            fragment = envelop["data"]["fragment"] + fragment
            envelop["data"]["fragment"] = fragment
            envelop["data"]["frag_num"] = frag_num
            tot_size = envelop["data"]["tot_size"]
            print("TOT_SIZE = %d" % tot_size)
            print("Current FRAGMENT length = %d" % len(fragment))

            # The story is still incomplete, keep waiting.
            if tot_size != len(fragment):
                return None

            # The story is complete, drop its envelope.
            del _news_envelopes[guid_index]
        else:
            # FRAG_NUM = 1: the first fragment
            tot_size = int(fields_data["TOT_SIZE"])
            print("FRAGMENT length = %d" % len(fragment))

            # The story is incomplete: park it until the rest arrives.
            if tot_size != len(fragment):
                print("Add new fragments to news envelop for guid %s" % guid)
                _news_envelopes.append({
                    "guid": guid,
                    "data": {
                        "fragment": fragment,
                        "mrn_src": mrn_src,
                        "frag_num": frag_num,
                        "tot_size": tot_size
                    }
                })
                return None

        # All fragments are present: decompress and print the story.
        # MAX_WBITS | 32 lets zlib auto-detect a zlib or gzip header.
        print("decompress News FRAGMENT(s) for GUID %s" % guid)
        decompressed_data = zlib.decompress(fragment, zlib.MAX_WBITS | 32)
        json_news = json.loads(decompressed_data)
        _news_messages.append(json_news)
        print("News = %s" % json_news)

    except KeyError as keyerror:
        print('KeyError exception: ', keyerror)
    except IndexError as indexerror:
        print('IndexError exception: ', indexerror)
    except binascii.Error as b64error:
        print('base64 decoding exception:', b64error)
    except zlib.error as error:
        print('zlib decompressing exception: ', error)
    # Some console environments like Windows may encounter this unicode display as a limitation of OS
    except UnicodeEncodeError as encodeerror:
        print("UnicodeEncodeError exception. Cannot decode unicode character for %s in this enviroment: " %
              guid, encodeerror)
    except Exception as e:
        print('exception: ', type(e), e)

    return None


# In[19]:


def process_message(ws, message_json):
    """ Parse at high level and output JSON of message """
    message_type = message_json['Type']

    if message_type == "Refresh":
        if "Domain" in message_json:
            message_domain = message_json["Domain"]
            if message_domain == "Login":
                process_login_response(ws, message_json)
            elif message_domain:
                processRefresh(ws, message_json)
    elif message_type == "Update":
        if "Domain" in message_json and message_json["Domain"] == mrn_domain:
            processMRNUpdate(ws, message_json)
    elif message_type == "Status":
        processStatus(ws, message_json)
    elif message_type == "Ping":
        # Answer the server's Ping with a Pong.
        pong_json = {'Type': 'Pong'}
        ws.send(json.dumps(pong_json))
        print("SENT:")
        print(json.dumps(pong_json, sort_keys=True,
                         indent=2, separators=(',', ':')))


def process_login_response(ws, message_json):
    """ Send item request """
    send_mrn_request(ws)


def send_login_request(ws, refresh):
    """ Generate a login request from the module-level settings and send it.

    When ``refresh`` is true the request carries 'Refresh': False.
    """
    login_json = {
        'ID': 1,
        "Domain": 'Login',
        'Key': {
            'Elements': {
                'ApplicationId': app_id,
                'Position': position,
                'AuthenticationToken': token
            },
            'NameType': 'AuthnToken'
        }
    }

    if refresh:
        login_json['Refresh'] = False

    ws.send(json.dumps(login_json))
    print("SENT:")
    print(json.dumps(login_json, sort_keys=True, indent=2, separators=(',', ':')))


# In[20]:


def on_message(ws, message):
    """ Called when message received, parse message into JSON for processing """
    print("RECEIVED: ")
    message_json = json.loads(message)
    print(json.dumps(message_json, sort_keys=True, indent=2, separators=(',', ':')))

    for singleMsg in message_json:
        process_message(ws, singleMsg)


def on_error(ws, error):
    """ Called when websocket error has occurred """
    print(error)


def on_close(ws, close_status_code=None, close_msg=None):
    """ Called when websocket is closed.

    The status code and message are optional so the callback works whether
    the library invokes it with one argument or with three.
    """
    global web_socket_open
    print("WebSocket Closed")
    web_socket_open = False


def on_open(ws):
    """ Called when handshake is complete and websocket is open, send login """
    global web_socket_open
    print("WebSocket successfully connected!")
    web_socket_open = True
    send_login_request(ws, False)


def create_socket(address):
    """Fetch a token, then start a WebSocket connection to ``address``.

    The connection's event loop runs on a new thread. Returns the
    WebSocketApp object.
    """
    login(None, None)

    # Start websocket handshake
    print("Connecting to WebSocket " + address + " ...")
    web_socket_app = websocket.WebSocketApp(address, header=['User-Agent: Python'],
                                            on_message=on_message,
                                            on_error=on_error,
                                            on_close=on_close,
                                            on_open=on_open,
                                            subprotocols=['tr_json2'])

    # Event loop
    # NOTE(review): 'check_hostname': False is passed through to the TLS
    # options as in the original code - confirm this is really wanted.
    wst = threading.Thread(target=web_socket_app.run_forever,
                           kwargs={'sslopt': {'check_hostname': False}})
    wst.start()
    return web_socket_app


if __name__ == "__main__":
    ws_address = "wss://{}:{}/WebSocket".format(hostname, port)

    try:
        web_socket_app = create_socket(ws_address)
        time.sleep(10)
        while True:
            if web_socket_open:
                # When connection is opened, check for token refresh
                time.sleep(sleep_for_seconds)
                counter = counter + sleep_for_seconds
                if counter % 250 == 0:
                    login(web_socket_app, token_refresh)
            else:
                # When connection is closed, reconnect
                counter = 0
                web_socket_app = create_socket(ws_address)
                # Give the handshake time to finish; without this pause the
                # loop would spawn a new connection (and token request) on
                # every iteration until the socket reports open.
                time.sleep(sleep_for_seconds)
    except KeyboardInterrupt:
        if web_socket_app is not None:
            web_socket_app.close()


# In[ ]: