We use refinitiv-dataplatform=1.0.0a7 to get real-time prices. From time to time I see an error in my logs:
2021-03-10 23:37:21,537 - Session session.platform - Thread 140598623328000 | WebSocket 0 - OMM Protocol - pricing WebSocket error occurred for web socket client 1 (login id 570) : Connection is already closed.
Exception in callback OMMStream._on_reconnect(<FailoverStat...verStarted: 0>, 'Open', 'Suspect', 'FailoverStarted', 'Streaming co...g to recover.')()
handle: <Handle OMMStream._on_reconnect(<FailoverStat...verStarted: 0>, 'Open', 'Suspect', 'FailoverStarted', 'Streaming co...g to recover.')()>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/nest_asyncio.py", line 150, in run
    ctx.run(self._callback, *self._args)
  File "/usr/local/lib/python3.8/site-packages/refinitiv/dataplatform/delivery/stream/omm_stream.py", line 237, in _on_reconnect
    self._on_status(status_message)
  File "/usr/local/lib/python3.8/site-packages/refinitiv/dataplatform/delivery/stream/omm_item_stream.py", line 334, in _on_status
    super()._on_status(status)
  File "/usr/local/lib/python3.8/site-packages/refinitiv/dataplatform/delivery/stream/omm_stream.py", line 284, in _on_status
    if new_status_code == 'Open':
NameError: name 'new_status_code' is not defined

Why does this happen, and how can I fix it?
OS: Debian GNU/Linux 10 (buster)
I found two issues here.
1. The connection is closed (WebSocket error occurred for web socket client 1 (login id 570) : Connection is already closed)
2. NameError: name 'new_status_code' is not defined
The second issue should be resolved in 1.0.0a7.post7.
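To confirm which build is actually installed in the environment that produces the log, a minimal sketch using only the standard library (Python 3.8+) could look like this. The helper name `installed_version` is hypothetical and chosen for illustration; the distribution name `refinitiv-dataplatform` is taken from the question.

```python
from importlib import metadata  # standard library since Python 3.8


def installed_version(package="refinitiv-dataplatform"):
    """Return the installed version string of `package`, or None if it is absent.

    `installed_version` is a hypothetical helper, not part of the library.
    """
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Compare the printed value with 1.0.0a7.post7, the release mentioned above.
    print(installed_version())
```

If this prints 1.0.0a7, the NameError fix is presumably not installed yet and an upgrade is worth trying.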
For the first issue, the error message indicates that the server you are connecting to has closed the connection.
Can you please provide more information to help with understanding the issue?
Are you connecting to an ADS (deployed) or to Real-Time Optimized (Cloud)?
How many instruments are you subscribing to?
What kind of processing do you do with the update payload when it is received in the callback handlers, e.g. complex calculations or writing to a database?
We are getting D5 prices for 16 currencies (e.g. EUR=D5, JPY=D5). I guess it is Real-Time Optimized, though I am not sure.
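The reason for asking about callback processing: if a handler does slow work, it can hold up the thread that services the stream. This is a general consideration and not a confirmed cause of the disconnects here. One common pattern is to have the callback only enqueue the update and let a worker thread do the heavy work. The sketch below uses only the standard library; `updates`, `on_update`, `worker` and `sink` are hypothetical names for illustration, not part of refinitiv-dataplatform.

```python
import queue
import threading

# Bounded queue between the stream callback and the worker (size is an assumption).
updates = queue.Queue(maxsize=10000)


def on_update(streaming_price, instrument_name, fields):
    """Stream callback: copy the payload onto the queue and return immediately."""
    try:
        updates.put_nowait((instrument_name, dict(fields)))
    except queue.Full:
        pass  # drop the update rather than block the stream thread


def worker(sink, stop):
    """Drain the queue and pass each update to `sink` (e.g. a Kafka producer wrapper)."""
    while not stop.is_set() or not updates.empty():
        try:
            name, fields = updates.get(timeout=0.1)
        except queue.Empty:
            continue
        try:
            sink(name, fields)
        finally:
            updates.task_done()


if __name__ == "__main__":
    stop = threading.Event()
    thread = threading.Thread(target=worker, args=(print, stop), daemon=True)
    thread.start()
    on_update(None, "EUR=D5", {"BID": 1.19})  # simulated update
    updates.join()   # wait until the worker has handled everything queued
    stop.set()
    thread.join()
```

Dropping updates when the queue is full is a design choice that favours keeping the stream responsive; for prices that are superseded by the next tick this is often acceptable, but it is an assumption to review for your use case.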
We don't do anything special, just send events to Kafka:
import asyncio
import datetime
import json

import refinitiv.dataplatform as rdp

# RIC_CCY_D, producer, my_topic and acked are not shown in this snippet.


def streaming_prices():
    # Subscribe to the instrument list and forward every update to the handler.
    streaming_prices = rdp.StreamingPrices(
        universe=RIC_CCY_D,
        on_update=lambda streaming_price, instrument_name, fields:
            display_updated_fields(streaming_price, instrument_name, fields)
    )
    streaming_prices.open()

    # Keep the event loop running until the process is interrupted.
    while True:
        try:
            asyncio.get_event_loop().run_until_complete(asyncio.sleep(1))
        except (KeyboardInterrupt, SystemExit):
            rdp.close_session()
            break


def display_updated_fields(streaming_price, instrument_name, fields):
    # Stamp the update with the current UTC time and the RIC, then send it to Kafka.
    fx_timestamp = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat().replace("+00:00", "Z")
    fields['fx_timestamp'] = fx_timestamp
    fields['ric'] = instrument_name
    message = json.dumps(fields)
    producer.produce(topic=my_topic, value=message, callback=acked)
    producer.poll()

We can get a few dozen error messages within one second, and the next second it works again as usual.