For a deeper look into our Elektron API, look into:

Overview |  Quickstart |  Documentation |  Downloads |  Tutorials |  Articles

question

Upvotes
Accepted
3 2 2 7

Continuing question 93989 ommstreaming-pricing-012-received-a-closing-messag (I cant add a comment there)

hi Jirapongse

I asked a question in the past "[OMMSTREAMING_PRICING_0.12] received a closing message (cxn state=StreamCxnState.Connected, message state={'Stream': 'Closed', 'Data': 'Suspect', 'Code': 'UserAccessToAppDenied', 'Text': 'TREP authentication failed (1026, Request for token validatio"

Jirapongse sent me an answer, I flowed his tip and some other tips from Refinitiv so I can reproduce scenario when I use two separate clients with same connection key (Refinitiv authorization key on configuration) the second client grabs the connection,

[OMMSTREAMING_PRICING_0.12] received a closing message (cxn state=StreamCxnState.Connected, message state={'Stream': 'Closed', 'Data': 'Suspect', 'Code': 'UserAccessToAppDenied', 'Text': 'TREP authentication failed (1026, Request for token validatio - Forum | Refinitiv Developer Community

first client shows on console the error message:

 [Error 400 - invalid_grant] empty error description

later after expiration period pass, first launched client closes with this message

[OMMSTREAMING_PRICING_0.21] received a closing message (cxn state=StreamCxnState.Connected,....

I have edited the Rfinitiv connector code to add messages and try understand how to bubble up this closing connection messages

on omm_stream_connection.py::_handle_login_message and on _process_message

 [Error 400 - invalid_grant] empty error description

and after a minute my tweak shows:

[Refinitive ## FIXME: ori kovacsi ##] From Refinitiv OMM STREAM session Status state!!!
[Refinitive ## FIXME: ori kovacsi ##] From Refinitiv OMM STREAM session Closed atate!!!
[OMMSTREAMING_PRICING_0.21] received a closing message (cxn state=StreamCxnState.Connected,....

so the code handles this case. how can I subscribe to any _on_ws_message so it will be sent to some call back on my top level application ?

Or better - alternatively how can I prevent second connection from grabbing the key from the first websocket connection in the first place. Is there a configuration that prevent new connections from getting the key so first will continue forever?

Thanks For advance

Ori

edited:

digging into the source code I can see at

C:\Users\<user name>\.conda\envs\<python environment name>\Lib\site-packages\refinitiv\data\_data\log.py

def _create_log_stdout_handler():

handler_ = logging.StreamHandler(sys.stdout)

handler_.setFormatter(_stdout_formatter)

return handler_


can I redirect log messages to bubble up to my Application instead of printing to console stdout?

I think the flow of failure message is stopped in this function and this is the reason I can't catch the case of loosing expiration token validity in a programmatic manner.

If I could bind a call-back handler on my application it can help me to create a workaround to this issue.

EDITED:

I keep digging and enforcing the connection stability

while checking the connector code I stumbled across this assignment

self._grant: "GrantPassword" = self._session._grant

on class RefreshTokenUpdater(Updater, LogReporter): in file

C:\Users\<User Name>\.conda\envs\<python env name>\Lib\site-packages\refinitiv\data\_data\core\session\refresh_token_updater.py

It is first time I see ":" notation on assignment / initialization of class property/parameter. What does this notation do?


websocketsOMMsessionerror-loginexpirationapi-key
icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvotes
Accepted
16.8k 31 9 12

Hello @okatz

Sorry for the late reply. The RD Library - Python version 1.0.0b24 has been released this month. With this version, you can get the session events and states from the following code:

 ## Define a function to display events
def display_event(eventType, event):
    currentTime = datetime.datetime.now().time()
    print("----------------------------------------------------------")
    print(">>> {} event received at {}".format(eventType, currentTime))
    print(json.dumps(event, indent=2))
    return
def on_event_callback(*args):
    print(args)

def on_state_callback(*args):
    print(args)

## Create and Set Session
session = rd.session.platform.Definition(
    app_key = APP_KEY, 
    grant = rd.session.platform.GrantPassword(
        username = RDP_LOGIN, 
        password = RDP_PASSWORD
    )
).get_session()

session.on_event(on_event_callback)
session.on_state(on_state_callback)
session.open()
rd.session.set_default(session)

## Create an Item Stream and register the event callbacks
stream = omm_stream.Definition(
    name="/TRI.N", 
    fields=['BID', 'ASK','OPEN_PRC','HST_CLOSE','TIMACT'], 
    domain='MarketPrice').get_stream()
...

Result - Normal scenario:

(<OpenState.Pending: 'Pending'>, 'Session opening in progress', <refinitiv.data.session.Definition object at 0x1d322046e20 {name='default'}>)
(<EventCode.SessionAuthenticationSuccess: 24>, 'All is well', <refinitiv.data.session.Definition object at 0x1d322046e20 {name='default'}>)
(<OpenState.Opened: 'Opened'>, 'Session is opened', <refinitiv.data.session.Definition object at 0x1d322046e20 {name='default'}>)
(<EventCode.StreamConnecting: 1>, None, <refinitiv.data.session.Definition object at 0x1d322046e20 {name='default'}>)
(<EventCode.StreamConnected: 2>, None, <refinitiv.data.session.Definition object at 0x1d322046e20 {name='default'}>)
----------------------------------------------------------
>>> Refresh event received at 17:30:34.228569
{
  "ID": 5,
  "Type": "Refresh",
  "Key": {
    "Service": "ELEKTRON_DD",
    "Name": "/TRI.N"
  },

Example - Invalid User scenario:

(<OpenState.Pending: 'Pending'>, 'Session opening in progress', <refinitiv.data.session.Definition object at 0x145de9a7e20 {name='default'}>)
(<EventCode.SessionAuthenticationFailed: 25>, 'Invalid username or password.', <refinitiv.data.session.Definition object at 0x145de9a7e20 {name='default'}>)  
(<OpenState.Closed: 'Closed'>, 'Session is closed', <refinitiv.data.session.Definition object at 0x145de9a7e20 {name='default'}>)
(<OpenState.Closed: 'Closed'>, 'Session is closed', <refinitiv.data.session.Definition object at 0x145de9a7e20 {name='default'}>)

** Caution **

There is another issue that the team is still working on it. The fix is expected to be released in the future release.

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvotes
16.8k 31 9 12

Hello @okatz

According to the "I can reproduce scenario when I use two separate clients with same connection key" message, do you mean you are using a single RDP account with multiple sessions/applications? If so, the applications may encounter this kind of key/token issue.

I highly recommend you reach out to your Refinitiv representative to get multiple RDP (or RTO) accounts if you want to run multiple applications/sessions.

Hope this helps.



icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvotes
3 2 2 7

Thanks Wasin.W

I don't intentionally use two connections. there are apparently other users grabbing this connection key. I couldn't identify who is taking this connection. Under these circumstances and because this system is critical for my project functionality I assume that even if I will take 2 connection keys yet someone/or more then one may grab the key. I can't track every user that can grab this key.

This is the reason I plan to develop a code section to handle this error by closing the client and reconnecting, so I will be the latest connection and my application expiration key will be the valid (Last issued) one. others will drop session.

Thanks

Ori


icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvotes
16.8k 31 9 12

Hello @okatz

Thank you for the explanation. However, I (and my colleagues) still do not fully understand the context and implementation detail of your application. Please clarify my assumption below:

  • You are using the RD Library - Python API (please confirm the version) to consume the real-time streaming data
  • You have a connection key (RDP/RTO credentials which are username/machine-id, password, and app key/client id)
  • the connection key is shared or pooled among your peer (please confirm)
  • If your application is the only one that uses the key to connect and consume data, it works fine
  • Somehow, other users can get your connection key and use it in their RD consumer application (is it the same application?)
  • Then the problem occurs on the first application in the session renewal process because the second application takes an authentication grant instead

About the "_on_ws_message " event callback, do you mean you want all WebSocket callback events messages from the library? If so, you can get the event callbacks from the following code:

stream = omm_stream.Definition(...).get_stream()

# Refresh - the first full image we get back from the server
stream.on_refresh(lambda event, stream  : display_event("Refresh", event))
# Update - as and when field values change, we receive updates from the server
stream.on_update(lambda event, stream : display_event("Update", event))
# Status - if data goes stale or item closes, we get a status message
stream.on_status(lambda event, stream : display_event("Status", event))
# Other errors
stream.on_error(lambda event, stream : display_event("Error", event))

# Send request to server and open stream
stream.open()
  • on_refresh: This callback is invoked upon receiving a refresh message. The refresh message (aka image) is the complete set of values that represents the item.
  • on_update: This callback is invoked upon receiving an update message.
  • on_status: This callback is invoked upon receiving a status message
  • on_error: This function is called when an error occurs

You can find an example from the TUT_3.1.01-OMMStream-MarketPrice-event.ipynb example code.


icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvotes
3 2 2 7

Thanks wasin

I think I found a solution (can't share it - it's my company's intellectual property ). Will complete the technical programming and will ask again if there are problems yet.

My goal is to give a specific server the priority so it will be exclusively the owner of stream forever (until python process / thread is closed), if an other computer attempts using the key it must be declined. (you may take it as "feature request" to improve the Refinitiv Data software). I expect stream to work forever without any additional coding until the client explicitly close it.

There might be other occasions where session drops, It must t be possible to recover them automatically so stream will not miss bid/ask data. Stream must be connected consistently for ever to the privileged server.

F.Y.I. I tried your solution above - it is not triggered on "[OMMSTREAMING_PRICING_0.21] received a closing message (cxn state=StreamCxnState.Connected,...." nor it does on "[Error 400 - invalid_grant] empty error description ". so it is not the solution to this issue.

Thanks

Ori

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvotes
16.8k 31 9 12

Hello @okatz

I am glad to hear that you have found the solution.

Please be informed that the library authenticates users to the RDP platform with a single sign-on policy. The later session always invalidates the refresh token for all previous sign-on sessions.

About the on_status or on_error callbacks methods that are not triggered by some WebSocket/HTTP events, I can replicate the same kind of issues with the library version 1.0.0b20. Can you share the library and Python versions in your environment? I will contact the library team to confirm this behavior.

Best regards,
Wasin W.

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

refinitiv_data-1.0.0b10.dist-info

METADATA

Metadata-Version: 2.1

Name: refinitiv-data

Version: 1.0.0b10

Summary: Python package for retrieving data.

Home-page: https://developers.refinitiv.com/en/api-catalog/refinitiv-data-platform/refinitiv-data-library-for-python

Author: REFINITIV

Author-email:

License: Apache 2.0

Platform: UNKNOWN

Requires-Python: >3.6

Description-Content-Type: text/markdown

License-File: LICENSE.md

Requires-Dist: appdirs (>=1.4.3)

Requires-Dist: eventemitter (>=0.2.0)

Requires-Dist: pymitter

Requires-Dist: httpx (<0.22,>=0.18)

Requires-Dist: httpcore (<=0.14.5)

Requires-Dist: mysql-connector-python

Requires-Dist: numpy (>=1.11.0)

Requires-Dist: pandas (<1.4.0,>=1.3.5)

Requires-Dist: python-configuration (>=0.8.2)

Requires-Dist: python-dateutil

Requires-Dist: requests

Requires-Dist: scipy

Requires-Dist: six

Requires-Dist: tenacity (<8.1,>=8.0)

Requires-Dist: watchdog (>=0.10.2)

Requires-Dist: websocket-client (!=1.2.2,>=0.58.0)

Hello @okatz

I have contacted the RD Library team. The team is investigating the callback behaviors. I will give you more updates as soon as I get information from them.

Thanks Wasin.w

I have also started attempting using revision 20b but couldn't get the connection closed error to bubble up to my application. It might be related to the fact I am using "websockets" package wile Refinitiv.Data version 1.0.0.20b use anyio package. there might be conflicting technologies like events loop is handled implicitly on newer python versions, while up to python 3.7 it had to be explicit on task gatherer.

Ori