How to bind an async handler to a WebSocket stream in Python?

Hi Refinitiv community,
I am using the Refinitiv connector client with the asyncio and websockets Python packages.
I am trying to bind the update handler to an async function so that I can await other functions while processing an update.
The Refinitiv Data package expects a synchronous handle_update function, so I cannot register an async one.
Existing callback (allowed):
def handle_update(self, streaming_prices, instrument_name, fields):
Desired callback (not supported):
async def handle_update(self, streaming_prices, instrument_name, fields):
Then, later in the code:
# refinitiv connection
#os.environ["RD_LIB_CONFIG_PATH"] = "/home/ubuntu/enigmajupiter/pyScripts/refinitivdataclient/Configuration"
rd.load_config('E:/work/enigmajupiter/KFCMR/Configuration/refinitiv-data.config.json')
#rd.load_config('/home/ubuntu/enigmajupiter/pyScripts/refinitivdataclient/Configuration/refinitiv-data.config.json')
# self.refinitivSession=
rd.open_session()
#self.theWebSocket = rd.content.pricing.Definition(['BTC='], fields=['BID', 'ASK', 'QUOTIM', 'QUOTIM_MS', 'ORDER_SIDE', 'ORDER_PRC', 'ORDER_SIZE', 'BID_SIZE', 'ASK_SIZE']).get_stream()
self.theWebSocket = rd.content.pricing.Definition(['BTCc1','BTCc2','BTCc3','BTCc4','BTCc5','BTCc6','BTCc7','BTCc8','BTCc9','QBTCu.TO','QETHu.TO'], fields=['BID', 'ASK', 'QUOTIM', 'QUOTIM_MS', 'ORDER_SIDE', 'ORDER_PRC', 'ORDER_SIZE', 'BID_SIZE', 'ASK_SIZE']).get_stream()
self.theWebSocket.on_refresh(self.handle_refresh)
self.theWebSocket.on_update(self.handle_update)
self.theWebSocket.on_status(self.handle_status)
self.theWebSocket.open()
How can I bind an async handler?
Thanks,
Ori
Best Answer
I checked the code and found that the RD library doesn't provide methods to bind async functions to the WebSocket. This could be an enhancement request.
However, I found that you can schedule async functions from the synchronous callback with the asyncio.run_coroutine_threadsafe function.
The code looks like this:
import asyncio
import datetime

import refinitiv.data as rd

# Event loop that runs the coroutines scheduled from the callback
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
rd.open_session()
...
async def display_updated_fields_async(pricing_stream, instrument_name, fields):
    current_time = datetime.datetime.now().time()
    print(current_time, "- Update received for", instrument_name, ":", fields)

def display_updated_fields(pricing_stream, instrument_name, fields):
    # Synchronous callback: hand the coroutine over to the event loop
    print("display_updated_fields", "- Update received for", instrument_name, ":", fields)
    asyncio.run_coroutine_threadsafe(display_updated_fields_async(pricing_stream, instrument_name, fields), loop)
...
stream.on_update(display_updated_fields)
...
stream.open()
loop.run_forever()

You should check the Python documentation to confirm that the messages scheduled on the event loop are processed in order.
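For reference, the pattern above can be tried without a Refinitiv session. The following is a minimal sketch in which a plain threading.Thread stands in for the library's streaming thread; fake_stream, on_update, on_update_async, and the sample field values are hypothetical names for illustration and are not part of the RD library.

```python
import asyncio
import threading

received = []  # filled by the coroutine, which runs on the event loop thread


async def on_update_async(instrument_name, fields):
    # Runs on the event loop, so it is free to await other coroutines.
    await asyncio.sleep(0)
    received.append((instrument_name, fields))


def on_update(loop, instrument_name, fields):
    # Synchronous callback, called from a thread other than the loop's.
    # Returns a concurrent.futures.Future for the scheduled coroutine.
    return asyncio.run_coroutine_threadsafe(
        on_update_async(instrument_name, fields), loop
    )


def fake_stream(loop, done):
    # Hypothetical stand-in for the library thread that delivers updates.
    futures = [
        on_update(loop, name, {"BID": bid})
        for name, bid in [("BTCc1", 1.0), ("BTCc2", 2.0), ("BTCc3", 3.0)]
    ]
    for future in futures:
        future.result(timeout=5)  # block this thread until the coroutine ends
    loop.call_soon_threadsafe(done.set)  # tell the loop we are finished


async def main():
    loop = asyncio.get_running_loop()
    done = asyncio.Event()
    threading.Thread(target=fake_stream, args=(loop, done), daemon=True).start()
    await asyncio.wait_for(done.wait(), timeout=10)


asyncio.run(main())
```

Each scheduled coroutine starts in the order the callbacks fired, but once a coroutine awaits something slow, a later update can finish before it. If strict ordering matters, that is the point to check against the asyncio documentation.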
Answers
Thanks, Jirapongse.
I am testing this solution. Do you know whether it will work with tasks rather than threads? I am working with tasks, so asyncio manages the scheduling for me automatically.
Also, should the first parameter be "pricing_stream" or "streaming_prices"? Is it matched by position or by argument name?
Ori
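On the tasks question: asyncio.run_coroutine_threadsafe does not start a thread. It schedules the coroutine as a task on the given loop and returns a concurrent.futures.Future. An alternative that keeps all processing inside one task, and therefore handles updates strictly in arrival order, is to push each update onto an asyncio.Queue from the callback with loop.call_soon_threadsafe. This is a minimal sketch, assuming the library invokes the callback from its own thread; make_on_update, consume, producer_thread, and the sample data are hypothetical stand-ins, not RD library names.

```python
import asyncio
import threading

processed = []  # updates in the order the consumer task handled them


def make_on_update(loop, queue):
    # Build a synchronous callback that forwards updates to the queue.
    def on_update(stream, instrument_name, fields):
        # asyncio.Queue is not thread-safe, so go through the loop.
        loop.call_soon_threadsafe(queue.put_nowait, (instrument_name, fields))

    return on_update


async def consume(queue):
    # Single consumer task: updates are handled one at a time, in order.
    while True:
        item = await queue.get()
        if item is None:  # sentinel: no more updates
            break
        await asyncio.sleep(0)  # placeholder for real awaited work
        processed.append(item)


def producer_thread(loop, queue, on_update):
    # Hypothetical stand-in for the library thread delivering updates.
    for i in range(3):
        on_update(None, f"BTCc{i + 1}", {"BID": float(i)})
    loop.call_soon_threadsafe(queue.put_nowait, None)


async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    consumer = asyncio.create_task(consume(queue))
    threading.Thread(
        target=producer_thread,
        args=(loop, queue, make_on_update(loop, queue)),
        daemon=True,
    ).start()
    await asyncio.wait_for(consumer, timeout=10)


asyncio.run(main())
```

On the parameter name: Python matches positional arguments by position, so the name of the first parameter only matters if the caller passes it by keyword. Whether the RD library calls the handler positionally or by keyword is not confirmed in this thread.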