How to handle COMPLETION_EVENT while migrating Refinitiv RFA API to EMA API?

dimple.shah
dimple.shah Newcomer

We are currently upgrading our application from RFA API to use EMA API. We referred a similar question in this link:

https://community.developers.refinitiv.com/questions/92298/whats-the-equivalent-of-rfas-completion-event-in-r.html

But we are not able to use the rsslIsFinalState given in the solution here. Please provide a working example to identify the completion event or closing of a stream in any Consumer(given below) implementing OmmConsumerClient.

///*|----------------------------------------------------------------------------------------------------

// *| This source code is provided under the Apache 2.0 license --

// *| and is provided AS IS with no warranty or guarantee of fit for purpose. --

// *| See the project's LICENSE.md for details. --

// *| Copyright (C) 2019 Refinitiv. All rights reserved. --

///*|----------------------------------------------------------------------------------------------------


package com.refinitiv.ema.examples.training.consumer.series100.ex100_MP_Streaming;


import com.refinitiv.ema.access.Msg;

import com.refinitiv.ema.access.AckMsg;

import com.refinitiv.ema.access.GenericMsg;

import com.refinitiv.ema.access.RefreshMsg;

import com.refinitiv.ema.access.ReqMsg;

import com.refinitiv.ema.access.StatusMsg;

import com.refinitiv.ema.access.UpdateMsg;

import com.refinitiv.ema.access.EmaFactory;

import com.refinitiv.ema.access.OmmConsumer;

import com.refinitiv.ema.access.OmmConsumerClient;

import com.refinitiv.ema.access.OmmConsumerConfig;

import com.refinitiv.ema.access.OmmConsumerEvent;

import com.refinitiv.ema.access.OmmException;


class AppClient implements OmmConsumerClient

{

public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)

{

System.out.println(refreshMsg);

}

public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent event)

{

System.out.println(updateMsg);

}


public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent event)

{

System.out.println(statusMsg);

}


public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent consumerEvent){}

public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent consumerEvent){}

public void onAllMsg(Msg msg, OmmConsumerEvent consumerEvent){}

}


public class Consumer

{

public static void main(String[] args)

{

OmmConsumer consumer = null;

try

{

AppClient appClient = new AppClient();

OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig();

consumer = EmaFactory.createOmmConsumer(config.host("localhost:14002").username("user"));

ReqMsg reqMsg = EmaFactory.createReqMsg();

consumer.registerClient(reqMsg.serviceName("DIRECT_FEED").name("IBM.N"), appClient);

Thread.sleep(60000); // API calls onRefreshMsg(), onUpdateMsg() and onStatusMsg()

}

catch (InterruptedException | OmmException excp)

{

System.out.println(excp.getMessage());

}

finally

{

if (consumer != null) consumer.uninitialize();

}

}

}





Best Answer

  • Jirapongse
    Jirapongse ✭✭✭✭✭
    Answer ✓

    @dimple.shah

    Thank you for reaching out to us.

    The code looks like this:

        public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)
        {
    ...
            
       if (isFinalState(refreshMsg.state())) {
                ...
            }

    ...
        }
        
        public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent event) 
        {
     ...
     if (statusMsg.hasState()) {
                System.out.println("Item State: " +statusMsg.state());
                if (isFinalState(statusMsg.state())) {
                    ...
                }
            }

            
    ...
            
        }
        public boolean isFinalState(OmmState state) {
            
            if(state.streamState() == StreamState.CLOSED || 
                    state.streamState() == StreamState.CLOSED_RECOVER ||
                    state.streamState() == StreamState.CLOSED_REDIRECTED ||
                    state.streamState() == StreamState.NON_STREAMING) {
                return true;
            }
            return false;
    }

    The state is available in the Refresh messages and optional in Update messages.

    You need to check the stream state. If the stream state is CLOSED. CLOSED_RECOVER, REDIRECTED, or NON_STREAMING, this means that the stream has been closed.

Answers

Welcome!

It looks like you're new here. Sign in or register to get started.

Welcome!

It looks like you're new here. Sign in or register to get started.