
Is it safe/valid to construct our own RsslBuffers for encoding, decoding, and writing?

Hello. I am hoping you can help clarify my understanding of RSSL and UPA and answer a question. Say I am in a 64-bit Linux environment, designing a C program with a single RSSL channel and two relevant threads, thread A and thread B. I have initialized RSSL on thread A with the rsslLocking parameter set to RSSL_LOCK_GLOBAL. Thread A performs all channel management, such as rsslConnect, rsslInitChannel, rsslWrite, and rsslRead. However, we would like the encoding and decoding of UPA RDM messages to happen on thread B.

Would we be able to manage RsslBuffers ourselves? That is, if we construct our own RsslBuffers for the encoding side on thread B and set each RsslBuffer's data member to a character buffer that we manage ourselves, could we still safely perform all encoding into those buffers? The encoded data would be queued to thread A, where we would construct a similar RsslBuffer pointing at the queued data, and thread A would then call rsslWrite with that buffer.

Conversely, for decoding: when rsslRead returns a buffer on thread A (which we will later release, also on thread A), can we, before releasing it or calling rsslRead again, copy the contents of the RsslBuffer's data member, up to the byte length given by its length member, into our own buffer? That buffer would be queued to thread B, where we would construct an RsslBuffer pointing at the contents and perform the decoding.

As an example, this would look loosely like the pseudocode shown below:

#include <stddef.h>
#include <stdbool.h>
#include <string.h>
#define Linux
#define COMPILE_64BITS
#include "rtr/rsslTypes.h"
#include "rtr/rsslTransport.h"
#include "rtr/rsslMessagePackage.h"
#define BUF_LEN 16000
/**
 * @brief This is a structure we use across the codebase for cross-core
 * communication with a standardized interface
 */
struct std_interface_message {
    /* ... */
    size_t len;
    char buf[BUF_LEN];
    /* ... */
};

/**
 * @brief acquire buffer from our pool
 */
struct std_interface_message *acquire_payload_buf_from_pool();

/**
 * @brief release buffer from our pool
 */
int release_payload_from_pool(struct std_interface_message *message);

struct std_interface_queue;

/**
 * @brief queue std_interface_message over queue @a queue
 */
int std_interface_queue_push(struct std_interface_queue *queue, struct std_interface_message *msg);

/**
 * @brief pop from queue and place pointer to pool-allocated std_interface_message at *msg
 */
int std_interface_queue_pop(struct std_interface_queue *queue, struct std_interface_message **msg);

RsslChannel *channel;
struct std_interface_queue *threadA_to_B_read_queue;
struct std_interface_queue *threadB_to_A_write_queue;

void *threadA_func(void *arg0)
{
    /* ......... */
    bool condition = true; /* placeholder; the real loop condition is elided */
    while (condition) {
        /* ......... HANDLE READS ......... */
        RsslError error;
        RsslRet rc;
        /* Read data on thread A */
        RsslBuffer *rsslbuffer = rsslRead(channel, &rc, &error);
        /* .... error handling and all .... */
        /* acquire from our own managed buffer */
        struct std_interface_message *message = acquire_payload_buf_from_pool();
        /* copy read-in RsslBuffer contents to our own buffer, which we will queue to thread B */
        size_t length;
        if (rsslbuffer->length > BUF_LEN) {
            length = BUF_LEN;
        } else {
            /* we would handle cases of data fragmentation .... */
            length = rsslbuffer->length;
        }
        /* copy only the bounded length and record it so thread B knows how much to decode */
        memcpy(message->buf, rsslbuffer->data, length);
        message->len = length;
        /* queue payload contents to thread B where it can be decoded.
         * Thread A is now free to read again */
        std_interface_queue_push(threadA_to_B_read_queue, message);
        
        /* ......... HANDLE WRITES ........ */
        struct std_interface_message *write_message;
        if (std_interface_queue_pop(threadB_to_A_write_queue, &write_message) != 0) {
            /* no data */
            continue;
        }
        /* there is data that has been encoded on thread B */
        RsslUInt32 written = 0;
        RsslUInt32 left = 0;
        RsslUInt8 wflags = RSSL_WRITE_NO_FLAGS;
        RsslBuffer rsslwritebuffer;
        rsslwritebuffer.length = write_message->len;
        rsslwritebuffer.data = write_message->buf;
        rsslWrite(channel, &rsslwritebuffer, RSSL_HIGH_PRIORITY, wflags, &written, &left, &error);
        /* error handling */
        /* release payload */
        release_payload_from_pool(write_message);
    }
    /* ......... */
    return NULL;
}

void *threadB_func(void *arg0)
{
    /* ......... */
    bool condition = true; /* placeholder; the real loop condition is elided */
    while (condition) {
        /* ......... HANDLE DECODING ......... */
        struct std_interface_message *message;
        if (std_interface_queue_pop(threadA_to_B_read_queue, &message) != 0) {
            /* no data */
            continue;
        }
        /* there is a read buffer from thread A to decode and process */
        RsslBuffer rsslbuffer;
        rsslbuffer.length = message->len;
        rsslbuffer.data = message->buf;
        RsslMsg msg = RSSL_INIT_MSG;
        RsslDecodeIterator decoder;
        rsslClearDecodeIterator(&decoder);
        rsslSetDecodeIteratorBuffer(&decoder, &rsslbuffer);
        rsslDecodeMsg(&decoder, &msg);
        switch (msg.msgBase.domainType) {
            /* ... */
            // more decoding
        }
        release_payload_from_pool(message);
        
        /* ......... HANDLE ENCODING ......... */
        /* acquire from our own managed buffer */
        struct std_interface_message *write_message = acquire_payload_buf_from_pool();
        /* encode a message */
        RsslEncodeIterator encoder;
        rsslClearEncodeIterator(&encoder);
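        RsslBuffer rsslwritebuffer;
        rsslwritebuffer.data = write_message->buf;
        /* the encoder treats length as the capacity available for encoding */
        rsslwritebuffer.length = BUF_LEN;
        rsslSetEncodeIteratorBuffer(&encoder, &rsslwritebuffer);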
        /* ...continue with encoding ... */
        write_message->len = rsslGetEncodedBufferLength(&encoder);
        /* queue to thread A to be sent */
        std_interface_queue_push(threadB_to_A_write_queue, write_message);
    }
    /* ......... */
    return NULL;
}

Second, as a less desirable alternative, if we have to use RsslBuffers acquired from rsslGetBuffer, could we still perform encoding/decoding off the transport-handling thread by queueing the acquired buffers to thread B without channel locking? This would look like stack-allocating and clearing an RsslEncodeIterator on thread B, calling rsslSetEncodeIteratorBuffer with the buffer acquired on thread A, and then proceeding (on thread B) to encode the message, after which the RsslBuffer would be sent and released on thread A. Is this safe?

Thank you very much for your time!!!


Great question!

I have to agree

1 Answer


@bneway

Thank you for reaching out to us.

Incoming and outgoing buffers used by the transport layer are reserved and managed by ETA. The application can have its own RsslBuffer.

However, when sending data to the transport layer, the application must either encode the data directly into an ETA-reserved buffer or copy the data from the application-owned buffer into an ETA-reserved buffer.

The ETA-reserved outgoing buffers are obtained by calling the rsslGetBuffer function.
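As a rough illustration of the copy step on the write side, here is a minimal sketch. It deliberately does not include the ETA headers: demo_buffer is a hypothetical stand-in with the same two members (length and data) the question uses on RsslBuffer, and stage_for_write is an illustrative helper name, not part of ETA. It assumes the destination's length holds the size that was requested for it, and sets length to the number of bytes actually used.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for RsslBuffer, for illustration only. */
typedef struct {
    unsigned int length; /* capacity on entry, bytes used on success */
    char *data;
} demo_buffer;

/* Copy bytes that were encoded into an application-owned buffer into a
 * transport-owned buffer. Returns 0 on success, -1 if an argument is
 * missing or the payload does not fit (dst is left unchanged). */
static int stage_for_write(demo_buffer *dst, const char *encoded, size_t encoded_len)
{
    if (dst == NULL || dst->data == NULL || encoded == NULL)
        return -1;
    if (encoded_len > dst->length)
        return -1;
    memcpy(dst->data, encoded, encoded_len);
    dst->length = (unsigned int)encoded_len;
    return 0;
}
```

In a real program, the destination would presumably be the RsslBuffer returned by rsslGetBuffer on thread A, and the same buffer would then be passed to rsslWrite on that thread.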

ETA also owns the buffer returned from the rsslRead function. When an RsslBuffer is returned from rsslRead, its contents are only valid until the next call to rsslRead. Therefore, to use the content on another thread, the application can copy the content from the buffer returned by rsslRead into an application-owned buffer.
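The copy on the read side could be sketched as follows. Again, this is only an illustration under stated assumptions: demo_buffer is a hypothetical stand-in for RsslBuffer (length and data members only), app_message loosely mirrors the std_interface_message from the question, and copy_for_decode is an illustrative name. The sketch rejects oversized payloads instead of truncating them, since a truncated message would not decode correctly.

```c
#include <stddef.h>
#include <string.h>

#define APP_BUF_LEN 16000

/* Hypothetical stand-in for RsslBuffer, for illustration only. */
typedef struct {
    unsigned int length;
    char *data;
} demo_buffer;

/* Application-owned message that can safely be queued to another thread. */
struct app_message {
    size_t len;
    char buf[APP_BUF_LEN];
};

/* Copy the contents of a transport-owned read buffer into an
 * application-owned message before the next read invalidates it.
 * Returns 0 on success, -1 if an argument is missing or the payload
 * is larger than the application buffer (dst is left unchanged). */
static int copy_for_decode(const demo_buffer *src, struct app_message *dst)
{
    if (src == NULL || src->data == NULL || dst == NULL)
        return -1;
    if (src->length > sizeof dst->buf)
        return -1;
    memcpy(dst->buf, src->data, src->length);
    dst->len = src->length;
    return 0;
}
```

On the decoding thread, an RsslBuffer can then be pointed at buf with its length set to len before calling rsslSetDecodeIteratorBuffer, as in the question's thread B code.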

The flow could be like:

[Image: 1702614912312.png]

Regarding locking, according to the ETA developer guide, you can use RSSL_LOCK_NONE.

[Image: 1702615020223.png]




Thank you very much!
