question

Upvotes
Accepted
11 12 9 14

Calling ETA Reactor rsslReactorConnect in tight loop to stress test corrupts reactor

To stress test our code we've created a test case that takes all the symbols from a symbol list (about 9500) and attempts to establish a new reactor connection for each of them in turn.

We know that this will be rejected by the edge device given the limit on the number of connections, and will most likely fail in the reactor due to the open file handle limit.

We wanted to make sure our code could recover from those situations.

What we didn't expect was reactor corruption.

After about 11 calls to connect (all parameters the same), the reactor produces a RECONNECTING event with the following:

rsslErrorInfo {
rsslErrorInfoCode = RSSL_EIC_FAILURE
rsslError {
RsslErrorId = -29;
sysError = 0;
text = "(ret = rsslEncodeMsgInit(pEncodeIter, &msg, 0)) == RSSL_RET_ENCODE_MSG_KEY_OPAQUE"
}
errorLocation = "RDM/Impl/rsslRDMLoginMsg.c:53"
}

This happens after the CHANNEL_UP, but before a CHANNEL_READY.

After this, another ReactorChannel that had also received a CHANNEL_UP event gets a CHANNEL_RECONNECTING, with the error info:

rsslErrorInfo {
rsslErrorInfoCode = RSSL_EIC_FAILURE
rsslError {
RsslErrorId = -1;
sysError = 0;
text = "Unknown login msg type 6116480"
}
errorLocation = "RDM/Impl/rsslRDMLoginMsg.c:604"
}

This is easily reproducible.

We're using the ElektronSDK 1.0.6.

Tags: elektron, refinitiv-realtime, elektron-sdk, rrt, eta-api, elektron-transport-api, connection, reactor, stress-test

Upvotes
Accepted
361 1 4 3

Joe,

rsslReactorConnect does not copy the messages on the role such as the LoginRequest. Try setting a long-lived message and see if that helps. I also believe a cleared RDMLoginRequest will successfully encode (but would still be rejected by the edge).

Regarding the username: rsslInitDefaultRDMLoginRequest() sets the username on the request to the user of the system. Are you calling this function?


Upvotes
11 12 9 14

Additionally, another session receives an RDM_MT_STATUS in the LoginCallback stating that the user is not registered.

The user displayed is the name of the user running the process, NOT the user that was set in the RDMLoginRequest. This was verified to be properly populated before calling rsslReactorConnect.

I initially thought that the reactor might be holding onto the local RDMLoginRequest on the stack of the caller, so after each call I clear the contents of the variable in an attempt to discover the source of the problem. But this did not cause any additional problems, so I'm assuming the RDMLoginRequest is copied in the rsslReactorConnect function.

Prior to receiving this status message:

Closed/Suspect/None - text: "Error! User <user> is not registered"

a CHANNEL_UP event was delivered on the reactor channel.


Upvotes
11 12 9 14

Rarely, I receive the expected response:
Closed/Suspect/Login rejected, exceeded maximum number of mounts per user - text: "Exceeded maximum number of mounts per user."


Upvotes
11 12 9 14

When it fails, the first error is the -29 / RSSL_RET_ENCODE_MSG_KEY_OPAQUE error.

The subsequent failures reporting unknown login msg types are random.

Subsequent errors on other channels occur as well with:

rsslErrorInfo {
rsslErrorInfoCode = RSSL_EIC_SUCCESS
rsslError {
RsslErrorId = -1;
sysError = 0;
text = "<Impl/ripcsrvr.c:5981> Error: 1006 This connection has received a negative acknowledgement response from the server."
}
errorLocation = "Reactor/Impl/rsslReactorWorker.c:437"
}

At this point, the select() call that periodically wakes up to call dispatch no longer does so.
Further, all sockets have been removed from the fd_set passed to the select call as a result of the CHANNEL_DOWN_RECONNECTING events.
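For reference, a timed select() wait of the kind described above might look like the sketch below. This is not the application's code; the function name waitForReadable and the millisecond timeout parameter are made up for illustration. It only shows that a select() call with a timeout returns 0 when the timeout expires, even if no descriptors are left in the fd_set, so the surrounding loop still gets a chance to call dispatch.

```cpp
#include <cassert>
#include <sys/select.h>
#include <sys/time.h>
#include <vector>

// Hypothetical helper: wait until one of 'fds' is readable or 'timeoutMs' elapses.
// Returns the number of ready descriptors, 0 on timeout, -1 on error.
int waitForReadable(const std::vector<int>& fds, long timeoutMs)
{
    assert(timeoutMs >= 0);

    fd_set readSet;
    FD_ZERO(&readSet);
    int maxFd = -1;
    for (int fd : fds) {
        // FD_SET is undefined for descriptors outside [0, FD_SETSIZE).
        if (fd < 0 || fd >= FD_SETSIZE) { return -1; }
        FD_SET(fd, &readSet);
        if (fd > maxFd) { maxFd = fd; }
    }

    timeval tv;
    tv.tv_sec  = timeoutMs / 1000;
    tv.tv_usec = (timeoutMs % 1000) * 1000;

    // With an empty set this simply sleeps for the timeout and returns 0.
    return select(maxFd + 1, &readSet, nullptr, nullptr, &tv);
}
```

A loop built on this would treat a return value of 0 as "nothing readable, but still time to run periodic work".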

I've run the code under both the address sanitizer built into gcc and valgrind to look for memory corruption issues, and none were found.


Upvotes
1.5k 5 6 7

The experienced problems aside, I'm not sure I understand the need to test how your application behaves when you create 9500 connections. You are almost creating a DoS attack on the server! Perhaps I've misunderstood what you mean.

In the real world your application will typically have 1 (one) connection to the endpoint (an ADS, Elektron Edge device, other provider app, etc.) and the connection will be long-lived. You will then consume all 9500 data item streams via this one connection.

In some cases you might benefit performance-wise from multiplexing a few physical connections to achieve massive throughput. And there's the use case where you are effectively proxying user connections, so that you need connections with distinct DACS credentials for each of them. Both of these use cases are rare, AFAIK.

Perhaps you can elaborate on the requirement?


Upvotes
11 12 9 14

Yes, normally, there would be many fewer connections.
The test was designed to see how the application handles being rejected by the Elektron server -- the easiest way was to exceed the maximum allowable connections.

After only a bit more than a handful of attempts, the reactor became corrupted and was no longer usable. That the reactor itself becomes corrupt is the issue here. I've been able to get the reactor to respond in the same way, using the incorrect user ID (that of the process and not the one specified in the RDMLoginRequest), with only two connections from the reactor. I can easily reproduce the behavior with many connections, but it generally takes only a handful (around 10).

The server appears to be hardened against what you describe as a DoS, but the reactor code does not appear to be able to handle multiple connections to the server when prior connections have not yet been fully established (UP, then READY) in all cases.

======================================================

Aside from that, according to advice I was given at the outset of the project, multiple connections to the server from the application allow for better load handling when the responses return.

As the application is currently written, the reactor will connect for every 10,000 or so symbols to which it subscribes. If this is not the correct behavior and should instead maintain a single connection from the reactor over which all information flows, I can make that change, but it contradicts advice I was given from Reuters at the beginning of development.

Further, guidance for creating an application with many, many subscriptions and multiple threads is lacking in the documentation since the examples I've seen are all single-threaded, single symbol examples.

For example, there is no mention of thread affinity, i.e. whether it is necessary to perform all requests on a ReactorChannel from the same thread that called rsslReactorConnect.

Lars@Addicticks


Upvotes
11 12 9 14

Aside:

Reviewing the code, I misstated the problem. We are using about 10K securities for the stress test, but those symbols are hashed into one of 16 buckets, each of which has a ReactorChannel created from rsslReactorConnect, so there are not 10K connections attempted.

So, this problem occurs when a single process attempts many fewer connections than 9500 as stated above.
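To make the bucketing concrete, here is a minimal sketch of how symbols could be hashed into a fixed number of buckets, one per ReactorChannel. It is not the application's actual code: the name bucketFor and the use of std::hash are assumptions made for illustration; only the count of 16 comes from the description above.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Number of buckets (one reactor channel per bucket), per the description above.
constexpr std::size_t kBucketCount = 16;

// Hypothetical helper: map a symbol to a bucket index in [0, bucketCount).
// std::hash gives the same value for the same string within one process run,
// which is all that bucket assignment needs here.
std::size_t bucketFor(const std::string& symbol,
                      std::size_t bucketCount = kBucketCount)
{
    assert(bucketCount > 0);
    return std::hash<std::string>{}(symbol) % bucketCount;
}
```

With this layout, at most 16 connections are attempted regardless of how many symbols are in the list.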

=============================================================

To answer your question about the reactor connection, code fragments from the connect function are:

RsslRet ret;
RsslRDMLoginRequest loginRequest;
if ((ret = rsslInitDefaultRDMLoginRequest(&loginRequest, 1)) != RSSL_RET_SUCCESS) {
    LOG_ERROR("init login request failed: " << ret);
    return false;
}

// if a username was specified in the configuration file, it will be used now:

setString(&loginRequest.userName, pCfgDat->userName);

Where the setString function is:
bool setString(RsslBuffer *buf, const std::string& src)
{
    // set up for type safe comparison
    const auto maxLen = std::numeric_limits<decltype(buf->length)>::max();
    const std::common_type<decltype(maxLen), decltype(src.length())>::type srcLen = src.length();
    if (srcLen > maxLen) { return false; }
    // truncation due to type size differences eliminated here due to check above
    // note: buf only points at src's storage, no copy is made
    buf->data = const_cast<decltype(buf->data)>(src.c_str());
    buf->length = static_cast<decltype(buf->length)>(src.length());
    return true;
}
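Note that setString stores a pointer into the std::string's own storage, so the buffer is only valid while that string is alive and unmodified. One way to make the ownership explicit is sketched below. BufferView is a stand-in with the same length-plus-pointer shape, not the real RsslBuffer, and OwnedText is a hypothetical name; this is an illustration of the idea, not something taken from the SDK.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <string>
#include <utility>

// Stand-in for a length + pointer buffer; NOT the real RsslBuffer definition.
struct BufferView {
    std::uint32_t length;
    char*         data;
};

// Hypothetical wrapper: owns a private copy of the text and hands out a view
// of that copy, so later changes to the source string cannot affect the view.
class OwnedText {
public:
    explicit OwnedText(std::string text) : text_(std::move(text)) {}

    // Non-copyable (and therefore non-movable): the view points into text_,
    // so the object must stay at a stable address while the view is in use.
    OwnedText(const OwnedText&) = delete;
    OwnedText& operator=(const OwnedText&) = delete;

    // Point 'view' at the owned storage. Returns false if the text is too long.
    bool bind(BufferView& view)
    {
        if (text_.size() > std::numeric_limits<std::uint32_t>::max()) { return false; }
        view.data   = &text_[0];
        view.length = static_cast<std::uint32_t>(text_.size());
        return true;
    }

private:
    std::string text_;
};
```

The OwnedText object then has to live at least as long as whatever holds the view.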

That loginRequest is later set to be a member in the consumerRole:

RsslReactorOMMConsumerRole consumerRole;
rsslClearOMMConsumerRole(&consumerRole);
consumerRole.pLoginRequest = &loginRequest;

After the consumerRole and RsslReactorConnectOptions are set, just call:
rc = rsslReactorConnect(pReactor, &cnctOpts, &consumerRole, &errInfo);

After this call, to try to discover whether the reactor keeps pointers to stack variables after the call returns, the contents of all stack variables passed to the rsslReactorConnect function are set to 0xFF, e.g.:

memset(&cnctOpts, 0xFF, sizeof cnctOpts);
memset(&consumerRole, 0xFF, sizeof consumerRole);
memset(&loginRequest, 0xFF, sizeof loginRequest);
memset(&dirRequest, 0xFF, sizeof dirRequest);

Once the rsslReactorConnect function returns, the reactor code should not rely on ANY stack variables that were passed to it. If any of that data needs to be maintained, it needs to be allocated from the heap and the appropriate pointer passed to the reactor. This is NOT shown in any of the example code I saw.
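The poisoning check described above can be illustrated in isolation. The sketch below uses two made-up stand-in classes, RetainingApi (keeps the caller's pointer) and CopyingApi (copies the data), rather than the reactor itself; all names are hypothetical. It shows how overwriting the caller's variable with 0xFF after the call reveals which behavior an API has.

```cpp
#include <cassert>
#include <cstring>

// Stand-in request type; plain data, so memset on it is well defined.
struct StandInRequest {
    char userName[16];
};

// Hypothetical API that keeps the caller's pointer after connect() returns.
class RetainingApi {
public:
    void connect(const StandInRequest* req) { req_ = req; }
    char firstByteSeen() const { assert(req_ != nullptr); return req_->userName[0]; }
private:
    const StandInRequest* req_ = nullptr;
};

// Hypothetical API that copies what it needs during connect().
class CopyingApi {
public:
    void connect(const StandInRequest* req) { copy_ = *req; }
    char firstByteSeen() const { return copy_.userName[0]; }
private:
    StandInRequest copy_{};
};

// Returns true if 'api' is still looking at the caller's memory after connect().
template <typename Api>
bool retainsCallerMemory(Api& api)
{
    StandInRequest req{};
    std::strncpy(req.userName, "joe", sizeof req.userName - 1);
    api.connect(&req);
    std::memset(&req, 0xFF, sizeof req);  // poison while req is still in scope
    return api.firstByteSeen() != 'j';
}
```

In this sketch, retainsCallerMemory reports true for RetainingApi and false for CopyingApi, and the same reasoning applies to any function that takes its arguments by pointer.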

As to a "long-lived message", I'm not sure what you mean.

Thanks for your help thus far.


jim.carroll
Lars@Addicticks


Joe,

By "long-lived" I mean an RDMLoginRequest object that will be valid for at least as long as the ReactorChannel, i.e. one that is created either on the heap, or on the stack in a place where it will remain in scope -- our examples do the latter.

The Reactor should ideally copy the message so that the above usage works; we will look into this. But for now, try keeping the LoginRequest in scope -- this should help.
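As an illustration of what "long-lived" can look like in practice, the sketch below keeps everything that is handed over by pointer in one heap-allocated context object per connection. The types are stand-ins that only mirror the shape of the pattern (they are not the ETA structures), and ConnectionContext and ConnectionRegistry are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Stand-ins only: these mirror the shape of the pattern, not the ETA types.
struct StandInBuffer       { std::uint32_t length; const char* data; };
struct StandInLoginRequest { StandInBuffer userName; };
struct StandInRole         { StandInLoginRequest* pLoginRequest; };

// Everything passed to connect by pointer lives here for the channel's lifetime.
struct ConnectionContext {
    std::string         userName;        // backing storage for the buffer below
    StandInLoginRequest loginRequest{};
    StandInRole         role{};

    explicit ConnectionContext(std::string user) : userName(std::move(user))
    {
        assert(userName.size() <= std::numeric_limits<std::uint32_t>::max());
        loginRequest.userName.data   = userName.c_str();
        loginRequest.userName.length = static_cast<std::uint32_t>(userName.size());
        role.pLoginRequest           = &loginRequest;
    }

    // Internal pointers refer to this object's own members: forbid copy and move.
    ConnectionContext(const ConnectionContext&) = delete;
    ConnectionContext& operator=(const ConnectionContext&) = delete;
};

// Owns one context per connection. Each context sits behind a unique_ptr,
// so its address stays stable even when the vector reallocates.
class ConnectionRegistry {
public:
    ConnectionContext& add(std::string user)
    {
        contexts_.push_back(
            std::unique_ptr<ConnectionContext>(new ConnectionContext(std::move(user))));
        return *contexts_.back();
    }
private:
    std::vector<std::unique_ptr<ConnectionContext>> contexts_;
};
```

The registry would need to outlive every channel created from its contexts, and a context should only be released once its channel has been closed.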

Based on the code in the VAConsumer example, in particular, the members of the ChannelCommand structure, it looks like the RsslReactorConnectOptions is required as well.

After testing, persisting the RsslReactorConnectOptions and the RsslRDMLoginRequest past the call to rsslReactorConnect() solves some of the intermittent errors that were cropping up (use of the wrong userName, or RECONNECTING messages seemingly forever). Invalidating the RsslRDMDirectoryRequest and RsslReactorOMMConsumerRole after the connect call also results in failed connections (never any callbacks), leading me to believe that this data must be persisted for the life of the connection as well.

It would be beneficial if the documentation stated what data must exist after an asynchronous function (such as rsslReactorConnect) is called.

In the case of rsslReactorConnect, it looks like 4 data structures (and most likely the character pointers / RsslBuffers contained or set within them) must persist.

Having discovered this (undocumented) requirement for rsslReactorConnect(), I'll need to examine any pointers passed to other reactor functions and test what happens if the contents of those pointers are invalidated after the functions return.

Upvotes
11 12 9 14

UPDATE

The code in my application is based on the code example in the Transport API C Edition V3.0.2, Value Added Components, "6.4.1.4 rsslReactorConnect Example" on pages 35 and 36.

That example shows the use of stack variables for reactorConnectOpts, consumerRole, loginRequest, and directoryRequest.

These values are passed by pointer to the rsslReactorConnect() function.

At that point the connection is NOT established; the connection process has only been initiated. The reactor code still needs to access the data passed into the rsslReactorConnect() function -- that is, it needs to access data that exists on the stack, long after that stack data may have been overwritten by other function calls.

By contrast, the VAConsumer example provided in the Elektron SDK (ETA/Examples) keeps the data structures that the reactor requires for connect as members of a global array of "ChannelCommand".

That these data items MUST persist after the rsslReactorConnect() function returns should be documented and the example in the documentation should be updated as well, since it cannot function if any other function is called that overwrites the stack data.

Either that or the reactor itself should be modified to maintain copies of the data it needs after the connect function returns.

This has the feeling of a Java API back-ported to C, where the garbage collector assures that the necessary data structures exist in memory until they are no longer referenced.

jim.carroll
Lars@Addicticks
