Running Concurrent Asynchronous Requests Efficiently using Refinitiv's Data Platform APIs

I am looking for advice on the process described below, based on what I understand so far.

What I am doing right now is trying to pull data asynchronously using the APIs of Refinitiv (the financial data platform from the London Stock Exchange Group). There are about 5 tasks that I am trying to run asynchronously using Python's asyncio package, all calling one function with a different value passed in the parameters each time (see the code illustration further down).

For context, each task runs the API call 503 times, and each Refinitiv API call returns a pandas DataFrame with data for the last 8 years (so 8 rows per request). These are the issues I am facing:

  1. I get a "too many requests" error when I gather the tasks with asyncio.gather, and a lot of data comes back missing. It is bad enough that I added a semaphore and lowered its value to 2, and I STILL get the error when running the code excerpt shown below. I understand that it isn't about the total number of requests, but about how they are sent. In my case, I am apparently sending too many at one time.

  2. It takes a really long time for the data to arrive. To get complete, accurate data, I have to rely on a semaphore to run the Refinitiv API call 503 times in 1 task, and that takes 10 minutes. I need a way to run this API call for multiple stocks quickly without sending too many requests at once, and to manage this efficiently so that the data doesn't take SUCH a long time to arrive. I would have thought that each task (there are 5 of them) should get its data back fairly quickly.
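
The throttling described in the two points above can be sketched roughly as follows. This is only a minimal sketch under assumptions, not the actual code: TooManyRequests, fetch_with_retry, fetch_all and flaky_fetch are hypothetical names, and the stand-in fetch function replaces the real Refinitiv call, whose throttling error may surface differently (for example as a response with a non-success status rather than an exception). It caps the number of requests in flight with a semaphore and retries a throttled request after an exponentially growing pause.

```python
import asyncio


class TooManyRequests(Exception):
    """Hypothetical stand-in for however the real API reports throttling."""


async def fetch_with_retry(fetch, ticker, semaphore, retries=3, base_delay=0.01):
    """Run fetch(ticker) under the semaphore, backing off and retrying when throttled."""
    for attempt in range(retries + 1):
        async with semaphore:  # at most N requests in flight at once
            try:
                return await fetch(ticker)
            except TooManyRequests:
                if attempt == retries:
                    raise  # give up after the last allowed attempt
        # Sleep outside the semaphore so a waiting retry does not hold a slot
        await asyncio.sleep(base_delay * 2 ** attempt)


async def fetch_all(fetch, tickers, max_in_flight=2):
    """Fetch every ticker with bounded concurrency; results keep the input order."""
    semaphore = asyncio.Semaphore(max_in_flight)
    tasks = [fetch_with_retry(fetch, t, semaphore) for t in tickers]
    # return_exceptions=True keeps one failed ticker from discarding the rest
    return await asyncio.gather(*tasks, return_exceptions=True)


async def demo():
    attempts = {}

    async def flaky_fetch(ticker):
        # Stand-in for the real API call: the first attempt for "A..." tickers is throttled
        attempts[ticker] = attempts.get(ticker, 0) + 1
        if attempts[ticker] == 1 and ticker.startswith("A"):
            raise TooManyRequests(ticker)
        await asyncio.sleep(0)
        return f"{ticker}: ok"

    return await fetch_all(flaky_fetch, ["AAPL.O", "MSFT.O", "VOD.L"])


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The delay values here are arbitrary; suitable values depend on the platform's actual limits, which are not stated in this post.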

(Side note: I have some functions and data processing that clean the data when I receive it, removing any void or invalid data from the DataFrames.)

This is some code illustration to show some of what I am talking about:

tasks = [
    main([3 Fields passed]),
    main([3 Fields passed]),
    main([3 Fields passed]),
    main([2 Fields passed]),
    main([2 Fields passed]),
]

and inside that function,

# fetch_fundamental_data has the refinitiv API Call
tasks = [fetch_fundamental_data(ticker, semaphore) for ticker in tickers]
results = await asyncio.gather(*tasks)  # Await all tasks (without return_exceptions=True, the first exception propagates)

and fetch_fundamental_data in turn makes the API call that retrieves the data:

response = await content.fundamental_and_reference.Definition(
    universe=[f'{ric}'],
    fields=specific_fields,
    parameters={"SDate": f"{start_year + 1}-01-01", "EDate": f"{current_year}-12-31", "Frq": "Y"},
).get_data_async(closure='')

(Note that these fields are passed to the Refinitiv API. The main() function includes a call to the API, followed by some processing logic for the data I receive. The data covers different data points for the stocks of the S&P 500.)

If anyone has a strategy that I can employ in my situation, feel free to let me know. (Side question: is a sub-event loop created for nested asynchronous operations that run inside existing asynchronous tasks?)

Thanks!!!

1 Answer

@vishal.nanwani Thanks for your question - so our own examples use the following method:

import asyncio

import refinitiv.data as rd
from refinitiv.data.content import fundamental_and_reference

rd.open_session()

# Fields and parameters shared by all three requests
fields = ['TR.F.IncAvailToComShr.date', 'TR.F.IncAvailToComShr']
parameters = {"SDate": "2018-01-01", "EDate": "2023-12-31", "Frq": "CY"}

# Top-level await works in a Jupyter notebook; in a script, wrap this in an async function
tasks = asyncio.gather(
    fundamental_and_reference.Definition(universe=['VOD.L'], fields=fields, parameters=parameters).get_data_async(closure='Vodafone'),
    fundamental_and_reference.Definition(universe=['AAPL.O'], fields=fields, parameters=parameters).get_data_async(closure='Apple'),
    fundamental_and_reference.Definition(universe=['MSFT.O'], fields=fields, parameters=parameters).get_data_async(closure='Microsoft'),
)
await tasks

def display_response(response):
    print(response)
    print("\nResponse received for", response.closure)
    if response.is_success:
        display(response.data.df)
    else:
        print(response.http_status)

vodafone, apple, microsoft = tasks.result()

display_response(vodafone)
display_response(apple)
display_response(microsoft)
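
Since universe takes a list, one further option for a 503-stock run is to put several RICs into each request, so that far fewer requests are sent. The following is a minimal sketch under assumptions, not a tested recipe: chunked, fetch_batches and fake_fetch_batch are hypothetical names, the batch size of 50 is an arbitrary illustration (the platform's real per-request limit is not stated here), and the stand-in function replaces the real get_data_async call.

```python
import asyncio


def chunked(items, size):
    """Split a list into consecutive batches of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


async def fetch_batches(fetch_batch, rics, batch_size=50, max_in_flight=2):
    """Call fetch_batch once per batch of RICs, with bounded concurrency."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def run(batch):
        async with semaphore:  # at most max_in_flight batches in flight
            return await fetch_batch(batch)

    # gather returns the results in the same order as the batches
    return await asyncio.gather(*(run(b) for b in chunked(rics, batch_size)))


async def demo():
    async def fake_fetch_batch(batch):
        # Stand-in for a real request; the assumption is that real code would
        # pass universe=batch to fundamental_and_reference.Definition here.
        await asyncio.sleep(0)
        return len(batch)

    rics = [f"RIC{i}" for i in range(503)]  # placeholder instrument codes
    return await fetch_batches(fake_fetch_batch, rics)


if __name__ == "__main__":
    sizes = asyncio.run(demo())
    print(len(sizes), "requests instead of 503")
```

With these numbers, the 503 instruments are covered by 11 requests (ten batches of 50 and one of 3).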

I hope this can help.
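
On the side question about a sub-event loop: in standard asyncio, awaiting or gathering coroutines from inside a running task does not create another loop; the nested work is scheduled on the same event loop. A minimal sketch that checks this (inner, outer and main are hypothetical names, and it says nothing about what the Refinitiv library does internally):

```python
import asyncio


async def inner():
    # Report which event loop this nested coroutine is running on
    return asyncio.get_running_loop()


async def outer():
    # gather inside a coroutine schedules tasks on the loop that is already running
    nested = await asyncio.gather(inner(), inner())
    return [asyncio.get_running_loop(), *nested]


async def main():
    groups = await asyncio.gather(outer(), outer())
    loops = [loop for group in groups for loop in group]
    # True when every task, nested or not, saw the same loop object
    return all(loop is loops[0] for loop in loops)


if __name__ == "__main__":
    print(asyncio.run(main()))
```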

