Failed to receive notification on ADH unavailability when connection is active using java reactor api

I am connecting to Reuters with the Java Reactor API to consume market data, and have supplied two hosts using the code below:

        dispatchOptions.maxMessages(1);
        connectOptions.reconnectAttemptLimit(trepConnectionConfig.connectMaxRetries()); // attempt to recover forever
        connectOptions.reconnectMinDelay(trepConnectionConfig.connectMinDelayInSec()); // 1 second minimum
        connectOptions.reconnectMaxDelay(trepConnectionConfig.connectMaxDelayInSec()); // 60 second maximum

        this.connectOptions.connectionList().add(connectInfo);

        final ConnectOptions primaryConnectOptions = this.connectOptions.connectionList().get(0).connectOptions();
        primaryConnectOptions.majorVersion(Codec.majorVersion());
        primaryConnectOptions.minorVersion(Codec.minorVersion());
        primaryConnectOptions.connectionType(ConnectionTypes.SOCKET);
        primaryConnectOptions.unifiedNetworkInfo().address(trepConnectionConfig.host());
        primaryConnectOptions.unifiedNetworkInfo().serviceName(trepConnectionConfig.port());
        primaryConnectOptions.guaranteedOutputBuffers(trepConnectionConfig.buffers());


        if (StringUtils.isNotBlank(trepConnectionConfig.backupHost()) && StringUtils.isNotBlank(trepConnectionConfig.backupPort())) {
            this.connectOptions.connectionList().add(backUpConnectInfo);
            final ConnectOptions backUpConnectOptions = this.connectOptions.connectionList().get(1).connectOptions();
            backUpConnectOptions.majorVersion(Codec.majorVersion());
            backUpConnectOptions.minorVersion(Codec.minorVersion());
            backUpConnectOptions.connectionType(ConnectionTypes.SOCKET);
            backUpConnectOptions.unifiedNetworkInfo().address(trepConnectionConfig.backupHost());
            backUpConnectOptions.unifiedNetworkInfo().serviceName(trepConnectionConfig.backupPort());
            backUpConnectOptions.guaranteedOutputBuffers(trepConnectionConfig.buffers());
        }

1. Connect to Reuters while both ADH and ADS are up.

2. Send a valid subscription.

3. The app receives data.

4. During a test with the connectivity team, stop only the ADH server, which makes the IDN_SELECTFEED service unavailable.

5. The Java consumer app should have received a service-unavailable message or some other notification, but it did not receive any.


Could someone please assist? This is blocking the onboarding of the application.

java eta-api reactor

@rinki121

When the service goes down, the application receives a group status in a source directory update message, which indicates that all items in that group are suspect (stale). The application can fan this status out to all items in the group to mark them as stale.

<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<UPDATE domainType="SOURCE" streamId="2" containerType="MAP" flags="0xC0 (DO_NOT_CACHE|DO_NOT_CONFLATE)" updateType="0" dataSize="116">
    <dataBody>
        <map flags="0x00" countHint="0" keyPrimitiveType="UINT" containerType="FILTER_LIST" >
            <mapEntry flags="0x00" action="UPDATE" key="4594" >
                <filterList containerType="ELEMENT_LIST" countHint="0" flags="0x00">
                    <filterEntry id="3" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                        <elementList flags="0x08 (HAS_STANDARD_DATA)">
                            <elementEntry name="Group" dataType="BUFFER" data="1"/>
                            <elementEntry name="Status" dataType="STATE" State: Open/Suspect/None - text: "A23: Service has gone down. Will recall when service becomes available."/>
                        </elementList>
                    </filterEntry>
                </filterList>
            </mapEntry>
        </map>
    </dataBody>
</UPDATE>


Received Source Directory Update
DirectoryUpdate: 
    streamId: 2
    filter: 
    Service:
        serviceId: 4594
[        GroupFilter:
            group: 0000: 00 01                                              ..
            status: State: Open/Suspect/None - text: "A23: Service has gone down. Will recall when service becomes available."
]

However, the item streams are still open, so the application doesn't need to resubscribe to items.
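The fan-out described above can be sketched in plain Java. This is a minimal model, not ETA API code: the class `GroupStatusFanOut`, its methods, and the use of a hex string such as "0001" for the group ID are all assumptions made for illustration. In a real consumer the inputs would come from the decoded refresh messages (which carry the groupId) and from the group entries of the source directory update.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model: applies a group status to every item stream in that group. */
final class GroupStatusFanOut {
    enum DataState { OK, SUSPECT }

    // streamId -> last known data state of the item
    private final Map<Integer, DataState> itemStates = new HashMap<>();
    // "serviceId:groupId" -> stream ids of the items that belong to the group
    private final Map<String, List<Integer>> itemsByGroup = new HashMap<>();

    private static String key(int serviceId, String groupId) {
        return serviceId + ":" + groupId;
    }

    /** Call for each item refresh: it carries the item's groupId and data state. */
    void onRefresh(int streamId, int serviceId, String groupId, DataState state) {
        // Drop the stream from whatever group it was in before.
        for (List<Integer> ids : itemsByGroup.values()) {
            ids.remove(Integer.valueOf(streamId));
        }
        itemsByGroup.computeIfAbsent(key(serviceId, groupId), k -> new ArrayList<>()).add(streamId);
        itemStates.put(streamId, state);
    }

    /** Call for a group status in a directory update; returns the affected stream ids. */
    List<Integer> onGroupStatus(int serviceId, String groupId, DataState state) {
        List<Integer> affected = new ArrayList<>(
                itemsByGroup.getOrDefault(key(serviceId, groupId), Collections.emptyList()));
        for (Integer id : affected) {
            itemStates.put(id, state);
        }
        return affected;
    }

    DataState stateOf(int streamId) {
        return itemStates.get(streamId);
    }
}
```

With this model, the group status for service 4594 and group "0001" would mark every tracked item in that group as suspect, and a later refresh with an Ok state would clear the flag for that item only.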

When the service is up, the application will get a source directory update message indicating that the service is up.

<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<UPDATE domainType="SOURCE" streamId="2" containerType="MAP" flags="0x80 (DO_NOT_CONFLATE)" updateType="0" dataSize="291">
    <dataBody>
        <map flags="0x00" countHint="0" keyPrimitiveType="UINT" containerType="FILTER_LIST" >
            <mapEntry flags="0x00" action="UPDATE" key="4594" >
                <filterList containerType="ELEMENT_LIST" countHint="0" flags="0x00">
                    <filterEntry id="1" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                        <elementList flags="0x08 (HAS_STANDARD_DATA)">
                            <elementEntry name="Name" dataType="ASCII_STRING" data="ELEKTRON_DD"/>
                            <elementEntry name="SupportsQoSRange" dataType="UINT" data="0"/>
                            <elementEntry name="Capabilities" dataType="ARRAY">
                                <array itemLength="1" primitiveType="UINT">
                                    <arrayEntry data="5"/>
                                    <arrayEntry data="6"/>
                                    ...
                                </array>
                            </elementEntry>
                            <elementEntry name="QoS" dataType="ARRAY">
                                <array itemLength="0" primitiveType="QOS">
                                    <arrayEntry Qos: Realtime/TickByTick/Static - timeInfo: 0 - rateInfo: 0/>
                                </array>
                            </elementEntry>
                            <elementEntry name="DictionariesProvided" dataType="ARRAY">
                                <array itemLength="0" primitiveType="ASCII_STRING">
                                    <arrayEntry data="RWFFld"/>
                                    <arrayEntry data="RWFEnum"/>
                                </array>
                            </elementEntry>
                            <elementEntry name="DictionariesUsed" dataType="ARRAY">
                                <array itemLength="0" primitiveType="ASCII_STRING">
                                    <arrayEntry data="RWFFld"/>
                                    <arrayEntry data="RWFEnum"/>
                                </array>
                            </elementEntry>
                            <elementEntry name="Vendor" dataType="ASCII_STRING" data="Thomson Reuters"/>
                            <elementEntry name="AcceptingConsumerStatus" dataType="UINT" data="1"/>
                        </elementList>
                    </filterEntry>
                    <filterEntry id="2" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                        <elementList flags="0x08 (HAS_STANDARD_DATA)">
                            <elementEntry name="ServiceState" dataType="UINT" data="1"/>
                            <elementEntry name="AcceptingRequests" dataType="UINT" data="1"/>
                        </elementList>
                    </filterEntry>
                </filterList>
            </mapEntry>
        </map>
    </dataBody>
</UPDATE>


Received Source Directory Update
DirectoryUpdate: 
    streamId: 2
    filter: 
    Service:
        serviceId: 4594
        InfoFilter:
            serviceName: ELEKTRON_DD
            vendor: Thomson Reuters
            isSource: 0
            supportsQosRange: 0
            supportsOutOfBandSnapshots: 1
            acceptingConsumerStatus: 1
            capabilities: [5, 6, 7, 8, 9, 10, 11, 13, 14, 18, 19, 20, 21, 23, 24, 25, 26, 28, 29, 33, 34, 35, 126]
            dictionariesProvided: [RWFFld, RWFEnum]
            dictionariesUsed: [RWFFld, RWFEnum]
            qos: [Qos: Realtime/TickByTick/Static - timeInfo: 0 - rateInfo: 0]
        StateFilter:
            ServiceState: 1
            AcceptingRequests: 1


Received serviceName: ELEKTRON_DD

Then, the application will receive refresh messages for the subscribed items.

<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<REFRESH domainType="MARKET_PRICE" streamId="5" containerType="FIELD_LIST" flags="0x1DA (HAS_PERM_DATA|HAS_MSG_KEY|HAS_SEQ_NUM|REFRESH_COMPLETE|HAS_QOS|CLEAR_CACHE)" groupId="1" seqNum="15966" permData="0311 F252 6C" Qos: Realtime/TickByTick/Static - timeInfo: 0 - rateInfo: 0 State: Open/Ok/None - text: "All is well" dataSize="2093">
    <key flags="0x07 (HAS_SERVICE_ID|HAS_NAME|HAS_NAME_TYPE)" serviceId="4594" name="JPY=" nameType="1"/>
    <dataBody>
        <fieldList flags="0x09 (HAS_FIELD_LIST_INFO|HAS_STANDARD_DATA)" fieldListNum="99" dictionaryId="1">
            <fieldEntry fieldId="1" data="020E"/>
            <fieldEntry fieldId="2" data="99"/>
            <fieldEntry fieldId="3" data="5341 4E54 414E 4445 5220 2020 2048 4B47"/>
            <fieldEntry fieldId="5" data="0407"/>
...
JPY=
DOMAIN: MARKET_PRICE
State: Open/Ok/None - text: "All is well"
    1/PROD_PERM: 526
    2/RDNDISPLAY: 153
    3/DSPLY_NAME: SANTANDER    HKG
    5/TIMACT: 04:07:00:000:000:000
    11/NETCHNG_1: -0.19

The refresh message carries the groupId and a data state of Ok.

For more information about Item Group, please refer to the 13.4 Item Group Use section in the ETA Developers Guide.

Next, please see the answers to the questions.

1. When the ADS service state changes from up to down, will the consumer receive status messages or update messages?

The consumer will get the source directory update messages that indicate the group and service are down. This is the order of the source directory update messages.

<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<UPDATE domainType="SOURCE" streamId="2" containerType="MAP" flags="0xC0 (DO_NOT_CACHE|DO_NOT_CONFLATE)" updateType="0" dataSize="116">
    <dataBody>
        <map flags="0x00" countHint="0" keyPrimitiveType="UINT" containerType="FILTER_LIST" >
            <mapEntry flags="0x00" action="UPDATE" key="4594" >
                <filterList containerType="ELEMENT_LIST" countHint="0" flags="0x00">
                    <filterEntry id="3" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                        <elementList flags="0x08 (HAS_STANDARD_DATA)">
                            <elementEntry name="Group" dataType="BUFFER" data="1"/>
                            <elementEntry name="Status" dataType="STATE" State: Open/Suspect/None - text: "A23: Service has gone down. Will recall when service becomes available."/>
                        </elementList>
                    </filterEntry>
                </filterList>
            </mapEntry>
        </map>
    </dataBody>
</UPDATE>
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<UPDATE domainType="SOURCE" streamId="2" containerType="MAP" flags="0xC0 (DO_NOT_CACHE|DO_NOT_CONFLATE)" updateType="0" dataSize="51">
    <dataBody>
        <map flags="0x00" countHint="0" keyPrimitiveType="UINT" containerType="FILTER_LIST" >
            <mapEntry flags="0x00" action="UPDATE" key="4594" >
                <filterList containerType="ELEMENT_LIST" countHint="0" flags="0x00">
                    <filterEntry id="3" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                        <elementList flags="0x08 (HAS_STANDARD_DATA)">
                            <elementEntry name="Group" dataType="BUFFER" data="1"/>
                            <elementEntry name="MergedToGroup" dataType="BUFFER" data="0"/>
                        </elementList>
                    </filterEntry>
                </filterList>
            </mapEntry>
        </map>
    </dataBody>
</UPDATE>


<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<UPDATE domainType="SOURCE" streamId="2" containerType="MAP" flags="0x80 (DO_NOT_CONFLATE)" updateType="0" dataSize="72">
    <dataBody>
        <map flags="0x00" countHint="0" keyPrimitiveType="UINT" containerType="FILTER_LIST" >
            <mapEntry flags="0x00" action="UPDATE" key="4594" >
                <filterList containerType="ELEMENT_LIST" countHint="0" flags="0x00">
                    <filterEntry id="2" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                        <elementList flags="0x08 (HAS_STANDARD_DATA)">
                            <elementEntry name="ServiceState" dataType="UINT" data="0"/>
                            <elementEntry name="AcceptingRequests" dataType="UINT" data="1"/>
                            <elementEntry name="Status" dataType="STATE" State: Open/Suspect/None - text: ""/>
                        </elementList>
                    </filterEntry>
                </filterList>
            </mapEntry>
        </map>
    </dataBody>
</UPDATE>
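The three updates above (group status, MergedToGroup, then ServiceState 0) can be modelled as three small state changes. The following is a hedged sketch in plain Java, not ETA API code: `DirectoryUpdateSequence` and its method names are hypothetical, and group IDs are represented as hex strings purely for readability.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the directory update sequence seen when a service goes down. */
final class DirectoryUpdateSequence {
    private final Map<Integer, String> groupOfStream = new HashMap<>();
    private final Map<Integer, Boolean> suspectByStream = new HashMap<>();
    private boolean serviceUp = true;

    void addItem(int streamId, String groupId) {
        groupOfStream.put(streamId, groupId);
        suspectByStream.put(streamId, false);
    }

    /** Step 1: group entry with a Status of Open/Suspect marks the group's items stale. */
    void onGroupSuspect(String groupId) {
        groupOfStream.forEach((streamId, group) -> {
            if (group.equals(groupId)) {
                suspectByStream.put(streamId, true);
            }
        });
    }

    /** Step 2: group entry with MergedToGroup moves the items into the target group. */
    void onMergedToGroup(String fromGroup, String toGroup) {
        groupOfStream.replaceAll((streamId, group) -> group.equals(fromGroup) ? toGroup : group);
    }

    /** Step 3: state entry with ServiceState (1 = up, 0 = down). */
    void onServiceState(int serviceState) {
        serviceUp = (serviceState == 1);
    }

    boolean isSuspect(int streamId) {
        return suspectByStream.getOrDefault(streamId, false);
    }

    String groupOf(int streamId) {
        return groupOfStream.get(streamId);
    }

    boolean isServiceUp() {
        return serviceUp;
    }
}
```

Keeping the merge step separate matters because a later group status would be addressed to the merged group ID, so an application that ignores MergedToGroup could miss it.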


2. If we are not using the watchlist, do we need to re-subscribe to existing subscriptions when the service becomes available again?

If you use SingleOpen (1) and AllowSuspectData (1), which is the default behavior, the application doesn't need to re-subscribe to items because the item streams remain open and ADS recovers the items on behalf of the application.

However, if you use SingleOpen (0) and AllowSuspectData (0), the item streams will be closed with a Closed, Recoverable state, and the application needs to resubscribe to items. For more information regarding SingleOpen and AllowSuspectData, please refer to the 13.5 Single Open and Allow Suspect Data Behavior section in the ETA Developers Guide.
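The two cases above reduce to a decision on the stream state and data state that the item reports. Below is a minimal sketch in plain Java, assuming simplified enums rather than the ETA state constants: `RecoveryDecision`, its enums, and the `Action` names are hypothetical, and only a subset of possible stream states is modelled.

```java
/** Hypothetical helper: decides what the application should do for one item stream. */
final class RecoveryDecision {
    enum StreamState { OPEN, CLOSED_RECOVER, CLOSED }
    enum DataState { OK, SUSPECT }
    enum Action { NONE, WAIT_FOR_REFRESH, RESUBSCRIBE, DO_NOT_RETRY }

    static Action decide(StreamState streamState, DataState dataState) {
        switch (streamState) {
            case OPEN:
                // Stream is still open: the upstream component recovers the item,
                // so a suspect item only needs to wait for its next refresh.
                return dataState == DataState.SUSPECT ? Action.WAIT_FOR_REFRESH : Action.NONE;
            case CLOSED_RECOVER:
                // Stream was closed but may be requested again by the application.
                return Action.RESUBSCRIBE;
            default:
                // Closed without the recoverable indication: this sketch does not retry.
                return Action.DO_NOT_RETRY;
        }
    }
}
```

For example, `RecoveryDecision.decide(StreamState.OPEN, DataState.SUSPECT)` models the default SingleOpen (1) and AllowSuspectData (1) case, while `decide(StreamState.CLOSED_RECOVER, DataState.SUSPECT)` models the case where the application itself has to resubscribe.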


3. Will we receive any channel up or FD change events when the service becomes available again?

In this case, the service is down but the channel remains connected. Therefore, the application will not receive channel up or FD change events.

4. When the ADS service state changes from down to up, what is the message flow for directory updates, i.e. will we receive a refresh again?

When the service is up, the application will receive the source directory update message that indicates that the service is up, as mentioned above.


@rinki121

I tested with the com.refinitiv.eta.valueadd.examples.consumer example and found that if I stop ADH, the example gets Source Directory Updates indicating that the services provided by ADH are down, as shown below.

Received Source Directory Update
DirectoryUpdate: 
    streamId: 2
    filter: 
    Service:
        serviceId: 4594
[        GroupFilter:
            group: 0000: 00 01                                              ..
            status: State: Open/Suspect/None - text: "A23: Service has gone down. Will recall when service becomes available."
]
Received Source Directory Update
DirectoryUpdate: 
    streamId: 2
    filter: 
    Service:
        serviceId: 4594
[        GroupFilter:
            group: 0000: 00 01                                              ..
            mergedToGroup: 0000: 00 00                                              ..
]
Received Source Directory Update
DirectoryUpdate: 
    streamId: 2
    filter: 
    Service:
        serviceId: 4594
        StateFilter:
            ServiceState: 0
            AcceptingRequests: 1
            State: Open/Suspect/None - text: "null"

Please try running the com.refinitiv.eta.valueadd.examples.consumer example to verify whether you see this behavior. You can run the example with the -x option to see the messages received by the API.

-c <hostname>:<port> <service name> mp:JPY= -x

@Jirapongse What is the exact flow of events if the service becomes unavailable?

1. When the ADS service state changes from up to down, will the consumer receive status messages or update messages? If both are received, what is the exact sequence and what would the status value be?

2. If we are not using the watchlist, do we need to re-subscribe to existing subscriptions when the service becomes available again?

3. Will we receive any channel up or FD change events when the service becomes available again?

4. When the ADS service state changes from down to up, what is the message flow for directory updates, i.e. will we receive a refresh again?



