The main method in the code below simulates a servlet call. I was aiming to connect to the servers once, then create and destroy a client for each call, as I believe the client only needs to exist for the duration of the publish.
When running the client, the latest price is published. However, each call returns one more ACK than the previous call, and all of them carry the last Handle ID published. For example:
- 1st call = 1 ACK and HandleID 1
- 2nd call = 2 ACKs and all Handle ID 2
- 3rd call = 3 ACKs and all Handle ID 3 etc.
However, the ACK ID is always the first Handle ID.
I was assuming that there would be some kind of 'unregisterClient' to deal with this, but there is none in EMA. What am I not unregistering?
The values are publishing correctly; I am just concerned that there appears to be a leak.
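To put numbers on the duplication rather than reading it off the console, a small tally keyed by handle and ACK ID could be called from onAckMsg in the code below. This is only a sketch in plain Java: AckTally and its record/count methods are hypothetical names, not part of EMA, and the two arguments are assumed to come from event.handle() and ackMsg.ackId().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of EMA): counts ACKs per handle/ACK ID pair.
class AckTally {
    private final Map<String, AtomicInteger> counts = new ConcurrentHashMap<>();

    // Intended to be called from onAckMsg with event.handle() and ackMsg.ackId().
    // Returns the running count for that pair, so anything above 1 is a duplicate.
    int record(long handle, long ackId) {
        return counts.computeIfAbsent(handle + ":" + ackId, k -> new AtomicInteger())
                     .incrementAndGet();
    }

    // Number of ACKs seen so far for the pair; 0 if none were recorded.
    int count(long handle, long ackId) {
        AtomicInteger c = counts.get(handle + ":" + ackId);
        return c == null ? 0 : c.get();
    }
}
```

A single shared AckTally instance, logged at the end of each call, would show whether the extra ACKs grow by exactly one per call as described above.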
The full code is:
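import java.util.concurrent.CountDownLatch;

// EMA imports. The package names assume a Thomson Reuters-era EMA Java release;
// newer releases use com.refinitiv.ema instead.
import com.thomsonreuters.ema.access.AckMsg;
import com.thomsonreuters.ema.access.EmaFactory;
import com.thomsonreuters.ema.access.FieldList;
import com.thomsonreuters.ema.access.GenericMsg;
import com.thomsonreuters.ema.access.Msg;
import com.thomsonreuters.ema.access.OmmConsumer;
import com.thomsonreuters.ema.access.OmmConsumerClient;
import com.thomsonreuters.ema.access.OmmConsumerEvent;
import com.thomsonreuters.ema.access.OmmReal;
import com.thomsonreuters.ema.access.OmmState;
import com.thomsonreuters.ema.access.PostMsg;
import com.thomsonreuters.ema.access.RefreshMsg;
import com.thomsonreuters.ema.access.ReqMsg;
import com.thomsonreuters.ema.access.StatusMsg;
import com.thomsonreuters.ema.access.UpdateMsg;
import com.thomsonreuters.ema.rdm.EmaRdm;

public class ReutersPublisherClientStandAloneTest2 implements OmmConsumerClient {
    public long bid, ask, postId;
    CountDownLatch counterDownLatch = new CountDownLatch(1);

    public ReutersPublisherClientStandAloneTest2(long bid, long ask) {
        this.bid = bid;
        this.ask = ask;
    }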

    public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event) {
        // System.out.println("onRefreshMsg. Item Handle: " + event.handle() + " Closure: " + event.closure() + " Message: " + refreshMsg.toString());
        if (refreshMsg.domainType() == EmaRdm.MMT_LOGIN
                && refreshMsg.state().streamState() == OmmState.StreamState.OPEN
                && refreshMsg.state().dataState() == OmmState.DataState.OK) {
            // #1
            PostMsg postMsg = EmaFactory.createPostMsg();
            UpdateMsg updateMessage = EmaFactory.createUpdateMsg();
            FieldList nestedFieldList = EmaFactory.createFieldList();
            nestedFieldList.add(EmaFactory.createFieldEntry().real(22, bid, OmmReal.MagnitudeType.EXPONENT_NEG_4));
            nestedFieldList.add(EmaFactory.createFieldEntry().real(25, ask, OmmReal.MagnitudeType.EXPONENT_NEG_4));
            updateMessage.payload(nestedFieldList);
            ((OmmConsumer) event.closure()).submit(
                    postMsg
                            .postId(postId++)
                            .serviceName(WRITE_SERVICE)
                            .name(RIC)
                            .solicitAck(true)
                            .complete(true)
                            .payload(updateMessage),
                    event.handle());
        }
        // #1 -end
    }

    public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent event) {
        // System.out.println("onUpdateMsg. Item Handle: " + event.handle() + " Closure: " + event.closure() + " Message: " + updateMsg.toString());
    }

    public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent event) {
        // System.out.println("onStatusMsg. Item Handle: " + event.handle() + " Closure: " + event.closure() + " Message: " + statusMsg.toString());
    }

    public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent event) {
        System.out.println("onAckMsg. Item Handle: " + event.handle() + " Closure: " + event.closure() + " Message: " + ackMsg.toString());
        counterDownLatch.countDown();
    }

    public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent event) {}

    public void onAllMsg(Msg msg, OmmConsumerEvent event) {}

    public static String READ_SERVICE = "IDN_RDF";
    public static String WRITE_SERVICE = "MARKETLINK";
    public static String RIC = "XXX=YYY";

    public static void main(String[] args) throws Exception {
        OmmConsumer consumer = EmaFactory.createOmmConsumer(
                EmaFactory
                        .createOmmConsumerConfig("C:/cfg")
                        .username("user"));
        long bid = 35000, ask = 35100;
        for (int i = 0; i < 3; i++) {
            System.out.println("Sending ============ " + bid + " " + ask);
            ReutersPublisherClientStandAloneTest2 appClient = new ReutersPublisherClientStandAloneTest2(bid += 100, ask += 100); // #2
            ReqMsg reqMsg = EmaFactory.createReqMsg();
            long handle1 = consumer.registerClient(reqMsg.domainType(EmaRdm.MMT_LOGIN), appClient, consumer);
            long handle2 = consumer.registerClient(reqMsg.clear().serviceName(READ_SERVICE).name(RIC).interestAfterRefresh(false), appClient, consumer); // #3
            appClient.counterDownLatch.await();
            consumer.unregister(handle1); // #5
            consumer.unregister(handle2); // #6
            Thread.sleep(5000);
        }
        if (consumer != null) consumer.uninitialize();
    }
}
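
One detail that can be checked without EMA: postId is an instance field, and main creates a new client object on every loop iteration, so postId(postId++) submits 0 on every call. Whether that is the reason the ACK ID never changes is an assumption, not something confirmed here. The sketch below uses hypothetical classes (PerInstanceIds, SharedIds, PostIdDemo) purely to contrast a per-instance counter with one shared across instances.

```java
import java.util.concurrent.atomic.AtomicLong;

// Mirrors the posted client: the counter lives in the instance, so it restarts at 0.
class PerInstanceIds {
    private long postId;

    long next() {
        return postId++;
    }
}

// Hypothetical alternative: one counter shared by every client instance.
class SharedIds {
    private static final AtomicLong POST_ID = new AtomicLong();

    long next() {
        return POST_ID.getAndIncrement();
    }
}

class PostIdDemo {
    public static void main(String[] args) {
        for (int call = 1; call <= 3; call++) {
            // A fresh object per call, as in the loop in main above.
            long perInstance = new PerInstanceIds().next();
            long shared = new SharedIds().next();
            System.out.println("call " + call + ": per-instance=" + perInstance + " shared=" + shared);
        }
    }
}
```

With a fresh object per call, the per-instance value stays at 0 while the shared value increases by one each time.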