#!/usr/bin/env python # |----------------------------------------------------------------------------- # | This source code is provided under the Apache 2.0 license -- # | and is provided AS IS with no warranty or guarantee of fit for purpose. -- # | See the project's LICENSE.md for details. -- # | Copyright Thomson Reuters 2018. All rights reserved. -- # |----------------------------------------------------------------------------- """ Simple example of authenticating to EDP-GW and using the token to login and retrieve MarketPrice content. A username and password are used to retrieve this token. """ import os import argparse import copy import datetime as dt import json import logging import socket import sys import threading import time import numpy as np import requests import websocket from app.src.python.systematicTrading.config import setup_config from app.src.python.systematicTrading.tora import MarketDataService from daily_processes.product_to_symbol_mapping import ProductToSymbolMapping as psm from tick_sandbox import tick_data_kickstarter as tdk from app.src.python.systematicTrading.marketData.futureEOD import FutureEOD logger = logging.getLogger("EDP_Client") # Global Default Variables app_id = '256' auth_port = '443' position = '' sts_token = '' refresh_token = '' port = '443' client_secret = '' scope = 'trapi' web_socket_app = None web_socket_open = False logged_in = False # Global Variables default_view = ['BID', 'ASK', 'ACVOL_1', 'HST_CLOSE', 'EXPIR_DATE'] class EDPSnap: def __init__(self, tickers, mid_prices, close_prices, session_volumes, expiry_dates): self.tickers = tickers self.mid_prices = mid_prices self.close_prices = close_prices self.session_volumes = session_volumes self.expiry_dates = expiry_dates class EDP_client(): def __init__(self, streaming, root_path, data_path, metadata_path, appConfig): self._root_path = root_path self._data_path = data_path self._metadata_path = metadata_path self._appConfig = appConfig self._dict_subscribed_map = psm.get_product_symbol_mappings(self._metadata_path)['dict_prod_edp'] self._trading_universe = tdk.get_trading_universe() self._time = dt.datetime.now() self._streaming = streaming self._mid_prices = {} self._bid_prices = {} self._ask_prices = {} self._close_prices = {} self._session_volumes = {} self._expiry_dates = {} self._list_edp = [] for key in self._dict_subscribed_map: ric = self._dict_subscribed_map[key] self._list_edp.append(ric) self._mid_prices[ric] = 0.0 self._bid_prices[ric] = 0.0 self._ask_prices[ric] = 0.0 self._close_prices[ric] = 0.0 self._session_volumes[ric] = 0.0 self._expiry_dates[ric] = '' self._no_callback_rics = self._list_edp.copy() self.lock = threading.RLock() def snapAll(self): with self.lock: tickers = copy.deepcopy(self._list_edp) print(tickers) mid_prices = copy.deepcopy(self._mid_prices) close_prices = copy.deepcopy(self._close_prices) session_volumes = copy.deepcopy(self._session_volumes) expiry_dates = copy.deepcopy(self._expiry_dates) response = EDPSnap(tickers, mid_prices, close_prices, session_volumes, expiry_dates) return response def process_message(self, message_json): """ Parse at high level and output JSON of message """ message_type = message_json['Type'] if message_type in {"Refresh", "Update"}: if 'Domain' in message_json: message_domain = message_json['Domain'] if message_domain == "Login": self.process_login_response(message_json) # contains market data elif 'Key' in message_json: ric = message_json['Key']['Name'] if 'Fields' in message_json: with self.lock: if 'BID' in message_json['Fields']: self._bid_prices[ric] = message_json['Fields']['BID'] if 'ASK' in message_json['Fields']: self._ask_prices[ric] = message_json['Fields']['ASK'] self._mid_prices[ric] = (self._bid_prices[ric] + self._ask_prices[ric]) /2.0 if 'ACVOL_1' in message_json['Fields']: self._session_volumes[ric] = message_json['Fields']['ACVOL_1'] if 'HST_CLOSE' in message_json['Fields']: self._close_prices[ric] = message_json['Fields']['HST_CLOSE'] if 'EXPIR_DATE' in message_json['Fields']: self._expiry_dates[ric] = message_json['Fields']['EXPIR_DATE'] if dt.datetime.now() > self._time + dt.timedelta(seconds=120): logger.info("Market updates:") for key in sorted(self._mid_prices): logger.info("{0}: midPrice={1:0.5f}, closePrice={2:0.5f}, sessionVolume={3}, expiry={4}".format(key, self._mid_prices[key], self._close_prices[key], self._session_volumes[key], self._expiry_dates[key])) self._time = dt.datetime.now() if ric in self._no_callback_rics: self._no_callback_rics.remove(ric) self.check_overnight_return(ric) logger.info("still waiting callbacks for: ") logger.info(self._no_callback_rics) elif message_type == "Ping": pong_json = {'Type': 'Pong'} web_socket_app.send(json.dumps(pong_json)) # print("SENT:") # print(json.dumps(pong_json, sort_keys=True, indent=2, separators=(',', ':'))) def process_login_response(self, message_json): """ Send item request """ global logged_in if message_json['State']['Stream'] != "Open" or message_json['State']['Data'] != "Ok": logger.error("Login failed.") sys.exit(1) logged_in = True self.send_market_price_request(default_view) def send_market_price_request(self, default_view): """ Create and send simple Market Price request """ """ view contains the fields you want to subscribe to 22: BID, 30: BIDSIZE 25: ASK: 31: ASKSIZE 21: HST_CLOSE, 6: TRDPRC_1, 32: ACVOL_1 view = [22, 25, 30, 31] """ mp_req_json = { 'ID': 2, 'Key': { 'Name': self._list_edp, }, 'View': default_view, 'Streaming': self._streaming } web_socket_app.send(json.dumps(mp_req_json)) print("SENT:") print(json.dumps(mp_req_json, sort_keys=True, indent=2, separators=(',', ':'))) def send_login_request(self, auth_token, is_refresh_token): """ Send login request with authentication token. Used both for the initial login and subsequent reissues to update the authentication token """ login_json = { 'ID': 1, 'Domain': 'Login', 'Key': { 'NameType': 'AuthnToken', 'Elements': { 'ApplicationId': '', 'Position': '', 'AuthenticationToken': '' } } } login_json['Key']['Elements']['ApplicationId'] = app_id login_json['Key']['Elements']['Position'] = position login_json['Key']['Elements']['AuthenticationToken'] = auth_token # If the token is a refresh token, this is not our first login attempt. if is_refresh_token: login_json['Refresh'] = False web_socket_app.send(json.dumps(login_json)) # print("SENT:") # print(json.dumps(login_json, sort_keys=True, indent=2, separators=(',', ':'))) def on_message(self, message): """ Called when message received, parse message into JSON for processing """ print("RECEIVED: ") message_json = json.loads(message) logger.info(json.dumps(message_json, sort_keys=True, indent=2, separators=(',', ':'))) for singleMsg in message_json: self.process_message(singleMsg) def on_error(self, error): """ Called when websocket error has occurred """ print(error) logger.error(error) def on_close(self): """ Called when websocket is closed """ global web_socket_open web_socket_open = False logger.info("WebSocket Closed") """ Altering the state of GLOBAL logged_in """ global logged_in logged_in = False logger.info("Value of logged_in in on_close is: {0}".format(logged_in)) def on_open(self): """ Called when handshake is complete and websocket is open, send login """ logger.info("WebSocket successfully connected!") global web_socket_open web_socket_open = True self.send_login_request(sts_token, False) def get_sts_token(self, current_refresh_token): """ Retrieves an authentication token. :param current_refresh_token: Refresh token retrieved from a previous authentication, used to retrieve a subsequent access token. If not provided (i.e. on the initial authentication), the password is used. """ url = 'https://{}:{}/{}'.format(auth_hostname, auth_port, auth_path) print(url) if not current_refresh_token: # First time through, send password data = {'username': user, 'password': password, 'grant_type': 'password', 'takeExclusiveSignOnControl': True, 'scope': scope} print("Sending authentication request with password to ", url, "...") else: # Use the given refresh token data = {'username': user, 'refresh_token': current_refresh_token, 'grant_type': 'refresh_token', 'takeExclusiveSignOnControl': True} print("Sending authentication request with refresh token to ", url, "...") #data = {'username': user, 'password': password, 'grant_type': 'password', # 'takeExclusiveSignOnControl': True, # 'scope': scope} # logger.info("NOT Sending authentication request as per email from support") try: r = requests.post(url, headers={'Accept': 'application/json'}, data=data, auth=(user, client_secret), verify=True) except requests.exceptions.RequestException as e: logger.error('EDP-GW authentication exception failure:', e) return None, None, None if r.status_code != 200: print('EDP-GW authentication result failure:', r.status_code, r.reason) print('Text: ', r.text) if current_refresh_token: # Refresh token may have expired. Try using our password. return self.get_sts_token(None) return None, None, None auth_json = r.json() logger.info("EDP-GW Authentication succeeded. RECEIVED:") logger.info(json.dumps(auth_json, sort_keys=True, indent=2, separators=(',', ':'))) logger.info("still waiting callbacks on {0}".format(self._no_callback_rics)) # logger.info("Value of access_token is: {0}".format(auth_json['access_token'])) # logger.info("Value of refresh_token is: {0}".format(auth_json['refresh_token'])) # logger.info("Value of expires_in is: {0}".format(auth_json['expires_in'])) return auth_json['access_token'], auth_json['refresh_token'], auth_json['expires_in'] # given an update on a ric, retrieve the associated eod last price to check price consistency btw tickdata and reuters def check_overnight_return(self, ric): try: prod = list(self._dict_subscribed_map.keys())[list(self._dict_subscribed_map.values()).index(ric)] current_prod = self._trading_universe.loc[self._trading_universe['product'] == prod].to_dict('records')[0] if current_prod['securityType'] == 'FUTURE': fut = FutureEOD(current_prod, from_TickData=False, root_path=self._root_path, data_path=self._data_path) eod_price = fut.unadj_close_prices[-1:].values[0] reuters_price = current_prod['reutersMultiplier'] * self._mid_prices[ric] ret = 100 * (reuters_price/ eod_price - 1) logger.info("{0}: eod_price = {1}, reuters_price = {2}, overnight return = {3}".format(ric, eod_price, reuters_price, np.round(ret))) if np.abs(ret) > 10: logger.error("{0}: WEIRD overnight return = {1}, check multipliers".format(ric, ret)) except: logger.error("could not find any product associated to {0}".format(ric)) def set_edp_start_and_end_times() -> dict: _today = dt.date.today() _now = dt.datetime.now() _utc_now = dt.datetime.utcnow() # utc_offset = utc_now - now _start_str = _today.strftime("%Y-%m-%d ") + mds_start_time _start_time = dt.datetime.strptime(_start_str, "%Y-%m-%d %H:%M:%S") # if we start after start_time then for us it is a new day if _now >= _start_time: _today += dt.timedelta(days=1) _end_str = _today.strftime("%Y-%m-%d ") + mds_end_time _end_time_utc = dt.datetime.strptime(_end_str, "%Y-%m-%d %H:%M:%S") return {'utc_now': _utc_now, 'end_time_utc': _end_time_utc} if __name__ == "__main__": parser = argparse.ArgumentParser(description='Z3 EDP Market data server') parser.add_argument('config', nargs='+', help='Configuration files (like globalConfig.json)') parser.add_argument("--env", nargs=1, choices=['home', 'uat', 'prod'], required=True, help="Environment name (home, uat, prod)") args = parser.parse_args() appConfig = setup_config.load_configs(args.config) env = args.env[0] root_path = None data_path = None metadata_path = None hostname = appConfig.get_string("reuters.uat.hostname") auth_hostname = appConfig.get_string("reuters.auth_hostname") auth_path = appConfig.get_string("reuters.auth_path") str = appConfig.get_string("reuters.streaming") streaming = False if str == 'True': streaming = True user = appConfig.get_string("reuters.prod.user") password = appConfig.get_string("reuters.prod.password") if env.lower() == 'home': root_path = appConfig.get_string("home_path") data_path = appConfig.get_string("uat_data_path") metadata_path = appConfig.get_string("uat_metadata_path") elif env.lower() == 'uat': root_path = appConfig.get_string("uat_path") data_path = appConfig.get_string("uat_data_path") metadata_path = appConfig.get_string("uat_metadata_path") hostname = appConfig.get_string("reuters.uat.hostname") user = appConfig.get_string("reuters.uat.user") password = appConfig.get_string("reuters.uat.password") elif env.lower() == 'prod': root_path = appConfig.get_string("prod_path") data_path = appConfig.get_string("prod_data_path") metadata_path = appConfig.get_string("prod_metadata_path") hostname = appConfig.get_string("reuters.prod.hostname") user = appConfig.get_string("reuters.prod.user") password = appConfig.get_string("reuters.prod.password") else: setup_config.setup_logging() logger.error( "could not recognise environment. exiting. env should be {home, uat, prod}") sys.exit(2) if env is None: setup_config.setup_logging() logger.error("could not recognise environment. exiting. env should be {home, uat, prod}") sys.exit(2) setup_config.setup_logging(environment=env) # schedule to run till a certain time of the day mds_start_time = appConfig.get_string("edp_client.start_time_utc") mds_end_time = appConfig.get_string("edp_client.end_time_utc") dict_edp_times = set_edp_start_and_end_times() logger.info("EDP STARTS (UTC) @{0} AND ENDS @{1}".format(dict_edp_times['utc_now'], dict_edp_times['end_time_utc'])) if position == '': # Populate position if possible try: position_host = socket.gethostname() position = socket.gethostbyname(position_host) + "/" + position_host except socket.gaierror: position = "127.0.0.1/net" this_client = EDP_client(streaming=streaming, root_path=root_path, data_path=data_path, metadata_path=metadata_path, appConfig=appConfig) sts_token, refresh_token, expire_time = this_client.get_sts_token(None) if not sts_token: logger.info("AM I entering not sts_token OUTSIDE while loop: {0}".format(sts_token)) sys.exit(1) # Start websocket handshake ws_address = "wss://{}:{}/WebSocket".format(hostname, port) logger.info("Connecting to WebSocket " + ws_address + " ...") web_socket_app = websocket.WebSocketApp(ws_address, on_message=this_client.on_message, on_error=this_client.on_error, on_close=this_client.on_close, subprotocols=['tr_json2']) web_socket_app.on_open = this_client.on_open # Event loop wst = threading.Thread(target=web_socket_app.run_forever, kwargs={'sslopt': {'check_hostname': False}}) wst.start() marketDataServiceConfig = appConfig.get('edp_client') mdServer = MarketDataService.serve(this_client, marketDataServiceConfig) try: while True: if dt.datetime.utcnow() > dict_edp_times['end_time_utc']: logger.info("MDS OVER: SHUTTING DOWN MARKET DATA SERVICE") web_socket_app.close() sys.exit(1) # Give 30 seconds to obtain the new security token and send reissue if int(expire_time) > 30: time.sleep(int(expire_time) - 30) else: # Fail the refresh since value too small sys.exit(1) sts_token, refresh_token, expire_time = this_client.get_sts_token(refresh_token) if not sts_token: logger.info("AM I entering not sts_token in while loop: {0}".format(sts_token)) sys.exit(1) """ FORCE CLOSING WEB_SOCKET for TESTING """ web_socket_app.close() logger.info("Explicitly closing connection: {0}".format(logged_in)) """ END OF: FORCE CLOSING WEB_SOCKET for TESTING """ # Update token. # Why do i need to request this again? if logged_in: this_client.send_login_request(sts_token, True) logger.info("Value of logged_in is: {0}".format(logged_in)) else: logger.info("I am in the else part, where logger_in = False") try: logger.info("TRY is being used") sts_token, refresh_token, expire_time = this_client.get_sts_token(None) logger.info("Value of sts_token in logged_in False is: {0}".format(sts_token)) del web_socket_app web_socket_app = None time.sleep(10) # RETRY dict_edp_times = set_edp_start_and_end_times() logger.info("AGAIN:: EDP STARTS (UTC) @{0} AND ENDS @{1}".format(dict_edp_times['utc_now'], dict_edp_times['end_time_utc'])) if position == '': # Populate position if possible try: position_host = socket.gethostname() position = socket.gethostbyname(position_host) + "/" + position_host except socket.gaierror: position = "127.0.0.1/net" this_client = EDP_client(streaming=streaming, root_path=root_path, data_path=data_path, metadata_path=metadata_path, appConfig=appConfig) sts_token, refresh_token, expire_time = this_client.get_sts_token(None) if not sts_token: logger.info("AM I entering not sts_token IN TRY IN WHILE loop: {0}".format(sts_token)) sys.exit(1) logger.info("Value of sts_token in logged_in False is: {0}".format(sts_token)) # Start websocket handshake ws_address = "wss://{}:{}/WebSocket".format(hostname, port) logger.info("Connecting to WebSocket " + ws_address + " ...") web_socket_app = websocket.WebSocketApp(ws_address, on_message=this_client.on_message, on_error=this_client.on_error, on_close=this_client.on_close, subprotocols=['tr_json2']) web_socket_app.on_open = this_client.on_open # Event loop wst = threading.Thread(target=web_socket_app.run_forever, kwargs={'sslopt': {'check_hostname': False}}) wst.start() marketDataServiceConfig = appConfig.get('edp_client') mdServer = MarketDataService.serve(this_client, marketDataServiceConfig) this_client.send_login_request(sts_token, True) except websocket._exceptions.WebSocketConnectionClosedException as e: logger.info("EXCEPTION is being activated.: {0}".format(e)) logger.info("EXCEPTION, os information: {0}".format(os.sys.exc_info()[0:2])) del web_socket_app web_socket_app = None time.sleep(10) # RETRY dict_edp_times = set_edp_start_and_end_times() logger.info("AGAIN:: EDP STARTS (UTC) @{0} AND ENDS @{1}".format(dict_edp_times['utc_now'], dict_edp_times['end_time_utc'])) if position == '': # Populate position if possible try: position_host = socket.gethostname() position = socket.gethostbyname(position_host) + "/" + position_host except socket.gaierror: position = "127.0.0.1/net" this_client = EDP_client(streaming=streaming, root_path=root_path, data_path=data_path, metadata_path=metadata_path, appConfig=appConfig) sts_token, refresh_token, expire_time = this_client.get_sts_token(None) if not sts_token: logger.info("AM I entering not sts_token IN EXCEPTION IN WHILE loop: {0}".format(sts_token)) sys.exit(1) logger.info("Value of sts_token in logged_in False is: {0}".format(sts_token)) # Start websocket handshake ws_address = "wss://{}:{}/WebSocket".format(hostname, port) logger.info("Connecting to WebSocket " + ws_address + " ...") web_socket_app = websocket.WebSocketApp(ws_address, on_message=this_client.on_message, on_error=this_client.on_error, on_close=this_client.on_close, subprotocols=['tr_json2']) web_socket_app.on_open = this_client.on_open # Event loop wst = threading.Thread(target=web_socket_app.run_forever, kwargs={'sslopt': {'check_hostname': False}}) wst.start() marketDataServiceConfig = appConfig.get('edp_client') mdServer = MarketDataService.serve(this_client, marketDataServiceConfig) this_client.send_login_request(sts_token, True) except KeyboardInterrupt: web_socket_app.close()