Hello, I am utilizing the EDP client (in python).
The EDP connection closes in 18 hours after starting.
Hence, I have modified the client to catch this particular expection.
It catches the exception once and handles it correctly. However the second time around, I get the following error:
E0719 14:27:07.008920841 25901 server_chttp2.cc:40] {"created":"@1563546427.008896465","description":"No address added out of total 1 resolved","file":"src/core/ext/transport/chttp2/server/chttp2_server.cc","file_line":348,"referenced_errors":[{"created":"@1563546427.008893095","description":"Unable to configure socket","fd":11,"file":"src/core/lib/iomgr/tcp_server_utils_posix_common.cc","file_line":217,"referenced_errors":[{"created":"@1563546427.008883844","description":"Address already in use","errno":98,"file":"src/core/lib/iomgr/tcp_server_utils_posix_common.cc","file_line":190,"os_error":"Address already in use","syscall":"bind"}]}]}
Traceback (most recent call last):
File "app/src/python/systematicTrading/marketData/reuters/EDP_client_modified.py", line 521, in <module>
this_client.send_login_request(sts_token, True)
File "app/src/python/systematicTrading/marketData/reuters/EDP_client_modified.py", line 212, in send_login_request
web_socket_app.send(json.dumps(login_json))
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_app.py", line 153, in send
if not self.sock or self.sock.send(data, opcode) == 0:
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_core.py", line 253, in send
return self.send_frame(frame)
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_core.py", line 278, in send_frame
l = self._send(data)
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_core.py", line 448, in _send
return send(self.sock, data)
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_socket.py", line 135, in send
raise WebSocketConnectionClosedException("socket is already closed.")
websocket._exceptions.WebSocketConnectionClosedException: socket is already closed.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File ".../EDP_client_modified.py", line 568, in <module>
this_client.send_login_request(sts_token, True)
File "...EDP_client_modified.py", line 212, in send_login_request
web_socket_app.send(json.dumps(login_json))
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_app.py", line 153, in send
if not self.sock or self.sock.send(data, opcode) == 0:
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_core.py", line 253, in send
return self.send_frame(frame)
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_core.py", line 278, in send_frame
l = self._send(data)
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_core.py", line 448, in _send
return send(self.sock, data)
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_socket.py", line 135, in send
raise WebSocketConnectionClosedException("socket is already closed.")
websocket._exceptions.WebSocketConnectionClosedException: socket is already closed.
-----------------------------------------------------
The last message I receive from EDP is:
"State":{
"Code":"Timeout",
"Data":"Suspect",
"Stream":"Closed",
"Text":"TREP authentication token has expired."
},
"Type":"Status"
}
]
Connection is already closed.
-------------------------------------
Would very much appreciate your suggestions and pointers.
And here is my code:
#!/usr/bin/env python# |-----------------------------------------------------------------------------# | This source code is provided under the Apache 2.0 license --# | and is provided AS IS with no warranty or guarantee of fit for purpose. --# | See the project's LICENSE.md for details. --# | Copyright Thomson Reuters 2018. All rights reserved. --# |-----------------------------------------------------------------------------"""Simple example of authenticating to EDP-GW and using the token to login andretrieve MarketPrice content. A username and password are used to retrievethis token."""import os<br>import argparse<br>import copy<br>import datetime as dt<br>import json<br>import logging<br>import socket<br>import sys<br>import threading<br>import time<br><br>import numpy as np<br>import requests<br>import websocket<br><br>from app.src.python.systematicTrading.config import setup_config<br>from app.src.python.systematicTrading.tora import MarketDataService<br>from daily_processes.product_to_symbol_mapping import ProductToSymbolMapping as psm<br>from tick_sandbox import tick_data_kickstarter as tdk<br>from app.src.python.systematicTrading.marketData.futureEOD import FutureEOD<br><br>logger = logging.getLogger("EDP_Client")<br><br><br># Global Default Variablesapp_id = '256'auth_port = '443'position = ''sts_token = ''refresh_token = ''port = '443'client_secret = ''scope = 'trapi'web_socket_app = Noneweb_socket_open = Falselogged_in = False# Global Variablesdefault_view = ['BID', 'ASK', 'ACVOL_1', 'HST_CLOSE', 'EXPIR_DATE']<br><br><br>class EDPSnap:<br> def __init__(self, tickers, mid_prices, close_prices, session_volumes, expiry_dates):<br> self.tickers = tickers<br> self.mid_prices = mid_prices<br> self.close_prices = close_prices<br> self.session_volumes = session_volumes<br> self.expiry_dates = expiry_dates<br><br><br>class EDP_client():<br> def __init__(self, streaming, root_path, data_path, metadata_path, appConfig):<br> self._root_path = root_path<br> self._data_path = data_path<br> self._metadata_path = metadata_path<br> self._appConfig = appConfig<br> self._dict_subscribed_map = psm.get_product_symbol_mappings(self._metadata_path)['dict_prod_edp']<br> self._trading_universe = tdk.get_trading_universe()<br> self._time = dt.datetime.now()<br> self._streaming = streaming<br> self._mid_prices = {}<br> self._bid_prices = {}<br> self._ask_prices = {}<br> self._close_prices = {}<br> self._session_volumes = {}<br> self._expiry_dates = {}<br> self._list_edp = []<br> for key in self._dict_subscribed_map:<br> ric = self._dict_subscribed_map[key]<br> self._list_edp.append(ric)<br> self._mid_prices[ric] = 0.0self._bid_prices[ric] = 0.0self._ask_prices[ric] = 0.0self._close_prices[ric] = 0.0self._session_volumes[ric] = 0.0self._expiry_dates[ric] = ''self._no_callback_rics = self._list_edp.copy()<br><br> self.lock = threading.RLock()<br><br> def snapAll(self):<br> with self.lock:<br> tickers = copy.deepcopy(self._list_edp)<br> print(tickers)<br> mid_prices = copy.deepcopy(self._mid_prices)<br> close_prices = copy.deepcopy(self._close_prices)<br> session_volumes = copy.deepcopy(self._session_volumes)<br> expiry_dates = copy.deepcopy(self._expiry_dates)<br> response = EDPSnap(tickers, mid_prices, close_prices, session_volumes, expiry_dates)<br> return response<br><br> def process_message(self, message_json):<br> """ Parse at high level and output JSON of message """message_type = message_json['Type']<br><br> if message_type in {"Refresh", "Update"}:<br> if 'Domain' in message_json:<br> message_domain = message_json['Domain']<br> if message_domain == "Login":<br> self.process_login_response(message_json)<br> # contains market dataelif 'Key' in message_json:<br> ric = message_json['Key']['Name']<br> if 'Fields' in message_json:<br> with self.lock:<br> if 'BID' in message_json['Fields']:<br> self._bid_prices[ric] = message_json['Fields']['BID']<br> if 'ASK' in message_json['Fields']:<br> self._ask_prices[ric] = message_json['Fields']['ASK']<br> self._mid_prices[ric] = (self._bid_prices[ric] + self._ask_prices[ric]) /2.0if 'ACVOL_1' in message_json['Fields']:<br> self._session_volumes[ric] = message_json['Fields']['ACVOL_1']<br><br> if 'HST_CLOSE' in message_json['Fields']:<br> self._close_prices[ric] = message_json['Fields']['HST_CLOSE']<br><br> if 'EXPIR_DATE' in message_json['Fields']:<br> self._expiry_dates[ric] = message_json['Fields']['EXPIR_DATE']<br><br> if dt.datetime.now() > self._time + dt.timedelta(seconds=120):<br> logger.info("Market updates:")<br> for key in sorted(self._mid_prices):<br> logger.info("{0}: midPrice={1:0.5f}, closePrice={2:0.5f}, sessionVolume={3}, expiry={4}".format(key,self._mid_prices[key],self._close_prices[key],self._session_volumes[key],self._expiry_dates[key]))<br> self._time = dt.datetime.now()<br><br> if ric in self._no_callback_rics:<br> self._no_callback_rics.remove(ric)<br> self.check_overnight_return(ric)<br> logger.info("still waiting callbacks for: ")<br> logger.info(self._no_callback_rics)<br> elif message_type == "Ping":<br> pong_json = {'Type': 'Pong'}<br> web_socket_app.send(json.dumps(pong_json))<br> # print("SENT:") # print(json.dumps(pong_json, sort_keys=True, indent=2, separators=(',', ':')))def process_login_response(self, message_json):<br> """ Send item request """global logged_in<br><br> if message_json['State']['Stream'] != "Open" or message_json['State']['Data'] != "Ok":<br> logger.error("Login failed.")<br> sys.exit(1)<br><br> logged_in = Trueself.send_market_price_request(default_view)<br><br> def send_market_price_request(self, default_view):<br> """ Create and send simple Market Price request """""" view contains the fields you want to subscribe to 22: BID, 30: BIDSIZE 25: ASK: 31: ASKSIZE 21: HST_CLOSE, 6: TRDPRC_1, 32: ACVOL_1 view = [22, 25, 30, 31] """mp_req_json = {<br> 'ID': 2,'Key': {<br> 'Name': self._list_edp,},'View': default_view,'Streaming': self._streaming<br> }<br> web_socket_app.send(json.dumps(mp_req_json))<br> print("SENT:")<br> print(json.dumps(mp_req_json, sort_keys=True, indent=2, separators=(',', ':')))<br><br> def send_login_request(self, auth_token, is_refresh_token):<br> """ Send login request with authentication token. Used both for the initial login and subsequent reissues to update the authentication token """login_json = {<br> 'ID': 1,'Domain': 'Login','Key': {<br> 'NameType': 'AuthnToken','Elements': {<br> 'ApplicationId': '','Position': '','AuthenticationToken': ''}<br> }<br> }<br><br> login_json['Key']['Elements']['ApplicationId'] = app_id<br> login_json['Key']['Elements']['Position'] = position<br> login_json['Key']['Elements']['AuthenticationToken'] = auth_token<br><br> # If the token is a refresh token, this is not our first login attempt.if is_refresh_token:<br> login_json['Refresh'] = Falseweb_socket_app.send(json.dumps(login_json))<br> # print("SENT:") # print(json.dumps(login_json, sort_keys=True, indent=2, separators=(',', ':')))def on_message(self, message):<br> """ Called when message received, parse message into JSON for processing """print("RECEIVED: ")<br> message_json = json.loads(message)<br> logger.info(json.dumps(message_json, sort_keys=True, indent=2, separators=(',', ':')))<br><br> for singleMsg in message_json:<br> self.process_message(singleMsg)<br><br> def on_error(self, error):<br> """ Called when websocket error has occurred """print(error)<br> logger.error(error)<br><br> def on_close(self):<br> """ Called when websocket is closed """global web_socket_open<br> web_socket_open = Falselogger.info("WebSocket Closed")<br> """ Altering the state of GLOBAL logged_in """global logged_in<br> logged_in = Falselogger.info("Value of logged_in in on_close is: {0}".format(logged_in))<br><br> def on_open(self):<br> """ Called when handshake is complete and websocket is open, send login """logger.info("WebSocket successfully connected!")<br> global web_socket_open<br> web_socket_open = Trueself.send_login_request(sts_token, False)<br><br> def get_sts_token(self, current_refresh_token):<br> """ Retrieves an authentication token.:param current_refresh_token: Refresh token retrieved from a previous authentication, used to retrieve a subsequent access token. If not provided (i.e. on the initial authentication), the password is used. """url = 'https://{}:{}/{}'.format(auth_hostname, auth_port, auth_path)<br> print(url)<br> if not current_refresh_token: # First time through, send passworddata = {'username': user, 'password': password, 'grant_type': 'password', 'takeExclusiveSignOnControl': True,'scope': scope}<br> print("Sending authentication request with password to ", url, "...")<br> else: # Use the given refresh tokendata = {'username': user, 'refresh_token': current_refresh_token, 'grant_type': 'refresh_token','takeExclusiveSignOnControl': True}<br> print("Sending authentication request with refresh token to ", url, "...")<br><br> #data = {'username': user, 'password': password, 'grant_type': 'password', # 'takeExclusiveSignOnControl': True, # 'scope': scope} # logger.info("NOT Sending authentication request as per email from support")try:<br> r = requests.post(url,headers={'Accept': 'application/json'},data=data,auth=(user, client_secret),verify=True)<br><br> except requests.exceptions.RequestException as e:<br> logger.error('EDP-GW authentication exception failure:', e)<br> return None, None, None if r.status_code != 200:<br> print('EDP-GW authentication result failure:', r.status_code, r.reason)<br> print('Text: ', r.text)<br> if current_refresh_token:<br> # Refresh token may have expired. Try using our password.return self.get_sts_token(None)<br> return None, None, Noneauth_json = r.json()<br> logger.info("EDP-GW Authentication succeeded. RECEIVED:")<br> logger.info(json.dumps(auth_json, sort_keys=True, indent=2, separators=(',', ':')))<br><br> logger.info("still waiting callbacks on {0}".format(self._no_callback_rics))<br><br> # logger.info("Value of access_token is: {0}".format(auth_json['access_token'])) # logger.info("Value of refresh_token is: {0}".format(auth_json['refresh_token'])) # logger.info("Value of expires_in is: {0}".format(auth_json['expires_in']))return auth_json['access_token'], auth_json['refresh_token'], auth_json['expires_in']<br><br> # given an update on a ric, retrieve the associated eod last price to check price consistency btw tickdata and reutersdef check_overnight_return(self, ric):<br> try:<br> prod = list(self._dict_subscribed_map.keys())[list(self._dict_subscribed_map.values()).index(ric)]<br> current_prod = self._trading_universe.loc[self._trading_universe['product'] == prod].to_dict('records')[0]<br> if current_prod['securityType'] == 'FUTURE':<br> fut = FutureEOD(current_prod,from_TickData=False,root_path=self._root_path,data_path=self._data_path)<br> eod_price = fut.unadj_close_prices[-1:].values[0]<br> reuters_price = current_prod['reutersMultiplier'] * self._mid_prices[ric]<br> ret = 100 * (reuters_price/ eod_price - 1)<br> logger.info("{0}: eod_price = {1}, reuters_price = {2}, overnight return = {3}".format(ric,eod_price,reuters_price,np.round(ret)))<br> if np.abs(ret) > 10:<br> logger.error("{0}: WEIRD overnight return = {1}, check multipliers".format(ric, ret))<br> except:<br> logger.error("could not find any product associated to {0}".format(ric))<br><br><br>def set_edp_start_and_end_times() -> dict:<br> _today = dt.date.today()<br> _now = dt.datetime.now()<br> _utc_now = dt.datetime.utcnow()<br> # utc_offset = utc_now - now_start_str = _today.strftime("%Y-%m-%d ") + mds_start_time<br> _start_time = dt.datetime.strptime(_start_str, "%Y-%m-%d %H:%M:%S")<br> # if we start after start_time then for us it is a new dayif _now >= _start_time:<br> _today += dt.timedelta(days=1)<br> _end_str = _today.strftime("%Y-%m-%d ") + mds_end_time<br> _end_time_utc = dt.datetime.strptime(_end_str, "%Y-%m-%d %H:%M:%S")<br> return {'utc_now': _utc_now, 'end_time_utc': _end_time_utc}<br><br><br>if __name__ == "__main__":<br> parser = argparse.ArgumentParser(description='Z3 EDP Market data server')<br> parser.add_argument('config', nargs='+', help='Configuration files (like globalConfig.json)')<br> parser.add_argument("--env", nargs=1, choices=['home', 'uat', 'prod'],required=True, help="Environment name (home, uat, prod)")<br><br> args = parser.parse_args()<br><br> appConfig = setup_config.load_configs(args.config)<br><br> env = args.env[0]<br> root_path = Nonedata_path = Nonemetadata_path = Nonehostname = appConfig.get_string("reuters.uat.hostname")<br> auth_hostname = appConfig.get_string("reuters.auth_hostname")<br> auth_path = appConfig.get_string("reuters.auth_path")<br> str = appConfig.get_string("reuters.streaming")<br> streaming = False if str == 'True':<br> streaming = Trueuser = appConfig.get_string("reuters.prod.user")<br> password = appConfig.get_string("reuters.prod.password")<br><br> if env.lower() == 'home':<br> root_path = appConfig.get_string("home_path")<br> data_path = appConfig.get_string("uat_data_path")<br> metadata_path = appConfig.get_string("uat_metadata_path")<br> elif env.lower() == 'uat':<br> root_path = appConfig.get_string("uat_path")<br> data_path = appConfig.get_string("uat_data_path")<br> metadata_path = appConfig.get_string("uat_metadata_path")<br> hostname = appConfig.get_string("reuters.uat.hostname")<br> user = appConfig.get_string("reuters.uat.user")<br> password = appConfig.get_string("reuters.uat.password")<br> elif env.lower() == 'prod':<br> root_path = appConfig.get_string("prod_path")<br> data_path = appConfig.get_string("prod_data_path")<br> metadata_path = appConfig.get_string("prod_metadata_path")<br> hostname = appConfig.get_string("reuters.prod.hostname")<br> user = appConfig.get_string("reuters.prod.user")<br> password = appConfig.get_string("reuters.prod.password")<br> else:<br> setup_config.setup_logging()<br> logger.error(<br> "could not recognise environment. exiting. env should be {home, uat, prod}")<br> sys.exit(2)<br><br> if env is None:<br> setup_config.setup_logging()<br> logger.error("could not recognise environment. exiting. env should be {home, uat, prod}")<br> sys.exit(2)<br><br> setup_config.setup_logging(environment=env)<br><br> # schedule to run till a certain time of the daymds_start_time = appConfig.get_string("edp_client.start_time_utc")<br> mds_end_time = appConfig.get_string("edp_client.end_time_utc")<br> dict_edp_times = set_edp_start_and_end_times()<br> logger.info("EDP STARTS (UTC) @{0} AND ENDS @{1}".format(dict_edp_times['utc_now'],dict_edp_times['end_time_utc']))<br><br> if position == '':<br> # Populate position if possibletry:<br> position_host = socket.gethostname()<br> position = socket.gethostbyname(position_host) + "/" + position_host<br> except socket.gaierror:<br> position = "127.0.0.1/net"this_client = EDP_client(streaming=streaming,root_path=root_path,data_path=data_path,metadata_path=metadata_path,appConfig=appConfig)<br> sts_token, refresh_token, expire_time = this_client.get_sts_token(None)<br> if not sts_token:<br> logger.info("AM I entering not sts_token OUTSIDE while loop: {0}".format(sts_token))<br> sys.exit(1)<br><br> # Start websocket handshakews_address = "wss://{}:{}/WebSocket".format(hostname, port)<br> logger.info("Connecting to WebSocket " + ws_address + " ...")<br> web_socket_app = websocket.WebSocketApp(ws_address, on_message=this_client.on_message,on_error=this_client.on_error,on_close=this_client.on_close,subprotocols=['tr_json2'])<br> web_socket_app.on_open = this_client.on_open<br><br> # Event loopwst = threading.Thread(target=web_socket_app.run_forever, kwargs={'sslopt': {'check_hostname': False}})<br> wst.start()<br><br> marketDataServiceConfig = appConfig.get('edp_client')<br> mdServer = MarketDataService.serve(this_client, marketDataServiceConfig)<br><br> try:<br> while True:<br> if dt.datetime.utcnow() > dict_edp_times['end_time_utc']:<br> logger.info("MDS OVER: SHUTTING DOWN MARKET DATA SERVICE")<br> web_socket_app.close()<br> sys.exit(1)<br> # Give 30 seconds to obtain the new security token and send reissueif int(expire_time) > 30:<br> time.sleep(int(expire_time) - 30)<br> else:<br> # Fail the refresh since value too smallsys.exit(1)<br> sts_token, refresh_token, expire_time = this_client.get_sts_token(refresh_token)<br> if not sts_token:<br> logger.info("AM I entering not sts_token in while loop: {0}".format(sts_token))<br> sys.exit(1)<br><br> """ FORCE CLOSING WEB_SOCKET for TESTING """web_socket_app.close()<br> logger.info("Explicitly closing connection: {0}".format(logged_in))<br> """ END OF: FORCE CLOSING WEB_SOCKET for TESTING """# Update token. # Why do i need to request this again?if logged_in:<br> this_client.send_login_request(sts_token, True)<br> logger.info("Value of logged_in is: {0}".format(logged_in))<br> else:<br> logger.info("I am in the else part, where logger_in = False")<br> try:<br> logger.info("TRY is being used")<br> sts_token, refresh_token, expire_time = this_client.get_sts_token(None)<br> logger.info("Value of sts_token in logged_in False is: {0}".format(sts_token))<br> del web_socket_app<br> web_socket_app = Nonetime.sleep(10)<br> # RETRYdict_edp_times = set_edp_start_and_end_times()<br> logger.info("AGAIN:: EDP STARTS (UTC) @{0} AND ENDS @{1}".format(dict_edp_times['utc_now'],dict_edp_times['end_time_utc']))<br><br> if position == '':<br> # Populate position if possibletry:<br> position_host = socket.gethostname()<br> position = socket.gethostbyname(position_host) + "/" + position_host<br> except socket.gaierror:<br> position = "127.0.0.1/net"this_client = EDP_client(streaming=streaming,root_path=root_path,data_path=data_path,metadata_path=metadata_path,appConfig=appConfig)<br><br> sts_token, refresh_token, expire_time = this_client.get_sts_token(None)<br> if not sts_token:<br> logger.info("AM I entering not sts_token IN TRY IN WHILE loop: {0}".format(sts_token))<br> sys.exit(1)<br> logger.info("Value of sts_token in logged_in False is: {0}".format(sts_token))<br><br> # Start websocket handshakews_address = "wss://{}:{}/WebSocket".format(hostname, port)<br> logger.info("Connecting to WebSocket " + ws_address + " ...")<br> web_socket_app = websocket.WebSocketApp(ws_address, on_message=this_client.on_message,on_error=this_client.on_error,on_close=this_client.on_close,subprotocols=['tr_json2'])<br> web_socket_app.on_open = this_client.on_open<br><br> # Event loopwst = threading.Thread(target=web_socket_app.run_forever, kwargs={'sslopt': {'check_hostname': False}})<br> wst.start()<br><br> marketDataServiceConfig = appConfig.get('edp_client')<br> mdServer = MarketDataService.serve(this_client, marketDataServiceConfig)<br><br> this_client.send_login_request(sts_token, True)<br> except websocket._exceptions.WebSocketConnectionClosedException as e:<br> logger.info("EXCEPTION is being activated.: {0}".format(e))<br> logger.info("EXCEPTION, os information: {0}".format(os.sys.exc_info()[0:2]))<br> del web_socket_app<br> web_socket_app = Nonetime.sleep(10)<br> # RETRYdict_edp_times = set_edp_start_and_end_times()<br> logger.info("AGAIN:: EDP STARTS (UTC) @{0} AND ENDS @{1}".format(dict_edp_times['utc_now'],dict_edp_times['end_time_utc']))<br> if position == '':<br> # Populate position if possibletry:<br> position_host = socket.gethostname()<br> position = socket.gethostbyname(position_host) + "/" + position_host<br> except socket.gaierror:<br> position = "127.0.0.1/net"this_client = EDP_client(streaming=streaming,root_path=root_path,data_path=data_path,metadata_path=metadata_path,appConfig=appConfig)<br> sts_token, refresh_token, expire_time = this_client.get_sts_token(None)<br> if not sts_token:<br> logger.info("AM I entering not sts_token IN EXCEPTION IN WHILE loop: {0}".format(sts_token))<br> sys.exit(1)<br> logger.info("Value of sts_token in logged_in False is: {0}".format(sts_token))<br><br> # Start websocket handshakews_address = "wss://{}:{}/WebSocket".format(hostname, port)<br> logger.info("Connecting to WebSocket " + ws_address + " ...")<br> web_socket_app = websocket.WebSocketApp(ws_address, on_message=this_client.on_message,on_error=this_client.on_error,on_close=this_client.on_close,subprotocols=['tr_json2'])<br> web_socket_app.on_open = this_client.on_open<br><br> # Event loopwst = threading.Thread(target=web_socket_app.run_forever,kwargs={'sslopt': {'check_hostname': False}})<br> wst.start()<br><br> marketDataServiceConfig = appConfig.get('edp_client')<br> mdServer = MarketDataService.serve(this_client, marketDataServiceConfig)<br><br> this_client.send_login_request(sts_token, True)<br> except KeyboardInterrupt:<br> web_socket_app.close()