EMA API - query regarding failover and other config options in Java

rinki121 Newcomer

Hi Team,

I am using the EMA API and need to supply multiple hosts for failover.

As per the article https://developers.refinitiv.com/en/article-catalog/article/elektron-message-api-ema-configuration-overview, I should use a channel set to define multiple channels for a single consumer.

I am configuring OmmConsumerConfig programmatically, following the example ex421_MP_ProgrammaticCfg.java.

However, I could not find the equivalent of the ChannelSet configuration in the programmatic API.

Suppose I create two channels, Channel_1 and Channel_2:

innerMap.add(EmaFactory.createMapEntry().keyAscii( "Channel_1", MapEntry.MapAction.ADD, innerElementList));
innerMap.add(EmaFactory.createMapEntry().keyAscii( "Channel_2", MapEntry.MapAction.ADD, innerElementList));


a) How do I specify multiple channels for a single consumer in the line below?

innerElementList.add(EmaFactory.createElementEntry().ascii( "Channel", "Channel_1" ));


Should it be something like this :

innerElementList.add(EmaFactory.createElementEntry().ascii( "Channel", "Channel_1, Channel_2" ));


b) I am assuming that the EMA API will take care of all failover scenarios. Or do I need to specify some configuration when sending a request for a RIC?


c) How can I specify operationModel, position, applicationId, userId and password if I am configuring programmatically?

Is the code below correct?

EmaFactory.createOmmConsumerConfig().config( createProgramaticConfig());
config.position(trepConnectionConfig.getPosition());
config.applicationId(trepConnectionConfig.getApplicationId());
config.username(trepConnectionConfig.getDacsUserName());
config.password(trepConnectionConfig.getDacsPassword());
config.operationModel(trepConnectionConfig.getOperationModel());

d) Could you please post a link to the documentation for the EMA configuration options that explains what each parameter means?

Regards

Rinki



Best Answer

  • Jirapongse
    Jirapongse ✭✭✭✭✭
    Answer ✓

    @rinki121

    a) To define the ChannelSet configuration, the code looks like this:

        //innerElementList.add(EmaFactory.createElementEntry().ascii( "Channel", "Channel_1" ));
        innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelSet", "Channel_1, Channel_2" ));

    b) You are correct. EMA will handle fail-over and resubscribe to all items.

    c) The code looks like this:

    OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig().config( createProgramaticConfig());            

    config.username("Hello").position("127.0.0.1/net").applicationId("256").operationModel(OperationModel.USER_DISPATCH);      

    consumer  = EmaFactory.createOmmConsumer(config);

    d) You can refer to the EMA Reference Guide in the package for more information about OmmConsumerConfig.


    For more information about EMA configurations, please refer to the EMAJ_ConfigGuide.pdf document in the package.

Answers

  • Hello @rinki121

    While my colleague has already provided a clear answer, here is example code that defines Channel_1 and Channel_2 for the programmatic ChannelSet:

    ...
    //Channel 1
    innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelType", "ChannelType::RSSL_SOCKET" ));
    innerElementList.add(EmaFactory.createElementEntry().ascii( "Host", "RTDS_1"));
    innerElementList.add(EmaFactory.createElementEntry().ascii("Port", "14022"));
    innerMap.add(EmaFactory.createMapEntry().keyAscii( "Channel_1", MapEntry.MapAction.ADD, innerElementList));

    innerElementList.clear(); 


    //Channel 2
    innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelType", "ChannelType::RSSL_SOCKET" ));
    innerElementList.add(EmaFactory.createElementEntry().ascii( "Host", "RTDS_2"));
    innerElementList.add(EmaFactory.createElementEntry().ascii("Port", "14002"));
    innerMap.add(EmaFactory.createMapEntry().keyAscii( "Channel_2", MapEntry.MapAction.ADD, innerElementList));

    innerElementList.clear();
                    
    elementList.add(EmaFactory.createElementEntry().map( "ChannelList", innerMap ));
    ...
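When the number of servers varies, the channel names and the ChannelSet value can be worked out from a list of endpoints before any EMA call is made. The following is a minimal sketch in plain Java with no EMA classes; `ChannelPlan`, `Endpoint`, and the `Channel_<n>` naming scheme are assumptions for illustration, and the hosts and ports are the ones from the example above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: derives channel names and a ChannelSet value from endpoints. */
final class ChannelPlan {

    /** One host/port pair; both stay strings because the config entries are ASCII values. */
    static final class Endpoint {
        final String host;
        final String port;

        Endpoint(String host, String port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Maps "Channel_1", "Channel_2", ... to the endpoints, preserving list order. */
    static Map<String, Endpoint> nameChannels(List<Endpoint> endpoints) {
        if (endpoints.isEmpty()) {
            throw new IllegalArgumentException("at least one endpoint is required");
        }
        Map<String, Endpoint> channels = new LinkedHashMap<>();
        for (int i = 0; i < endpoints.size(); i++) {
            channels.put("Channel_" + (i + 1), endpoints.get(i));
        }
        return channels;
    }

    /** Comma-separated channel names, i.e. the string passed for the "ChannelSet" entry. */
    static String channelSet(Map<String, Endpoint> channels) {
        return String.join(", ", channels.keySet());
    }

    public static void main(String[] args) {
        List<Endpoint> endpoints = new ArrayList<>();
        endpoints.add(new Endpoint("RTDS_1", "14022"));
        endpoints.add(new Endpoint("RTDS_2", "14002"));

        Map<String, Endpoint> channels = nameChannels(endpoints);
        System.out.println(channelSet(channels)); // prints "Channel_1, Channel_2"
        for (Map.Entry<String, Endpoint> e : channels.entrySet()) {
            // prints "Channel_1 -> RTDS_1:14022" then "Channel_2 -> RTDS_2:14002"
            System.out.println(e.getKey() + " -> " + e.getValue().host + ":" + e.getValue().port);
        }
    }
}
```

Each map entry would then feed one `innerMap.add(...)` call for the ChannelList, and the joined string would be the value of the consumer's "ChannelSet" entry.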
  • Thanks @Jirapongse and @wasin.w. I am using the code below:

    for (int index=1; index <= connectionLength; index ++) {
    channelNames.add("Channel_" + index);
    }
    final String channelSet = String.join(",", channelNames);

    final String consumerName = "Consumer_RT_1";

    // create consumer list
    elementList.add(EmaFactory.createElementEntry().ascii("DefaultConsumer", consumerName ));

    innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelSet", channelSet ));
    innerElementList.add(EmaFactory.createElementEntry().ascii( "Dictionary", "Dictionary_1" ));
    innerElementList.add(EmaFactory.createElementEntry().intValue( "ItemCountHint", 5000 ));
    innerElementList.add(EmaFactory.createElementEntry().intValue( "ServiceCountHint", 5000 ));
    innerElementList.add(EmaFactory.createElementEntry().intValue( "ObeyOpenWindow", 0 ));
    innerElementList.add(EmaFactory.createElementEntry().intValue( "RequestTimeout", 5000 ));
    innerElementList.add(EmaFactory.createElementEntry().intValue( "MaxOutstandingPosts", 5000 ));
    innerElementList.add(EmaFactory.createElementEntry().intValue( "DispatchTimeoutApiThread", 1 ));
    innerElementList.add(EmaFactory.createElementEntry().intValue( "HandleException", 0 ));
    innerElementList.add(EmaFactory.createElementEntry().intValue( "MaxDispatchCountApiThread", 500 ));
    innerElementList.add(EmaFactory.createElementEntry().intValue( "MaxDispatchCountUserThread", 500 ));
    innerElementList.add(EmaFactory.createElementEntry().intValue( "ReactorEventFdPort", 45000 ));
    innerElementList.add(EmaFactory.createElementEntry().intValue( "ReconnectAttemptLimit", 10 ));
    innerElementList.add(EmaFactory.createElementEntry().intValue( "ReconnectMinDelay", 2000 ));
    innerElementList.add(EmaFactory.createElementEntry().intValue( "ReconnectMaxDelay", 6000 ));
    innerElementList.add(EmaFactory.createElementEntry().intValue( "XmlTraceToStdout", 0 ));
    innerElementList.add(EmaFactory.createElementEntry().intValue( "MsgKeyInUpdates", 1 ));

    innerMap.add(EmaFactory.createMapEntry().keyAscii(consumerName, MapEntry.MapAction.ADD, innerElementList));
    innerElementList.clear();

    elementList.add(EmaFactory.createElementEntry().map( "ConsumerList", innerMap ));
    innerMap.clear();

    // create consumer group
    configMap.add(EmaFactory.createMapEntry().keyAscii( "ConsumerGroup", MapEntry.MapAction.ADD, elementList ));
    elementList.clear();


    // create channel list
    for (int index =0; index< connectionLength; index++) {
    final String channelName = channelNames.get(index);
    final String host = getHosts().get(index);
    final String port = getPorts().get(index);
    innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelType", "ChannelType::RSSL_SOCKET" ));
    innerElementList.add(EmaFactory.createElementEntry().ascii( "CompressionType", "CompressionType::ZLib"));
    innerElementList.add(EmaFactory.createElementEntry().intValue( "GuaranteedOutputBuffers", 5000));
    innerElementList.add(EmaFactory.createElementEntry().intValue( "ConnectionPingTimeout", 50000));
    innerElementList.add(EmaFactory.createElementEntry().ascii( "Host", host));
    innerElementList.add(EmaFactory.createElementEntry().ascii("Port", port));
    innerElementList.add(EmaFactory.createElementEntry().intValue( "TcpNodelay", 0));
    innerMap.add(EmaFactory.createMapEntry().keyAscii( channelName, MapEntry.MapAction.ADD, innerElementList));
    innerElementList.clear();
    }

    innerElementList.clear();

    elementList.add(EmaFactory.createElementEntry().map( "ChannelList", innerMap ));
    innerMap.clear();

    configMap.add(EmaFactory.createMapEntry().keyAscii("ChannelGroup", MapEntry.MapAction.ADD, elementList ));
    elementList.clear();

    // create dictionary group
    innerElementList.add(EmaFactory.createElementEntry().ascii( "DictionaryType", "DictionaryType::FileDictionary"));
    innerElementList.add(EmaFactory.createElementEntry().ascii( "RdmFieldDictionaryFileName", trepConnectionConfig.getDictionaryPath() + FIELD_DICTIONARY_FILE_NAME));
    innerElementList.add(EmaFactory.createElementEntry().ascii( "EnumTypeDefFileName", trepConnectionConfig.getDictionaryPath() + ENUM_TABLE_FILE_NAME));
    innerMap.add(EmaFactory.createMapEntry().keyAscii( "Dictionary_1", MapEntry.MapAction.ADD, innerElementList));
    innerElementList.clear();

    elementList.add(EmaFactory.createElementEntry().map( "DictionaryList", innerMap ));
    innerMap.clear();

    configMap.add(EmaFactory.createMapEntry().keyAscii( "DictionaryGroup", MapEntry.MapAction.ADD, elementList ));
    elementList.clear();


    OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig().config( configMap);
    config.position("167.10.0.1");
    config.applicationId("188");
    config.username("user");
    config.password("abc1111");

    config.operationModel(OperationModel.USER_DISPATCH);


    final OmmConsumer ommConsumer = EmaFactory.createOmmConsumer(config, adminOmmConsumer, adminOmmConsumer);


    Even though I specify the dictionary and position, I am still getting these messages in the logs:

    20220802 18:49:12.852 [efxrts] ERROR c.r.e.a.OmmConsumerImpl - loggerMsg

    ClientName: EmaConfig

    Severity: Error

    Text: Unable to find tagId for Position

    loggerMsgEnd



    20220802 18:49:12.853 [efxrts] WARN c.r.e.a.OmmConsumerImpl - loggerMsg

    ClientName: EmaConfig

    Severity: Warning

    Text: no configuration exists in the config file for consumer dictionary [DictionaryGroup|DictionaryList]. Will use dictionary defaults if not config programmatically.

    loggerMsgEnd



    Could you please tell me what I have missed?


    Also, I would like to log the following events (not at trace level):

    1. When a channel is connected

    2. Which login messages are sent

    3. Every request message that is sent and, most importantly, the host name and port it is sent to (server 1 or server 2)

    4. Failover events: channel down, channel up, and so on

  • Hello @rinki121

    About the issue, can you give us more information about the API and the environment in which the issue occurs?

    • Is it the EMA Java edition or the C/C++ edition?
    • The version of EMA API (or RTSDK)
    • The version of the compiler
    • OS


  • Hello @rinki121

    To log the "channel is connected" and "failover happens, channel down, up, etc." events, the application can register for Login domain messages. The channel/connection information is delivered to the onRefreshMsg and onStatusMsg callback methods. You can see more detail in EMA API example 333 (ex333_Login_Streaming_DomainRep - Java/Cons333 - C/C++).

    Example messages:

    //Connect to Channel_1
    Item Name: <Username>
    Service Name: <not set>
    Item State: Open / Ok / None / 'Login accepted by host cf5956052d10.'


    AllowSuspectData : true
    ApplicationId : 256
    ApplicationName : ADS
    Position : 10.42.84.98/Machine
    ProvidePermissionExpressions : true
    ProvidePermissionProfile : false
    SingleOpen : true
    SupportBatchRequests : 7
    SupportOMMPost : true
    SupportOptimizedPauseResume : false
    SupportViewRequests : true
    SupportStandby : true
    SupportEnhancedSymbolList : 1
    Solicited : true
    UserName : <Username>
    UserNameType : 1
    State : StreamState: 1 DataState: 1 StatusCode: 0StatusText: Login accepted by host cf5956052d10.
    ...
    //Disconnect
    Item Name: <Username>
    Service Name: <not set>
    Item State: Open / Suspect / None / 'channel down'
    //Connect to Channel_2
    Item Name: U8004042
    Service Name: <not set>
    Item State: Open / Ok / None / 'Login accepted by host cd6d332fa58d.'


    AllowSuspectData : true
    ApplicationId : 256
    ApplicationName : RTC
    Position : 10.42.84.98/<Machine>
    ProvidePermissionExpressions : true
    ProvidePermissionProfile : false
    SingleOpen : true
    SupportBatchRequests : 7
    SupportOMMPost : true
    SupportOptimizedPauseResume : false
    SupportViewRequests : true
    SupportStandby : true
    SupportEnhancedSymbolList : 1
    Solicited : true
    UserName : <Username>
    UserNameType : 1
    State : StreamState: 1 DataState: 1 StatusCode: 0StatusText: Login accepted by host cd6d332fa58d.


    onStatusMsg
    Item Name: <Username>
    Service Name: <not set>
    Item State: Open / Ok / None / 'channel up'
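The example messages above can be reduced to one-line events for a normal (non-trace) log. The following is a minimal sketch in plain Java that parses only the printed `Item State:` line, assuming it always has the four-part shape shown in the example output; `LoginStateLine` and the INFO/WARN mapping are hypothetical. Inside a real callback the state object of the message would normally be read directly instead of parsing text.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: turns a printed login "Item State" line into a one-line log event. */
final class LoginStateLine {
    // Expected shape, taken from the example output:
    //   Item State: Open / Suspect / None / 'channel down'
    private static final Pattern STATE = Pattern.compile(
            "^\\s*Item State:\\s*([^/]+?)\\s*/\\s*([^/]+?)\\s*/\\s*([^/]+?)\\s*/\\s*'(.*)'\\s*$");

    final String streamState;
    final String dataState;
    final String statusCode;
    final String text;

    private LoginStateLine(String streamState, String dataState, String statusCode, String text) {
        this.streamState = streamState;
        this.dataState = dataState;
        this.statusCode = statusCode;
        this.text = text;
    }

    /** Parses one line; rejects anything that does not match the assumed shape. */
    static LoginStateLine parse(String line) {
        Matcher m = STATE.matcher(line);
        if (!m.matches()) {
            throw new IllegalArgumentException("not an Item State line: " + line);
        }
        return new LoginStateLine(m.group(1), m.group(2), m.group(3), m.group(4));
    }

    /** True when the login stream is open and its data state is Ok. */
    boolean isUp() {
        return "Open".equals(streamState) && "Ok".equals(dataState);
    }

    /** Assumed mapping: healthy states log at INFO, everything else at WARN. */
    String toLogEvent() {
        return (isUp() ? "INFO" : "WARN") + " login stream " + streamState + "/" + dataState + ": " + text;
    }

    public static void main(String[] args) {
        String up = "Item State: Open / Ok / None / 'Login accepted by host cf5956052d10.'";
        String down = "Item State: Open / Suspect / None / 'channel down'";
        // prints "INFO login stream Open/Ok: Login accepted by host cf5956052d10."
        System.out.println(parse(up).toLogEvent());
        // prints "WARN login stream Open/Suspect: channel down"
        System.out.println(parse(down).toLogEvent());
    }
}
```

With this mapping, the "channel down" status becomes a WARN line and the following "Login accepted" and "channel up" states become INFO lines, which covers events 1 and 4 from the question.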


  • Regarding logging when the "login message is sent" and "every request message that is being sent": the API does not provide those log messages at the application level.

    You can enable the trace log for those messages via the EmaConfig.xml file; please see the "Logger Configuration" section of the Enterprise Message API (EMA) - Configuration Overview article for more detail.

  • @wasin.w

    I am using EMA Java version 3.6.4.0, downloaded from Maven Central, and I am running the consumer code in IntelliJ on a Windows 10 Enterprise machine.


    <dependency>
    <groupId>com.refinitiv.ema</groupId>
    <artifactId>ema</artifactId>
    <version>3.6.4.0</version>
    </dependency>
  • Thanks @wasin.w. I could do the same by registering a client when creating the OmmConsumer. Let me see if I can get the failover messages there.

  • @wasin.w, for logging I am using SLF4J with Logback in Java, so that all output is logged into one file. I have gone through this documentation.

    As per the documentation, the only option for enabling tracing in Java is XmlTraceToStdout: 1.

    Let me try setting this option and see if all requests can be logged.
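Since XmlTraceToStdout writes to standard output rather than to the logging framework, one way to get the trace into the same file is to route stdout through the logger. The following is a minimal sketch in plain Java, assuming the trace reaches the process through `System.out`; `LineForwardingStream` and `install` are hypothetical names, and the callback stands in for a call such as a logger's info method.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: forwards each completed line written to a stream to a callback. */
final class LineForwardingStream extends OutputStream {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final Consumer<String> sink;

    LineForwardingStream(Consumer<String> sink) {
        this.sink = sink;
    }

    @Override
    public synchronized void write(int b) {
        if (b == '\n') {
            emit();              // a full line is ready, hand it to the callback
        } else if (b != '\r') {
            buffer.write(b);     // drop '\r' so Windows line endings look the same
        }
    }

    @Override
    public synchronized void close() {
        if (buffer.size() > 0) {
            emit();              // flush a trailing line that had no newline
        }
    }

    private void emit() {
        String line = buffer.toString(); // decodes with the platform default charset
        buffer.reset();
        sink.accept(line);
    }

    /** Replaces System.out and returns the previous stream so the caller can restore it. */
    static PrintStream install(Consumer<String> sink) {
        PrintStream previous = System.out;
        System.setOut(new PrintStream(new LineForwardingStream(sink), true));
        return previous;
    }

    public static void main(String[] args) {
        List<String> captured = new ArrayList<>();
        PrintStream previous = install(captured::add);
        try {
            System.out.println("trace line 1");
            System.out.println("trace line 2");
        } finally {
            System.setOut(previous); // always restore the original stream
        }
        System.out.println(captured); // prints "[trace line 1, trace line 2]"
    }
}
```

If the logger itself has a console appender, it should write to the saved `previous` stream or to a file appender instead; otherwise every logged line would be fed back into the callback.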


  • Hello @rinki121

    I have tried to replicate the issue with the given code, but it works fine in my environment. The code can connect to my local RTDS server and perform failover fine without the following error/warning messages:

    • "Unable to find tagId for Position" error message
    • "no configuration exists in the config file for consumer dictionary [DictionaryGroup|DictionaryList]. Will use dictionary defaults if not config programmatically."

    My replication details are as follows:

    • EMA Java API 3.6.4
    • OpenJDK 11
    • Windows 10
    • Logback-classic 1.2.11
    • SLF4J-API 1.7.36

    My replication code:


    OmmConsumer consumer = null;
    try{
       AppClient appClient = new AppClient();

       Map innerMap = EmaFactory.createMap();
       Map configMap = EmaFactory.createMap();

       ElementList elementList = EmaFactory.createElementList();
       ElementList innerElementList = EmaFactory.createElementList();

       int connectionLength = 2;
       ArrayList<String> channelNames = new ArrayList<String>();

       for (int index=1; index <= connectionLength; index ++) {
          channelNames.add("Channel_" + index);
       }
       final String channelSet = String.join(",", channelNames);


       final String consumerName = "Consumer_RT_1";

       // create consumer list
       elementList.add(EmaFactory.createElementEntry().ascii("DefaultConsumer", consumerName ));


       innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelSet", channelSet ));
       innerElementList.add(EmaFactory.createElementEntry().ascii( "Dictionary", "Dictionary_1" ));
       //..


       innerMap.add(EmaFactory.createMapEntry().keyAscii(consumerName, MapEntry.MapAction.ADD, innerElementList));
       innerElementList.clear();


        elementList.add(EmaFactory.createElementEntry().map( "ConsumerList", innerMap ));
        innerMap.clear();


       // create consumer group
       configMap.add(EmaFactory.createMapEntry().keyAscii( "ConsumerGroup", MapEntry.MapAction.ADD, elementList ));
       elementList.clear();


       // create channel list
       for (int index =0; index< connectionLength; index++) {
          final String channelName = channelNames.get(index);
          final String host = getHost(index);
          final String port = getPorts(index);
          innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelType", "ChannelType::RSSL_SOCKET" ));
          //..
          innerElementList.add(EmaFactory.createElementEntry().ascii( "Host", host));
          innerElementList.add(EmaFactory.createElementEntry().ascii("Port", port));
          innerMap.add(EmaFactory.createMapEntry().keyAscii( channelName, MapEntry.MapAction.ADD, innerElementList));
          innerElementList.clear();
       }


       innerElementList.clear();


       elementList.add(EmaFactory.createElementEntry().map( "ChannelList", innerMap ));
       innerMap.clear();


       configMap.add(EmaFactory.createMapEntry().keyAscii("ChannelGroup", MapEntry.MapAction.ADD, elementList ));
       elementList.clear();


       // create dictionary group
       innerElementList.add(EmaFactory.createElementEntry().ascii( "DictionaryType", "DictionaryType::FileDictionary"));
       innerElementList.add(EmaFactory.createElementEntry().ascii( "RdmFieldDictionaryFileName", "C:\\etc\\RDMFieldDictionary"));
       innerElementList.add(EmaFactory.createElementEntry().ascii( "EnumTypeDefFileName", "C:\\etc\\enumtype.def"));
       innerMap.add(EmaFactory.createMapEntry().keyAscii( "Dictionary_1", MapEntry.MapAction.ADD, innerElementList));
       innerElementList.clear();


       elementList.add(EmaFactory.createElementEntry().map( "DictionaryList", innerMap ));
       innerMap.clear();


       configMap.add(EmaFactory.createMapEntry().keyAscii( "DictionaryGroup", MapEntry.MapAction.ADD, elementList ));
       elementList.clear();

       OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig().config( configMap);
       config.position("127.0.0.1/net");
       config.applicationId("188");
       config.username("user");
       config.password("abc1111");
       config.operationModel(OperationModel.USER_DISPATCH);
     
       consumer = EmaFactory.createOmmConsumer(config, appClient, appClient);
       consumer.registerClient(EmaFactory.createReqMsg().serviceName("ELEKTRON_DD").name("/EUR="), appClient);


       long startTime = System.currentTimeMillis();
       while (startTime + 60000 > System.currentTimeMillis())
          consumer.dispatch(10);        // calls to onRefreshMsg(), onUpdateMsg(), or onStatusMsg() execute on this thread
    } catch (OmmException excp) {
       System.out.println(excp.getMessage());
    } finally {
       if (consumer != null) consumer.uninitialize();
    }


  • Hello @rinki121

    I have tried the given code with the Oracle JDK 11, but I cannot replicate the same issues. The application works fine without any "Unable to find tagId for Position" and "no configuration exists in the config file for consumer dictionary" error messages.

    If the problem still persists in your environment, I highly recommend you contact the Real-Time API support team to investigate the issue in more detail. If you are an RDC subscription member, you can submit a ticket via the "contact premium support button" on the RTSDK Java page.


    Alternatively, you can submit the issue to the RTSDK team via the GitHub issue page.