EMA Java Creating multiple OmmConsumer instances problem
Hello all,
I am using the EMA Java library group: "com.refinitiv.ema", name: "ema", version: "3.9.0.1"
I am trying to create multiple connections to the HMDS websocket at the same time, on multiple threads, but I ran into a problem.
After creating an OmmConsumerConfig for each connection, I create the OmmConsumer instances using EmaFactory.createOmmConsumer. It seems that all the Worker threads can end up in the same HeaderWebSocketSessionKeyHandler instance, in its generateWebSocketSessionKey method. That class has a class-level variable, "uuidBuffer", which is used to create the WebSocket session key. Because every thread uses the same buffer, the threads keep overwriting it, which sometimes causes buffer overflows:
Exception in thread "pool-42-thread-1" java.nio.BufferOverflowException
at java.base/java.nio.Buffer.nextPutIndex(Buffer.java:674)
at java.base/java.nio.HeapByteBuffer.putLong(HeapByteBuffer.java:465)
at com.refinitiv.eta.transport.HeaderWebSocketSessionKeyHandler.generateWebSocketSessionKey(HeaderWebSocketSessionKeyHandler.java:95)
at com.refinitiv.eta.transport.HeaderWebSocketSessionKeyHandler.encodeWebSocketHeader(HeaderWebSocketSessionKeyHandler.java:79)
at com.refinitiv.eta.transport.WebSocketHandlerImpl.createRequestHandshake(WebSocketHandlerImpl.java:153)
at com.refinitiv.eta.transport.RsslSocketChannel.initChnlSendWebSocketHandshakeRequest(RsslSocketChannel.java:3540)
at com.refinitiv.eta.transport.RsslSocketChannel.init(RsslSocketChannel.java:3437)
at com.refinitiv.eta.valueadd.reactor.Worker.initializeChannel(Worker.java:601)
at com.refinitiv.eta.valueadd.reactor.Worker.run(Worker.java:99)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
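For illustration, the kind of interleaving that could produce this trace can be replayed deterministically on a single thread. This is only a sketch under assumptions: the 16-byte `sharedBuffer` and the class name `SharedBufferHazard` are hypothetical stand-ins, not the actual code of HeaderWebSocketSessionKeyHandler.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.UUID;

class SharedBufferHazard {
    // Hypothetical stand-in for a class-level buffer sized for one UUID (16 bytes).
    static final ByteBuffer sharedBuffer = ByteBuffer.allocate(16);

    // Replays, on one thread, a write order that two unsynchronized threads
    // sharing the buffer could produce. Returns true if the overflow occurs.
    static boolean interleavedWritesOverflow() {
        UUID a = UUID.randomUUID();
        UUID b = UUID.randomUUID();
        sharedBuffer.clear();
        try {
            sharedBuffer.putLong(a.getMostSignificantBits());  // "thread A": position 0 -> 8
            sharedBuffer.putLong(b.getMostSignificantBits());  // "thread B": position 8 -> 16
            sharedBuffer.putLong(a.getLeastSignificantBits()); // "thread A" again: no room left
            return false;
        } catch (BufferOverflowException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("overflow reproduced: " + interleavedWritesOverflow());
    }
}
```

The third putLong fails in the same way as the putLong frame in the stack trace: the buffer is already full because another writer advanced its position.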
As of now I am using the same token to create multiple connections, and sometimes, if the timing is right, they all open correctly and I can subscribe on them, but depending on how many connections I make, they either end up crashing with the buffer overflow or the login times out after 70 seconds.
My use case is that I want to be able to create multiple OmmConsumer instances at the same time on different threads that connect to HMDS Websocket endpoints.
It seems like even if I create them all in a synchronous way, the Worker threads all reach the same instance at the same method, and I can get the buffer overflow exception.
Am I using the library wrong? Could you please advise how I can correctly implement my use case? Thank you!
These are some of the config values that I use:
Consumer:
LoginRequestTimeOut: 70000
DictionaryRequestTimeOut: 70000
MaxDispatchCountApiThread: 5000
Channel:
CompressionType: CompressionType::None
NumInputBuffers: 4096
ConnectionPingTimeout: 60000
GuaranteedOutputBuffers: 4096
TcpNodelay: 1
EncryptedProtocolType: EncryptedProtocolType::RSSL_WEBSOCKET
WsProtocols: rssl.rwf
HighWaterMark: <a number, same for each connection>
SysRecvBufSize: 524288
SysSendBufSize: 524288
ChannelType: ChannelType::RSSL_ENCRYPTED
There are more settings, but I am unsure which ones would be useful and which would not. I've written down the ones that I thought might be useful to know, but I can provide more.
Best Answer
-
Hello @Zeno
Please check the ex410_MP_HorizontalScaling example. Horizontal Scaling distributes the processing of events between multiple CPU cores while running a single process on the machine.
If the problem still persists after you have applied Horizontal Scaling, I strongly suggest you contact the Real-Time APIs Support Team to investigate this behavior in detail. You can contact the team by clicking the "Contact premium support" button on the
page.
Answers
-
I am already doing Horizontal Scaling, which, from my understanding, just means creating the OmmConsumer instances synchronously. The problem comes from the fact that my OmmConsumerConfigs use the WebSocket connection type, so the Sec-WebSocket-Key is created using the same class for all the OmmConsumers, which leads to that exception.
Unfortunately, I do not have that premium support button on my page, but I will search and try to contact someone who works with that library. Thank you!
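As a point of comparison for the Sec-WebSocket-Key contention described above, here is a minimal sketch of key generation that shares no state between threads. RFC 6455 defines the key as a Base64-encoded 16-byte nonce. The class and method names (`PerCallSessionKey`, `generateKey`, `generateConcurrently`) are hypothetical, and this is not the library's implementation. It only shows the kind of change that would have to be made inside the library, since application code cannot reach that class.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PerCallSessionKey {
    // Builds a 16-byte nonce in a buffer local to this call, then Base64-encodes it.
    static String generateKey() {
        UUID uuid = UUID.randomUUID();
        ByteBuffer local = ByteBuffer.allocate(16); // allocated per call, never shared
        local.putLong(uuid.getMostSignificantBits());
        local.putLong(uuid.getLeastSignificantBits());
        return Base64.getEncoder().encodeToString(local.array());
    }

    // Generates keys from several threads at once and returns the distinct keys seen.
    static Set<String> generateConcurrently(int threads, int keysPerThread) {
        Set<String> keys = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < keysPerThread; i++) {
                    keys.add(generateKey());
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("key generation did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for key generation", e);
        }
        return keys;
    }

    public static void main(String[] args) {
        Set<String> keys = generateConcurrently(8, 1000);
        System.out.println("distinct keys generated: " + keys.size());
    }
}
```

Allocating the buffer per call trades a small allocation for the absence of shared mutable state. A ThreadLocal buffer or a synchronized block around the shared buffer would be alternative designs.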
0 -
Hello @Zeno
Is this for an LSEG internal project or for an external client? If it is for LSEG internal project development, you can contact the Real-Time APIs Support team via the https://support.lseg.com/ website and choose the "RFA - Java Edition" product. The helpdesk team will escalate to the Real-Time APIs Support Team.
- Please mention that this is an EMA Java issue, not RFA (they are supported by the same team).
- Please mention that this is for an LSEG internal project.
Alternatively, you may contact the Development team directly via the Real-Time-SDK GitHub Issues page.
0 -
It is for an internal project. Someone has guided me to contact developer support, and if that does not work, I will raise a ticket on the LSEG support website. Thank you very much for your help!
0 -
Just as a side comment..
Is there a particular reason you are using WebSocket as opposed to the more efficient RSSL option?
0 -
I am unsure how much I can explain about the projects, as they are internal projects for us :) Maybe in private if we could…
But out of curiosity, can I use HMDS tokens (the short ones that expire in 7199 seconds, obtained through PingID, if I recall the name correctly) to connect over RSSL to a wss HMDS API endpoint, for example: "emea1-ws-fo-hmds.ppe.platform.refinitiv.com"? Will they need an extra scope added when a client logs in?
0 -
Hello @Zeno
I am not sure what the HMDS service is. According to my research, HMDS (Hosted Market Data System) should be some kind of Real-Time platform service. The EMA API needs to send an authentication token (DACS?) in a login request message to connect to it.
If so, it depends on whether the HMDS service supports RSSL connections with the same token system. To switch from the WebSocket connection to the RSSL connection, you need to change the API configuration, not the code (unless you are using programmatic configuration).
The RSSL connection is a Refinitiv/LSEG proprietary TCP-based connection that sends and receives messages in binary. It is designed for data distribution and is much faster than WebSocket with JSON strings.
However, changing to RSSL might not relate to the original issue.
0 -
From my understanding, HMDS is the "older version" of the current RTO, and it will be decommissioned later, but as of now, our implementation is based on its endpoints and workings.
Yes, in our stress testing, we have observed that the EMA API using RSSL_WEBSOCKET to connect to the HMDS endpoints performs much better than the JSON/text-based one, so we are working to switch to it.
We have already configured and are using the RSSL WebSocket connection with the HMDS endpoints, and are successfully establishing connection and getting data. We are using a programmatic configuration, and I believe we are using the RSSL WebSocket connection given that we set the following omm consumer config parameters like so:
EncryptedProtocolType: EncryptedProtocolType::RSSL_WEBSOCKET
WsProtocols: rssl.rwf
And when creating channels: ChannelType::RSSL_ENCRYPTED
But yes, the original issue remains. We can successfully create a single OmmConsumer instance that connects through RSSL WebSocket to the HMDS endpoints, but creating multiple instances triggers that race condition inside the library, which leaves some OmmConsumer instances unable to establish a connection. Unfortunately, I have not found a way to work around the race condition, as I cannot reach that part of the library in any way.