How to bind an async handler to a WebSocket stream in Python?

Hi Refinitiv community,

I am using the Refinitiv connector client with the asyncio and websockets Python packages.

I am trying to bind the update handler to an async function so that I can await other functions while processing an update.

The Refinitiv Data package expects a synchronous handle_update function, so I cannot register an async one.

Existing callback (accepted):

def handle_update(self, streaming_prices, instrument_name, fields):

Desired callback (not supported):

async def handle_update(self, streaming_prices, instrument_name, fields):

Then, some time later, the handlers are registered:

        # refinitiv connection
        #os.environ["RD_LIB_CONFIG_PATH"] = "/home/ubuntu/enigmajupiter/pyScripts/refinitivdataclient/Configuration"
        rd.load_config('E:/work/enigmajupiter/KFCMR/Configuration/refinitiv-data.config.json')
        #rd.load_config('/home/ubuntu/enigmajupiter/pyScripts/refinitivdataclient/Configuration/refinitiv-data.config.json')
        # self.refinitivSession=
        rd.open_session()
        #self.theWebSocket = rd.content.pricing.Definition(['BTC='], fields=['BID', 'ASK', 'QUOTIM', 'QUOTIM_MS', 'ORDER_SIDE', 'ORDER_PRC', 'ORDER_SIZE', 'BID_SIZE', 'ASK_SIZE']).get_stream()
        self.theWebSocket = rd.content.pricing.Definition(['BTCc1','BTCc2','BTCc3','BTCc4','BTCc5','BTCc6','BTCc7','BTCc8','BTCc9','QBTCu.TO','QETHu.TO'], fields=['BID', 'ASK', 'QUOTIM', 'QUOTIM_MS', 'ORDER_SIDE', 'ORDER_PRC', 'ORDER_SIZE', 'BID_SIZE', 'ASK_SIZE']).get_stream()
        self.theWebSocket.on_refresh(self.handle_refresh)
        self.theWebSocket.on_update(self.handle_update)
        self.theWebSocket.on_status(self.handle_status)
        self.theWebSocket.open()


How can I bind an async handler?

Thanks,

Ori

Tags: python, rdp-api, update-message, asynchronous

1 Answer


@okatz

I checked the code and found that the RD library doesn't provide methods to bind async functions to the WebSocket. This could be an enhancement request.

However, I found that you can call async functions from the synchronous callback by calling the asyncio.run_coroutine_threadsafe method.

The code looks like this:

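import asyncio
import datetime

import refinitiv.data as rd

# Create the loop explicitly: calling asyncio.get_event_loop() with no
# running loop is deprecated in recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)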


rd.open_session()

...

async def display_updated_fields_async(pricing_stream, instrument_name, fields):
    current_time = datetime.datetime.now().time()
    print(current_time, "- Update received for", instrument_name, ":", fields)
    
def display_updated_fields(pricing_stream, instrument_name, fields):
    print("display_updated_fields","- Update received for", instrument_name, ":", fields)       
    asyncio.run_coroutine_threadsafe(display_updated_fields_async(pricing_stream, instrument_name, fields), loop)

...

stream.on_update(display_updated_fields)

...

stream.open()
loop.run_forever()

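Please check the Python documentation to confirm that the messages submitted to the event loop are processed in order. Coroutines are started in the order they are submitted, but once a handler awaits something, a later update can finish before it.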
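To try the pattern without a Refinitiv session, here is a minimal self-contained sketch. It assumes the library invokes the synchronous callback from its own thread; fake_stream, handle_update_async, and run_demo are hypothetical stand-ins, not part of the RD library.

```python
import asyncio
import threading


def run_demo(n=3):
    received = []  # filled by the async handler as updates are processed

    async def handle_update_async(instrument_name, fields):
        # Runs on the event loop, so it is free to await other coroutines.
        await asyncio.sleep(0)
        received.append((instrument_name, fields["BID"]))

    def make_sync_callback(loop):
        # Plain function of the kind a callback-based library accepts.
        def handle_update(instrument_name, fields):
            # Hand the coroutine over to the loop from the foreign thread.
            asyncio.run_coroutine_threadsafe(
                handle_update_async(instrument_name, fields), loop
            )
        return handle_update

    def fake_stream(callback):
        # Hypothetical stand-in for the library's streaming thread.
        for i in range(n):
            callback("BTC=", {"BID": i})

    async def main():
        loop = asyncio.get_running_loop()
        worker = threading.Thread(
            target=fake_stream, args=(make_sync_callback(loop),)
        )
        worker.start()
        # Keep the loop alive until every submitted update was handled.
        while len(received) < n:
            await asyncio.sleep(0.01)
        worker.join()
        return received

    return asyncio.run(main())


if __name__ == "__main__":
    print(run_demo())
```

Every handler here awaits exactly the same thing, so the updates complete in submission order. Handlers that await for different durations can complete out of order.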


Thanks, Jirapongse.
I am testing this solution. Do you know whether it will work with tasks rather than threads? I am working with tasks so that asyncio manages the scheduling for me automatically.

Is the first parameter "pricing_stream" or "streaming_prices"? Are the arguments passed positionally or by name?

Ori
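On the tasks question: asyncio.run_coroutine_threadsafe wraps the submitted coroutine in a task on the target loop, so the handler is scheduled by asyncio like any other task. If strict ordering matters, one possible alternative is to push each update onto an asyncio.Queue and let a single consumer task process them. The sketch below assumes the callback is invoked from a non-asyncio thread; fake_stream, consume, and run_queue_demo are hypothetical names, and the callback signature is simplified.

```python
import asyncio
import threading


def make_sync_callback(loop, queue):
    # Synchronous callback that only enqueues; the real work happens in a task.
    def handle_update(instrument_name, fields):
        # asyncio.Queue is not thread-safe, so the put goes through the loop.
        loop.call_soon_threadsafe(queue.put_nowait, (instrument_name, fields))
    return handle_update


async def consume(queue, processed):
    # Single consumer task: updates are handled strictly one after another.
    while True:
        item = await queue.get()
        if item is None:  # sentinel: the stream has ended
            break
        instrument_name, fields = item
        # Simulate uneven processing time; order is still preserved.
        await asyncio.sleep(0.02 if fields["BID"] == 0 else 0)
        processed.append((instrument_name, fields["BID"]))


def fake_stream(callback, n, loop, queue):
    # Hypothetical stand-in for the library's streaming thread.
    for i in range(n):
        callback("BTC=", {"BID": i})
    loop.call_soon_threadsafe(queue.put_nowait, None)


async def main(n):
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    processed = []
    consumer = asyncio.create_task(consume(queue, processed))
    worker = threading.Thread(
        target=fake_stream,
        args=(make_sync_callback(loop, queue), n, loop, queue),
    )
    worker.start()
    await consumer
    worker.join()
    return processed


def run_queue_demo(n=3):
    return asyncio.run(main(n))


if __name__ == "__main__":
    print(run_queue_demo())
```

The trade-off is that a slow update delays every update behind it, whereas submitting one coroutine per update lets them overlap.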
