Unhandled Exception in Session Management Causing Application Crash: "Data services unavailable. Session is closed"
We have encountered an unhandled exception in our application that caused a crash. The log below was recorded just before the crash, and it originates from your library:
[ERROR] [253] [EndpointDefinition] GetDataAsync failed for endpoint: https://api.refinitiv.com/auth/cloud-credentials/v1/. Data services unavailable. Session is closed
Unhandled exception. System.InvalidOperationException: Data services unavailable. Session is closed
at Refinitiv.Data.Delivery.Request.EndpointDefinition.GetDataAsync(ISession session, Action`3 cb, CancellationToken cancellationToken)
at Refinitiv.Data.Delivery.Queue.QueueNode.RefreshCloudCredentialsAsync()
at Refinitiv.Data.Delivery.Queue.QueueNode.CloudRefreshTimerHandler(Object source, ElapsedEventArgs e)
at System.Threading.Tasks.Task.<>c.<ThrowAsync>b__128_1(Object state)
at System.Threading.ThreadPoolWorkQueue.Dispatch()
at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
We are unable to catch this exception in our code, and we would like to request that you implement exception handling within your library to prevent such exceptions from crashing the entire application.
To provide more context, below is how we create the session, queue, and subscription:
public async Task Subscribe(Func<IQueueResponse, CancellationToken, Task> callback, CancellationToken ct)
{
    _session = PlatformSession.Definition()
        .AppKey(options.AppKey)
        .OAuthGrantType(new GrantPassword()
            .UserName(options.UserName)
            .Password(options.Password))
        .TakeSignonControl(true)
        .GetSession()
        .OnState((state, msg, _) =>
            logger.LogInformation("On session state changed: {state}. {msg}", state, msg))
        .OnEvent((eventCode, msg, _) =>
            logger.LogInformation("Event: {eventCode}. {msg}", eventCode, msg));

    var openSessionStatus = await _session.OpenAsync(ct);
    if (openSessionStatus != Session.State.Opened)
        logger.LogWarning("Session is not open, status: {status}", openSessionStatus);

    var queueDefinition = Queue.Definition(options.Endpoint);
    var queueManager = queueDefinition
        .CreateQueueManager()
        .OnError((err, _) => logger.LogError("Error: {error}", err));

    var queueCriteria = new JObject {{prop, value}};
    var queue = await queueManager.CreateQueueAsync(queueCriteria, Queue.CloudType.AWS, ct);
    var subscriber = queueDefinition.CreateAWSSubscriber(queue);

    var status = await subscriber.StartPollingAsync((response, _) =>
    {
        callback(response, ct).GetAwaiter().GetResult();
    });
    if (!status)
        logger.LogWarning("Start polling failed");
}
The synchronization point in our application is the subscriber.StartPollingAsync method call. This is where message processing begins, and we can wrap this line in a try { } catch { } block to catch exceptions, as shown below:
try
{
    await client.Subscribe(async (news, ct) =>
    {
        await pipeline.Handle(news, ct); // Perform business logic
    }, cancellationToken);
}
catch (Exception ex)
{
    switch (ex)
    {
        case AmazonSQSException sqsEx:
            // Log the exception
            // Close the current session and queue subscription
            // Re-create the session and re-subscribe to the queue
            break;
        default:
            // Log any other exceptions
            break;
    }
}
However, the exception that crashed our application was not caught there. It appears to have originated elsewhere and terminated the application abruptly and permanently. The _session.OpenAsync call also gives us no way to catch the [ERROR] [253] [EndpointDefinition] exception: if that method blocked until the session work completed, we could wrap it and catch every exception related to the session implementation, but it does not.
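For completeness, the only process-wide hooks we are aware of are sketched below (our own sketch, not code we necessarily run in production). On .NET they let us log such a failure, but they cannot keep the process alive once an exception escapes onto the thread pool, which is why we are asking for the handling to happen inside the library:
using System;
using System.Threading.Tasks;

static class CrashLogging
{
    public static void Register()
    {
        // Fires for any exception that is about to take the process down.
        // It allows logging only; IsTerminating is true and the process still exits.
        AppDomain.CurrentDomain.UnhandledException += (_, args) =>
            Console.Error.WriteLine(
                $"Unhandled exception (terminating: {args.IsTerminating}): {args.ExceptionObject}");

        // Fires only for faulted tasks that were never awaited or observed.
        // It does not cover the Task.ThrowAsync path shown in the stack trace above.
        TaskScheduler.UnobservedTaskException += (_, args) =>
        {
            Console.Error.WriteLine($"Unobserved task exception: {args.Exception}");
            args.SetObserved();
        };
    }
}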
To give more context: in our application, we also make HTTP API calls in the following way:
try
{
    var response = await EndpointRequest.Definition(endpointUrl).GetDataAsync(ct);
    // Perform business logic
}
catch (Exception ex)
{
    // Log the exception
}
Under the hood, this static EndpointRequest.Definition method implicitly uses the previously created session. As we understand it, the session acts as a singleton that all parts of your library implicitly rely on. Given the [ERROR] [253] [EndpointDefinition] messages recorded before the crash, the problem seems related to the EndpointRequest.Definition path. However, our try { } catch { } block neither caught nor logged any exception.
I suspect that the [ERROR] [253] [EndpointDefinition] log originates from the implementation behind EndpointRequest.Definition. In this case, however, the call was not made by our code but by an internal library mechanism responsible for session/token renewal, which the CloudRefreshTimerHandler frame in the stack trace appears to confirm.
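To illustrate what we believe is happening, below is a minimal, self-contained sketch of that pattern. It assumes CloudRefreshTimerHandler is, or awaits inside, an async void timer handler; the class and member names are ours, not the library's:
using System;
using System.Threading.Tasks;
using System.Timers;

class AsyncVoidTimerCrash
{
    static void Main()
    {
        var timer = new Timer(100) { AutoReset = false };
        timer.Elapsed += Handler; // async void handler, analogous to CloudRefreshTimerHandler
        timer.Start();
        Console.ReadLine();       // the process terminates before this returns
    }

    // An exception thrown after an await in an async void method is rethrown on the
    // thread pool (the Task.ThrowAsync frame in the stack trace) and cannot be caught
    // by any try/catch in the calling code, so the runtime terminates the process.
    static async void Handler(object sender, ElapsedEventArgs e)
    {
        await Task.Delay(10);
        throw new InvalidOperationException("Data services unavailable. Session is closed");
    }
}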
I would also like to address the AmazonSQSException, which occasionally occurs in our application despite what we believe is a correct setup. As we understand it, your library is responsible for token refresh, session management, and session recovery, yet we still see this exception anywhere from several times a day to once every few days. When it happens, we close the old session, unsubscribe from the queue, clean up, and then re-create the session and re-subscribe to the queues. Is this the correct approach, and could you provide guidance or best practices to avoid these exceptions?
If our approach is correct, this information might help identify areas in your library that could be improved.
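For reference, the rough shape of that recovery path is sketched below. The teardown calls are deliberately left as placeholder comments because we are not sure which library calls are intended for stopping the subscriber and closing the session, and we would welcome guidance on exactly that:
// Recovery sketch only; Subscribe is the method shown earlier, and the
// subscriber/session teardown is the part we would like you to confirm.
private async Task RecoverAsync(Func<IQueueResponse, CancellationToken, Task> callback, CancellationToken ct)
{
    // 1. Tear down the failed resources.
    //    subscriber.StopPolling(); // placeholder; we are unsure of the intended API
    //    _session.Close();         // placeholder; we are unsure of the intended API

    // 2. Re-create the session, queue manager, queue, and subscription from scratch.
    await Subscribe(callback, ct);
}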
Lastly, on a related but separate note, could you provide an overload of the subscriber.StartPollingAsync method that accepts an asynchronous callback, such as:
Task<bool> StartPollingAsync(Func<IQueueResponse, CancellationToken, Task> cb);
Currently, the only available method is:
Task<bool> StartPollingAsync(Action<IQueueResponse, IQueueSubscriber> cb);
This requires us to create a synchronization point for the callback using GetAwaiter and GetResult, like this:
var status = await subscriber.StartPollingAsync((response, _) =>
{
    callback(response, ct).GetAwaiter().GetResult();
});
Providing an asynchronous version of this method would simplify our code and let us handle exceptions thrown by the callback more reliably.
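For what it is worth, the requested overload can be prototyped today as an extension method along the lines of the sketch below, but it still blocks internally, which is exactly what we would like the library to avoid. The sketch assumes the subscriber implements IQueueSubscriber (inferred from the existing callback signature) and that the types live under Refinitiv.Data.Delivery.Queue, as the stack trace suggests:
using System;
using System.Threading;
using System.Threading.Tasks;
using Refinitiv.Data.Delivery.Queue; // assumed namespace for IQueueSubscriber / IQueueResponse

public static class QueueSubscriberExtensions
{
    // Adapts an async callback to the existing Action-based overload by blocking on it.
    // A native async overload in the library could await the callback instead and
    // surface its exceptions without this synchronization point.
    public static Task<bool> StartPollingWithAsyncCallback(
        this IQueueSubscriber subscriber,
        Func<IQueueResponse, CancellationToken, Task> cb,
        CancellationToken ct = default)
    {
        return subscriber.StartPollingAsync((response, _) =>
            cb(response, ct).GetAwaiter().GetResult());
    }
}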