Running Concurrent Asynchronous Requests Efficiently using Refinitiv's Data Platform APIs

I am looking for advice on the process described below, based on what I understand so far.

Right now I am trying to pull data asynchronously using the APIs from Refinitiv (the financial data platform from the London Stock Exchange Group). There are about 5 tasks I am trying to run asynchronously using Python's asyncio package, all calling one function with a different parameter value each time (see the code illustration further down).

For context, each task runs 503 times, and each Refinitiv API call pulls a pandas DataFrame with data for the last 8 years (so 8 rows per request). Here are the issues I am facing:

  1. I get the "too many requests" error when I gather the tasks (asyncio.gather), and a lot of data comes back missing. It was bad enough that I added a semaphore and lowered its value to 2, and I STILL get that error when running the code excerpt below. I understand that the problem is not the total number of requests, but how they are sent. In my case, I am apparently sending too many at one time.

  2. It takes a really long time for the data to arrive. To get full, accurate data, I have to rely on a semaphore to run Refinitiv's API 503 times in 1 task, and that takes 10 minutes. I need a way to run this API call for multiple stocks quickly without sending TOO many requests at once, and to manage this efficiently so that the data does not take SUCH a long time to arrive. I would think that each task (there are 5 of them) should be able to return its data quickly.
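The two issues above (rate-limit errors and long run times) are often handled together by capping concurrency and retrying rejected calls after a growing pause. The following is only a minimal sketch of that idea: `TooManyRequests` and the `fetch` callable are hypothetical stand-ins, not part of the Refinitiv library, and the retry count and delays are arbitrary.

```python
import asyncio
import random


class TooManyRequests(Exception):
    """Hypothetical stand-in for whatever error signals rate limiting."""


async def fetch_with_backoff(fetch, ticker, semaphore, retries=4, base_delay=0.5):
    """Run fetch(ticker) under the semaphore, retrying on TooManyRequests."""
    for attempt in range(retries + 1):
        async with semaphore:
            try:
                return await fetch(ticker)
            except TooManyRequests:
                if attempt == retries:
                    raise
        # Sleep outside the semaphore so a waiting task can use the slot.
        delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
        await asyncio.sleep(delay)


async def fetch_all(fetch, tickers, max_concurrent=2, **kwargs):
    """Fetch every ticker with at most max_concurrent calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)
    tasks = [fetch_with_backoff(fetch, t, semaphore, **kwargs) for t in tickers]
    # return_exceptions=True keeps one failure from discarding the other results.
    return await asyncio.gather(*tasks, return_exceptions=True)
```

With `return_exceptions=True`, tickers that still fail after the last retry appear in the result list as exception objects, so they can be identified and re-requested instead of silently showing up as missing data.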

(Side note: when I receive my data, I run some functions and data processing to clean it, removing any void or invalid data from the DataFrames I receive.)

Here is some code to illustrate what I am talking about:

tasks = [
    main([...]),  # 3 fields passed
    main([...]),  # 3 fields passed
    main([...]),  # 3 fields passed
    main([...]),  # 2 fields passed
    main([...]),  # 2 fields passed
]

and inside that function,

# fetch_fundamental_data has the refinitiv API Call
tasks = [fetch_fundamental_data(ticker, semaphore) for ticker in tickers]
results = await asyncio.gather(*tasks)  # Await all tasks; the first exception raised propagates

and inside fetch_fundamental_data is the API call that retrieves the data:

response = await content.fundamental_and_reference.Definition(
    universe=[f'{ric}'],
    fields=specific_fields,
    parameters={"SDate": f"{start_year + 1}-01-01", "EDate": f"{current_year}-12-31", "Frq": "Y"}
).get_data_async(closure='')

(Note that these fields are passed to the Refinitiv API. The main() function includes a call to the API, followed by some processing logic for the data I receive. The data covers different datapoints for the stocks of the S&P 500.)

If anyone has a strategy that I can employ in my situation, feel free to let me know. (Side question: is a sub-event loop created for nested asynchronous operations that occur inside existing asynchronous tasks?)
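On the side question, standard asyncio does not create a sub-event loop for nested awaits, tasks, or gathers; they are all scheduled on the loop that is already running. A small sketch that checks this with plain coroutines (no Refinitiv calls involved; all function names here are made up for the illustration):

```python
import asyncio


async def inner():
    # A nested coroutine sees the loop that is already running.
    return asyncio.get_running_loop()


async def outer():
    loop_here = asyncio.get_running_loop()
    loop_awaited = await inner()                       # plain nested await
    loop_in_task = await asyncio.create_task(inner())  # separate task
    (loop_gathered,) = await asyncio.gather(inner())   # via gather
    return loop_here, [loop_awaited, loop_in_task, loop_gathered]


def same_loop_everywhere():
    """Return True if every nested operation ran on the outer loop."""
    loop_here, nested = asyncio.run(outer())
    return all(loop is loop_here for loop in nested)
```

This means a semaphore created once and shared across all five main() calls limits the total number of requests in flight, whereas a semaphore created separately inside each main() call only limits that one call.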

Thanks!!!

Best Answer

  • @vishal.nanwani Thanks for your question - so our own examples use the following method:

    import refinitiv.data as rd
    from refinitiv.data.content import fundamental_and_reference
    import asyncio
    import datetime
    rd.open_session()
    tasks = asyncio.gather(
        fundamental_and_reference.Definition(universe=['VOD.L'],fields=['TR.F.IncAvailToComShr.date', 'TR.F.IncAvailToComShr'],parameters={"SDate": "2018-01-01", "EDate": "2023-12-31", "Frq": "CY"}).get_data_async(closure='Vodafone'),
        fundamental_and_reference.Definition(universe=['AAPL.O'],fields=['TR.F.IncAvailToComShr.date', 'TR.F.IncAvailToComShr'],parameters={"SDate": "2018-01-01", "EDate": "2023-12-31", "Frq": "CY"}).get_data_async(closure='Apple'),
        fundamental_and_reference.Definition(universe=['MSFT.O'],fields=['TR.F.IncAvailToComShr.date', 'TR.F.IncAvailToComShr'],parameters={"SDate": "2018-01-01", "EDate": "2023-12-31", "Frq": "CY"}).get_data_async(closure='Microsoft')
    )
    await tasks

    def display_response(response):
        print(response)
        print("\nResponse received for", response.closure)
        if response.is_success:
            display(response.data.df)
        else:
            print(response.http_status)

    vodafone, apple, microsoft = tasks.result()

    display_response(vodafone)
    display_response(apple)
    display_response(microsoft)

    I hope this can help.
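The three-instrument pattern above could be stretched to a long ticker list by sending several instruments per request instead of one. This is a sketch under assumptions: it presumes that `universe` accepts more than one RIC per call (it is already passed as a list above), the batch size of 50 is an arbitrary guess rather than a documented limit, and `fetch_batch` is a hypothetical coroutine that would wrap the `Definition(...).get_data_async()` call for one batch.

```python
import asyncio


def chunked(items, size):
    """Split a list into consecutive batches of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


async def fetch_in_batches(fetch_batch, rics, batch_size=50, max_concurrent=2):
    """Call fetch_batch once per batch, with bounded concurrency."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(batch):
        async with semaphore:
            return await fetch_batch(batch)

    batches = chunked(rics, batch_size)
    results = await asyncio.gather(
        *(guarded(b) for b in batches), return_exceptions=True
    )
    # Pair each batch with its result (or exception) so failures can be retried.
    return list(zip(batches, results))
```

Under these assumptions, 503 instruments in batches of 50 would need 11 requests rather than 503, which reduces both the chance of hitting the request limit and the total waiting time.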

Answers

  • Hi @jason.ramchandani01 ,


    Thank you for your reply! I have another question: when I run the requests with the syntax you showed in a Jupyter Notebook, they do not time out, but when the same code is run from a Python file, it is more prone to ReadTimeout errors. Is there a way to fix this error, and to find out why it is occurring?
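One difference that is certain: a Jupyter notebook already has an event loop running, so a top-level `await` works there, while a Python file has to start the loop itself, typically with `asyncio.run()`. Whether that explains the ReadTimeout errors is not established here. The sketch below only shows the script-side structure plus a retry around slow calls; `demo_request` is a hypothetical stand-in for a `get_data_async()` call, and the `retry_on` tuple would need the library's own timeout exception added to it. The library's configuration is also reported to have an HTTP request timeout setting (commonly cited as `http.request-timeout`); treat that key as an assumption to verify against the documentation.

```python
import asyncio


async def call_with_timeout(make_request, timeout=60.0, retries=2,
                            retry_on=(asyncio.TimeoutError,)):
    """Await make_request() with a client-side deadline, retrying on timeout."""
    for attempt in range(retries + 1):
        try:
            return await asyncio.wait_for(make_request(), timeout)
        except retry_on:
            if attempt == retries:
                raise


async def demo_request():
    # Hypothetical stand-in for one get_data_async() call.
    await asyncio.sleep(0.01)
    return "ok"


async def main():
    # In a script, session setup and every await live inside one coroutine.
    calls = [call_with_timeout(demo_request) for _ in range(3)]
    return await asyncio.gather(*calls, return_exceptions=True)


if __name__ == "__main__":
    # A .py file has no running loop, so it must be started explicitly.
    print(asyncio.run(main()))
```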

    Thanks
