What is the best way to use concurrent.futures with rd.get_history?

I am trying to snapshot data with the following function.

def getRefinitiv_SNAP(x):
    snap = None
    try:
        snap = rd.get_history(universe=[x['ricCode']], interval='tick', start=x['datetimeSentGMT_str'], count=1)
    except rd.errors.RDError as e:
        print(e)
    return snap

The problem with rd.get_history is that it accepts only a single string for the start time and a single string for the end time, while my dataframe contains many different ricCode values and timestamps. So the function above can only return a single row of results at a time.

df = {'ricCode': {0: '3690.HK', 1: '4368.T', 2: '9988.HK', 3: '2269.HK', 4: '4684.T', 5: '2382.HK', 6: '4443.T', 7: '104830.KQ', 8: '1801.HK', 9: '3064.T'}, 'datetimeSent': {0: Timestamp('2021-02-22 12:55:16'), 1: Timestamp('2021-02-22 09:45:54'), 2: Timestamp('2021-02-22 12:55:17'), 3: Timestamp('2021-02-22 12:55:17'), 4: Timestamp('2021-02-22 07:53:13'), 5: Timestamp('2021-02-22 12:01:35'), 6: Timestamp('2021-02-22 10:33:53'), 7: Timestamp('2021-02-22 08:10:39'), 8: Timestamp('2021-02-22 12:01:35'), 9: Timestamp('2021-02-22 12:56:49')}}

I tried to use concurrent.futures to process multiple rows of the dataframe in parallel, but I ran into the errors below. Putting the rd.open_session command inside the getRefinitiv_SNAP function doesn't help.

with concurrent.futures.ThreadPoolExecutor() as executor:
    df_final = pd.DataFrame()
    rd.open_session()
    futures = [executor.submit(getRefinitiv_SNAP, x=row) for idx, row in df.iterrows()]
    rd.close_session()
    for future in concurrent.futures.as_completed(futures):
        df_final = pd.concat([df_final, future.result()], axis=0)

ErrorMessage:

[00:00<?, ?it/s]An error occurred while requesting URL('http://localhost:9000/api/rdp/data/historical-pricing/v1/views/events/.AXVI?start=2024-01-18T22%3A43%3A49.000000000Z&count=1&fields=TRDPRC_1%2CDATE_TIME').
ReadError('[WinError 10038] An operation was attempted on something that is not a socket')
Session is not opened. Can't send any request
An error occurred while requesting URL('http://localhost:9000/api/rdp/data/historical-pricing/v1/views/events/.JNIV?start=2024-04-10T00%3A38%3A29.000000000Z&count=1&fields=TRDPRC_1%2CDATE_TIME').
ReadError('[WinError 10038] An operation was attempted on something that is not a socket')
An error occurred while requesting URL('http://localhost:9000/api/rdp/data/historical-pricing/v1/views/events/.AXVI?start=2024-04-16T00%3A01%3A37.000000000Z&count=1&fields=TRDPRC_1%2CDATE_TIME').
ReadError('[WinError 10038] An operation was attempted on something that is not a socket')
An error occurred while requesting URL('http://localhost:9000/api/rdp/data/historical-pricing/v1/views/events/.AXVI?start=2024-02-14T22%3A41%3A03.000000000Z&count=1&fields=TRDPRC_1%2CDATE_TIME').
ReadError('[WinError 10038] An operation was attempted on something that is not a socket')
Error code -1 | [WinError 10038] An operation was attempted on something that is not a socket
Error code -1 | Session is not opened. Can't send any request
Error code -1 | [WinError 10038] An operation was attempted on something that is not a socket
Error code -1 | [WinError 10038] An operation was attempted on something that is not a socket
Error code -1 | [WinError 10038] An operation was attempted on something that is not a socket

So, what is the correct way to use concurrent.futures with rd.get_history?

Answers

  • Jirapongse ✭✭✭✭✭

    @thomas.ng

    Thank you for reaching out to us.

    You can refer to the example on GitHub regarding historical parallel requests.

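    Regarding the ThreadPoolExecutor: the session must not be closed while the executor is still running tasks. In your code, rd.close_session() is called right after the tasks are submitted, before they have completed. Open the session before creating the executor and close it only after the with block has exited: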

    rd.open_session()

    with concurrent.futures.ThreadPoolExecutor() as executor:
        df_final = pd.DataFrame()

        futures = [executor.submit(getRefinitiv_SNAP, x=row) for idx, row in df.iterrows()]
        for future in concurrent.futures.as_completed(futures):
            df_final = pd.concat([df_final, future.result()], axis=0)

    rd.close_session()
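
The ordering described above (open the session, run every task, then close) can be tried without a data connection. The following is only a minimal sketch: FakeSession, snap and snap_all are hypothetical stand-ins and not part of the data library, and the timestamp strings are illustrative. It also keeps a future-to-index mapping so that results come back in input order even though as_completed yields them in completion order.

```python
import concurrent.futures
import threading


class FakeSession:
    """Hypothetical stand-in for the library session (not a real API)."""

    def __init__(self):
        self._open = False
        self._lock = threading.Lock()

    def open(self):
        with self._lock:
            self._open = True

    def close(self):
        with self._lock:
            self._open = False

    def request(self, ric, start):
        # Mimics the failure seen in the question when the session is closed.
        with self._lock:
            if not self._open:
                raise RuntimeError("Session is not opened. Can't send any request")
        return {"ricCode": ric, "start": start}


def snap(session, row):
    """Return one result row, or None if the request failed."""
    try:
        return session.request(row["ricCode"], row["datetimeSentGMT_str"])
    except RuntimeError as exc:
        print(exc)
        return None


def snap_all(session, rows, max_workers=4):
    """Open the session, run all requests in threads, then close the session."""
    session.open()
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Remember which input row each future belongs to.
            futures = {executor.submit(snap, session, row): i for i, row in enumerate(rows)}
            results = [None] * len(rows)
            for future in concurrent.futures.as_completed(futures):
                results[futures[future]] = future.result()
    finally:
        # Reached only after the executor block has waited for every task.
        session.close()
    return results


rows = [
    {"ricCode": "4368.T", "datetimeSentGMT_str": "2021-02-22T09:45:54Z"},
    {"ricCode": "4684.T", "datetimeSentGMT_str": "2021-02-22T07:53:13Z"},
]
results = snap_all(FakeSession(), rows)
```

With real requests, each result would be a DataFrame (or None on error); collecting them in a list and calling pd.concat once at the end avoids re-copying df_final on every iteration.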
    
    
  • Jirapongse ✭✭✭✭✭

    @thomas.ng

    If you would like to request a large volume of data (millions of requests), please contact your LSEG account team or Sales team directly to discuss this usage. The endpoint doesn't support batching request messages.

    However, you can request multiple items for a single event by directly sending a request to the historical pricing endpoint. The data depth of the events API is 90 days.

    The code looks like this:

    historical_url = "data/historical-pricing/v1/views/events"
    body = {
        "universe": [
            "4368.T",
            "4684.T"
        ],
        "fields": [
            "TRDPRC_1",
            "DATE_TIME"
        ],
        "end": "2024-10-01T05:00:00.000000000Z",
        "eventTypes": [
            "trade"
        ]
    }
    request = rd.delivery.endpoint_request.Definition(
        method=rd.delivery.endpoint_request.RequestMethod.POST,
        url=historical_url,
        body_parameters=body,
    )
    response = request.get_data()

    response.data.raw
    

    The endpoint example is on GitHub.
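
Since one request body carries a single "end" value for all instruments in "universe", rows that share a timestamp could be grouped into one request. The sketch below only builds the bodies and does not send anything. build_event_bodies is a hypothetical helper, the timestamps are assumed to be in GMT already, and the sample rows are taken from the question purely for illustration (they are older than the 90-day depth mentioned above).

```python
from collections import defaultdict
from datetime import datetime


def build_event_bodies(rows, fields=("TRDPRC_1", "DATE_TIME")):
    """Group RICs by timestamp and build one request body per distinct timestamp."""
    rics_by_end = defaultdict(list)
    for ric, sent in rows:
        # Same timestamp layout as the "end" value in the body shown above.
        end = sent.strftime("%Y-%m-%dT%H:%M:%S") + ".000000000Z"
        if ric not in rics_by_end[end]:
            rics_by_end[end].append(ric)
    return [
        {"universe": rics, "fields": list(fields), "end": end, "eventTypes": ["trade"]}
        for end, rics in sorted(rics_by_end.items())
    ]


rows = [
    ("3690.HK", datetime(2021, 2, 22, 12, 55, 16)),
    ("9988.HK", datetime(2021, 2, 22, 12, 55, 17)),
    ("2269.HK", datetime(2021, 2, 22, 12, 55, 17)),
    ("2382.HK", datetime(2021, 2, 22, 12, 1, 35)),
    ("1801.HK", datetime(2021, 2, 22, 12, 1, 35)),
]
bodies = build_event_bodies(rows)
```

Here five rows collapse into three bodies, because two pairs of RICs share a timestamp. Each body could then be passed as body_parameters in the same way as the example above.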