Upgrade from Eikon -> Workspace. Learn about programming differences.

For a deeper look into our Eikon Data API, look into:

Overview |  Quickstart |  Documentation |  Downloads |  Tutorials |  Articles

question

Upvotes
Accepted
1 0 0 1

async function error

Hi team,

I am going to write a for loop which will generate several thousands get_data function by investor id for a list of stocks. If I am use sync method, it takes extremely long time to finish the task. Hence, I try to using Async approach to do so. Wonder it is possible? I try to implement the async approach as below code but I encountered run time error. Wonder what is the correct way to do so? Thanks.


import asyncio


#shareheld by each investor

async def get_HVH(id):

df, err = ek.get_data(

instruments = Custom_PeersRICs,

fields = ['TR.SharesHeldValue'],

parameters = {'legacyInvestorId':str(id),'Scale':Scaling.value}

)

return df.iloc[:,1]


async def main():

tbl=pd.DataFrame(columns = Custom_PeersRICs)

for i in range(Output_Number.value):

id=output['investorid'][i]

hvh=loop.create_task(get_HVH(id))

print(i)

tbl_length=len(tbl)

tbl.loc[tbl_length]=list(hvh)

await asyncio


if __name__ == '__main__':

loop = asyncio.get_event_loop()

loop.run_until_complete(main())

loop.close()


Error msg

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-21-d4e8b324361a> in <module>
     21      22 loop = asyncio.get_event_loop()
---> 23 loop.run_until_complete(main())
     24 loop.close()

/opt/conda/lib/python3.7/site-packages/nest_asyncio.py in run_until_complete(self, future)
     93                 raise RuntimeError(
     94                     'Event loop stopped before Future completed.')
---> 95             return f.result()
     96         finally:
     97             events._set_running_loop(old_running_loop)

/opt/conda/lib/python3.7/asyncio/futures.py in result(self)
    176         self.__log_traceback = False
    177         if self._exception is not None:
--> 178             raise self._exception
    179         return self._result
    180 
/opt/conda/lib/python3.7/asyncio/tasks.py in __step(***failed resolving arguments***)
    247                 # We use the `send` method directly, because coroutines
    248                 # don't have `__iter__` and `__next__` methods.
--> 249                 result = coro.send(None)
    250             else:
    251                 result = coro.throw(exc)

<ipython-input-21-d4e8b324361a> in main()
     17         print(i)
     18         tbl_length=len(tbl)
---> 19         tbl.loc[tbl_length]=list(hvh)
     20         await asyncio
     21 
/opt/conda/lib/python3.7/asyncio/futures.py in __await__(self)
    260             yield self  # This tells Task to wait for completion.
    261         if not self.done():
--> 262             raise RuntimeError("await wasn't used with future")
    263         return self.result()  # May raise too.
    264 
RuntimeError: await wasn't used with future
eikoneikon-data-apipythonrefinitiv-dataplatform-eikonworkspaceworkspace-data-apiasynchronous
icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Made the post public. Private posts are only used when identifying client information is included.

Hi Gurpreet, how to change the questions to public?

Hello @sunny.to,

Thank you for your participation in the forum.

Is the reply below satisfactory in resolving your query? If yes please click the 'Accept' text next to the reply. This will guide all community members who have a similar question. Otherwise please post again offering further insight into your question.

Thanks,

-AHS

Please be informed that a reply has been verified as correct in answering the question, and has been marked as such.

Thanks,


AHS

Upvotes
Accepted
78.8k 250 52 74

@sunny.to

I am new to the asyncio. However, I assume that you would like to send requests concurrently. I found how to turn Python sync functions to async after searching via google.

The code that I tested is:


import asyncio
import eikon as ek
import pandas as pd
import logging
import sys,os,os.path
import time
import threading
import functools

def force_async(fn):
    '''
    turns a sync function to async function using threads
    '''
    from concurrent.futures import ThreadPoolExecutor
    import asyncio
    pool = ThreadPoolExecutor()
    
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        global sem
        sem.acquire()
        future = pool.submit(fn, *args, **kwargs)        
        return asyncio.wrap_future(future)  # make it awaitable

    return wrapper

@force_async
def get_HVH(id):
    
    global sem
    df, err = ek.get_data(instruments = id, 
                          fields = ['TR.SharesHeldValue'],
                          parameters = {})
    sem.release()
    print(df)   
    return df.iloc[:,1]

async def main():   
    chain, err = ek.get_data("0#.FTSE",["TR.RIC"])
    print(chain)
    output = chain['Instrument']
    hvh = []   
    for i in range(len(output)):
        id=output[i]       
        hvh.append(get_HVH(id))
        print(i)        

    
    await asyncio.wait(hvh, return_when=asyncio.ALL_COMPLETED)  
  
    print("asyncio.as_completed")

if __name__ == '__main__':
    start = time.time()
    sem = threading.Semaphore(20)
    ek.set_app_key("<app key>")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print("Loop Close");
    loop.close()
    end = time.time()
    print(end - start)


The above code subscribes to a chain RIC (0#.FTSE) and then concurrently subscribe to items in that chain to retrieve TR.SharesHeldValue. The number of active requests at a time is controlled by Semaphore. The code sets a value of the semaphore to 20 so it will have 20 active requests at a time.

After running, it takes around 51 seconds to get TR.SharesHeldValue for 101 items. However, if I call the get_data method in sequence, it will take around 300 seconds to get TR.SharesHeldValue for 101 items. Concurrent requests are much faster than sequential requests.

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Thanks @jirapongse.phuriphanvichai

This is very useful.


Hi @sunny.to

Please also make sure that the number of request and amount of data do not exceed the API limitation.

Upvotes
1 0 0 1

Thank you @ jirapongse.phuriphanvichai. It works in Jupyter notebook but fail to run in codebook with below error. do you know why?

RefiImportError: module 'os' is restricted. 
icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

@sunny.to

You can remove 'import sys,os,os.path' from the code.

It works when removed 'import sys,os,os.path' from the code.

thank you very much

Write an Answer

Hint: Notify or tag a user in this post by typing @username.

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.