//************************************************************************ // Copyright (c) 2011-2017 Numerix LLC. All rights reserved. // This software comprises valuable trade secrets and may be used, // copied, transmitted, stored, and distributed only in accordance // with the terms of a written license or trial agreement and with the // inclusion of this copyright notice. //************************************************************************ package com.numerix.market.marketdatatrepapi.impl; import com.numerix.market.marketdatatrepapi.TrepDataRequester; import com.numerix.market.marketdatatrepapi.TrepDataRequesterCommon; import com.numerix.market.marketdatatrepapi.data.MarketQuoteKey; import com.numerix.market.marketdatatrepapi.data.TrepDataConnectionSetting; import com.numerix.market.marketdatatrepapi.datahandler.MarketDataSink; import com.numerix.market.marketdatatrepapi.datarequesters.AppLoginClient; import com.numerix.market.marketdatatrepapi.datarequesters.DictionaryRequester; import com.numerix.market.marketdatatrepapi.datarequesters.DirectoryRequester; import com.numerix.market.marketdatatrepapi.datarequesters.ViewDataRequester; import com.numerix.market.marketdatatrepapi.exceptions.ConnectionException; import com.numerix.market.marketdatatrepapi.exceptions.DirectoryServiceException; import com.numerix.market.marketdatatrepapi.utils.GenericOMMParser; import com.numerix.market.marketdatatrepapi.utils.Utils; import com.reuters.rfa.common.*; import com.reuters.rfa.config.ConfigDb; import com.reuters.rfa.omm.OMMEncoder; import com.reuters.rfa.omm.OMMMsg; import com.reuters.rfa.omm.OMMPool; import com.reuters.rfa.omm.OMMTypes; import com.reuters.rfa.session.Session; import com.reuters.rfa.session.omm.OMMConnectionIntSpec; import com.reuters.rfa.session.omm.OMMConsumer; import com.reuters.rfa.session.omm.OMMItemEvent; import java.net.ConnectException; import java.net.InetAddress; import java.util.*; public class TrepDataRequesterImpl implements TrepDataRequester, TrepDataRequesterCommon { private final TrepDataConnectionSetting trepDataConnectionSetting; private final MarketDataSink marketDataSink; // RFA objects private Session session = null; private EventQueue eventQueue = null; private OMMConsumer ommConsumer = null; private AppLoginClient loginClient = null; private ViewDataRequester viewDataRequester = null; private DirectoryRequester directoryRequester = null; private OMMEncoder encoder = null; private OMMPool pool = null; private final ConfigDb configDb = new ConfigDb(); private String position = "1.1.1.1/net"; // DACS position for login private Handle connIntSpecHandle; private List> resultData = null; private static final int RUN_TIME = 600; private static final int DISPATCHER_WAIT_MILLIS = 3000; private int connectionStatusCount = 0; private boolean loginSuccessFul = false; public TrepDataRequesterImpl(TrepDataConnectionSetting trepDataConnectionSetting, MarketDataSink marketDataSink) { this.trepDataConnectionSetting = trepDataConnectionSetting; this.marketDataSink = marketDataSink; } @Override public ConfigDb getConfigDb() { return configDb; } @Override public EventQueue getEventQueue() { return eventQueue; } @Override public OMMConsumer getOMMConsumer() { return ommConsumer; } @Override public OMMEncoder getEncoder() { return encoder; } public OMMPool getPool() { return pool; } @Override public Handle getLoginHandle() { if (loginClient != null) { return loginClient.getHandle(); } return null; } @Override public void startDataService() throws ConnectException { initializeConfigDb(configDb); Context.initialize(configDb); String appSession = configDb.variable("", "session"); session = Session.acquire(appSession); if(session == null) { throw new ConnectException("Failed to acquire TREP application session"); } initialize(); run(10); } /** * Process incoming events based on the event type. Events of type. */ @Override public void processEvent(Event event) { if (event.getType() == Event.COMPLETION_EVENT) { viewDataRequester.getItemHandles().remove(event.getHandle()); // update the handle list return; } else if (event.getType() == Event.TIMER_EVENT) { return; } if (event.getType() != Event.OMM_ITEM_EVENT) { cleanUp(true); return; } processOtherEvents(event); } @Override public void stopDataService() { cleanUp(true); } @Override public List> requestSnapshotData(List marketQuoteKeys) { resultData = new ArrayList<>(); List ricCodesList = Utils.getRicCodesFromMarketQuoteKeys(marketQuoteKeys); List fields = Utils.getFieldsFromMarketQuoteKeys(marketQuoteKeys); viewDataRequester = new ViewDataRequester(this); viewDataRequester.sendRequest(ricCodesList, fields, "snapshot"); run(RUN_TIME); return Utils.filterResult(fields, resultData); } @Override public void requestStreamingData(List marketQuoteKeys, boolean reissue, boolean resubscribe) { List ricCodesList = Utils.getRicCodesFromMarketQuoteKeys(marketQuoteKeys); List fields = Utils.getFieldsFromMarketQuoteKeys(marketQuoteKeys); if (reissue) { if(viewDataRequester != null) viewDataRequester.sendReissue(); } else { if(resubscribe) { cleanUpViewDataRequester(); } viewDataRequester = new ViewDataRequester(this); viewDataRequester.sendRequest(ricCodesList, fields, "streaming"); } run(RUN_TIME); } @Override public void processLogin(boolean success) throws ConnectionException { if(success) { directoryRequester = new DirectoryRequester(this); // Send directory request of directory info. Send request only once, // as we need this to download Dictionary from server directoryRequester.sendRequest(); loginSuccessFul = true; } else { cleanUp(false); throw new ConnectionException("TREP Login has been denied / rejected / closed."); } } @Override public void cleanUp(boolean doLoginCleanup) { cleanUpViewDataRequester(); if (connIntSpecHandle != null ) ommConsumer.unregisterClient(connIntSpecHandle); // Unregister login // Allow conditional login unregister. // loginCleanup flag is set to false if cleanup is called after login failure. if (loginClient != null && doLoginCleanup) loginClient.closeRequest(); if (ommConsumer != null) ommConsumer.destroy(); loginSuccessFul = false; connectionStatusCount = 0; eventQueue.deactivate(); session.release(); Context.uninitialize(); } @Override public void processDirectoryInfo() { Iterator>> iter = directoryRequester.getServiceMap().entrySet().iterator(); Map.Entry> entry; String dictionaryName; DictionaryRequester dictionaryRequester; // Download dictionary from the first service that available. if (iter.hasNext()) { entry = iter.next(); String serviceName = entry.getKey(); ArrayList dictionaryNames = entry.getValue(); dictionaryRequester = new DictionaryRequester(this, serviceName); for (Iterator arrayIter = dictionaryNames.iterator(); arrayIter.hasNext(); ) { dictionaryName = arrayIter.next(); dictionaryRequester.openFullDictionary(dictionaryName); } } else { cleanUp(true); throw new DirectoryServiceException("ERROR: No service is available to download data dictionaries"); } } @Override public void initialize() { // Create an Event Queue eventQueue = EventQueue.create("appEventQueue"); // Create a OMMPool pool = OMMPool.create(); // Create an OMMEncoder encoder = pool.acquireEncoder(); encoder.initialize(OMMTypes.MSG, 5000); // Initialize client for login domain loginClient = new AppLoginClient(this); // Create an OMMConsumer event source ommConsumer = (OMMConsumer)session.createEventSource(EventSource.OMM_CONSUMER, "appOMMConsumer", true); // OMMConnectionIntSpec is used to register interest for any OMMConnectionEvents OMMConnectionIntSpec connIntSpec = new OMMConnectionIntSpec(); connIntSpecHandle = ommConsumer.registerClient(eventQueue, connIntSpec, loginClient, null); // Application must send login request first loginClient.sendRequest(); } @Override public void run(int runTime) { // Save start time to measure run time long startTime = System.currentTimeMillis(); while ((System.currentTimeMillis() - startTime) < runTime * DISPATCHER_WAIT_MILLIS) { try { System.out.println("Dispatching the Event Queue"); eventQueue.dispatch(DISPATCHER_WAIT_MILLIS); } catch (DispatchException | RuntimeException de) { return; } if(!loginSuccessFul) { // This is just a trial number as there is no current way to know whether we can connect to TREP and we don't want to run this for OVER! connectionStatusCount++; if (connectionStatusCount > 30) { cleanUp(true); throw new ConnectionException( String.format("Could not connect to TREP, please telnet to check that you can reach TREP service on IP: %s and PORT: %s", trepDataConnectionSetting.getServiceIp(), trepDataConnectionSetting.getServicePort())); } } } } private void processOtherEvents(Event event) { OMMItemEvent ie = (OMMItemEvent)event; OMMMsg respMsg = ie.getMsg(); // Add the batch request item response to the item handle list for proper closing if (viewDataRequester.getNumItemsRequested() != 1 && respMsg.getMsgType() == OMMMsg.MsgType.REFRESH_RESP && respMsg.isSet(OMMMsg.Indication.REFRESH_COMPLETE)) { viewDataRequester.getItemHandles().add(event.getHandle()); } if(respMsg.getPayload() != null) { HashMap resultMap = new HashMap<>(); String ric = respMsg.getAttribInfo().getName(); resultMap.put("RICCODE", ric.trim().toUpperCase()); GenericOMMParser.parseOMMData(respMsg.getPayload(), resultMap); if(viewDataRequester.getRequestType().equalsIgnoreCase("snapshot")) { resultData.add(resultMap); } else { marketDataSink.quotesUpdateFromTrepListener(resultMap); } } // Only applicable to Non-Streaming (i.e., Snapshot) request as we // want to know when the data extraction is completed if(event.isEventStreamClosed()) { viewDataRequester.decRicCodeCount(); } if(viewDataRequester.getRicCodeCount() == 0) { cleanUp(true); } } private void cleanUpViewDataRequester() { // Unregister the reissue timer (if applicable) if(viewDataRequester != null) { viewDataRequester.unregisterReissueTimer(); viewDataRequester.closeRequest(); } } private void initializeConfigDb(ConfigDb configDb) { // Use default position if we can't get local IP address try { position = String.format("%s/%s", InetAddress.getLocalHost().getHostAddress(), InetAddress.getLocalHost().getHostName()); } catch (Exception e) { // Suppress } String[] namespaceSessions = trepDataConnectionSetting.getSessionName().split("::"); String namespace = namespaceSessions[0]; String sess = namespaceSessions[1]; String conSession = String.format("%s.Sessions.%s.connectionList", namespace, sess); String connections = String.format("%s.Connections.%s", namespace, trepDataConnectionSetting.getConnectionName()); configDb.addVariable(conSession, trepDataConnectionSetting.getConnectionName()); configDb.addVariable(String.format("%s.connectionType", connections), trepDataConnectionSetting.getConnectionType()); configDb.addVariable(String.format("%s.serverList", connections), trepDataConnectionSetting.getServiceIp()); configDb.addVariable(String.format("%s.portNumber", connections), trepDataConnectionSetting.getServicePort()); configDb.addVariable("session", trepDataConnectionSetting.getSessionName()); configDb.addVariable("user", trepDataConnectionSetting.getUserName()); String application = "256"; configDb.addVariable("application", application); configDb.addVariable("position", position); configDb.addVariable("serviceName", trepDataConnectionSetting.getServiceName()); // Service to request configDb.addVariable("sendView", "true"); // Send view request configDb.addVariable("viewType", "2"); // View type: 1 (list of Integer fields) for field list view, 2 (list of field names) for element list view configDb.addVariable("sendReissue", "true"); // Send Reissue requests configDb.addVariable("reissueInterval", "15"); // Interval between Reissues (in seconds) configDb.addVariable("reissueWithPAR", "true"); // Reissue with Pause and Resume, alternating each reissue configDb.addVariable("reissueWithPriority", "true"); //Reissue with different Priority, each reissue configDb.addVariable("initialRequestPaused", "false"); // Initial batch request has PAUSE_REQ indication? configDb.addVariable("mmt", trepDataConnectionSetting.getMessageModelType()); // Message Model Type // Add detailed log info configDb.addVariable(String.format("%s.ipcTraceFlags", connections), "7"); configDb.addVariable(String.format("%s.traceMsgTypes", connections), "ALL"); configDb.addVariable(String.format("%s.traceMsgDomains", connections), "NORMAL"); configDb.addVariable(String.format("%s.mountTrace", connections), "true"); configDb.addVariable(String.format("%s.logFileName", connections), "RFA_RSSL.Log"); } }