Too Many ACKs on Off-Line Posting Consumer

anthony.leon
anthony.leon Explorer

The main in the code below is simulating a servlet call. I was aiming to connect to the servers once, then create/destroy a client for each call - as I believe that the client only needs to exist for the time of publishing.

When running the client, the latest price is published. However, each response is returning a additional ACKs all corresponding to the last Handle ID published. For example.

  • 1st call = 1 ACK and HandleID 1
  • 2nd call = 2 ACKs and all Handle ID 2
  • 3rd call = 3 ACKs and all Handle ID 3 etc.

However, the ACK ID always the first Handle ID.

I was assuming that the would be some kind of 'unregisterClient' to deal with this but there is none in EMA. What am I not unregistering?

The values are publishing correctly, I am just concerned that there appears to be a leak.

Code is;

public class ReutersPublisherClientStandAloneTest2 implements OmmConsumerClient {

    public long bid, ask, postId;
    CountDownLatch counterDownLatch = new CountDownLatch(1);
    
    public ReutersPublisherClientStandAloneTest2(long bid, long ask) {
        this.bid = bid;
        this.ask = ask;
    }


    public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)    {
        
//        System.out.println("onRefreshMsg. Item Handle: " + event.handle() + " Closure: " + event.closure() + " Message: " + refreshMsg.toString());
        
        if ( refreshMsg.domainType() == EmaRdm.MMT_LOGIN && 
                refreshMsg.state().streamState() == OmmState.StreamState.OPEN &&
                refreshMsg.state().dataState() == OmmState.DataState.OK )
        {
// #1
                PostMsg postMsg = EmaFactory.createPostMsg();
                UpdateMsg updateMessage = EmaFactory.createUpdateMsg();
                FieldList nestedFieldList = EmaFactory.createFieldList();
                
                nestedFieldList.add(EmaFactory.createFieldEntry().real(22, bid, OmmReal.MagnitudeType.EXPONENT_NEG_4));
                nestedFieldList.add(EmaFactory.createFieldEntry().real(25, ask, OmmReal.MagnitudeType.EXPONENT_NEG_4));
                
                updateMessage.payload(nestedFieldList);
                
                ((OmmConsumer) event.closure()).submit( 
                    postMsg
                        .postId(postId++)
                        .serviceName(WRITE_SERVICE)
                        .name(RIC)
                        .solicitAck(true)
                        .complete(true)
                        .payload(updateMessage)
                    , event.handle() 
                );
        }
// #1 -end
    }
        
    public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent event) {
//        System.out.println("onUpdateMsg. Item Handle: " + event.handle() + " Closure: " + event.closure() + " Message: " + updateMsg.toString());
    }

    public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent event) {
//        System.out.println("onStatusMsg. Item Handle: " + event.handle() + " Closure: " + event.closure() + " Message: " + statusMsg.toString());
    }
        
    public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent event)    {
        System.out.println("onAckMsg. Item Handle: " + event.handle() + " Closure: " + event.closure() + " Message: " + ackMsg.toString());
        counterDownLatch.countDown();
    }
        
    public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent event){}
    public void onAllMsg(Msg msg, OmmConsumerEvent event){}


    public static String READ_SERVICE = "IDN_RDF";
    public static String WRITE_SERVICE = "MARKETLINK";
    public static String RIC = "XXX=YYY";
        
    public static void main(String[] args) throws Exception    {
        OmmConsumer    consumer = EmaFactory.createOmmConsumer(
            EmaFactory
                .createOmmConsumerConfig("C:/cfg")
                .username("user")
        );

        long bid = 35000, ask = 35100;
        for (int i = 0; i < 3; i++) {
            System.out.println("Sending ============ " + bid + " " + ask);
            ReutersPublisherClientStandAloneTest2 appClient = new ReutersPublisherClientStandAloneTest2(bid += 100, ask += 100); // #2
            ReqMsg reqMsg = EmaFactory.createReqMsg();
            long handle1 = consumer.registerClient(reqMsg.domainType( EmaRdm.MMT_LOGIN), appClient, consumer);
            long handle2 = consumer.registerClient(reqMsg.clear().serviceName(READ_SERVICE).name(RIC).interestAfterRefresh(false), appClient, consumer); // #3
            appClient.counterDownLatch.await();
            consumer.unregister(handle1); // #5
            consumer.unregister(handle2); // #6
            Thread.sleep(5000);
        }

        if (consumer != null) consumer.uninitialize();
    }
    
}

Best Answer

  • umer.nalla
    umer.nalla LSEG
    Answer ✓

    Hi @anthony.leon

    I cannot comment on the additional ACKs without testing - and may require you to create a Support ticket for offline investigation.

    However, one thing I can point out is that your current implementation is not ideal.

    Each time you registerClient for MMT_LOGIN - you are sending a Login request to the server and then you are logging out again when you unregisterClient.

    If your servlet requirement allows it, I would just do the registerClient one time and when the initial Login Refresh is received, I would grab the handle - hold on to it and just use it as and when I want to carry out an off-stream post. Only unregistering the handle when you are finished.

    You don't have to post from within the onRefreshMsg call - as long as you are still logged in, you can use the Login Handle to post. So, you should handle the onStatusMsg for the Login - just in case you get logged out for whatever reason by the server (e.g. disconnect).


Answers