question


Local test environment to test data posting to the Refinitiv Contribution Channel (RCC) directly

I have followed the steps from https://developers.refinitiv.com/en/api-catalog/refinitiv-real-time-opnsrc/rt-sdk-java/tutorials/ema-consumer/ema-consumer-posting-data-to-contribution-channel and can now post data from a consumer directly to the Refinitiv Contribution Channel (RCC), so I have no questions about that part. However, I need to write integration tests for our posting consumer, and I cannot figure out how to emulate a connection that goes through a tunnel. There are many examples showing how a consumer can connect directly to a provider and post data. For example:

./gradlew runiProvider340 - starts an interactive provider that accepts post messages from a consumer

and when I run

./gradlew runConsumer341 - the messages are posted to the provider.

This works well, except that this way of testing does not work when I use a tunnelled connection. The EMA Java SDK provides an example of a tunnel-stream posting consumer, ex440_System_TunStrm, but how am I supposed to test it, and against what? I believe my tests should run offline; they also run in parallel, which is another reason not to open a real connection to Refinitiv servers on each test run. Is there a way to test, say, the ex440_System_TunStrm consumer, or the one from the article, offline as part of local integration tests?
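
For tests that run in parallel against a locally started provider, each test usually needs its own port. The following is a minimal sketch of one common way to get one, using only the Java standard library; FreePort is a hypothetical helper name and is not part of the RTSDK. How the port is then handed to the provider and consumer under test is left open here.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

/** Hypothetical test helper (not part of the RTSDK). */
final class FreePort {
    private FreePort() {}

    /**
     * Asks the operating system for an unused loopback TCP port by binding to port 0,
     * then releases it so the provider under test can bind to it.
     */
    static int find() throws IOException {
        try (ServerSocket socket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            return socket.getLocalPort();
        }
    }
}
```

There is a small window between releasing the port and the provider binding to it in which another process could take it, so a test harness may want to retry on a bind failure.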

#technologyema-apircc-apircctesting


@Jirapongse
After some trial and error and debugging, I was able to adapt com.refinitiv.eta.valueadd.examples.provider to accept a tunnelled connection. It now also accepts posts and sends Acks in response. The only file I had to modify was

com.refinitiv.eta.valueadd.examples.provider.TunnelStreamHandler

I have attached it for reference: TunnelStreamHandler.txt
It works. However, there are two things that are still not clear.
1. Upon receiving the first login request (line 119), the original code checks that the authentication type equals

ClassesOfService.AuthenticationTypes.OMM_LOGIN 

But that is not the case here: the actual authentication type sent by the consumer is

ClassesOfService.AuthenticationTypes.NOT_REQUIRED

So I had to change the initial check to

if (event.containerType() == DataTypes.MSG &&
        (event.tunnelStream().classOfService().authentication().type() == ClassesOfService.AuthenticationTypes.OMM_LOGIN
                || event.tunnelStream().classOfService().authentication().type() == ClassesOfService.AuthenticationTypes.NOT_REQUIRED
        ) && _msg.domainType() == DomainTypes.LOGIN && _msg.msgClass() == MsgClasses.REQUEST)

2. After the above change I received an almost proper Refresh message in my consumer app. I say "almost" because the serviceId was absent. I tried several ways to add/encode the missing serviceId into the LoginRequest, but I could not figure out how to do it; it appears to be removed somehow during the encoding phase. My current 'live' consumer, which works with the real online Refinitiv endpoints, relies on this check to mark the connection as valid: the real endpoints include a serviceId in that Refresh message, while the test provider does not.

@Override
public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event) {
    log.info("Refresh message {}", refreshMsg);
    if (refreshMsg.domainType() == EmaRdm.MMT_LOGIN
            && refreshMsg.state().streamState() == OmmState.StreamState.OPEN
            && refreshMsg.state().dataState() == OmmState.DataState.OK) {
        // Login accepted, app can post data now
        loginStreamID = refreshMsg.streamId();
        log.info("System refresh, loginStreamID: {}", loginStreamID);
        if (refreshMsg.hasServiceId()
                || refreshMsg.toString().contains("CONTRIBUTION_TUNNEL")
        ) {
  

In other words, the onRefreshMsg method is called twice: the first call has no serviceId in refreshMsg, while the second one does, and I use that as the trigger to mark the connection 'active'. Is that right? However, for the emulated test connection via

com.refinitiv.eta.valueadd.examples.provider.TunnelStreamHandler

I have to add this rather unreliable check

|| refreshMsg.toString().contains("CONTRIBUTION_TUNNEL")

as well. Is there a better way to know that the channel I requested has actually accepted the connection/tunnel? Modifying the consumer code to comply with the emulated provider is not a good way to proceed. How can I inject a serviceId into the reply to the LoginRequest (the second Refresh message callback in the consumer)?
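
One possible alternative to matching on the message text is to decide readiness by which stream a callback belongs to. The following is a minimal sketch, assuming the consumer keeps the handles returned when it requests the tunnel stream and the login sub-stream, and that each callback can report its own handle (in EMA this would presumably be event.handle()). TunnelReadiness and its method names are hypothetical and are not part of EMA; the class is plain Java so that it can be unit tested without any connection.

```java
/** Hypothetical helper (not part of EMA): tracks readiness by stream handle, not by message text. */
final class TunnelReadiness {
    private Long tunnelHandle;      // handle obtained when the tunnel stream was requested
    private Long loginHandle;       // handle obtained when the login sub-stream was opened
    private boolean tunnelOpen;
    private boolean loginAccepted;

    void tunnelRequested(long handle) { tunnelHandle = handle; }

    void loginSubStreamRequested(long handle) { loginHandle = handle; }

    /**
     * Intended to be called from the status/refresh callbacks with the handle of the
     * event and the decoded stream state (Open) and data state (Ok).
     */
    void onState(long eventHandle, boolean streamOpen, boolean dataOk) {
        boolean up = streamOpen && dataOk;
        if (tunnelHandle != null && eventHandle == tunnelHandle) {
            tunnelOpen = up;
            if (!up) {
                loginAccepted = false; // a sub-stream cannot stay usable once its tunnel is down
            }
        } else if (loginHandle != null && eventHandle == loginHandle) {
            loginAccepted = up;
        }
        // Events on any other handle (for example the channel-level login) are ignored.
    }

    /** True only when both the tunnel stream and the login sub-stream inside it are up. */
    boolean isReady() { return tunnelOpen && loginAccepted; }
}
```

With this shape, the first Refresh message (the channel-level login) arrives on a different handle and does not mark the connection active, so the consumer does not need to depend on serviceId being present or on the tunnel name appearing in the message text.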






Hi @nariman.ab,

A tunnel stream is a stream with end-to-end, line-of-sight authentication and guaranteed delivery. I am not sure how you can unit test it without a connection.

Either way, you can get some hints of how unit tests are done in the RTSDK here and here.




@nariman.ab

You can run the ETA VA Provider example (com.refinitiv.eta.valueadd.examples.provider). It supports tunnel streams. However, you need to modify it to handle post messages.



@Jirapongse

Thank you for the hint. I also had to change the channel type from RSSL_ENCRYPTED to RSSL_SOCKET; otherwise I got the following error on the provider side:

ConnectionType:encrypted
portNo: 14002
interfaceName: null
serviceName: DIRECT_FEED
serviceId: 1
enableRTT: false
protocolList: rssl.rwf, tr_json2, rssl.json.v2
keyfile: null
keypasswd: null

Server bound on port 14002

Connection down: Channel java.nio.channels.SocketChannel[connected local=/127.0.0.1:14002 remote=/127.0.0.1:61780]
    Error text: Error initializing channel: errorId=-1 text=no cipher suites in common

I guess I would have to configure a trust store, certificates and so on to make the encrypted connection work, so I decided to set the channel type to RSSL_SOCKET to avoid dealing with certificates. Is that right?

And now I have made progress!

Provider:

ConnectionType:socket
portNo: 14002
interfaceName: null
serviceName: DIRECT_FEED
serviceId: 1
enableRTT: false
protocolList: rssl.rwf, tr_json2, rssl.json.v2

Server bound on port 14002

Connection up!
Server sun.nio.ch.ServerSocketChannelImpl[/[0:0:0:0:0:0:0:0]:14002]: New client on Channel java.nio.channels.SocketChannel[connected local=/127.0.0.1:14002 remote=/127.0.0.1:61632]

Received Login Request for Username: xxx

Received Source Directory Request

Received Login Close for StreamId 1
Closing login stream id '1' with user name: xxx

Connection down: Channel java.nio.channels.SocketChannel[connected local=/127.0.0.1:14002 remote=/127.0.0.1:61632]
    Error text: SocketChannel.read returned -1 (end-of-stream)

Closing source directory stream id '2' with service name: DIRECT_FEED

Consumer:

I 230324 092657.596 [main] OmmConsumerImpl - loggerMsg
    ClientName: ChannelCallbackClient
    Severity: Info
    Text:    Received ChannelUp event on channel channel_local-a
Instance Name consumer3_1
Component Version etaj3.6.8.L1.all.rrg
loggerMsgEnd


I 230324 092657.660 [main] ConsumerClient - Connecting...
I 230324 092657.662 [main] ConsumerClient - Requesting tunnel stream CONTRIBUTION_TUNNEL
I 230324 092657.664 [main] Publisher - waiting for queue processing end
I 230324 092657.664 [main] Publisher - queue processing is empty
I 230324 092657.664 [pool-4-thread-1] ConsumerClient - Refresh message RefreshMsg
    streamId="1"
    domain="Login Domain"
    solicited
    RefreshComplete
    state="Open / Ok / None / 'Login accepted by host localhost'"
    itemGroup="00 00"
    name="xxx"
    nameType="1"
    Attrib dataType="ElementList"
        ElementList
            ElementEntry name="ApplicationId" dataType="Ascii" value="256"
            ElementEntry name="ApplicationName" dataType="Ascii" value="ETA Provider"
            ElementEntry name="Position" dataType="Ascii" value="192.168.178.60/xxx-xxxxx"
            ElementEntry name="SingleOpen" dataType="UInt" value="0"
            ElementEntry name="SupportOMMPost" dataType="UInt" value="1"
            ElementEntry name="SupportBatchRequests" dataType="UInt" value="1"
        ElementListEnd
    AttribEnd
RefreshMsgEnd

I 230324 092657.664 [pool-4-thread-1] ConsumerClient - System refresh, loginStreamID: 1
Exception in thread "main" Exception Type='OmmInvalidUsageException', Text='Attempt to get servieId while it is not set.', Error Code='-4048'
at com.refinitiv.ema.access.TunnelStreamRequestImpl.ommIUExcept(TunnelStreamRequestImpl.java:221)
at com.refinitiv.ema.access.TunnelStreamRequestImpl.serviceId(TunnelStreamRequestImpl.java:162)
at com.refinitiv.ema.access.TunnelItem.submit(ItemCallbackClient.java:579)
at com.refinitiv.ema.access.TunnelItem.open(ItemCallbackClient.java:448)
at com.refinitiv.ema.access.ItemCallbackClient.registerClient(ItemCallbackClient.java:2294)
at com.refinitiv.ema.access.OmmBaseImpl.registerClient(OmmBaseImpl.java:566)
at com.refinitiv.ema.access.OmmConsumerImpl.registerClient(OmmConsumerImpl.java:267)

So the next step is to figure out which serviceId ("servieId" in the exception text) it is complaining about...


Yes, you need certificate files to establish encrypted connections. A self-signed certificate is sufficient.

You can enable tracing in EMA to see the messages exchanged between the applications by setting XmlTraceToStdout to 1 in the consumer configuration.

For example:

        <Consumer>
            <!-- Name is mandatory    -->
            <Name value="Consumer_1"/>
            <!-- Channel is optional: defaulted to "RSSL_SOCKET + localhost + 14002" -->
            <!-- Channel or ChannelSet may be specified -->
            <Channel value="Channel_1"/>
            <!-- Dictionary is optional: defaulted to "ChannelDictionary" -->
            <Dictionary value="Dictionary_1"/>
            <XmlTraceToStdout value="1"/>
        </Consumer>

@Jirapongse

I think I have found the answer to my second question: I must not use LoginRefresh at all (I deleted its mapping completely) and instead map/encode directly to RefreshMsg.


However, the first question from the above post is still open.


@nariman.ab

Thank you for sharing the code.

The Provider example code may not be able to handle all use cases for tunnel streams. It is typically used to test the Consumer example so it needs to be modified to support RCC applications.
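
One of the modifications discussed in this thread, accepting a tunnel login request for more than one authentication type, can be isolated into a small predicate so that it is easy to unit test. The following is a minimal sketch in plain Java. TunnelLoginGate and the AuthType enum are hypothetical stand-ins for the ClassesOfService.AuthenticationTypes constants quoted earlier, and OTHER is a placeholder added only for illustration; it is not an actual RTSDK value.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical helper (not part of the RTSDK) that mirrors the relaxed login check. */
final class TunnelLoginGate {
    /** Stand-in for the authentication types; OTHER is an illustrative placeholder only. */
    enum AuthType { NOT_REQUIRED, OMM_LOGIN, OTHER }

    /** Authentication types for which a login request on the tunnel stream is handled. */
    private static final Set<AuthType> ACCEPTED =
            EnumSet.of(AuthType.OMM_LOGIN, AuthType.NOT_REQUIRED);

    private TunnelLoginGate() {}

    /**
     * Returns true when the event carries a message, the tunnel's authentication type
     * is accepted, and the message is a request on the login domain.
     */
    static boolean isLoginRequest(boolean containerIsMsg, AuthType authType,
                                  boolean loginDomain, boolean requestClass) {
        return containerIsMsg && ACCEPTED.contains(authType) && loginDomain && requestClass;
    }
}
```

In the modified handler, the four arguments would be derived from the event and message fields used in the original condition; keeping the accepted types in one set avoids repeating the long accessor chain for each type.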


