Daily extraction at large scale

I have a script that extracts daily time series data and static data for the individual constituents of the ETF IEAC.L over a 10-year period. Currently I fetch the data day by day for each of roughly 3,500 constituents, which makes the process very slow: a test for a single day already took about an hour. I'm now trying to batch the requests, but it isn't working so far. I'm using a Workspace desktop session.

I'd appreciate any advice on optimizing this workflow.

Here is the test for 2 days only:

import refinitiv.data as rd
import pandas as pd
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

rd.open_session()

# Configurations
etf_ric = "IEAC.L"
snapshot_date = "2025-04-30"  # holdings snapshot date
start_ts = "2024-04-08"       # short test window for the time series
end_ts = "2024-04-10"

holdings_fields = ["TR.FundHoldingRIC", "TR.FundHoldingName", "TR.FundLatestFilingDate"]

static_fields = [
    "TR.CommonName", "TR.ISIN", "TR.FIIssuerName", "TR.TRBCEconomicSector", 
    "TR.TRBCBusinessSector", "TR.TRBCIndustryGroup", "TR.FiCountryName", 
    "TR.FIMaturityDate", "TR.FiFaceIssuedTotal", "TR.FiIssueDate",
    "TR.FiIssuerCountry", "TR.FILiquidationRank", "TR.FI.Group", "TR.FI.Rank", 
    "TR.CouponRate", "TR.FiCouponFrequency", "TR.FiCouponFrequencyDescription", 
    "TR.FiFirstCouponDate", "TR.FiLastCouponDate", "TR.ADF_COUPON", 
    "TR.FiMaturityStandardYield", "TR.MACAULAYDURATION"
]

ts_fields = [
    "Date", "ASK", "BID", "RIC", "OPEN_PRC", "HIGH_YLD", "LOW_YLD",
    "MID_PRICE", "A_YLD_1", "ISMA_A_YLD", "MID_YLD_1", "CONVEXITY",
    "MOD_DURTN", "OPEN_ASK", "OPEN_YLD", "SWAP_SPRDB", "OAS_BID", "ZSPREAD",
    "OIS_SPREAD", "CLEAN_PRC", "DIRTY_PRC", "YLDTOMAT", "MAC_DURTN", "YLDWST",
    "DIRTY_ASK", "DIRTY_BID", "DIRTY_MID", "CLN_PRC_M", "CLN_PRC_A"
]

# --- 1. GET ETF HOLDINGS ---
# EndNum raises the default row cap so all ~3500 holdings come back in one call
params = {"SDate": snapshot_date, "EDate": snapshot_date, "Frq": "D", "EndNum": 4000}
holdings = rd.get_data([etf_ric], fields=holdings_fields, parameters=params)
df_holdings = holdings.dropna(subset=["Holding RIC"])
df_holdings = df_holdings[df_holdings["Holding RIC"].str.strip() != ""]
unique_rics = df_holdings["Holding RIC"].astype(str).unique().tolist()
print(f"Total unique holdings: {len(unique_rics)}")

# --- 2. FETCH STATIC DATA IN BATCHES ---
def fetch_static(rics, fields, chunk_size=500):
    dfs = []
    for i in tqdm(range(0, len(rics), chunk_size), desc="Fetching static data"):
        sub = rics[i:i+chunk_size]
        try:
            df_chunk = rd.get_data(sub, fields)
            if df_chunk is not None and not df_chunk.empty:
                dfs.append(df_chunk)
        except Exception as e:
            print(f"Error in static chunk {i}-{i+chunk_size}: {e}")
        time.sleep(1)  # brief pause between chunks to ease rate limits
    if not dfs:
        return pd.DataFrame()
    df_full = pd.concat(dfs, ignore_index=True).drop_duplicates()
    return df_full.copy()  # copy() defragments the concatenated frame

df_static = fetch_static(unique_rics, static_fields)
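# Optional: cache the static snapshot so reruns skip the chunked calls
# (hypothetical file name; to_parquet needs pyarrow or fastparquet installed)
# df_static.to_parquet("static_fields.parquet", index=False)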

# --- 3. PREPARE BUSINESS DAYS ---
def business_days_list(start_date, end_date):
    return pd.date_range(start=start_date, end=end_date, freq='B').strftime("%Y-%m-%d").tolist()

business_days = business_days_list(start_ts, end_ts)
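print(f"Business days in range: {len(business_days)}")  # sanity check on the window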

# --- 4. THREADPOOL FOR FETCHING TIME SERIES ---
def threaded_time_series(rics, start, end, fields, batch_size=20, max_workers=8):
    outputs = []

    def fetch(batch):
        try:
            df = rd.get_history(universe=batch, start=start, end=end, fields=fields)
            if df is not None and not df.empty:
                if isinstance(df.columns, pd.MultiIndex):
                    # Multi-RIC requests come back with (RIC, field) column pairs;
                    # stack the RIC level into rows so every batch has the same shape
                    df = df.stack(level=0).rename_axis(["Date", "RIC"]).reset_index()
                else:
                    df = df.reset_index()
                    df["RIC"] = batch[0]
                # Compare as strings: df["Date"] is datetime-like while
                # business_days holds "YYYY-MM-DD" strings
                return df[df["Date"].astype(str).str[:10].isin(business_days)]
        except Exception as e:
            print(f"Error fetching batch {batch[:3]}...: {e}")
        return pd.DataFrame()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(fetch, rics[i:i+batch_size])
                   for i in range(0, len(rics), batch_size)]

        for f in tqdm(as_completed(futures), total=len(futures), desc="Fetching time series"):
            df = f.result()
            if not df.empty:
                outputs.append(df)

    if not outputs:
        return pd.DataFrame()
    df_all = pd.concat(outputs, ignore_index=True)
    return df_all.copy()
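
# A retry helper that could wrap the rd.get_data / rd.get_history calls above --
# a minimal sketch, assuming transient throttling errors surface as exceptions:
def with_retries(fn, attempts=3, backoff=2.0):
    for n in range(attempts):
        try:
            return fn()
        except Exception:
            if n == attempts - 1:
                raise
            time.sleep(backoff * (n + 1))  # wait a bit longer after each failure
# e.g. df = with_retries(lambda: rd.get_history(universe=batch, start=start, end=end, fields=fields))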

df_ts = threaded_time_series(unique_rics, start_ts, end_ts, ts_fields, batch_size=20, max_workers=8)

# --- 5. ENSURE RIC COLUMN EXISTS ---
# fetch() already returns long format with a RIC column; this is just a safety net
if "RIC" not in df_ts.columns and "Instrument" in df_ts.columns:
    df_ts.rename(columns={"Instrument": "RIC"}, inplace=True)

# --- 6. MERGE AND SAVE ---
# get_data() keys results on the requested RIC in the "Instrument" column,
# so join the time series to the static data on that, not on the ISIN
df_final = df_ts.merge(df_static, how="left", left_on="RIC", right_on="Instrument")
df_final = df_final.sort_values(["Date", "RIC"]).reset_index(drop=True)
df_final.to_csv("ETF_holdings_timeseries.csv", index=False)
print("File saved: ETF_holdings_timeseries.csv")
