#!/usr/bin/env python #|----------------------------------------------------------------------------- #| This source code is provided under the Apache 2.0 license -- #| and is provided AS IS with no warranty or guarantee of fit for purpose. -- #| See the project's LICENSE.md for details. -- #| Copyright Thomson Reuters 2018. All rights reserved. -- #|----------------------------------------------------------------------------- """ Simple example of authenticating to EDP-GW and using the token to login and retrieve MarketPrice content. A username and password are used to retrieve this token. """ import sys import time import getopt import requests import socket import json import websocket import threading import logging import zlib import datetime import os import base64 # Global Default Variables app_id = '256' auth_url = 'https://api.refinitiv.com:443/auth/oauth2/beta1/token' hostname = '' password = '' position = '' sts_token = '' refresh_token = '' user = '' clientid = '' port = '443' client_secret = '' scope = 'trapi' # Global Variables web_socket_app = None web_socket_open = False logged_in = False logging.basicConfig(filename='logging.log', filemode='w', level=logging.DEBUG, format='%(asctime)s: %(levelname)s: %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') RICS = ['AIRF.PA','BASFn.DE','BEIG.DE','BMWG.DE','CAPP.PA','CARR.PA','CONG.DE','EZJ.L','EDF.PA','EURA.PA', 'HMb.ST','IBE.MC','ITX.MC','ISP.MI','AD.AS', 'LEGD.PA','RB.L','REP.MC','SEBF.PA','UHR.S','VLOF.PA', 'BHPB.L','ESLX.PA', 'VNAn.DE'] _news_envelopes = list() if not os.path.isfile('news_streaming.json'): with open('news_streaming.json', 'w') as file: dico = dict() dico['news'] = list() file.write(json.dumps(dico, indent=4)) def processMRNUpdate(evenType, event): # process incoming News Update messages fields_data = event["Fields"] # declare variables tot_size = 0 guid = None # Get data for all requieried fields fragment = base64.b64decode(fields_data["FRAGMENT"]) frag_num = int(fields_data["FRAG_NUM"]) guid = fields_data["GUID"] mrn_src = fields_data["MRN_SRC"] if frag_num > 1: # We are now processing more than one part of an envelope - retrieve the current details guid_index = next((index for (index, d) in enumerate( _news_envelopes) if d["guid"] == guid), None) envelop = _news_envelopes[guid_index] if envelop and envelop["data"]["mrn_src"] == mrn_src and frag_num == envelop["data"]["frag_num"] + 1: logging.info("process multiple fragments for guid %s" % envelop["guid"]) # Merge incoming data to existing news envelop and getting FRAGMENT and TOT_SIZE data to local variables fragment = envelop["data"]["fragment"] = envelop["data"]["fragment"] + fragment envelop["data"]["frag_num"] = frag_num tot_size = envelop["data"]["tot_size"] # logging.info("TOT_SIZE = %d" % tot_size) # logging.info("Current FRAGMENT length = %d" % len(fragment)) # The multiple fragments news are not completed, waiting. if tot_size != len(fragment): return None # The multiple fragments news are completed, delete assoiclate GUID envelop elif tot_size == len(fragment): del _news_envelopes[guid_index] else: # FRAG_NUM = 1 The first fragment tot_size = int(fields_data["TOT_SIZE"]) # logging.info("FRAGMENT length = %d" % len(fragment)) # The fragment news is not completed, waiting and add this news data to envelop object. if tot_size != len(fragment): # logging.info("Add new fragments to news envelop for guid %s" % guid) _news_envelopes.append( { # the envelop object is a Python dictionary with GUID as a key and other fields are data "guid": guid, "data": { "fragment": fragment, "mrn_src": mrn_src, "frag_num": frag_num, "tot_size": tot_size } }) # News Fragment(s) completed, decompress and logging.info data as JSON to console if tot_size == len(fragment): # logging.info("decompress News FRAGMENT(s) for GUID %s" % guid) decompressed_data = zlib.decompress(fragment, zlib.MAX_WBITS | 32).decode('utf-8') # logging.info("News = %s" % json.loads(decompressed_data)) decompressed_data_dico = json.loads(decompressed_data) code_rics = [item[2:] for item in decompressed_data_dico['subjects'] if item.startswith('R:')] code_rics_wanted = [item for item in RICS if item in code_rics] #code_rics_wanted = ["toto","ff"] if decompressed_data_dico['language'] == 'en' and len(code_rics_wanted) > 0: # data_all.append(decompressed_data_dico) logging.info('News !') with open('news_streaming.json', 'r') as file: dico_all = json.loads(file.read()) dico_all['news'].append(decompressed_data_dico) with open('news_streaming.json', 'w') as file: file.write(json.dumps(dico_all, indent=4)) logging.info('File created !') def process_message(message_json): """ Parse at high level and output JSON of message """ message_type = message_json['Type'] if message_type == "Update": processMRNUpdate("Update", message_json) if message_type == "Refresh": if 'Domain' in message_json: message_domain = message_json['Domain'] if message_domain == "Login": process_login_response(message_json) elif message_type == "Ping": pong_json = {'Type': 'Pong'} web_socket_app.send(json.dumps(pong_json)) logging.info("SENT:") logging.info(json.dumps(pong_json, sort_keys=True, indent=2, separators=(',', ':'))) def process_login_response(message_json): """ Send item request """ global logged_in if message_json['State']['Stream'] != "Open" or message_json['State']['Data'] != "Ok": logging.info("Login failed.") sys.exit(1) logged_in = True send_news_request() def send_news_request(): """ Create and send simple Market Price request """ mp_req_json = { 'ID': 2, "Domain": "NewsTextAnalytics", 'Key': { 'Name': "MRN_STORY" } } web_socket_app.send(json.dumps(mp_req_json)) logging.info("SENT:") logging.info(json.dumps(mp_req_json, sort_keys=True, indent=2, separators=(',', ':'))) def send_login_request(auth_token, is_refresh_token): """ Send login request with authentication token. Used both for the initial login and subsequent reissues to update the authentication token """ login_json = { 'ID': 1, 'Domain': 'Login', 'Key': { 'NameType': 'AuthnToken', 'Elements': { 'ApplicationId': '', 'Position': '', 'AuthenticationToken': '' } } } login_json['Key']['Elements']['ApplicationId'] = app_id login_json['Key']['Elements']['Position'] = position login_json['Key']['Elements']['AuthenticationToken'] = auth_token # If the token is a refresh token, this is not our first login attempt. if is_refresh_token: login_json['Refresh'] = False web_socket_app.send(json.dumps(login_json)) logging.info("SENT:") logging.info(json.dumps(login_json, sort_keys=True, indent=2, separators=(',', ':'))) def on_message(_, message): """ Called when message received, parse message into JSON for processing """ logging.info("RECEIVED: ") message_json = json.loads(message) for singleMsg in message_json: process_message(singleMsg) def on_error(_, error): """ Called when websocket error has occurred """ logging.info(error) def on_close(_): """ Called when websocket is closed """ global web_socket_open web_socket_open = False logging.info("WebSocket Closed") def on_open(_): """ Called when handshake is complete and websocket is open, send login """ logging.info("WebSocket successfully connected!") global web_socket_open web_socket_open = True send_login_request(sts_token, False) def get_sts_token(current_refresh_token, url=None): """ Retrieves an authentication token. :param current_refresh_token: Refresh token retrieved from a previous authentication, used to retrieve a subsequent access token. If not provided (i.e. on the initial authentication), the password is used. """ if url is None: url = auth_url if not current_refresh_token: # First time through, send password if url.startswith('https'): data = {'username': user, 'password': password, 'grant_type': 'password', 'takeExclusiveSignOnControl': True, 'scope': scope} else: data = {'username': user, 'password': password, 'client_id': clientid, 'grant_type': 'password', 'takeExclusiveSignOnControl': True, 'scope': scope} logging.info("Sending authentication request with password to "+ str(url)+ "...") else: # Use the given refresh token if url.startswith('https'): data = {'username': user, 'refresh_token': current_refresh_token, 'grant_type': 'refresh_token'} else: data = {'username': user, 'client_id': clientid, 'refresh_token': current_refresh_token, 'grant_type': 'refresh_token'} logging.info("Sending authentication request with refresh token to "+ str(url) + "...") try: if url.startswith('https'): # Request with auth for https protocol r = requests.post(url, headers={'Accept': 'application/json'}, data=data, auth=(clientid, client_secret), verify=True, allow_redirects=False) else: # Request without auth for non https protocol (e.g. http) r = requests.post(url, headers={'Accept': 'application/json'}, data=data, verify=True, allow_redirects=False) except requests.exceptions.RequestException as e: logging.info('EDP-GW authentication exception failure:'+ str(e)) return None, None, None if r.status_code == 200: auth_json = r.json() logging.info("EDP-GW Authentication succeeded. RECEIVED:") logging.info(json.dumps(auth_json, sort_keys=True, indent=2, separators=(',', ':'))) return auth_json['access_token'], auth_json['refresh_token'], auth_json['expires_in'] elif r.status_code == 301 or r.status_code == 302 or r.status_code == 307 or r.status_code == 308: # Perform URL redirect logging.info('EDP-GW authentication HTTP code:', r.status_code, r.reason) new_host = r.headers['Location'] if new_host is not None: logging.info('Perform URL redirect to ', new_host) return get_sts_token(current_refresh_token, new_host) return None, None, None elif r.status_code == 400 or r.status_code == 401: # Retry with username and password logging.info('EDP-GW authentication HTTP code:', r.status_code, r.reason) if current_refresh_token: # Refresh token may have expired. Try using our password. logging.info('Retry with username and password') return get_sts_token(None) return None, None, None elif r.status_code == 403 or r.status_code == 451: # Stop retrying with the request logging.info('EDP-GW authentication HTTP code:', r.status_code, r.reason) logging.info('Stop retrying with the request') return None, None, None else: # Retry the request to the API gateway logging.info('EDP-GW authentication HTTP code:', r.status_code, r.reason) logging.info('Retry the request to the API gateway') return get_sts_token(current_refresh_token) if __name__ == "__main__": hostname = "amer-1.pricing.streaming.edp.thomsonreuters.com" clientid = "***" user = "***" password = "***" if position == '': # Populate position if possible try: position_host = socket.gethostname() position = socket.gethostbyname(position_host) + "/" + position_host except socket.gaierror: position = "127.0.0.1/net" sts_token, refresh_token, expire_time = get_sts_token(None) if not sts_token: sys.exit(1) # Start websocket handshake ws_address = "wss://{}:{}/WebSocket".format(hostname, port) logging.info("Connecting to WebSocket " + ws_address + " ...") web_socket_app = websocket.WebSocketApp(ws_address, on_message=on_message, on_error=on_error, on_close=on_close, subprotocols=['tr_json2']) web_socket_app.on_open = on_open # Event loop import ssl wst = threading.Thread(target=web_socket_app.run_forever, kwargs={'sslopt': {'check_hostname': False,"cert_reqs": ssl.CERT_NONE}}) wst.start() try: while True: # Give 30 seconds to obtain the new security token and send reissue if int(expire_time) > 30: time.sleep(int(expire_time) - 30) else: # Fail the refresh since value too small sys.exit(1) sts_token, refresh_token, expire_time = get_sts_token(refresh_token) if not sts_token: sys.exit(1) # Update token. if logged_in: send_login_request(sts_token, True) except KeyboardInterrupt: web_socket_app.close()