EMA API - query regarding failover and other config options in Java
Hi Team,
I am using the EMA API and need to supply multiple hosts for failover.
As per the article https://developers.refinitiv.com/en/article-catalog/article/elektron-message-api-ema-configuration-overview, I should use channel sets to define multiple channels for a single consumer.
I am configuring OmmConsumerConfig programmatically, following the example ex421_MP_ProgrammaticCfg.java.
However, I could not find an equivalent of the ChannelSet configuration in the programmatic API.
Suppose I create two channels, Channel_1 and Channel_2:
innerMap.add(EmaFactory.createMapEntry().keyAscii( "Channel_1", MapEntry.MapAction.ADD, innerElementList));
innerMap.add(EmaFactory.createMapEntry().keyAscii( "Channel_2", MapEntry.MapAction.ADD, innerElementList));
a) How do I specify multiple channels for a single consumer in the line below?
innerElementList.add(EmaFactory.createElementEntry().ascii( "Channel", "Channel_1" ));
Should it be something like this?
innerElementList.add(EmaFactory.createElementEntry().ascii( "Channel", "Channel_1, Channel_2" ));
b) I am assuming that the EMA API will take care of all failover scenarios. Or do I need to specify some configuration when sending a request for a RIC?
c) How can I specify operationModel, position, applicationId, userId and password if I am using the programmatic approach?
Is the code below correct?
EmaFactory.createOmmConsumerConfig().config( createProgramaticConfig());
config.position(trepConnectionConfig.getPosition());
config.applicationId(trepConnectionConfig.getApplicationId());
config.username(trepConnectionConfig.getDacsUserName());
config.password(trepConnectionConfig.getDacsPassword());
config.operationModel(trepConnectionConfig.getOperationModel());
d) Could you please post a link to the documentation for the EMA configuration options, explaining what each parameter means?
Regards
Rinki
Best Answer
a) To define the ChannelSet configuration, the code looks like this:
//innerElementList.add(EmaFactory.createElementEntry().ascii( "Channel", "Channel_1" ));
innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelSet", "Channel_1, Channel_2" ));
b) You are correct. EMA will handle fail-over and resubscribe to all items.
c) The code looks like this:
OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig().config( createProgramaticConfig());
config.username("Hello").position("127.0.0.1/net").applicationId("256").operationModel(OperationModel.USER_DISPATCH);
consumer = EmaFactory.createOmmConsumer(config);
d) You can refer to the EMA Reference Guide in the package for more information about OmmConsumerConfig.
For more information about EMA configurations, please refer to the EMAJ_ConfigGuide.pdf document in the package.
Answers
Hello @rinki121
While my colleague has already provided a clear answer, here is example code that defines Channel_1 and Channel_2 for the programmatic ChannelSet:
...
//Channel 1
innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelType", "ChannelType::RSSL_SOCKET" ));
innerElementList.add(EmaFactory.createElementEntry().ascii( "Host", "RTDS_1"));
innerElementList.add(EmaFactory.createElementEntry().ascii("Port", "14022"));
innerMap.add(EmaFactory.createMapEntry().keyAscii( "Channel_1", MapEntry.MapAction.ADD, innerElementList));
innerElementList.clear();
//Channel 2
innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelType", "ChannelType::RSSL_SOCKET" ));
innerElementList.add(EmaFactory.createElementEntry().ascii( "Host", "RTDS_2"));
innerElementList.add(EmaFactory.createElementEntry().ascii("Port", "14002"));
innerMap.add(EmaFactory.createMapEntry().keyAscii( "Channel_2", MapEntry.MapAction.ADD, innerElementList));
innerElementList.clear();
elementList.add(EmaFactory.createElementEntry().map( "ChannelList", innerMap ));
...
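The channel names in the ChannelSet value have to match the keys added to ChannelList, so it can help to derive both from one list. Below is a minimal sketch in plain Java (no EMA classes) that pairs hosts with ports, names the channels Channel_1, Channel_2, and so on, and builds the comma-separated ChannelSet string. The class and method names (ChannelSetBuilder, ChannelDef, build, channelSet) are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class ChannelSetBuilder {
    /** One channel definition: the name used in ChannelSet plus its host and port. */
    static final class ChannelDef {
        final String name;
        final String host;
        final String port;

        ChannelDef(String name, String host, String port) {
            this.name = name;
            this.host = host;
            this.port = port;
        }
    }

    /** Pairs each host with the port at the same index and names the channels Channel_1, Channel_2, ... */
    static List<ChannelDef> build(List<String> hosts, List<String> ports) {
        if (hosts.isEmpty() || hosts.size() != ports.size()) {
            throw new IllegalArgumentException("need the same non-zero number of hosts and ports");
        }
        List<ChannelDef> channels = new ArrayList<>();
        for (int i = 0; i < hosts.size(); i++) {
            channels.add(new ChannelDef("Channel_" + (i + 1), hosts.get(i), ports.get(i)));
        }
        return channels;
    }

    /** Comma-separated channel names, i.e. the string to pass as the "ChannelSet" value. */
    static String channelSet(List<ChannelDef> channels) {
        List<String> names = new ArrayList<>();
        for (ChannelDef channel : channels) {
            names.add(channel.name);
        }
        return String.join(", ", names);
    }

    public static void main(String[] args) {
        List<ChannelDef> channels = build(List.of("RTDS_1", "RTDS_2"), List.of("14022", "14002"));
        System.out.println(channelSet(channels)); // prints "Channel_1, Channel_2"
    }
}
```

Each ChannelDef would then feed one map entry in ChannelList (name as the key, host and port as element entries), while channelSet(...) supplies the value for the consumer's ChannelSet entry.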
Thanks @Jirapongse and @wasin.w. I am using the code below:
for (int index=1; index <= connectionLength; index ++) {
channelNames.add("Channel_" + index);
}
final String channelSet = String.join(",", channelNames);
final String consumerName = "Consumer_RT_1";
// create consumer list
elementList.add(EmaFactory.createElementEntry().ascii("DefaultConsumer", consumerName ));
innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelSet", channelSet ));
innerElementList.add(EmaFactory.createElementEntry().ascii( "Dictionary", "Dictionary_1" ));
innerElementList.add(EmaFactory.createElementEntry().intValue( "ItemCountHint", 5000 ));
innerElementList.add(EmaFactory.createElementEntry().intValue( "ServiceCountHint", 5000 ));
innerElementList.add(EmaFactory.createElementEntry().intValue( "ObeyOpenWindow", 0 ));
innerElementList.add(EmaFactory.createElementEntry().intValue( "RequestTimeout", 5000 ));
innerElementList.add(EmaFactory.createElementEntry().intValue( "MaxOutstandingPosts", 5000 ));
innerElementList.add(EmaFactory.createElementEntry().intValue( "DispatchTimeoutApiThread", 1 ));
innerElementList.add(EmaFactory.createElementEntry().intValue( "HandleException", 0 ));
innerElementList.add(EmaFactory.createElementEntry().intValue( "MaxDispatchCountApiThread", 500 ));
innerElementList.add(EmaFactory.createElementEntry().intValue( "MaxDispatchCountUserThread", 500 ));
innerElementList.add(EmaFactory.createElementEntry().intValue( "ReactorEventFdPort", 45000 ));
innerElementList.add(EmaFactory.createElementEntry().intValue( "ReconnectAttemptLimit", 10 ));
innerElementList.add(EmaFactory.createElementEntry().intValue( "ReconnectMinDelay", 2000 ));
innerElementList.add(EmaFactory.createElementEntry().intValue( "ReconnectMaxDelay", 6000 ));
innerElementList.add(EmaFactory.createElementEntry().intValue( "XmlTraceToStdout", 0 ));
innerElementList.add(EmaFactory.createElementEntry().intValue( "MsgKeyInUpdates", 1 ));
innerMap.add(EmaFactory.createMapEntry().keyAscii(consumerName, MapEntry.MapAction.ADD, innerElementList));
innerElementList.clear();
elementList.add(EmaFactory.createElementEntry().map( "ConsumerList", innerMap ));
innerMap.clear();
// create consumer group
configMap.add(EmaFactory.createMapEntry().keyAscii( "ConsumerGroup", MapEntry.MapAction.ADD, elementList ));
elementList.clear();
// create channel list
for (int index =0; index< connectionLength; index++) {
final String channelName = channelNames.get(index);
final String host = getHosts().get(index);
final String port = getPorts().get(index);
innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelType", "ChannelType::RSSL_SOCKET" ));
innerElementList.add(EmaFactory.createElementEntry().ascii( "CompressionType", "CompressionType::ZLib"));
innerElementList.add(EmaFactory.createElementEntry().intValue( "GuaranteedOutputBuffers", 5000));
innerElementList.add(EmaFactory.createElementEntry().intValue( "ConnectionPingTimeout", 50000));
innerElementList.add(EmaFactory.createElementEntry().ascii( "Host", host));
innerElementList.add(EmaFactory.createElementEntry().ascii("Port", port));
innerElementList.add(EmaFactory.createElementEntry().intValue( "TcpNodelay", 0));
innerMap.add(EmaFactory.createMapEntry().keyAscii( channelName, MapEntry.MapAction.ADD, innerElementList));
innerElementList.clear();
}
innerElementList.clear();
elementList.add(EmaFactory.createElementEntry().map( "ChannelList", innerMap ));
innerMap.clear();
configMap.add(EmaFactory.createMapEntry().keyAscii("ChannelGroup", MapEntry.MapAction.ADD, elementList ));
elementList.clear();
// create dictionary group
innerElementList.add(EmaFactory.createElementEntry().ascii( "DictionaryType", "DictionaryType::FileDictionary"));
innerElementList.add(EmaFactory.createElementEntry().ascii( "RdmFieldDictionaryFileName", trepConnectionConfig.getDictionaryPath() + FIELD_DICTIONARY_FILE_NAME));
innerElementList.add(EmaFactory.createElementEntry().ascii( "EnumTypeDefFileName", trepConnectionConfig.getDictionaryPath() + ENUM_TABLE_FILE_NAME));
innerMap.add(EmaFactory.createMapEntry().keyAscii( "Dictionary_1", MapEntry.MapAction.ADD, innerElementList));
innerElementList.clear();
elementList.add(EmaFactory.createElementEntry().map( "DictionaryList", innerMap ));
innerMap.clear();
configMap.add(EmaFactory.createMapEntry().keyAscii( "DictionaryGroup", MapEntry.MapAction.ADD, elementList ));
elementList.clear();
OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig().config( configMap);
config.position("167.10.0.1");
config.applicationId("188");
config.username("user");
config.password("abc1111");
config.operationModel(OperationModel.USER_DISPATCH);
final OmmConsumer ommConsumer = EmaFactory.createOmmConsumer(config, adminOmmConsumer, adminOmmConsumer);
Even though I specify the dictionary and position, I am still getting these messages in the logs:
20220802 18:49:12.852 [efxrts] ERROR c.r.e.a.OmmConsumerImpl - loggerMsg
ClientName: EmaConfig
Severity: Error
Text: Unable to find tagId for Position
loggerMsgEnd
20220802 18:49:12.853 [efxrts] WARN c.r.e.a.OmmConsumerImpl - loggerMsg
ClientName: EmaConfig
Severity: Warning
Text: no configuration exists in the config file for consumer dictionary [DictionaryGroup|DictionaryList]. Will use dictionary defaults if not config programmatically.
loggerMsgEnd
Could you please tell me what I have missed?
Also, I would like to log the events below (not at trace level):
1. When a channel is connected
2. Which login messages are sent
3. Every request message that is sent, and most importantly the host name and port it is sent to (server 1 or server 2)
4. When failover happens: channel down, channel up, and all related events
Hello @rinki121
Regarding logging the "channel is connected" and "failover happens, channel down, up, etc." events: the application can register for the Login domain. The channel/connection information is then delivered to the onRefreshMsg and onStatusMsg callback methods. You can see more detail in EMA API example 333 (ex333_Login_Streaming_DomainRep - Java/Cons333 - C/C++).
Example messages:
//Connect to Channel_1
Item Name: <Username>
Service Name: <not set>
Item State: Open / Ok / None / 'Login accepted by host cf5956052d10.'
AllowSuspectData : true
ApplicationId : 256
ApplicationName : ADS
Position : 10.42.84.98/Machine
ProvidePermissionExpressions : true
ProvidePermissionProfile : false
SingleOpen : true
SupportBatchRequests : 7
SupportOMMPost : true
SupportOptimizedPauseResume : false
SupportViewRequests : true
SupportStandby : true
SupportEnhancedSymbolList : 1
Solicited : true
UserName : <Username>
UserNameType : 1
State : StreamState: 1 DataState: 1 StatusCode: 0StatusText: Login accepted by host cf5956052d10.
...
//Disconnect
Item Name: <Username>
Service Name: <not set>
Item State: Open / Suspect / None / 'channel down'
//Connect to Channel_2
Item Name: U8004042
Service Name: <not set>
Item State: Open / Ok / None / 'Login accepted by host cd6d332fa58d.'
AllowSuspectData : true
ApplicationId : 256
ApplicationName : RTC
Position : 10.42.84.98/<Machine>
ProvidePermissionExpressions : true
ProvidePermissionProfile : false
SingleOpen : true
SupportBatchRequests : 7
SupportOMMPost : true
SupportOptimizedPauseResume : false
SupportViewRequests : true
SupportStandby : true
SupportEnhancedSymbolList : 1
Solicited : true
UserName : <Username>
UserNameType : 1
State : StreamState: 1 DataState: 1 StatusCode: 0StatusText: Login accepted by host cd6d332fa58d.
onStatusMsg
Item Name: <Username>
Service Name: <not set>
Item State: Open / Ok / None / 'channel up'
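To turn the login-stream messages above into explicit log events, an application could classify each state it receives. The following is a minimal sketch in plain Java, assuming the application has already extracted the data state ("Ok" or "Suspect") and the status text from the login callback as strings. The class LoginStateClassifier and its enum are hypothetical names, and the matched texts ('channel down', 'channel up', 'Login accepted by host ...') are taken from the example output above, so they may differ in other versions or environments.

```java
import java.util.Locale;

final class LoginStateClassifier {
    /** Connection events the application may want to log at INFO level. */
    enum ConnectionEvent { LOGIN_ACCEPTED, CHANNEL_DOWN, CHANNEL_UP, OTHER }

    /**
     * Maps a login-stream state to a connection event.
     * dataState and statusText are assumed to be copied from the login
     * refresh/status message by the caller; nothing here calls the EMA API.
     */
    static ConnectionEvent classify(String dataState, String statusText) {
        String text = statusText == null ? "" : statusText.toLowerCase(Locale.ROOT);
        boolean ok = "Ok".equalsIgnoreCase(dataState);
        boolean suspect = "Suspect".equalsIgnoreCase(dataState);

        if (suspect && text.contains("channel down")) {
            return ConnectionEvent.CHANNEL_DOWN;
        }
        if (ok && text.contains("channel up")) {
            return ConnectionEvent.CHANNEL_UP;
        }
        if (ok && text.startsWith("login accepted")) {
            return ConnectionEvent.LOGIN_ACCEPTED;
        }
        return ConnectionEvent.OTHER;
    }

    public static void main(String[] args) {
        // Sample inputs copied from the example messages in this thread.
        System.out.println(classify("Ok", "Login accepted by host cf5956052d10."));
        System.out.println(classify("Suspect", "channel down"));
        System.out.println(classify("Ok", "channel up"));
    }
}
```

Matching on free-form status text is fragile, so treat this as a starting point for readable log lines rather than as a basis for failover logic; EMA handles the reconnection itself.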
Regarding logs for "login message is sent" and "every request message that is sent": the API does not provide those log messages at the application level.
You can enable the trace log for those messages via the EmaConfig.xml file; please see the "Logger Configuration" section of the Enterprise Message API (EMA) - Configuration Overview article for more detail.
@wasin.w, for logging I am using SLF4J with Logback in Java, so that all output is logged into one file. I have gone through this documentation.
As per the documentation, the only option for enabling tracing in Java is XmlTraceToStdout: 1.
Let me try setting this option and see whether all requests are logged.
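Since XmlTraceToStdout writes to standard output rather than to the logging framework, one way to get the trace into the same file is to replace System.out with a stream that forwards each completed line to a logger. The sketch below uses only the JDK; LineForwardingStream is a hypothetical helper, and the sink is a plain Consumer<String> so that the example runs without SLF4J. It assumes the trace really is printed through System.out after the redirect is installed, which I have not verified against the EMA sources.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** Buffers bytes written to it and hands each completed line to a sink. */
final class LineForwardingStream extends OutputStream {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final Consumer<String> sink;

    LineForwardingStream(Consumer<String> sink) {
        this.sink = sink;
    }

    @Override
    public synchronized void write(int b) {
        if (b == '\n') {
            flushLine();           // end of line: forward what was buffered
        } else if (b != '\r') {
            buffer.write(b);       // drop '\r' so Windows line endings work too
        }
    }

    @Override
    public synchronized void close() {
        if (buffer.size() > 0) {
            flushLine();           // forward a trailing line without a newline
        }
    }

    private void flushLine() {
        String line = new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        buffer.reset();
        sink.accept(line);
    }

    /** Wraps a sink in an auto-flushing PrintStream suitable for System.setOut. */
    static PrintStream asPrintStream(Consumer<String> sink) {
        return new PrintStream(new LineForwardingStream(sink), true, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        PrintStream original = System.out;
        // Stand-in sink; an application using SLF4J could pass logger::info instead.
        System.setOut(asPrintStream(line -> original.println("[trace] " + line)));
        System.out.println("<REQUEST domainType=\"MARKET_PRICE\"/>");
        System.setOut(original);
    }
}
```

If the sink is an SLF4J logger, make sure its Logback appender writes to a file and not to the console, otherwise every logged line is written back to the redirected System.out and loops indefinitely.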
Hello @rinki121
I have tried to replicate the issue with the given code, but it works fine in my environment. The code can connect to my local RTDS server and perform failover fine without the following error/warning messages:
- "Unable to find tagId for Position" error message
- "no configuration exists in the config file for consumer dictionary [DictionaryGroup|DictionaryList]. Will use dictionary defaults if not config programmatically."
My replication details are as follows:
- EMA Java API 3.6.4
- OpenJDK 11
- Windows 10
- Logback-classic 1.2.11
- SLF4J-API 1.7.36
My replication code:
OmmConsumer consumer = null;
try{
AppClient appClient = new AppClient();
Map innerMap = EmaFactory.createMap();
Map configMap = EmaFactory.createMap();
ElementList elementList = EmaFactory.createElementList();
ElementList innerElementList = EmaFactory.createElementList();
int connectionLength = 2;
ArrayList<String> channelNames = new ArrayList<String>();
for (int index=1; index <= connectionLength; index ++) {
channelNames.add("Channel_" + index);
}
final String channelSet = String.join(",", channelNames);
final String consumerName = "Consumer_RT_1";
// create consumer list
elementList.add(EmaFactory.createElementEntry().ascii("DefaultConsumer", consumerName ));
innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelSet", channelSet ));
innerElementList.add(EmaFactory.createElementEntry().ascii( "Dictionary", "Dictionary_1" ));
//..
innerMap.add(EmaFactory.createMapEntry().keyAscii(consumerName, MapEntry.MapAction.ADD, innerElementList));
innerElementList.clear();
elementList.add(EmaFactory.createElementEntry().map( "ConsumerList", innerMap ));
innerMap.clear();
// create consumer group
configMap.add(EmaFactory.createMapEntry().keyAscii( "ConsumerGroup", MapEntry.MapAction.ADD, elementList ));
elementList.clear();
// create channel list
for (int index =0; index< connectionLength; index++) {
final String channelName = channelNames.get(index);
final String host = getHost(index);
final String port = getPorts(index);
innerElementList.add(EmaFactory.createElementEntry().ascii( "ChannelType", "ChannelType::RSSL_SOCKET" ));
//..
innerElementList.add(EmaFactory.createElementEntry().ascii( "Host", host));
innerElementList.add(EmaFactory.createElementEntry().ascii("Port", port));
innerMap.add(EmaFactory.createMapEntry().keyAscii( channelName, MapEntry.MapAction.ADD, innerElementList));
innerElementList.clear();
}
innerElementList.clear();
elementList.add(EmaFactory.createElementEntry().map( "ChannelList", innerMap ));
innerMap.clear();
configMap.add(EmaFactory.createMapEntry().keyAscii("ChannelGroup", MapEntry.MapAction.ADD, elementList ));
elementList.clear();
// create dictionary group
innerElementList.add(EmaFactory.createElementEntry().ascii( "DictionaryType", "DictionaryType::FileDictionary"));
innerElementList.add(EmaFactory.createElementEntry().ascii( "RdmFieldDictionaryFileName", "C:\\etc\\RDMFieldDictionary"));
innerElementList.add(EmaFactory.createElementEntry().ascii( "EnumTypeDefFileName", "C:\\etc\\enumtype.def"));
innerMap.add(EmaFactory.createMapEntry().keyAscii( "Dictionary_1", MapEntry.MapAction.ADD, innerElementList));
innerElementList.clear();
elementList.add(EmaFactory.createElementEntry().map( "DictionaryList", innerMap ));
innerMap.clear();
configMap.add(EmaFactory.createMapEntry().keyAscii( "DictionaryGroup", MapEntry.MapAction.ADD, elementList ));
elementList.clear();
OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig().config( configMap);
config.position("127.0.0.1/net");
config.applicationId("188");
config.username("user");
config.password("abc1111");
config.operationModel(OperationModel.USER_DISPATCH);
consumer = EmaFactory.createOmmConsumer(config, appClient, appClient);
consumer.registerClient(EmaFactory.createReqMsg().serviceName("ELEKTRON_DD").name("/EUR="), appClient);
long startTime = System.currentTimeMillis();
while (startTime + 60000 > System.currentTimeMillis())
consumer.dispatch(10); // calls to onRefreshMsg(), onUpdateMsg(), or onStatusMsg() execute on this thread
} catch (OmmException excp) {
System.out.println(excp.getMessage());
} finally {
if (consumer != null) consumer.uninitialize();
}
Hello @rinki121
I have tried the given code with the Oracle JDK 11, but I cannot replicate the same issues. The application works fine without any "Unable to find tagId for Position" and "no configuration exists in the config file for consumer dictionary" error messages.
If the problem still persists in your environment, I highly recommend you contact the Real-Time API support team to investigate the issue in more detail. If you are an RDC subscription member, you can submit a ticket via the "contact premium support button" on the RTSDK Java page.
Alternatively, you can submit the issue to the RTSDK team via the GitHub issue page.