C++ NIProvider publishing to ADH service does not work

Deary
Deary Newcomer
edited May 7 in EMA

I am following the LSEG C++ NIProvider tutorial and testing against our internal service “DTS_KL” with the internal RIC “USDFON=MBBGM”. The code looks like this:

OmmProvider provider(OmmNiProviderConfig("c:\\temp\\process\\x64\\EMAConfig.xml").username("drdwhw"));
UInt64 serviceId = 1000;
UInt64 sourceDirectoryHandle = 1;
UInt64 ibmHandle = 5;
RefreshMsg refresh;
UpdateMsg update;
FieldList fieldList;

Connector::ConnectionStateListener _connectionStateListener;

provider.registerClient(ReqMsg().domainType(MMT_LOGIN).name("drdwhw"),_connectionStateListener,(void*)0);

provider.submit(refresh.clear().domainType(MMT_MARKET_PRICE).serviceName("DTS_KL")
.name("USDFON=MBBGM").state(OmmState::OpenEnum,OmmState::OkEnum,OmmState::NoneEnum,"UnSolicited Refresh Completed")
.payload(fieldList
.addReal(22, 4.21, OmmReal::ExponentNeg2Enum) //BID_1 - 23, BID - 22
.addReal(25, 4.22, OmmReal::ExponentNeg2Enum) //ASK_1 - 26, ASK - 25
.complete()).complete(), ibmHandle);

provider.submit(update.clear().domainType(MMT_MARKET_PRICE).serviceName("DTS_KL")
.name("USDFON=MBBGM").payload(fieldList.clear()
.addReal(22, 4.21, OmmReal::ExponentNeg2Enum) //BID_1 - 23, BID - 22
.addReal(25, 4.22, OmmReal::ExponentNeg2Enum) //ASK_1 - 26, ASK - 25
.complete()), ibmHandle);

provider.submit(StatusMsg().state(OmmState::OpenEnum, // Stream state
OmmState::OkEnum, // Data state
OmmState::NoneEnum, // Data status
"All is good"), // Explanatory text
ibmHandle);

Our IT team has confirmed that the ADH service is running, and after I send the refresh message I receive a callback saying that the refresh is completed, which shows that I can connect to the ADH service. My problem is that after I submit the update message, the data is not updated.

A support ticket has also been raised as 14606472.

Answers

  • wasin.w
    wasin.w admin

    Hello @Deary

    I see that the application publishes a refresh message, a single update message, and a status message to ADH back to back, with no pause between them. The BID and ASK values in the refresh and update messages are also identical (4.21 and 4.22), which makes it harder to detect a change.

    Could you please try the following tasks?

    • add a pause (sleep) between message submissions
    • add a loop that submits multiple update messages with varying values

    You can then track the changes more easily in a consumer's update callback.

    Example:

    provider.submit( RefreshMsg().serviceName( "DTS_KL" ).name( "USDFON=MBBGM" )
    ...
    .complete(), itemHandle );

    sleep( 1000 );

    for ( Int32 i = 0; i < 60; i++ )
    {
    provider.submit( UpdateMsg().serviceName( "DTS_KL" ).name( "USDFON=MBBGM" )
    .payload( FieldList()
    .addReal( 22, 5.5 + i, OmmReal::ExponentNeg2Enum )
    .addReal( 25, 10.9 + i, OmmReal::ExponentNeg2Enum )
    .complete() ), itemHandle );
    sleep( 1000 );
    }

    You can see the full example on GitHub EMA C++ 100_MP_Streaming example.
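
The `sleep( 1000 )` calls above rely on a millisecond helper that the EMA example sources define for themselves; it is not a standard C++ function, and POSIX `sleep` takes seconds rather than milliseconds. A minimal portable sketch, assuming C++11 and using the hypothetical name `sleepMs` (not part of EMA), might look like this:

```cpp
#include <chrono>
#include <thread>

// Hypothetical helper, not an EMA API: pause the calling thread
// for at least the given number of milliseconds.
inline void sleepMs(int millis)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(millis));
}
```

Calling `sleepMs( 1000 )` between submits would give the same one-second spacing as the example, without depending on a platform-specific `sleep`.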

    Additionally, you can post code by opening the menu on the left and choosing the </> Code Block option.

  • Deary
    Deary Newcomer

    I made the code change and it still does not work.

    The quote in Workspace does not change at all.

    However, if I publish from Excel, it works; Workspace now shows 4.31 and 4.32 for bid and ask respectively.

    The only callback I receive is the refresh message below. OmmProvider registerClient does not let me listen for UpdateMsg and StatusMsg callbacks. Can you tell me how to check the update status?

    NIProvier OnRefreshMsg Handle: 3003537654640 Closure: 0000000000000000
    RefreshMsg
    streamId="1"
    domain="Login Domain"
    Solicited
    RefreshComplete
    ClearCache
    state="Open / Ok / None / 'Refresh Completed'"
    itemGroup=""
    name="drdwhw"
    nameType="1"
    Attrib dataType="ElementList"
    ElementList
    ElementEntry name="SingleOpen" dataType="UInt" value="1"
    ElementEntry name="AllowSuspectData" dataType="UInt" value="1"
    ElementEntry name="ProvidePermissionExpressions" dataType="UInt" value="1"
    ElementEntry name="ProvidePermissionProfile" dataType="UInt" value="1"
    ElementEntry name="SupportBatchRequests" dataType="UInt" value="0"
    ElementEntry name="SupportOptimizedPauseResume" dataType="UInt" value="0"
    ElementEntry name="SupportOMMPost" dataType="UInt" value="0"
    ElementEntry name="SupportEnhancedSymbolList" dataType="UInt" value="0"
    ElementEntry name="SupportViewRequests" dataType="UInt" value="0"
    ElementEntry name="ApplicationId" dataType="Ascii" value="256"
    ElementEntry name="ApplicationName" dataType="Ascii" value="ADH"
    ElementEntry name="Position" dataType="Ascii" value="10.3.45.98/net"
    ElementListEnd

    AttribEnd
    

    RefreshMsgEnd

    try
    {
    //OmmProvider provider(OmmNiProviderConfig().host("sgplpapp01trp.mbb.com.sg:14003").username("drdwhw"));
    OmmProvider provider(OmmNiProviderConfig("c:\\temp\\process\\x64\\EMAConfig.xml").username("drdwhw"));
    UInt64 serviceId = 1000;
    UInt64 sourceDirectoryHandle = 1;
    UInt64 ibmHandle = 5;
    RefreshMsg refresh;
    UpdateMsg update;
    FieldList fieldList;
    Connector::ConnectionStateListener _connectionStateListener;

    provider.registerClient(
    ReqMsg()
    .domainType(MMT_LOGIN)
    .name("drdwhw"),
    _connectionStateListener,
    (void*)0);

    provider.submit(
    refresh.clear().domainType(MMT_MARKET_PRICE).serviceName("DTS_KL").name("USDFON=MBBGM").state(
    OmmState::OpenEnum,
    OmmState::OkEnum,
    OmmState::NoneEnum,
    "UnSolicited Refresh Completed")
    .payload(
    fieldList
    .addReal(22, 4.21, OmmReal::ExponentNeg2Enum) //BID_1 - 23, BID - 22
    .addReal(25, 4.22, OmmReal::ExponentNeg2Enum) //ASK_1 - 26, ASK - 25
    .complete()).complete(), ibmHandle);

    sleep(1000);


    for (Int32 i = 0; i < 60; i++)
    {
        provider.submit(UpdateMsg().serviceName("DTS_KL").name("USDFON=MBBGM")
            .payload(FieldList()
                .addReal(22, 5.5 + i, OmmReal::ExponentNeg2Enum)
                .addReal(25, 10.9 + i, OmmReal::ExponentNeg2Enum)
                .complete()), ibmHandle);

        sleep(1000);
    }
    }
    catch (std::exception& exp)
    {
    std::cout << exp.what() << std::endl;
    }
    }


    ConsumerClient::ConsumerClient() :
    niProvider(0),
    pConsumer(0),
    handle(0),
    pItemList(0),
    bDumpOutput(false),
    bUsePrefix(false),
    prefixString(""),
    bConnectionUp(false)
    {

    }
    ConsumerClient::~ConsumerClient()
    {
    }
    bool ConsumerClient::isConnectionUp() const
    {
    return bConnectionUp;
    }
    void ConsumerClient::setConsumer(thomsonreuters::ema::access::OmmConsumer& consumer)
    {
    pConsumer = &consumer;
    }
    void ConsumerClient::setNIProvider(OmmProvider& provider)
    {
    niProvider = &provider;
    }
    void ConsumerClient::setNIProviderClient(NIProviderClient& providerClient)
    {
    this->niProviderClient = &providerClient;
    }
    void ConsumerClient::setItemList(ITEMLIST& itemlist)
    {
    this->pItemList = &itemlist;
    }
    void ConsumerClient::usePrefix(const bool& bUsePrefix, const std::string prefixString)
    {
    this->bUsePrefix = bUsePrefix;
    this->prefixString = prefixString;
    }
    void ConsumerClient::dumpOutput(const bool& flag)
    {
    this->bDumpOutput = flag;
    }
    void ConsumerClient::setNIPubServiceName(const EmaString& serviceName)
    {
    defaultNIPubServiceName = serviceName;
    }
    void ConsumerClient::onRefreshMsg(const RefreshMsg& refreshMsg, const OmmConsumerEvent& ommEvent)
    {
    try {

    if (refreshMsg.getDomainType() == MMT_LOGIN)
    {
    cout << endl << "Login OnRefreshMsg Handle: " << ommEvent.getHandle() << " Closure: " << (const char *)(ommEvent.getClosure()) << endl;
    if (bDumpOutput)
    cout << refreshMsg << endl;

    if (refreshMsg.getState().getStreamState() == OmmState::OpenEnum)
    {
    if (refreshMsg.getState().getDataState() == OmmState::OkEnum)
    bConnectionUp = true;
    else
    bConnectionUp = false;
    }
    else
    bConnectionUp = false;

    }
    if (refreshMsg.getDomainType() == MMT_MARKET_PRICE)
    {
    cout << endl << "Market Price OnRefreshMsg Handle: " << ommEvent.getHandle() << " ItemName " << refreshMsg.getName().c_str() << endl;
    if (bDumpOutput)
    cout << refreshMsg << endl;

    if (niProviderClient->isConnectionUp())
    {
    ITEMLIST::iterator it;
    it = pItemList->find(refreshMsg.getName().c_str());
    if (it != pItemList->end())
    {
    string pubItemName = "";
    if (bUsePrefix)
    pubItemName = prefixString;
    pubItemName.append((*it).first);
    UInt64 pubHandle = (*it).second.NiPubHandle;
    RefreshMsg refresh;
    cout << "Publish Refresh message for Item " << pubItemName.c_str() << " to Non Interactive provider service..." << endl;
    niProvider->submit(refresh.clear().serviceName(defaultNIPubServiceName).name(pubItemName.c_str()).domainType(MMT_MARKET_PRICE)
    .state(OmmState::OpenEnum, OmmState::OkEnum, OmmState::NoneEnum, "UnSolicited Refresh Completed").complete(true)
    .payload(refreshMsg.getPayload().getFieldList()), pubHandle);
    }
    }

    }
    }
    catch (OmmException& ex)
    {
    cout << ex << std::endl;
    }

    }

    void ConsumerClient::onUpdateMsg(const UpdateMsg& updateMsg, const OmmConsumerEvent& ommEvent)
    {
    try {

    if (updateMsg.getDomainType() == MMT_MARKET_PRICE)
    {
    cout << endl << "Market Price OnUpdateMsg Handle: " << ommEvent.getHandle() << " ItemName: " << updateMsg.getName().c_str() << endl;
    if (bDumpOutput)
    cout << updateMsg << endl;

    if (niProviderClient->isConnectionUp())
    {
    ITEMLIST::iterator it;
    it = pItemList->find(updateMsg.getName().c_str());
    if (it != pItemList->end())
    {
    string pubItemName = "";
    if (bUsePrefix)
    pubItemName = prefixString;
    pubItemName.append((*it).first);
    UInt64 pubHandle = (*it).second.NiPubHandle;
    UpdateMsg update;
    cout << "Publish Update message for Item " << pubItemName.c_str() << " to Non Interactive provider service..." << endl;
    niProvider->submit(update.clear().serviceName(defaultNIPubServiceName).name(pubItemName.c_str())
    .payload(updateMsg.getPayload().getFieldList()), pubHandle);

    }
    }
    }
    }
    catch (OmmException& ex)
    {
    cout << ex << std::endl;
    }
    }

    void ConsumerClient::onStatusMsg(const StatusMsg& statusMsg, const OmmConsumerEvent& ommEvent)
    {


    if (statusMsg.getDomainType() == MMT_LOGIN)
    {
    cout << endl << "Login OnStatusMsg Handle: " << ommEvent.getHandle() << " Closure: " << (char *)ommEvent.getClosure() << endl;
    cout << statusMsg << endl;

    if (statusMsg.hasState())
    {
    if (statusMsg.getState().getStreamState() == OmmState::OpenEnum)
    {
    if (statusMsg.getState().getDataState() == OmmState::OkEnum)
    bConnectionUp = true;
    else
    bConnectionUp = false;
    }
    else
    bConnectionUp = false;
    }
    }
    if (statusMsg.getDomainType() == MMT_MARKET_PRICE)
    {
    cout << endl << "Market Price OnStatusMsg Handle: " << ommEvent.getHandle() << " Closure: " << (char *)ommEvent.getClosure() << endl;
    cout << statusMsg << endl;
    }
    }
  • Jirapongse
    Jirapongse ✭✭✭✭✭

    @Deary

    As far as I know, Excel uses posting or contribution to post data to a service. It uses a different mechanism from NIProvider.

    What is the DTS_KL service? Is it provided by an ATS or a DTS server?

    Typically, the cacheLocation and cacheType of an NI service must be ssl and sourceDriven respectively.

    …*DIST_PUB*cacheLocation : ssl
    …*DIST_PUB*cacheType : sourceDriven
    

    Moreover, why does the ConsumerClient have the niProvider? It seems that it retrieves data and then republishes the data to another NI service. Please confirm the consumer's behavior.

    Moreover, please share the ADH configuration file and the application diagram.

  • Deary
    Deary Newcomer

    DTS_KL is a DTS server.

    ConsumerClient is responsible for receiving message updates from niProvider; I am using it to see whether I get any error from the refresh or update messages.

    I am asking IT to retrieve the ADH configuration file and application diagram.

    In the meantime, can you tell me where to set cacheLocation and cacheType?

  • Jirapongse
    Jirapongse ✭✭✭✭✭

    @Deary

    A DTS server is an interactive provider. You can't use NIProvider to publish data to the DTS server.

    To contribute data to DTS, you need to use Posting. Please refer to the following articles:

  • Deary
    Deary Newcomer

    By interactive provider, do you mean that Refinitiv Workspace has to be logged on and the service must run on the user's machine?

    The code does not set any remote service IP address, only a port.

    In the sample EmaConfig I also do not see any IP address supplied.

    AppClient appClient;
    OmmProvider provider( OmmIProviderConfig().port( "14002" ), appClient );
    
  • Jirapongse
    Jirapongse ✭✭✭✭✭

    @Deary

    If you would like to post updates to items on the DTS service (DTS_KL), you need to use posting. Please see the example here.

    Moreover, please refer to the following articles:

    If not, could you please explain what you would like to do?

  • Deary
    Deary Newcomer

    Thank you for your help. Yes, I need to post updates to the DTS service.

    I managed to publish something to DTS on the RIC USDFON=MBBGM.

    However, I have two questions:

    1. The example does the publishing inside onRefreshMsg after login; I moved the post message outside, after getting the event handle.

    Is this recommended? Is the event handle valid throughout the consumer lifecycle?

    2. Can you advise how to publish a value like 4.21 into the BID field? If I use Exponent0Enum, only the integer value 4 is published.


    void AppConsumerClient::onRefreshMsg(const RefreshMsg& refreshMsg, const OmmConsumerEvent& event) {
    cout << "Received Refresh. Item Handle: " << event.getHandle() << " Closure " << event.getClosure() << std::endl;

    cout << "Item Name: " << (refreshMsg.hasName() ? refreshMsg.getName().c_str() : std::string("<not set>")) << std::endl;

    cout << "Service Name: " << (refreshMsg.hasServiceName() ? refreshMsg.getServiceName().c_str() : std::string("<not set>")) << std::endl;

    cout << "Item State: " << refreshMsg.getState() << std::endl;

    if (refreshMsg.getDomainType() == MMT_LOGIN)
    {
    itemHandle = event.getHandle();
    }
    cout << refreshMsg << std::endl;
    }

    AppConsumerClient appClient;
    OmmConsumer consumer(
    OmmConsumerConfig()
    .operationModel(OmmConsumerConfig::UserDispatchEnum)
    .host("sgplpapp01trp.mbb.com.sg:14002")
    .username("drdwhw" /*"drlexl"*/)
    );
    ReqMsg reqMsg;
    consumer.registerClient(reqMsg.domainType(MMT_LOGIN), appClient, &consumer);
    consumer.registerClient(
    ReqMsg()
    .serviceName("DTS_KL"/*"IDN_SELECTFEED"*/)
    .name("USDFON=MBBGM"/*"USDAM3L1Y=TTKL"*/),
    appClient,
    &consumer);
    sleep(1000);
    int postId = 0;
    for (Int32 i = 0; i < 60; i++)
    {
    consumer.dispatch(5000);
    if (itemHandle != 0)
    {
    PostMsg postMsg;
    FieldList fieldList;
    consumer.submit(postMsg.postId(postId++).serviceName("DTS_KL")
    .name("USDFON=MBBGM").solicitAck(true).complete(true)
    .payload(
    fieldList
    .addReal(22, 4.21, OmmReal::Exponent0Enum) //BID_1 - 23, BID - 22
    .addReal(25, 4.22, OmmReal::Exponent0Enum) //ASK_1 - 26, ASK - 25
    .complete()).complete(), itemHandle);
    }
    }

    UpdateMsg
    streamId="5"
    domain="MarketPrice Domain"
    updateTypeNum="0"
    name="USDFON=MBBGM"
    serviceId="59"
    serviceName="DTS_KL"
    Payload dataType="FieldList"
    FieldList
    FieldEntry fid="16" name="TRADE_DATE" dataType="Date" value="09 MAY 2025"
    FieldEntry fid="5" name="TIMACT" dataType="Time" value="00:44:00:000:000:000"
    FieldEntry fid="22" name="BID" dataType="Real" value="4"
    FieldEntry fid="25" name="ASK" dataType="Real" value="4"
    FieldListEnd

    PayloadEnd
    UpdateMsgEnd
  • wasin.w
    wasin.w admin

    Hello @Deary

    I can help with the second question. Use ExponentNeg2Enum and pass the value as an integer mantissa: the OmmReal is then the mantissa multiplied by 10 to the power of -2, so 421 is published as 4.21.

    Example Code:

    .addReal(22, 421, OmmReal::ExponentNeg2Enum )
    

    The reference manual under <RTSDK C/C++ package>/Cpp-C/Ema/Docs/refman/ documents each real-number magnitude type.

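
To illustrate why `Exponent0Enum` produced 4: assuming `addReal` takes an integer mantissa (as the `421` in the example code suggests), a `double` such as 4.21 loses its fraction when passed directly. The following is a minimal sketch of the scaling step only; `toMantissa` and `truncatedMantissa` are hypothetical helpers, not part of EMA.

```cpp
#include <cmath>
#include <cstdint>

// Hypothetical helper, not an EMA API: scale a decimal value to the integer
// mantissa that pairs with a negative power-of-ten exponent.
// decimals = 2 corresponds to an exponent of -2 (value = mantissa * 10^-2).
inline std::int64_t toMantissa(double value, int decimals)
{
    return std::llround(value * std::pow(10.0, decimals));
}

// What effectively happens when a double is passed where an integer
// mantissa is expected: the fractional part is dropped.
inline std::int64_t truncatedMantissa(double value)
{
    return static_cast<std::int64_t>(value);
}
```

Here `toMantissa(4.21, 2)` yields 421, which pairs with `OmmReal::ExponentNeg2Enum`, while `truncatedMantissa(4.21)` yields 4, matching the `value="4"` seen in the update message earlier in the thread.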
  • Jirapongse
    Jirapongse ✭✭✭✭✭

    For the first question, it is fine. The login handle will remain valid as long as the application is still connected to the server.

  • Deary
    Deary Newcomer

    Thank you all for your responses. I am now clear on publishing to DTS RICs.

    I have another project that requires publishing data to Refinitiv directly, not via DTS.

    This RIC, "GBPSW=MBJK", was created by Refinitiv as a private RIC that is visible only to us.

    What should the service name be here, since it is no longer "DTS_KL"? I tried IDN_SELECTFEED, and it does not work.

    Connector::ConnectionStateListener _connectionStateListener;
    //OmmProvider provider(OmmNiProviderConfig().host("sgplpapp01trp.mbb.com.sg:14003").username("drdwhw"));
    OmmProvider provider(OmmNiProviderConfig("c:\\temp\\process\\x64\\EMAConfig.xml").username("drdwhw"), _connectionStateListener);
    UInt64 serviceId = 1000;
    UInt64 sourceDirectoryHandle = 1;
    UInt64 ibmHandle = 5;
    RefreshMsg refresh;
    UpdateMsg update;
    FieldList fieldList;

    provider.submit(
    refresh.clear().domainType(MMT_MARKET_PRICE).serviceName("IDN_SELECTFEED").name("GBPSWD=MBJK").state(
    OmmState::OpenEnum,
    OmmState::OkEnum,
    OmmState::NoneEnum,
    "UnSolicited Refresh Completed")
    .payload(
    fieldList
    .addReal(22, 4.21, OmmReal::ExponentNeg2Enum) //BID_1 - 23, BID - 22
    .addReal(25, 4.22, OmmReal::ExponentNeg2Enum) //ASK_1 - 26, ASK - 25
    .complete()).complete(), ibmHandle);

    sleep(1000);


    for (Int32 i = 0; i < 60; i++)
    {
        provider.submit(UpdateMsg().serviceName("IDN_SELECTFEED").name("GBPSWD=MBJK")
            .payload(FieldList()
                .addReal(22, 5.5 + i, OmmReal::ExponentNeg2Enum)
                .addReal(25, 10.9 + i, OmmReal::ExponentNeg2Enum)
                .complete()), ibmHandle);

        provider.dispatch(1000);
    }
  • Jirapongse
    Jirapongse ✭✭✭✭✭

    @Deary

    I found that GBPSW=MBJK is a RIC on our Real-Time network.

    To contribute data to this RIC, you need a Contribution Channel. I am not sure how you will connect to it: you can either connect to it directly or configure RTDS to connect to it.

    For more information, please refer to the article "Contributing your data to Refinitiv".

    Please post a new question for this new topic.

  • Deary
    Deary Newcomer

    Thank you, I will raise a new question on this new topic.