async function error

Hi team,

I am writing a for loop that makes several thousand get_data calls, one per investor ID, for a list of stocks. Run synchronously, the task takes an extremely long time to finish, so I am trying an async approach instead. Is this possible? I tried to implement it with the code below but hit a runtime error. What is the correct way to do this? Thanks.


import asyncio

#shareheld by each investor
async def get_HVH(id):
    df, err = ek.get_data(
        instruments = Custom_PeersRICs,
        fields = ['TR.SharesHeldValue'],
        parameters = {'legacyInvestorId':str(id),'Scale':Scaling.value}
    )
    return df.iloc[:,1]

async def main():
    tbl=pd.DataFrame(columns = Custom_PeersRICs)
    for i in range(Output_Number.value):
        id=output['investorid'][i]
        hvh=loop.create_task(get_HVH(id))
        print(i)
        tbl_length=len(tbl)
        tbl.loc[tbl_length]=list(hvh)
        await asyncio

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()


Error msg

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-21-d4e8b324361a> in <module>
     21 
     22 loop = asyncio.get_event_loop()
---> 23 loop.run_until_complete(main())
     24 loop.close()

/opt/conda/lib/python3.7/site-packages/nest_asyncio.py in run_until_complete(self, future)
     93                 raise RuntimeError(
     94                     'Event loop stopped before Future completed.')
---> 95             return f.result()
     96         finally:
     97             events._set_running_loop(old_running_loop)

/opt/conda/lib/python3.7/asyncio/futures.py in result(self)
    176         self.__log_traceback = False
    177         if self._exception is not None:
--> 178             raise self._exception
    179         return self._result
    180 
/opt/conda/lib/python3.7/asyncio/tasks.py in __step(***failed resolving arguments***)
    247                 # We use the `send` method directly, because coroutines
    248                 # don't have `__iter__` and `__next__` methods.
--> 249                 result = coro.send(None)
    250             else:
    251                 result = coro.throw(exc)

<ipython-input-21-d4e8b324361a> in main()
     17         print(i)
     18         tbl_length=len(tbl)
---> 19         tbl.loc[tbl_length]=list(hvh)
     20         await asyncio
     21 
/opt/conda/lib/python3.7/asyncio/futures.py in __await__(self)
    260             yield self  # This tells Task to wait for completion.
    261         if not self.done():
--> 262             raise RuntimeError("await wasn't used with future")
    263         return self.result()  # May raise too.
    264 
RuntimeError: await wasn't used with future

@sunny.to

I am new to asyncio, but I assume you would like to send the requests concurrently. After some searching, I found a way to turn synchronous Python functions into awaitable ones by running them in threads.

The code that I tested is:


import asyncio
import eikon as ek
import pandas as pd
import logging
import sys,os,os.path
import time
import threading
import functools

def force_async(fn):
    '''
    turns a sync function to async function using threads
    '''
    from concurrent.futures import ThreadPoolExecutor
    import asyncio
    pool = ThreadPoolExecutor()
    
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        global sem
        sem.acquire()
        future = pool.submit(fn, *args, **kwargs)        
        return asyncio.wrap_future(future)  # make it awaitable

    return wrapper

@force_async
def get_HVH(id):
    global sem
    try:
        df, err = ek.get_data(instruments = id, 
                              fields = ['TR.SharesHeldValue'],
                              parameters = {})
    finally:
        # always free the slot, even if the request raises
        sem.release()
    print(df)   
    return df.iloc[:,1]

async def main():   
    chain, err = ek.get_data("0#.FTSE",["TR.RIC"])
    print(chain)
    output = chain['Instrument']
    hvh = []   
    for i in range(len(output)):
        id=output[i]       
        hvh.append(get_HVH(id))
        print(i)        

    
    await asyncio.wait(hvh, return_when=asyncio.ALL_COMPLETED)  
  
    print("asyncio.as_completed")

if __name__ == '__main__':
    start = time.time()
    sem = threading.Semaphore(20)
    ek.set_app_key("<app key>")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print("Loop Close")
    loop.close()
    end = time.time()
    print(end - start)


The code above requests the constituents of a chain RIC (0#.FTSE) and then concurrently requests TR.SharesHeldValue for each item in that chain. The number of active requests is controlled by a semaphore. The code sets the semaphore to 20, so at most 20 requests are active at a time.
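The same throttling idea can also be sketched with asyncio's own semaphore and loop.run_in_executor, so that waiting for a free slot does not block the event loop thread. This is only a minimal sketch, not the Eikon library's recommended pattern: slow_lookup is a hypothetical stand-in for a blocking call such as ek.get_data, and fetch_all, fetch_one, and the limit of 20 are illustrative names and values.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_lookup(item):
    """Hypothetical stand-in for a blocking request such as ek.get_data."""
    time.sleep(0.05)
    return item.upper()


async def fetch_all(items, limit=20):
    """Run slow_lookup for every item with at most `limit` calls in flight."""
    loop = asyncio.get_running_loop()
    sem = asyncio.Semaphore(limit)

    with ThreadPoolExecutor(max_workers=limit) as pool:

        async def fetch_one(item):
            # Wait for a free slot without blocking the event loop,
            # then run the blocking call in a worker thread.
            async with sem:
                return await loop.run_in_executor(pool, slow_lookup, item)

        # gather returns the results in the same order as `items`.
        return await asyncio.gather(*(fetch_one(item) for item in items))
```

From a plain script this would be called as asyncio.run(fetch_all(rics)). In a notebook, where an event loop is typically already running, await fetch_all(rics) is the usual form. With max_workers equal to limit, the thread pool alone would already cap concurrency; the semaphore is kept here to mirror the design of the code above.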

In my test, it takes around 51 seconds to get TR.SharesHeldValue for 101 items. Calling the get_data method sequentially takes around 300 seconds for the same 101 items, so the concurrent requests are roughly six times faster.
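As for the RuntimeError in the question: the traceback points at tbl.loc[tbl_length]=list(hvh), where hvh is a Task. Iterating a pending Task with list() instead of awaiting it appears to be what raises "await wasn't used with future". A minimal sketch of the difference, using a hypothetical fetch coroutine in place of get_HVH:

```python
import asyncio


async def fetch():
    """Hypothetical coroutine standing in for get_HVH."""
    await asyncio.sleep(0.01)
    return [1, 2, 3]


async def demo():
    # Wrong: iterating a pending Task instead of awaiting it.
    pending = asyncio.create_task(fetch())
    try:
        list(pending)
        message = None
    except RuntimeError as exc:
        message = str(exc)
    pending.cancel()  # discard the task that was misused

    # Right: await the Task first, then use its result.
    values = await asyncio.create_task(fetch())
    return message, list(values)
```

Note that awaiting alone would not make the original code concurrent: ek.get_data is a blocking call, so declaring get_HVH with async def still runs the requests one after another. That is why the code above hands the calls to threads.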


Thanks @jirapongse.phuriphanvichai

This is very useful.


Hi @sunny.to

Please also make sure that the number of requests and the amount of data do not exceed the API limits.
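One way to keep the request count down is to send several instruments per get_data call rather than one call per instrument, since get_data accepts a list of instruments. A minimal sketch of a batching helper follows; chunked is a hypothetical name, and the batch size in the example is illustrative only, not a documented limit.

```python
def chunked(items, size):
    """Yield consecutive slices of `items` with at most `size` elements each."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


# Example: five instruments grouped into batches of two.
batches = list(chunked(["A", "B", "C", "D", "E"], 2))
```

Each batch could then be passed as the instruments argument of a single request.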


Thank you @jirapongse.phuriphanvichai. It works in a Jupyter notebook but fails to run in CodeBook with the error below. Do you know why?

RefiImportError: module 'os' is restricted. 

@sunny.to

You can remove 'import sys,os,os.path' from the code.

It works after removing 'import sys,os,os.path' from the code.

Thank you very much.