async function error

Hi team,
I am going to write a for loop that calls get_data several thousand times, once per investor ID, for a list of stocks. With the synchronous approach, the task takes an extremely long time to finish, so I am trying an asynchronous approach instead. Is this possible? I tried to implement it as in the code below, but I encountered a runtime error. What is the correct way to do this? Thanks.
import asyncio
#shareheld by each investor
async def get_HVH(id):
    df, err = ek.get_data(
        instruments = Custom_PeersRICs,
        fields = ['TR.SharesHeldValue'],
        parameters = {'legacyInvestorId':str(id),'Scale':Scaling.value}
    )
    return df.iloc[:,1]
async def main():
    tbl=pd.DataFrame(columns = Custom_PeersRICs)
    for i in range(Output_Number.value):
        id=output['investorid'][i]
        hvh=loop.create_task(get_HVH(id))
        print(i)
        tbl_length=len(tbl)
        tbl.loc[tbl_length]=list(hvh)
        await asyncio
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
Error message:
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-21-d4e8b324361a> in <module>
21
22 loop = asyncio.get_event_loop()
---> 23 loop.run_until_complete(main())
24 loop.close()
/opt/conda/lib/python3.7/site-packages/nest_asyncio.py in run_until_complete(self, future)
93 raise RuntimeError(
94 'Event loop stopped before Future completed.')
---> 95 return f.result()
96 finally:
97 events._set_running_loop(old_running_loop)
/opt/conda/lib/python3.7/asyncio/futures.py in result(self)
176 self.__log_traceback = False
177 if self._exception is not None:
--> 178 raise self._exception
179 return self._result
180
/opt/conda/lib/python3.7/asyncio/tasks.py in __step(***failed resolving arguments***)
247 # We use the `send` method directly, because coroutines
248 # don't have `__iter__` and `__next__` methods.
--> 249 result = coro.send(None)
250 else:
251 result = coro.throw(exc)
<ipython-input-21-d4e8b324361a> in main()
17 print(i)
18 tbl_length=len(tbl)
---> 19 tbl.loc[tbl_length]=list(hvh)
20 await asyncio
21
/opt/conda/lib/python3.7/asyncio/futures.py in __await__(self)
260 yield self # This tells Task to wait for completion.
261 if not self.done():
--> 262 raise RuntimeError("await wasn't used with future")
263 return self.result() # May raise too.
264
RuntimeError: await wasn't used with future
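Why this error appears: `loop.create_task(...)` schedules `get_HVH` and immediately returns a pending Task; it does not wait for the result. `list(hvh)` then iterates that Task, and because `asyncio.Future` defines `__iter__` as an alias of `__await__` (for legacy `yield from` support), iterating a pending Future outside an `await` raises exactly this RuntimeError. (`await asyncio` on the next line would also fail, since a module is not awaitable.) The result has to be obtained with `await task`, or with `asyncio.gather` for many tasks. A minimal stdlib-only reproduction, with `slow_value` as a hypothetical stand-in coroutine:

```python
import asyncio

async def slow_value():
    # Hypothetical stand-in for a coroutine that takes a while to finish.
    await asyncio.sleep(0.1)
    return 42

async def reproduce():
    bad = asyncio.create_task(slow_value())  # returns a *pending* Task
    message = None
    try:
        list(bad)  # iterating a pending Task is not the same as awaiting it
    except RuntimeError as exc:
        message = str(exc)
    bad.cancel()  # discard the misused Task

    good = asyncio.create_task(slow_value())
    value = await good  # correct: await the Task to get its result
    return message, value

message, value = asyncio.run(reproduce())
print(message)  # await wasn't used with future
print(value)    # 42
```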
Best Answer
I am new to asyncio. However, I assume that you would like to send the requests concurrently. After searching on Google, I found a way to turn synchronous Python functions into asynchronous ones.
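Some background on why threads are needed here: ek.get_data is a synchronous (blocking) call, so putting it inside `async def`, as in the question, does not make several calls overlap. Each call holds the event-loop thread until it returns, and the other coroutines just wait. A minimal stdlib-only sketch, where `blocking_request` is a hypothetical stand-in that uses `time.sleep` in place of a real request:

```python
import asyncio
import time

def blocking_request(item):
    # Hypothetical stand-in for a synchronous call such as ek.get_data:
    # it blocks the calling thread for 0.2 s.
    time.sleep(0.2)
    return item

async def wrapped(item):
    # "async def" alone does not help: the blocking call still runs on the
    # event-loop thread, and nothing else can run until it returns.
    return blocking_request(item)

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(*(wrapped(i) for i in range(5)))
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(results)             # [0, 1, 2, 3, 4]
print(f"{elapsed:.1f} s")  # roughly 5 x 0.2 s, i.e. the calls ran one after another
```

Running the blocking call in worker threads, as the code below does, is what lets the requests overlap.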
The code that I tested is:
import asyncio
import eikon as ek
import pandas as pd
import logging
import sys,os,os.path
import time
import threading
import functools
def force_async(fn):
    '''
    turns a sync function to async function using threads
    '''
    from concurrent.futures import ThreadPoolExecutor
    import asyncio
    pool = ThreadPoolExecutor()
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        global sem
        sem.acquire()  # waits here while 20 requests are already active
        future = pool.submit(fn, *args, **kwargs)
        return asyncio.wrap_future(future) # make it awaitable
    return wrapper
@force_async
def get_HVH(id):
    global sem
    try:
        df, err = ek.get_data(instruments = id,
                              fields = ['TR.SharesHeldValue'],
                              parameters = {})
    finally:
        sem.release()  # free the slot even if get_data raises
    print(df)
    return df.iloc[:,1]
async def main():
    chain, err = ek.get_data("0#.FTSE",["TR.RIC"])
    print(chain)
    output = chain['Instrument']
    hvh = []
    for i in range(len(output)):
        id=output[i]
        hvh.append(get_HVH(id))
        print(i)
    await asyncio.wait(hvh, return_when=asyncio.ALL_COMPLETED)
    print("asyncio.as_completed")
if __name__ == '__main__':
    start = time.time()
    sem = threading.Semaphore(20)  # at most 20 active requests
    ek.set_app_key("<app key>")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print("Loop Close")
    loop.close()
    end = time.time()
    print(end - start)
The above code retrieves the constituents of the chain RIC 0#.FTSE and then requests TR.SharesHeldValue for the items in that chain concurrently. A semaphore controls the number of active requests: it is set to 20, so at most 20 requests are active at a time.
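A variation on the same idea, sketched with only the standard library: `threading.Semaphore.acquire()` in the wrapper above runs on the event-loop thread, so the loop itself blocks whenever 20 requests are in flight. Awaiting an `asyncio.Semaphore` and handing the blocking call to `loop.run_in_executor` keeps the loop free, and `asyncio.gather` returns the results in input order, which makes it easy to build a table from them afterwards. `blocking_get_data` is a hypothetical stand-in for `ek.get_data`, and the limit of 20 only mirrors the value used above; this is a sketch, not a tested Eikon integration.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

active = 0          # requests currently running (for illustration only)
peak = 0            # highest number of simultaneous requests observed
counter_lock = threading.Lock()

def blocking_get_data(ric):
    # Hypothetical stand-in for a synchronous request such as ek.get_data.
    global active, peak
    with counter_lock:
        active += 1
        peak = max(peak, active)
    try:
        time.sleep(0.05)
        return f"{ric}: done"
    finally:
        with counter_lock:
            active -= 1

async def fetch(ric, sem, pool):
    # Waiting on an asyncio.Semaphore suspends only this coroutine,
    # not the whole event loop (unlike threading.Semaphore.acquire()).
    async with sem:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(pool, blocking_get_data, ric)

async def main(rics, max_active=20):
    sem = asyncio.Semaphore(max_active)
    with ThreadPoolExecutor(max_workers=max_active) as pool:
        # gather returns results in the same order as `rics`.
        return await asyncio.gather(*(fetch(r, sem, pool) for r in rics))

rics = [f"RIC{i}" for i in range(101)]
start = time.perf_counter()
results = asyncio.run(main(rics))
print(len(results), results[0], f"{time.perf_counter() - start:.2f} s")
```

In a notebook where an event loop is already running (the traceback above shows nest_asyncio in use), `await main(rics)` in a cell may be needed instead of `asyncio.run`.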
In my test, it took around 51 seconds to retrieve TR.SharesHeldValue for 101 items, whereas calling get_data sequentially took around 300 seconds, so the concurrent requests were roughly six times faster.
Answers
Thanks @Jirapongse
This is very useful.
Hi @sunny.to
Please also make sure that the number of requests and the amount of data do not exceed the API limits.
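Regarding the API limits: the semaphore above caps how many requests are in flight, but not how many start per second. If a per-second cap also applies, the start times can be spaced out. A minimal asyncio sketch; `RateLimiter` is a hypothetical helper and `max_per_second=5` is a placeholder, not the actual Eikon Data API quota, which should be taken from its documentation:

```python
import asyncio
import time

class RateLimiter:
    """Hypothetical helper: lets at most `max_per_second` callers through per second."""

    def __init__(self, max_per_second):
        self.interval = 1.0 / max_per_second
        self.next_start = 0.0
        self.lock = asyncio.Lock()

    async def wait(self):
        # Callers queue on the lock and are released one interval apart.
        async with self.lock:
            now = time.monotonic()
            delay = self.next_start - now
            if delay > 0:
                await asyncio.sleep(delay)
            self.next_start = max(now, self.next_start) + self.interval

async def request(i, limiter, starts):
    await limiter.wait()
    starts.append(time.monotonic())  # a real request would be issued here
    return i

async def main():
    limiter = RateLimiter(max_per_second=5)  # placeholder limit, not the real quota
    starts = []
    results = await asyncio.gather(*(request(i, limiter, starts) for i in range(6)))
    return results, starts

results, starts = asyncio.run(main())
gaps = [b - a for a, b in zip(starts, starts[1:])]
print(results, [round(g, 2) for g in gaps])
```

The limiter can be combined with the semaphore: await `limiter.wait()` inside the `async with sem:` block before issuing each request.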
Hi Gurpreet, how can I make the question public?
Thank you @jirapongse.phuriphanvichai. It works in Jupyter Notebook but fails to run in CodeBook with the error below. Do you know why?
RefiImportError: module 'os' is restricted.
You can remove 'import sys,os,os.path' from the code.
It works after removing 'import sys,os,os.path' from the code.
Thank you very much.