///*|----------------------------------------------------------------------------- // *| This source code is provided under the Apache 2.0 license -- // *| and is provided AS IS with no warranty or guarantee of fit for purpose. -- // *| See the project's LICENSE.md for details. -- // *| Copyright Thomson Reuters 2015. All rights reserved. -- ///*|----------------------------------------------------------------------------- #include "Consumer.h" #include #include using namespace thomsonreuters::ema::access; using namespace std; #include #include #include #include /* * utils function to monitor quote delay */ #define SECOND_FACTOR 1000 #define MINUTE_FACTOR 100000 #define HOUR_FACTOR 10000000 #define DAY_FACTOR 1000000000 #define MONTH_FACTOR 100000000000 #define YEAR_FACTOR 10000000000000 void GetCurrentTimestamp(Int64& date, Int64& tg_milli_sec) { struct timeval tv; gettimeofday(&tv, NULL); struct tm tmstruct; localtime_r(&tv.tv_sec, &tmstruct); UInt64 tg_year = (tmstruct.tm_year + 1900); UInt64 tg_month = tg_year * 100 + (tmstruct.tm_mon + 1); UInt64 tg_day = tg_month * 100 + tmstruct.tm_mday; UInt64 tg_hour = tg_day * 100 + tmstruct.tm_hour; UInt64 tg_min = tg_hour * 100 + tmstruct.tm_min; UInt64 tg_sec = tg_min * 100 + tmstruct.tm_sec; date = tg_day * DAY_FACTOR; tg_milli_sec = tg_sec * 1000 + tv.tv_usec / 1000; } enum FID_US { FID_US_BLK_DATE = 9210, FID_US_BLKTIM_MS = 14206, FID_US_ODD_DATE = 9244, FID_US_ODDTIM_MS = 14251, FID_US_IRGDATE = 4349, FID_US_IRGTIM_MS = 9063, FID_US_TRADE_DATE = 16, FID_US_SALTIM_MS = 3854, }; typedef struct { int fid_date_; int fid_tim_ms_; const char* tick_desc_; } TickFields; typedef struct { string trep_host_; string service_name_; string user_name_; string rics_; bool debug_msg_content_ = false; bool count_only_ = false; int step_ = 100; } Config; Config gConfig; static map tick_map ; static Int64 trade_date = 0; static Int64 msg_cnt = 0; namespace emaaccess = thomsonreuters::ema::access; Int64 GetTradeTime(const emaaccess::FieldEntry& fe) { return ((Int64)fe.getTime().getHour() * (Int64)3600 + (Int64)fe.getTime().getMinute() * (Int64)60 + (Int64)fe.getTime().getSecond()) * (Int64)1000 + (Int64)fe.getTime().getMillisecond() ; } const emaaccess::FieldEntry* GetFieldEntryByID( const emaaccess::FieldList& fl, int field_id ) { const emaaccess::FieldEntry* fe = NULL; if(fl.forth(field_id)) { fe = &fl.getEntry(); } fl.reset(); if ((fe != NULL) && (fe->getCode() != emaaccess::Data::BlankEnum) ) { return fe; } return (NULL); } bool is_leap_year(Int64 year) { return ((year % 400) == 0) || ((year % 4 == 0) && (year % 100 != 0)); } Int64 GetPreviousDate(Int64 date) { static int day_of_mon[] = { 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31 }; Int64 year = date / YEAR_FACTOR; Int64 mon = date % YEAR_FACTOR / MONTH_FACTOR; // 1 - 12 Int64 day = date % MONTH_FACTOR / DAY_FACTOR; // 1 - 31 if (day > 1) { day--; } else { // the last day of previous month mon = mon - 1; if(mon <= 0) { // last month of previous year year = year - 1; mon = 12; day = day_of_mon[mon - 1]; } else { day = day_of_mon[mon - 1]; if(is_leap_year(year) && (mon == 2)) { day++; } } } return year * YEAR_FACTOR + mon * MONTH_FACTOR + day * DAY_FACTOR; } Int64 ConvertUtc2EstTime(Int64 date, Int64 utc_ms) { Int64 sec = 0; if(/*UsExchangeTimeInfo::GetIsDayLightSavingTime()*/true) sec = utc_ms / 1000 - 4 * 3600; else sec = utc_ms / 1000 - 5 * 3600; Int64 msec = utc_ms % 1000; if(sec < 0) { sec += 24 * 3600; date = GetPreviousDate(date); } Int64 hour = sec / 3600; Int64 min = (sec - hour * 3600) / 60; Int64 secs = sec % 60; return date + hour * HOUR_FACTOR + min * MINUTE_FACTOR + secs * SECOND_FACTOR + msec; } void Swap(Int64& t1, Int64& t2) { Int64 t = t1; t1 = t2; t2 = t; } void ParseTime(Int64 t, Int64& sec, Int64& milli_sec) { milli_sec = t % SECOND_FACTOR; struct tm tm_date; memset((void*)&tm_date, 0, sizeof(tm_date)); tm_date.tm_year = t / YEAR_FACTOR - 1900; tm_date.tm_mon = t % YEAR_FACTOR / MONTH_FACTOR - 1; tm_date.tm_mday = t % MONTH_FACTOR / DAY_FACTOR; tm_date.tm_hour = t % DAY_FACTOR / HOUR_FACTOR; tm_date.tm_min = t % HOUR_FACTOR / MINUTE_FACTOR; tm_date.tm_sec = t % MINUTE_FACTOR / SECOND_FACTOR; sec = (Int64)mktime(&tm_date); //LOG_DEBUG("time %lld with sec:%lld, milli_sec:%lld", t, sec, milli_sec); } Int64 GetTimeDiff(Int64 t1, Int64 t2) { if(t1 < t2) { Swap(t1, t2); } Int64 sec1 = 0, milli_sec1 = 0; ParseTime(t1, sec1, milli_sec1); Int64 sec2 = 0, milli_sec2 = 0; ParseTime(t2, sec2, milli_sec2); if(milli_sec1 < milli_sec2) { sec1--; milli_sec1 += 1000; } return (sec1 - sec2) * SECOND_FACTOR + (milli_sec1 - milli_sec2); } int DebugMessageDelay(const emaaccess::UpdateMsg &updateMsg, Int64 date, Int64 current_time) { const emaaccess::EmaString& symbol = updateMsg.getName(); const emaaccess::FieldList& fl = updateMsg.getPayload().getFieldList(); for(map::iterator it = tick_map.begin(); it != tick_map.end(); ++it) { // Get trade date first const emaaccess::FieldEntry* tim_date_fe = GetFieldEntryByID(fl, it->second.fid_date_); if(tim_date_fe != NULL) { Int64 tim_date = (Int64)tim_date_fe->getDate().getYear() * (Int64)YEAR_FACTOR + (Int64)tim_date_fe->getDate().getMonth() * (Int64)MONTH_FACTOR + (Int64)tim_date_fe->getDate().getDay() * (Int64)DAY_FACTOR; if(tim_date > trade_date) { trade_date = tim_date; } } if(trade_date == 0) { trade_date = date; } // process tim_ms field for tick time const emaaccess::FieldEntry* tim_ms_fe = GetFieldEntryByID(fl, it->first); if(tim_ms_fe == NULL) { continue; } Int64 tim_ms = 0; switch(tim_ms_fe->getLoadType()) { case emaaccess::DataType::TimeEnum: tim_ms = GetTradeTime(*tim_ms_fe); break; case emaaccess::DataType::UIntEnum: tim_ms = tim_ms_fe->getUInt(); break; default: continue; } Int64 quote_time = ConvertUtc2EstTime(trade_date, tim_ms); fprintf(stdout, "symbol:%s, desc:%s, current time:%lld, quote time:%lld, diff:%lld\n", symbol.c_str(), it->second.tick_desc_, current_time, quote_time, GetTimeDiff(current_time, quote_time) - 12 * 3600 * 1000 ); } return (0); } void AppClient::onRefreshMsg( const RefreshMsg& refreshMsg, const OmmConsumerEvent& ) { cout << refreshMsg << endl; // defaults to refreshMsg.toString() } void AppClient::onUpdateMsg( const UpdateMsg& updateMsg, const OmmConsumerEvent& ) { Int64 date = 0, current_time = 0; GetCurrentTimestamp(date, current_time); ++msg_cnt; if(msg_cnt % gConfig.step_ == 0) { fprintf(stderr, "count %lld at %lld\n", msg_cnt, current_time); } if(gConfig.count_only_) { return; } if(gConfig.debug_msg_content_) { cout << updateMsg << endl; // defaults to updateMsg.toString() } DebugMessageDelay(updateMsg, date, current_time); } void AppClient::onStatusMsg( const StatusMsg& statusMsg, const OmmConsumerEvent& ) { cout << statusMsg << endl; // defaults to statusMsg.toString() } void ParseConfig(int argc, char* argv[]) { int ch = 0, opterr = 0; while((ch = getopt(argc, argv, "h:s:u:r:cdt:"))!= -1) { switch(ch) { case 'h': gConfig.trep_host_ = optarg; break; case 's': gConfig.service_name_ = optarg; break; case 'u': gConfig.user_name_ = optarg; break; case 'r': gConfig.rics_ = optarg; break; case 'c': gConfig.count_only_ = true; break; case 'd': gConfig.debug_msg_content_ = true; break; case 't': gConfig.step_ = atoi(optarg); break; default: fprintf(stderr, "unknown opt:%c\n", ch); break; } } if((gConfig.rics_ == "") || (gConfig.trep_host_ == "") || (gConfig.service_name_ == "") || (gConfig.user_name_ == "") ) { fprintf(stderr, "missing required argument, usage:%s -h trep_host -s service_name -u user_name -r rics -c -d -t msg_step\n" "where\n" "-h trep_host: trep host connected to in format ip:port\n" "-s service_name: elektron service name, like ELEKTRON_DD\n" "-u user_name: user name to connect to trep host\n" "-r rics: thomsonreuter instrument codes separated by comma\n" "-c: print time msg count only, default false\n" "-d: debug mode will print msg content if -c flag unspecified, default false\n" "-t msg_step: print message count every msg_step msg, default 100\n", argv[0] ); exit(-1); } } template int StrSplit(T& dst, const string& src, const string& spt, bool ignore_continued = false) { string::size_type bpos = 0, epos = 0; while((epos = src.find(spt, bpos)) != string::npos) { if(!ignore_continued || epos != bpos) { dst.insert(dst.end(), src.substr(bpos, epos - bpos)); } if(src.size() == epos + spt.size()) { bpos = epos; break; } bpos = epos + spt.size(); } dst.insert(dst.end(), src.substr(bpos,epos - bpos)); return 0; } int main( int argc, char* argv[] ) { setvbuf(stdout, NULL, _IONBF, 0); tick_map[FID_US_BLKTIM_MS] = {FID_US_BLK_DATE, FID_US_BLKTIM_MS, "BLKTIM_MS"}; tick_map[FID_US_ODDTIM_MS] = {FID_US_ODD_DATE, FID_US_ODDTIM_MS, "ODDTIM_MS"}; tick_map[FID_US_IRGTIM_MS] = {FID_US_IRGDATE, FID_US_IRGTIM_MS, "IRGTIM_MS"}; tick_map[FID_US_SALTIM_MS] = {FID_US_TRADE_DATE,FID_US_SALTIM_MS, "SALTIM_MS"}; ParseConfig(argc, argv); set set_rics; StrSplit(set_rics, gConfig.rics_, ",", true); // -h 10.56.125.112:14002 -s ELEKTRON_DD -u user12 -r AAPL.NB,MSFT.NB,AMZN.NB,QQQ.NB,GOOG.NB,TSLA.NB try { AppClient client; OmmConsumer consumer( OmmConsumerConfig().host( gConfig.trep_host_.c_str() ).username( gConfig.user_name_.c_str() ) ); for(set::const_iterator it = set_rics.begin(); it != set_rics.end(); it++) { consumer.registerClient( ReqMsg().serviceName( gConfig.service_name_.c_str() ).name( it->c_str() ), client ); printf("register subscription for ric:%s\n", it->c_str()); } while(true) { sleep( 60000 ); // API calls onRefreshMsg(), onUpdateMsg(), or onStatusMsg() } } catch ( const OmmException& excp ) { cout << excp << endl; } return 0; }