
OmmConsumerImpl::registerClient hang problem

Hello, my application receives US market quotes, parses various FIDs, and writes them into an internal ring buffer. Once it has collected enough records, it writes them to a log file in a batch. Occasionally, however, it hangs as shown below.

Thread 9 is a message decoder thread; it decodes both chain and RIC Refresh/Update messages. Its gdb stack trace shows that it is blocked on a mutex lock inside OmmConsumerImpl::registerClient. I also run the same application in normal mode, where it parses FIDs and writes to Kafka, and that instance does not block. What could be the reason for the hang?

Could memory corruption from the logging thread be causing the decoder thread to hang? Thanks.


(gdb) thread 9
[Switching to thread 9 (Thread 0x7f134de34700 (LWP 21627))]
#0  0x00007f135f2c150d in __lll_lock_wait () from /lib64/libpthread.so.0
(gdb) bt
#0  0x00007f135f2c150d in __lll_lock_wait () from /lib64/libpthread.so.0
#1  0x00007f135f2bce76 in _L_lock_941 () from /lib64/libpthread.so.0
#2  0x00007f135f2bcd6f in pthread_mutex_lock () from /lib64/libpthread.so.0
#3  0x0000000000509e5c in thomsonreuters::ema::access::OmmConsumerImpl::registerClient(thomsonreuters::ema::access::ReqMsg const&, thomsonreuters::ema::access::OmmConsumerClient&, void*, unsigned long long) ()
#4  0x0000000000496239 in ChainHandler::batch_register (this=0x495af00, ema_codes=...) at server/emaparser/chain_handler_base.cpp:627
#5  0x0000000000496a67 in ChainHandler::single_register (this=0x495af00, ema_code=...) at server/emaparser/chain_handler_base.cpp:684
#6  0x000000000049549b in ChainHandler::ProcessChainUpdateMessage (this=0x495af00, updateMsg=..., chain_type=43) at server/emaparser/chain_handler_base.cpp:541
#7  0x00000000004d206e in EmaDecoderMgr::DecodeMsg (this=0x232a8b0 <EmaDecoderMgr::merger_mgr_>, queue_index=0) at server/emaparser/decoder_mgr.cpp:125
#8  0x00000000004d19f4 in DecodeMsgThreadFunc (queue_index=0, arg=0x0, length=0x0) at server/emaparser/decoder_mgr.cpp:11
#9  0x0000000000970bd2 in Thread::LoopThreadFunc (arg=0x49569c0) at comm/util/thread.cpp:147
#10 0x00007f135f2bae65 in start_thread () from /lib64/libpthread.so.0
#11 0x00007f135e8d588d in clone () from /lib64/libc.so.6
Tags: elektron, refinitiv-realtime, elektron-sdk, ema-api, rrt, elektron-message-api

@wangfugen

Could you please share the call stacks of all the other threads? I need to verify which thread holds the mutex.



@wangfugen

Thread 9: OmmConsumerImpl::registerClient and Thread 2: OmmBaseImpl::rsslReactorDispatchLoop share the same mutex. While dispatching events, the application is unable to call the OmmConsumerImpl::registerClient method.

Therefore, Thread 9 may be starving. You can tune the dispatching thread (Thread 2) by reducing the timeout or the MaxDispatchCountUserThread configuration value.

You may need to determine the number of messages that Thread 9 can handle per second and then compare it to the number of messages that Thread 2 can dispatch per second.

Otherwise, to avoid the locking issue, you can use Thread 2 to call the OmmConsumerImpl::registerClient method instead.
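
One way the last suggestion could look in practice is sketched below: the decoder thread only records which symbols it wants, and the dispatching thread issues the registrations itself. `PendingRegistrations`, `request`, `drain`, and `register_fn` are hypothetical names invented for this sketch and are not part of EMA; `register_fn` is a placeholder for whatever application code builds the request and calls registerClient.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper (not part of EMA): collects symbols that a decoder
// thread wants to subscribe to, so the dispatching thread can make the
// actual registration calls.
class PendingRegistrations {
public:
    // Called from the decoder thread. It only takes this class's own
    // short-lived lock, so it never waits on the SDK's mutex.
    void request(std::string symbol) {
        std::lock_guard<std::mutex> guard(mutex_);
        pending_.push_back(std::move(symbol));
    }

    // Called from the dispatching thread between dispatch calls.
    // register_fn stands in for the code that would call registerClient;
    // it runs after this class's lock has been released.
    std::size_t drain(const std::function<void(const std::string&)>& register_fn) {
        std::vector<std::string> batch;
        {
            std::lock_guard<std::mutex> guard(mutex_);
            batch.swap(pending_);
        }
        for (const std::string& symbol : batch) {
            register_fn(symbol);
        }
        return batch.size();
    }

private:
    std::mutex mutex_;
    std::vector<std::string> pending_;
};
```

With this shape, the decoder thread would call `request(...)` where it currently registers new symbols, and the dispatching thread would call `drain(...)` in its own loop. Whether this fits depends on how the application structures that loop, which the thread does not show.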




@jirapongse.phuriphanvichai Hi, thanks for your reply. The attachment (usemaparser.txt) contains the stack traces of all threads at the moment my app hangs. Some explanation of the threads in my app:

Thread 2 is a custom dispatcher thread running in the OmmConsumerConfig::UserDispatchEnum operation mode. It receives messages from the EMA SDK and places the wrapped Refresh/Update messages into queues selected by hashing the EMA symbol.

Thread 9 is the message decoder thread. It takes wrapped messages from one of those queues and processes the Refresh/Update messages. The hang occurs while it processes a chain RefreshMsg. From the stack trace, we can see that the thread detected a chain update with new RICs added, so it extracted the newly added symbols and called registerClient again.



@wangfugen

Thank you for the call stacks.

The owner of the mutex required by Thread 9 is Thread 2. However, Thread 2 is waiting for KDeque<RawMsg>.

Thread 2 (Thread 0x7f20e24e5700 (LWP 27043)):
#0  0x00007f20f73899f5 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x0000000000a8fa5c in std::condition_variable::wait(std::unique_lock<std::mutex>&) ()
#2  0x00000000004d3f7e in KDeque<RawMsg>::push_back (this=0x4b2e0e0, item=...) at comm/util/queue.hpp:182

I assume that the owner of KDeque<RawMsg> is Thread 9. Could you please verify it?

If we can find the thread that owns KDeque<RawMsg> required by Thread 2, we will understand the situation.


@jirapongse.phuriphanvichai Thanks for your reply. In my app, Thread 2 runs the EMA SDK callbacks. It communicates with Thread 9 through a blocking queue with a maximum size of 10000, implemented with an internal mutex and condition variable. If decoder Thread 9 hangs, the queue quickly fills to its maximum size, and Thread 2 then blocks on the condition variable, waiting for the queue to have room again. That has nothing to do with the pthread_mutex_lock inside OmmConsumerImpl::registerClient.
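
For readers following along, a bounded blocking queue of the kind described here might look roughly like the sketch below. This is not the application's actual KDeque, whose source is not shown in the thread; `BoundedQueue` and `try_push_back_for` are names made up for this illustration. The timed variant is included only to show one way a producer could avoid waiting indefinitely.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the application's queue (not the real KDeque).
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

    // Blocks the caller for as long as the queue is full.
    void push_back(T item) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push_back(std::move(item));
        not_empty_.notify_one();
    }

    // Gives up and returns false if no room appears within `timeout`.
    template <typename Rep, typename Period>
    bool try_push_back_for(T item, std::chrono::duration<Rep, Period> timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!not_full_.wait_for(lock, timeout,
                                [this] { return items_.size() < capacity_; })) {
            return false;
        }
        items_.push_back(std::move(item));
        not_empty_.notify_one();
        return true;
    }

    // Blocks the caller for as long as the queue is empty.
    T pop_front() {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        T item = std::move(items_.front());
        items_.pop_front();
        not_full_.notify_one();
        return item;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> guard(mutex_);
        return items_.size();
    }

private:
    const std::size_t capacity_;
    mutable std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::deque<T> items_;
};
```

The important property for this thread is that `push_back` can wait for an unbounded time. Any lock the caller already holds when it enters `push_back` stays held for that whole wait.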


@wangfugen

Thread 9 is blocked because Thread 2 holds the mutex. Thread 2 doesn't release the mutex because of the blocked queue.

I assume that the cause of the problem is the blocked queue.

For example, if the message decoder thread (Thread 9) is slow which causes the blocked queue to be full, then the application will hang.
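
The wait cycle described here can be reproduced in miniature. The sketch below is an illustration under simplifying assumptions, not EMA code: `api_lock` stands in for the mutex shared by the dispatch loop and registerClient, a counter stands in for the full queue, and the decoder uses a timed lock attempt so that the demo terminates instead of hanging. All names are invented for the example.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <future>
#include <mutex>
#include <thread>

struct CycleReport {
    bool queue_full_while_lock_held = false;
    bool decoder_timed_out_on_lock = false;
};

CycleReport demonstrate_wait_cycle() {
    std::timed_mutex api_lock;       // stand-in for the SDK's shared mutex
    std::mutex queue_mutex;
    std::condition_variable not_full;
    const std::size_t capacity = 1;
    std::size_t queue_size = 1;      // the queue starts out full
    bool give_up = false;            // lets the demo end instead of hanging
    std::promise<void> lock_taken;
    std::future<void> lock_taken_signal = lock_taken.get_future();
    CycleReport report;

    // Plays the role of Thread 2: holds the shared lock while waiting for
    // room in the queue.
    std::thread dispatcher([&] {
        std::lock_guard<std::timed_mutex> api(api_lock);
        lock_taken.set_value();
        std::unique_lock<std::mutex> q(queue_mutex);
        not_full.wait(q, [&] { return queue_size < capacity || give_up; });
        report.queue_full_while_lock_held = (queue_size >= capacity);
    });

    // Plays the role of Thread 9: needs the shared lock before it can go
    // back to draining the queue.
    std::thread decoder([&] {
        lock_taken_signal.wait();
        const bool locked = api_lock.try_lock_for(std::chrono::milliseconds(50));
        report.decoder_timed_out_on_lock = !locked;
        if (locked) {
            api_lock.unlock();
        }
        {
            std::lock_guard<std::mutex> q(queue_mutex);
            give_up = true;          // a real decoder would still be stuck here
        }
        not_full.notify_one();
    });

    dispatcher.join();
    decoder.join();
    return report;
}
```

In the real application neither side has a timeout or a `give_up` flag, so each thread waits on a resource that only the other thread can free.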


@jirapongse.phuriphanvichai Thread 9 is blocked by the pthread_mutex_lock inside OmmConsumerImpl::registerClient. The message queue between Thread 2 and Thread 9 is implemented with std::mutex and std::condition_variable. You can think of Thread 2 as a producer that pushes messages into a queue with a maximum size, and of Thread 9 as a consumer that takes messages from that queue. If Thread 9 is blocked by pthread_mutex_lock, the messages produced by Thread 2 soon fill the queue to its maximum size, and Thread 2 blocks as well.


@jirapongse.phuriphanvichai Yes, I skimmed the EMA C++ source and confirmed your point: "Thread 9: OmmConsumerImpl::registerClient and Thread 2: OmmBaseImpl::rsslReactorDispatchLoop share the same mutex."

Maybe I should go with the second method you suggested. Many thanks for your suggestion.


@jirapongse.phuriphanvichai One more question: if I call OmmConsumerImpl::registerClient from Thread 2, won't it lock the same mutex twice and hang again? OmmBaseImpl::rsslReactorDispatchLoop already holds the lock, and it would then call OmmConsumerImpl::registerClient, which tries to acquire the same lock again.


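According to the Mutex.cpp source code, the mutex uses the PTHREAD_MUTEX_RECURSIVE type, so relocking it from the thread that already holds it does not block. The POSIX specification describes this type as follows: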

If the mutex type is PTHREAD_MUTEX_RECURSIVE, then the mutex shall maintain the concept of a lock count. When a thread successfully acquires a mutex for the first time, the lock count shall be set to one. Every time a thread relocks this mutex, the lock count shall be incremented by one. Each time the thread unlocks the mutex, the lock count shall be decremented by one. When the lock count reaches zero, the mutex shall become available for other threads to acquire. If a thread attempts to unlock a mutex that it has not locked or a mutex which is unlocked, an error shall be returned. 
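
The lock-count behaviour quoted above can be checked with a few lines of plain pthread code. This is a standalone sketch using only standard POSIX calls; it does not touch EMA's Mutex.cpp, and the function name is made up for the example.

```cpp
#include <cassert>
#include <pthread.h>

// Returns true if one thread can lock a PTHREAD_MUTEX_RECURSIVE mutex twice,
// must unlock it twice, and gets an error from a third, unmatched unlock.
bool recursive_relock_succeeds() {
    pthread_mutexattr_t attr;
    pthread_mutex_t mutex;

    if (pthread_mutexattr_init(&attr) != 0) {
        return false;
    }
    if (pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) != 0 ||
        pthread_mutex_init(&mutex, &attr) != 0) {
        pthread_mutexattr_destroy(&attr);
        return false;
    }
    pthread_mutexattr_destroy(&attr);

    bool ok = pthread_mutex_lock(&mutex) == 0;     // lock count becomes 1
    ok = ok && pthread_mutex_lock(&mutex) == 0;    // lock count becomes 2, no self-deadlock
    ok = ok && pthread_mutex_unlock(&mutex) == 0;  // lock count back to 1
    ok = ok && pthread_mutex_unlock(&mutex) == 0;  // lock count 0, mutex available
    ok = ok && pthread_mutex_unlock(&mutex) != 0;  // not locked any more: error

    pthread_mutex_destroy(&mutex);
    return ok;
}
```

The recursion only helps the thread that already owns the mutex. A different thread calling pthread_mutex_lock still waits until the count drops to zero.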


I see, thanks. I worked through the locking scenario again, and my previous understanding was wrong. You were right in your reply:

"Thread 9 is blocked because Thread 2 holds the mutex. Thread 2 doesn't release the mutex because of the blocked queue.

I assume that the cause of the problem is the blocked queue.

For example, if the message decoder thread (Thread 9) is slow which causes the blocked queue to be full, then the application will hang."

Previously, to keep a flood of quote messages from the EMA SDK thread from consuming too much memory, I set the maximum queue size to 10000. If Thread 2 is faster than Thread 9, Thread 2 blocks first, on the full queue. If Thread 9 has meanwhile received a chain update or refresh message that contains a new symbol, it calls OmmConsumerImpl::registerClient again, and then both threads hang.

Another option might be to create more threads like Thread 9 to speed up consumption from the message queue, and to make the queue size unlimited. I will try this method, as it requires the fewest changes to my current application.
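
A rough sketch of that idea follows: several decoder workers, each with its own unbounded queue, with messages routed by a hash of the symbol (the thread mentions that queues are already chosen this way). `ShardedDecoders`, `submit`, and `stop` are hypothetical names, and the handler is a placeholder for the real decoding logic.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical sketch: one unbounded queue and one worker thread per shard.
class ShardedDecoders {
public:
    using Handler = std::function<void(const std::string&)>;

    ShardedDecoders(std::size_t workers, Handler handler)
        : handler_(std::move(handler)) {
        if (workers == 0) {
            workers = 1;
        }
        for (std::size_t i = 0; i < workers; ++i) {
            shards_.push_back(std::unique_ptr<Shard>(new Shard()));
        }
        for (auto& shard : shards_) {
            Shard* s = shard.get();
            s->thread = std::thread([this, s] { run(*s); });
        }
    }

    ~ShardedDecoders() { stop(); }

    // Never waits for free space, so the caller cannot stall on a full queue.
    void submit(const std::string& symbol) {
        Shard& s = *shards_[std::hash<std::string>{}(symbol) % shards_.size()];
        {
            std::lock_guard<std::mutex> guard(s.mutex);
            s.items.push_back(symbol);
        }
        s.ready.notify_one();
    }

    // Lets every worker finish its remaining items, then joins the threads.
    void stop() {
        for (auto& shard : shards_) {
            {
                std::lock_guard<std::mutex> guard(shard->mutex);
                shard->stopping = true;
            }
            shard->ready.notify_one();
        }
        for (auto& shard : shards_) {
            if (shard->thread.joinable()) {
                shard->thread.join();
            }
        }
    }

private:
    struct Shard {
        std::mutex mutex;
        std::condition_variable ready;
        std::deque<std::string> items;
        bool stopping = false;
        std::thread thread;
    };

    void run(Shard& s) {
        for (;;) {
            std::string symbol;
            {
                std::unique_lock<std::mutex> lock(s.mutex);
                s.ready.wait(lock, [&s] { return s.stopping || !s.items.empty(); });
                if (s.items.empty()) {
                    return;  // stopping was requested and the queue is drained
                }
                symbol = std::move(s.items.front());
                s.items.pop_front();
            }
            handler_(symbol);  // runs outside the shard lock
        }
    }

    Handler handler_;
    std::vector<std::unique_ptr<Shard>> shards_;
};
```

Routing by symbol hash keeps all messages for one symbol on the same worker, so their order is preserved. The trade-off is the one already raised in the thread: without a size limit, memory use grows if the workers fall behind.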


Thanks for your suggestion again.
