
EMA API - query regarding failover and other configuration options in Java

Hi Team,

I am using the EMA API and need to supply multiple hosts for failover.

As per the article https://developers.refinitiv.com/en/article-catalog/article/elektron-message-api-ema-configuration-overview, I should use channel sets to define multiple channels for a single consumer.

I am configuring OmmConsumerConfig programmatically, as in example ex421_MP_ProgrammaticCfg.java.

However, I could not find a ChannelSet equivalent in the programmatic API.

Suppose I create two channels, Channel_1 and Channel_2:

innerMap.add(EmaFactory.createMapEntry().keyAscii( "Channel_1", MapEntry.MapAction.ADD, innerElementList));
innerMap.add(EmaFactory.createMapEntry().keyAscii( "Channel_2", MapEntry.MapAction.ADD, innerElementList));


a) How do I specify multiple channels for a single consumer in the line below?

innerElementList.add(EmaFactory.createElementEntry().ascii( "Channel", "Channel_1" ));


Should it be something like this?

innerElementList.add(EmaFactory.createElementEntry().ascii( "Channel", "Channel_1, Channel_2" ));


b) I assume that the EMA API takes care of all failover scenarios. Or do I need to specify some configuration when sending a request for a RIC?


c) How can I specify operationModel, position, applicationId, userId and password when I configure programmatically?

Is the code below correct?

EmaFactory.createOmmConsumerConfig().config( createProgramaticConfig());
config.position(trepConnectionConfig.getPosition());
config.applicationId(trepConnectionConfig.getApplicationId());
config.username(trepConnectionConfig.getDacsUserName());
config.password(trepConnectionConfig.getDacsPassword());
config.operationModel(trepConnectionConfig.getOperationModel());

d) Could you please post the link for the EMA configuration options, explaining what each parameter means?

Regards

Rinki



@rinki121

a) To define the ChannelSet configuration, the code looks like this:

    //innerElementList.add(EmaFactory.createElementEntry().ascii( "Channel", "Channel_1" ));
    innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelSet", "Channel_1, Channel_2" ));

b) You are correct. EMA will handle fail-over and resubscribe to all items.

c) The code looks like this:

OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig().config( createProgramaticConfig());

config.username("Hello").position("127.0.0.1/net").applicationId("256").operationModel(OperationModel.USER_DISPATCH);

consumer = EmaFactory.createOmmConsumer(config);

d) You can refer to the EMA Reference Guide in the package for more information about OmmConsumerConfig.

For more information about EMA configurations, please refer to the EMAJ_ConfigGuide.pdf document in the package.



Hello @rinki121

My colleague has already given you a clear answer. In addition, example code for defining Channel_1 and Channel_2 for the programmatic ChannelSet is as follows:

...
//Channel 1
innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelType", "ChannelType::RSSL_SOCKET" ));
innerElementList.add(EmaFactory.createElementEntry().ascii( "Host", "RTDS_1"));
innerElementList.add(EmaFactory.createElementEntry().ascii("Port", "14022"));
innerMap.add(EmaFactory.createMapEntry().keyAscii( "Channel_1", MapEntry.MapAction.ADD, innerElementList));

innerElementList.clear(); 


//Channel 2
innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelType", "ChannelType::RSSL_SOCKET" ));
innerElementList.add(EmaFactory.createElementEntry().ascii( "Host", "RTDS_2"));
innerElementList.add(EmaFactory.createElementEntry().ascii("Port", "14002"));
innerMap.add(EmaFactory.createMapEntry().keyAscii( "Channel_2", MapEntry.MapAction.ADD, innerElementList));

innerElementList.clear();
                
elementList.add(EmaFactory.createElementEntry().map( "ChannelList", innerMap ));
...

Thanks @Jirapongse and @wasin.w. I am using the code below:

        for (int index = 1; index <= connectionLength; index++) {
            channelNames.add("Channel_" + index);
        }
        final String channelSet = String.join(",", channelNames);

        final String consumerName = "Consumer_RT_1";

        // create consumer list
        elementList.add(EmaFactory.createElementEntry().ascii("DefaultConsumer", consumerName ));

        innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelSet", channelSet ));
        innerElementList.add(EmaFactory.createElementEntry().ascii( "Dictionary", "Dictionary_1" ));
        innerElementList.add(EmaFactory.createElementEntry().intValue( "ItemCountHint", 5000 ));
        innerElementList.add(EmaFactory.createElementEntry().intValue( "ServiceCountHint", 5000 ));
        innerElementList.add(EmaFactory.createElementEntry().intValue( "ObeyOpenWindow", 0 ));
        innerElementList.add(EmaFactory.createElementEntry().intValue( "RequestTimeout", 5000 ));
        innerElementList.add(EmaFactory.createElementEntry().intValue( "MaxOutstandingPosts", 5000 ));
        innerElementList.add(EmaFactory.createElementEntry().intValue( "DispatchTimeoutApiThread", 1 ));
        innerElementList.add(EmaFactory.createElementEntry().intValue( "HandleException", 0 ));
        innerElementList.add(EmaFactory.createElementEntry().intValue( "MaxDispatchCountApiThread", 500 ));
        innerElementList.add(EmaFactory.createElementEntry().intValue( "MaxDispatchCountUserThread", 500 ));
        innerElementList.add(EmaFactory.createElementEntry().intValue( "ReactorEventFdPort", 45000 ));
        innerElementList.add(EmaFactory.createElementEntry().intValue( "ReconnectAttemptLimit", 10 ));
        innerElementList.add(EmaFactory.createElementEntry().intValue( "ReconnectMinDelay", 2000 ));
        innerElementList.add(EmaFactory.createElementEntry().intValue( "ReconnectMaxDelay", 6000 ));
        innerElementList.add(EmaFactory.createElementEntry().intValue( "XmlTraceToStdout", 0 ));
        innerElementList.add(EmaFactory.createElementEntry().intValue( "MsgKeyInUpdates", 1 ));

        innerMap.add(EmaFactory.createMapEntry().keyAscii(consumerName, MapEntry.MapAction.ADD, innerElementList));
        innerElementList.clear();

        elementList.add(EmaFactory.createElementEntry().map( "ConsumerList", innerMap ));
        innerMap.clear();

        // create consumer group
        configMap.add(EmaFactory.createMapEntry().keyAscii( "ConsumerGroup", MapEntry.MapAction.ADD, elementList ));
        elementList.clear();


        // create channel list
        for (int index =0; index< connectionLength; index++) {
            final String channelName = channelNames.get(index);
            final String host = getHosts().get(index);
            final String port = getPorts().get(index);
            innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelType", "ChannelType::RSSL_SOCKET" ));
            innerElementList.add(EmaFactory.createElementEntry().ascii( "CompressionType", "CompressionType::ZLib"));
            innerElementList.add(EmaFactory.createElementEntry().intValue( "GuaranteedOutputBuffers", 5000));
            innerElementList.add(EmaFactory.createElementEntry().intValue( "ConnectionPingTimeout", 50000));
            innerElementList.add(EmaFactory.createElementEntry().ascii( "Host", host));
            innerElementList.add(EmaFactory.createElementEntry().ascii("Port", port));
            innerElementList.add(EmaFactory.createElementEntry().intValue( "TcpNodelay", 0));
            innerMap.add(EmaFactory.createMapEntry().keyAscii( channelName, MapEntry.MapAction.ADD, innerElementList));
            innerElementList.clear();
        }

        innerElementList.clear();

        elementList.add(EmaFactory.createElementEntry().map( "ChannelList", innerMap ));
        innerMap.clear();

        configMap.add(EmaFactory.createMapEntry().keyAscii("ChannelGroup", MapEntry.MapAction.ADD, elementList ));
        elementList.clear();

        // create dictionary group
        innerElementList.add(EmaFactory.createElementEntry().ascii( "DictionaryType", "DictionaryType::FileDictionary"));
        innerElementList.add(EmaFactory.createElementEntry().ascii( "RdmFieldDictionaryFileName", trepConnectionConfig.getDictionaryPath() + FIELD_DICTIONARY_FILE_NAME));
        innerElementList.add(EmaFactory.createElementEntry().ascii( "EnumTypeDefFileName", trepConnectionConfig.getDictionaryPath() + ENUM_TABLE_FILE_NAME));
        innerMap.add(EmaFactory.createMapEntry().keyAscii( "Dictionary_1", MapEntry.MapAction.ADD, innerElementList));
        innerElementList.clear();

        elementList.add(EmaFactory.createElementEntry().map( "DictionaryList", innerMap ));
        innerMap.clear();

        configMap.add(EmaFactory.createMapEntry().keyAscii( "DictionaryGroup", MapEntry.MapAction.ADD, elementList ));
        elementList.clear();


OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig().config(configMap);
config.position("167.10.0.1");
config.applicationId("188");
config.username("user");
config.password("abc1111");
config.operationModel(OperationModel.USER_DISPATCH);


final OmmConsumer ommConsumer = EmaFactory.createOmmConsumer(config, adminOmmConsumer, adminOmmConsumer);


Even though I specify the dictionary and the position, I still get this error and this warning in the logs:

20220802 18:49:12.852 [efxrts] ERROR c.r.e.a.OmmConsumerImpl - loggerMsg

ClientName: EmaConfig

Severity: Error

Text: Unable to find tagId for Position

loggerMsgEnd



20220802 18:49:12.853 [efxrts] WARN c.r.e.a.OmmConsumerImpl - loggerMsg

ClientName: EmaConfig

Severity: Warning

Text: no configuration exists in the config file for consumer dictionary [DictionaryGroup|DictionaryList]. Will use dictionary defaults if not config programmatically.

loggerMsgEnd



Could you please tell me what I have missed?


Also, I would like to log the events below (at a level other than trace):

1. When a channel is connected

2. Which login messages are sent

3. Every request message that is sent, and most importantly the host name and port it is sent to (server 1 or server 2)

4. When failover happens: channel down, channel up and all related events


Hello @rinki121

Regarding the issue, can you give us more information about the API and the environment in which it occurs?

  • Is it EMA Java edition or C/C++ edition
  • The version of EMA API (or RTSDK)
  • The version of the compiler
  • OS



@wasin.w

I am using EMA Java version 3.6.4.0, downloaded from Maven Central, and I am running the consumer code in IntelliJ on a Windows 10 Enterprise machine.


<dependency>
  <groupId>com.refinitiv.ema</groupId>
  <artifactId>ema</artifactId>
  <version>3.6.4.0</version>
</dependency>

Hello @rinki121

To log the "channel is connected" and "failover happens, channel down, up, etc." events, the application can register for the Login domain. The channel/connection information is then delivered to the onRefreshMsg and onStatusMsg callback methods. For more detail, see EMA API example 333 (ex333_Login_Streaming_DomainRep in Java, Cons333 in C/C++).

Example messages:

//Connect to Channel_1
Item Name: <Username>
Service Name: <not set>
Item State: Open / Ok / None / 'Login accepted by host cf5956052d10.'


AllowSuspectData : true
ApplicationId : 256
ApplicationName : ADS
Position : 10.42.84.98/Machine
ProvidePermissionExpressions : true
ProvidePermissionProfile : false
SingleOpen : true
SupportBatchRequests : 7
SupportOMMPost : true
SupportOptimizedPauseResume : false
SupportViewRequests : true
SupportStandby : true
SupportEnhancedSymbolList : 1
Solicited : true
UserName : <Username>
UserNameType : 1
State : StreamState: 1 DataState: 1 StatusCode: 0StatusText: Login accepted by host cf5956052d10.
...
//Disconnect
Item Name: <Username>
Service Name: <not set>
Item State: Open / Suspect / None / 'channel down'
//Connect to Channel_2
Item Name: U8004042
Service Name: <not set>
Item State: Open / Ok / None / 'Login accepted by host cd6d332fa58d.'


AllowSuspectData : true
ApplicationId : 256
ApplicationName : RTC
Position : 10.42.84.98/<Machine>
ProvidePermissionExpressions : true
ProvidePermissionProfile : false
SingleOpen : true
SupportBatchRequests : 7
SupportOMMPost : true
SupportOptimizedPauseResume : false
SupportViewRequests : true
SupportStandby : true
SupportEnhancedSymbolList : 1
Solicited : true
UserName : <Username>
UserNameType : 1
State : StreamState: 1 DataState: 1 StatusCode: 0StatusText: Login accepted by host cd6d332fa58d.


onStatusMsg
Item Name: <Username>
Service Name: <not set>
Item State: Open / Ok / None / 'channel up'
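
To turn states like the ones above into "connected", "channel down" and "channel up" log events, the decision comes down to the data state and the status text. The following is only a minimal sketch using the plain JDK: LoginStateSketch and its Event enum are hypothetical names (not part of EMA), and it assumes the printed "stream state / data state / code / 'text'" form shown in the example messages. In a real application the same check would more likely be made on the state object delivered to onRefreshMsg and onStatusMsg than on printed text.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the EMA API: classifies a login-stream
// state in the printed form "Open / Suspect / None / 'channel down'".
final class LoginStateSketch {

    enum Event { CONNECTED, CHANNEL_DOWN, CHANNEL_UP, OTHER }

    // Groups: 1 = stream state, 2 = data state, 3 = status code, 4 = status text.
    private static final Pattern STATE = Pattern.compile(
            "^\\s*(\\w+)\\s*/\\s*(\\w+)\\s*/\\s*(.+?)\\s*/\\s*'(.*)'\\s*$");

    static Event classify(String stateLine) {
        Matcher m = STATE.matcher(stateLine);
        if (!m.matches()) {
            return Event.OTHER; // not in the assumed format
        }
        String dataState = m.group(2);
        String text = m.group(4).toLowerCase(Locale.ROOT);
        if (dataState.equals("Suspect") && text.contains("channel down")) {
            return Event.CHANNEL_DOWN;
        }
        if (dataState.equals("Ok") && text.contains("channel up")) {
            return Event.CHANNEL_UP;
        }
        if (dataState.equals("Ok") && text.startsWith("login accepted")) {
            return Event.CONNECTED;
        }
        return Event.OTHER;
    }

    public static void main(String[] args) {
        // Sample input copied from the example messages above.
        System.out.println(classify("Open / Suspect / None / 'channel down'"));
    }
}
```

Each returned event can then be written at INFO level through whatever logger the application already uses.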



Thanks @wasin.w. I can do the same by registering a client when creating the OmmConsumer. Let me see whether I get the failover messages there.


As for logging when the login message is sent and every request message that is sent, the API does not provide those log messages at the application level.

You can enable the trace log for those messages via the EmaConfig.xml file. For more detail, see the "Logger Configuration" section of the Enterprise Message API (EMA) - Configuration Overview article.


@wasin.w, for logging I am using SLF4J with Logback in Java, so that all output is written to one file. I have gone through this documentation.

As per the documentation, Java has only one option for enabling tracing: XmlTraceToStdout set to 1.

Let me try setting this option and see whether all requests are logged.
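
Since XmlTraceToStdout writes to standard output rather than through the logging framework, one possible way to get the trace into the same log file is to redirect System.out into the logger. The following is only a sketch under assumptions: LineForwardingStream is a hypothetical helper (not part of EMA or SLF4J), it needs Java 10 or later for the PrintStream constructor used, and a Consumer<String> stands in for the logger call (with SLF4J it could be something like log::info).

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: buffers the bytes written to it and hands every
// complete line to a sink, for example a logger method reference.
final class LineForwardingStream extends OutputStream {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final Consumer<String> sink;

    LineForwardingStream(Consumer<String> sink) {
        this.sink = sink;
    }

    // Wraps the stream in a PrintStream that can be passed to System.setOut.
    static PrintStream printStream(Consumer<String> sink) {
        return new PrintStream(new LineForwardingStream(sink), true, StandardCharsets.UTF_8);
    }

    @Override
    public synchronized void write(int b) {
        if (b == '\n') {
            emitLine();          // a line is complete: forward it
        } else if (b != '\r') {
            buffer.write(b);     // drop CR so Windows line endings work too
        }
    }

    // Forwards a trailing partial line, if any, when the stream is closed.
    @Override
    public synchronized void close() {
        if (buffer.size() > 0) {
            emitLine();
        }
    }

    private void emitLine() {
        String line = new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        buffer.reset();
        sink.accept(line);
    }

    public static void main(String[] args) {
        PrintStream original = System.out;
        List<String> captured = new ArrayList<>();   // stands in for the logger
        System.setOut(printStream(captured::add));
        System.out.println("trace line written to stdout");
        System.setOut(original);                     // restore before printing
        original.println(captured);
    }
}
```

This only makes sense if the logger itself does not write to the console; with a console appender the redirected output would feed back into System.out and loop.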



Hello @rinki121

I have tried to replicate the issue with the given code, but it works fine in my environment. The code can connect to my local RTDS server and perform failover fine without the following error/warning messages:

  • "Unable to find tagId for Position" error message
  • "no configuration exists in the config file for consumer dictionary [DictionaryGroup|DictionaryList]. Will use dictionary defaults if not config programmatically."

My replication details are as follows:

  • EMA Java API 3.6.4
  • OpenJDK 11
  • Windows 10
  • Logback-classic 1.2.11
  • SLF4J-API 1.7.36

My replication code:


OmmConsumer consumer = null;
try{
   AppClient appClient = new AppClient();

   Map innerMap = EmaFactory.createMap();
   Map configMap = EmaFactory.createMap();

   ElementList elementList = EmaFactory.createElementList();
   ElementList innerElementList = EmaFactory.createElementList();

   int connectionLength = 2;
   ArrayList<String> channelNames = new ArrayList<String>();

   for (int index=1; index <= connectionLength; index ++) {
      channelNames.add("Channel_" + index);
   }
   final String channelSet = String.join(",", channelNames);


   final String consumerName = "Consumer_RT_1";

   // create consumer list
   elementList.add(EmaFactory.createElementEntry().ascii("DefaultConsumer", consumerName ));


   innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelSet", channelSet ));
   innerElementList.add(EmaFactory.createElementEntry().ascii( "Dictionary", "Dictionary_1" ));
   //..


   innerMap.add(EmaFactory.createMapEntry().keyAscii(consumerName, MapEntry.MapAction.ADD, innerElementList));
   innerElementList.clear();


   elementList.add(EmaFactory.createElementEntry().map( "ConsumerList", innerMap ));
   innerMap.clear();


   // create consumer group
   configMap.add(EmaFactory.createMapEntry().keyAscii( "ConsumerGroup", MapEntry.MapAction.ADD, elementList ));
   elementList.clear();


   // create channel list
   for (int index =0; index< connectionLength; index++) {
      final String channelName = channelNames.get(index);
      final String host = getHost(index);
      final String port = getPorts(index);
      innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelType", "ChannelType::RSSL_SOCKET" ));
      //..
      innerElementList.add(EmaFactory.createElementEntry().ascii( "Host", host));
      innerElementList.add(EmaFactory.createElementEntry().ascii("Port", port));
      innerMap.add(EmaFactory.createMapEntry().keyAscii( channelName, MapEntry.MapAction.ADD, innerElementList));
      innerElementList.clear();
   }


   innerElementList.clear();


   elementList.add(EmaFactory.createElementEntry().map( "ChannelList", innerMap ));
   innerMap.clear();


   configMap.add(EmaFactory.createMapEntry().keyAscii("ChannelGroup", MapEntry.MapAction.ADD, elementList ));
   elementList.clear();


   // create dictionary group
   innerElementList.add(EmaFactory.createElementEntry().ascii( "DictionaryType", "DictionaryType::FileDictionary"));
   innerElementList.add(EmaFactory.createElementEntry().ascii( "RdmFieldDictionaryFileName", "C:\\etc\\RDMFieldDictionary"));
   innerElementList.add(EmaFactory.createElementEntry().ascii( "EnumTypeDefFileName", "C:\\etc\\enumtype.def"));
   innerMap.add(EmaFactory.createMapEntry().keyAscii( "Dictionary_1", MapEntry.MapAction.ADD, innerElementList));
   innerElementList.clear();


   elementList.add(EmaFactory.createElementEntry().map( "DictionaryList", innerMap ));
   innerMap.clear();


   configMap.add(EmaFactory.createMapEntry().keyAscii( "DictionaryGroup", MapEntry.MapAction.ADD, elementList ));
   elementList.clear();

   OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig().config( configMap);
   config.position("127.0.0.1/net");
   config.applicationId("188");
   config.username("user");
   config.password("abc1111");
   config.operationModel(OperationModel.USER_DISPATCH);
 
   consumer = EmaFactory.createOmmConsumer(config, appClient, appClient);
   consumer.registerClient(EmaFactory.createReqMsg().serviceName("ELEKTRON_DD").name("/EUR="), appClient);


   long startTime = System.currentTimeMillis();
   while (startTime + 60000 > System.currentTimeMillis())
      consumer.dispatch(10);        // calls to onRefreshMsg(), onUpdateMsg(), or onStatusMsg() execute on this thread
} catch (OmmException excp) {
   System.out.println(excp.getMessage());
} finally {
   if (consumer != null) consumer.uninitialize();
}



Hello @rinki121

Which Java/JDK version are you using?

@wasin.w I am using JDK11

Hello @rinki121

I have tried the given code with the Oracle JDK 11, but I cannot replicate the same issues. The application works fine without any "Unable to find tagId for Position" and "no configuration exists in the config file for consumer dictionary" error messages.

If the problem still persists in your environment, I highly recommend you contact the Real-Time API support team to investigate the issue in more detail. If you are an RDC subscription member, you can submit a ticket via the "contact premium support button" on the RTSDK Java page.

Alternatively, you can submit the issue to the RTSDK team via the GitHub issue page.


