question


EMA C++ IProvider onPostMsg: how do I update values from the post message?

The latest version (2.0) of EMA has fixed the bug where onPostMsg is not being received by the EMA interactive provider.

I now have an update message in the post message matching the update sent by my old SSL sfc consumer. Thanks for fixing that.

Can you help me use this update message to actually change the values that all subscribers see? In other words, I want to apply the update.


I've tried doing a submit in the onPostMsg with the UpdateMsg, but the values are not changing from a consumer's point of view.


What do I need to do in the onPostMsg to complete the update?

Tags: elektron, refinitiv-realtime, elektron-sdk, ema-api, rrt, elektron-message-api, c++

Accepted answer

Hi @gproctor

Not 100% sure what you are trying to do but my understanding is as follows:

  • Your Provider application receives some data as a PostMsg from an older system.
  • You have checked the data payload you receive in the PostMsg and have confirmed it looks correct.
  • You are then trying to use this data to update a Provider record of the same name on the same service name, using the servicename, recordname, streamID and handle from the PostMsg.

However, looking at your trace, the PostMsg is off-stream, i.e. it arrives on the Login handle, so you are trying to publish a provider UpdateMsg for a MarketPrice item on the Login handle.

This is not possible - you can only send PostMsgs on the Login stream handle (or updates/refreshes of the MMT_Login itself, e.g. login rejected).

If you refer to any of the IProvider examples that we provide, you will note that they:

  • extract and store the handle from the incoming item request message event (in processMarketPriceRequest)
  • use that stored handle when they submit the RefreshMsg and UpdateMsg for the item for publication
  • do NOT specify a streamID in the submitted UpdateMsg or RefreshMsg

Please apply the behaviour of the examples e.g. IProvider\100_Series\100_MP_Streaming\IProvider.cpp to your code.

You should still use the onPostMsg() event handle for sending any AckMsgs as per the 340_MP_Posting example.
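
A minimal sketch of the routing described above, written against hypothetical stand-in types rather than the EMA classes so that it compiles on its own. PostRouter, Submitted and Kind are invented for illustration only; in a real provider the handles would come from event.getHandle() and the messages would be passed to submit(). The item update goes out on the handle stored from the item request, while the AckMsg still goes out on the handle of the post event:

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-ins, NOT EMA types. In EMA the handle is the value
// returned by OmmProviderEvent::getHandle().
using Handle = std::uint64_t;

enum class Kind { Update, Ack };

// Records what would be submitted and on which handle.
struct Submitted {
    Kind kind;
    std::string item;  // empty for an ack
    Handle handle;
};

class PostRouter {
public:
    explicit PostRouter(Handle loginHandle) : loginHandle_(loginHandle) {}

    // Call from the item request handler: remember the item's own handle.
    void onItemRequest(const std::string& item, Handle requestHandle) {
        itemHandles_[item] = requestHandle;
    }

    // A post is off-stream when it arrives on the login handle.
    bool isOffStream(Handle postHandle) const { return postHandle == loginHandle_; }

    // Call from the post handler. Returns the planned submissions in order.
    std::vector<Submitted> onPost(const std::string& item, Handle postHandle,
                                  bool ackRequested) const {
        std::vector<Submitted> out;
        auto it = itemHandles_.find(item);
        if (it != itemHandles_.end()) {
            // Update uses the STORED item handle, never the post handle.
            out.push_back({Kind::Update, item, it->second});
        }
        if (ackRequested) {
            // Ack goes back on the handle the post arrived on.
            out.push_back({Kind::Ack, "", postHandle});
        }
        return out;
    }

private:
    Handle loginHandle_;
    std::map<std::string, Handle> itemHandles_;
};

// Usage: an off-stream post for "Record1" arrives on the login handle (1).
inline void demo() {
    PostRouter router(/*loginHandle=*/1);
    router.onItemRequest("Record1", /*requestHandle=*/42);
    const auto out = router.onPost("Record1", /*postHandle=*/1, /*ackRequested=*/true);
    assert(out.size() == 2);
    assert(out[0].kind == Kind::Update && out[0].handle == 42);
    assert(out[1].kind == Kind::Ack && out[1].handle == 1);
}
```

Keying the map by item name alone is a simplification for this sketch; a provider serving more than one service would also need the service in the key.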



Thank you very much, works now.


Hello @gproctor,

I don't understand the issue. Can you please elaborate by attaching a code sample of what you are trying to achieve? This will let us reproduce the problem and provide guidance. Thanks.


Hello @gproctor,

In addition to the information requested above, which will let us reproduce the problem you are facing, it may be helpful to review the Interactive Provider MarketPrice 100 example that comes with RTSDK.

It sends a RefreshMsg on request and then UpdateMsgs afterwards. Perhaps this is what you are looking for, or part of what you are looking to implement.

If that is the case, then getting this example running first and building up to your requirements from there may be easier and give you a head start.


I've tried the examples and they work ok. The updates are seen by our old client app.

These updates are always done in the main thread where the provider was created and I'm wondering if that is the issue for me.


I'm trying to submit an update message in the onPostMsg, which receives the update message from our old client. Is this the wrong place to submit an update from? It is a different thread than the provider was created on.


Code to follow. I commented out our logging as it used our logger.

void AppClient::onPostMsg(const refinitiv::ema::access::PostMsg& postMsg, const refinitiv::ema::access::OmmProviderEvent& event)
{
    LOG("onPostMsg\n");

    string type = (event.getHandle() == _loginStreamHandle ? "off-stream" : "on-stream" ) ;
    string post = (postMsg.hasName() ? postMsg.getName() : "n/a") ;
    //LOG("Received an %s item post %s, threadId : %d\n", type, post, GetCurrentThreadId());

    //LOG("postMsg string\n%s\n", postMsg.toString().c_str());
    DATE dateStamp = 0.0;

    String service;
    String recordStr;

    bool processUpdate = true;

    if(postMsg.hasServiceName()) {
        service = postMsg.getServiceName().c_str();
        //LOG("onPostMsg2 service: %s\n", service.str());
    }
    else {
        //LOG("hasServiceName false\n");
        service = "TAILORDEV";
    }

    if(postMsg.hasName()) {
        recordStr = postMsg.getName().c_str();
        //LOG("onPostMsg2 name: %s\n", recordStr.str());
    }
    else {
        //LOG("hasName false\n");
        processUpdate = false;
    }

    if(processUpdate) {
        auto& payload = postMsg.getPayload();

        const auto type = payload.getDataType();
        //LOG("onPostMsg2 type: %d\n", type);

        if (type == DataType::DataTypeEnum::UpdateMsgEnum) {
            auto updateMsg = payload.getUpdateMsg();
            //LOG("onPostMsg3\n");

            if (updateMsg.hasServiceName()) {
                service = updateMsg.getServiceName().c_str();
                //LOG("onPostMsg4 service: %s\n", service.str());
            }

            updateMsg.serviceId(postMsg.getServiceId());
            //LOG("onPostMsg updateMsg1 string\n%s\n", updateMsg.toString().c_str());
            updateMsg.streamId(postMsg.getStreamId());

            auto& updatePayload = updateMsg.getPayload();
            auto type = updatePayload.getDataType();
            //LOG("onPostMsg4, type: %d\n", type);

            if (type == DataType::DataTypeEnum::FieldListEnum) {
                const auto& fields = updatePayload.getFieldList();

                CRecordPtr record = SourceManager::instance().getRecord(service, updateMsg.getName().c_str());
                //LOG("onPostMsg10\n");

                if (record) {
                    //LOG("onPostMsg13\n");

                    static int i = 0;
                    try {
                        event.getProvider().submit( UpdateMsg().name(postMsg.getName() ).serviceId(postMsg.getServiceId()).streamId(postMsg.getStreamId()).payload( fields ), event.getHandle() );
                    }
                    catch (OmmInvalidUsageException& ex) {
                        //LOG("onPostMsg13: %s\n", ex.getText().c_str());
                    }
                    catch (OmmInvalidHandleException& ex) {
                        //LOG("onPostMsg13: %s\n", ex.getText().c_str());
                    }
                    catch (...) {
                        //LOG("onPostMsg13 catch ...\n");
                    }

                    //LOG("AppClient::onPostMsg, submit\n");
                }
            }
            LOG("onPostMsg8\n");
        }
    }

    if (postMsg.getSolicitAck())
    {
        AckMsg ackMsg;

        ackMsg.domainType(postMsg.getDomainType());
        ackMsg.ackId(postMsg.getPostId());
        if (postMsg.hasSeqNum())
        {
            ackMsg.seqNum(postMsg.getSeqNum());
        }

        event.getProvider().submit(ackMsg, event.getHandle());

        LOG("onPostMsg ACK\n");
    }
}



Our logs follow.

24/03/2021 16:47:52 onPostMsg
24/03/2021 16:47:52 Received an off-stream item post Record1, threadId : 15948
24/03/2021 16:47:52 postMsg string
PostMsg
    streamId="1"
    domain="MarketPrice Domain"
    Complete
    Ack Requested
    postId="77075"
    publisherIdUserId="1835295090"
    publisherIdUserAddress="168884002"
    name="Record1"
    serviceId="1"
    Payload dataType="UpdateMsg"
        UpdateMsg
            streamId="0"
            domain="MarketPrice Domain"
            updateTypeNum="0"
            name="Record1"
            serviceId="22020"
            Payload dataType="FieldList"
                FieldList
                    FieldEntry fid="22" name="BID" dataType="Real" value="25.6"
                FieldListEnd
            PayloadEnd
        UpdateMsgEnd
    PayloadEnd
PostMsgEnd

24/03/2021 16:47:52 hasServiceName false
24/03/2021 16:47:52 onPostMsg2 name: Record1
24/03/2021 16:47:52 onPostMsg2 type: 258
24/03/2021 16:47:52 onPostMsg3
24/03/2021 16:47:52 onPostMsg updateMsg1 string
UpdateMsg
    streamId="0"
    domain="MarketPrice Domain"
    updateTypeNum="0"
    name="Record1"
    serviceId="22020"
    Payload dataType="FieldList"
        FieldList
            FieldEntry fid="22" name="BID" dataType="Real" value="25.6"
        FieldListEnd
    PayloadEnd
UpdateMsgEnd

24/03/2021 16:47:52 onPostMsg4, type: 132
24/03/2021 16:47:52 onPostMsg10
24/03/2021 16:47:52 onPostMsg13
24/03/2021 16:47:52 AppClient::onPostMsg, submit
24/03/2021 16:47:52 onPostMsg8
24/03/2021 16:47:52 onPostMsg ACK




Hello @gproctor,

I would think that only a message of the expected response type can be submitted on the provider of the event, for example MP request -> MP refresh, login request -> login response.

All unsolicited messages, such as the various updates, would need to be submitted via the main provider you created.

In my opinion, using a post like this for interactive communication between the provider and the consumer is an interesting idea, but I do not think the implementation supports this type of usage.


Thanks for your response.

Our consumer is 10 years old and it updates with this off-stream post message, so we have no choice if we want the new provider to work with the old consumer.


I've tried the update in the onPostMsg method, both with the event provider (as shown in the code snippet) and with the main provider that we created and stored.

Neither updates the values.


I've even offloaded the update to a separate worker thread with a job queue, so that the update is done with the main provider we created. Still no update.


None of the examples show a post message update from a consumer. They just show an off-stream post ack in the onPostMsg.


My goal is to apply the update message received by the EMA provider from our old SFC consumer. We now get the update message in the onPostMsg OK, thanks to the bug fix in the latest EMA version 2.0, but we still need to find a way to apply the update. Does this need to be done on the same thread that created the main provider? I have not tried that yet; so far I've tried the new worker thread and the onPostMsg thread.


My understanding is that consumers should be able to send updates to the provider. In fact our old SFC provider does exactly that with the mentioned old SFC consumer.

Any help will be appreciated.


In all three ways of submitting the update, I use the event handle from the onPostMsg.


This works now.

Storing the handle from the processMarketPriceRequest and using it in the submit of the update in onPostMsg did the job.


It works with both the provider I created and the provider from the event.


I'm not sure yet whether I need to store a handle for each record created by this provider, or whether one handle covers this service or all services on this provider.

I'll do some testing on this.


Hi @gproctor

The handle represents each individual item event stream - and should be unique for each item on each service.

For example, if you want to close an instrument item's stream, you call unregister() with the handle of the item.
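
As a rough illustration of keeping one handle per item per service, here is a small self-contained sketch. ItemHandleRegistry and its method names are hypothetical and are not part of EMA; Handle stands in for the 64-bit value returned by event.getHandle(). Storing a set of handles per item is an assumption made here to cover the case where several consumers request the same item, each on its own stream:

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <utility>

using Handle = std::uint64_t;                           // hypothetical alias for an item handle
using ItemKey = std::pair<std::uint32_t, std::string>;  // (service ID, item name)

class ItemHandleRegistry {
public:
    // Call when an item request arrives: remember its handle under (service, name).
    void add(std::uint32_t serviceId, const std::string& name, Handle h) {
        handles_[ItemKey{serviceId, name}].insert(h);
    }

    // Call when an item stream is closed: forget the handle everywhere.
    void remove(Handle h) {
        for (auto it = handles_.begin(); it != handles_.end();) {
            it->second.erase(h);
            if (it->second.empty()) {
                it = handles_.erase(it);  // drop items with no open streams
            } else {
                ++it;
            }
        }
    }

    // Handles to publish an update on; empty if nobody has the item open.
    std::set<Handle> lookup(std::uint32_t serviceId, const std::string& name) const {
        const ItemKey key{serviceId, name};
        auto it = handles_.find(key);
        if (it == handles_.end()) {
            return {};
        }
        return it->second;
    }

private:
    std::map<ItemKey, std::set<Handle>> handles_;
};
```

With a registry like this, the post handler would look up the posted item's service and name and submit the update once per returned handle.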

One other thing I did not ask earlier (because I did not want to diverge from the issue at hand):
Once you receive the PostMsg, do you intend to alter the data in any way before publishing, e.g. amend field values or add fields?

I ask because, if all you want to do is allow a consumer to post some data to a cache/service so that others within your organisation can consume it, then you can just use a Non-Interactive Cache service. This does not require a Provider application to be developed and maintained. Your Market Data team needs to define the service on the ADH; your consumer(s) can then post to the service via the ADS, and other consumers can consume the data. In this older article of mine, I talk about creating auto-expiring records (via NIProvider OR PostMsgs):

Learn how to create auto-expiring records on Refinitiv Real-Time | Refinitiv Developers

The auto-expiry is optional and does not have to be configured on the ADH.



Thanks, interesting.

I'm not sure your suggestions will work for us as we would have to change many more components in our system.


Our system is at least 12 years old and uses the old C++ SSL SFC API from Reuters.

It consists of several hundred consumers (brokers) who occasionally update values of current items but mainly consume many items. There are also several tens of calculation servers that update lots of current items and also consume items for calculations.

The items are maintained in 20 or so sink-driven sources. These source applications enforce item types, which specify what fields each item type can have. When new items are created, the item type must be specified. Each source application also caches all its items (typically 20,000 per source instance) to disk for recovery. The source applications also work in hot-standby pairs, synchronizing with each other.


It is this source application whose API I'm updating to EMA. It needs to work with the old consumers, which send the off-stream post message when updating items. We will update them to EMA later.


I'm trying to keep most of the old source application and just update the API to EMA. This seems achievable, and I'm getting close to being fully functional with an EMA interactive provider in this source application.
