///*|----------------------------------------------------------------------------------------------------
// *|            This source code is provided under the Apache 2.0 license      	--
// *|  and is provided AS IS with no warranty or guarantee of fit for purpose.  --
// *|                See the project's LICENSE.md for details.                  					--
// *|           Copyright Refinitiv 2020. All rights reserved.            		--
///*|----------------------------------------------------------------------------------------------------

import com.refinitiv.ema.access.*;
import com.refinitiv.ema.access.OmmConsumerConfig.OperationModel;
import com.refinitiv.ema.domain.login.Login.LoginReq;
import com.refinitiv.ema.rdm.EmaRdm;

class AppClient implements OmmConsumerClient	{
	private OmmConsumer _ommConsumer;
	private long _tunnelStreamHandle = 0, _subStreamHandle;
	private int _postStreamID;
	private boolean _subItemOpen;
	int _postID = 1;
	int _bid = 340, _ask = 341;
	//private boolean _connectionUp = false;
	int _ackCount=0;
	public boolean readytopost = false;
//	public void setLoginHandle(long loginhandle) {
//		_loginStreamHandle = loginhandle;
	
//	}
	public void  onSourceDirectoyUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent event) {
		System.out.println("-------- On Source Directory Update ----------------");
		System.out.println(_tunnelStreamHandle);
		if(_tunnelStreamHandle == 0) {
			System.out.println("Start Tunnel");
			StartTunnelStream();
		}
		System.out.println("----------------------------------------------------");
	}
	
	public void  onSourceDirectoyRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event) {
		System.out.println("-------- On Source Directory Refresh ----------------");
		System.out.println(_tunnelStreamHandle);
		if(_tunnelStreamHandle == 0) {
			System.out.println("Start Tunnel");
			StartTunnelStream();
		}
		System.out.println("----------------------------------------------------");
	}
	public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)	{
		System.out.println("----- Refresh message ----");
			System.out.println(refreshMsg);
		

		
		if(refreshMsg.domainType() == EmaRdm.MMT_DIRECTORY) {
			onSourceDirectoyRefreshMsg( refreshMsg,  event);
		}
			
		System.out.println(event.handle());
		System.out.println(_tunnelStreamHandle);
		System.out.println(_subItemOpen);
		if(_subItemOpen && event.handle() == _subStreamHandle && refreshMsg.domainType() == EmaRdm.MMT_LOGIN && refreshMsg.state().streamState() == OmmState.StreamState.OPEN && refreshMsg.state().dataState() == OmmState.DataState.OK)	{
			// 3. Login accepted, app can post data now
			System.out.println("Login accepted, starting posting...");
			
			_postStreamID = refreshMsg.streamId();
			readytopost = true;
//			for(int i=1;i<=100;i++)
//				postMessage();
		}
		else	{
			System.out.println("Stream not open");
		}
	}

	public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent event)	{
		System.out.println("----- Status message ----");
		System.out.println(statusMsg);
//		if(statusMsg.domainType()==EmaRdm.MMT_LOGIN && event.handle()==_loginStreamHandle) {
//			onLoginStatusMsg(statusMsg, event);
//		}
		if(event.handle()==_tunnelStreamHandle && statusMsg.domainType()==EmaRdm.MMT_SYSTEM && statusMsg.state().streamState()==OmmState.StreamState.CLOSED_RECOVER) {
			_tunnelStreamHandle = 0;
			_subItemOpen=false;
		}
		
		if(!_subItemOpen && event.handle() == _tunnelStreamHandle && statusMsg.state().streamState() == OmmState.StreamState.OPEN)	{
			// create a login request message
			ElementList elementList = EmaFactory.createElementList();
			elementList.add(EmaFactory.createElementEntry().ascii("Password", "<RCC Password>"));
			
			ReqMsg rMsg = EmaFactory.createReqMsg()
				.domainType(EmaRdm.MMT_LOGIN)
				.name("<RCC User>")
				.attrib(elementList)
				// .privateStream(true)
				.serviceId(statusMsg.serviceId())
				.streamId(statusMsg.streamId());
			
			System.out.println("Sending client login request...");
			// get events from login substream	
			_subStreamHandle = _ommConsumer.registerClient(rMsg, this, 1, _tunnelStreamHandle);
			_subItemOpen = true;
		}
		
		
	}
	
	
	public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent consumerEvent)	{
		System.out.println("----- Ack message ----");
		System.out.println(ackMsg);
		

	}

	
	public void postMessage()	{

		FieldList nestedFieldList = EmaFactory.createFieldList();
		nestedFieldList.add(EmaFactory.createFieldEntry().real(22, _bid++, OmmReal.MagnitudeType.EXPONENT_NEG_1));
		nestedFieldList.add(EmaFactory.createFieldEntry().real(25, _ask++, OmmReal.MagnitudeType.EXPONENT_NEG_1));
		
		// create an update message for our item
		UpdateMsg nestedUpdateMsg = EmaFactory.createUpdateMsg()
			.streamId(_postStreamID)
			.name("TRCCTEST01")
			.payload(nestedFieldList);
			
		// post this market price message
		_ommConsumer.submit(EmaFactory.createPostMsg()
			.streamId(_postStreamID)
			.postId(_postID++)
			.domainType(EmaRdm.MMT_MARKET_PRICE)
			.solicitAck(true)
			.complete(true)
			.payload(nestedUpdateMsg), _subStreamHandle);
	}

	public void postMessage(String RIC)	{

		System.out.println("----- Post message ----" + RIC);
		FieldList nestedFieldList = EmaFactory.createFieldList();
		nestedFieldList.add(EmaFactory.createFieldEntry().real(393, _bid++, OmmReal.MagnitudeType.EXPONENT_NEG_1));
		nestedFieldList.add(EmaFactory.createFieldEntry().real(275, _ask++, OmmReal.MagnitudeType.EXPONENT_NEG_1));
		nestedFieldList.add(EmaFactory.createFieldEntry().real(6, _ask++, OmmReal.MagnitudeType.EXPONENT_NEG_1));
		nestedFieldList.add(EmaFactory.createFieldEntry().real(19, _ask++, OmmReal.MagnitudeType.EXPONENT_NEG_1));
		nestedFieldList.add(EmaFactory.createFieldEntry().real(12, _ask++, OmmReal.MagnitudeType.EXPONENT_NEG_1));
		nestedFieldList.add(EmaFactory.createFieldEntry().real(13, _ask++, OmmReal.MagnitudeType.EXPONENT_NEG_1));
		nestedFieldList.add(EmaFactory.createFieldEntry().real(11, _ask++, OmmReal.MagnitudeType.EXPONENT_NEG_1));
		nestedFieldList.add(EmaFactory.createFieldEntry().real(56, _ask++, OmmReal.MagnitudeType.EXPONENT_NEG_1));
		nestedFieldList.add(EmaFactory.createFieldEntry().real(997, _ask++, OmmReal.MagnitudeType.EXPONENT_NEG_1));
		
		// create an update message for our item
		UpdateMsg nestedUpdateMsg = EmaFactory.createUpdateMsg()
			.streamId(_postStreamID)
			.name(RIC)
			.payload(nestedFieldList);
			
		// post this market price message
		_ommConsumer.submit(EmaFactory.createPostMsg()
			.streamId(_postStreamID)
			.postId(_postID++)
			.domainType(EmaRdm.MMT_MARKET_PRICE)
			.solicitAck(true)
			.complete(true)
			.payload(nestedUpdateMsg), _subStreamHandle);
	}
	public void setOmmConsumer(OmmConsumer ommConsumer)	{
		_ommConsumer = ommConsumer;
	}
	
	public void setTunnelHandle(long tunnelStreamHandle)	{
		_tunnelStreamHandle = tunnelStreamHandle;
	}

	public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent event)	{
		System.out.println("----- Update message ----");
		if(updateMsg.domainType() == EmaRdm.MMT_DIRECTORY) {
			onSourceDirectoyUpdateMsg( updateMsg,  event);
		}
	}

	public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent consumerEvent)	{
		System.out.println("----- Generic message ----");
	}

	public void onAllMsg(Msg msg, OmmConsumerEvent consumerEvent)	{}
	
	private void CloseTunnelStream() {
		//_ommConsumer.unregister(_tunnelStreamHandle);
	}
	private void StartTunnelStream() {
		ClassOfService cos = EmaFactory.createClassOfService()
				.authentication(EmaFactory.createCosAuthentication().type(CosAuthentication.CosAuthenticationType.NOT_REQUIRED))
				.dataIntegrity(EmaFactory.createCosDataIntegrity().type(CosDataIntegrity.CosDataIntegrityType.RELIABLE))
				.flowControl(EmaFactory.createCosFlowControl().type(CosFlowControl.CosFlowControlType.BIDIRECTIONAL).recvWindowSize(1200))
				.guarantee(EmaFactory.createCosGuarantee().type(CosGuarantee.CosGuaranteeType.NONE));

			System.out.println("Starting tunnel stream...");
			// Create a request for a tunnel stream
			TunnelStreamRequest tsr = EmaFactory.createTunnelStreamRequest()
				.classOfService(cos)
				.domainType(EmaRdm.MMT_SYSTEM)
				.name("TUNNEL1")
				.serviceName("DDS_TRCE");
			
			//tsr.guaranteedOutputBuffers(5000);

			// Send the request and register for events from tunnel stream	
			long tunnelStreamHandle = _ommConsumer.registerClient(tsr, this);
			
			setTunnelHandle(tunnelStreamHandle);
	}
}


public class Contributor	{
	public static void main(String[] args)	{
		
		try		{
			System.out.println("Contributing to Refinitiv Contributions Channel");
			AppClient appClient = new AppClient();

			System.out.println("Starting encrypted connection...");
			// Create an OMM consumer
			OmmConsumer consumer = EmaFactory.createOmmConsumer(EmaFactory.createOmmConsumerConfig() );
				//.tunnelingKeyStoreFile("KEYSTORE_FILE_NAME")
				//.tunnelingKeyStorePasswd("KEYSTORE_PASSWORD"));
			
			LoginReq loginReq = EmaFactory.Domain.createLoginReq();
			
			long loginhandle = consumer.registerClient(loginReq.message(), appClient);
			//appClient.setLoginHandle(loginhandle);
			
			ReqMsg reqMsg = EmaFactory.createReqMsg();
			consumer.registerClient(reqMsg.domainType(EmaRdm.MMT_DIRECTORY).serviceName("DDS_TRCE"), appClient);

//			ClassOfService cos = EmaFactory.createClassOfService()
//				.authentication(EmaFactory.createCosAuthentication().type(CosAuthentication.CosAuthenticationType.NOT_REQUIRED))
//				.dataIntegrity(EmaFactory.createCosDataIntegrity().type(CosDataIntegrity.CosDataIntegrityType.RELIABLE))
//				.flowControl(EmaFactory.createCosFlowControl().type(CosFlowControl.CosFlowControlType.BIDIRECTIONAL).recvWindowSize(1200))
//				.guarantee(EmaFactory.createCosGuarantee().type(CosGuarantee.CosGuaranteeType.NONE));
//
//			System.out.println("Starting tunnel stream...");
//			// Create a request for a tunnel stream
//			TunnelStreamRequest tsr = EmaFactory.createTunnelStreamRequest()
//				.classOfService(cos)
//				.domainType(EmaRdm.MMT_SYSTEM)
//				.name("TUNNEL1")
//				.serviceName("DDS_TRCE");
//
//			// Send the request and register for events from tunnel stream	
//			long tunnelStreamHandle = consumer.registerClient(tsr, appClient);
			appClient.setOmmConsumer(consumer);
			//appClient.setTunnelHandle(tunnelStreamHandle);
			while(appClient.readytopost == false)
				Thread.sleep(1000);
			
			String[] ricList = new String[] {
					"TRCCTEST01", "TRCCTEST02", "TRCCTEST03","TRCCTEST04", "TRCCTEST05",
					"TRCCTEST06", "TRCCTEST07", "TRCCTEST08","TRCCTEST09", "TRCCTEST10",
					"TRCCTEST11", "TRCCTEST12", "TRCCTEST13","TRCCTEST14", "TRCCTEST15",
					"TRCCTEST16", "TRCCTEST17", "TRCCTEST18","TRCCTEST19", "TRCCTEST20",
					"TRCCTEST21", "TRCCTEST22", "TRCCTEST23","TRCCTEST24", "TRCCTEST25",
					"TRCCTEST01", "TRCCTEST02", "TRCCTEST03","TRCCTEST04", "TRCCTEST05",
					"TRCCTEST06", "TRCCTEST07", "TRCCTEST08","TRCCTEST09", "TRCCTEST10",
					"TRCCTEST11", "TRCCTEST12", "TRCCTEST13","TRCCTEST14", "TRCCTEST15",
					"TRCCTEST16", "TRCCTEST17", "TRCCTEST18","TRCCTEST19", "TRCCTEST20",
					"TRCCTEST21", "TRCCTEST22", "TRCCTEST23","TRCCTEST24", "TRCCTEST25",
					"TRCCTEST01", "TRCCTEST02", "TRCCTEST03","TRCCTEST04", "TRCCTEST05",
					"TRCCTEST06", "TRCCTEST07", "TRCCTEST08","TRCCTEST09", "TRCCTEST10",
					"TRCCTEST11", "TRCCTEST12", "TRCCTEST13","TRCCTEST14", "TRCCTEST15",
					"TRCCTEST16", "TRCCTEST17", "TRCCTEST18","TRCCTEST19", "TRCCTEST20",
					"TRCCTEST21", "TRCCTEST22", "TRCCTEST23","TRCCTEST24", "TRCCTEST25",
					"TRCCTEST01", "TRCCTEST02", "TRCCTEST03","TRCCTEST04", "TRCCTEST05",
					"TRCCTEST06", "TRCCTEST07", "TRCCTEST08","TRCCTEST09", "TRCCTEST10",
					"TRCCTEST11", "TRCCTEST12", "TRCCTEST13","TRCCTEST14", "TRCCTEST15",
					"TRCCTEST16", "TRCCTEST17", "TRCCTEST18","TRCCTEST19", "TRCCTEST20",
					"TRCCTEST21", "TRCCTEST22", "TRCCTEST23","TRCCTEST24", "TRCCTEST25",
					};
			
			for (int i = 0; i < ricList.length; i++) {
				  
	            appClient.postMessage(ricList[i]);
	            //Thread.sleep(20);
	        }
			while(true)
				Thread.sleep(999999);
			
		} 
		catch (Exception excp)	{
			System.out.println(excp.getMessage());
		} 
	}
}