How to bind an async handler to a WebSocket stream in Python?

Hi Refinitiv community,

I am using the Refinitiv connector client with the asyncio and websockets (Python) packages.

I am trying to bind the update handler to an async function so that I can await other functions while processing updates.

The refinitiv data package only accepts a synchronous handle_update function, so I cannot register an async one.

Existing callback (allowed):

def handle_update(self, streaming_prices, instrument_name, fields):

Desired callback (not supported):

async def handle_update(self, streaming_prices, instrument_name, fields):

Then, later in the code:

# refinitiv connection
#os.environ["RD_LIB_CONFIG_PATH"] = "/home/ubuntu/enigmajupiter/pyScripts/refinitivdataclient/Configuration"
rd.load_config('E:/work/enigmajupiter/KFCMR/Configuration/refinitiv-data.config.json')
#rd.load_config('/home/ubuntu/enigmajupiter/pyScripts/refinitivdataclient/Configuration/refinitiv-data.config.json')
# self.refinitivSession=
rd.open_session()
#self.theWebSocket = rd.content.pricing.Definition(['BTC='], fields=['BID', 'ASK', 'QUOTIM', 'QUOTIM_MS', 'ORDER_SIDE', 'ORDER_PRC', 'ORDER_SIZE', 'BID_SIZE', 'ASK_SIZE']).get_stream()
self.theWebSocket = rd.content.pricing.Definition(['BTCc1','BTCc2','BTCc3','BTCc4','BTCc5','BTCc6','BTCc7','BTCc8','BTCc9','QBTCu.TO','QETHu.TO'], fields=['BID', 'ASK', 'QUOTIM', 'QUOTIM_MS', 'ORDER_SIDE', 'ORDER_PRC', 'ORDER_SIZE', 'BID_SIZE', 'ASK_SIZE']).get_stream()
self.theWebSocket.on_refresh(self.handle_refresh)
self.theWebSocket.on_update(self.handle_update)
self.theWebSocket.on_status(self.handle_status)
self.theWebSocket.open()


How can I bind an async handler?

Thanks,

Ori

Best Answer

  • Jirapongse

    @okatz

    I checked the code and found that the RD library doesn't provide a way to register async functions as stream callbacks. This could be raised as an enhancement request.

    However, you can call async functions from the synchronous callback by using the asyncio.run_coroutine_threadsafe function.

    The code looks like this:

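    import asyncio
    import datetime

    import refinitiv.data as rd

    # Create the loop explicitly; calling asyncio.get_event_loop() with no
    # running loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()


    rd.open_session()

    ...

    async def display_updated_fields_async(pricing_stream, instrument_name, fields):
        # Runs on the event loop, so it can await other coroutines.
        current_time = datetime.datetime.now().time()
        print(current_time, "- Update received for", instrument_name, ":", fields)

    def display_updated_fields(pricing_stream, instrument_name, fields):
        # Synchronous callback registered with the stream; it hands the work to the loop.
        print("display_updated_fields", "- Update received for", instrument_name, ":", fields)
        asyncio.run_coroutine_threadsafe(display_updated_fields_async(pricing_stream, instrument_name, fields), loop)

    ...

    stream.on_update(display_updated_fields)

    ...

    stream.open()
    loop.run_forever()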
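
To try the pattern without a Refinitiv session, here is a minimal sketch that uses only the standard library. It assumes the updates arrive on a thread other than the one running the event loop. The names fake_feed, make_sync_handler and run_demo are hypothetical and are not part of the RD library; fake_feed merely stands in for the stream.

```python
import asyncio
import threading


async def handle_update_async(results, instrument_name, fields):
    # Runs on the event loop thread, so it can await other coroutines.
    await asyncio.sleep(0)
    results.append((instrument_name, fields["BID"]))


def make_sync_handler(loop, results):
    # Build the synchronous callback that a stream would call.
    def handle_update(instrument_name, fields):
        # Thread-safe hand-off; returns a concurrent.futures.Future.
        return asyncio.run_coroutine_threadsafe(
            handle_update_async(results, instrument_name, fields), loop
        )

    return handle_update


def fake_feed(loop, on_update):
    # Hypothetical stand-in for the stream: fires three updates from its own thread.
    try:
        futures = [on_update("BTC=", {"BID": 100 + i}) for i in range(3)]
        for future in futures:
            future.result(timeout=5)  # wait until the async handler has finished
    finally:
        # Stop the loop from the feed thread once all updates are handled.
        loop.call_soon_threadsafe(loop.stop)


def run_demo():
    loop = asyncio.new_event_loop()
    results = []
    on_update = make_sync_handler(loop, results)
    feed = threading.Thread(target=fake_feed, args=(loop, on_update))
    feed.start()
    loop.run_forever()  # returns after fake_feed requests loop.stop
    feed.join()
    loop.close()
    return results


if __name__ == "__main__":
    print(run_demo())
```

The Future returned by asyncio.run_coroutine_threadsafe is only used here to know when it is safe to stop the loop; a long-running client would normally keep the loop running and could ignore it or attach a done callback to log exceptions.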

    Please check the Python documentation to confirm whether the coroutines submitted to the event loop this way are processed in order.
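
On the ordering question: each call to asyncio.run_coroutine_threadsafe creates a separate task, so if the async handler awaits something slow, a later update may finish before an earlier one. If strict sequential processing matters, one possible approach (a sketch, not something the RD library provides) is to push updates onto an asyncio.Queue with loop.call_soon_threadsafe and let a single consumer coroutine process them one at a time. The names consume, fake_feed and run_ordered_demo, and the SEQ field, are hypothetical.

```python
import asyncio
import threading


async def consume(queue, processed):
    # Single consumer: the next update is not started until this one is done.
    while True:
        item = await queue.get()
        if item is None:  # sentinel marking the end of the demo feed
            break
        instrument_name, fields = item
        # Earlier updates deliberately take longer, to show that order is kept.
        await asyncio.sleep(0.01 * (3 - fields["SEQ"]))
        processed.append(fields["SEQ"])


def fake_feed(loop, queue):
    # Hypothetical stand-in for the synchronous stream callback on another thread.
    for seq in range(3):
        loop.call_soon_threadsafe(queue.put_nowait, ("BTC=", {"SEQ": seq}))
    loop.call_soon_threadsafe(queue.put_nowait, None)


def run_ordered_demo():
    processed = []

    async def main():
        loop = asyncio.get_running_loop()
        queue = asyncio.Queue()
        feed = threading.Thread(target=fake_feed, args=(loop, queue))
        feed.start()
        await consume(queue, processed)
        feed.join()

    asyncio.run(main())
    return processed


if __name__ == "__main__":
    print(run_ordered_demo())
```

The trade-off is throughput: a single consumer never overlaps the handling of two updates, so a slow handler delays everything queued behind it.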
