I am having issues pulling level 2 data into python. I have access to level 2 data (see screenshot) but my websocket connection through OMM_stream is saying the record could not be found. i am using code inspired by the LSEG Refinitiv Data Stream on Github (see this link: https://github.com/LSEG-API-Samples/Example.DataLibrary.Python/tree/main/Examples/3-Delivery/3.02-Streaming). I am using SPY as my RIC which you can see in my screenshot.
Here is my python code:
import refinitiv.data as rd
from refinitiv.data.delivery import omm_stream
import datetime
from IPython.display import clear_output
import logging
import pandas as pd
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Define a callback to display market data updates
def display_market_data(response, instrument_name, stream):
try:
clear_output(wait=True)
current_time = datetime.datetime.now().time()
logger.info(f"{current_time} - Market data update for {instrument_name}")
# Log the raw response for debugging
logger.debug(f"Raw response: {response}")
# Handle response as a dictionary
if isinstance(response, dict):
# Check for 'Fields' (Level 1) or 'Map' (Level 2)
if 'Fields' in response:
data = response.get('Fields', {})
if data:
df = pd.DataFrame([data])
print(df)
else:
logger.warning(f"No data found in Fields for {instrument_name}")
elif 'Map' in response:
# Handle Level 2 MarketByPrice data
map_data = response.get('Map', {})
entries = map_data.get('Entries', [])
if entries:
# Convert map entries to a list of dictionaries
data_list = []
for entry in entries:
key = entry.get('Key', '') # Price level or order key
fields = entry.get('Fields', {})
fields['PriceLevel'] = key # Add price level as a field
data_list.append(fields)
if data_list:
df = pd.DataFrame(data_list)
print(df)
else:
logger.warning(f"No map entries found for {instrument_name}")
else:
logger.warning(f"No map data found for {instrument_name}")
else:
logger.warning(f"Unexpected response structure for {instrument_name}")
else:
# Try accessing response.data.df if it's not a dict
print(response.data.df)
except Exception as e:
logger.error(f"Error processing market data for {instrument_name}: {e}")
# Define a status callback
def on_status(status, stream):
logger.info(f"Status update: {status}")
state = status.get('State', {})
if state.get('Data') == 'Suspect' and state.get('Code') == 'NotFound':
logger.error(f"RIC {stream.name} not found. Verify RIC or check permissions.")
# Open a desktop session
try:
rd.open_session()
logger.info("Session opened successfully")
except Exception as e:
logger.error(f"Failed to open session: {e}")
exit(1)
# Define the RIC for the instrument
ric = "SPY" # Verify if this RIC supports MarketByPrice
# Create an OMM stream for Level 2 market data
try:
stream = omm_stream.Definition(
name=ric,
domain="MarketByPrice", # Use MarketByPrice for Level 2 data
fields=[
"DSPLY_NAME", # Display name (Level 1 field)
"BID_PRICE_1", "BID_SIZE_1", # Level 2: Bid price/size at level 1
"ASK_PRICE_1", "ASK_SIZE_1", # Level 2: Ask price/size at level 1
"BID_PRICE_2", "BID_SIZE_2", # Level 2: Bid price/size at level 2
"ASK_PRICE_2", "ASK_SIZE_2", # Level 2: Ask price/size at level 2
"NO_ORD", # Number of orders at price level
"ORDER_SIDE", # Bid or Ask
"PRICELVL" # Price level
]
).get_stream()
logger.info(f"Stream created for RIC: {ric}")
except Exception as e:
logger.error(f"Failed to create stream for {ric}: {e}")
rd.close_session()
exit(1)
# Register callbacks
stream.on_refresh(lambda response, stream: display_market_data(response, ric, stream))
stream.on_update(lambda response, stream: display_market_data(response, ric, stream))
stream.on_status(on_status)
stream.on_error(lambda error, stream: logger.error(f"Stream error: {error}"))
# Open the stream
try:
stream.open()
logger.info(f"Stream opened for {ric}")
except Exception as e:
logger.error(f"Failed to open stream for {ric}: {e}")
rd.close_session()
exit(1)
# Keep the stream open for 60 seconds
import time
time.sleep(60)
# Clean up
try:
stream.close()
logger.info("Stream closed")
rd.close_session()
logger.info("Session closed")
except Exception as e:
logger.error(f"Error during cleanup: {e}")
and here is the output:
2025-07-08 08:44:46,124 - INFO - HTTP Request: GET http://localhost:9000/api/status "HTTP/1.1 200 OK"
2025-07-08 08:44:46,124 - INFO - HTTP Request: POST http://localhost:9000/api/handshake "HTTP/1.1 200 OK"
20
25-07-08 08:44:46,124 - INFO - Session opened successfully
2025-07-08 08:44:46,124 - INFO - Stream created for RIC: SPY
2025-07-08 08:44:46,149 - INFO - HTTP Request: GET http://localhost:9000/api/rdp/streaming/pricing/v1/ "HTTP/1.1 200 OK"
2025-07-08 08:44:46,177 - INFO - Websocket connected
2025-07-08 08:44:46,382 - INFO - Status update: {'Domain': 'MarketByPrice', 'Type': 'Status', 'ID': 5, 'Key': {'Name': 'SPY'}, 'State': {'Data': 'Suspect', 'Stream': 'Closed', 'Code': 'NotFound', 'Text': 'The record could not be found'}}
2025-07-08 08:44:46,384 - INFO - Stream opened for SPY
2025-07-08 08:44:46,385 - ERROR - RIC SPY not found. Verify RIC or check permissions.