Python program to get earnings call transcripts through EDP-API

I have the Python program below to download some earnings call transcripts through EDP-API. I have an enabled EDP-API and the program authenticates successfully, but it cannot find any earnings call transcripts. I am not sure if the transcripts are not accessible through the API, or if there is something wrong with the program. Any help would be appreciated.

Output:

INFO - Attempting authentication...
INFO - Authentication successful!
INFO - Fetching transcripts for AAPL.O from 2022-01-01 to 2022-12-31...
ERROR - Failed to get transcripts: 404 - 404 page not found
ERROR - No transcripts found or failed to download transcripts.

import requests
import base64
import json
from datetime import datetime, timedelta
import logging
import configparser
import os

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class RDPClient:
    """Small client for the Refinitiv Data Platform (RDP) REST API.

    Credentials come from an INI-style config file. ``authenticate`` requests
    an OAuth2 token with the password grant, and ``get_transcripts`` posts a
    GraphQL query using that token as a bearer credential.
    """

    # Seconds to wait for a single HTTP call; without a timeout ``requests``
    # blocks indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self, config_file='config.txt'):
        """Initialize client with credentials from config file.

        Raises:
            FileNotFoundError: If ``config_file`` does not exist.
            KeyError: If the ``[credentials]`` section or one of its
                required options is missing.
        """
        self.config = self._load_config(config_file)
        self.api_key = self.config['api_key']
        self.client_secret = self.config['client_secret']
        self.username = self.config['username']
        self.password = self.config['password']
        self.base_url = "https://api.refinitiv.com"
        self.auth_url = "https://api.refinitiv.com/auth/oauth2/v1/token"
        # Both stay None until authenticate() succeeds.
        self.access_token = None
        self.token_expiry = None

    def _load_config(self, config_file):
        """Return the four credentials from the ``[credentials]`` section.

        Raises:
            FileNotFoundError: If ``config_file`` does not exist.
            KeyError: If the section or any required option is missing.
        """
        if not os.path.exists(config_file):
            raise FileNotFoundError(f"Config file {config_file} not found.")

        config = configparser.ConfigParser()
        config.read(config_file)

        try:
            return {
                'api_key': config['credentials']['api_key'],
                'client_secret': config['credentials']['client_secret'],
                'username': config['credentials']['username'],
                'password': config['credentials']['password'],
            }
        except KeyError as e:
            raise KeyError(f"Missing required credential in config file: {e}") from e

    def authenticate(self):
        """Authenticate with Refinitiv RDP API and get access token.

        Returns:
            True when a token was obtained and stored on the instance,
            False on any failure (the reason is logged).
        """
        try:
            # The token endpoint receives api_key:client_secret as HTTP Basic
            # credentials and the user's login in the form body.
            credentials = f'{self.api_key}:{self.client_secret}'
            encoded_credentials = base64.b64encode(credentials.encode()).decode()

            headers = {
                "Accept": "application/json",
                "Content-Type": "application/x-www-form-urlencoded",
                "Authorization": f"Basic {encoded_credentials}",
            }

            payload = {
                "grant_type": "password",
                "username": self.username,
                "password": self.password,
                "scope": "trapi",
                "takeExclusiveSignOnControl": "true",  # Forces other sessions to log out
            }

            logger.info("Attempting authentication...")
            response = requests.post(
                self.auth_url,
                headers=headers,
                data=payload,
                timeout=self.REQUEST_TIMEOUT,
            )

            if response.status_code != 200:
                error_data = response.json()
                logger.error(f"Authentication failed: {error_data.get('error_description', 'Unknown error')}")
                return False

            auth_data = response.json()
            self.access_token = auth_data['access_token']
            expires_in = int(auth_data['expires_in'])
            self.token_expiry = datetime.now() + timedelta(seconds=expires_in)

            logger.info("Authentication successful!")
            return True

        except Exception as e:
            # Boundary handler: network errors, non-JSON bodies and missing
            # keys are all reported as a failed authentication.
            logger.error(f"Authentication error: {str(e)}")
            return False

    def refresh_token(self):
        """Make sure a usable token is present, authenticating if needed.

        Returns:
            True if the stored token is still valid or a new one was
            obtained, False if (re-)authentication failed.
        """
        if self.token_expiry is None:
            # No token was ever obtained; comparing datetime with None
            # would raise TypeError, so authenticate instead.
            logger.info("No access token yet, authenticating...")
            return self.authenticate()
        if datetime.now() >= self.token_expiry:
            logger.info("Token expired, re-authenticating...")
            return self.authenticate()
        return True

    def get_transcripts(self, symbol="AAPL.O", start_date="2022-01-01", end_date="2022-12-31"):
        """Fetch earnings call transcripts using RDP Filings API.

        Args:
            symbol: Instrument code passed as the ``ticker`` filter.
            start_date: Inclusive lower bound of the filing date filter.
            end_date: Inclusive upper bound of the filing date filter.

        Returns:
            A list of dicts with ``title``, ``filingDate`` and ``content``
            for documents whose title contains "Earnings Call Transcript",
            or None when nothing matched or the request failed.
        """
        if not self.refresh_token():
            logger.error("Failed to refresh token.")
            return None

        try:
            # NOTE(review): the output quoted with this program shows a 404
            # for this URL; the reply further down points to
            # /data-store/v1/graphql, which the updated program queries with
            # a different GraphQL operation.
            filings_url = f"{self.base_url}/data/filings/v1/graphql"

            headers = {
                "Authorization": f"Bearer {self.access_token}",
                "Content-Type": "application/json",
            }

            # Construct the GraphQL query
            query = {
                "query": """
                    query ($symbol: String!, $startDate: String!, $endDate: String!) {
                      filings(filter: {ticker: {eq: $symbol}, filingDate: {gte: $startDate, lte: $endDate}}) {
                        edges {
                          node {
                            document {
                              content
                              title
                              filingDate
                            }
                          }
                        }
                      }
                    }
                """,
                "variables": {
                    "symbol": symbol,
                    "startDate": start_date,
                    "endDate": end_date,
                },
            }

            logger.info(f"Fetching transcripts for {symbol} from {start_date} to {end_date}...")
            response = requests.post(
                filings_url,
                headers=headers,
                json=query,
                timeout=self.REQUEST_TIMEOUT,
            )

            if response.status_code != 200:
                logger.error(f"Failed to get transcripts: {response.status_code} - {response.text}")
                return None

            data = response.json()
            if data.get('data') and data['data'].get('filings'):
                transcripts = []
                for filing in data['data']['filings']['edges']:
                    document = filing['node']['document']
                    # A missing title must not abort the whole listing.
                    title = (document or {}).get('title') or ''
                    if 'Earnings Call Transcript' in title:
                        transcripts.append({
                            'title': document['title'],
                            'filingDate': document['filingDate'],
                            'content': document['content'],
                        })
                if transcripts:
                    logger.info(f"Found {len(transcripts)} earnings call transcripts.")
                    return transcripts

            logger.info("No earnings call transcripts found in the specified date range.")
            return None

        except Exception as e:
            logger.error(f"Error getting transcripts: {str(e)}")
            return None

def main():
    """Authenticate, fetch AAPL.O transcripts for 2022 and log a preview.

    Every failure is logged rather than raised, so the script always exits
    normally.
    """
    try:
        client = RDPClient('config.txt')

        if not client.authenticate():
            logger.error("Authentication failed")
            return

        transcripts = client.get_transcripts("AAPL.O", "2022-01-01", "2022-12-31")
        if not transcripts:
            logger.error("No transcripts found or failed to download transcripts.")
            return

        for i, transcript in enumerate(transcripts, start=1):
            logger.info(f"Transcript {i}: {transcript['title']} on {transcript['filingDate']}")
            # content may be None for a document without a body; slice a
            # string either way so one empty document does not stop the loop.
            logger.info(f"Content Preview: {(transcript['content'] or '')[:500]}...")  # Preview content
    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")


if __name__ == "__main__":
    main()

Answers

  • Hello @Quantum Pro

    I checked the RDP API Playground page. The GraphQL endpoint is available on the https://api.refinitiv.com/data-store/v1/graphql endpoint, not the https://api.refinitiv.com/data/filings/v1/graphql endpoint.

    graphql.png


    You can use the Playground and GraphQL tabs on the page to test the query.

    graphql2.png

    About the OrganizationId information, you can use the https://api.refinitiv.com/discovery/symbology/v1/lookup endpoint to convert the RIC to the OrganizationId (aka PermID) information.

    I hope this information helps.

  • These were very helpful. Thank you very much. I changed the GraphQL endpoint. I also tried replicating your query in the Playground and it worked.

    However, I still cannot get the Python program to work. Below is the updated program. I am trying to get a list of documents available for downloading for AAPL in year 2022 so that I can actually see if the earnings call transcripts are available for downloading or not. But the program cannot even retrieve the PermID. That is not the main issue, since it defaults to the PermID that I provide manually. But then the program reports that it cannot find any financial filings or transcripts for PermID=4295905573 in year 2022.

    2024-11-09 14:06:27,498 - INFO - Attempting authentication...
    2024-11-09 14:06:28,292 - INFO - Authentication successful!
    2024-11-09 14:06:28,609 - ERROR - Failed to lookup PermID: 401 - {"message":"No API key found in request"}
    2024-11-09 14:06:28,610 - WARNING - Failed to retrieve PermID for symbol 'AAPL.O', using default PermID: 4295905573
    AAPL.O PermID=4295905573
    2024-11-09 14:06:28,978 - INFO - No financial filings found in the specified date range.
    2024-11-09 14:06:28,979 - ERROR - No transcripts found or failed to download transcripts.

    import requests
    import base64
    import json
    from datetime import datetime, timedelta
    import logging
    import configparser
    import os

    # Set up logging
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)

    class RDPClient:
        """Small client for the Refinitiv Data Platform (RDP) REST API.

        Credentials come from an INI-style config file. ``authenticate``
        requests an OAuth2 token with the password grant;
        ``get_organization_id`` looks up a PermID for a ticker and
        ``get_transcripts`` lists filing documents through the GraphQL
        data-store endpoint.
        """

        # Seconds to wait for a single HTTP call; without a timeout
        # ``requests`` blocks indefinitely on a stalled connection.
        REQUEST_TIMEOUT = 30

        def __init__(self, config_file='config.txt'):
            """Initialize client with credentials from config file.

            Raises:
                FileNotFoundError: If ``config_file`` does not exist.
                KeyError: If the ``[credentials]`` section or one of its
                    required options is missing.
            """
            self.config = self._load_config(config_file)
            self.api_key = self.config['api_key']
            self.client_secret = self.config['client_secret']
            self.username = self.config['username']
            self.password = self.config['password']
            self.base_url = "https://api.refinitiv.com"
            self.auth_url = "https://api.refinitiv.com/auth/oauth2/v1/token"
            # Both stay None until authenticate() succeeds.
            self.access_token = None
            self.token_expiry = None

        def _load_config(self, config_file):
            """Return the four credentials from the ``[credentials]`` section.

            Raises:
                FileNotFoundError: If ``config_file`` does not exist.
                KeyError: If the section or any required option is missing.
            """
            if not os.path.exists(config_file):
                raise FileNotFoundError(f"Config file {config_file} not found.")

            config = configparser.ConfigParser()
            config.read(config_file)

            try:
                return {
                    'api_key': config['credentials']['api_key'],
                    'client_secret': config['credentials']['client_secret'],
                    'username': config['credentials']['username'],
                    'password': config['credentials']['password'],
                }
            except KeyError as e:
                raise KeyError(f"Missing required credential in config file: {e}") from e

        def authenticate(self):
            """Authenticate with Refinitiv RDP API and get access token.

            Returns:
                True when a token was obtained and stored on the instance,
                False on any failure (the reason is logged).
            """
            try:
                # The token endpoint receives api_key:client_secret as HTTP
                # Basic credentials and the user's login in the form body.
                credentials = f'{self.api_key}:{self.client_secret}'
                encoded_credentials = base64.b64encode(credentials.encode()).decode()

                headers = {
                    "Accept": "application/json",
                    "Content-Type": "application/x-www-form-urlencoded",
                    "Authorization": f"Basic {encoded_credentials}",
                }

                payload = {
                    "grant_type": "password",
                    "username": self.username,
                    "password": self.password,
                    "scope": "trapi",
                    "takeExclusiveSignOnControl": "true",
                }

                logger.info("Attempting authentication...")
                response = requests.post(
                    self.auth_url,
                    headers=headers,
                    data=payload,
                    timeout=self.REQUEST_TIMEOUT,
                )

                if response.status_code != 200:
                    error_data = response.json()
                    logger.error(f"Authentication failed: {error_data.get('error_description', 'Unknown error')}")
                    return False

                auth_data = response.json()
                self.access_token = auth_data['access_token']
                expires_in = int(auth_data['expires_in'])
                self.token_expiry = datetime.now() + timedelta(seconds=expires_in)

                logger.info("Authentication successful!")
                return True

            except Exception as e:
                # Boundary handler: network errors, non-JSON bodies and
                # missing keys are all reported as a failed authentication.
                logger.error(f"Authentication error: {str(e)}")
                return False

        def refresh_token(self):
            """Make sure a usable token is present, authenticating if needed.

            Returns:
                True if the stored token is still valid or a new one was
                obtained, False if (re-)authentication failed.
            """
            if self.token_expiry is None:
                # No token was ever obtained; comparing datetime with None
                # would raise TypeError, so authenticate instead.
                logger.info("No access token yet, authenticating...")
                return self.authenticate()
            if datetime.now() >= self.token_expiry:
                logger.info("Token expired, re-authenticating...")
                return self.authenticate()
            return True

        def get_organization_id(self, symbol):
            """Convert ticker symbol to OrganizationId (PermID) using LSEG's PermID Entity Search API, with API key in headers.

            Args:
                symbol: Instrument code such as ``"AAPL.O"``; anything after
                    the first ``.`` is dropped before searching.

            Returns:
                The PermID as a string, or None if the lookup failed or
                returned no organization.
            """
            try:
                # Remove any ".O" suffix from the symbol, as LSEG may not recognize it
                clean_symbol = symbol.split('.')[0]

                # NOTE(review): the log quoted with this program shows this
                # call answering 401 "No API key found in request". The
                # earlier reply suggests
                # https://api.refinitiv.com/discovery/symbology/v1/lookup
                # for RIC -> PermID conversion instead; its request format
                # is not shown here, so the call is left as posted.
                entity_search_url = "https://api-eit.refinitiv.com/permid/search"

                headers = {
                    "x-api-key": self.api_key,  # Use API key from configuration
                    "Accept": "application/json",
                }

                # Query for the cleaned ticker symbol
                params = {
                    "q": f"ticker:{clean_symbol}",
                    "entitytype": "organization",
                }

                response = requests.get(
                    entity_search_url,
                    headers=headers,
                    params=params,
                    timeout=self.REQUEST_TIMEOUT,
                )

                if response.status_code != 200:
                    logger.error(f"Failed to lookup PermID: {response.status_code} - {response.text}")
                    return None

                data = response.json()
                result = data.get('result') or {}
                entities = (result.get('organizations') or {}).get('entities')
                # The first hit's '@id' is a URL whose last path segment is
                # the PermID.
                perm_id_url = entities[0].get('@id') if entities else None
                if not perm_id_url:
                    logger.error(f"PermID not found for symbol: {symbol}")
                    return None

                perm_id = perm_id_url.split('/')[-1]  # Extract the PermID from the URL
                logger.info(f"PermID for symbol '{clean_symbol}': {perm_id}")
                print(f"{symbol} PermID={perm_id}")  # Print the PermID
                return perm_id
            except Exception as e:
                logger.error(f"Error getting PermID: {str(e)}")
                return None

        def get_transcripts(self, perm_id="4295905573", start_date="2022-01-01", end_date="2022-12-31"):
            """Fetch financial filings and list available documents for downloading.

            Args:
                perm_id: OrganizationId (PermID) to filter on; a numeric
                    string or an int.
                start_date: ``YYYY-MM-DD`` lower bound of the filing date.
                end_date: ``YYYY-MM-DD`` upper bound of the filing date
                    (sent as midnight UTC of that day).

            Returns:
                A list of dicts with ``DocumentTitle``, ``FilingDate``,
                ``OriginalFilename`` and ``FileMetaData`` (at most 10, the
                limit set in the query), or None when nothing was found or
                the request failed.
            """
            if not self.refresh_token():
                logger.error("Failed to refresh token.")
                return None

            try:
                filings_url = f"{self.base_url}/data-store/v1/graphql"

                headers = {
                    "Authorization": f"Bearer {self.access_token}",
                    "Content-Type": "application/json",
                }

                # $OrganizationId is declared as Long, so send a JSON number
                # when the id is numeric. NOTE(review): whether the server
                # also coerces numeric strings is not visible here.
                try:
                    organization_id = int(perm_id)
                except (TypeError, ValueError):
                    organization_id = perm_id

                # Only variables the operation actually uses are declared:
                # GraphQL validation rejects an operation that defines a
                # variable it never references.
                query = {
                    "query": """
                        query FinancialFiling($OrganizationId: Long, $FilingDateFrom: DateTime!, $FilingDateTo: DateTime!) {
                          FinancialFiling(filter: {AND: [{FilingDocument: {Identifiers: {OrganizationId: {EQ: $OrganizationId}}}}, {FilingDocument: {DocumentSummary: {FilingDate: {BETWN: {FROM: $FilingDateFrom, TO: $FilingDateTo}}}}}]}, sort: {FilingDocument: {DocumentSummary: {FilingDate: DESC}}}, limit: 10) {
                            FilingDocument {
                              Identifiers {
                                Dcn
                              }
                              DocId
                              DocumentSummary {
                                DocumentTitle
                                FilingDate
                                OriginalFilename
                              }
                              FilesMetaData {
                                FileName
                                MimeType
                              }
                            }
                          }
                        }
                    """,
                    "variables": {
                        "OrganizationId": organization_id,
                        "FilingDateFrom": f"{start_date}T00:00:00Z",
                        "FilingDateTo": f"{end_date}T00:00:00Z",
                    },
                }

                response = requests.post(
                    filings_url,
                    headers=headers,
                    json=query,
                    timeout=self.REQUEST_TIMEOUT,
                )

                if response.status_code != 200:
                    logger.error(f"Failed to get financial filings: {response.status_code} - {response.text}")
                    return None

                data = response.json()

                # A GraphQL server can answer 200 and still report a failed
                # query in "errors"; surface it instead of reporting an
                # empty result.
                errors = data.get('errors')
                if errors:
                    logger.error(f"GraphQL query returned errors: {errors}")

                filings = (data.get('data') or {}).get('FinancialFiling')
                if not filings:
                    if not errors:
                        logger.info("No financial filings found in the specified date range.")
                    return None

                downloadable_documents = []
                for filing in filings:
                    document = filing.get('FilingDocument')
                    if not document:
                        continue
                    # Either sub-object may come back as null; treat that as
                    # empty rather than abandoning the whole listing.
                    summary = document.get('DocumentSummary') or {}
                    files = document.get('FilesMetaData') or []
                    downloadable_documents.append({
                        "DocumentTitle": summary.get('DocumentTitle'),
                        "FilingDate": summary.get('FilingDate'),
                        "OriginalFilename": summary.get('OriginalFilename'),
                        "FileMetaData": [
                            {
                                "FileName": file_meta.get("FileName"),
                                "MimeType": file_meta.get("MimeType"),
                            }
                            for file_meta in files
                        ],
                    })

                # Print or log the downloadable documents
                for doc in downloadable_documents:
                    logger.info(f"Document: {doc['DocumentTitle']} - Date: {doc['FilingDate']}")
                    logger.info(f"File Metadata: {doc['FileMetaData']}")

                return downloadable_documents

            except Exception as e:
                logger.error(f"Error fetching financial filings: {str(e)}")
                return None

    def main():
        """Resolve the PermID for AAPL.O and list its 2022 filing documents.

        Falls back to a hard-coded PermID when the lookup returns nothing.
        Every failure is logged rather than raised.
        """
        try:
            client = RDPClient('config.txt')

            if not client.authenticate():
                logger.error("Authentication failed")
                return

            symbol = "AAPL.O"
            default_perm_id = "4295905573"  # Define your default PermID here

            # Decide on the lookup result itself: a successful lookup that
            # happens to return the default id is not a failure.
            looked_up = client.get_organization_id(symbol)
            if looked_up:
                perm_id = looked_up
                logger.info(f"PermID for symbol '{symbol}' is {perm_id}")
            else:
                perm_id = default_perm_id  # Use the default if lookup fails
                logger.warning(f"Failed to retrieve PermID for symbol '{symbol}', using default PermID: {default_perm_id}")

            # Print the PermID being used
            print(f"{symbol} PermID={perm_id}")

            # Fetch transcripts using the determined PermID
            transcripts = client.get_transcripts(perm_id, "2022-01-01", "2022-12-31")
            if not transcripts:
                logger.error("No transcripts found or failed to download transcripts.")
                return

            # get_transcripts returns flat dicts, so the title and date are
            # top-level keys rather than nested under 'DocumentSummary'.
            for i, transcript in enumerate(transcripts, start=1):
                logger.info(f"Transcript {i}: {transcript['DocumentTitle']} on {transcript['FilingDate']}")
        except Exception as e:
            logger.error(f"An error occurred: {str(e)}")


    if __name__ == "__main__":
        main()