question

Upvotes
Accepted
1 0 0 2

EDP_client: handling websocket._exceptions.WebSocketConnectionClosedExpcetion

Hello, I am utilizing the EDP client (in python).

The EDP connection closes in 18 hours after starting.

Hence, I have modified the client to catch this particular expection.

It catches the exception once and handles it correctly. However the second time around, I get the following error:

E0719 14:27:07.008920841 25901 server_chttp2.cc:40] {"created":"@1563546427.008896465","description":"No address added out of total 1 resolved","file":"src/core/ext/transport/chttp2/server/chttp2_server.cc","file_line":348,"referenced_errors":[{"created":"@1563546427.008893095","description":"Unable to configure socket","fd":11,"file":"src/core/lib/iomgr/tcp_server_utils_posix_common.cc","file_line":217,"referenced_errors":[{"created":"@1563546427.008883844","description":"Address already in use","errno":98,"file":"src/core/lib/iomgr/tcp_server_utils_posix_common.cc","file_line":190,"os_error":"Address already in use","syscall":"bind"}]}]}
Traceback (most recent call last):
File "app/src/python/systematicTrading/marketData/reuters/EDP_client_modified.py", line 521, in <module>
this_client.send_login_request(sts_token, True)
File "app/src/python/systematicTrading/marketData/reuters/EDP_client_modified.py", line 212, in send_login_request
web_socket_app.send(json.dumps(login_json))
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_app.py", line 153, in send
if not self.sock or self.sock.send(data, opcode) == 0:
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_core.py", line 253, in send
return self.send_frame(frame)
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_core.py", line 278, in send_frame
l = self._send(data)
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_core.py", line 448, in _send
return send(self.sock, data)
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_socket.py", line 135, in send
raise WebSocketConnectionClosedException("socket is already closed.")
websocket._exceptions.WebSocketConnectionClosedException: socket is already closed.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File ".../EDP_client_modified.py", line 568, in <module>
this_client.send_login_request(sts_token, True)
File "...EDP_client_modified.py", line 212, in send_login_request
web_socket_app.send(json.dumps(login_json))
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_app.py", line 153, in send
if not self.sock or self.sock.send(data, opcode) == 0:
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_core.py", line 253, in send
return self.send_frame(frame)
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_core.py", line 278, in send_frame
l = self._send(data)
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_core.py", line 448, in _send
return send(self.sock, data)
File ".../.conda/envs/z3_capital_dev/lib/python3.7/site-packages/websocket/_socket.py", line 135, in send
raise WebSocketConnectionClosedException("socket is already closed.")
websocket._exceptions.WebSocketConnectionClosedException: socket is already closed.

-----------------------------------------------------

The last message I receive from EDP is:

"State":{
"Code":"Timeout",
"Data":"Suspect",
"Stream":"Closed",
"Text":"TREP authentication token has expired."
},
"Type":"Status"
}
]
Connection is already closed.

-------------------------------------

Would very much appreciate your suggestions and pointers.

And here is my code:

#!/usr/bin/env python# |-----------------------------------------------------------------------------# |            This source code is provided under the Apache 2.0 license      --# |  and is provided AS IS with no warranty or guarantee of fit for purpose.  --# |                See the project's LICENSE.md for details.                  --# |           Copyright Thomson Reuters 2018. All rights reserved.            --# |-----------------------------------------------------------------------------"""Simple example of authenticating to EDP-GW and using the token to login andretrieve MarketPrice content.  A username and password are used to retrievethis token."""import os<br>import argparse<br>import copy<br>import datetime as dt<br>import json<br>import logging<br>import socket<br>import sys<br>import threading<br>import time<br><br>import numpy as np<br>import requests<br>import websocket<br><br>from app.src.python.systematicTrading.config import setup_config<br>from app.src.python.systematicTrading.tora import MarketDataService<br>from daily_processes.product_to_symbol_mapping import ProductToSymbolMapping as psm<br>from tick_sandbox import tick_data_kickstarter as tdk<br>from app.src.python.systematicTrading.marketData.futureEOD import FutureEOD<br><br>logger = logging.getLogger("EDP_Client")<br><br><br># Global Default Variablesapp_id = '256'auth_port = '443'position = ''sts_token = ''refresh_token = ''port = '443'client_secret = ''scope = 'trapi'web_socket_app = Noneweb_socket_open = Falselogged_in = False# Global Variablesdefault_view = ['BID', 'ASK', 'ACVOL_1', 'HST_CLOSE', 'EXPIR_DATE']<br><br><br>class EDPSnap:<br>    def __init__(self, tickers, mid_prices, close_prices, session_volumes, expiry_dates):<br>        self.tickers = tickers<br>        self.mid_prices = mid_prices<br>        self.close_prices = close_prices<br>        self.session_volumes = session_volumes<br>        self.expiry_dates = expiry_dates<br><br><br>class EDP_client():<br>    def __init__(self, streaming, root_path, data_path, metadata_path, appConfig):<br>        self._root_path = root_path<br>        self._data_path = data_path<br>        self._metadata_path = metadata_path<br>        self._appConfig = appConfig<br>        self._dict_subscribed_map = psm.get_product_symbol_mappings(self._metadata_path)['dict_prod_edp']<br>        self._trading_universe = tdk.get_trading_universe()<br>        self._time = dt.datetime.now()<br>        self._streaming = streaming<br>        self._mid_prices = {}<br>        self._bid_prices = {}<br>        self._ask_prices = {}<br>        self._close_prices = {}<br>        self._session_volumes = {}<br>        self._expiry_dates = {}<br>        self._list_edp = []<br>        for key in self._dict_subscribed_map:<br>            ric = self._dict_subscribed_map[key]<br>            self._list_edp.append(ric)<br>            self._mid_prices[ric] = 0.0self._bid_prices[ric] = 0.0self._ask_prices[ric] = 0.0self._close_prices[ric] = 0.0self._session_volumes[ric] = 0.0self._expiry_dates[ric] = ''self._no_callback_rics = self._list_edp.copy()<br><br>        self.lock = threading.RLock()<br><br>    def snapAll(self):<br>        with self.lock:<br>            tickers = copy.deepcopy(self._list_edp)<br>            print(tickers)<br>            mid_prices = copy.deepcopy(self._mid_prices)<br>            close_prices = copy.deepcopy(self._close_prices)<br>            session_volumes = copy.deepcopy(self._session_volumes)<br>            expiry_dates = copy.deepcopy(self._expiry_dates)<br>            response = EDPSnap(tickers, mid_prices, close_prices, session_volumes, expiry_dates)<br>            return response<br><br>    def process_message(self, message_json):<br>        """ Parse at high level and output JSON of message """message_type = message_json['Type']<br><br>        if message_type in {"Refresh", "Update"}:<br>            if 'Domain' in message_json:<br>                message_domain = message_json['Domain']<br>                if message_domain == "Login":<br>                    self.process_login_response(message_json)<br>            # contains market dataelif 'Key' in message_json:<br>                ric = message_json['Key']['Name']<br>                if 'Fields' in message_json:<br>                    with self.lock:<br>                        if 'BID' in message_json['Fields']:<br>                            self._bid_prices[ric] = message_json['Fields']['BID']<br>                        if 'ASK' in message_json['Fields']:<br>                            self._ask_prices[ric] = message_json['Fields']['ASK']<br>                        self._mid_prices[ric] = (self._bid_prices[ric] + self._ask_prices[ric]) /2.0if 'ACVOL_1' in message_json['Fields']:<br>                            self._session_volumes[ric] = message_json['Fields']['ACVOL_1']<br><br>                        if 'HST_CLOSE' in message_json['Fields']:<br>                            self._close_prices[ric] = message_json['Fields']['HST_CLOSE']<br><br>                        if 'EXPIR_DATE' in message_json['Fields']:<br>                                self._expiry_dates[ric] = message_json['Fields']['EXPIR_DATE']<br><br>                        if dt.datetime.now() > self._time + dt.timedelta(seconds=120):<br>                            logger.info("Market updates:")<br>                            for key in sorted(self._mid_prices):<br>                                logger.info("{0}: midPrice={1:0.5f}, closePrice={2:0.5f}, sessionVolume={3}, expiry={4}".format(key,self._mid_prices[key],self._close_prices[key],self._session_volumes[key],self._expiry_dates[key]))<br>                            self._time = dt.datetime.now()<br><br>                        if ric in self._no_callback_rics:<br>                            self._no_callback_rics.remove(ric)<br>                            self.check_overnight_return(ric)<br>                            logger.info("still waiting callbacks for: ")<br>                            logger.info(self._no_callback_rics)<br>        elif message_type == "Ping":<br>            pong_json = {'Type': 'Pong'}<br>            web_socket_app.send(json.dumps(pong_json))<br>            # print("SENT:")            # print(json.dumps(pong_json, sort_keys=True, indent=2, separators=(',', ':')))def process_login_response(self, message_json):<br>        """ Send item request """global logged_in<br><br>        if message_json['State']['Stream'] != "Open" or message_json['State']['Data'] != "Ok":<br>            logger.error("Login failed.")<br>            sys.exit(1)<br><br>        logged_in = Trueself.send_market_price_request(default_view)<br><br>    def send_market_price_request(self, default_view):<br>        """ Create and send simple Market Price request """"""            view contains the fields you want to subscribe to            22: BID, 30: BIDSIZE            25: ASK: 31: ASKSIZE            21: HST_CLOSE, 6: TRDPRC_1, 32: ACVOL_1              view = [22, 25, 30, 31]        """mp_req_json = {<br>            'ID': 2,'Key': {<br>                'Name': self._list_edp,},'View': default_view,'Streaming': self._streaming<br>        }<br>        web_socket_app.send(json.dumps(mp_req_json))<br>        print("SENT:")<br>        print(json.dumps(mp_req_json, sort_keys=True, indent=2, separators=(',', ':')))<br><br>    def send_login_request(self, auth_token, is_refresh_token):<br>        """            Send login request with authentication token.            Used both for the initial login and subsequent reissues to update the authentication token        """login_json = {<br>            'ID': 1,'Domain': 'Login','Key': {<br>                'NameType': 'AuthnToken','Elements': {<br>                    'ApplicationId': '','Position': '','AuthenticationToken': ''}<br>            }<br>        }<br><br>        login_json['Key']['Elements']['ApplicationId'] = app_id<br>        login_json['Key']['Elements']['Position'] = position<br>        login_json['Key']['Elements']['AuthenticationToken'] = auth_token<br><br>        # If the token is a refresh token, this is not our first login attempt.if is_refresh_token:<br>            login_json['Refresh'] = Falseweb_socket_app.send(json.dumps(login_json))<br>        # print("SENT:")        # print(json.dumps(login_json, sort_keys=True, indent=2, separators=(',', ':')))def on_message(self, message):<br>        """ Called when message received, parse message into JSON for processing """print("RECEIVED: ")<br>        message_json = json.loads(message)<br>        logger.info(json.dumps(message_json, sort_keys=True, indent=2, separators=(',', ':')))<br><br>        for singleMsg in message_json:<br>            self.process_message(singleMsg)<br><br>    def on_error(self, error):<br>        """ Called when websocket error has occurred """print(error)<br>        logger.error(error)<br><br>    def on_close(self):<br>        """ Called when websocket is closed """global web_socket_open<br>        web_socket_open = Falselogger.info("WebSocket Closed")<br>        """        Altering the state of GLOBAL logged_in        """global logged_in<br>        logged_in = Falselogger.info("Value of logged_in in on_close is: {0}".format(logged_in))<br><br>    def on_open(self):<br>        """ Called when handshake is complete and websocket is open, send login """logger.info("WebSocket successfully connected!")<br>        global web_socket_open<br>        web_socket_open = Trueself.send_login_request(sts_token, False)<br><br>    def get_sts_token(self, current_refresh_token):<br>        """            Retrieves an authentication token.:param current_refresh_token: Refresh token retrieved from a previous authentication, used to retrieve a            subsequent access token. If not provided (i.e. on the initial authentication), the password is used.        """url = 'https://{}:{}/{}'.format(auth_hostname, auth_port, auth_path)<br>        print(url)<br>        if not current_refresh_token:  # First time through, send passworddata = {'username': user, 'password': password, 'grant_type': 'password', 'takeExclusiveSignOnControl': True,'scope': scope}<br>            print("Sending authentication request with password to ", url, "...")<br>        else:  # Use the given refresh tokendata = {'username': user, 'refresh_token': current_refresh_token, 'grant_type': 'refresh_token','takeExclusiveSignOnControl': True}<br>            print("Sending authentication request with refresh token to ", url, "...")<br><br>        #data = {'username': user, 'password': password, 'grant_type': 'password',        #        'takeExclusiveSignOnControl': True,        #        'scope': scope}        # logger.info("NOT Sending authentication request as per email from support")try:<br>            r = requests.post(url,headers={'Accept': 'application/json'},data=data,auth=(user, client_secret),verify=True)<br><br>        except requests.exceptions.RequestException as e:<br>            logger.error('EDP-GW authentication exception failure:', e)<br>            return None, None, None        if r.status_code != 200:<br>            print('EDP-GW authentication result failure:', r.status_code, r.reason)<br>            print('Text: ', r.text)<br>            if current_refresh_token:<br>                # Refresh token may have expired. Try using our password.return self.get_sts_token(None)<br>            return None, None, Noneauth_json = r.json()<br>        logger.info("EDP-GW Authentication succeeded. RECEIVED:")<br>        logger.info(json.dumps(auth_json, sort_keys=True, indent=2, separators=(',', ':')))<br><br>        logger.info("still waiting callbacks on {0}".format(self._no_callback_rics))<br><br>        # logger.info("Value of access_token is: {0}".format(auth_json['access_token']))        # logger.info("Value of refresh_token is: {0}".format(auth_json['refresh_token']))        # logger.info("Value of expires_in is: {0}".format(auth_json['expires_in']))return auth_json['access_token'], auth_json['refresh_token'], auth_json['expires_in']<br><br>    # given an update on a ric, retrieve the associated eod last price to check price consistency btw tickdata and reutersdef check_overnight_return(self, ric):<br>        try:<br>            prod = list(self._dict_subscribed_map.keys())[list(self._dict_subscribed_map.values()).index(ric)]<br>            current_prod = self._trading_universe.loc[self._trading_universe['product'] == prod].to_dict('records')[0]<br>            if current_prod['securityType'] == 'FUTURE':<br>                fut = FutureEOD(current_prod,from_TickData=False,root_path=self._root_path,data_path=self._data_path)<br>                eod_price = fut.unadj_close_prices[-1:].values[0]<br>                reuters_price = current_prod['reutersMultiplier'] * self._mid_prices[ric]<br>                ret = 100 * (reuters_price/ eod_price - 1)<br>                logger.info("{0}: eod_price = {1}, reuters_price = {2}, overnight return = {3}".format(ric,eod_price,reuters_price,np.round(ret)))<br>                if np.abs(ret) > 10:<br>                    logger.error("{0}: WEIRD overnight return = {1}, check multipliers".format(ric, ret))<br>        except:<br>            logger.error("could not find any product associated to {0}".format(ric))<br><br><br>def set_edp_start_and_end_times() -> dict:<br>    _today = dt.date.today()<br>    _now = dt.datetime.now()<br>    _utc_now = dt.datetime.utcnow()<br>    # utc_offset = utc_now - now_start_str = _today.strftime("%Y-%m-%d ") + mds_start_time<br>    _start_time = dt.datetime.strptime(_start_str, "%Y-%m-%d %H:%M:%S")<br>    # if we start after start_time then for us it is a new dayif _now >= _start_time:<br>        _today += dt.timedelta(days=1)<br>    _end_str = _today.strftime("%Y-%m-%d ") + mds_end_time<br>    _end_time_utc = dt.datetime.strptime(_end_str, "%Y-%m-%d %H:%M:%S")<br>    return {'utc_now': _utc_now, 'end_time_utc': _end_time_utc}<br><br><br>if __name__ == "__main__":<br>    parser = argparse.ArgumentParser(description='Z3 EDP Market data server')<br>    parser.add_argument('config', nargs='+', help='Configuration files (like globalConfig.json)')<br>    parser.add_argument("--env", nargs=1, choices=['home', 'uat', 'prod'],required=True, help="Environment name (home, uat, prod)")<br><br>    args = parser.parse_args()<br><br>    appConfig = setup_config.load_configs(args.config)<br><br>    env = args.env[0]<br>    root_path = Nonedata_path = Nonemetadata_path = Nonehostname = appConfig.get_string("reuters.uat.hostname")<br>    auth_hostname = appConfig.get_string("reuters.auth_hostname")<br>    auth_path = appConfig.get_string("reuters.auth_path")<br>    str = appConfig.get_string("reuters.streaming")<br>    streaming = False    if str == 'True':<br>        streaming = Trueuser = appConfig.get_string("reuters.prod.user")<br>    password = appConfig.get_string("reuters.prod.password")<br><br>    if env.lower() == 'home':<br>        root_path = appConfig.get_string("home_path")<br>        data_path = appConfig.get_string("uat_data_path")<br>        metadata_path = appConfig.get_string("uat_metadata_path")<br>    elif env.lower() == 'uat':<br>        root_path = appConfig.get_string("uat_path")<br>        data_path = appConfig.get_string("uat_data_path")<br>        metadata_path = appConfig.get_string("uat_metadata_path")<br>        hostname = appConfig.get_string("reuters.uat.hostname")<br>        user = appConfig.get_string("reuters.uat.user")<br>        password = appConfig.get_string("reuters.uat.password")<br>    elif env.lower() == 'prod':<br>        root_path = appConfig.get_string("prod_path")<br>        data_path = appConfig.get_string("prod_data_path")<br>        metadata_path = appConfig.get_string("prod_metadata_path")<br>        hostname = appConfig.get_string("reuters.prod.hostname")<br>        user = appConfig.get_string("reuters.prod.user")<br>        password = appConfig.get_string("reuters.prod.password")<br>    else:<br>        setup_config.setup_logging()<br>        logger.error(<br>            "could not recognise environment. exiting. env should be {home, uat, prod}")<br>        sys.exit(2)<br><br>    if env is None:<br>        setup_config.setup_logging()<br>        logger.error("could not recognise environment. exiting. env should be {home, uat, prod}")<br>        sys.exit(2)<br><br>    setup_config.setup_logging(environment=env)<br><br>    # schedule to run till a certain time of the daymds_start_time = appConfig.get_string("edp_client.start_time_utc")<br>    mds_end_time = appConfig.get_string("edp_client.end_time_utc")<br>    dict_edp_times = set_edp_start_and_end_times()<br>    logger.info("EDP STARTS (UTC) @{0} AND ENDS @{1}".format(dict_edp_times['utc_now'],dict_edp_times['end_time_utc']))<br><br>    if position == '':<br>        # Populate position if possibletry:<br>            position_host = socket.gethostname()<br>            position = socket.gethostbyname(position_host) + "/" + position_host<br>        except socket.gaierror:<br>            position = "127.0.0.1/net"this_client = EDP_client(streaming=streaming,root_path=root_path,data_path=data_path,metadata_path=metadata_path,appConfig=appConfig)<br>    sts_token, refresh_token, expire_time = this_client.get_sts_token(None)<br>    if not sts_token:<br>        logger.info("AM I entering not sts_token OUTSIDE while loop: {0}".format(sts_token))<br>        sys.exit(1)<br><br>    # Start websocket handshakews_address = "wss://{}:{}/WebSocket".format(hostname, port)<br>    logger.info("Connecting to WebSocket " + ws_address + " ...")<br>    web_socket_app = websocket.WebSocketApp(ws_address, on_message=this_client.on_message,on_error=this_client.on_error,on_close=this_client.on_close,subprotocols=['tr_json2'])<br>    web_socket_app.on_open = this_client.on_open<br><br>    # Event loopwst = threading.Thread(target=web_socket_app.run_forever, kwargs={'sslopt': {'check_hostname': False}})<br>    wst.start()<br><br>    marketDataServiceConfig = appConfig.get('edp_client')<br>    mdServer = MarketDataService.serve(this_client, marketDataServiceConfig)<br><br>    try:<br>        while True:<br>            if dt.datetime.utcnow() > dict_edp_times['end_time_utc']:<br>                logger.info("MDS OVER: SHUTTING DOWN MARKET DATA SERVICE")<br>                web_socket_app.close()<br>                sys.exit(1)<br>            # Give 30 seconds to obtain the new security token and send reissueif int(expire_time) > 30:<br>                time.sleep(int(expire_time) - 30)<br>            else:<br>                # Fail the refresh since value too smallsys.exit(1)<br>            sts_token, refresh_token, expire_time = this_client.get_sts_token(refresh_token)<br>            if not sts_token:<br>                logger.info("AM I entering not sts_token in while loop: {0}".format(sts_token))<br>                sys.exit(1)<br><br>            """            FORCE CLOSING WEB_SOCKET for TESTING            """web_socket_app.close()<br>            logger.info("Explicitly closing connection: {0}".format(logged_in))<br>            """            END OF: FORCE CLOSING WEB_SOCKET for TESTING            """# Update token.            # Why do i need to request this again?if logged_in:<br>                this_client.send_login_request(sts_token, True)<br>                logger.info("Value of logged_in is: {0}".format(logged_in))<br>            else:<br>                logger.info("I am in the else part, where logger_in = False")<br>                try:<br>                    logger.info("TRY is being used")<br>                    sts_token, refresh_token, expire_time = this_client.get_sts_token(None)<br>                    logger.info("Value of sts_token in logged_in False is: {0}".format(sts_token))<br>                    del web_socket_app<br>                    web_socket_app = Nonetime.sleep(10)<br>                    # RETRYdict_edp_times = set_edp_start_and_end_times()<br>                    logger.info("AGAIN:: EDP STARTS (UTC) @{0} AND ENDS @{1}".format(dict_edp_times['utc_now'],dict_edp_times['end_time_utc']))<br><br>                    if position == '':<br>                        # Populate position if possibletry:<br>                            position_host = socket.gethostname()<br>                            position = socket.gethostbyname(position_host) + "/" + position_host<br>                        except socket.gaierror:<br>                            position = "127.0.0.1/net"this_client = EDP_client(streaming=streaming,root_path=root_path,data_path=data_path,metadata_path=metadata_path,appConfig=appConfig)<br><br>                    sts_token, refresh_token, expire_time = this_client.get_sts_token(None)<br>                    if not sts_token:<br>                        logger.info("AM I entering not sts_token IN TRY IN WHILE loop: {0}".format(sts_token))<br>                        sys.exit(1)<br>                    logger.info("Value of sts_token in logged_in False is: {0}".format(sts_token))<br><br>                    # Start websocket handshakews_address = "wss://{}:{}/WebSocket".format(hostname, port)<br>                    logger.info("Connecting to WebSocket " + ws_address + " ...")<br>                    web_socket_app = websocket.WebSocketApp(ws_address, on_message=this_client.on_message,on_error=this_client.on_error,on_close=this_client.on_close,subprotocols=['tr_json2'])<br>                    web_socket_app.on_open = this_client.on_open<br><br>                    # Event loopwst = threading.Thread(target=web_socket_app.run_forever, kwargs={'sslopt': {'check_hostname': False}})<br>                    wst.start()<br><br>                    marketDataServiceConfig = appConfig.get('edp_client')<br>                    mdServer = MarketDataService.serve(this_client, marketDataServiceConfig)<br><br>                    this_client.send_login_request(sts_token, True)<br>                except websocket._exceptions.WebSocketConnectionClosedException as e:<br>                    logger.info("EXCEPTION is being activated.: {0}".format(e))<br>                    logger.info("EXCEPTION, os information: {0}".format(os.sys.exc_info()[0:2]))<br>                    del web_socket_app<br>                    web_socket_app = Nonetime.sleep(10)<br>                    # RETRYdict_edp_times = set_edp_start_and_end_times()<br>                    logger.info("AGAIN:: EDP STARTS (UTC) @{0} AND ENDS @{1}".format(dict_edp_times['utc_now'],dict_edp_times['end_time_utc']))<br>                    if position == '':<br>                        # Populate position if possibletry:<br>                            position_host = socket.gethostname()<br>                            position = socket.gethostbyname(position_host) + "/" + position_host<br>                        except socket.gaierror:<br>                            position = "127.0.0.1/net"this_client = EDP_client(streaming=streaming,root_path=root_path,data_path=data_path,metadata_path=metadata_path,appConfig=appConfig)<br>                    sts_token, refresh_token, expire_time = this_client.get_sts_token(None)<br>                    if not sts_token:<br>                        logger.info("AM I entering not sts_token IN EXCEPTION IN WHILE loop: {0}".format(sts_token))<br>                        sys.exit(1)<br>                    logger.info("Value of sts_token in logged_in False is: {0}".format(sts_token))<br><br>                    # Start websocket handshakews_address = "wss://{}:{}/WebSocket".format(hostname, port)<br>                    logger.info("Connecting to WebSocket " + ws_address + " ...")<br>                    web_socket_app = websocket.WebSocketApp(ws_address, on_message=this_client.on_message,on_error=this_client.on_error,on_close=this_client.on_close,subprotocols=['tr_json2'])<br>                    web_socket_app.on_open = this_client.on_open<br><br>                    # Event loopwst = threading.Thread(target=web_socket_app.run_forever,kwargs={'sslopt': {'check_hostname': False}})<br>                    wst.start()<br><br>                    marketDataServiceConfig = appConfig.get('edp_client')<br>                    mdServer = MarketDataService.serve(this_client, marketDataServiceConfig)<br><br>                    this_client.send_login_request(sts_token, True)<br>    except KeyboardInterrupt:<br>        web_socket_app.close()
elektron-sdktreprdp-apiwebsocketsrrtoconnection
icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvotes
Accepted
25.4k 90 13 26

Hi @siddharth.naik

Have you worked through the Websocket tutorial and the Authorization Tokens article?

I had a quick look at your code and it seems to suggest that you have not fully understood the requirements & differences in terms of both the Authentication tokens and the Login requests.

You will note that in the EDP example we provide, the Login request is sent to the ERT in Cloud server over the Websocket connection only in a couple of places

  1. From the on_open callback i.e. once the Websocket connection opens up.
  2. In the main loop after a successful token refresh and only if logged_in

In your code, you are calling send_login_request from multiple places - including two instances which are not predicated on the websocket definitely being open.

  1. Line 521 - you are calling send_login_request - almost immediately after you start the WebsocketApp thread - but can you be 100% certain the Websocket connection is open at this point in time?
  2. Line 568 - you are again calling send_login_request - almost immediately after you start the WebsocketApp thread - can you be 100% certain the Websocket connection is open at this point in time?

I recommend you work through the above tutorial and article - to gain a fuller understanding of what it is you are required to implement in your code - in terms of Authentication tokens and the Login Requests.

Looking at your error trace - the most logical explanation is that your call at line 568 is attempting to send a login request to the server, before the Websocket connection has been established.

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvotes
25.4k 90 13 26

Hi @siddharth.naik

Please insert your source code as an attachment - renaming to .txt extension - removing any username / password etc.

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

I have shared my code as a .txt

After 18 hours, I am requesting a fresh connection, i.e. recreating the client and renewing the referesh token.

As a test, I am forcibly calling

web_socket_app.close()

The creation of a new client and renewing the refresh token, works the first time around.

The second time, it breaks.

Thank you for your help and do look forward to your suggestions.

Hi Umer,

# del web_socket_app

# web_socket_app = None

# time.sleep(10)

I have also commented out the explizit deletion of the web_socket all in both the try and except blocks.

It still results in the same behaviour.

Look forward to your suggestions.

Upvotes
19.2k 86 39 63

Hi @siddharth.naik,

At first glance, it appears you are trying to send a request down your WebSocket channel but was rejected because the channel had disconnected. With the little information we have about your application, it could very well be the case that it succeed the first time because you didn't try to send a request down while the channel was disconnected, but did the 2nd time.

If you are not already, I would put in a check to ensure the channel is connected before you send to avoid exceptions.

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Hi Nick,

thank you for your suggestion.

Here is the difference from when it is disconnect the first time and the second time. Note the circa 5 minute difference from when it is being disconnected the first time and the second time (by construction of the 300 second expire time)

First time:

RECEIVED:
Connection is already closed.
2019-07-22 08:38:22,291 - EDP_Client - on_error - ERROR - Connection is already closed.
2019-07-22 08:38:22,292 - EDP_Client - on_close - INFO - WebSocket Closed
2019-07-22 08:38:22,292 - EDP_Client - on_close - INFO - Value of logged_in in on_close is: False
RECEIVED:

Second time:

RECEIVED:
Connection is already closed.
2019-07-22 08:43:06,533 - EDP_Client - on_error - ERROR - Connection is already closed.
2019-07-22 08:43:06,533 - EDP_Client - on_close - INFO - WebSocket Closed
2019-07-22 08:43:06,533 - EDP_Client - on_close - INFO - Value of logged_in in on_close is: False

@siddharth.naik,

What I was trying to point out was that when you try to send a request to your Websocket channel the first time, it was coincidentally connected which is why it succeeded. However, the 2nd time, it wasn’t. The exception you receive clearly tells you that you are trying to send a request down to channel that isn’t connected. @Umer Nalla points out that you are sending requests to a channel that may not be connected. When you issue a request to connect to a channel you cannot assume that the channel is connected after return from that call. It is not a synchronous call. You have to utilize the WebSocket callback events to make sure your channel is connect. That is when you should be sending a login request. Please review the WebSocket tutorials to further understand what we mean.

Upvotes
25.1k 68 15 21

Just based on your statement:

>>> The EDP connection closes in 18 hours after starting.

and TREP message:

"State":{ "Code":"Timeout", "Data":"Suspect", "Stream":"Closed", "Text":"TREP authentication token has expired." }, "Type":"Status" }]

I suspect, that you are not renewing the refresh token - which are valid for 18 hours only.

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Write an Answer

Hint: Notify or tag a user in this post by typing @username.

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.