get_lots_of_data function for the Python LSEG Data Library

Could you please provide a function that uses the Python LDL's get_data to retrieve lots of data, specifically for a large number of instruments?


Hi @danieluphromes, how about this?

import math
import warnings

import pandas as pd
import lseg.data as ld


def get_lots_of_data(
    req_univ: list[str],
    fields: list[str],
    batch_of: int = 50,
    warns: bool = True,
    messages: bool = True):

    # Collect any exceptions and the universe items that failed alongside them.
    err = {'Error': [], 'ErrorUniverse': []}

    if not warns:
        warnings.filterwarnings("ignore", category=FutureWarning, module="lseg.data._tools._dataframe")

    def chunks(lst, n):
        """Yield successive n-sized chunks from lst."""
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    df_list = []
    no_of_batches = math.ceil(len(req_univ) / batch_of)  # Total batch count, computed once rather than per iteration.

    for i, batch in enumerate(chunks(lst=req_univ, n=batch_of)):

        if messages:
            print(f"Batch of {batch_of} requests no. {i + 1}/{no_of_batches} started")

        try:
            _df = ld.get_data(
                universe=batch,
                fields=fields)
            df_list.append(_df)
        except ld.errors.LDError as e:
            print("There was an error; please note that the following were not added to the df:")
            print(e)
            err['Error'].append(e)
            err['ErrorUniverse'].append(batch)

        if messages:
            print(f"Batch of {batch_of} requests no. {i + 1}/{no_of_batches} ended")

    # Concatenate once at the end; return an empty frame if every batch failed,
    # so the function never raises a NameError on total failure.
    df = pd.concat(df_list, ignore_index=True) if df_list else pd.DataFrame()

    return df, err


If you get an error, you can find info about it with `err['Error'][0].__dict__`. The list `err['ErrorUniverse']` gives you all the universe items missed in the call.
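
For instance, a quick check could look like this (the universe and fields below are placeholders, with "BADRIC.XX" standing in for an invalid item, and a configured session is assumed):

ld.open_session()  # assumes a default session is configured in your environment

df, err = get_lots_of_data(
    req_univ=["VOD.L", "LSEG.L", "BADRIC.XX"],  # placeholder universe
    fields=["TR.Revenue", "TR.CommonName"],     # placeholder fields
    batch_of=2)

if err['ErrorUniverse']:
    print(err['Error'][0].__dict__)  # details of the first error
    print(err['ErrorUniverse'])      # the batches that were skipped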


def loop_get_lots_of_data(
    req_univ: list[str],
    fields: list[str],
    batch_of: int = 50,
    warns: bool = True,
    messages: bool = True,
    no_of_trials: int = 5
):

    df, err = get_lots_of_data(
        req_univ=req_univ,
        fields=fields,
        warns=warns,
        messages=messages,
        batch_of=batch_of)

    trial = 0

    # Retry the failed batches until they all succeed or we run out of trials.
    while trial < no_of_trials and len(err['ErrorUniverse']) != 0:

        # It's possible to have more than one list in err['ErrorUniverse'], so flatten it.
        err_univ = [item for row in err['ErrorUniverse'] for item in row]

        # Overwrite err with the latest result, so the loop condition reflects
        # what is still failing rather than the original failures.
        _df, err = get_lots_of_data(
            req_univ=err_univ,
            fields=fields,
            warns=warns,
            messages=messages,
            batch_of=batch_of)

        if len(_df) > 0:
            df = pd.concat([df, _df], ignore_index=True)

        trial += 1

    return df
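
A typical call then looks like this (the universe and fields below are placeholders; in practice req_univ would hold hundreds or thousands of RICs):

df = loop_get_lots_of_data(
    req_univ=["VOD.L", "LSEG.L", "BARC.L", "HSBA.L"],  # placeholder universe
    fields=["TR.Revenue", "TR.CommonName"],            # placeholder fields
    batch_of=2,
    no_of_trials=3)
print(df)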



Hi Johnathan,

Here is a version based on rd with a smaller batch size (10), which reduces the probability of getting an error:

import pandas as pd
import refinitiv.data as rd


def get_rd_timeseries_by_chunks(
    rics: list[str] = ["LSEG.L", "VOD.L"],      # List of instrument codes (RICs) to retrieve data for
    tr_fields: list[str] = ["BID", "ASK"],      # Fields (e.g., BID, ASK) to retrieve from the data source
    start_date: str = "2022-01-01",             # Start date for the historical data, formatted as a string (YYYY-MM-DD)
    end_date: str = "2023-01-01",               # End date for the historical data, formatted as a string (YYYY-MM-DD)
    frequency: str = "1D",                      # Data frequency (e.g., "1D" for daily, "1W" for weekly)
    batch_of: int = 10,                         # Size of the batches the list of RICs is split into for each API call. Default is 10
    verbose: bool = True,                       # If True, print progress messages for each batch processed
    with_errors: bool = False                   # If True, return any errors encountered during data retrieval along with the results
):
    """
    Retrieves historical time series data in chunks with the Refinitiv Data Library.

    Args:
        rics (list[str]): List of RICs (Reuters Instrument Codes) to retrieve data for.
        tr_fields (list[str]): Fields to request. Default is ["BID", "ASK"].
        start_date (str): Start date for the time series.
        end_date (str): End date for the time series.
        frequency (str): The frequency of the time series (e.g., daily - '1D').
        batch_of (int): Size of the chunks the RIC list is split into for API requests.
        verbose (bool): If True, prints progress for each batch processed.
        with_errors (bool): If True, returns both the data and any errors encountered.

    Returns:
        pd.DataFrame: Combined DataFrame with time series for all RICs.
        dict (optional): If with_errors is True, also returns a dictionary of errors encountered.
    """

    # Dictionary to hold any errors encountered
    err = {'Error': [], 'ErrorUniverse': []}

    # Generator to split the list of RICs into batches
    def chunks(lst, n):
        """Yield successive n-sized chunks from lst."""
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    df_list = []  # List to collect DataFrames for each batch

    # Iterate over the chunks of RICs
    for i, sub_list in enumerate(chunks(lst=rics, n=batch_of)):
        try:
            # Call the Refinitiv Data (rd) API to get historical data for the current batch
            _df = rd.get_history(
                universe=sub_list,  # Only pass the sub_list of RICs, not the full list
                fields=tr_fields,
                start=start_date,
                end=end_date,
                interval=frequency
            )
            df_list.append(_df)  # Append the result DataFrame to the list

            if verbose:
                print(f"Batch of {batch_of} requests no. {i + 1} done")  # Print progress

        except rd.errors.RDError as e:  # Catch API-specific errors
            print(f"""There was an error processing chunk {i}; please note that the following RICs
{sub_list} won't be added to the final result:""")
            err['Error'].append(e)                 # Log the error
            err['ErrorUniverse'].append(sub_list)  # Log the RICs that caused the error

    # Merge all the DataFrames collected for each batch (empty frame if every batch failed)
    if df_list:
        df = pd.concat(df_list, axis=1, join='outer')  # Merge on index with an outer join
        df = df.sort_index()  # Sort by the index (usually the time series index)
    else:
        df = pd.DataFrame()

    # Return the final DataFrame, along with errors if requested
    if with_errors:
        return df, err
    else:
        return df
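
For example, a call could look like this (the RICs and dates are just examples, and a configured session is assumed):

rd.open_session()  # assumes a default session is configured in your environment

prices, errors = get_rd_timeseries_by_chunks(
    rics=["LSEG.L", "VOD.L", "BARC.L", "HSBA.L"],  # example universe
    tr_fields=["BID", "ASK"],
    start_date="2022-01-01",
    end_date="2023-01-01",
    frequency="1D",
    batch_of=10,
    with_errors=True)

print(prices.tail())
if errors['ErrorUniverse']:
    print("Failed chunks:", errors['ErrorUniverse'])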

