EMA Java Creating multiple OmmConsumer instances problem

Options
Zeno
Zeno LSEG
edited August 6 in EMA

Hello all,

I am using the EMA Java library group: "com.refinitiv.ema", name: "ema", version: "3.9.0.1"

I am trying to create multiple connections to the HMDS websocket at the same time, on multiple threads, but I ran into a problem.

After creating the OmmConsumerConfig for each connection that I will have to make and then I create the OmmConsumer instances using the EmaFactory.createOmmConsumer, it seems like all the Worker threads can end up in the same class instance HeaderWebSocketSessionKeyHandler method generateWebSocketSessionKey, where there is a class level variable "uuidBuffer" which is used to create the websocket session key and is used by all the threads, and they all keep overwriting it and such, sometimes causing buffer overflows:

Exception in thread "pool-42-thread-1" java.nio.BufferOverflowException
at java.base/java.nio.Buffer.nextPutIndex(Buffer.java:674)
at java.base/java.nio.HeapByteBuffer.putLong(HeapByteBuffer.java:465)
at com.refinitiv.eta.transport.HeaderWebSocketSessionKeyHandler.generateWebSocketSessionKey(HeaderWebSocketSessionKeyHandler.java:95)
at com.refinitiv.eta.transport.HeaderWebSocketSessionKeyHandler.encodeWebSocketHeader(HeaderWebSocketSessionKeyHandler.java:79)
at com.refinitiv.eta.transport.WebSocketHandlerImpl.createRequestHandshake(WebSocketHandlerImpl.java:153)
at com.refinitiv.eta.transport.RsslSocketChannel.initChnlSendWebSocketHandshakeRequest(RsslSocketChannel.java:3540)
at com.refinitiv.eta.transport.RsslSocketChannel.init(RsslSocketChannel.java:3437)
at com.refinitiv.eta.valueadd.reactor.Worker.initializeChannel(Worker.java:601)
at com.refinitiv.eta.valueadd.reactor.Worker.run(Worker.java:99)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

As of now I am using the same token to create multiple connections, and sometimes, if the timing is right, they all open correctly and I can subscribe on them, but depending on how many connections I make, they either end up crashing with the buffer overflow or the login times out after 70 seconds.

My use case is that I want to be able to create multiple OmmConsumer instances at the same time on different threads that connect to HMDS Websocket endpoints.

It seems like even if I create them all in a synchronous way, the Worker threads all reach the same instance at the same method, and I can get the buffer overflow exception.

Am I using the library wrong? Could you please advise how can I correctly do my use case? Thank you!

These are some of the config values that I use:

Consumer:

LoginRequestTimeOut: 70000

DictionaryRequestTimeOut: 70000

MaxDispatchCountApiThread: 5000

Channel:

CompressionType: CompressionType::None

NumInputBuffers: 4096

ConnectionPingTimeout: 60000
GuaranteedOutputBuffers: 4096

TcpNodelay: 1

EncryptedProtocolType: EncryptedProtocolType::RSSL_WEBSOCKET

WsProtocols: rssl.rwf

HighWaterMark: <a number, same for each connection>

SysRecvBufSize: 524288
SysSendBufSize: 524288

ChannelType: ChannelType::RSSL_ENCRYPTED

There are more settings, but I am unsure which ones would be useful and which not. I've wrote the ones that I thought might be useful to know, but I can provide more

Best Answer

Answers

  • Zeno
    Zeno LSEG

    I am already doing the Horizontal scaling, which from my understanding, is just creating in a synchronous way the OmmConsumer instances. The problem comes from the fact that my OmmConsumerConfigs are of type Websocket, which creates the Sec-WebSocket-Key using the same class for all the OmmConsumers, which leads to that exception.

    Unfortunately, I do not have that premium support button on my page, but I will search and try to contact someone that has a tangent with that library. Thank you!

  • Hello @Zeno

    Is it for LSEG internal project or from the external client? If it is for LSEG internal project development, you can contact the Real-Time APIs Support team via https://support.lseg.com/ website, and choose "RFA - Java Edition" product. The helpdesk team will escalate to the Real-Time APIs Support Team.

    • Please mention this is the EMA Java issue, not RFA (they are supported by the same team).
    • Please mention this is for LSEG internal project

    Alternatively, you may contact the Development team directly via Real-Time-SDK GitHub Issue page.

    contact_rfa_support.png
  • Zeno
    Zeno LSEG

    It is for an internal project. I have been guided by someone to contact developers support, and if that won't work, I will do a ticked in support LSEG website. Thank you very much for your help!

  • Just as a side comment..
    Is there a particular reason you are using WebSocket as opposed to the more efficient RSSL option?

  • Zeno
    Zeno LSEG

    I am unsure how much I can explain about the projects, as they are internal projects for us :) Maybe in private if we could…
    But as a curiosity, can I use HMDS tokens (the short ones that expire in 7199 seconds, made through PingId was it called iirc) to connect to RSSL at a wss HMDS API endpoint, for example: "emea1-ws-fo-hmds.ppe.platform.refinitiv.com"? Will they need an extra scope added when a client logs in?

  • Hello @Zeno

    I am not sure what the HMDS service is. According to my research, the HMDS (Hosted Market Data System) should be some kind of Real-Time platform service. The EMA API needs to set authentication token (DACS??) via a login request message to connect to it.

    If so, it is depending on whether that HMDS service support RSSL connection with the same token system. To switch from the WebSocket connection to RSSL connection, you need to change the API configuration, not the code (except you are using programmatic configuration).

    The RSSL connection is Refinitiv/LSEG proprietary TCP based connection that sends and receives message in binary. It is designed for data distribution and much faster than WebSocket JSON string.

    However, changing to RSSL might not relate to the original issue.

  • Zeno
    Zeno LSEG
    edited August 6

    From my understanding, HMDS is the "older version" of the current RTO, and it will be decommissioned later, but as of now, our implementation is based on its endpoints and workings.

    Yes, in our test stressing, we have observed that the EMA API using RSSL_WEBSOCKET to connect to the HMDS endpoints is way better than the JSON/text based one, so we are working to switch to it.

    We have already configured and are using the RSSL WebSocket connection with the HMDS endpoints, and are successfully establishing connection and getting data. We are using a programmatic configuration, and I believe we are using the RSSL WebSocket connection given that we set the following omm consumer config parameters like so:
    EncryptedProtocolType: EncryptedProtocolType::RSSL_WEBSOCKET

    WsProtocols: rssl.rwf

    And when creating channels: ChannelType::RSSL_ENCRYPTED

    But yes, the original issue is that while we can successfully create the OmmConsumer instance that connects through RSSL WebSocket to the HMDS endpoints, creating multiple results in that race condition internal library error, which results in some OmmConsumer instances not being able to establish connection, and unfortunately, I did not find a way in which I can access and try to circumvent that race condition, as I cannot reach that part of the library in any way.