Hello all,
I am using the EMA Java library (group: "com.refinitiv.ema", name: "ema", version: "3.9.0.1").
I am trying to create multiple connections to the HMDS websocket at the same time, on multiple threads, but I ran into a problem.
After creating an OmmConsumerConfig for each connection, I create the OmmConsumer instances with EmaFactory.createOmmConsumer. It seems that all the Worker threads can end up in the generateWebSocketSessionKey method of the same HeaderWebSocketSessionKeyHandler instance. That class has a class-level variable, "uuidBuffer", which is used to build the WebSocket session key. Because every thread uses the same buffer, the threads keep overwriting each other's data, which sometimes causes a buffer overflow:
Exception in thread "pool-42-thread-1" java.nio.BufferOverflowException
at java.base/java.nio.Buffer.nextPutIndex(Buffer.java:674)
at java.base/java.nio.HeapByteBuffer.putLong(HeapByteBuffer.java:465)
at com.refinitiv.eta.transport.HeaderWebSocketSessionKeyHandler.generateWebSocketSessionKey(HeaderWebSocketSessionKeyHandler.java:95)
at com.refinitiv.eta.transport.HeaderWebSocketSessionKeyHandler.encodeWebSocketHeader(HeaderWebSocketSessionKeyHandler.java:79)
at com.refinitiv.eta.transport.WebSocketHandlerImpl.createRequestHandshake(WebSocketHandlerImpl.java:153)
at com.refinitiv.eta.transport.RsslSocketChannel.initChnlSendWebSocketHandshakeRequest(RsslSocketChannel.java:3540)
at com.refinitiv.eta.transport.RsslSocketChannel.init(RsslSocketChannel.java:3437)
at com.refinitiv.eta.valueadd.reactor.Worker.initializeChannel(Worker.java:601)
at com.refinitiv.eta.valueadd.reactor.Worker.run(Worker.java:99)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
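To illustrate the kind of race I suspect, here is a minimal standalone sketch. It is not the ETA source code: the class SessionKeySketch and its methods are hypothetical names of my own, and I am assuming the shared buffer is a 16-byte ByteBuffer that is cleared and then filled with two putLong calls. With several threads, one thread's clear() and putLong() calls can interleave with another's, so a third putLong may land on a full buffer and throw BufferOverflowException. Allocating the buffer per call avoids that.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration only; this is NOT the actual ETA implementation.
final class SessionKeySketch {
    // Assumed stand-in for a class-level buffer shared by every caller.
    private static final ByteBuffer SHARED = ByteBuffer.allocate(16);

    // Unsafe under concurrency: position/limit of SHARED are mutated by all threads.
    static String unsafeKey() {
        UUID uuid = UUID.randomUUID();
        SHARED.clear();
        SHARED.putLong(uuid.getMostSignificantBits());
        SHARED.putLong(uuid.getLeastSignificantBits());
        return Base64.getEncoder().encodeToString(SHARED.array());
    }

    // Safe: each call works on its own buffer, so there is no shared state.
    static String safeKey() {
        UUID uuid = UUID.randomUUID();
        ByteBuffer local = ByteBuffer.allocate(16);
        local.putLong(uuid.getMostSignificantBits());
        local.putLong(uuid.getLeastSignificantBits());
        return Base64.getEncoder().encodeToString(local.array());
    }

    // Runs the generator on several threads and counts calls that threw.
    static int countFailures(Supplier<String> generator, int threads, int callsPerThread) {
        AtomicInteger failures = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < callsPerThread; j++) {
                    try {
                        generator.get();
                    } catch (RuntimeException e) {
                        // e.g. java.nio.BufferOverflowException from the shared buffer
                        failures.incrementAndGet();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return failures.get();
    }

    public static void main(String[] args) {
        // The shared-buffer count varies from run to run because it depends on timing.
        System.out.println("shared buffer failures: "
                + countFailures(SessionKeySketch::unsafeKey, 8, 100_000));
        System.out.println("local buffer failures:  "
                + countFailures(SessionKeySketch::safeKey, 8, 100_000));
    }
}
```

The shared-buffer variant only fails when the timing is unlucky, which matches what I see: sometimes all connections open, sometimes one of them throws.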
Currently I use the same token for all connections. Sometimes, if the timing is right, they all open correctly and I can subscribe on them. Otherwise, depending on how many connections I make, they either crash with the buffer overflow or the login times out after 70 seconds.
My use case is that I want to be able to create multiple OmmConsumer instances at the same time on different threads that connect to HMDS Websocket endpoints.
It seems that even if I create them all synchronously, one after another, the Worker threads still reach the same method on the same instance, and I can still get the buffer overflow exception.
Am I using the library incorrectly? Could you please advise how I can correctly implement my use case? Thank you!
These are some of the config values that I use:
Consumer:
    LoginRequestTimeOut: 70000
    DictionaryRequestTimeOut: 70000
    MaxDispatchCountApiThread: 5000
Channel:
    CompressionType: CompressionType::None
    NumInputBuffers: 4096
    ConnectionPingTimeout: 60000
    GuaranteedOutputBuffers: 4096
    TcpNodelay: 1
    EncryptedProtocolType: EncryptedProtocolType::RSSL_WEBSOCKET
    WsProtocols: rssl.rwf
    HighWaterMark: <a number, same for each connection>
    SysRecvBufSize: 524288
    SysSendBufSize: 524288
    ChannelType: ChannelType::RSSL_ENCRYPTED
There are more settings, but I am unsure which of them are relevant. I've listed the ones I thought might be useful, and I can provide more if needed.