using channel_names_t = std::vector; const std::string_view consumer_name = "OmmConsumer"; const std::string_view channel_delimiter = ","; class omm_config_factory final { public: explicit omm_config_factory(const connector::base_settings& settings) : m_settings(settings) { } void get_omm_config(thomsonreuters::ema::access::Map& config) const { thomsonreuters::ema::access::ElementList channel_list; thomsonreuters::ema::access::ElementList consumer_list; channel_names_t names; get_channel_list(names, channel_list); get_consumer_list(names, consumer_list); config.addKeyAscii("ChannelGroup", thomsonreuters::ema::access::MapEntry::AddEnum, channel_list); config.addKeyAscii("ConsumerGroup", thomsonreuters::ema::access::MapEntry::AddEnum, consumer_list); config.complete(); } private: static std::string create_channel_name(const connector::stunnel& stunnel, const unsigned int index) { return fmt::format("{}:{} - attempt {}", stunnel.address.host, stunnel.address.port, index); } static std::string create_channel_set(const channel_names_t& names) { if (names.empty()) throw std::invalid_argument("Channel list is empty"); std::stringstream result_stream; for (const std::string& name : names) { result_stream << name << channel_delimiter; } auto result = result_stream.str(); result.erase(result.cend() - 1); return result; } static void get_consumer(const channel_names_t& names, thomsonreuters::ema::access::ElementList& consumer) { const std::string& channel_set = create_channel_set(names); consumer.addAscii("Name", consumer_name.data()); consumer.addAscii("ChannelSet", channel_set.c_str()); consumer.complete(); } static void get_channel(const std::string& channel_name, const connector::stunnel& stunnel, thomsonreuters::ema::access::ElementList& channel) { channel.addAscii("Name", channel_name.c_str()); channel.addAscii("Host", stunnel.address.host.c_str()); channel.addAscii("Port", std::to_string(stunnel.address.port).c_str()); channel.complete(); } void get_consumer_list(const channel_names_t& names, thomsonreuters::ema::access::ElementList& consumer_list) const { thomsonreuters::ema::access::ElementList consumer; get_consumer(names, consumer); thomsonreuters::ema::access::Map consumers; consumers.addKeyAscii(consumer_name.data(), thomsonreuters::ema::access::MapEntry::AddEnum, consumer); consumers.complete(); consumer_list.addMap("ConsumerList", consumers); consumer_list.complete(); } void get_channel_list(channel_names_t& names, thomsonreuters::ema::access::ElementList& channel_list) const { channel_names_t tmp_names; thomsonreuters::ema::access::Map channels; for (const auto& stunnel : m_settings.stunnels) { for (int i = 0; i < m_settings.network.connect_attempts; i++) { thomsonreuters::ema::access::ElementList channel; const std::string& channel_name = create_channel_name(stunnel, i); get_channel(channel_name, stunnel, channel); channels.addKeyAscii(channel_name.c_str(), thomsonreuters::ema::access::MapEntry::AddEnum, channel); tmp_names.push_back(channel_name); } } channels.complete(); channel_list.addMap("ChannelList", channels); channel_list.complete(); names.swap(tmp_names); } private: const connector::base_settings& m_settings; }; void omm_consumer_holder::connect() { try { m_stop_flag = false; while (!m_stop_flag) { try { omm_config_factory config_factory(m_settings); thomsonreuters::ema::access::Map config; config_factory.get_omm_config(config); thomsonreuters::ema::access::OmmConsumerConfig consumer_config; consumer_config.config(config); consumer_config.username(m_settings.user_name.c_str()); consumer_config.password(m_settings.user_password.c_str()); m_omm_channel_listener = create_channel_listener(m_connection_handler); set_omm_consumer(std::make_shared(consumer_config, *m_omm_channel_listener)); m_connection_handler.on_create(); return; } catch (const thomsonreuters::ema::access::OmmException& e) { log_error("Failed to create OMM client due to exception: ", e); } } } catch (const std::exception& e) { log_error("Failed to create OMM client due to exception: ", e.what()); } }