Hello. I am wondering if you could help clarify my understanding of RSSL and UPA, as well as answer a question. Say I am in a 64bit Linux environment, designing a C program with a single RSSL channel and two relevant threads, thread A and thread B. Say I have initialized RSSL on thread A with the rsslLocking parameter equal to RSSL_LOCK_GLOBAL. Thread A performs all channel management, such as rsslConnect, rsslInitChannel, rsslWrite, and rsslRead. However, we would like the encoding and decoding of UPA RDM messages to be on thread B. Would we be able to perform our own management of rsslBuffers? That is, if construct our own RsslBuffers for the encoding side on thread B and set the RsslBuffer’s data member to a character buffer that we manage ourselves, would I still be able to safely perform all encoding into an RsslBuffer that we constructed? That data would be queued to thread A, where we’d construct a similar RsslBuffer to target the queued data, and then thread A would perform rsslWrite using that buffer. Conversely for decoding, when rsslRead returns a buffer on thread A which we will later release also on thread A, before releasing or calling a subsequent read, are we able to copy the contents of the RsslBuffer’s data member up to the byte-length specified by the length member into our own buffer? This buffer would be queued to thread B, where we would construct an RsslBuffer pointed to the contents and perform the decoding. As an example, this would look loosely like the pseudocode shown below:
#include <stddef.h> #include <stdbool.h> #include <string.h> #define Linux #define COMPILE_64BITS #include "rtr/rsslTypes.h" #include "rtr/rsslTransport.h" #include "rtr/rsslMessagePackage.h" #define BUF_LEN 16000 /** * @brief This is a structure we use across the codebase for cross-core * communication with a standardized interface */ struct std_interface_message { /* ... */ size_t len; char buf[BUF_LEN]; /* ... */ }; /** * @brief acquire buffer from our pool */ struct std_interface_message *acquire_payload_buf_from_pool(); /** * @brief release buffer from our pool */ int release_payload_from_pool(struct std_interface_message *message); struct std_interface_queue; /** * @brief queue std_interface_message over queue @a queue */ int std_interface_queue_push(struct std_interface_queue *queue, struct std_interface_message *msg); /** * @brief pop from queue and place pointer to pool-allocated std_interface_message at *msg */ int std_interface_queue_pop(struct std_interface_queue *queue, struct std_interface_message **msg); RsslChannel *channel; struct std_interface_queue *threadA_to_B_read_queue; struct std_interface_queue *threadB_to_A_write_queue; void *threadA_func(void *arg0) { /* ......... */ bool condition; while (condition) { /* ......... HANDLE READS ......... */ RsslError error; RsslRet rc; /* Read data on thread A */ RsslBuffer *rsslbuffer = rsslRead(channel, &rc, &error); /* .... error handling and all .... */ /* acquire from our own managed buffer */ struct std_interface_message *message = acquire_payload_buf_from_pool(); /* copy read-in RsslBuffer contents to our own buffer, which we will queue to thread B */ size_t length; if (rsslbuffer->length > BUF_LEN) { length = BUF_LEN; } else { /* we would handle cases of data fragmentation .... */ length = rsslbuffer->length; } memcpy(message->buf, rsslbuffer->data, rsslbuffer->length); /* queue payload contents to thread B where it can be decoded. * Thread A is now free to read again */ std_interface_queue_push(threadA_to_B_read_queue, message); /* ......... HANDLE WRITES ........ */ struct std_interface_message *write_message; if (std_interface_queue_pop(threadB_to_A_write_queue, &write_message) != 0) { /* no data */ continue; } /* there is data that has been encoded on thread B */ RsslUInt32 written = 0; RsslUInt32 left = 0; RsslUInt8 wflags = RSSL_WRITE_NO_FLAGS; RsslBuffer rsslwritebuffer; rsslwritebuffer.length = write_message->len; rsslwritebuffer.data = write_message->buf; rsslWrite(channel, &rsslwritebuffer, RSSL_HIGH_PRIORITY, wflags, &written, &left, &error); /* error handling */ /* release payload */ release_payload_from_pool(write_message); } /* ......... */ return NULL; } void *threadB_func(void *arg0) { /* ......... */ bool condition; while (condition) { /* ......... HANDLE DECODING ......... */ struct std_interface_message *message; if (std_interface_queue_pop(threadA_to_B_read_queue, &message) != 0) { /* no data */ continue; } /* there is a read buffer from thread A to decode and process */ RsslBuffer rsslbuffer; rsslbuffer.length = message->len; rsslbuffer.data = message->buf; RsslMsg msg = RSSL_INIT_MSG; RsslDecodeIterator decoder; rsslClearDecodeIterator(&decoder); rsslSetDecodeIteratorBuffer(&decoder, &rsslbuffer); rsslDecodeMsg(&decoder, &msg); switch (msg.msgBase.domainType) { /* ... */ // more decoding } release_payload_from_pool(message); /* ......... HANDLE ENCODING ......... */ /* acquire from our own managed buffer */ struct std_interface_message *write_message = acquire_payload_buf_from_pool(); /* encode a message */ RsslEncodeIterator encoder; rsslClearEncodeIterator(&encoder); RsslBuffer rsslwritebuffer; rsslwritebuffer.data = write_message->buf; rsslSetEncodeIteratorBuffer(&encoder, &rsslwritebuffer); /* ...continue with encoding ... */ write_message->len = rsslGetEncodedBufferLength(&encoder); /* queue to thread A to be sent */ std_interface_queue_push(threadB_to_A_write_queue, write_message); } /* ......... */ return NULL; }
Second, as a less desirable alternative, if we have to use RsslBuffers acquired from rsslGetBuffer, could we still perform encoding/decoding off the transport handling thread by queueing the acquired buffers to thread B without channel locking? This would look like stack-allocating and clearing an RsslEncodeIterator on thread B, calling rsslSetEncodeIteratorBuffer with the buffer acquired from thread A, and then proceeding (on thread B) to encode the message, where the rsslBuffer would afterward be sent and released on thread A. Is this safe?
Thank you very much for your time!!!