Could you please provide a function that uses the Python LSEG Data Library's `get_data` for lots of data, specifically lots of instruments?
Hi @danieluphromes, how about this?
```python
import warnings

import lseg.data as ld
import pandas as pd


def get_lots_of_data(
        req_univ: list[str],
        fields: list[str],
        batch_of: int = 50,
        warns: bool = True,
        messages: bool = True):
    err = {'Error': [], 'ErrorUniverse': []}
    if not warns:
        warnings.filterwarnings(
            "ignore", category=FutureWarning,
            module="lseg.data._tools._dataframe")

    def Chunks(lst, n):
        """Yield successive n-sized chunks from lst."""
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    df = pd.DataFrame()  # returned even if every batch fails
    df_list = []
    no_of_batches = len(list(Chunks(req_univ, batch_of)))
    for i, j in enumerate(Chunks(lst=req_univ, n=batch_of)):
        if messages:
            print(f"Batch of {batch_of} requests no. {i + 1}/{no_of_batches} started")
        try:
            _df = ld.get_data(universe=j, fields=fields)
            df_list.append(_df)
            df = pd.concat(df_list, ignore_index=True)
        except ld.errors.LDError as e:
            print("There was an error, please note that the following were not added to the df:")
            print(e)
            err['Error'].append(e)
            err['ErrorUniverse'].append(j)
        if messages:
            print(f"Batch of {batch_of} requests no. {i + 1}/{no_of_batches} ended")
    return df, err
```
If you get an error, you can find info about it with `err['Error'][0].__dict__`. The object `err['ErrorUniverse']` will give you all the universe items missed in the call.
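For instance, you could call it like this and then inspect whatever failed. The RICs and field names below are placeholders only, not part of the original answer:

```python
import lseg.data as ld

ld.open_session()  # assumes a default session configuration

df, err = get_lots_of_data(
    req_univ=["VOD.L", "LSEG.L", "AAPL.O", "MSFT.O"],  # placeholder RICs
    fields=["TR.Revenue", "TR.GrossProfit"],           # placeholder field names
    batch_of=2)

if err['Error']:
    print(err['Error'][0].__dict__)  # details of the first error raised
    print(err['ErrorUniverse'])      # all universe items missed in the call
```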
If some batches still fail, you can wrap the function in a retry loop that re-requests only the failed universe items:

```python
def loop_get_lots_of_data(
        req_univ: list[str],
        fields: list[str],
        batch_of: int = 50,
        warns: bool = True,
        messages: bool = True,
        no_of_trials: int = 5):
    df, err = get_lots_of_data(
        req_univ=req_univ, fields=fields, warns=warns,
        messages=messages, batch_of=batch_of)
    trial = 0
    while trial < no_of_trials:
        if len(err['ErrorUniverse']) == 0:
            break  # nothing left to retry
        # There can be more than one list in err['ErrorUniverse'], so flatten it.
        err_univ = [item for row in err['ErrorUniverse'] for item in row]
        _df, err = get_lots_of_data(
            req_univ=err_univ, fields=fields, warns=warns,
            messages=messages, batch_of=batch_of)
        if len(_df) > 0:
            df = pd.concat([df, _df], ignore_index=True)
        trial += 1
    return df
```
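With the wrapper, a single call is usually enough even for a large universe. A minimal sketch, again with a made-up universe and field name:

```python
# Hypothetical universe of a few thousand instruments, for illustration only.
big_universe = [f"INSTRUMENT{i}.X" for i in range(3000)]

df = loop_get_lots_of_data(
    req_univ=big_universe,
    fields=["TR.Revenue"],  # placeholder field
    batch_of=50,
    no_of_trials=5)
print(df.shape)
```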
Hi Johnathan,
Here is a version based on rd (the Refinitiv Data Library) with a smaller batch size (10), which reduces the probability of getting an error:
```python
import refinitiv.data as rd
import pandas as pd


def get_rd_timeseries_by_chunks(
        rics: list[str] = ["LSEG.L", "VOD.L"],  # List of instrument codes (RICs) to retrieve data for
        tr_fields: list[str] = ["BID", "ASK"],  # Fields (e.g., BID, ASK) to retrieve from the data source
        start_date: str = "2022-01-01",         # Start date for the historical data (YYYY-MM-DD)
        end_date: str = "2023-01-01",           # End date for the historical data (YYYY-MM-DD)
        frequency: str = "1D",                  # Data frequency (e.g., "1D" for daily, "1W" for weekly)
        batch_of: int = 10,                     # Size of the batches the RIC list is split into for each API call
        verbose: bool = True,                   # If True, print progress messages for each batch processed
        with_errors: bool = False               # If True, also return any errors encountered during retrieval
):
    """
    Retrieves historical time series data in chunks from the Refinitiv Data Platform (RDP).

    Args:
        rics (list[str]): List of RICs (Reuters Instrument Codes) to retrieve data for.
        tr_fields (list[str]): Fields to request.
        start_date (str): Start date for the time series.
        end_date (str): End date for the time series.
        frequency (str): The frequency of the time series (e.g., daily - '1D').
        batch_of (int): Size of the chunks the RIC list is split into for API requests.
        verbose (bool): If True, prints progress for each batch processed.
        with_errors (bool): If True, returns both the data and any errors encountered.

    Returns:
        pd.DataFrame: Combined DataFrame with time series for all RICs.
        dict (optional): If with_errors is True, also returns a dictionary of errors encountered.
    """
    # Dictionary to hold any errors encountered
    err = {'Error': [], 'ErrorUniverse': []}

    # Generator to split the list of RICs into batches
    def Chunks(lst, n):
        """Yield successive n-sized chunks from lst."""
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    df_list = []  # List to collect DataFrames for each batch

    # Iterate over the chunks of RICs
    for i, sub_list in enumerate(Chunks(lst=rics, n=batch_of)):
        try:
            # Call the Refinitiv Data (rd) API to get historical data for the current batch
            _df = rd.get_history(
                universe=sub_list,  # Only pass the sub_list of RICs, not the full list
                fields=tr_fields,
                start=start_date,
                end=end_date,
                interval=frequency
            )
            df_list.append(_df)  # Append the result DataFrame to the list
            if verbose:
                print(f"Batch of {batch_of} requests no. {i + 1} done")  # Print progress
        except rd.errors.RDError as e:  # Catch API-specific errors
            print(f"There was an error processing chunk {i}, please note that "
                  f"the following RICs {sub_list} won't be added to the final result:")
            err['Error'].append(e)                 # Log the error
            err['ErrorUniverse'].append(sub_list)  # Log the RICs that caused the error

    # Merge all the DataFrames collected for each batch (outer join on the time index)
    df = pd.concat(df_list, axis=1, join='outer') if df_list else pd.DataFrame()
    df = df.sort_index()  # Sort by the index (usually the time series index)

    # Return the final DataFrame, along with errors if requested
    if with_errors:
        return df, err
    else:
        return df
```
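A possible call of this helper, with example RICs and assuming a Refinitiv Data session is already open:

```python
import refinitiv.data as rd

rd.open_session()  # assumes a default session configuration

prices, errors = get_rd_timeseries_by_chunks(
    rics=["LSEG.L", "VOD.L", "AAPL.O"],  # example RICs
    tr_fields=["BID", "ASK"],
    start_date="2022-01-01",
    end_date="2023-01-01",
    frequency="1D",
    batch_of=10,
    with_errors=True)

print(prices.tail())
print(errors['ErrorUniverse'])  # chunks that failed, if any
```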
I propose this block of code, which has worked every time for me:
```python
import math
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Union

import lseg.data as ld
import pandas as pd

config = ld.get_config()
config.set_param("http.request-timeout", 180)
ld.open_session()

# Max items per request in a batch
NB_ITEMS_THRESHOLD = 1000


def __split_list_into_chunks(items, chunk_size):
    """Split items into successive chunks of at most chunk_size elements."""
    n = max(1, chunk_size)
    return list(items[i:i + n] for i in range(0, len(items), n))


# Defensive wrapper around ld.get_data: retries up to 5 times before giving up
def __internal_ld_get_data(universe, fields, parameters) -> pd.DataFrame:
    res = pd.DataFrame()
    nb_relaunch_request = 5
    succeed = False
    time.sleep(1)  # one second delay before a call to the back end
    while succeed is False and nb_relaunch_request > 0:
        try:
            res = ld.get_data(universe, fields, parameters)
        except Exception:
            nb_relaunch_request = nb_relaunch_request - 1
            time.sleep(0.2)
        else:
            succeed = True
    return res


# Function which optimizes ld.get_data by splitting the universe across threads
def ld_get_data(universe: Union[str, list],
                fields: Union[str, list] = [],
                parameters: Union[str, dict] = {},
                nb_items_threshold=NB_ITEMS_THRESHOLD) -> pd.DataFrame:
    if type(universe) is str or len(universe) < nb_items_threshold:
        return __internal_ld_get_data(universe, fields, parameters)
    output_df = pd.DataFrame()
    with ThreadPoolExecutor(max_workers=None) as executor:
        best_nb_items_threshold = min(
            math.ceil(len(universe) / executor._max_workers),
            NB_ITEMS_THRESHOLD)
        universe_list = __split_list_into_chunks(universe, best_nb_items_threshold)
        nb_loop = len(universe_list)
        for result in executor.map(__internal_ld_get_data,
                                   universe_list,
                                   [fields] * nb_loop,
                                   [parameters] * nb_loop):
            output_df = pd.concat([output_df, result], ignore_index=True)
    return output_df
```
Now, instead of calling `ld.get_data` directly, call `ld_get_data`.
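A quick illustration of the drop-in usage; the universe and field names below are made up for the example, and the session is the one opened in the snippet above:

```python
# Hypothetical universe large enough to trigger the threaded path.
universe = [f"INSTRUMENT{i}.X" for i in range(5000)]  # placeholder RICs

df = ld_get_data(
    universe=universe,
    fields=["TR.CommonName", "TR.PriceClose"],  # placeholder field names
    parameters={})

print(df.shape)
ld.close_session()
```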