question

Upvote
16 0 0 1

.NET Refinitiv.Data - Unhandled Exception in Session Management

Unhandled Exception in Session Management Causing Application Crash [ERROR] [EndpointDefinition] GetDataAsync failed for endpoint: https://api.refinitiv.com/auth/cloud-credentials/v1/. Data services unavailable. Session is closed

We have encountered an unhandled exception in our application that caused a crash. The log below was recorded just before the crash, and it originates from your library:

[ERROR] [253] [EndpointDefinition] GetDataAsync failed for endpoint: https://api.refinitiv.com/auth/cloud-credentials/v1/. Data services unavailable.  Session is closed
Unhandled exception. System.InvalidOperationException: Data services unavailable.  Session is closed
   at Refinitiv.Data.Delivery.Request.EndpointDefinition.GetDataAsync(ISession session, Action`3 cb, CancellationToken cancellationToken)
   at Refinitiv.Data.Delivery.Queue.QueueNode.RefreshCloudCredentialsAsync()
   at Refinitiv.Data.Delivery.Queue.QueueNode.CloudRefreshTimerHandler(Object source, ElapsedEventArgs e)
   at System.Threading.Tasks.Task.<>c.<ThrowAsync>b__128_1(Object state)
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()


We are unable to catch this exception in our code, and we would like to request that you implement exception handling within your library to prevent such exceptions from crashing the entire application.

To provide more context, below is how we create the session, queue, and subscription:

public async Task Subscribe(Func<IQueueResponse, CancellationToken, Task> callback, CancellationToken ct)
{
    _session = PlatformSession.Definition()
        .AppKey(options.AppKey)
        .OAuthGrantType(new GrantPassword()
            .UserName(options.UserName)
            .Password(options.Password))
        .TakeSignonControl(true)
        .GetSession()
        .OnState((state, msg, _) =>
            logger.LogInformation("On session state changed: {state}. {msg}", state, msg))
        .OnEvent((eventCode, msg, _) =>
            logger.LogInformation("Event: {eventCode}. {msg}", eventCode, msg));

    var openSessionStatus = await _session.OpenAsync(ct);
    if (openSessionStatus != Session.State.Opened)
        logger.LogWarning("Session is not open, status: {status}", openSessionStatus);

    var queueDefinition = Queue.Definition(options.Endpoint);
    var queueManager = queueDefinition
        .CreateQueueManager()
        .OnError((err, _) => logger.LogError("Error: {error}", err));

    var queueCriteria = new JObject {
  {prop, value}};
    var queue = await queueManager.CreateQueueAsync(queueCriteria, Queue.CloudType.AWS, ct);

    var subscriber = queueDefinition.CreateAWSSubscriber(queue);
    var status = await subscriber.StartPollingAsync((response, _) =>
    {
        callback(response, ct).GetAwaiter().GetResult();
    });

    if (!status)
        logger.LogWarning("Start polling failed");
}


The synchronization point in our application is the subscriber.StartPollingAsync method call. This is where message processing begins, and we are able to wrap this line in a try { } catch { } block to catch exceptions, as shown below:

try
{
    await client.Subscribe(async (news, ct) =>
    {
        await pipeline.Handle(news, ct); // Perform business logic
    }, cancellationToken);
}
catch (Exception ex)
{
    switch (ex)
    {
        case AmazonSQSException sqsEx:
            // Log the exception
            // Close the current session and queue subscription
            // Re-create the session and re-subscribe to the queue
            break;
        default:
            // Log any other exceptions
            break;
    }
}


However, the exception that caused our application to crash was not caught. It appears to have originated elsewhere, leading to the application's unexpected and permanent termination.

That method _session.OpenAsync does not allow us to catch the [ERROR] [253] [EndpointDefinition] exception. If this method were blocking further processing, it would be possible to catch all exceptions related to the session implementation.

To give more context: in our application, we also make HTTP API calls in the following way:

try
{
    var response = await EndpointRequest.Definition(endpointUrl).GetDataAsync(ct);
    // Perform business logic
}
catch (Exception ex)
{
    // Log the exception
}


Under the hood, this static method EndpointRequest.Definition implicitly uses the previously created session.

As we understand, the session functions like a singleton that all parts of your library implicitly rely on. Given the log messages [ERROR] [253] [EndpointDefinition] that were recorded before the crash, it seems the problem might be related to the EndpointRequest.Definition method. However, we did not catch any exceptions log in the try { } catch { } block.

I suspect that the log [ERROR] [253] [EndpointDefinition] originates from the implementation of the EndpointRequest.Definition method. However, in this case, the method was not invoked by our logic but by some internal mechanism in the library responsible for session/token renewal.

I would also like to address the AmazonSQSException, which occasionally occurs in our application. Despite a correct setup, this exception still appears. As we understand it, your library handles the responsibility for token refresh logic, session management, and session recovery. However, we observe situations where this exception occurs, ranging from several times a day to once every few days. When it does happen, we close the old session, unsubscribe from the queue, clean up, and then re-create the session and re-subscribe to the queues. We would like to ask if this is the correct approach and if you could provide any guidance or best practices to avoid these exceptions.

If our approach is correct, this information might help identify areas in your library that could be improved.

Lastly, on a related but separate note, could you provide an overload of the subscriber.StartPollingAsync method that accepts an asynchronous callback, such as:

Task<bool> StartPollingAsync(Func<IQueueResponse, CancellationToken, Task> cb);


Currently, the only available method is:

Task<bool> StartPollingAsync(Action<IQueueResponse, IQueueSubscriber> cb);


This requires us to create a synchronization point for the callback using GetAwaiter and GetResult like this:

var status = await subscriber.StartPollingAsync((response, _) =>
{
    callback(response, ct).GetAwaiter().GetResult();
});


Providing an asynchronous version of this method would simplify our code and could help in better handling exceptions.


#productrefinitiv-data-libraries
icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Hello @kamil.kozlowski1


Thank you for posting this issue, we will work on fixing this issue and keep you posted.


When creating a queue using credentials, the refresh of the credentials is done at the level of the queue node using an Endpoint Definition request.

When the session is not Open, credentials data (including expiration time for the timer) cannot be refreshed, leading to an Invalid Operation Exception.

Given the info above, I have a clarification question.

If you have a platform session open and you are polling messages, what do you expect to happen when the session closes without stopping the polling ?

Other Recommendations

In your code, please verify if other functionalities might be closing the session.

Also make sure that you create definitions and request data only after the session is Open.

When working with queues, it is recommended to stop the polling before stopping the session.

Same with streams, our recommendation is to stop any running streams before stopping the session.


Thanks,

Cristian

Upvotes
1 0 0 1

We never manually close the session. We use your session manager to create it, and as we understand, it is responsible for keeping the session alive.

That exception (quoted below) comes from the static session management manager:[ERROR] [EndpointDefinition] GetDataAsync failed for endpoint: https://api.refinitiv.com/auth/cloud-credentials/v1/. Data services unavailable. Session is closed.

As you can see in the example code provided in the issue, we first open the session, then check if the session is open, and then connect to the message queue. Our service runs 24/7, and this starting point is executed only once when the service starts. We open the session and create the queue using your queue manager.

We don't know the implementation of your session manager, but we assume there is some background job refreshing tokens. We understand that this session manager threw an exception, which was not caught, causing the application to shut down.

If we never manually close the session, then I understand we should never manually close the queue either.

Another issue is that the session manager is a separate entity from the one reading messages from the queue. The session manager may trigger a session closure event, but the queue manager does not know about it directly and does not receive this information synchronously. These are two separate entities in memory that don't have a synchronous dependency between them. In the current implementation, as I understand, there is always a non-zero risk that receiving the session closure event does not block the queue processing synchronously. So there is always a non-zero risk that the code reacts too late to the session closure event, causing the queues to close late, which can lead to exceptions.

What is crucial for us is that we would like you to allow us to catch this exception, which currently cannot be caught. Please note that there is no explicit dependency between the session manager and the queue manager. The process of creating a queue does not require passing a reference to the session. You can simply create a queue using one static method, and independently, the session is created using another static method. This setup can lead to a number of problems because these two entities live separately from each other. They have different lifetimes, and their lifetimes are not synchronized.

If you had an implementation where the queue manager received a session reference as a dependency, or each queue created its own entity that is a session, and each queue managed, observed, and controlled the session's lifetime, then the solution would be much simpler to use.

Imagine a scenario where the session requires a token refresh, but the refresh fails, and the session goes into a closed state. If each queue had access to the session as a dependency, it could easily and synchronously check the session state before each message fetch, allowing it to react 100% synchronously and stop processing due to the session closure.

Now imagine another case where we have more than one queue. If each queue managed the session state, then each queue could autonomously and synchronously manage its own session lifetime, based on its separate session copies. Each session copy could independently try to refresh its token based on the queue's needs, and if the refresh failed, that local queue could react 100% synchronously to that case.

Now imagine yet another use case: currently, the session manager and queue manager are separate entities living somewhere in memory, like singletons. The session manager provides events when the session is closed or opened, but it does so asynchronously and independently of the queues. If we had 10 queues, they would need to be notified asynchronously to stop their processing. There is no physical way to notify all queues to stop processing without risking race conditions. As long as the queues and session manager are separate entities, not working synchronously on their own copies, there will always be a non-zero probability that a queue will start processing the next message even though the session has already closed. The only solution would be to use critical sections (lock, monitor), but using locks here defeats the purpose and we strongly don't want to block critical sections with locks.

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvote
237 4 2 3

Hello @artur.wincenciak


The queue manager does use the session - it's just not visible to the consumer.

When using a platform session, you can use with the same session a combination of multiple functionalities, like messaging services using queues, historical data retrieval and streaming for different kinds of data. We cannot allow a queue to control the lifetime of a session, because in that case if the queue fails, it will stop everything.


From your post above, I've identified several requirements that I tried to line up bellow:

- If you have a platform session with one or multiple queues polling for data, if the session closes for whatever reason, the queues should stop processing any incoming messages and no uncaught exceptions should fail the consumer application

- If a credential refresh on a given queue fails for whatever reason, that queue should stop processing and no uncaught exception should fail the consumer application

- If a queue stops processing messages for whatever reason, the consumer application needs to be notified and also be able to react based on this (eg. maybe delete the queue or restart polling or some other actions)

Please let me know if my understanding is correct and what other cases you might require.


Best regards,

Baciu Wahl Cristian

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvotes
1 0 0 1

Thank you for your response. Your understanding is spot on and covers exactly what we are looking for.

We would like to ensure that no uncaught exceptions from the internal implementation of the library can cause the service to crash. Proper exception handling within the library is essential for maintaining the stability of our service. We appreciate your attention to this and look forward to the improvements.

Post Scriptum: Please treat this as a separate thread. I would like to comment on the statement, "We cannot allow a queue to control the lifetime of a session, because in that case if the queue fails, it will stop everything."

We are fully aware that this would be a significant change and that it would require modifications to the architectural assumptions within the library.

Here’s the change I would like to propose: instead of holding one session as a singleton for the entire application, I would suggest an approach where each queue or other component could create its own private instance of a session. This session would be managed solely for its own needs, independent of other parts of the system. Instead of having one session for the whole application and requiring all components to depend on it, it would be much simpler to allow each component to create its own session with its own lifecycle. This way, restarting or failing one session would not impact other components, as they would have their own private sessions.

Currently, your library's interface abstracts and theoretically simplifies session management. I understand that your intention was to create a session manager and allow the creation of other components that internally use this session. At first glance, this indeed seems simple from the user's perspective. However, after some thought, and more importantly, when the application starts using multiple components, this simplification becomes superficial.

The session has its own lifecycle, and the components using the session have their own lifecycles as well. Now, when there is only one session for the entire application, it's clear that this single shared session cannot be confined to a particular component, as you mentioned—it would cause problems in other components. But let’s look at this from the other side. By not placing the session within a specific component, it does not solve the problem—the problem still exists, and worse, it becomes implicit.

The issue is that all independent components of the system depend on a single shared session in an implicit way. Now, if the session enters a failure state or is closed, the burden falls on the user of your library, who must ensure that all components using the session are notified, restarted, or recreated or re-subscribed, etc. All of these components must listen for session state changes in order to react appropriately. We end up with code that still needs to manage session state but, even worse, this code is asynchronous. This significantly complicates managing the critical section that the session represents.

My solution is very simple and solves all the problems that we wouldn't need to solve—and could simply avoid—if each component were allowed to create its own private session object and manage it independently. This code could be fully synchronous, making it 100% safe, without requiring critical section management or dealing with race conditions.

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvote
19k 85 39 63

Hi @artur.wincenciak

Thanks for your input and suggestions on the library - certainly helps shape the developer user experience.

I want to emphasize that the library does not hold one session as a singleton for the entire application. When a session is created, it assigns a 'defaultSession'. Users can create multiple sessions if they wish and when doing so will update the 'defaultSession' - basically the last session created will update the 'defaultSession'.

For convenience, and for the majority of use cases, the 'defaultSession' will be used for any library calls. However, the library has been designed to allow users to override the default. For example, when using the Queue APIs:

var definition = Queue.Definition(newsHeadlinesEndpoint);

// Specify a session
var manager = definition.CreateQueueManager().Session(mySession)
                                             .OnError((err, qm) => Console.WriteLine(err));

One caveat to creating multiple sessions is that the current authentication model used with the platform is based on a OAuth Password grant that only allows 1 active session per ID. So, if you try to create multiple sessions using the same ID, this will cause issues. The alternative is to obtain multiple IDs for such a scenario.

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Upvotes
1 0 0 1

Thank you for your response. It appears that your library's API is well-prepared to handle multiple sessions with independent lifecycles. It looks like you’ve designed this functionality thoughtfully, allowing such flexibility at the API level. I wasn’t aware that it was possible to override the default session with a different specific session when creating a queue. So theoretically, we could create multiple sessions, if not for one caveat.

To quote: "One caveat to creating multiple sessions is that the current authentication model used with the platform is based on an OAuth Password grant that only allows 1 active session per ID. So, if you try to create multiple sessions using the same ID, this will cause issues. The alternative is to obtain multiple IDs for such a scenario."

The library’s API effectively addresses and implements the use case we are discussing here. However, the limitation arises on a business level, and this is a significant constraint. This restriction means that, although your library supports the usage scenario we are discussing, we cannot take advantage of this capability in practice. In reality, we have a single license that allows subscription to multiple queues. So ultimately, due to this licensing model, we are forced to have a single session for multiple queues, which brings us back to the original issues that we will still need to solve.

The cost of the license is a significant aspect that cannot be ignored. One license allows us to connect to multiple queues. If we want to build multiple independent news-processing pipelines from several independent queues, we are still dependent on this single session with its own lifecycle. We have to propagate events related to that session's lifecycle across multiple queues, which leads to problems with critical sections and race conditions in processing.

If it were possible to create multiple sessions for the same user (for the same ID) under one license, and if your licensing model checked how many queues a client is subscribed to without checking how many copies of the session are created under the same ID, we would be able to take full advantage of the possibilities offered by your library.

Let me ask this: If one license with ten queues costs N, would ten licenses with one queue each cost the same as N, or would it be more than N? If the cost for N licenses with one queue each is comparable to the cost of a single license with N queues, we would certainly consider making changes to our licensing model. We will consider whether to purchase N separate licenses, each allowing for a single queue.

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.

Write an Answer

Hint: Notify or tag a user in this post by typing @username.

Up to 2 attachments (including images) can be used with a maximum of 512.0 KiB each and 1.0 MiB total.