Dear team
Client is developing API to consume data from our ERT in cloud service with Real-Time-SDK-2.0.0.L1.java. When subscribe for update messages, they receive the following message when uploading RICs and enable to subscribe for all US equity RICs. This is not happening if they only subscribe for 1 RIC.
StatusMsg
streamId="17"
domain="MarketPrice Domain"
state="Closed / Ok / None / 'Stream closed for batch'"
serviceName="ELEKTRON_DD"
StatusMsgEnd
Could you please advise:
1. How to subscribe ERT in could using Chain RIC like 0#NASCONS.0?
2. Which demo client can refer to when subscribe multiple RICs in our SDK?
3. How to unsubscribe cerntain RICs after API started?
Please find client configure and code segment below, and let me know if you need any further information.
Thank you.
configuration.xml
<?xml version="1.0" encoding="UTF-8"?>
<system>
<category name="USConfig" description="US登录">
<item name="source" value="0" description="行情源0:云行情,1:专线"></item>
<item name="username" value="xxxxxxx" description="账号"></item>
<item name="password" value="xxxxxx" description="密码"></item>
<item name="clientId" value="xxxxx" description="客户端ID"></item>
<item name="location" value="eu-west-1a" description="源站点"></item>
<item name="keyfile" value="xxxx/test_keystore.jks" description="证书"></item>
<item name="keypasswd" value="xxxxx" description="证书密码"></item>
<item name="USCode" value="0#UNIVERSE.NB" description="美股列表"></item>
<item name="refinitivHost" value="apac-1-t2.streaming-pricing-api.refinitiv.com" description="主机"></item>
</category>
<category name="Disruptor" description="Disruptor">
<item name="queue" value="65536" description="Logon"></item>
<item name="isMutilProducter" value="true" description="Logout"></item>
<item name="consumer" value="4" description="Heartbeat"></item>
</category>
<category name="staticData" description="静态数据相关配置">
<item name="exRightUrl" value="/data/" description="除权信息本地保存的路径"></item>
</category>
</system>
EmaConfig.xml
<Dictionary>
<Name value="Dictionary_1"/>
<!-- dictionaryType is optional: defaulted to ChannelDictionary" -->
<!-- possible values: ChannelDictionary, FileDictionary -->
<!-- if dictionaryType is set to ChannelDictionary, file names are ignored -->
<DictionaryType value="DictionaryType::FileDictionary"/>
<RdmFieldDictionaryFileName value="/xxxx/xxx/RDMFieldDictionary"/>
<EnumTypeDefFileName value="/xxxxx/xxx/enumtype.def"/>
</Dictionary>
ConsumerDemo.java
package com.xx.dc.service;
import com.refinitiv.ema.access.Map;
import com.refinitiv.ema.access.*;
import com.refinitiv.ema.rdm.EmaRdm;
import lombok.Data;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ConsumerDemo {
static class Configuration {
private static java.util.Map items = new HashMap();
private static void loadConfig(String path) {
try {
InputStream is = new FileInputStream(path);
if (is != null) {
SAXReader reader = new SAXReader();
Document document = reader.read(is);
Element systemElement = document.getRootElement();
List catList = systemElement.elements("category");
for (Iterator catIter = catList.iterator(); catIter.hasNext(); ) {
Element catElement = (Element) catIter.next();
String catName = catElement.attributeValue("name");
if (catName == null|| catName.isEmpty()) {
continue;
}
List itemList = catElement.elements("item");
for (Iterator itemIter = itemList.iterator(); itemIter.hasNext(); ) {
Element itemElement = (Element) itemIter.next();
String itemName = itemElement.attributeValue("name");
String value = itemElement.attributeValue("value");
if (itemName != null && !itemName.isEmpty()) {
items.put(catName + "." + itemName, value);
}
}
}
} else {
System.out.println("file is not found!");
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
public static String getString(String name) {
String value = (String) items.get(name);
return (value == null) ? "" : value;
}
public static String getString(String name, String defaultValue) {
String value = (String) items.get(name);
if (value != null && value.length() > 0)
return value;
else
return defaultValue;
}
public static int getInt(String name) {
String value = getString(name);
try {
return Integer.parseInt(value);
} catch (NumberFormatException ex) {
return 0;
}
}
public static int getInt(String name, int defaultValue) {
String value = getString(name);
try {
return Integer.parseInt(value);
} catch (NumberFormatException ex) {
}
return defaultValue;
}
public static boolean getBoolean(String name) {
String value = getString(name);
return Boolean.valueOf(value).booleanValue();
}
public static double getDouble(String name, double defaultValue) {
String value = getString(name);
try {
return Double.parseDouble(value);
} catch (NumberFormatException ex) {
}
return defaultValue;
}
public static int parseInt(String name) {
String value = (String) items.get(name);
return (value == null || "".equals(value.trim())) ? -1 : Integer.parseInt(value);
}
public static java.util.Map getItems() {
return items;
}
}
@Data
class LoginBean {
public LoginBean() {
this.setUserName(Configuration.getString("USConfig.username"));
this.setPassword(Configuration.getString("USConfig.password"));
this.setClientId(Configuration.getString("USConfig.clientId"));
this.setLocation(Configuration.getString("USConfig.location"));
this.setProxyHostName(Configuration.getString("USConfig.proxyHostName"));
this.setProxyPort(Configuration.getString("USConfig.proxyPort"));
this.setProxyUserName(Configuration.getString("USConfig.proxyUserName"));
this.setProxyPassword(Configuration.getString("USConfig.proxyPassword"));
this.setHost(Configuration.getString("USConfig.refinitivHost"));
this.setPort("14002");
}
private String userName;
private String password;
private String clientId;
private String proxyHostName;
private String proxyPort = "-1";
private String proxyUserName;
private String proxyPassword;
private String proxyDomain;
private String proxyKrb5Configfile;
private String host;
private String port;
private String location = "us-east";
}
class StaticDataCallback implements OmmConsumerClient, ServiceEndpointDiscoveryClient {
public final Set<Integer> fids = new HashSet<>();
public static final int nextPageFId = 815;
{
fids.add(800);
fids.add(801);
fids.add(802);
fids.add(803);
fids.add(804);
fids.add(805);
fids.add(806);
fids.add(807);
fids.add(808);
fids.add(809);
/*fids.add(809);
fids.add(809);
fids.add(809);
fids.add(809);*/
fids.add(810);
fids.add(811);
fids.add(812);
fids.add(813);
fids.add(815);
}
@Override
public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent consumerEvent) {
System.out.println(refreshMsg.name());
if (0 != refreshMsg.state().statusCode()) {
// LogFactory.getOptionLogger().logInfo(refreshMsg.toString());
}
if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()) {
for (FieldEntry fieldEntry : refreshMsg.payload().fieldList()) {
if (!fids.contains(fieldEntry.fieldId())) {
continue;
}
String value = fieldEntry.load().toString();
if (nextPageFId == fieldEntry.fieldId()) {
if (US_EMPTY.equalsIgnoreCase(value)) {
staticDataListNextPage = US_EMPTY;
} else {
staticDataListNextPage = fieldEntry.ascii().toString();
}
break;
}
if (US_EMPTY.equalsIgnoreCase(value)) {
continue;
}
String code = fieldEntry.ascii().toString();
//if (!USDataCenter.getInstance().containsKey(code)) {
ricCodes.putIfAbsent(code,Boolean.FALSE);
//}
}
}
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
}
@Override
public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent consumerEvent) {
System.out.println(updateMsg.name());
}
@Override
public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent consumerEvent) {
System.out.println(statusMsg.name());
}
@Override
public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent consumerEvent) {
}
@Override
public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent consumerEvent) {
}
@Override
public void onAllMsg(Msg msg, OmmConsumerEvent consumerEvent) {
}
@Override
public void onSuccess(ServiceEndpointDiscoveryResp serviceEndpointResp, ServiceEndpointDiscoveryEvent event) {
}
@Override
public void onError(String errorText, ServiceEndpointDiscoveryEvent event) {
}
}
class DynamicDataCallback implements OmmConsumerClient, ServiceEndpointDiscoveryClient {
@Override
public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent consumerEvent) {
System.out.println("refreshMsg:" + refreshMsg.name());
}
@Override
public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent consumerEvent) {
System.out.println("updateMsg:" + updateMsg.name());
}
@Override
public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent consumerEvent) {
System.out.println("statusMsg:" + statusMsg.toString());
}
@Override
public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent consumerEvent) {
}
@Override
public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent consumerEvent) {
}
@Override
public void onAllMsg(Msg msg, OmmConsumerEvent consumerEvent) {
}
@Override
public void onSuccess(ServiceEndpointDiscoveryResp serviceEndpointResp, ServiceEndpointDiscoveryEvent event) {
}
@Override
public void onError(String errorText, ServiceEndpointDiscoveryEvent event) {
}
}
private List<Long> staticHandles = new ArrayList<>();
StaticDataCallback staticDataCallback = new StaticDataCallback();
DynamicDataCallback dynamicDataCallback = new DynamicDataCallback();
ServiceEndpointDiscovery serviceDiscovery = EmaFactory.createServiceEndpointDiscovery();
private Map configDb = EmaFactory.createMap();
OmmConsumerConfig ommConsumerConfig;
private String staticDataListNextPage;
LoginBean loginBean;
private OmmConsumer consumer;
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private ConcurrentHashMap<String,Boolean> ricCodes = new ConcurrentHashMap<>();
public static final String US_EMPTY = "(blank data)";
private String confPath;
private String emaConfPath;
private static String DEFAULT_USER_CONFIG_FILE_NAME = "D:\\work\\gitRepo\\sis-gw\\DataCollectionUSAggregator\\conf\\DataCollectionUS\\configuration.xml";
private static String DEFAULT_EMA_CONFIG_FILE_NAME = "D:\\work\\gitRepo\\sis-gw\\DataCollectionUSAggregator\\conf\\DataCollectionUS\\configuration.xml";
public ConsumerDemo(String userConfPath, String emaConfPath) {
if (userConfPath == null || userConfPath.isEmpty())
userConfPath = DEFAULT_USER_CONFIG_FILE_NAME;
if (emaConfPath == null || emaConfPath.isEmpty())
emaConfPath = DEFAULT_EMA_CONFIG_FILE_NAME;
this.confPath = userConfPath;
this.emaConfPath = emaConfPath;
Configuration.loadConfig(userConfPath);
this.staticDataListNextPage = Configuration.getString("USConfig.USCode");
this.ommConsumerConfig = EmaFactory.createOmmConsumerConfig(emaConfPath);
this.loginBean = new LoginBean();
}
public static void main(String[] args) {
ConsumerDemo demo = createDemo(args);
demo.createConsumer();
if (demo.consumer == null) {
System.out.println("consumer is null.");
return;
}
while (true) {
if (US_EMPTY.equalsIgnoreCase(demo.staticDataListNextPage)) {
break;
}
demo.lock.lock();
try {
demo.staticHandles.add(demo.consumer
.registerClient(EmaFactory.createReqMsg().serviceName("ELEKTRON_DD")
.name(demo.staticDataListNextPage), demo.staticDataCallback));
demo.condition.await();
} catch (Exception e) {
e.printStackTrace();
} finally {
demo.lock.unlock();
}
}
try {
if (!demo.staticHandles.isEmpty()) {
demo.unregister(demo.staticHandles);
}
} catch (Exception e) {
e.printStackTrace();
}
int stocksSize = demo.ricCodes.size();
if (stocksSize <= 0) {
System.out.println("code size is 0!");
return;
}
ElementList batch = EmaFactory.createElementList();
OmmArray array = EmaFactory.createOmmArray();
// subscribe all
int num = stocksSize;
Iterator<String> codeIterator = demo.ricCodes.keySet().iterator();
while (codeIterator.hasNext()) {
try {
array.add(EmaFactory.createOmmArrayEntry().ascii(codeIterator.next()));
num--;
if (num % 1000 == 0) {
batch.add(EmaFactory.createElementEntry().array(EmaRdm.ENAME_BATCH_ITEM_LIST, array));
demo.consumer.registerClient(EmaFactory.createReqMsg().serviceName("ELEKTRON_DD").payload(batch), demo.dynamicDataCallback);
array.clear();
batch.clear();
}
} catch (IllegalAccessError e) {
e.printStackTrace();
}
}
if (!array.isEmpty()) {
batch.add(EmaFactory.createElementEntry().array(EmaRdm.ENAME_BATCH_ITEM_LIST, array));
demo.consumer.registerClient(EmaFactory.createReqMsg().serviceName("ELEKTRON_DD").payload(batch), demo.dynamicDataCallback);
}
try {
Thread.sleep(1000000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static ConsumerDemo createDemo(String[] args)
{
try
{
int argsCount = 0;
String userConfPath = null;
String emaConfPath = null;
while (argsCount < args.length)
{
if ("-userConfPath".equals(args[argsCount]))
{
userConfPath = argsCount < (args.length-1) ? args[++argsCount] : null;
++argsCount;
}
else if ("-emaConfPath".equals(args[argsCount]))
{
emaConfPath = argsCount < (args.length-1) ? args[++argsCount] : null;
++argsCount;
}
}
return new ConsumerDemo(userConfPath, emaConfPath);
}
catch (Exception e) {
e.printStackTrace();
}
return null;
}
public boolean createConsumer() {
boolean success = false;
try {
if (consumer == null) {
createProgramaticConfig(configDb);
if ((loginBean.getProxyHostName() == null) || (loginBean.getProxyHostName().isEmpty()) || (loginBean.getProxyPort() == "-1"))
{
consumer = EmaFactory.createOmmConsumer(ommConsumerConfig.consumerName("Consumer_1")
.username(loginBean.getUserName()).password(loginBean.getPassword())
.clientId(loginBean.getClientId())
.config(configDb));
}
else
{
consumer = EmaFactory.createOmmConsumer(ommConsumerConfig.consumerName("Consumer_1")
.username(loginBean.getUserName()).password(loginBean.getPassword())
.clientId(loginBean.getClientId()).config(configDb).tunnelingProxyHostName(loginBean.getProxyHostName())
.tunnelingProxyPort(loginBean.getProxyPort())
.tunnelingCredentialUserName(loginBean.getProxyUserName())
.tunnelingCredentialPasswd(loginBean.getProxyPassword())
.tunnelingCredentialDomain(null)
.tunnelingCredentialKRB5ConfigFile(null));
}
}
success = true;
} catch (Exception e) {
e.printStackTrace();
}
return success;
}
private void createProgramaticConfig(Map configDb)
{
Map elementMap = EmaFactory.createMap();
ElementList elementList = EmaFactory.createElementList();
ElementList innerElementList = EmaFactory.createElementList();
innerElementList.add(EmaFactory.createElementEntry().ascii("Channel", "Channel_1"));
elementMap.add(EmaFactory.createMapEntry().keyAscii("Consumer_1", MapEntry.MapAction.ADD, innerElementList));
innerElementList.clear();
elementList.add(EmaFactory.createElementEntry().map("ConsumerList", elementMap));
elementMap.clear();
configDb.add(EmaFactory.createMapEntry().keyAscii("ConsumerGroup", MapEntry.MapAction.ADD, elementList));
elementList.clear();
innerElementList.add(EmaFactory.createElementEntry().ascii("ChannelType", "ChannelType::RSSL_ENCRYPTED"));
innerElementList.add(EmaFactory.createElementEntry().ascii("Host", loginBean.getHost()));
innerElementList.add(EmaFactory.createElementEntry().ascii("Port", loginBean.getPort()));
innerElementList.add(EmaFactory.createElementEntry().intValue("EnableSessionManagement", 1));
elementMap.add(EmaFactory.createMapEntry().keyAscii("Channel_1", MapEntry.MapAction.ADD, innerElementList));
innerElementList.clear();
elementList.add(EmaFactory.createElementEntry().map("ChannelList", elementMap));
elementMap.clear();
configDb.add(EmaFactory.createMapEntry().keyAscii("ChannelGroup", MapEntry.MapAction.ADD, elementList));
}
public void unregister(List<Long> handles) {
for (Long handle : handles) {
if (consumer != null) {
consumer.unregister(handle);
}
}
}
}