For a deeper look into our Elektron API, look into:

Overview |  Quickstart |  Documentation |  Downloads |  Tutorials |  Articles

question

Upvotes
Accepted
3 4 6 10

Continuing question 93989 ommstreaming-pricing-012-received-a-closing-messag (I cant add a comment there)

hi Jirapongse

I asked a question in the past "[OMMSTREAMING_PRICING_0.12] received a closing message (cxn state=StreamCxnState.Connected, message state={'Stream': 'Closed', 'Data': 'Suspect', 'Code': 'UserAccessToAppDenied', 'Text': 'TREP authentication failed (1026, Request for token validatio"

Jirapongse sent me an answer, I flowed his tip and some other tips from Refinitiv so I can reproduce scenario when I use two separate clients with same connection key (Refinitiv authorization key on configuration) the second client grabs the connection,

[OMMSTREAMING_PRICING_0.12] received a closing message (cxn state=StreamCxnState.Connected, message state={'Stream': 'Closed', 'Data': 'Suspect', 'Code': 'UserAccessToAppDenied', 'Text': 'TREP authentication failed (1026, Request for token validatio - Forum | Refinitiv Developer Community

first client shows on console the error message:

 [Error 400 - invalid_grant] empty error description

later after expiration period pass, first launched client closes with this message

[OMMSTREAMING_PRICING_0.21] received a closing message (cxn state=StreamCxnState.Connected,....

I have edited the Rfinitiv connector code to add messages and try understand how to bubble up this closing connection messages

on omm_stream_connection.py::_handle_login_message and on _process_message

 [Error 400 - invalid_grant] empty error description

and after a minute my tweak shows:

[Refinitive ## FIXME: ori kovacsi ##] From Refinitiv OMM STREAM session Status state!!!
[Refinitive ## FIXME: ori kovacsi ##] From Refinitiv OMM STREAM session Closed atate!!!
[OMMSTREAMING_PRICING_0.21] received a closing message (cxn state=StreamCxnState.Connected,....

so the code handles this case. how can I subscribe to any _on_ws_message so it will be sent to some call back on my top level application ?

Or better - alternatively how can I prevent second connection from grabbing the key from the first websocket connection in the first place. Is there a configuration that prevent new connections from getting the key so first will continue forever?

Thanks For advance

Ori

edited:

digging into the source code I can see at

C:\Users\<user name>\.conda\envs\<python environment name>\Lib\site-packages\refinitiv\data\_data\log.py

def _create_log_stdout_handler():

handler_ = logging.StreamHandler(sys.stdout)

handler_.setFormatter(_stdout_formatter)

return handler_


can I redirect log messages to bubble up to my Application instead of printing to console stdout?

I think the flow of failure message is stopped in this function and this is the reason I can't catch the case of loosing expiration token validity in a programmatic manner.

If I could bind a call-back handler on my application it can help me to create a workaround to this issue.

EDITED:

I keep digging and enforcing the connection stability

while checking the connector code I stumbled across this assignment

self._grant: "GrantPassword" = self._session._grant

on class RefreshTokenUpdater(Updater, LogReporter): in file

C:\Users\<User Name>\.conda\envs\<python env name>\Lib\site-packages\refinitiv\data\_data\core\session\refresh_token_updater.py

It is first time I see ":" notation on assignment / initialization of class property/parameter. What does this notation do?


websocketsOMMsessionerror-loginexpirationapi-key
icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvotes
Accepted
24.4k 53 17 14

Hello @okatz

Sorry for the late reply. The RD Library - Python version 1.0.0b24 has been released this month. With this version, you can get the session events and states from the following code:

 ## Define a function to display events
def display_event(eventType, event):
    currentTime = datetime.datetime.now().time()
    print("----------------------------------------------------------")
    print(">>> {} event received at {}".format(eventType, currentTime))
    print(json.dumps(event, indent=2))
    return
def on_event_callback(*args):
    print(args)

def on_state_callback(*args):
    print(args)

## Create and Set Session
session = rd.session.platform.Definition(
    app_key = APP_KEY, 
    grant = rd.session.platform.GrantPassword(
        username = RDP_LOGIN, 
        password = RDP_PASSWORD
    )
).get_session()

session.on_event(on_event_callback)
session.on_state(on_state_callback)
session.open()
rd.session.set_default(session)

## Create an Item Stream and register the event callbacks
stream = omm_stream.Definition(
    name="/TRI.N", 
    fields=['BID', 'ASK','OPEN_PRC','HST_CLOSE','TIMACT'], 
    domain='MarketPrice').get_stream()
...

Result - Normal scenario:

(<OpenState.Pending: 'Pending'>, 'Session opening in progress', <refinitiv.data.session.Definition object at 0x1d322046e20 {name='default'}>)
(<EventCode.SessionAuthenticationSuccess: 24>, 'All is well', <refinitiv.data.session.Definition object at 0x1d322046e20 {name='default'}>)
(<OpenState.Opened: 'Opened'>, 'Session is opened', <refinitiv.data.session.Definition object at 0x1d322046e20 {name='default'}>)
(<EventCode.StreamConnecting: 1>, None, <refinitiv.data.session.Definition object at 0x1d322046e20 {name='default'}>)
(<EventCode.StreamConnected: 2>, None, <refinitiv.data.session.Definition object at 0x1d322046e20 {name='default'}>)
----------------------------------------------------------
>>> Refresh event received at 17:30:34.228569
{
  "ID": 5,
  "Type": "Refresh",
  "Key": {
    "Service": "ELEKTRON_DD",
    "Name": "/TRI.N"
  },

Example - Invalid User scenario:

(<OpenState.Pending: 'Pending'>, 'Session opening in progress', <refinitiv.data.session.Definition object at 0x145de9a7e20 {name='default'}>)
(<EventCode.SessionAuthenticationFailed: 25>, 'Invalid username or password.', <refinitiv.data.session.Definition object at 0x145de9a7e20 {name='default'}>)  
(<OpenState.Closed: 'Closed'>, 'Session is closed', <refinitiv.data.session.Definition object at 0x145de9a7e20 {name='default'}>)
(<OpenState.Closed: 'Closed'>, 'Session is closed', <refinitiv.data.session.Definition object at 0x145de9a7e20 {name='default'}>)

** Caution **

There is another issue that the team is still working on it. The fix is expected to be released in the future release.

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvotes
24.4k 53 17 14

Hello @okatz

According to the "I can reproduce scenario when I use two separate clients with same connection key" message, do you mean you are using a single RDP account with multiple sessions/applications? If so, the applications may encounter this kind of key/token issue.

I highly recommend you reach out to your Refinitiv representative to get multiple RDP (or RTO) accounts if you want to run multiple applications/sessions.

Hope this helps.



icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvotes
3 4 6 10

Thanks Wasin.W

I don't intentionally use two connections. there are apparently other users grabbing this connection key. I couldn't identify who is taking this connection. Under these circumstances and because this system is critical for my project functionality I assume that even if I will take 2 connection keys yet someone/or more then one may grab the key. I can't track every user that can grab this key.

This is the reason I plan to develop a code section to handle this error by closing the client and reconnecting, so I will be the latest connection and my application expiration key will be the valid (Last issued) one. others will drop session.

Thanks

Ori


icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvotes
24.4k 53 17 14

Hello @okatz

Thank you for the explanation. However, I (and my colleagues) still do not fully understand the context and implementation detail of your application. Please clarify my assumption below:

  • You are using the RD Library - Python API (please confirm the version) to consume the real-time streaming data
  • You have a connection key (RDP/RTO credentials which are username/machine-id, password, and app key/client id)
  • the connection key is shared or pooled among your peer (please confirm)
  • If your application is the only one that uses the key to connect and consume data, it works fine
  • Somehow, other users can get your connection key and use it in their RD consumer application (is it the same application?)
  • Then the problem occurs on the first application in the session renewal process because the second application takes an authentication grant instead

About the "_on_ws_message " event callback, do you mean you want all WebSocket callback events messages from the library? If so, you can get the event callbacks from the following code:

stream = omm_stream.Definition(...).get_stream()

# Refresh - the first full image we get back from the server
stream.on_refresh(lambda event, stream  : display_event("Refresh", event))
# Update - as and when field values change, we receive updates from the server
stream.on_update(lambda event, stream : display_event("Update", event))
# Status - if data goes stale or item closes, we get a status message
stream.on_status(lambda event, stream : display_event("Status", event))
# Other errors
stream.on_error(lambda event, stream : display_event("Error", event))

# Send request to server and open stream
stream.open()
  • on_refresh: This callback is invoked upon receiving a refresh message. The refresh message (aka image) is the complete set of values that represents the item.
  • on_update: This callback is invoked upon receiving an update message.
  • on_status: This callback is invoked upon receiving a status message
  • on_error: This function is called when an error occurs

You can find an example from the TUT_3.1.01-OMMStream-MarketPrice-event.ipynb example code.


icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvotes
3 4 6 10

Thanks wasin

I think I found a solution (can't share it - it's my company's intellectual property ). Will complete the technical programming and will ask again if there are problems yet.

My goal is to give a specific server the priority so it will be exclusively the owner of stream forever (until python process / thread is closed), if an other computer attempts using the key it must be declined. (you may take it as "feature request" to improve the Refinitiv Data software). I expect stream to work forever without any additional coding until the client explicitly close it.

There might be other occasions where session drops, It must t be possible to recover them automatically so stream will not miss bid/ask data. Stream must be connected consistently for ever to the privileged server.

F.Y.I. I tried your solution above - it is not triggered on "[OMMSTREAMING_PRICING_0.21] received a closing message (cxn state=StreamCxnState.Connected,...." nor it does on "[Error 400 - invalid_grant] empty error description ". so it is not the solution to this issue.

Thanks

Ori

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvotes
24.4k 53 17 14

Hello @okatz

I am glad to hear that you have found the solution.

Please be informed that the library authenticates users to the RDP platform with a single sign-on policy. The later session always invalidates the refresh token for all previous sign-on sessions.

About the on_status or on_error callbacks methods that are not triggered by some WebSocket/HTTP events, I can replicate the same kind of issues with the library version 1.0.0b20. Can you share the library and Python versions in your environment? I will contact the library team to confirm this behavior.

Best regards,
Wasin W.

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

refinitiv_data-1.0.0b10.dist-info

METADATA

Metadata-Version: 2.1

Name: refinitiv-data

Version: 1.0.0b10

Summary: Python package for retrieving data.

Home-page: https://developers.refinitiv.com/en/api-catalog/refinitiv-data-platform/refinitiv-data-library-for-python

Author: REFINITIV

Author-email:

License: Apache 2.0

Platform: UNKNOWN

Requires-Python: >3.6

Description-Content-Type: text/markdown

License-File: LICENSE.md

Requires-Dist: appdirs (>=1.4.3)

Requires-Dist: eventemitter (>=0.2.0)

Requires-Dist: pymitter

Requires-Dist: httpx (<0.22,>=0.18)

Requires-Dist: httpcore (<=0.14.5)

Requires-Dist: mysql-connector-python

Requires-Dist: numpy (>=1.11.0)

Requires-Dist: pandas (<1.4.0,>=1.3.5)

Requires-Dist: python-configuration (>=0.8.2)

Requires-Dist: python-dateutil

Requires-Dist: requests

Requires-Dist: scipy

Requires-Dist: six

Requires-Dist: tenacity (<8.1,>=8.0)

Requires-Dist: watchdog (>=0.10.2)

Requires-Dist: websocket-client (!=1.2.2,>=0.58.0)

Hello @okatz

I have contacted the RD Library team. The team is investigating the callback behaviors. I will give you more updates as soon as I get information from them.

Show more comments

Write an Answer

Hint: Notify or tag a user in this post by typing @username.

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.