async function error

Hi team,

I am going to write a for loop that calls get_data several thousand times, once per investor ID, for a list of stocks. With the synchronous approach, the task takes an extremely long time to finish, so I am trying an async approach instead. Is this possible? I tried to implement the async approach in the code below, but I encountered a runtime error. What is the correct way to do this? Thanks.


import asyncio

# shares held by each investor
async def get_HVH(id):
    df, err = ek.get_data(
        instruments = Custom_PeersRICs,
        fields = ['TR.SharesHeldValue'],
        parameters = {'legacyInvestorId':str(id),'Scale':Scaling.value}
    )
    return df.iloc[:,1]

async def main():
    tbl=pd.DataFrame(columns = Custom_PeersRICs)
    for i in range(Output_Number.value):
        id=output['investorid'][i]
        hvh=loop.create_task(get_HVH(id))
        print(i)
        tbl_length=len(tbl)
        tbl.loc[tbl_length]=list(hvh)
    await asyncio

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()


Error message:

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-21-d4e8b324361a> in <module>
     21 
     22 loop = asyncio.get_event_loop()
---> 23 loop.run_until_complete(main())
     24 loop.close()

/opt/conda/lib/python3.7/site-packages/nest_asyncio.py in run_until_complete(self, future)
     93 raise RuntimeError(
     94 'Event loop stopped before Future completed.')
---> 95 return f.result()
     96 finally:
     97 events._set_running_loop(old_running_loop)

/opt/conda/lib/python3.7/asyncio/futures.py in result(self)
    176 self.__log_traceback = False
    177 if self._exception is not None:
--> 178 raise self._exception
    179 return self._result
    180 

/opt/conda/lib/python3.7/asyncio/tasks.py in __step(***failed resolving arguments***)
    247 # We use the `send` method directly, because coroutines
    248 # don't have `__iter__` and `__next__` methods.
--> 249 result = coro.send(None)
    250 else:
    251 result = coro.throw(exc)

<ipython-input-21-d4e8b324361a> in main()
     17 print(i)
     18 tbl_length=len(tbl)
---> 19 tbl.loc[tbl_length]=list(hvh)
     20 await asyncio
     21 

/opt/conda/lib/python3.7/asyncio/futures.py in __await__(self)
    260 yield self # This tells Task to wait for completion.
    261 if not self.done():
--> 262 raise RuntimeError("await wasn't used with future")
    263 return self.result() # May raise too.
    264 

RuntimeError: await wasn't used with future
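In the question's main(), the RuntimeError comes from `list(hvh)`: iterating a Task is not the same as awaiting it, so asyncio raises "await wasn't used with future" (the `__await__` frame in the traceback). The line `await asyncio` would fail next, because it awaits the module rather than the tasks. Even with both fixed, get_HVH contains no `await`, so each ek.get_data call would still block the event loop and the requests would run one after another. The sketch below assumes a hypothetical `fake_get_data` that sleeps in place of ek.get_data; it hands the blocking call to a thread pool with `loop.run_in_executor` and awaits all tasks together with `asyncio.gather`.

```python
import asyncio
import time

def fake_get_data(investor_id):
    """Hypothetical stand-in for the blocking ek.get_data call."""
    time.sleep(0.05)  # simulate network latency
    return [investor_id * 10, investor_id * 20]

async def get_hvh(loop, investor_id):
    # run_in_executor hands the blocking call to the default thread pool,
    # so several requests can be in flight while the event loop waits.
    return await loop.run_in_executor(None, fake_get_data, investor_id)

async def main(investor_ids):
    loop = asyncio.get_running_loop()
    tasks = [loop.create_task(get_hvh(loop, i)) for i in investor_ids]
    # Await the tasks themselves (not the asyncio module, and not list(task));
    # gather returns the results in the same order as investor_ids.
    return await asyncio.gather(*tasks)

rows = asyncio.run(main([1, 2, 3]))
print(rows)  # [[10, 20], [20, 40], [30, 60]]
```

Each returned row could then be appended to the DataFrame as in the question, for example `tbl.loc[len(tbl)] = list(row)`. In a notebook that already runs an event loop, `asyncio.run()` may refuse to start; awaiting `main(...)` directly in a cell is the usual alternative.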


Best Answer

  • Jirapongse

    @sunny.to

    I am new to asyncio. However, I assume that you would like to send the requests concurrently. After searching on Google, I found a way to turn a synchronous Python function into an asynchronous one.

    The code that I tested is:


    import asyncio
    import functools
    import threading
    import time
    from concurrent.futures import ThreadPoolExecutor

    import eikon as ek

    def force_async(fn):
        '''
        Turns a sync function into an async one by running it in a thread pool.
        '''
        pool = ThreadPoolExecutor()

        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # Blocks until a semaphore slot is free, which caps the
            # number of requests in flight at any one time.
            sem.acquire()
            future = pool.submit(fn, *args, **kwargs)
            # Free the slot when the call finishes, even if it raised.
            future.add_done_callback(lambda _: sem.release())
            return asyncio.wrap_future(future)  # make it awaitable

        return wrapper

    @force_async
    def get_HVH(id):
        df, err = ek.get_data(instruments = id,
                              fields = ['TR.SharesHeldValue'],
                              parameters = {})
        print(df)
        return df.iloc[:,1]

    async def main():
        chain, err = ek.get_data("0#.FTSE",["TR.RIC"])
        print(chain)
        output = chain['Instrument']
        hvh = []
        for i in range(len(output)):
            id=output[i]
            # Each call returns an awaitable future immediately
            hvh.append(get_HVH(id))
            print(i)

        # Wait until every request has finished
        await asyncio.wait(hvh, return_when=asyncio.ALL_COMPLETED)

        print("All requests completed")

    if __name__ == '__main__':
        start = time.time()
        sem = threading.Semaphore(20)  # at most 20 active requests
        ek.set_app_key("<app key>")
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
        print("Loop Close")
        loop.close()
        end = time.time()
        print(end - start)


    The code above requests the constituents of the chain RIC 0#.FTSE and then concurrently requests TR.SharesHeldValue for each item in that chain. A semaphore controls the number of active requests; its value is set to 20, so at most 20 requests are active at a time.
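To illustrate how a semaphore keeps the number of in-flight requests bounded, and why releasing it from a done-callback (rather than after the request inside the worker function) means a failed request still frees its slot, here is a minimal sketch. It assumes a hypothetical `fake_request` that sleeps instead of calling ek.get_data and deliberately fails for one item; `LIMIT`, the pool size and the counters are illustrative choices, not values from the original answer.

```python
import asyncio
import functools
import threading
import time
from concurrent.futures import ThreadPoolExecutor

LIMIT = 3                                  # illustrative cap on active requests
sem = threading.Semaphore(LIMIT)
pool = ThreadPoolExecutor(max_workers=10)  # more workers than LIMIT on purpose
lock = threading.Lock()
active = 0   # requests currently running
peak = 0     # highest value 'active' ever reached

def limited(fn):
    """Run fn in the pool, holding one semaphore slot per call."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        sem.acquire()  # blocks the caller while LIMIT calls are in flight
        future = pool.submit(fn, *args, **kwargs)
        # Done-callbacks run on success and on error, so no slot is leaked.
        future.add_done_callback(lambda _: sem.release())
        return asyncio.wrap_future(future)  # awaitable on the running loop
    return wrapper

@limited
def fake_request(item):
    """Hypothetical stand-in for a blocking ek.get_data call."""
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    try:
        time.sleep(0.05)
        if item == 4:
            raise ValueError("simulated failed request")
        return item
    finally:
        with lock:
            active -= 1

async def main():
    futures = [fake_request(i) for i in range(10)]
    # return_exceptions=True keeps one failure from hiding the other results
    return await asyncio.gather(*futures, return_exceptions=True)

results = asyncio.run(main())
print(peak)     # never more than LIMIT
print(results)  # item 4 comes back as a ValueError; the rest succeed
```

Because `sem.acquire()` runs on the event-loop thread, the loop itself is blocked while all slots are busy. That is harmless here since nothing else is scheduled on the loop, but an `asyncio.Semaphore` awaited inside a coroutine would avoid blocking if other coroutines needed to run.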

    In my test, it took around 51 seconds to get TR.SharesHeldValue for 101 items. Calling the get_data method sequentially took around 300 seconds for the same 101 items, so concurrent requests are much faster than sequential requests.
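The speedup comes from overlapping the time each request spends waiting on the network. The sketch below uses plain `concurrent.futures` (no asyncio) and a hypothetical `fake_request` that sleeps for an assumed 50 ms instead of calling get_data, to compare a sequential loop with a 10-worker thread pool.

```python
import time
from concurrent.futures import ThreadPoolExecutor

LATENCY = 0.05           # assumed per-request latency in seconds
ITEMS = list(range(20))  # hypothetical request inputs

def fake_request(item):
    """Hypothetical stand-in for one blocking get_data request."""
    time.sleep(LATENCY)
    return item

# Sequential: total time is roughly len(ITEMS) * LATENCY.
start = time.perf_counter()
sequential_results = [fake_request(i) for i in ITEMS]
sequential_time = time.perf_counter() - start

# Concurrent: with 10 workers, roughly (len(ITEMS) / 10) * LATENCY.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=10) as pool:
    # pool.map returns results in input order
    concurrent_results = list(pool.map(fake_request, ITEMS))
concurrent_time = time.perf_counter() - start

print(f"sequential {sequential_time:.2f}s, concurrent {concurrent_time:.2f}s")
```

The simulation shows the ideal case. Real gains depend on how many requests the service actually processes in parallel and on any rate limits it applies, which is likely why the measured improvement above (about 300 s down to 51 s) is smaller than a 20-request cap alone would suggest.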
