What is the best way to use concurrent.futures with rd.get_history?

I am trying to snapshot data with the following function:

```python
def getRefinitiv_SNAP(x):
    snap = None
    try:
        snap = rd.get_history(
            universe=[x['ricCode']],
            interval='tick',
            start=x['datetimeSentGMT_str'],
            count=1,
        )
    except rd.errors.RDError as e:
        print(e)
    return snap
```
The problem with rd.get_history is that it only accepts a single string as the start and end time, while my dataframe contains many different ricCode values and timestamps. So the function above can only return a single row of results per call.
```python
df = {'ricCode': {0: '3690.HK', 1: '4368.T', 2: '9988.HK', 3: '2269.HK', 4: '4684.T', 5: '2382.HK', 6: '4443.T', 7: '104830.KQ', 8: '1801.HK', 9: '3064.T'}, 'datetimeSent': {0: Timestamp('2021-02-22 12:55:16'), 1: Timestamp('2021-02-22 09:45:54'), 2: Timestamp('2021-02-22 12:55:17'), 3: Timestamp('2021-02-22 12:55:17'), 4: Timestamp('2021-02-22 07:53:13'), 5: Timestamp('2021-02-22 12:01:35'), 6: Timestamp('2021-02-22 10:33:53'), 7: Timestamp('2021-02-22 08:10:39'), 8: Timestamp('2021-02-22 12:01:35'), 9: Timestamp('2021-02-22 12:56:49')}}
```
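For reference, the function above reads `x['datetimeSentGMT_str']`, which is not in the sample frame; a minimal sketch of deriving it from `datetimeSent`, assuming the nanosecond ISO-8601 format seen in the request URLs below is what the API expects:

```python
import pandas as pd

# Build a small frame like the one above.
df = pd.DataFrame({
    "ricCode": ["3690.HK", "4368.T", "9988.HK"],
    "datetimeSent": pd.to_datetime([
        "2021-02-22 12:55:16",
        "2021-02-22 09:45:54",
        "2021-02-22 12:55:17",
    ]),
})

# Derive an ISO-8601 string column; the exact format string is an assumption,
# padded to nanoseconds to match e.g. "2024-01-18T22:43:49.000000000Z".
df["datetimeSentGMT_str"] = df["datetimeSent"].dt.strftime("%Y-%m-%dT%H:%M:%S.%f000Z")
print(df["datetimeSentGMT_str"].iloc[0])  # 2021-02-22T12:55:16.000000000Z
```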
I tried to use concurrent.futures to batch process multiple rows of the dataframe, but I ran into the following errors. Putting the rd.open_session call inside the getRefinitiv_SNAP function doesn't make things better.
```python
with concurrent.futures.ThreadPoolExecutor() as executor:
    df_final = pd.DataFrame()
    rd.open_session()
    futures = [executor.submit(getRefinitiv_SNAP, x=row) for idx, row in df.iterrows()]
    rd.close_session()
    for future in concurrent.futures.as_completed(futures):
        df_final = pd.concat([df_final, future.result()], axis=0)
```
Error message:
```
[00:00<?, ?it/s]An error occurred while requesting URL('http://localhost:9000/api/rdp/data/historical-pricing/v1/views/events/.AXVI?start=2024-01-18T22%3A43%3A49.000000000Z&count=1&fields=TRDPRC_1%2CDATE_TIME').
ReadError('[WinError 10038] An operation was attempted on something that is not a socket')
Session is not opened. Can't send any request
An error occurred while requesting URL('http://localhost:9000/api/rdp/data/historical-pricing/v1/views/events/.JNIV?start=2024-04-10T00%3A38%3A29.000000000Z&count=1&fields=TRDPRC_1%2CDATE_TIME').
ReadError('[WinError 10038] An operation was attempted on something that is not a socket')
An error occurred while requesting URL('http://localhost:9000/api/rdp/data/historical-pricing/v1/views/events/.AXVI?start=2024-04-16T00%3A01%3A37.000000000Z&count=1&fields=TRDPRC_1%2CDATE_TIME').
ReadError('[WinError 10038] An operation was attempted on something that is not a socket')
An error occurred while requesting URL('http://localhost:9000/api/rdp/data/historical-pricing/v1/views/events/.AXVI?start=2024-02-14T22%3A41%3A03.000000000Z&count=1&fields=TRDPRC_1%2CDATE_TIME').
ReadError('[WinError 10038] An operation was attempted on something that is not a socket')
Error code -1 | [WinError 10038] An operation was attempted on something that is not a socket
Error code -1 | Session is not opened. Can't send any request
Error code -1 | [WinError 10038] An operation was attempted on something that is not a socket
Error code -1 | [WinError 10038] An operation was attempted on something that is not a socket
Error code -1 | [WinError 10038] An operation was attempted on something that is not a socket
```
So, what is the correct way to use concurrent.futures with rd.get_history?
Best Answer
If my dataframe has a million rows, then I would essentially be sending out one million requests, which would exceed the daily limit. Is there a better way to send parallel requests while keeping the number of requests low, i.e. batch processing?
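One way to cut the request count is to group rows by `ricCode`, fetch a single time range per RIC, and then match each row's timestamp to the last tick at or before it locally with `pd.merge_asof`. A sketch, where `fetch_range` is a hypothetical placeholder standing in for the actual history call (its name and the synthetic data it returns are assumptions, not part of the library):

```python
import pandas as pd

def fetch_range(ric, start, end):
    # Placeholder for one range request per RIC (e.g. via rd.get_history);
    # here it returns synthetic one-minute ticks so the sketch is self-contained.
    times = pd.date_range(start, end, freq="1min")
    return pd.DataFrame({"DATE_TIME": times, "TRDPRC_1": range(len(times))})

df = pd.DataFrame({
    "ricCode": ["3690.HK", "3690.HK", "9988.HK"],
    "datetimeSent": pd.to_datetime([
        "2021-02-22 12:55:16", "2021-02-22 12:58:40", "2021-02-22 12:55:17",
    ]),
})

results = []
for ric, grp in df.groupby("ricCode"):
    # One request per RIC covering all of its timestamps.
    ticks = fetch_range(ric, grp["datetimeSent"].min(), grp["datetimeSent"].max())
    grp = grp.sort_values("datetimeSent")
    # Attach the last tick at or before each sent time.
    matched = pd.merge_asof(grp, ticks, left_on="datetimeSent", right_on="DATE_TIME")
    results.append(matched)

df_final = pd.concat(results, ignore_index=True)
print(df_final[["ricCode", "datetimeSent", "TRDPRC_1"]])
```

With this shape, a million rows spread over a few thousand RICs costs one request per RIC rather than one per row.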
Answers
Thank you for reaching out to us.
You can refer to the example on GitHub regarding historical parallel requests.
As for the ThreadPoolExecutor, the session should not be closed while the executor is still running. The code should look like this:

```python
rd.open_session()
with concurrent.futures.ThreadPoolExecutor() as executor:
    df_final = pd.DataFrame()
    futures = [executor.submit(getRefinitiv_SNAP, x=row) for idx, row in df.iterrows()]
    for future in concurrent.futures.as_completed(futures):
        df_final = pd.concat([df_final, future.result()], axis=0)
rd.close_session()
```
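As a side note, `ThreadPoolExecutor(max_workers=...)` bounds how many requests are in flight at once, which can help stay within concurrency limits. A generic, self-contained sketch with a dummy `fetch` standing in for `getRefinitiv_SNAP` (the dummy function and its return shape are illustrative assumptions):

```python
import concurrent.futures

def fetch(row):
    # Dummy stand-in for getRefinitiv_SNAP; a real version would call
    # rd.get_history here, inside an already-open session.
    return {"ric": row["ric"], "price": 100.0}

rows = [{"ric": "3690.HK"}, {"ric": "4368.T"}, {"ric": "9988.HK"}]

# Open the session once before the executor, close it only after the
# with-block exits (the executor's __exit__ waits for all futures).
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(fetch, row) for row in rows]
    for future in concurrent.futures.as_completed(futures):
        results.append(future.result())

print(len(results))  # 3: one result per row
```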
If you would like to request a large amount of data (millions of requests), please contact your LSEG account team or sales team directly to discuss this usage. The endpoint doesn't support a batch of request messages.
However, you can request multiple items for a single event by directly sending a request to the historical pricing endpoint. The data depth of the events API is 90 days.
The code looks like this:
```python
historical_url = "data/historical-pricing/v1/views/events"
body = {
    "universe": ["4368.T", "4684.T"],
    "fields": ["TRDPRC_1", "DATE_TIME"],
    "end": "2024-10-01T05:00:00.000000000Z",
    "eventTypes": ["trade"]
}
request = rd.delivery.endpoint_request.Definition(
    method=rd.delivery.endpoint_request.RequestMethod.POST,
    url=historical_url,
    body_parameters=body,
)
response = request.get_data()
response.data.raw
```
The endpoint example is on GitHub.