Could you please provide some function using the Python LDL's get_data for lots of data, specifically, lots of instruments?
New posts are disabled while we improve the user experience.
You can browse the site, or for urgent issues, raise a query at MyAccount.
Could you please provide some function using the Python LDL's get_data for lots of data, specifically, lots of instruments?
Hi @danieluphromes, how about this?
def get_lots_of_data( req_univ: list[str], fields: list[str], batch_of: int = 50, warns: bool = True, messages: bool = True): err = {'Error': [], 'ErrorUniverse': []} if not warns: warnings.filterwarnings("ignore", category=FutureWarning, module="lseg.data._tools._dataframe") def Chunks(lst, n): """Yield successive n-sized chunks from lst.""" for i in range(0, len(lst), n): yield lst[i:i + n] df_list = [] for i, j in enumerate(Chunks(lst=req_univ, n=batch_of)): if messages: print(f"Batch of {batch_of} requests no. {str(i+1)}/{str(len([i for i in Chunks(req_univ, batch_of)]))} started") try: _df = ld.get_data( universe=j, fields=fields) df_list.append(_df) df = pd.concat(df_list, ignore_index=True) except ld.errors.LDError as e: print("There was an error, please note that the following were not added to the df:") # print("LDError:") print(e) err['Error'].append(e) err['ErrorUniverse'].append(j) if messages: print(f"Batch of {batch_of} requests no. {str(i+1)}/{str(len([i for i in Chunks(req_univ, batch_of)]))} ended") return df, err
.
If you get an error, you can find info about it with `err['Error'][0].__dict__`. The object `err['ErrorUniverse']` will give you all the universe items missed in the call.
.
def loop_get_lots_of_data( req_univ: list[str], fields: list[str], batch_of: int = 50, warns: bool = True, messages: bool = True, no_of_trials: int = 5 ): df, err = get_lots_of_data( req_univ=req_univ, fields=fields, warns=warns, messages=messages, batch_of=batch_of) trial = 0 while trial < no_of_trials: if len(err['ErrorUniverse']) != 0: # It's possible to have more than one list in the err['ErrorUniverse'] list, let's flatten it. err_univ = [item for row in err['ErrorUniverse'] for item in row] _df, _err = get_lots_of_data( req_univ=err_univ, fields=fields, warns=warns, messages=messages, batch_of=batch_of) if len(_df) > 0: df = pd.concat([df, _df], ignore_index=True) break trial += 1 return df
.
Hi Johnathan
Here is a version based on rd with a lower nuber of batch (10) , it reduces the probability to get anerror :
def get_rd_timeseries_by_chunks( rics: list[str] = ["LSEG.L", "VOD.L"], # List of instrument codes (RICs) to retrieve data for. Default is ["LSEG.L", "VOD.L"] tr_fields: list[str] = ["BID", "ASK"], # Fields (e.g., BID, ASK) to retrieve from the data source. Default is ["BID", "ASK"] start_date: str = "2022-01-01", # Start date for the historical data, formatted as a string (YYYY-MM-DD) end_date: str = "2023-01-01", # End date for the historical data, formatted as a string (YYYY-MM-DD) frequency: str = "1D", # Data frequency (e.g., "1D" for daily, "1W" for weekly) batch_of: int = 10, # Size of the chunks or batches to split the list of RICs into for each API call. Default is 20 verbose: bool = True, # If True, print progress messages for each batch processed with_errors: bool = False # If True, return any errors encountered during data retrieval along with the results ): """ Retrieves historical time series data in chunks from the Refinitiv Data Platform (RDP). Args: rics (list[str]): List of RICs (Reuters Instrument Codes) to retrieve data for. tr_fields (list or None): Fields to request. None by default. start_date (str): Start date for the time series. end_date (str): End date for the time series. frequency (str): The frequency of the time series (e.g., daily - '1D'). batch_of (int): Size of the chunks the RIC list is split into for API requests. verbose (bool): If True, prints progress for each batch processed. with_errors (bool): If True, returns both the data and any errors encountered. Returns: pd.DataFrame: Combined DataFrame with time series for all RICs. dict (optional): If with_errors is True, returns a dictionary of errors encountered. """ # Dictionary to hold any errors encountered err = {'Error': [], 'ErrorUniverse': []} # Generator to split the list of RICs into batches def Chunks(lst, n): """Yield successive n-sized chunks from lst.""" for i in range(0, len(lst), n): yield lst[i:i + n] df_list = [] # List to collect DataFrames for each batch # Iterate over the chunks of RICs for i, sub_list in enumerate(Chunks(lst=rics, n=batch_of)): try: # Call the Refinitiv Data (rd) API to get historical data for the current batch _df = rd.get_history( universe=sub_list, # Only pass the sub_list of RICs, not the full list fields=tr_fields, start=start_date, end=end_date, interval=frequency ) df_list.append(_df) # Append the result DataFrame to the list if verbose: print(f"Batch of {batch_of} requests n {str(i+1)} done") # Print progress except rd.errors.RDError as e: # Catch API-specific errors print(f"""There was an error processing chunk {i}, please note that the following RICs {sub_list} won't be added to the final result :""") err['Error'].append(e) # Log the error err['ErrorUniverse'].append(sub_list) # Log the RICs that caused the error # Merge all the DataFrames collected for each batch df = pd.concat(df_list, axis=1, join='outer') # Merge on index, allowing outer join df = df.sort_index() # Sort by the index (usually the time series index) # Return the final DataFrame, along with errors if requested if with_errors: return df, err else: return df