Hello guys, encountered a problem. Language Java. I have created OmmConsumer, and opened a tunnel stream like in tutorials.
The goal is to contribute price, and to get other stock prices. Both of the goals fails.
Here is the code of tunnel opening
consumer = EmaFactory.createOmmConsumer( consumerConfig.consumerName("Consumer_1") .username("***") .password("***") ); ClassOfService cos = EmaFactory.createClassOfService() .authentication(EmaFactory.createCosAuthentication() .type(CosAuthentication.CosAuthenticationType.OMM_LOGIN)) .dataIntegrity(EmaFactory.createCosDataIntegrity() .type(CosDataIntegrity.CosDataIntegrityType.RELIABLE)) .flowControl(EmaFactory.createCosFlowControl() .type(CosFlowControl.CosFlowControlType.BIDIRECTIONAL) .recvWindowSize(1200)) .guarantee(EmaFactory.createCosGuarantee() .type(CosGuarantee.CosGuaranteeType.NONE)); TunnelStreamRequest tsr = EmaFactory.createTunnelStreamRequest() .classOfService(cos) .domainType(EmaRdm.MMT_SYSTEM) .name("TUNNEL1") .serviceName("DDS_TRCE"); val tunnelStreamConsumerClient = OmmConsumerClientImpl.builder() .consumerName("tunnel-stream") .statusMsgConsumer(this::handleConnectionStatus) .log(log) .build(); this.tunnelStreamHandle = consumer.registerClient(tsr, tunnelStreamConsumerClient);
That part completes successfully. Here are responses from trace:
<REFRESH domainType="LOGIN" streamId="1" containerType="NO_DATA" flags="0x68 (HAS_MSG_KEY|SOLICITED|REFRESH_COMPLETE)" groupId="0" State: Open/Ok/None - text: "" dataSize="0"> <key flags="0x26 (HAS_NAME|HAS_NAME_TYPE|HAS_ATTRIB)" name="GE-A-10001662-3-4170" nameType="1" attribContainerType="ELEMENT_LIST"> <attrib> <elementList flags="0x08 (HAS_STANDARD_DATA)"> <elementEntry name="ApplicationId" dataType="ASCII_STRING" data="256"/> <elementEntry name="ApplicationName" dataType="ASCII_STRING" data="ema"/> <elementEntry name="Position" dataType="ASCII_STRING" data="192.168.109.1/***-windows-pc"/> <elementEntry name="SingleOpen" dataType="UINT" data="0"/> <elementEntry name="SupportOMMPost" dataType="UINT" data="1"/> </elementList> </attrib> </key> <dataBody> </dataBody> </REFRESH>
<!-- Incoming Reactor message --> <!-- java.nio.channels.SocketChannel[connected local=/192.168.31.74:50321 remote=contrib1-amers1.platform.refinitiv.com/3.218.233.177:443] --> <!-- Thu Oct 01 12:18:25 UTC 2020 --> <!-- rwfMajorVer="14" rwfMinorVer="1" --> <REFRESH domainType="SOURCE" streamId="2" containerType="MAP" flags="0x68 (HAS_MSG_KEY|SOLICITED|REFRESH_COMPLETE)" groupId="0" State: Open/Ok/None - text: "" dataSize="195"> <key flags="0x08 (HAS_FILTER)" filter="11"/> <dataBody> <map flags="0x00" countHint="0" keyPrimitiveType="UINT" containerType="FILTER_LIST" > <mapEntry flags="0x00" action="ADD" key="10" > <filterList containerType="ELEMENT_LIST" countHint="0" flags="0x00"> <filterEntry id="1" action="SET" flags="0x00" containerType="ELEMENT_LIST"> <elementList flags="0x08 (HAS_STANDARD_DATA)"> <elementEntry name="Name" dataType="ASCII_STRING" data="DDS_TRCE"/> <elementEntry name="Vendor" dataType="ASCII_STRING" data="Refinitiv"/> <elementEntry name="Capabilities" dataType="ARRAY"> <array itemLength="0" primitiveType="UINT"> <arrayEntry data="127"/> <arrayEntry data="6"/> </array> </elementEntry> <elementEntry name="QoS" dataType="ARRAY"> <array itemLength="0" primitiveType="QOS"> <arrayEntry Qos: Realtime/TickByTick/Static - timeInfo: 0 - rateInfo: 0/> </array> </elementEntry> <elementEntry name="SupportsOutOfBandSnapshots" dataType="UINT" data="0"/> </elementList> </filterEntry> <filterEntry id="2" action="SET" flags="0x00" containerType="ELEMENT_LIST"> <elementList flags="0x08 (HAS_STANDARD_DATA)"> <elementEntry name="ServiceState" dataType="UINT" data="1"/> <elementEntry name="Status" dataType="STATE" State: Open/Ok/None - text: ""/> </elementList> </filterEntry> <filterEntry id="4" action="SET" flags="0x00" containerType="ELEMENT_LIST"> <elementList flags="0x08 (HAS_STANDARD_DATA)"> <elementEntry name="OpenLimit" dataType="UINT" data="3"/> <elementEntry name="OpenWindow" dataType="UINT" data="1"/> </elementList> </filterEntry> </filterList> </mapEntry> </map> </dataBody> </REFRESH>
2020-10-01T15:18:27.359 | DEBUG | pool-3-thread-1 | RefinitivConnection-main: Consumer: tunnel-stream StatusMsg: StatusMsg streamId="5" domain="System Domain" privateStream state="Open / Ok / None / 'Login accepted by host 169c5654fd6c via ip-10-62-11-92'" name="TUNNEL1" serviceId="10" serviceName="DDS_TRCE" StatusMsgEnd
So when the tunnel stream is opened. First attempt to contribute:
FieldList nestedFieldList = EmaFactory.createFieldList(); nestedFieldList.add(EmaFactory.createFieldEntry().floatValue(393, 1)); UpdateMsg nestedUpdateMsg = EmaFactory.createUpdateMsg() .streamId(1) .name("DDS_TRCE") .payload(nestedFieldList); consumer.submit(EmaFactory.createPostMsg() .streamId(1) .postId(1) .domainType(EmaRdm.MMT_MARKET_PRICE) .solicitAck(true) .complete(true) .payload(nestedUpdateMsg), tunnelStreamHandle);
The response of failed post msg.
2020-10-01T15:43:13.173 | ERROR | pool-3-thread-1 | c.t.ema.access.OmmConsumerImpl: loggerMsg ClientName: TunnelItem Severity: Error Text: Invalid attempt to submit PostMsg on tunnel stream. loggerMsgEnd | MDC: <!-- Outgoing Reactor message --> <!-- java.nio.channels.SocketChannel[connected local=/192.168.31.74:50631 remote=contrib1-amers1.platform.refinitiv.com/3.218.233.177:443] --> <!-- Thu Oct 01 12:43:13 UTC 2020 --> <!-- rwfMajorVer="14" rwfMinorVer="1" --> <CLOSE domainType="SYSTEM" streamId="3" containerType="NO_DATA" flags="0x00" dataSize="0"> <dataBody> </dataBody> </CLOSE> 2020-10-01T15:43:13.175 | DEBUG | pool-3-thread-1 | RefinitivConnection-main: Consumer: tunnel-stream StatusMsg: StatusMsg streamId="5" domain="System Domain" privateStream state="Closed, Recoverable / Suspect / None / 'TunnelStream.readMsg() Exception: loggerMsg ClientName: TunnelItem Severity: Error Text: Invalid attempt to submit PostMsg on tunnel stream. loggerMsgEnd '" name="TUNNEL1" serviceId="10" serviceName="DDS_TRCE" StatusMsgEnd event: com.thomsonreuters.ema.access.OmmEventImpl@1461a361 | MDC:
So the question is, how to do it right?
And here is another situation. Attempting to get prices from the same tunnel stream after it is successfully authorized:
consumer.registerClient( EmaFactory.createReqMsg().name("IBM.N").serviceId(1), OmmConsumerClientImpl.builder().consumerName("request").log(log).build(), 1, tunnelStreamHandle);
Response:
<!-- Outgoing Reactor message --> <!-- java.nio.channels.SocketChannel[connected local=/192.168.31.74:50690 remote=contrib1-amers1.platform.refinitiv.com/3.213.90.178:443] --> <!-- Thu Oct 01 12:47:19 UTC 2020 --> <!-- rwfMajorVer="14" rwfMinorVer="1" --> <GENERIC domainType="SYSTEM" streamId="3" containerType="MSG" flags="0x19 (HAS_EXTENDED_HEADER|HAS_SEQ_NUM|MESSAGE_COMPLETE)" seqNum="2" dataSize="20"> <extendedHeader data="0100"/> <dataBody> <REQUEST domainType="MARKET_PRICE" streamId="6" containerType="NO_DATA" flags="0x04 (STREAMING)" dataSize="0"> <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="1" name="IBM.N"/> <dataBody> </dataBody> </REQUEST> </dataBody> </GENERIC> <!-- Incoming Reactor message --> <!-- java.nio.channels.SocketChannel[connected local=/192.168.31.74:50690 remote=contrib1-amers1.platform.refinitiv.com/3.213.90.178:443] --> <!-- Thu Oct 01 12:47:19 UTC 2020 --> <!-- rwfMajorVer="14" rwfMinorVer="1" --> <GENERIC domainType="SYSTEM" streamId="3" containerType="NO_DATA" flags="0x11 (HAS_EXTENDED_HEADER|MESSAGE_COMPLETE)" dataSize="0"> <extendedHeader data="0200 0000 0002 0000 000F 4240"/> <dataBody> </dataBody> </GENERIC> <!-- Incoming Reactor message --> <!-- java.nio.channels.SocketChannel[connected local=/192.168.31.74:50690 remote=contrib1-amers1.platform.refinitiv.com/3.213.90.178:443] --> <!-- Thu Oct 01 12:47:19 UTC 2020 --> <!-- rwfMajorVer="14" rwfMinorVer="1" --> <STATUS domainType="MARKET_PRICE" streamId="6" containerType="NO_DATA" flags="0x60 (HAS_STATE|CLEAR_CACHE)" State: Closed/Suspect/Not entitled - text: "Stream not found" dataSize="0"> <dataBody> </dataBody> </STATUS> <!-- Outgoing Reactor message --> <!-- java.nio.channels.SocketChannel[connected local=/192.168.31.74:50690 remote=contrib1-amers1.platform.refinitiv.com/3.213.90.178:443] --> <!-- Thu Oct 01 12:47:19 UTC 2020 --> <!-- rwfMajorVer="14" rwfMinorVer="1" --> <CLOSE domainType="MARKET_PRICE" streamId="6" containerType="NO_DATA" flags="0x00" dataSize="0"> <dataBody> </dataBody> </CLOSE> <!-- Incoming Reactor message --> <!-- java.nio.channels.SocketChannel[connected local=/192.168.31.74:50690 remote=contrib1-amers1.platform.refinitiv.com/3.213.90.178:443] --> <!-- Thu Oct 01 12:47:19 UTC 2020 --> <!-- rwfMajorVer="14" rwfMinorVer="1" --> <STATUS domainType="MARKET_PRICE" streamId="6" containerType="NO_DATA" flags="0x60 (HAS_STATE|CLEAR_CACHE)" State: Closed/Suspect/Not entitled - text: "Message ignored as tunnel stream has been established" dataSize="0"> <dataBody> </dataBody> </STATUS> <!-- Outgoing Reactor message --> <!-- java.nio.channels.SocketChannel[connected local=/192.168.31.74:50690 remote=contrib1-amers1.platform.refinitiv.com/3.213.90.178:443] --> <!-- Thu Oct 01 12:47:19 UTC 2020 --> <!-- rwfMajorVer="14" rwfMinorVer="1" --> <CLOSE domainType="MARKET_PRICE" streamId="6" containerType="NO_DATA" flags="0x00" dataSize="0"> <dataBody> </dataBody> </CLOSE>
As a piece of additional information: dictionaries was taken from github examples.
Would appreciate any help.