question

Upvotes
Accepted
1 0 0 2

How to unsubscribe a RIC using EMA API OmmConsumerClient?

Hi Team,

We are currently upgrading our application from RFA API to EMA API and we are facing issues while unsubscribing a RIC in some scenarios. We are using the OmmConsumerClient as explained in the examples. But while unsubscribing a RIC we are still getting updates for it in the form of UpdateMessage.

We request you to please provide a working example to unsubscribe a RIC in Java using this OmmConsumerClient.


class AppClient implements OmmConsumerClient

{

public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)

{

System.out.println(refreshMsg);

}

public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent event)

{

System.out.println(updateMsg);

}


public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent event)

{

System.out.println(statusMsg);

}


public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent consumerEvent){}

public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent consumerEvent){}

public void onAllMsg(Msg msg, OmmConsumerEvent consumerEvent){}

}


public class Consumer

{

public static void main(String[] args)

{

OmmConsumer consumer = null;

try

{

AppClient appClient = new AppClient();

OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig();

consumer = EmaFactory.createOmmConsumer(config.host("localhost:14002").username("user"));

ReqMsg reqMsg = EmaFactory.createReqMsg();

consumer.registerClient(reqMsg.serviceName("DIRECT_FEED").name("IBM.N"), appClient);

Thread.sleep(60000); // API calls onRefreshMsg(), onUpdateMsg() and onStatusMsg()

}

catch (InterruptedException | OmmException excp)

{

System.out.println(excp.getMessage());

}

finally

{

if (consumer != null) consumer.uninitialize();

}

}

}

#technologyema-apijavarefinitiv-realtime-sdk
icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvote
Accepted
27.5k 67 18 14

Hello @dimple.shah

Basically, the OmmConsumer.registerClient() method returns a handle which is a reference to the subscription RIC.

registerclient.png

Once you get a handle, you can use OmmConsumer.unregister(handle) method to unsubscribe that RIC stream.

unregister.png

Example Code:

long handle = consumer.registerClient(EmaFactory.createReqMsg().serviceName("ELEKTRON_DD").name("JPY="), appClient, 0)


Thread.sleep(6000);      // API calls onRefreshMsg(), onUpdateMsg() and onStatusMsg()
System.out.println("Close Item request");
consumer.unregister(handle);
Thread.sleep(60000);     // API calls onRefreshMsg(), onUpdateMsg() and onStatusMsg()

If you are using Batch, the batch request message is always closed by the backend, and then the backend creates individual streams for each of the RICs and allocate a unique handle to each stream. You can get the item subscription handler from the OmmConsumerEvent.handler() method in the OmmConsumerClient class too.

Please see the full example code from the EMA Java ex300_MP_Close example.

Example Code:

class AppClient implements OmmConsumerClient{
    long numOfUpdates = 0;
    OmmConsumer _ommConsumer = null;
    
    void setOmmConsumer(OmmConsumer consumer){
        _ommConsumer = consumer;
    }


    public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event){
        ....
    }
    
    public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent event) {
        System.out.println("Item Name: " + (updateMsg.hasName() ? updateMsg.name() : "<not set>"));
        System.out.println("Service Name: " + (updateMsg.hasServiceName() ? updateMsg.serviceName() : "<not set>"));
        
        if (DataType.DataTypes.FIELD_LIST == updateMsg.payload().dataType())
            decode(updateMsg.payload().fieldList());
        
        System.out.println();
        
        // Close streaming request after receiving 3 update messages
        if (++numOfUpdates == 3){
            System.out.println("Close Item request");
            _ommConsumer.unregister(event.handle());


        }
    }

    public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent event) {
        ...
    }
    
    ....

    void decode(FieldList fieldList){
        ....
    }


}
public class Consumer {
   public static void main(String[] args) {
      OmmConsumer consumer = null;
      try{
         AppClient appClient = new AppClient();
         consumer = EmaFactory.createOmmConsumer(EmaFactory.createOmmConsumerConfig().host("localhost:14002").username("user"));

        appClient.setOmmConsumer(consumer);

         ....
      }
   }
}



registerclient.png (30.4 KiB)
unregister.png (9.6 KiB)
icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvotes
1 0 0 2

Hi @wasin.w ,

Thanks for your prompt reply.

We also want to know that do we get any acknowledgement message or event when we call the method consumer.unregister(handle) to unsubscribe any streaming RIC. If we yes, then what is the type of java object and how do we process it?


icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Hello @dimple.shah

Based on my observation, the OmmConsumer.unregister(handle) method does not return any acknowledgements or events back to the application.

Note: If you enable the API trace log (<XmlTraceToStdout value="1"/>), you will see the following log message that the API sends item close request message to the backend.

<CLOSE domainType="MARKET_PRICE" streamId="3" containerType="NO_DATA" flags="0x00" dataSize="0">
    <dataBody>
    </dataBody>
</CLOSE>
Upvotes
1 0 0 2

Hi @wasin.w,
Thanks again for your response.

We also want to know if it is guaranteed that once we call consumer.unregister(handle), then we will stop receiving updates for that item.

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvotes
27.5k 67 18 14

Hello @dimple.shah

Once the application calls OmmConsumer.unregister(handle) method with a valid handle, the API sends the close request message for that item to the server, and the server will stop send that item's data to the API.

By default, the application will stop receiving updates for that item immediately.

  • Unless there are some updates in the API queue that not dispatched to the application yet. The API will dispatch those updates to the application until no more updates of that item. However, this is rarely happened.

If you encounter the issue that the application still receiving updates after the OmmConsumer.unregister(handle) method called, I suggest you contact the API support team to verify the API issue.

You can submit the GitHub - Issue to the RTSDK team directly via the https://github.com/Refinitiv/Real-Time-SDK/issues page.

Alternatively, you can contact your colleagues who are the RDC named user to submit a support ticket to the Real-Time APIs support team via the "contact premium support" button on the https://developers.lseg.com/en/api-catalog/refinitiv-real-time-opnsrc/rt-sdk-java page.

rtsdk-contact-support.png




icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Write an Answer

Hint: Notify or tag a user in this post by typing @username.

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.