Discover Refinitiv
MyRefinitiv Refinitiv Perspectives Careers
Created with Sketch.
All APIs Questions & Answers  Register |  Login
Ask a question
  • Questions
  • Tags
  • Badges
  • Unanswered
Search:
  • Home /
  • Elektron /
  • ETA /
avatar image
Question by rinki121 · Apr 27 at 01:35 PM · javaeta-apireactor

Failed to receive notification on ADH unavailability when connection is active using java reactor api

I am connecting to reuters using java reactor api for consuming market data, and have supplied two hosts using below piece of code :

 dispatchOptions.maxMessages(1);
        connectOptions.reconnectAttemptLimit(trepConnectionConfig.connectMaxRetries()); // attempt to recover forever
        connectOptions.reconnectMinDelay(trepConnectionConfig.connectMinDelayInSec()); // 1 second minimum
        connectOptions.reconnectMaxDelay(trepConnectionConfig.connectMaxDelayInSec()); // 60 second maximum

        this.connectOptions.connectionList().add(connectInfo);

        final ConnectOptions primaryConnectOptions = this.connectOptions.connectionList().get(0).connectOptions();
        primaryConnectOptions.majorVersion(Codec.majorVersion());
        primaryConnectOptions.minorVersion(Codec.minorVersion());
        primaryConnectOptions.connectionType(ConnectionTypes.SOCKET);
        primaryConnectOptions.unifiedNetworkInfo().address(trepConnectionConfig.host());
        primaryConnectOptions.unifiedNetworkInfo().serviceName(trepConnectionConfig.port());
        primaryConnectOptions.guaranteedOutputBuffers(trepConnectionConfig.buffers());


        if (StringUtils.isNotBlank(trepConnectionConfig.backupHost()) && StringUtils.isNotBlank(trepConnectionConfig.backupPort())) {
            this.connectOptions.connectionList().add(backUpConnectInfo);
            final ConnectOptions backUpConnectOptions = this.connectOptions.connectionList().get(1).connectOptions();
            backUpConnectOptions.majorVersion(Codec.majorVersion());
            backUpConnectOptions.minorVersion(Codec.minorVersion());
            backUpConnectOptions.connectionType(ConnectionTypes.SOCKET);
            backUpConnectOptions.unifiedNetworkInfo().address(trepConnectionConfig.backupHost());
            backUpConnectOptions.unifiedNetworkInfo().serviceName(trepConnectionConfig.backupPort());
            backUpConnectOptions.guaranteedOutputBuffers(trepConnectionConfig.buffers());
        }

1. Connect to reuters - both ADH and ADS are up

2. send valid subscription

3. App is able to receive data

4.While doing test with connectivity team, stop only ADH server - which results in unavailability of IDN_SELECTFEED service.

5. Java consumer APP should have received service unavailability or some kind of notification, however app did not receive any notification.


Could someone please assist here as this is blocking onboarding of application.

People who like this

0 Show 0
Comment
10 |1500 characters needed characters left characters exceeded
▼
  • Viewable by all users
  • Viewable by moderators
  • Viewable by moderators and the original poster
  • Advanced visibility
Viewable by all users

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

2 Replies

  • Sort: 
avatar image
REFINITIV
Best Answer
Answer by Jirapongse · Apr 29 at 05:23 AM

@rinki121

When the service is down, the application will receive the group stale update in the source directory update message which indicates all items in this group are suspect (stale). The application can fan out this message to all items in the group to indicate that the items are stale.

<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<UPDATE domainType="SOURCE" streamId="2" containerType="MAP" flags="0xC0 (DO_NOT_CACHE|DO_NOT_CONFLATE)" updateType="0" dataSize="116">
    <dataBody>
        <map flags="0x00" countHint="0" keyPrimitiveType="UINT" containerType="FILTER_LIST" >
            <mapEntry flags="0x00" action="UPDATE" key="4594" >
                <filterList containerType="ELEMENT_LIST" countHint="0" flags="0x00">
                    <filterEntry id="3" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                        <elementList flags="0x08 (HAS_STANDARD_DATA)">
                            <elementEntry name="Group" dataType="BUFFER" data="1"/>
                            <elementEntry name="Status" dataType="STATE" State: Open/Suspect/None - text: "A23: Service has gone down. Will recall when service becomes available."/>
                        </elementList>
                    </filterEntry>
                </filterList>
            </mapEntry>
        </map>
    </dataBody>
</UPDATE>


Received Source Directory Update
DirectoryUpdate: 
    streamId: 2
    filter: 
    Service:
        serviceId: 4594
[        GroupFilter:
            group: 0000: 00 01                                              ..
            status: State: Open/Suspect/None - text: "A23: Service has gone down. Will recall when service becomes available."
]

However, the item streams are still opened so the application doesn't need to resubscribe to items.

When the service is up, the application will get a source directly update message indicating that the service is up.

<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<UPDATE domainType="SOURCE" streamId="2" containerType="MAP" flags="0x80 (DO_NOT_CONFLATE)" updateType="0" dataSize="291">
    <dataBody>
        <map flags="0x00" countHint="0" keyPrimitiveType="UINT" containerType="FILTER_LIST" >
            <mapEntry flags="0x00" action="UPDATE" key="4594" >
                <filterList containerType="ELEMENT_LIST" countHint="0" flags="0x00">
                    <filterEntry id="1" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                        <elementList flags="0x08 (HAS_STANDARD_DATA)">
                            <elementEntry name="Name" dataType="ASCII_STRING" data="ELEKTRON_DD"/>
                            <elementEntry name="SupportsQoSRange" dataType="UINT" data="0"/>
                            <elementEntry name="Capabilities" dataType="ARRAY">
                                <array itemLength="1" primitiveType="UINT">
                                    <arrayEntry data="5"/>
                                    <arrayEntry data="6"/>
                                    ...
                                </array>
                            </elementEntry>
                            <elementEntry name="QoS" dataType="ARRAY">
                                <array itemLength="0" primitiveType="QOS">
                                    <arrayEntry Qos: Realtime/TickByTick/Static - timeInfo: 0 - rateInfo: 0/>
                                </array>
                            </elementEntry>
                            <elementEntry name="DictionariesProvided" dataType="ARRAY">
                                <array itemLength="0" primitiveType="ASCII_STRING">
                                    <arrayEntry data="RWFFld"/>
                                    <arrayEntry data="RWFEnum"/>
                                </array>
                            </elementEntry>
                            <elementEntry name="DictionariesUsed" dataType="ARRAY">
                                <array itemLength="0" primitiveType="ASCII_STRING">
                                    <arrayEntry data="RWFFld"/>
                                    <arrayEntry data="RWFEnum"/>
                                </array>
                            </elementEntry>
                            <elementEntry name="Vendor" dataType="ASCII_STRING" data="Thomson Reuters"/>
                            <elementEntry name="AcceptingConsumerStatus" dataType="UINT" data="1"/>
                        </elementList>
                    </filterEntry>
                    <filterEntry id="2" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                        <elementList flags="0x08 (HAS_STANDARD_DATA)">
                            <elementEntry name="ServiceState" dataType="UINT" data="1"/>
                            <elementEntry name="AcceptingRequests" dataType="UINT" data="1"/>
                        </elementList>
                    </filterEntry>
                </filterList>
            </mapEntry>
        </map>
    </dataBody>
</UPDATE>


Received Source Directory Update
DirectoryUpdate: 
    streamId: 2
    filter: 
    Service:
        serviceId: 4594
        InfoFilter:
            serviceName: ELEKTRON_DD
            vendor: Thomson Reuters
            isSource: 0
            supportsQosRange: 0
            supportsOutOfBandSnapshots: 1
            acceptingConsumerStatus: 1
            capabilities: [5, 6, 7, 8, 9, 10, 11, 13, 14, 18, 19, 20, 21, 23, 24, 25, 26, 28, 29, 33, 34, 35, 126]
            dictionariesProvided: [RWFFld, RWFEnum]
            dictionariesUsed: [RWFFld, RWFEnum]
            qos: [Qos: Realtime/TickByTick/Static - timeInfo: 0 - rateInfo: 0]
        StateFilter:
            ServiceState: 1
            AcceptingRequests: 1


Received serviceName: ELEKTRON_DD

Then, the application will retrieve refresh messages of subscribed items.

<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<REFRESH domainType="MARKET_PRICE" streamId="5" containerType="FIELD_LIST" flags="0x1DA (HAS_PERM_DATA|HAS_MSG_KEY|HAS_SEQ_NUM|REFRESH_COMPLETE|HAS_QOS|CLEAR_CACHE)" groupId="1" seqNum="15966" permData="0311 F252 6C" Qos: Realtime/TickByTick/Static - timeInfo: 0 - rateInfo: 0 State: Open/Ok/None - text: "All is well" dataSize="2093">
    <key flags="0x07 (HAS_SERVICE_ID|HAS_NAME|HAS_NAME_TYPE)" serviceId="4594" name="JPY=" nameType="1"/>
    <dataBody>
        <fieldList flags="0x09 (HAS_FIELD_LIST_INFO|HAS_STANDARD_DATA)" fieldListNum="99" dictionaryId="1">
            <fieldEntry fieldId="1" data="020E"/>
            <fieldEntry fieldId="2" data="99"/>
            <fieldEntry fieldId="3" data="5341 4E54 414E 4445 5220 2020 2048 4B47"/>
            <fieldEntry fieldId="5" data="0407"/>
...
JPY=
DOMAIN: MARKET_PRICE
State: Open/Ok/None - text: "All is well"
    1/PROD_PERM: 526
    2/RDNDISPLAY: 153
    3/DSPLY_NAME: SANTANDER    HKG
    5/TIMACT: 04:07:00:000:000:000
    11/NETCHNG_1: -0.19

The refresh message has the groupId and data state which is Ok.

For more information about Item Group, please refer to the 13.4 Item Group Use section in the ETA Developers Guide.

Next, please see the answers to the questions.

1. When ads service state is changed from UP-->down, Will consumer receives Status message or update messages?

The consumer will get the source directory update messages that indicate the group and service are down. This is the order of the source directory update messages.

<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<UPDATE domainType="SOURCE" streamId="2" containerType="MAP" flags="0xC0 (DO_NOT_CACHE|DO_NOT_CONFLATE)" updateType="0" dataSize="116">
    <dataBody>
        <map flags="0x00" countHint="0" keyPrimitiveType="UINT" containerType="FILTER_LIST" >
            <mapEntry flags="0x00" action="UPDATE" key="4594" >
                <filterList containerType="ELEMENT_LIST" countHint="0" flags="0x00">
                    <filterEntry id="3" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                        <elementList flags="0x08 (HAS_STANDARD_DATA)">
                            <elementEntry name="Group" dataType="BUFFER" data="1"/>
                            <elementEntry name="Status" dataType="STATE" State: Open/Suspect/None - text: "A23: Service has gone down. Will recall when service becomes available."/>
                        </elementList>
                    </filterEntry>
                </filterList>
            </mapEntry>
        </map>
    </dataBody>
</UPDATE>
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<UPDATE domainType="SOURCE" streamId="2" containerType="MAP" flags="0xC0 (DO_NOT_CACHE|DO_NOT_CONFLATE)" updateType="0" dataSize="51">
    <dataBody>
        <map flags="0x00" countHint="0" keyPrimitiveType="UINT" containerType="FILTER_LIST" >
            <mapEntry flags="0x00" action="UPDATE" key="4594" >
                <filterList containerType="ELEMENT_LIST" countHint="0" flags="0x00">
                    <filterEntry id="3" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                        <elementList flags="0x08 (HAS_STANDARD_DATA)">
                            <elementEntry name="Group" dataType="BUFFER" data="1"/>
                            <elementEntry name="MergedToGroup" dataType="BUFFER" data="0"/>
                        </elementList>
                    </filterEntry>
                </filterList>
            </mapEntry>
        </map>
    </dataBody>
</UPDATE>


<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<UPDATE domainType="SOURCE" streamId="2" containerType="MAP" flags="0x80 (DO_NOT_CONFLATE)" updateType="0" dataSize="72">
    <dataBody>
        <map flags="0x00" countHint="0" keyPrimitiveType="UINT" containerType="FILTER_LIST" >
            <mapEntry flags="0x00" action="UPDATE" key="4594" >
                <filterList containerType="ELEMENT_LIST" countHint="0" flags="0x00">
                    <filterEntry id="2" action="SET" flags="0x00" containerType="ELEMENT_LIST">
                        <elementList flags="0x08 (HAS_STANDARD_DATA)">
                            <elementEntry name="ServiceState" dataType="UINT" data="0"/>
                            <elementEntry name="AcceptingRequests" dataType="UINT" data="1"/>
                            <elementEntry name="Status" dataType="STATE" State: Open/Suspect/None - text: ""/>
                        </elementList>
                    </filterEntry>
                </filterList>
            </mapEntry>
        </map>
    </dataBody>
</UPDATE>


2. If we are not using watch list, do we need to re-subscribe for existing subscription when service becomes available again?

If you use the SingleOpen (1) and AllowSuspectData (1) which is the default behavior, the application doesn't need to re-subscribe to items because the item streams are still open and ADS will recover the items on behalf of the application.

However, if you use the SingleOpen (0) and AllowSuspectData (0), the item streams will be Closed, Recoverable and the application needs to resubscribe to items. For more information regarding SingleOpen and AllowSuspectData, please refer to the 13.5 Single Open and Allow Suspect Data Behavior section in the ETA Developer Guide.


3. Will we receive any chanel Up or FD change etc events when service becomes available again?

In this case, the service is down but the channel remains connected. Therefore, the application will not receive the channel up or FD change.

3. When ads service state is changed from down ->up, what would be message flow state for directory updates i.e. will we receive refresh again ?

When the service is up, the application will receive the source directory update message that indicates that the service is up, as mentioned above.

Comment

People who like this

0 Show 0 · Share
10 |1500 characters needed characters left characters exceeded
▼
  • Viewable by all users
  • Viewable by moderators
  • Viewable by moderators and the original poster
  • Advanced visibility
Viewable by all users

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

avatar image
REFINITIV
Answer by Jirapongse · Apr 28 at 04:21 AM

@rinki121

I tested with the com.refinitiv.eta.valueadd.examples.consumer example and found that if I stop ADS, the example will get the Source Directory Update that indicates that the services provided by ADH are down, as shown below.

Received Source Directory Update
DirectoryUpdate: 
    streamId: 2
    filter: 
    Service:
        serviceId: 4594
[        GroupFilter:
            group: 0000: 00 01                                              ..
            status: State: Open/Suspect/None - text: "A23: Service has gone down. Will recall when service becomes available."
]
Received Source Directory Update
DirectoryUpdate: 
    streamId: 2
    filter: 
    Service:
        serviceId: 4594
[        GroupFilter:
            group: 0000: 00 01                                              ..
            mergedToGroup: 0000: 00 00                                              ..
]
Received Source Directory Update
DirectoryUpdate: 
    streamId: 2
    filter: 
    Service:
        serviceId: 4594
        StateFilter:
            ServiceState: 0
            AcceptingRequests: 1
            State: Open/Suspect/None - text: "null"

Please try to run the com.refinitiv.eta.valueadd.examples.consumer example to verify if you see this behavior. You can run the example with the -x option to verify the messages retrieved by the API.

-c <hostname>:<port> <service name> mp:JPY= -x
Comment

People who like this

0 Show 1 · Share
10 |1500 characters needed characters left characters exceeded
▼
  • Viewable by all users
  • Viewable by moderators
  • Viewable by moderators and the original poster
  • Advanced visibility
Viewable by all users

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

avatar image
rinki121 · Apr 28 at 10:41 AM 0
Share

@Jirapongse What is the exact flow of events if service becomes unavailable ?


1. When ads service state is changed from UP-->down, Will consumer receives

Status message or update messages ?

If both of them are received, what would be exact sequence and what could be value for status

2. If we are not using watch list, do we need to re-subscribe for existing subscription when service becomes available again ?

3. Will we receive any chanel Up or FD change etc events when service becomes available again?

3. When ads service state is changed from down ->up, what would be message flow state for directory updates i.e. will we receive refresh again ?




Watch this question

Add to watch list
Add to your watch list to receive emailed updates for this question. Too many emails? Change your settings >
9 People are following this question.

Related Questions

Java ETA Reactor Programming model for managing RCC Channel A & B hosts

Why SEQNUM is in order but QUOTIM_MS is not?

Electron Java SDK SocketChannel.read returned -1 (end-of-stream)

Reactor premature CHANNEL_READY on reconnect

Elektron - Source dictionary service down

  • Copyright
  • Cookie Policy
  • Privacy Statement
  • Terms of Use
  • Anonymous
  • Sign in
  • Create
  • Ask a question
  • Spaces
  • Alpha
  • App Studio
  • Block Chain
  • Bot Platform
  • Connected Risk APIs
  • DSS
  • Data Fusion
  • Data Model Discovery
  • Datastream
  • Eikon COM
  • Eikon Data APIs
  • Electronic Trading
    • Generic FIX
    • Local Bank Node API
    • Trading API
  • Elektron
    • EMA
    • ETA
    • WebSocket API
  • FX Venues
    • FX Trading – RFQ Maker
  • Intelligent Tagging
  • Legal One
  • Messenger Bot
  • Messenger Side by Side
  • ONESOURCE
    • Indirect Tax
  • Open Calais
  • Open PermID
    • Entity Search
  • Org ID
  • PAM
    • PAM - Logging
  • ProView
  • ProView Internal
  • Product Insight
  • Project Tracking
  • RDMS
  • Refinitiv Data Platform
    • Refinitiv Data Platform Libraries
  • Rose's Space
  • Screening
    • Qual-ID API
    • Screening Deployed
    • Screening Online
    • World-Check One
    • World-Check One Zero Footprint
  • Side by Side Integration API
  • TR Knowledge Graph
  • TREP APIs
    • CAT
    • DACS Station
    • Open DACS
    • RFA
    • UPA
  • TREP Infrastructure
  • TRKD
  • TRTH
  • Thomson One Smart
  • Transactions
    • REDI API
  • Velocity Analytics
  • Wealth Management Web Services
  • Workspace SDK
    • Element Framework
    • Grid
  • World-Check Data File
  • Yield Book Analytics
  • 中文论坛
  • Explore
  • Tags
  • Questions
  • Badges