package com.iggroup.exchange.rfa.provider.domain.handler;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import com.iggroup.exchange.concurrency.util.TwoWayMapper;
import com.iggroup.exchange.rfa.provider.domain.ProviderInteractiveCore;
import com.iggroup.exchange.rfa.provider.domain.ProviderInteractiveEncoder;
import com.iggroup.exchange.rfa.provider.domain.subscription.SubscriptionManager;
import com.iggroup.exchange.rfa.provider.integration.instrument.InstrumentService;
import com.iggroup.exchange.rfa.provider.integration.subscription.SubscriberService;
import com.iggroup.exchange.rfa.provider.v2.integration.symbol.list.SymbolListSubscriberService;
import com.iggroup.exchange.rfa.provider.v2.integration.symbol.list.SymbolListSubscriptionManager;
import com.reuters.rfa.common.Handle;
import com.reuters.rfa.common.Token;
import com.reuters.rfa.omm.OMMAttribInfo;
import com.reuters.rfa.omm.OMMMsg;
import com.reuters.rfa.rdm.RDMMsgTypes;
import com.reuters.rfa.session.omm.OMMActiveClientSessionEvent;
import com.reuters.rfa.session.omm.OMMClientSessionIntSpec;
import com.reuters.rfa.session.omm.OMMInactiveClientSessionEvent;
import com.reuters.rfa.session.omm.OMMSolicitedItemEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Accepts OMM client sessions and tracks each session's market data subscriptions.
 *
 * <p>On construction the handler registers itself with the {@link ProviderInteractiveCore} as the
 * consumer of active-client-session events. For every such event it creates one
 * {@code MarketDataRegistrationSessionClient}, registers it with the core and remembers it. The
 * session client then handles market price, market-by-order and symbol list requests, logout and
 * session disconnection for that session.
 *
 * <p>This class does no synchronization of its own; {@code clients} and {@code clientRequestSeq}
 * are plain, unguarded state. NOTE(review): presumably all callbacks arrive on a single dispatch
 * thread - confirm.
 *
 * @author hydea
 * @since 26/06/14 09:02
 * @deprecated the original source marks this class deprecated without naming a replacement.
 */
@Deprecated
public class OMMSubscriptionHandler {

    private static final Logger LOG = LoggerFactory.getLogger(OMMSubscriptionHandler.class);

    private final ProviderInteractiveCore providerInteractiveCore;

    /** Live session clients keyed by the value returned from {@code registerClient}. */
    private final Map<Handle, MarketDataRegistrationSessionClient> clients = new HashMap<>();

    // Not referenced anywhere in this class; kept so the class shape is unchanged.
    private static final Object CLOSURE = null;

    private static final int DEFAULT_MAX_TOKENS_PER_CLIENT_SYMBOL_SUBSCRIPTION = 5;

    private final String serviceName;

    // Stored by the constructor but not read anywhere in this class.
    private final int maxTokensPerClientSymbolSubscription;

    private final SubscriberService dispatcher;
    private final SubscriptionManager marketPriceSubscriptionManager;
    private final SubscriptionManager marketByOrderSubscriptionManager;
    private final SymbolListSubscriptionManager symbolListSubscriptionStore;
    private final SymbolListSubscriberService symbolListSnapshotDispatcher;
    private final InstrumentService symbolService;

    // NOTE(review): TwoWayMapper's type arguments are not visible in this file, so the declaration
    // is left as-is and all access goes through toSequence()/toInstrument() below.
    private final TwoWayMapper instrumentToSequenceConvertor;

    /** Source of the request sequence numbers passed to the subscription managers and dispatcher. */
    private int clientRequestSeq;

    private final ProviderInteractiveEncoder providerInteractiveEncoder;

    /**
     * Creates a handler that uses the default per-client symbol token limit
     * ({@value #DEFAULT_MAX_TOKENS_PER_CLIENT_SYMBOL_SUBSCRIPTION}).
     *
     * @param providerInteractiveEncoder       encoder handed to every session client
     * @param providerInteractiveCore          core used to register clients and submit messages
     * @param serviceName                      service name handed to every session client
     * @param dispatcher                       notified of subscribe, re-issue, snapshot and
     *                                         unsubscribe requests per instrument sequence
     * @param marketPriceSubscriptionManager   subscription manager for market price requests
     * @param marketByOrderSubscriptionManager subscription manager for market-by-order requests
     * @param instrumentToSequenceConvertor    maps symbol names to instrument sequences and back
     * @param symbolService                    used to validate symbols before subscribing
     * @param symbolListSubscriptionStore      used to unsubscribe symbol list requests
     * @param symbolListSnapshotDispatcher     used to subscribe/snapshot symbol list requests
     */
    public OMMSubscriptionHandler(ProviderInteractiveEncoder providerInteractiveEncoder,
                                  ProviderInteractiveCore providerInteractiveCore,
                                  String serviceName,
                                  SubscriberService dispatcher,
                                  SubscriptionManager marketPriceSubscriptionManager,
                                  SubscriptionManager marketByOrderSubscriptionManager,
                                  TwoWayMapper instrumentToSequenceConvertor,
                                  InstrumentService symbolService,
                                  SymbolListSubscriptionManager symbolListSubscriptionStore,
                                  SymbolListSubscriberService symbolListSnapshotDispatcher) {
        this(providerInteractiveEncoder,
             providerInteractiveCore,
             serviceName,
             OMMSubscriptionHandler.DEFAULT_MAX_TOKENS_PER_CLIENT_SYMBOL_SUBSCRIPTION,
             dispatcher,
             marketPriceSubscriptionManager,
             marketByOrderSubscriptionManager,
             instrumentToSequenceConvertor,
             symbolService,
             symbolListSubscriptionStore,
             symbolListSnapshotDispatcher);
    }

    /**
     * Creates a handler and registers it with {@code providerInteractiveCore} as the consumer of
     * active-client-session events.
     *
     * @param providerInteractiveEncoder           encoder handed to every session client
     * @param providerInteractiveCore              core used to register clients and submit messages
     * @param serviceName                          service name handed to every session client
     * @param maxTokensPerClientSymbolSubscription stored only; not consulted by this class
     * @param dispatcher                           notified of subscribe, re-issue, snapshot and
     *                                             unsubscribe requests per instrument sequence
     * @param marketPriceSubscriptionManager       subscription manager for market price requests
     * @param marketByOrderSubscriptionManager     subscription manager for market-by-order requests
     * @param instrumentToSequenceConvertor        maps symbol names to instrument sequences and back
     * @param symbolService                        used to validate symbols before subscribing
     * @param symbolListSubscriptionStore          used to unsubscribe symbol list requests
     * @param symbolListSnapshotDispatcher         used to subscribe/snapshot symbol list requests
     */
    public OMMSubscriptionHandler(ProviderInteractiveEncoder providerInteractiveEncoder,
                                  ProviderInteractiveCore providerInteractiveCore,
                                  String serviceName,
                                  int maxTokensPerClientSymbolSubscription,
                                  SubscriberService dispatcher,
                                  SubscriptionManager marketPriceSubscriptionManager,
                                  SubscriptionManager marketByOrderSubscriptionManager,
                                  TwoWayMapper instrumentToSequenceConvertor,
                                  InstrumentService symbolService,
                                  final SymbolListSubscriptionManager symbolListSubscriptionStore,
                                  SymbolListSubscriberService symbolListSnapshotDispatcher) {
        this.providerInteractiveEncoder = providerInteractiveEncoder;
        this.providerInteractiveCore = providerInteractiveCore;
        this.serviceName = serviceName;
        this.maxTokensPerClientSymbolSubscription = maxTokensPerClientSymbolSubscription;
        this.dispatcher = dispatcher;
        this.marketPriceSubscriptionManager = marketPriceSubscriptionManager;
        this.marketByOrderSubscriptionManager = marketByOrderSubscriptionManager;
        this.instrumentToSequenceConvertor = instrumentToSequenceConvertor;
        this.symbolService = symbolService;
        this.symbolListSubscriptionStore = symbolListSubscriptionStore;
        this.symbolListSnapshotDispatcher = symbolListSnapshotDispatcher;

        // Register the callback only once every field is assigned: the callback reads serviceName
        // and providerInteractiveEncoder, so registering earlier would let an early session event
        // observe a half-built handler.
        this.providerInteractiveCore
            .setProcessOMMActiveClientSessionEventConsumer(e -> processOMMActiveClientSessionEvent(e));

        LOG.info("Created OMMSubscriptionHandler for serviceName: {}", serviceName);
    }

    /**
     * Accepts a new client session: builds a session client for it, registers that client with the
     * core against the event's client session handle, and remembers it in {@code clients}.
     *
     * @param activeClientSessionEvent the session event supplying the session handle, client host
     *                                 name and listener name
     */
    public void processOMMActiveClientSessionEvent(OMMActiveClientSessionEvent activeClientSessionEvent) {
        final OMMClientSessionIntSpec intSpec = new OMMClientSessionIntSpec();
        intSpec.setClientSessionHandle(activeClientSessionEvent.getClientSessionHandle());

        final MarketDataRegistrationSessionClient client =
            new MarketDataRegistrationSessionClient(providerInteractiveEncoder,
                                                    serviceName,
                                                    activeClientSessionEvent.getClientHostName(),
                                                    activeClientSessionEvent.getListenerName());

        clients.put(providerInteractiveCore.registerClient(intSpec, client, null), client);
        LOG.info("Accepted client session connection: hostname={}, listener name ={}",
                 activeClientSessionEvent.getClientHostName(),
                 activeClientSessionEvent.getListenerName());
    }

    /**
     * Converts a symbol name to its instrument sequence via the two-way mapper.
     *
     * @param symbol the symbol name taken from the request's attrib info
     * @return whatever the mapper's {@code to} returns for the symbol (may be {@code null})
     */
    // The mapper is declared without type arguments in this file, hence the explicit cast.
    @SuppressWarnings("unchecked")
    private Integer toSequence(String symbol) {
        return (Integer) instrumentToSequenceConvertor.to(symbol);
    }

    /**
     * Converts an instrument sequence back to its symbol name via the two-way mapper.
     *
     * @param sequence the instrument sequence
     * @return whatever the mapper's {@code from} returns for the sequence
     */
    // The mapper is declared without type arguments in this file, hence the explicit cast.
    @SuppressWarnings("unchecked")
    private String toInstrument(Integer sequence) {
        return (String) instrumentToSequenceConvertor.from(sequence);
    }

    /**
     * Per-session client that keeps this session's market price and market-by-order subscriptions
     * and forwards subscription changes to the enclosing handler's subscription managers and
     * dispatcher.
     */
    private class MarketDataRegistrationSessionClient extends DefaultSessionClient {

        // a client should not subscribe to a symbol more than once unless asking for different QOS
        // RMDS ADH/source servers should maintain a watchlist and therefore not send a request for
        // a new stream for each client but rather just request a re-issue to align the client
        private final Map<Integer, Token> myMarketPriceSubscriptions = new HashMap<>();
        // reverse map for quick lookup
        private final Map<Token, Integer> myMarketPriceSubscriptionsReverse = new HashMap<>();

        private final Map<Integer, Token> myMarketByOrderSubscriptions = new HashMap<>();
        // reverse map for quick lookup
        private final Map<Token, Integer> myMarketByOrderSubscriptionsReverse = new HashMap<>();

        public MarketDataRegistrationSessionClient(ProviderInteractiveEncoder providerInteractiveEncoder,
                                                   String serviceName,
                                                   String clienthostName,
                                                   String listenerName) {
            super(providerInteractiveEncoder, false, clienthostName, listenerName, serviceName);
        }

        /** Delegates message submission to the enclosing handler's core. */
        @Override
        int submitMsg(Token token, OMMMsg responseMsg, Object closure) {
            return providerInteractiveCore.submitMsg(token, responseMsg, closure);
        }

        @Override
        void onMarketPriceRequest(final OMMSolicitedItemEvent itemEvent) {
            onMarketDataRequest(itemEvent,
                                myMarketPriceSubscriptions,
                                myMarketPriceSubscriptionsReverse,
                                marketPriceSubscriptionManager);
        }

        @Override
        void onMarketByOrderRequest(final OMMSolicitedItemEvent itemEvent) {
            onMarketDataRequest(itemEvent,
                                myMarketByOrderSubscriptions,
                                myMarketByOrderSubscriptionsReverse,
                                marketByOrderSubscriptionManager);
        }

        /**
         * Handles a market data request for one domain: a close request, a non-streaming
         * (snapshot) request, or a streaming subscribe / re-issue.
         *
         * @param itemEvent              the solicited item event carrying the message and token
         * @param mySubscriptions        this session's instrument-sequence to token map
         * @param mySubscriptionsReverse this session's token to instrument-sequence map
         * @param subscriptionManager    the subscription manager for the request's domain
         */
        void onMarketDataRequest(final OMMSolicitedItemEvent itemEvent,
                                 final Map<Integer, Token> mySubscriptions,
                                 final Map<Token, Integer> mySubscriptionsReverse,
                                 final SubscriptionManager subscriptionManager) {
            final OMMMsg msg = itemEvent.getMsg();
            final Token newToken = itemEvent.getRequestToken();

            if (msg.getMsgType() == OMMMsg.MsgType.CLOSE_REQ) {
                handleCloseRequest(msg, newToken, mySubscriptions, mySubscriptionsReverse, subscriptionManager);
                return;
            }

            final OMMAttribInfo attribInfo = msg.getAttribInfo();
            final String symbol = attribInfo.getName();
            final Integer instrumentSequence = toSequence(symbol);

            if (msg.isSet(OMMMsg.Indication.NONSTREAMING)) {
                handleSnapshotRequest(symbol, instrumentSequence, newToken, mySubscriptions, subscriptionManager);
            } else {
                handleStreamingRequest(msg, symbol, instrumentSequence, newToken,
                                       mySubscriptions, mySubscriptionsReverse, subscriptionManager);
            }
        }

        /** Resolves the item a close request refers to and unsubscribes it, if subscribed. */
        private void handleCloseRequest(final OMMMsg msg,
                                        final Token newToken,
                                        final Map<Integer, Token> mySubscriptions,
                                        final Map<Token, Integer> mySubscriptionsReverse,
                                        final SubscriptionManager subscriptionManager) {
            // Prefer the token lookup; fall back to the symbol name carried by the message.
            Integer item = mySubscriptionsReverse.remove(newToken);
            if (item == null) {
                final OMMAttribInfo attribInfo = msg.getAttribInfo();
                item = toSequence(attribInfo.getName());
            }
            if (item == null) {
                LOG.warn("Client ({}) trying an unsubscribe using an unknown token", getClientId());
                // Nothing can be keyed on a null item; stop here rather than emit a second,
                // misleading "no subscriptions for this symbol" warning.
                return;
            }

            final Token myExistingToken = mySubscriptions.remove(item);
            if (myExistingToken == null) {
                LOG.warn("Client ({}) trying to unsubscribe from '({})' but has no subscriptions for this symbol",
                         getClientId(), item);
                return;
            }

            // Always unsubscribe the token actually held for the item; when the close came in on
            // the current token, that is the same object as newToken.
            final Map<Integer, Token> unsub = new HashMap<>();
            unsub.put(item, myExistingToken);
            doUnsub(unsub, mySubscriptionsReverse, subscriptionManager);

            if (myExistingToken != newToken) {
                LOG.warn("Client ({}) unsubscribed from '({})' using an old token", getClientId(), item);
            } else {
                LOG.info("Client ({}) unsubscribed from '({})' ", getClientId(), item);
            }
        }

        /** Handles a non-streaming request: drops an existing local subscription or requests a snapshot. */
        private void handleSnapshotRequest(final String symbol,
                                           final Integer instrumentSequence,
                                           final Token newToken,
                                           final Map<Integer, Token> mySubscriptions,
                                           final SubscriptionManager subscriptionManager) {
            final Token existingToken = mySubscriptions.remove(instrumentSequence);
            if (existingToken != null) {
                // NOTE(review): only the local map entry is dropped here - neither the
                // subscription manager nor the dispatcher is told, and the reverse-map entry
                // stays. Confirm this is intended.
                LOG.info("Unsubscribed client {} using non-Streaming market data request re-issued token for {}",
                         getClientId(), symbol);
                return;
            }

            LOG.info("Received non-Streaming market data request from client {} for {}", getClientId(), symbol);
            final int theRequestSeq = clientRequestSeq++;
            subscriptionManager.snapshotOnly(instrumentSequence, newToken, theRequestSeq);
            dispatcher.snapshotOnly(instrumentSequence, theRequestSeq);
        }

        /** Handles a streaming request: a first-time subscribe for a valid symbol, or a re-issue. */
        private void handleStreamingRequest(final OMMMsg msg,
                                            final String symbol,
                                            final Integer instrumentSequence,
                                            final Token newToken,
                                            final Map<Integer, Token> mySubscriptions,
                                            final Map<Token, Integer> mySubscriptionsReverse,
                                            final SubscriptionManager subscriptionManager) {
            final Token existingToken = mySubscriptions.get(instrumentSequence);

            if (existingToken == null) {
                if (!symbolService.isValid(symbol)) {
                    LOG.warn("Client ({}) tried to subscribe to unknown symbol ({}). Rejecting", getClientId(), symbol);
                    return;
                }
                LOG.info("Subscribing client ({}) to symbol ({}) for domain ({})",
                         getClientId(), symbol, RDMMsgTypes.toString(msg.getMsgModelType()));
                mySubscriptions.put(instrumentSequence, newToken);
                mySubscriptionsReverse.put(newToken, instrumentSequence);
                final int theRequestSeq = clientRequestSeq++;
                subscriptionManager.subscribe(instrumentSequence, newToken, theRequestSeq);
                dispatcher.subscribe(instrumentSequence, theRequestSeq);
                return;
            }

            // we don't support modification of streams, only a REFRESH reissue
            // if the reissue should be performed on the same Token then Code.ALREADY_OPEN should
            // be sent on the new and we discard it here
            if (existingToken != newToken) {
                // The superseded token's reverse entry is kept on purpose so that a later close
                // on the old token still resolves to this item.
                mySubscriptions.put(instrumentSequence, newToken);
                mySubscriptionsReverse.put(newToken, instrumentSequence);
            }
            final int theRequestSeq = clientRequestSeq++;
            subscriptionManager.resissue(instrumentSequence, newToken, theRequestSeq);
            dispatcher.resissue(instrumentSequence, theRequestSeq);

            if (msg.isSet(OMMMsg.Indication.REFRESH)) {
                LOG.info("Client ({}) already subscribed to ({}). Re-issueing refresh", getClientId(), symbol);
            } else {
                LOG.warn("Client ({}) already subscribed to ({}). Re-issue without refresh indication unexpected",
                         getClientId(), symbol);
            }
        }

        /**
         * Routes a symbol list request: close requests unsubscribe, non-streaming requests ask for
         * a snapshot, anything else subscribes.
         */
        @Override
        void onSymbolListRequest(OMMSolicitedItemEvent itemEvent) {
            final OMMMsg reqMsg = itemEvent.getMsg();
            final Token reqToken = itemEvent.getRequestToken();
            final OMMAttribInfo attribInfo = reqMsg.getAttribInfo();
            final String itemName = attribInfo.getName();
            LOG.info("Received SymbolList request for item {} client session ({})", itemName, getClientId());

            if (reqMsg.getMsgType() == OMMMsg.MsgType.CLOSE_REQ) {
                unsubscribeSymbolList(itemName, reqToken);
            } else if (reqMsg.isSet(OMMMsg.Indication.NONSTREAMING)) {
                symbolListSnapshotDispatcher.snapshotOnly(getClientId(), itemName, reqToken);
            } else {
                symbolListSnapshotDispatcher.subscribe(getClientId(), itemName, reqToken);
            }
        }

        @Override
        void onLogout(OMMSolicitedItemEvent itemEvent) {
            LOG.info("Received logout for client session ({})", getClientId());
            clearDownClientSubscriptions();
        }

        @Override
        void onSessionDisconnection(OMMInactiveClientSessionEvent event) {
            LOG.info("Received session disconnection for client session ({})", getClientId());
            clearDownClientSubscriptions();
            // The session is gone: stop retaining this client in the handler's registry, which
            // otherwise only ever grows. Identity match, so no reliance on equals().
            clients.values().removeIf(registered -> registered == this);
        }

        /** Unsubscribes everything this session holds in both domains plus its symbol lists. */
        private void clearDownClientSubscriptions() {
            LOG.info("Clearing down all client subscriptions for client session ({})", getClientId());
            doUnsub(myMarketPriceSubscriptions, myMarketPriceSubscriptionsReverse, marketPriceSubscriptionManager);
            doUnsub(myMarketByOrderSubscriptions, myMarketByOrderSubscriptionsReverse, marketByOrderSubscriptionManager);
            // doUnsub only removes reverse entries for tokens still present in the forward maps;
            // tokens superseded by a re-issue would otherwise linger after a full clear-down.
            myMarketPriceSubscriptionsReverse.clear();
            myMarketByOrderSubscriptionsReverse.clear();
            unsubscribeSymbolList(null, null);
        }

        private void unsubscribeSymbolList(String itemName, Token token) {
            symbolListSubscriptionStore.unsubscribe(getClientId(), itemName, token);
        }

        /**
         * Unsubscribes every entry of {@code unSubscribeTokens}, removing each entry from that map
         * and its token from {@code reverseLookups} whether or not the unsubscribe succeeded. A
         * failure on one entry is logged and does not stop the remaining entries.
         *
         * @param unSubscribeTokens   instrument-sequence to token entries to unsubscribe; emptied
         * @param reverseLookups      token to instrument-sequence map to clean up alongside
         * @param subscriptionManager the subscription manager for the entries' domain
         */
        private void doUnsub(final Map<Integer, Token> unSubscribeTokens,
                             Map<Token, Integer> reverseLookups,
                             SubscriptionManager subscriptionManager) {
            final Iterator<Map.Entry<Integer, Token>> entries = unSubscribeTokens.entrySet().iterator();
            while (entries.hasNext()) {
                final Map.Entry<Integer, Token> next = entries.next();
                try {
                    final int theRequestSeq = clientRequestSeq++;
                    subscriptionManager.unsubscribe(next.getKey(), next.getValue(), theRequestSeq);
                    dispatcher.unsubscribe(next.getKey(), theRequestSeq);
                } catch (Exception ex) {
                    // Best effort: describe the instrument by name when the mapper knows it,
                    // otherwise by sequence.
                    String instrument = null;
                    try {
                        instrument = toInstrument(next.getKey());
                    } catch (RuntimeException re) {
                        LOG.warn("Instrument seq {} not found in converter", next.getKey(), re);
                    }
                    final String item = instrument == null ? "seq " + next.getKey() : "inst " + instrument;
                    // Trailing throwable so the cause and stack trace are not lost.
                    LOG.warn("Problem unsubscribing client {} from {}", getClientId(), item, ex);
                } finally {
                    reverseLookups.remove(next.getValue());
                    entries.remove();
                }
            }
        }
    }
}