//************************************************************************
// Copyright (c) 2011-2017 Numerix LLC.  All rights reserved.
// This software comprises valuable trade secrets and may be used,
// copied, transmitted, stored, and distributed only in accordance
// with the terms of a written license or trial agreement and with the
// inclusion of this copyright notice.
//************************************************************************
package com.numerix.market.marketdatatrepapi.datarequesters;

import com.numerix.market.marketdatatrepapi.TrepDataRequesterCommon;
import com.reuters.rfa.common.Handle;
import com.reuters.rfa.config.ConfigDb;
import com.reuters.rfa.omm.*;
import com.reuters.rfa.rdm.RDMInstrument;
import com.reuters.rfa.rdm.RDMMsgTypes;
import com.reuters.rfa.rdm.RDMUser;
import com.reuters.rfa.session.TimerIntSpec;
import com.reuters.rfa.session.omm.OMMItemIntSpec;

import java.util.ArrayList;
import java.util.List;

/**
 * This is a Client class that handles request and response for items in the
 * following Reuters Domains:
 * com.reuters.rfa.rdm.RDMMsgTypes.MARKET_PRICE,
 * com.reuters.rfa.rdm.RDMMsgTypes.MARKET_BY_ORDER,
 * com.reuters.rfa.rdm.RDMMsgTypes.MARKET_BY_PRICE,
 * com.reuters.rfa.rdm.RDMMsgTypes.MARKET_MAKER,
 * com.reuters.rfa.rdm.RDMMsgTypes.SYMBOL_LIST in a generic way.
 *
 * This class is responsible for the following:
 * 1. Encoding streaming view and/or batch request messages for the specified
 *    model using OMM messages. A view is requested through the {@code sendView}
 *    TREP connection setting together with the view related parameters
 *    ({@code viewType} and the list of fields passed by the caller).
 *    If more than one item is requested, the request is sent as a batch request.
 *    Batch reissues are requested through the {@code sendReissue} setting along
 *    with {@code reissueInterval}, {@code reissueWithPAR} and
 *    {@code reissueWithPriority}.
 * 2. Registering/subscribing one or multiple messages to RFA.
 * 3. Handing responses to the application client (the
 *    {@link TrepDataRequesterCommon} instance is registered as the client for
 *    every request and timer).
 * 4. Unregistering all items when the application is not interested anymore.
 */
public class ViewDataRequester {

    /** Request type value that selects a non-streaming (snapshot) request. */
    private static final String SNAPSHOT = "snapshot";

    /** Fixed overhead added to the item-name lengths when estimating the batch payload size. */
    private static final int BASE_PAYLOAD_SIZE = 512;

    /** Upper bound applied to the estimated payload size; items beyond it go into a further batch. */
    private static final int MAX_PAYLOAD_SIZE = 64000;

    /** Handles of every item request registered and not yet closed. Never {@code null}. */
    private List<Handle> itemHandles;
    private int numItemsRequested = 0;
    private final TrepDataRequesterCommon dataRequesterApp;
    private final ConfigDb configDb;
    private Handle reissueTimerHandle;
    /** Current pause state; toggled on each reissue when {@code reissueWithPAR} is set. */
    private boolean itemsPaused;
    private final boolean reissueParRequired;
    private final boolean reissuePriorityRequired;
    private byte reissuePriorityClass = 1;
    private byte reissuePriorityCount = 1;
    private int ricCodeCount;
    private String requestType = SNAPSHOT;
    /** View fields of the most recent {@link #sendRequest}; reused when a reissue re-encodes the view. */
    private List<String> requestedFields = new ArrayList<>();

    /**
     * Creates a requester bound to the given application and reads the
     * {@code initialRequestPaused}, {@code reissueWithPAR} and
     * {@code reissueWithPriority} settings from its configuration.
     *
     * @param dataRequesterApp application object supplying the config, pool,
     *                         encoder, event queue, consumer and login handle
     */
    public ViewDataRequester(TrepDataRequesterCommon dataRequesterApp) {
        this.dataRequesterApp = dataRequesterApp;
        configDb = dataRequesterApp.getConfigDb();
        itemHandles = new ArrayList<>();
        itemsPaused = Boolean.parseBoolean(configDb.variable("", "initialRequestPaused"));
        reissueParRequired = Boolean.parseBoolean(configDb.variable("", "reissueWithPAR"));
        reissuePriorityRequired = Boolean.parseBoolean(configDb.variable("", "reissueWithPriority"));
    }

    /** @return the message type passed to the most recent {@link #sendRequest} ("snapshot" initially) */
    public String getRequestType() {
        return requestType;
    }

    /** @return the live list of handles for the currently registered item requests; never {@code null} */
    public List<Handle> getItemHandles() {
        return itemHandles;
    }

    /** @return the number of items passed to the most recent {@link #sendRequest} */
    public int getNumItemsRequested() {
        return numItemsRequested;
    }

    /** @return the RIC counter: set by {@link #sendRequest}, lowered by {@link #decRicCodeCount()} */
    public int getRicCodeCount() {
        return ricCodeCount;
    }

    /** Decrements the RIC counter by one. */
    public void decRicCodeCount() {
        ricCodeCount--;
    }

    /**
     * Encodes request messages for the given items and registers them with RFA.
     * <p>
     * A single item is requested by name. Several items are sent as one or
     * more batch requests: when the encoder size limit stops a batch early,
     * the remaining items are sent in further batches. The caller's list is
     * not modified. If the {@code sendReissue} setting is true, the reissue
     * timer is registered afterwards.
     *
     * @param ricCodesList item names to request; {@code null} or empty registers nothing
     * @param fields       view fields (field ids or element names, depending on
     *                     {@code viewType}); may be {@code null} when no view is sent
     * @param messageType  "snapshot" (case-insensitive) for a non-streaming
     *                     request, anything else for a streaming request
     */
    public void sendRequest(List<String> ricCodesList, List<String> fields, String messageType) {
        // Work on a private copy: encoding consumes items from the list it is given.
        List<String> pendingItems = (ricCodesList == null)
                ? new ArrayList<String>()
                : new ArrayList<>(ricCodesList);

        ricCodeCount = pendingItems.size();
        numItemsRequested = pendingItems.size();
        requestType = messageType;
        requestedFields = (fields == null) ? new ArrayList<String>() : new ArrayList<>(fields);

        if (pendingItems.isEmpty()) {
            return; // nothing to request
        }

        String serviceName = configDb.variable("", "serviceName");
        short capability = RDMMsgTypes.msgModelType(configDb.variable("", "mmt"));

        int indicationFlags = SNAPSHOT.equalsIgnoreCase(messageType)
                ? OMMMsg.Indication.NONSTREAMING
                : OMMMsg.Indication.REFRESH;
        if (itemsPaused) {
            indicationFlags |= OMMMsg.Indication.PAUSE_REQ;
        }

        // Preparing to send item request message
        OMMPool pool = dataRequesterApp.getPool();
        OMMMsg ommmsg = pool.acquireMsg();
        try {
            ommmsg.setMsgType(OMMMsg.MsgType.REQUEST);
            ommmsg.setMsgModelType(capability);
            ommmsg.setPriority((byte) 1, 1);

            // Setting OMMMsg with negotiated version info from login handle
            if (dataRequesterApp.getLoginHandle() != null) {
                ommmsg.setAssociatedMetaInfo(dataRequesterApp.getLoginHandle());
            }

            if (pendingItems.size() == 1) {
                // request without batching
                String itemName = pendingItems.get(0).trim();
                ommmsg.setAttribInfo(serviceName, itemName, RDMInstrument.NameType.RIC);
                registerItemRequest(encodeRequestPayLoad(ommmsg, null, requestedFields, indicationFlags));
            } else {
                ommmsg.setAttribInfo(serviceName, null, RDMInstrument.NameType.RIC);
                int sizeBefore;
                do {
                    sizeBefore = pendingItems.size();
                    registerItemRequest(
                            encodeRequestPayLoad(ommmsg, pendingItems, requestedFields, indicationFlags));
                    // Stop when everything was sent, or when a pass consumed nothing
                    // (an item that can never fit) so the loop cannot spin forever.
                } while (!pendingItems.isEmpty() && pendingItems.size() < sizeBefore);
            }
        } finally {
            // Return the pooled message even when encoding or registration fails.
            pool.releaseMsg(ommmsg);
        }

        if (Boolean.parseBoolean(configDb.variable("", "sendReissue"))) {
            registerReissueTimer();
        }
    }

    /** Wraps the message in an interest spec, registers it with the consumer and records the handle. */
    private void registerItemRequest(OMMMsg requestMsg) {
        OMMItemIntSpec ommItemIntSpec = new OMMItemIntSpec();
        ommItemIntSpec.setMsg(requestMsg);
        Handle itemHandle = dataRequesterApp.getOMMConsumer().registerClient(
                dataRequesterApp.getEventQueue(), ommItemIntSpec, dataRequesterApp, null);
        itemHandles.add(itemHandle);
    }

    /**
     * Unregisters/unsubscribes all currently registered items. Failures are
     * ignored (best effort). After a successful unregister the handle list is
     * replaced by a fresh empty list, so the requester remains usable.
     */
    public void closeRequest() {
        try {
            if (!itemHandles.isEmpty()) {
                dataRequesterApp.getOMMConsumer().unregisterClient(itemHandles, null);
                // A new list (rather than null) keeps getItemHandles(), sendRequest()
                // and repeated closeRequest() calls safe afterwards.
                itemHandles = new ArrayList<>();
            }
        } catch (Exception ignored) {
            // Deliberately suppressed: closing is best effort.
        }
    }

    /**
     * Encodes the view payload into the OMMMsg if the {@code sendView} setting
     * is true and {@code viewType} is a supported view type, and encodes the
     * batch item list when {@code itemsList} is non-empty. Also sets the
     * indication flags on the message.
     * <p>
     * Items that are encoded are removed from {@code itemsList}; items left in
     * the list did not fit and must be sent in another batch.
     *
     * @param ommmsg          message to populate
     * @param itemsList       batch item names (consumed as they are encoded), or
     *                        {@code null}/empty for a non-batch request
     * @param fieldsList      view fields; {@code null} is treated as an empty list
     * @param indicationFlags base indication flags to extend
     * @return the message to register: {@code ommmsg} itself when there is no
     *         payload, otherwise the encoded message
     * @throws NumberFormatException if a view is requested and {@code viewType}
     *                               or a field id is not a valid integer
     */
    private OMMMsg encodeRequestPayLoad(OMMMsg ommmsg, List<String> itemsList, List<String> fieldsList,
                                        int indicationFlags) {
        int estimatedSize = estimatePayloadSize(itemsList);
        boolean sendBatch = itemsList != null && !itemsList.isEmpty();
        List<String> viewFields = (fieldsList != null) ? fieldsList : new ArrayList<String>();

        // Ask provider to send OMMAttribInfo with update and status messages
        if (!SNAPSHOT.equalsIgnoreCase(requestType)) {
            indicationFlags |= OMMMsg.Indication.ATTRIB_INFO_IN_UPDATES;
        }
        if (sendBatch) {
            indicationFlags |= OMMMsg.Indication.BATCH_REQ;
        }

        // viewType is only read when a view is requested, so non-view requests
        // do not fail on a missing or malformed viewType setting.
        int viewType = 0;
        boolean viewValid = false;
        if (Boolean.parseBoolean(configDb.variable("", "sendView"))) {
            viewType = Integer.parseInt(configDb.variable("", "viewType"));
            viewValid = viewType == RDMUser.View.FIELD_ID_LIST
                    || viewType == RDMUser.View.ELEMENT_NAME_LIST;
        }

        // No view to send (not requested, or viewType not supported):
        // set the indication flags and return the message with any batch payload.
        if (!viewValid) {
            ommmsg.setIndicationFlags(indicationFlags);
            if (!sendBatch) {
                return ommmsg;
            }
            OMMEncoder batchEncoder = encodeBatchPayLoad(ommmsg, itemsList, estimatedSize);
            batchEncoder.encodeAggregateComplete(); // completes the element list
            // NOTE(review): this path uses acquireEncodedObject() while the view path
            // below uses getEncodedObject(); confirm whether the acquired object needs
            // an explicit release by the caller.
            return (OMMMsg) batchEncoder.acquireEncodedObject();
        }

        // View is valid and needs to be sent. View and batch payload share one element list.
        ommmsg.setIndicationFlags(indicationFlags | OMMMsg.Indication.VIEW);

        OMMEncoder encoder = dataRequesterApp.getEncoder();
        int bufferSize = sendBatch
                ? viewFields.size() * 5 + estimatedSize + 200
                : viewFields.size() * 5 + 100;
        encoder.initialize(OMMTypes.MSG, bufferSize);
        encoder.encodeMsgInit(ommmsg, OMMTypes.NO_DATA, OMMTypes.ELEMENT_LIST);
        encoder.encodeElementListInit(OMMElementList.HAS_STANDARD_DATA, (short) 0, (short) 0);

        encodeViewEntries(encoder, viewType, viewFields);
        if (sendBatch) {
            encodeItemArray(encoder, itemsList, estimatedSize);
        }

        encoder.encodeAggregateComplete(); // completes the element list
        return (OMMMsg) encoder.getEncodedObject();
    }

    /** Estimates the payload size: fixed overhead plus each item name length + 1, capped at the encoder limit. */
    private static int estimatePayloadSize(List<String> itemsList) {
        int estimatedSize = BASE_PAYLOAD_SIZE;
        if (itemsList != null) {
            for (String itemName : itemsList) {
                estimatedSize += itemName.length() + 1; // plus overhead
            }
        }
        // Due to encoder limitation, limit to 64000. Remaining items are encoded in subsequent batch(es).
        return Math.min(estimatedSize, MAX_PAYLOAD_SIZE);
    }

    /**
     * Encodes the two view entries: ViewType, then ViewData as an array of
     * field ids (FIELD_ID_LIST) or element names (ELEMENT_NAME_LIST).
     */
    private static void encodeViewEntries(OMMEncoder encoder, int viewType, List<String> viewFields) {
        encoder.encodeElementEntryInit(RDMUser.View.ViewType, OMMTypes.UINT);
        if (viewType == RDMUser.View.FIELD_ID_LIST) {
            encoder.encodeUInt(RDMUser.View.FIELD_ID_LIST);
            encoder.encodeElementEntryInit(RDMUser.View.ViewData, OMMTypes.ARRAY);
            // As type for FID is short, size of 2 for array entry is sufficient for FIELD_ID_LIST view data
            encoder.encodeArrayInit(OMMTypes.INT, 2);
            for (String fieldId : viewFields) {
                encoder.encodeArrayEntryInit();
                encoder.encodeInt(Integer.parseInt(fieldId.trim()));
            }
        } else {
            encoder.encodeUInt(RDMUser.View.ELEMENT_NAME_LIST);
            encoder.encodeElementEntryInit(RDMUser.View.ViewData, OMMTypes.ARRAY);
            encoder.encodeArrayInit(OMMTypes.ASCII_STRING, 0);
            for (String elementName : viewFields) {
                encoder.encodeArrayEntryInit();
                encoder.encodeString(elementName.trim(), OMMTypes.ASCII_STRING);
            }
        }
        encoder.encodeAggregateComplete(); // completes the array
    }

    /**
     * Encodes the batch ItemList element entry. Items are taken from the front
     * of {@code itemsList} until the next name would push the encoded size past
     * {@code sizeLimit}; the encoded items are then removed from the list.
     */
    private static void encodeItemArray(OMMEncoder encoder, List<String> itemsList, int sizeLimit) {
        encoder.encodeElementEntryInit(RDMUser.Feature.ItemList, OMMTypes.ARRAY);
        encoder.encodeArrayInit(OMMTypes.ASCII_STRING, 0);

        int encodedCount = 0;
        for (String itemName : itemsList) {
            if (itemName.length() + encoder.getEncodedSize() > sizeLimit) {
                break; // don't encode any more items in this payload.
            }
            encoder.encodeArrayEntryInit();
            encoder.encodeString(itemName, OMMTypes.ASCII_STRING);
            encodedCount++;
        }
        // Drop the consumed prefix in one step instead of repeated remove(0).
        itemsList.subList(0, encodedCount).clear();

        encoder.encodeAggregateComplete(); // completes the array
    }

    /** Unregisters the reissue timer, if one is registered. */
    public void unregisterReissueTimer() {
        if (reissueTimerHandle != null) {
            dataRequesterApp.getOMMConsumer().unregisterClient(reissueTimerHandle);
            reissueTimerHandle = null;
        }
    }

    /**
     * Starts a batch-only payload: initializes the application's encoder with
     * the message and an element list, and encodes the ItemList array. The
     * element list is left open; the caller completes it.
     *
     * @param ommmsg        message to encode
     * @param itemsList     batch item names; encoded items are removed from it
     * @param estimatedSize encoder buffer size, also used as the size limit for items
     * @return the encoder holding the partially completed payload
     */
    private OMMEncoder encodeBatchPayLoad(OMMMsg ommmsg, List<String> itemsList, int estimatedSize) {
        OMMEncoder encoder = dataRequesterApp.getEncoder();
        encoder.initialize(OMMTypes.MSG, estimatedSize);
        encoder.encodeMsgInit(ommmsg, OMMTypes.NO_DATA, OMMTypes.ELEMENT_LIST);
        encoder.encodeElementListInit(OMMElementList.HAS_STANDARD_DATA, (short) 0, (short) 0);
        encodeItemArray(encoder, itemsList, estimatedSize);
        return encoder;
    }

    /**
     * Registers a repeating timer to signal when to perform a reissue, unless
     * one is already registered. The period is the {@code reissueInterval}
     * setting in seconds, with a minimum of one second.
     */
    private void registerReissueTimer() {
        if (reissueTimerHandle != null) {
            return;
        }
        int interval = Math.max(1, Integer.parseInt(configDb.variable("", "reissueInterval")));
        TimerIntSpec timer = new TimerIntSpec();
        timer.setDelay(interval * 1000);
        timer.setRepeating(true);
        reissueTimerHandle = dataRequesterApp.getOMMConsumer().registerClient(
                dataRequesterApp.getEventQueue(), timer, dataRequesterApp, null);
    }

    /**
     * Sends a batch reissue for all registered item handles.
     * <p>
     * When {@code reissueWithPAR} is set, the pause state is toggled on each
     * call and a pause request is sent when switching to paused. When
     * {@code reissueWithPriority} is set, the priority is advanced on each
     * call. When {@code sendView} is set, the view of the most recent
     * {@link #sendRequest} is encoded again. Does nothing when no items are
     * registered.
     */
    public void sendReissue() {
        if (itemHandles.isEmpty()) {
            return; // nothing to reissue (e.g. timer fired after closeRequest)
        }

        String serviceName = configDb.variable("", "serviceName");
        short capability = RDMMsgTypes.msgModelType(configDb.variable("", "mmt"));

        // Preparing to send batch reissue message
        OMMPool pool = dataRequesterApp.getPool();
        OMMMsg ommmsg = pool.acquireMsg();
        try {
            ommmsg.setMsgType(OMMMsg.MsgType.REQUEST);
            ommmsg.setMsgModelType(capability);

            int indicationFlags = OMMMsg.Indication.REFRESH;
            if (reissueParRequired) {
                itemsPaused = !itemsPaused;
                if (itemsPaused) {
                    indicationFlags = OMMMsg.Indication.PAUSE_REQ;
                }
            }

            if (reissuePriorityRequired) {
                advanceReissuePriority();
                ommmsg.setPriority(reissuePriorityClass, reissuePriorityCount);
            }

            // Setting OMMMsg with negotiated version info from login handle
            if (dataRequesterApp.getLoginHandle() != null) {
                ommmsg.setAssociatedMetaInfo(dataRequesterApp.getLoginHandle());
            }

            // Note: item name not needed for a batch reissue.
            ommmsg.setAttribInfo(serviceName, null, RDMInstrument.NameType.RIC);

            OMMItemIntSpec ommItemIntSpec = new OMMItemIntSpec();
            if (Boolean.parseBoolean(configDb.variable("", "sendView"))) {
                // Encode view into payload, using the fields of the last request.
                ommItemIntSpec.setMsg(encodeRequestPayLoad(ommmsg, null, requestedFields, indicationFlags));
            } else {
                ommmsg.setIndicationFlags(indicationFlags);
                ommItemIntSpec.setMsg(ommmsg);
            }

            dataRequesterApp.getOMMConsumer().reissueClient(itemHandles, ommItemIntSpec);
        } finally {
            // Return the pooled message even when encoding or the reissue fails.
            pool.releaseMsg(ommmsg);
        }
    }

    /**
     * Simply increments the priority each time: the count goes up by one, and
     * when it would pass {@code Byte.MAX_VALUE} it restarts at 1 and the class
     * goes up by one (the class likewise restarts at 1). The limits are checked
     * before incrementing because both values are bytes and would otherwise
     * wrap to negative numbers.
     */
    private void advanceReissuePriority() {
        if (reissuePriorityCount < Byte.MAX_VALUE) {
            reissuePriorityCount++;
            return;
        }
        reissuePriorityCount = 1;
        if (reissuePriorityClass < Byte.MAX_VALUE) {
            reissuePriorityClass++;
        } else {
            reissuePriorityClass = 1;
        }
    }
}