how to bind async handle on web socket python?

Hi refinitiv community

I am using Rfinitiv connector client with asyncio and websockets (python) packages

Trying to bind the update handler to async function so I can await other functions on processing.

the refinitiv data package use synchronic handle_update function so I can not add the async one.

existing call-back allowed:

def handle_update(self, streaming_prices, instrument_name, fields):

desired call-back missing:

async def handle_update(self, streaming_prices, instrument_name, fields):

then some time later:

        # refinitiv connection
#os.environ["RD_LIB_CONFIG_PATH"] = "/home/ubuntu/enigmajupiter/pyScripts/refinitivdataclient/Configuration"
rd.load_config('E:/work/enigmajupiter/KFCMR/Configuration/refinitiv-data.config.json')
#rd.load_config('/home/ubuntu/enigmajupiter/pyScripts/refinitivdataclient/Configuration/refinitiv-data.config.json')
# self.refinitivSession=
rd.open_session()
#self.theWebSocket = rd.content.pricing.Definition(['BTC='], fields=['BID', 'ASK', 'QUOTIM', 'QUOTIM_MS', 'ORDER_SIDE', 'ORDER_PRC', 'ORDER_SIZE', 'BID_SIZE', 'ASK_SIZE']).get_stream()
self.theWebSocket = rd.content.pricing.Definition(['BTCc1','BTCc2','BTCc3','BTCc4','BTCc5','BTCc6','BTCc7','BTCc8','BTCc9','QBTCu.TO','QETHu.TO'], fields=['BID', 'ASK', 'QUOTIM', 'QUOTIM_MS', 'ORDER_SIDE', 'ORDER_PRC', 'ORDER_SIZE', 'BID_SIZE', 'ASK_SIZE']).get_stream()
self.theWebSocket.on_refresh(self.handle_refresh)
self.theWebSocket.on_update(self.handle_update)
self.theWebSocket.on_status(self.handle_status)
self.theWebSocket.open()


how to bind async handle?

thanks

Ori

Best Answer

  • Jirapongse
    Jirapongse ✭✭✭✭✭
    Answer ✓

    @okatz

    I checked the code and found that the RD library doesn't provide methods to bind async functions to the WebSocket. This could be an enhancement request.

    However, I found that you can call async functions from the synchronous callback by calling the asyncio.run_coroutine_threadsafe method.

    The code looks like this:

    import asyncio
    loop = asyncio.get_event_loop()


    rd.open_session()

    ...

    async def display_updated_fields_async(pricing_stream, instrument_name, fields):
        current_time = datetime.datetime.now().time()
        print(current_time, "- Update received for", instrument_name, ":", fields)
        
    def display_updated_fields(pricing_stream, instrument_name, fields):
        print("display_updated_fields","- Update received for", instrument_name, ":", fields)       
        asyncio.run_coroutine_threadsafe(display_updated_fields_async(pricing_stream, instrument_name, fields), loop)

    ...

    stream.on_update(display_updated_fields)

    ...

    stream.open()
    loop.run_forever()

    You need to check the Python document that the messages in the event loop will be processed in order.

Answers