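Ema.jar deadlock issue

While using the Java API to connect to the Reuters system, I hit a deadlock when sending requests through consumer.registerClient(xxx). I looked at the source in the EMA jar and found that EMA takes a JDK lock on the thread while processing messages and does not release it. The thread blocks, and the container then crashes. The code is attached below; please help me understand what is going on.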

try
{
   // Requires java.util.Arrays, java.util.Iterator, java.util.List, java.util.Map.
   // args holds the mapping between RICs and serviceName
   // (key: serviceName, value: RICs separated by ';').
   Iterator<Map.Entry<String,String>> it = args.entrySet().iterator();
   while(it.hasNext()){
      Map.Entry<String,String> entry = it.next();
      List<String> list = Arrays.asList(entry.getValue().split(";"));
      for(String ric : list){
         consumer.registerClient( EmaFactory.createReqMsg().serviceName(entry.getKey()).name(ric), appClient, 0);
      }
   }
}
catch (OmmException excp)
{
   System.out.println(excp.getMessage());
}
The appClient and OmmConsumer are created when the project starts:
@Bean
public AppClient creatOmmconsumerClient(){
   AppClient appClient = new AppClient();
   return appClient;
}
@Bean
public OmmConsumer creatOmmConsumer(){
   try{
      OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig();
      OmmConsumer consumer = EmaFactory.createOmmConsumer(config.host(hostName).username(userName));
      return consumer;
   }catch (Exception e){
      e.printStackTrace();
      OmmConsumer consumer = new OmmConsumer() {
         @Override
         public String consumerName() {
            return null;
         }

         @Override
         public long registerClient(ReqMsg reqMsg, OmmConsumerClient client) {
            return 0;
         }

         @Override
         public long registerClient(ReqMsg reqMsg, OmmConsumerClient client, Object closure) {
            return 0;
         }

         @Override
         public long registerClient(ReqMsg reqMsg, OmmConsumerClient client, Object closure, long parentHandle) {
            return 0;
         }

         @Override
         public long registerClient(TunnelStreamRequest tunnelStreamRequest, OmmConsumerClient client) {
            return 0;
         }

         @Override
         public long registerClient(TunnelStreamRequest tunnelStreamRequest, OmmConsumerClient client, Object closure) {
            return 0;
         }

         @Override
         public void reissue(ReqMsg reqMsg, long handle) {

         }

         @Override
         public void submit(GenericMsg genericMsg, long handle) {

         }

         @Override
         public void submit(PostMsg postMsg, long handle) {

         }

         @Override
         public long dispatch() {
            return 0;
         }

         @Override
         public long dispatch(long timeOut) {
            return 0;
         }

         @Override
         public void unregister(long handle) {

         }

         @Override
         public void uninitialize() {

         }

         @Override
         public void channelInformation(ChannelInformation ci) {

         }

         @Override
         public void modifyIOCtl(int code, int value) {

         }

         @Override
         public void renewOAuthCredentials(OAuth2CredentialRenewal credentials) {

         }
      };
      // Return the no-op fallback so the bean method returns on every path.
      return consumer;
   }
}

1669686967962.png


1669688584467.png


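The EMA code that reports the error is shown below (marked in bold italics in the original post); the caught exception is shown in the screenshots above.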
protected long registerClient(ReqMsg reqMsg, T client, Object closure)
{
   if(checkClient(client))
      return 0;
   try
   {
      _userLock.lock();
      long handle = _itemCallbackClient != null ? _itemCallbackClient.registerClient(reqMsg, client, closure, 0) : 0;
      return handle;
   }
   finally
   {
      _userLock.unlock();
   }
}
ema-api#technology#product

@lingjia.jiang

I found a deadlock in EMA 3.6.7.L2 but the call stack is different from yours.

I modified the ex100_MP_Streaming example to register items in a loop.

    for(int i=0;i<=10000;i++) {
        reqMsg.clear();
        System.out.println("A.L"+String.valueOf(i));
        consumer.registerClient(reqMsg.serviceName("ELEKTRON_DD").name("A.L"+String.valueOf(i)), appClient);
    }

Then, I got this deadlock.

Found one Java-level deadlock:
=============================
"main":
  waiting for ownable synchronizer 0x00000006036bea58, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "pool-2-thread-1"
"pool-2-thread-1":
  waiting for ownable synchronizer 0x00000006036bf7c0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "main"


Java stack information for the threads listed above:
===================================================
"main":
        at jdk.internal.misc.Unsafe.park(java.base@11.0.11/Native Method)
        - parking to wait for  <0x00000006036bea58> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(java.base@11.0.11/LockSupport.java:194)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.11/AbstractQueuedSynchronizer.java:885)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(java.base@11.0.11/AbstractQueuedSynchronizer.java:917)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@11.0.11/AbstractQueuedSynchronizer.java:1240)
        at java.util.concurrent.locks.ReentrantLock.lock(java.base@11.0.11/ReentrantLock.java:267)
        at com.refinitiv.eta.valueadd.reactor.ReactorChannel.submit(ReactorChannel.java:767)
        at com.refinitiv.ema.access.SingleItem.rsslSubmit(ItemCallbackClient.java:3009)
        at com.refinitiv.ema.access.SingleItem.open(ItemCallbackClient.java:2869)
        at com.refinitiv.ema.access.ItemCallbackClient.registerClient(ItemCallbackClient.java:2192)
        at com.refinitiv.ema.access.OmmBaseImpl.registerClient(OmmBaseImpl.java:532)
        at com.refinitiv.ema.access.OmmConsumerImpl.registerClient(OmmConsumerImpl.java:255)
        at com.refinitiv.ema.examples.training.consumer.series100.ex100_MP_Streaming.Consumer.main(Consumer.java:65)
"pool-2-thread-1":
        at jdk.internal.misc.Unsafe.park(java.base@11.0.11/Native Method)
        - parking to wait for  <0x00000006036bf7c0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(java.base@11.0.11/LockSupport.java:194)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.11/AbstractQueuedSynchronizer.java:885)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(java.base@11.0.11/AbstractQueuedSynchronizer.java:917)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@11.0.11/AbstractQueuedSynchronizer.java:1240)
        at java.util.concurrent.locks.ReentrantLock.lock(java.base@11.0.11/ReentrantLock.java:267)
        at com.refinitiv.ema.access.ItemCallbackClient.removeFromMap(ItemCallbackClient.java:2477)
        at com.refinitiv.ema.access.SingleItem.remove(ItemCallbackClient.java:2932)
        at com.refinitiv.ema.access.ItemCallbackClient.processStatusMsg(ItemCallbackClient.java:1893)
        at com.refinitiv.ema.access.ItemCallbackClient.defaultMsgCallback(ItemCallbackClient.java:1630)
        at com.refinitiv.eta.valueadd.reactor.Reactor.sendDefaultMsgCallback(Reactor.java:2787)
        at com.refinitiv.eta.valueadd.reactor.Reactor.sendAndHandleDefaultMsgCallback(Reactor.java:2896)
        at com.refinitiv.eta.valueadd.reactor.WlItemHandler.callbackUser(WlItemHandler.java:3029)
        at com.refinitiv.eta.valueadd.reactor.WlItemHandler.readMsg(WlItemHandler.java:2002)
        at com.refinitiv.eta.valueadd.reactor.Watchlist.readMsg(Watchlist.java:302)
        at com.refinitiv.eta.valueadd.reactor.Reactor.processRwfMessage(Reactor.java:4662)
        at com.refinitiv.eta.valueadd.reactor.Reactor.performChannelRead(Reactor.java:5001)
        at com.refinitiv.eta.valueadd.reactor.Reactor.dispatchAll(Reactor.java:7674)
        at com.refinitiv.ema.access.OmmBaseImpl.rsslReactorDispatchLoop(OmmBaseImpl.java:1759)
        at com.refinitiv.ema.access.OmmBaseImpl.run(OmmBaseImpl.java:1889)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.11/ThreadPoolExecutor.java:1128)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.11/ThreadPoolExecutor.java:628)
        at java.lang.Thread.run(java.base@11.0.11/Thread.java:834)


Found 1 deadlock.

I used the jps command to list all Java processes, and then used the jstack -l <pid> command to print the call stack.
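If attaching jstack to the container is inconvenient, a similar check can be done from inside the JVM. The following is a minimal sketch, assuming a HotSpot-style JVM that supports monitoring of ownable synchronizers; the class name DeadlockProbe and the method describeDeadlocks are hypothetical names for this illustration and are not part of EMA. It uses the standard java.lang.management.ThreadMXBean API, whose findDeadlockedThreads() also covers ReentrantLock cycles like the one in the dump above.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper (not part of EMA): reports deadlocked threads in this JVM.
public class DeadlockProbe {

    /** Returns one line per deadlocked thread, or an empty array if there is no deadlock. */
    public static String[] describeDeadlocks() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Covers both object monitors and ownable synchronizers such as ReentrantLock.
        long[] ids = mx.findDeadlockedThreads(); // null when nothing is deadlocked
        if (ids == null) {
            return new String[0];
        }
        ThreadInfo[] infos = mx.getThreadInfo(ids, true, true);
        String[] lines = new String[infos.length];
        for (int i = 0; i < infos.length; i++) {
            ThreadInfo ti = infos[i];
            // An entry can be null if the thread ended between the two calls.
            lines[i] = (ti == null)
                    ? "(thread no longer alive)"
                    : "\"" + ti.getThreadName() + "\" waiting for " + ti.getLockName()
                      + " held by \"" + ti.getLockOwnerName() + "\"";
        }
        return lines;
    }

    public static void main(String[] args) {
        String[] lines = describeDeadlocks();
        if (lines.length == 0) {
            System.out.println("No deadlock detected.");
        }
        for (String line : lines) {
            System.out.println(line);
        }
    }
}
```

Calling describeDeadlocks() periodically from a monitoring thread and logging the result is one way to capture both sides of a deadlock even when it cannot be reproduced on demand.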

1669776650482.png

This issue has been reported on GitHub.

However, the problem occurred at com.refinitiv.eta.valueadd.reactor.ReactorChannel.submit, not at com.refinitiv.ema.access.OmmBaseImpl.registerClient, so it differs from your call stack.

Please use the jstack -l <pid> command to generate the call stack when the deadlock occurs, or modify the example in the package to replicate the issue.




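One piece of information that may help: after the problem occurred, the Reuters support engineer said it might have been caused by incomplete data. Since they completed the data, the deadlock has not happened again, so I can no longer reproduce it. The risk is still there, however, so the API needs to be investigated and fixed on your side.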


@lingjia.jiang

Thanks for reaching out to us.

Typically, a deadlock involves two resources and two threads.
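To illustrate that pattern, here is a minimal, self-contained sketch in plain Java. It is not EMA code: the class CircularWaitDemo and the locks lockA and lockB are hypothetical stand-ins for the two resources. Each thread takes one lock and then asks for the other. A timed tryLock is used instead of lock() so that the demo terminates rather than hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo (not EMA code): two threads take two locks in opposite order.
public class CircularWaitDemo {

    /** Returns how many of the two threads could not obtain their second lock. */
    public static int run() {
        ReentrantLock lockA = new ReentrantLock();
        ReentrantLock lockB = new ReentrantLock();
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        CountDownLatch bothTried = new CountDownLatch(2);
        AtomicInteger blocked = new AtomicInteger();

        Thread t1 = new Thread(
                () -> attempt(lockA, lockB, bothHoldFirst, bothTried, blocked), "thread-1");
        Thread t2 = new Thread(
                () -> attempt(lockB, lockA, bothHoldFirst, bothTried, blocked), "thread-2");
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return blocked.get();
    }

    private static void attempt(ReentrantLock first, ReentrantLock second,
                                CountDownLatch bothHoldFirst, CountDownLatch bothTried,
                                AtomicInteger blocked) {
        first.lock();
        try {
            // Wait until both threads hold their first lock.
            bothHoldFirst.countDown();
            bothHoldFirst.await();
            // With lock() this call would never return; tryLock gives up after 200 ms.
            if (second.tryLock(200, TimeUnit.MILLISECONDS)) {
                second.unlock();
            } else {
                blocked.incrementAndGet();
            }
            // Keep holding the first lock until the other thread has also tried.
            bothTried.countDown();
            bothTried.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            first.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads blocked on second lock: " + run());
    }
}
```

Here run() returns 2: each thread holds the lock the other one needs. That is why the call stack of only one thread is not enough to diagnose a deadlock.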

Please share the call stack of another thread that locks this resource and confirm the version of EMA that you are using.

1669696875955.png




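First of all, this problem is raised from inside the EMA jar. I cannot locate the other thread: EMA takes a lock on every request, and I cannot tell which thread is holding it. The EMA jar version is 3.6.7.0.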

@lingjia.jiang

There should be another thread that owns this lock, as shown in the previous discussion.

1669700757036.png



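We all know the necessary conditions for a deadlock. On my side, I only depend on the EMA jar to send requests, and all I do is send data requests in a loop. I cannot say what EMA does once it receives a request, or what the other thread is. In addition, this exception was caught by the container. Looking at the EMA source, many of its methods do not throw exceptions at all.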