///*|----------------------------------------------------------------------------- // *| This source code is provided under the Apache 2.0 license -- // *| and is provided AS IS with no warranty or guarantee of fit for purpose. -- // *| See the project's LICENSE.md for details. -- // *| Copyright (C) 2019 Refinitiv. All rights reserved. -- ///*|----------------------------------------------------------------------------- #include #include #include "Ema.h" void sleep(int millisecs) { struct timespec sleeptime; sleeptime.tv_sec = millisecs / 1000; sleeptime.tv_nsec = (millisecs % 1000) * 1000000; nanosleep(&sleeptime, 0); } // application defined client class for receiving and processing of item messages class AppClient : public refinitiv::ema::access::OmmConsumerClient { protected: void onRefreshMsg(const refinitiv::ema::access::RefreshMsg &, const refinitiv::ema::access::OmmConsumerEvent &); void onUpdateMsg(const refinitiv::ema::access::UpdateMsg &, const refinitiv::ema::access::OmmConsumerEvent &); void onStatusMsg(const refinitiv::ema::access::StatusMsg &, const refinitiv::ema::access::OmmConsumerEvent &); }; using namespace refinitiv::ema::access; using namespace std; void make_rtsdk_config( Map &config_map, size_t item_count, const std::string &host1, int port1, const std::string &host2, int port2) { Map inner_map; ElementList element_list; element_list.addAscii("DefaultConsumer", "Consumer_2"); inner_map .addKeyAscii( "Consumer_2", MapEntry::AddEnum, ElementList() .addAscii("ChannelSet", "Channel_1,Channel_2") .addAscii("Logger", "Logger_1") .addUInt("ItemCountHint", item_count) .complete()) .complete(); element_list.addMap("ConsumerList", inner_map); element_list.complete(); inner_map.clear(); config_map.addKeyAscii("ConsumerGroup", MapEntry::AddEnum, element_list); element_list.clear(); inner_map .addKeyAscii( "Channel_1", MapEntry::AddEnum, ElementList() .addEnum("ChannelType", 0) .addEnum("CompressionType", 1) // zlib .addUInt("GuaranteedOutputBuffers", 5000) // default is 100 .addUInt("ConnectionPingTimeout", 10000) // default 30,000 ms .addAscii("Host", host1.c_str()) .addAscii("Port", std::to_string(port1).c_str()) .addUInt("SysRecvBufSize", 512ul * 1024) // default is OS default .complete()) .addKeyAscii( "Channel_2", MapEntry::AddEnum, ElementList() .addEnum("ChannelType", 0) .addEnum("CompressionType", 1) // zlib .addUInt("GuaranteedOutputBuffers", 5000) // default is 100 .addUInt("ConnectionPingTimeout", 10000) // default 30,000 ms .addAscii("Host", host2.c_str()) .addAscii("Port", std::to_string(port2).c_str()) .addUInt("SysRecvBufSize", 512ul * 1024) // default is OS default .complete()) .complete(); element_list.addMap("ChannelList", inner_map); element_list.complete(); inner_map.clear(); config_map.addKeyAscii("ChannelGroup", MapEntry::AddEnum, element_list); element_list.clear(); inner_map .addKeyAscii( "Logger_1", MapEntry::AddEnum, ElementList() .addEnum("LoggerType", 1) // 1 = stdout .addEnum("LoggerSeverity", 0) // 0 = verbose .complete()) .complete(); element_list.addMap("LoggerList", inner_map); element_list.complete(); inner_map.clear(); config_map.addKeyAscii("LoggerGroup", MapEntry::AddEnum, element_list); element_list.clear(); config_map.complete(); } void AppClient::onRefreshMsg(const RefreshMsg &refreshMsg, const OmmConsumerEvent &) { cout << refreshMsg << endl; // defaults to refreshMsg.toString() } void AppClient::onUpdateMsg(const UpdateMsg &updateMsg, const OmmConsumerEvent &) { cout << updateMsg << endl; // defaults to updateMsg.toString() } void AppClient::onStatusMsg(const StatusMsg &statusMsg, const OmmConsumerEvent &) { cout << statusMsg << endl; // defaults to statusMsg.toString() } int main() { try { AppClient client; Map config; make_rtsdk_config(config, 500, // this is the correct IP but the wrong port (on purpose to fail and cause us to try host2) "146.242.129.1", 14007, // host2 is correct "146.242.133.1", 14002); OmmConsumer consumer( OmmConsumerConfig().config(config).consumerName("Consumer_2")); // uses configuration from EmaConfig.xml consumer.registerClient(ReqMsg().serviceName("DIRECT_FEED").name("IBM.N"), client); sleep(60000); // API calls onRefreshMsg(), onUpdateMsg() and onStatusMsg() } catch (const OmmException &excp) { cout << excp << endl; } return 0; }