get_lots_of_data function for the Python LSEG Data Library

Could you please provide a function using the Python LDL's get_data for retrieving lots of data, specifically for lots of instruments?

Best Answer

  • [Deleted User]
    [Deleted User] Newcomer
    Answer ✓

    Hi @danieluphromes, how about this?

    import warnings

    import lseg.data as ld
    import pandas as pd


    def get_lots_of_data(
            req_univ: list[str],
            fields: list[str],
            batch_of: int = 50,
            warns: bool = True,
            messages: bool = True):

        err = {'Error': [], 'ErrorUniverse': []}

        if not warns:
            warnings.filterwarnings("ignore", category=FutureWarning, module="lseg.data._tools._dataframe")

        def Chunks(lst, n):
            """Yield successive n-sized chunks from lst."""
            for i in range(0, len(lst), n):
                yield lst[i:i + n]

        df_list = []
        n_batches = (len(req_univ) + batch_of - 1) // batch_of  # total number of batches

        for i, batch in enumerate(Chunks(lst=req_univ, n=batch_of)):

            if messages:
                print(f"Batch of {batch_of} requests no. {i + 1}/{n_batches} started")

            try:
                _df = ld.get_data(
                    universe=batch,
                    fields=fields)
                df_list.append(_df)
            except ld.errors.LDError as e:
                print("There was an error, please note that the following were not added to the df:")
                print(e)
                err['Error'].append(e)
                err['ErrorUniverse'].append(batch)

            if messages:
                print(f"Batch of {batch_of} requests no. {i + 1}/{n_batches} ended")

        # Concatenate once at the end; return an empty frame if every batch failed
        df = pd.concat(df_list, ignore_index=True) if df_list else pd.DataFrame()

        return df, err

    .

    If you get an error, you can find info about it with `err['Error'][0].__dict__`. The object `err['ErrorUniverse']` will give you all the universe items missed in the call.
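
    For example, a minimal usage sketch (the RICs and TR field below are placeholders, and an open ld session is assumed):

    df, err = get_lots_of_data(
        req_univ=["LSEG.L", "VOD.L", "AAPL.O"],  # hypothetical universe
        fields=["TR.CommonName"])                # hypothetical field

    if err['ErrorUniverse']:
        print(err['Error'][0].__dict__)  # details of the first error
        print(err['ErrorUniverse'])      # universe items missed in the call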

    .

    def loop_get_lots_of_data(
            req_univ: list[str],
            fields: list[str],
            batch_of: int = 50,
            warns: bool = True,
            messages: bool = True,
            no_of_trials: int = 5):

        df, err = get_lots_of_data(
            req_univ=req_univ,
            fields=fields,
            warns=warns,
            messages=messages,
            batch_of=batch_of)

        trial = 0

        while trial < no_of_trials and len(err['ErrorUniverse']) != 0:

            # It's possible to have more than one list in err['ErrorUniverse'], let's flatten it.
            err_univ = [item for row in err['ErrorUniverse'] for item in row]

            _df, _err = get_lots_of_data(
                req_univ=err_univ,
                fields=fields,
                warns=warns,
                messages=messages,
                batch_of=batch_of)

            if len(_df) > 0:
                df = pd.concat([df, _df], ignore_index=True)

            # Only the items that still failed on this trial are retried next time
            err = _err

            trial += 1

        return df
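
    For example, assuming an open ld session, the retry wrapper can be called like this (the RICs and field are placeholders):

    df = loop_get_lots_of_data(
        req_univ=["LSEG.L", "VOD.L", "AAPL.O"],  # hypothetical universe
        fields=["TR.CommonName"],                # hypothetical field
        batch_of=50,
        no_of_trials=5)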

    .

Answers

  • Hi Johnathan


    Here is a version based on rd with a smaller batch size (10), which reduces the probability of getting an error:

    import refinitiv.data as rd
    import pandas as pd


    def get_rd_timeseries_by_chunks(
            rics: list[str] = ["LSEG.L", "VOD.L"],  # List of instrument codes (RICs) to retrieve data for. Default is ["LSEG.L", "VOD.L"]
            tr_fields: list[str] = ["BID", "ASK"],  # Fields (e.g., BID, ASK) to retrieve from the data source. Default is ["BID", "ASK"]
            start_date: str = "2022-01-01",  # Start date for the historical data, formatted as a string (YYYY-MM-DD)
            end_date: str = "2023-01-01",  # End date for the historical data, formatted as a string (YYYY-MM-DD)
            frequency: str = "1D",  # Data frequency (e.g., "1D" for daily, "1W" for weekly)
            batch_of: int = 10,  # Size of the chunks or batches to split the list of RICs into for each API call. Default is 10
            verbose: bool = True,  # If True, print progress messages for each batch processed
            with_errors: bool = False  # If True, return any errors encountered during data retrieval along with the results
    ):
        """
        Retrieves historical time series data in chunks using the Refinitiv Data Library.

        Args:
            rics (list[str]): List of RICs (Reuters Instrument Codes) to retrieve data for.
            tr_fields (list[str]): Fields to request. Default is ["BID", "ASK"].
            start_date (str): Start date for the time series.
            end_date (str): End date for the time series.
            frequency (str): The frequency of the time series (e.g., daily - '1D').
            batch_of (int): Size of the chunks the RIC list is split into for API requests.
            verbose (bool): If True, prints progress for each batch processed.
            with_errors (bool): If True, returns both the data and any errors encountered.

        Returns:
            pd.DataFrame: Combined DataFrame with time series for all RICs.
            dict (optional): If with_errors is True, also returns a dictionary of errors encountered.
        """

        # Dictionary to hold any errors encountered
        err = {'Error': [], 'ErrorUniverse': []}

        # Generator to split the list of RICs into batches
        def Chunks(lst, n):
            """Yield successive n-sized chunks from lst."""
            for i in range(0, len(lst), n):
                yield lst[i:i + n]

        df_list = []  # List to collect DataFrames for each batch

        # Iterate over the chunks of RICs
        for i, sub_list in enumerate(Chunks(lst=rics, n=batch_of)):
            try:
                # Call the Refinitiv Data (rd) API to get historical data for the current batch
                _df = rd.get_history(
                    universe=sub_list,  # Only pass the sub_list of RICs, not the full list
                    fields=tr_fields,
                    start=start_date,
                    end=end_date,
                    interval=frequency
                )
                df_list.append(_df)  # Append the result DataFrame to the list

                if verbose:
                    print(f"Batch of {batch_of} requests n {i + 1} done")  # Print progress

            except rd.errors.RDError as e:  # Catch API-specific errors
                print(f"""There was an error processing chunk {i}, please note that the following RICs
    {sub_list} won't be added to the final result:""")
                err['Error'].append(e)  # Log the error
                err['ErrorUniverse'].append(sub_list)  # Log the RICs that caused the error

        # Merge all the DataFrames collected for each batch (empty frame if every batch failed)
        df = pd.concat(df_list, axis=1, join='outer') if df_list else pd.DataFrame()  # Merge on index, allowing outer join
        df = df.sort_index()  # Sort by the index (usually the time series index)

        # Return the final DataFrame, along with errors if requested
        if with_errors:
            return df, err
        else:
            return df
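
    For example, a quick usage sketch (the RICs and dates are placeholders, and an open rd session is assumed):

    prices, errors = get_rd_timeseries_by_chunks(
        rics=["LSEG.L", "VOD.L", "BARC.L"],  # hypothetical universe
        tr_fields=["BID", "ASK"],
        start_date="2022-01-01",
        end_date="2023-01-01",
        frequency="1D",
        batch_of=10,
        with_errors=True)

    print(prices.head())
    print(errors['ErrorUniverse'])  # RIC batches that could not be retrieved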


  • yaokoffi.kouassi
    edited March 26

    I propose this block of code which works all the time.

    import math
    import time
    from concurrent.futures import ThreadPoolExecutor
    from typing import Union

    import lseg.data as ld
    import pandas as pd

    # Raise the HTTP request timeout before opening the session, then open it
    config = ld.get_config()
    config.set_param("http.request-timeout", 180)
    ld.open_session()

    # Max items per request in a batch
    NB_ITEMS_THRESHOLD = 1000


    def __split_list_into_chunks(items, chunk_size):
        """Split items into successive chunks of at most chunk_size elements."""
        n = max(1, chunk_size)
        return [items[i:i + n] for i in range(0, len(items), n)]


    # Defensive wrapper around ld.get_data that retries failed requests
    def __internal_ld_get_data(universe, fields, parameters) -> pd.DataFrame:
        res = pd.DataFrame()
        nb_relaunch_request = 5
        succeed = False
        time.sleep(1)  # one-second delay before a call to the back end
        while succeed is False and nb_relaunch_request > 0:
            try:
                res = ld.get_data(universe, fields, parameters)
            except Exception:
                nb_relaunch_request = nb_relaunch_request - 1
                time.sleep(0.2)
            else:
                succeed = True
        return res


    # Function which optimizes ld.get_data by spreading batches across threads
    def ld_get_data(universe: Union[str, list],
                    fields: Union[str, list] = [],
                    parameters: Union[str, dict] = {},
                    nb_items_threshold=NB_ITEMS_THRESHOLD) -> pd.DataFrame:

        # Small universes go through a single request
        if type(universe) is str or len(universe) < nb_items_threshold:
            return __internal_ld_get_data(universe, fields, parameters)

        output_df = pd.DataFrame()

        with ThreadPoolExecutor(max_workers=None) as executor:
            # Spread the universe evenly across the available workers, capped at the threshold
            best_nb_items_threshold = min(math.ceil(len(universe) / executor._max_workers), NB_ITEMS_THRESHOLD)
            universe_list = __split_list_into_chunks(universe, best_nb_items_threshold)
            nb_loop = len(universe_list)
            for result in executor.map(__internal_ld_get_data,
                                       universe_list,
                                       [fields] * nb_loop,
                                       [parameters] * nb_loop):
                output_df = pd.concat([output_df, result], ignore_index=True)
        return output_df

    Now, instead of calling ld.get_data directly, call ld_get_data.
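
    For example, a minimal usage sketch (the universe, fields and parameters below are purely illustrative):

    rics = ["LSEG.L", "VOD.L", "AAPL.O", "MSFT.O"]  # hypothetical universe; in practice this can hold thousands of RICs
    df = ld_get_data(
        universe=rics,
        fields=["TR.CommonName", "TR.PriceClose"],  # hypothetical fields
        parameters={"SDate": "2024-12-31"})         # hypothetical parameters
    print(df.head())
    ld.close_session()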