get_lots_of_data function for the Python LSEG Data Library

danieluphromes
Newcomer
Could you please provide a function that uses the Python LDL's get_data for lots of data, specifically lots of instruments?
Best Answer
Hi @danieluphromes, how about this?
import warnings

import pandas as pd
import lseg.data as ld


def get_lots_of_data(
        req_univ: list[str],
        fields: list[str],
        batch_of: int = 50,
        warns: bool = True,
        messages: bool = True):
    """Request `fields` for `req_univ` in batches of `batch_of` instruments."""
    err = {'Error': [], 'ErrorUniverse': []}
    if not warns:
        warnings.filterwarnings("ignore", category=FutureWarning, module="lseg.data._tools._dataframe")

    def Chunks(lst, n):
        """Yield successive n-sized chunks from lst."""
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    no_of_batches = len(list(Chunks(req_univ, batch_of)))
    df = pd.DataFrame()  # returned as-is if every batch fails
    df_list = []
    for i, j in enumerate(Chunks(lst=req_univ, n=batch_of)):
        if messages:
            print(f"Batch of {batch_of} requests no. {i + 1}/{no_of_batches} started")
        try:
            _df = ld.get_data(
                universe=j,
                fields=fields)
            df_list.append(_df)
            df = pd.concat(df_list, ignore_index=True)
        except ld.errors.LDError as e:
            print("There was an error, please note that the following were not added to the df:")
            print(e)
            err['Error'].append(e)
            err['ErrorUniverse'].append(j)
        if messages:
            print(f"Batch of {batch_of} requests no. {i + 1}/{no_of_batches} ended")
    return df, err
If you get an error, you can find info about it with `err['Error'][0].__dict__`. The object `err['ErrorUniverse']` will give you all the universe items missed in the call.
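For instance, a minimal call might look like this; the instruments and fields below are just illustrative placeholders:
# Illustrative only: the RICs and fields are placeholders
universe = ["LSEG.L", "VOD.L", "AAPL.O"]
fields = ["TR.CommonName", "TR.PriceClose"]
df, err = get_lots_of_data(req_univ=universe, fields=fields, batch_of=50)
if err['Error']:
    print(err['Error'][0].__dict__)   # details of the first error
    print(err['ErrorUniverse'])       # universe items missed in the call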
def loop_get_lots_of_data(
        req_univ: list[str],
        fields: list[str],
        batch_of: int = 50,
        warns: bool = True,
        messages: bool = True,
        no_of_trials: int = 5):
    """Call `get_lots_of_data`, then retry the instruments that failed, up to `no_of_trials` times."""
    df, err = get_lots_of_data(
        req_univ=req_univ,
        fields=fields,
        warns=warns,
        messages=messages,
        batch_of=batch_of)
    trial = 0
    while trial < no_of_trials:
        if len(err['ErrorUniverse']) == 0:
            break
        # It's possible to have more than one list in err['ErrorUniverse'], so flatten it.
        err_univ = [item for row in err['ErrorUniverse'] for item in row]
        _df, _err = get_lots_of_data(
            req_univ=err_univ,
            fields=fields,
            warns=warns,
            messages=messages,
            batch_of=batch_of)
        if len(_df) > 0:
            df = pd.concat([df, _df], ignore_index=True)
        err = _err  # only the instruments that are still failing go into the next trial
        trial += 1
    return df
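A quick sketch of calling the retry wrapper (again, instruments and fields are placeholders):
# Illustrative only: retry failed batches up to 3 times
df = loop_get_lots_of_data(
    req_univ=["LSEG.L", "VOD.L", "AAPL.O"],
    fields=["TR.CommonName", "TR.PriceClose"],
    batch_of=50,
    no_of_trials=3)
print(df.head())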
Answers
Hi Johnathan
Here is a version based on rd with a smaller batch size (10), which reduces the probability of getting an error:
import pandas as pd
import refinitiv.data as rd


def get_rd_timeseries_by_chunks(
        rics: list[str] = ["LSEG.L", "VOD.L"],  # List of instrument codes (RICs) to retrieve data for. Default is ["LSEG.L", "VOD.L"]
        tr_fields: list[str] = ["BID", "ASK"],  # Fields (e.g., BID, ASK) to retrieve from the data source. Default is ["BID", "ASK"]
        start_date: str = "2022-01-01",         # Start date for the historical data, formatted as a string (YYYY-MM-DD)
        end_date: str = "2023-01-01",           # End date for the historical data, formatted as a string (YYYY-MM-DD)
        frequency: str = "1D",                  # Data frequency (e.g., "1D" for daily, "1W" for weekly)
        batch_of: int = 10,                     # Size of the batches the list of RICs is split into for each API call. Default is 10
        verbose: bool = True,                   # If True, print progress messages for each batch processed
        with_errors: bool = False               # If True, return any errors encountered during data retrieval along with the results
        ):
    """
    Retrieves historical time series data in chunks from the Refinitiv Data Platform (RDP).

    Args:
        rics (list[str]): List of RICs (Reuters Instrument Codes) to retrieve data for.
        tr_fields (list[str]): Fields to request. Defaults to ["BID", "ASK"].
        start_date (str): Start date for the time series.
        end_date (str): End date for the time series.
        frequency (str): The frequency of the time series (e.g., daily - '1D').
        batch_of (int): Size of the chunks the RIC list is split into for API requests.
        verbose (bool): If True, prints progress for each batch processed.
        with_errors (bool): If True, returns both the data and any errors encountered.

    Returns:
        pd.DataFrame: Combined DataFrame with time series for all RICs.
        dict (optional): If with_errors is True, also returns a dictionary of errors encountered.
    """
    # Dictionary to hold any errors encountered
    err = {'Error': [], 'ErrorUniverse': []}

    # Generator to split the list of RICs into batches
    def Chunks(lst, n):
        """Yield successive n-sized chunks from lst."""
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    df_list = []  # List to collect DataFrames for each batch
    # Iterate over the chunks of RICs
    for i, sub_list in enumerate(Chunks(lst=rics, n=batch_of)):
        try:
            # Call the Refinitiv Data (rd) API to get historical data for the current batch
            _df = rd.get_history(
                universe=sub_list,  # Only pass the sub_list of RICs, not the full list
                fields=tr_fields,
                start=start_date,
                end=end_date,
                interval=frequency)
            df_list.append(_df)  # Append the result DataFrame to the list
            if verbose:
                print(f"Batch of {batch_of} requests no. {i + 1} done")  # Print progress
        except rd.errors.RDError as e:  # Catch API-specific errors
            print(f"There was an error processing chunk {i}, please note that the following RICs "
                  f"{sub_list} won't be added to the final result:")
            err['Error'].append(e)                 # Log the error
            err['ErrorUniverse'].append(sub_list)  # Log the RICs that caused the error

    # Merge all the DataFrames collected for each batch
    df = pd.concat(df_list, axis=1, join='outer')  # Merge on index, allowing outer join
    df = df.sort_index()  # Sort by the index (usually the time series index)

    # Return the final DataFrame, along with errors if requested
    if with_errors:
        return df, err
    return df
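A usage sketch could look like this (the RICs are placeholders, and an rd session is assumed to be open already):
# Illustrative only: daily BID/ASK history in batches of 10
prices, errors = get_rd_timeseries_by_chunks(
    rics=["LSEG.L", "VOD.L", "AAPL.O", "MSFT.O"],
    tr_fields=["BID", "ASK"],
    start_date="2022-01-01",
    end_date="2023-01-01",
    frequency="1D",
    batch_of=10,
    with_errors=True)
print(prices.tail())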
I propose this block of code, which has worked reliably every time.
import lseg.data as ld
import pandas as pd
from typing import Union
import time
import math
from concurrent.futures import ThreadPoolExecutor
config = ld.get_config()
config.set_param("http.request-timeout", 180)
ld.open_session()
# Max item per request in a batch
NB_ITEMS_THRESHOLD = 1000
def __split_list_into_chunks(items, chunk_size):
    """Split items into successive chunks of at most chunk_size elements."""
    n = max(1, chunk_size)
    return list(items[i:i + n] for i in range(0, len(items), n))

# Define a defensive get_data function
def __internal_ld_get_data(universe, fields, parameters) -> pd.DataFrame:
    res = pd.DataFrame()
    nb_relaunch_request = 5
    succeed = False
    time.sleep(1)  # one-second delay before a call to the back end
    while succeed is False and nb_relaunch_request > 0:
        try:
            res = ld.get_data(universe, fields, parameters)
        except Exception:
            nb_relaunch_request = nb_relaunch_request - 1
            time.sleep(0.2)
        else:
            succeed = True
    return res

# Function that optimizes ld.get_data by splitting the universe across threads
def ld_get_data(universe: Union[str, list],
                fields: Union[str, list] = [],
                parameters: Union[str, dict] = {},
                nb_items_threshold=NB_ITEMS_THRESHOLD) -> pd.DataFrame:
    if type(universe) is str or len(universe) < nb_items_threshold:
        return __internal_ld_get_data(universe, fields, parameters)
    output_df = pd.DataFrame()
    with ThreadPoolExecutor(max_workers=None) as executor:
        # Spread the universe evenly across the available worker threads
        best_nb_items_threshold = min(math.ceil(len(universe) / executor._max_workers), NB_ITEMS_THRESHOLD)
        universe_list = __split_list_into_chunks(universe, best_nb_items_threshold)
        nb_loop = len(universe_list)
        for result in executor.map(__internal_ld_get_data,
                                   universe_list,
                                   [fields] * nb_loop,
                                   [parameters] * nb_loop):
            output_df = pd.concat([output_df, result], ignore_index=True)
    return output_df

Now, instead of using ld.get_data, use ld_get_data.
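For example, a drop-in replacement call might look like this (instruments and fields are placeholders):
# Illustrative only: same call shape as ld.get_data, but batched across threads
universe = ["LSEG.L", "VOD.L", "AAPL.O"]
fields = ["TR.CommonName", "TR.PriceClose"]
df = ld_get_data(universe=universe, fields=fields)
print(df.head())
ld.close_session()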