Ema.jar deadlock problem

While integrating with the Reuters (Refinitiv) system through the Java API, we hit a deadlock while registering items via consumer.registerClient(xxx). Looking at the EMA jar source, it appears that during message processing EMA takes a JDK lock and does not release it, leaving the thread blocked and ultimately crashing the container. The code is attached below; any help would be appreciated.

try
{
    // args maps a serviceName to a semicolon-delimited list of RICs
    Iterator<Map.Entry<String, String>> it = args.entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<String, String> entry = it.next();
        List<String> list = Arrays.asList(entry.getValue().split(";"));
        for (String ric : list) {
            consumer.registerClient(EmaFactory.createReqMsg().serviceName(entry.getKey()).name(ric), appClient, 0);
        }
    }
}
catch (OmmException excp)
{
    System.out.println(excp.getMessage());
}
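
For reference, the args map consumed by the loop above is keyed by service name, with each value holding a semicolon-delimited RIC list. A made-up example (the RIC values below are illustrative only):

// assuming: import java.util.HashMap; import java.util.Map;
Map<String, String> args = new HashMap<>();
// serviceName -> "RIC1;RIC2;..."
args.put("ELEKTRON_DD", "EUR=;GBP=;JPY=");
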
appClient and the OmmConsumer are created at application startup:
@Bean
public AppClient createOmmConsumerClient() {
    return new AppClient();
}
@Bean
public OmmConsumer createOmmConsumer() {
    try {
        OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig();
        return EmaFactory.createOmmConsumer(config.host(hostName).username(userName));
    } catch (Exception e) {
        e.printStackTrace();
        // Fall back to a no-op stub so that bean creation never fails; note
        // that every registerClient/submit call on this stub silently does nothing.
        return new OmmConsumer() {
            @Override public String consumerName() { return null; }
            @Override public long registerClient(ReqMsg reqMsg, OmmConsumerClient client) { return 0; }
            @Override public long registerClient(ReqMsg reqMsg, OmmConsumerClient client, Object closure) { return 0; }
            @Override public long registerClient(ReqMsg reqMsg, OmmConsumerClient client, Object closure, long parentHandle) { return 0; }
            @Override public long registerClient(TunnelStreamRequest tunnelStreamRequest, OmmConsumerClient client) { return 0; }
            @Override public long registerClient(TunnelStreamRequest tunnelStreamRequest, OmmConsumerClient client, Object closure) { return 0; }
            @Override public void reissue(ReqMsg reqMsg, long handle) {}
            @Override public void submit(GenericMsg genericMsg, long handle) {}
            @Override public void submit(PostMsg postMsg, long handle) {}
            @Override public long dispatch() { return 0; }
            @Override public long dispatch(long timeOut) { return 0; }
            @Override public void unregister(long handle) {}
            @Override public void uninitialize() {}
            @Override public void channelInformation(ChannelInformation ci) {}
            @Override public void modifyIOCtl(int code, int value) {}
            @Override public void renewOAuthCredentials(OAuth2CredentialRenewal credentials) {}
        };
    }
}

(screenshot: 1669686967962.png)

(screenshot: 1669688584467.png)


The EMA code that reports the error is shown below; the captured exception is in the screenshots above.
protected long registerClient(ReqMsg reqMsg, T client, Object closure)
{
    if (checkClient(client))
        return 0;
    try
    {
        _userLock.lock();
        long handle = _itemCallbackClient != null ? _itemCallbackClient.registerClient(reqMsg, client, closure, 0) : 0;
        return handle;
    }
    finally
    {
        _userLock.unlock();
    }
}

Best Answer

    Jirapongse:

    @lingjia.jiang

    I found a deadlock in EMA 3.6.7.L2 but the call stack is different from yours.

    I modified the ex100_MP_Streaming example to register items in a loop.

        for (int i = 0; i <= 10000; i++) {
            reqMsg.clear();
            System.out.println("A.L" + String.valueOf(i));
            consumer.registerClient(reqMsg.serviceName("ELEKTRON_DD").name("A.L" + String.valueOf(i)), appClient);
        }

    Then, I got this deadlock.

    Found one Java-level deadlock:
    =============================
    "main":
      waiting for ownable synchronizer 0x00000006036bea58, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
      which is held by "pool-2-thread-1"
    "pool-2-thread-1":
      waiting for ownable synchronizer 0x00000006036bf7c0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
      which is held by "main"


    Java stack information for the threads listed above:
    ===================================================
    "main":
            at jdk.internal.misc.Unsafe.park(java.base@11.0.11/Native Method)
            - parking to wait for  <0x00000006036bea58> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
            at java.util.concurrent.locks.LockSupport.park(java.base@11.0.11/LockSupport.java:194)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.11/AbstractQueuedSynchronizer.java:885)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(java.base@11.0.11/AbstractQueuedSynchronizer.java:917)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@11.0.11/AbstractQueuedSynchronizer.java:1240)
            at java.util.concurrent.locks.ReentrantLock.lock(java.base@11.0.11/ReentrantLock.java:267)
            at com.refinitiv.eta.valueadd.reactor.ReactorChannel.submit(ReactorChannel.java:767)
            at com.refinitiv.ema.access.SingleItem.rsslSubmit(ItemCallbackClient.java:3009)
            at com.refinitiv.ema.access.SingleItem.open(ItemCallbackClient.java:2869)
            at com.refinitiv.ema.access.ItemCallbackClient.registerClient(ItemCallbackClient.java:2192)
            at com.refinitiv.ema.access.OmmBaseImpl.registerClient(OmmBaseImpl.java:532)
            at com.refinitiv.ema.access.OmmConsumerImpl.registerClient(OmmConsumerImpl.java:255)
            at com.refinitiv.ema.examples.training.consumer.series100.ex100_MP_Streaming.Consumer.main(Consumer.java:65)
    "pool-2-thread-1":
            at jdk.internal.misc.Unsafe.park(java.base@11.0.11/Native Method)
            - parking to wait for  <0x00000006036bf7c0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
            at java.util.concurrent.locks.LockSupport.park(java.base@11.0.11/LockSupport.java:194)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.11/AbstractQueuedSynchronizer.java:885)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(java.base@11.0.11/AbstractQueuedSynchronizer.java:917)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@11.0.11/AbstractQueuedSynchronizer.java:1240)
            at java.util.concurrent.locks.ReentrantLock.lock(java.base@11.0.11/ReentrantLock.java:267)
            at com.refinitiv.ema.access.ItemCallbackClient.removeFromMap(ItemCallbackClient.java:2477)
            at com.refinitiv.ema.access.SingleItem.remove(ItemCallbackClient.java:2932)
            at com.refinitiv.ema.access.ItemCallbackClient.processStatusMsg(ItemCallbackClient.java:1893)
            at com.refinitiv.ema.access.ItemCallbackClient.defaultMsgCallback(ItemCallbackClient.java:1630)
            at com.refinitiv.eta.valueadd.reactor.Reactor.sendDefaultMsgCallback(Reactor.java:2787)
            at com.refinitiv.eta.valueadd.reactor.Reactor.sendAndHandleDefaultMsgCallback(Reactor.java:2896)
            at com.refinitiv.eta.valueadd.reactor.WlItemHandler.callbackUser(WlItemHandler.java:3029)
            at com.refinitiv.eta.valueadd.reactor.WlItemHandler.readMsg(WlItemHandler.java:2002)
            at com.refinitiv.eta.valueadd.reactor.Watchlist.readMsg(Watchlist.java:302)
            at com.refinitiv.eta.valueadd.reactor.Reactor.processRwfMessage(Reactor.java:4662)
            at com.refinitiv.eta.valueadd.reactor.Reactor.performChannelRead(Reactor.java:5001)
            at com.refinitiv.eta.valueadd.reactor.Reactor.dispatchAll(Reactor.java:7674)
            at com.refinitiv.ema.access.OmmBaseImpl.rsslReactorDispatchLoop(OmmBaseImpl.java:1759)
            at com.refinitiv.ema.access.OmmBaseImpl.run(OmmBaseImpl.java:1889)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.11/ThreadPoolExecutor.java:1128)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.11/ThreadPoolExecutor.java:628)
            at java.lang.Thread.run(java.base@11.0.11/Thread.java:834)


    Found 1 deadlock.
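
    Reading the synchronizer addresses, this looks like a classic lock-ordering deadlock: "main" holds EMA's user lock (taken in OmmBaseImpl.registerClient, as in the snippet above) and waits for the reactor channel lock inside ReactorChannel.submit, while the API dispatch thread holds the channel lock and waits for the user lock in ItemCallbackClient.removeFromMap. A minimal, self-contained sketch of the same pattern (the lock and thread names here are stand-ins, not EMA's actual fields):

        import java.util.concurrent.locks.ReentrantLock;

        public class LockOrderDeadlock {
            // Stand-ins for EMA's user lock and the reactor channel lock.
            static final ReentrantLock userLock = new ReentrantLock();
            static final ReentrantLock channelLock = new ReentrantLock();

            public static void main(String[] args) {
                Thread apiThread = new Thread(() -> {      // like "main" above
                    userLock.lock();                       // first lock
                    try {
                        sleep(100);                        // widen the race window
                        channelLock.lock();                // blocks: held by dispatcher
                        channelLock.unlock();
                    } finally {
                        userLock.unlock();
                    }
                });
                Thread dispatcher = new Thread(() -> {     // like "pool-2-thread-1" above
                    channelLock.lock();                    // first lock, opposite order
                    try {
                        sleep(100);
                        userLock.lock();                   // blocks: held by apiThread -> deadlock
                        userLock.unlock();
                    } finally {
                        channelLock.unlock();
                    }
                });
                apiThread.start();
                dispatcher.start();
            }

            private static void sleep(long ms) {
                try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            }
        }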

    I used the jps command to list all Java processes, and then used the jstack -l <pid> command to print the call stacks.

    (screenshot: 1669776650482.png)
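
    If attaching jstack is inconvenient (for example inside a container), the same check can be done from within the JVM using the standard java.lang.management API; a minimal sketch that could run on a watchdog thread:

        import java.lang.management.ManagementFactory;
        import java.lang.management.ThreadInfo;
        import java.lang.management.ThreadMXBean;

        public class DeadlockProbe {
            public static void main(String[] args) {
                ThreadMXBean mx = ManagementFactory.getThreadMXBean();
                // Finds threads deadlocked on monitors or ownable synchronizers
                // (ReentrantLock is an ownable synchronizer); returns null if none.
                long[] ids = mx.findDeadlockedThreads();
                if (ids == null) {
                    System.out.println("No deadlock detected.");
                    return;
                }
                // Include locked monitors and synchronizers in the report.
                for (ThreadInfo info : mx.getThreadInfo(ids, true, true)) {
                    System.out.println(info);
                }
            }
        }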

    This issue has been reported on GitHub.

    However, in my test the deadlock occurred at com.refinitiv.eta.valueadd.reactor.ReactorChannel.submit, not at com.refinitiv.ema.access.OmmBaseImpl.registerClient, so the call stack differs from yours.

    Please use the jstack -l <pid> command to generate the call stack when the deadlock occurs, or modify one of the examples in the package to replicate the issue.

