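Ema.jar deadlock issue

While using the Java API to connect to the Reuters system and sending requests through consumer.registerClient(xxx), I ran into a deadlock. I looked at the source code of the EMA jar and found that, while processing the message, EMA takes a JDK lock on the thread and does not release it, which blocks the thread and eventually crashes the container. The code is attached below; please help me understand what is going on.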
// requires java.util.Arrays, java.util.Iterator, java.util.List, java.util.Map
try
{
    // args maps each serviceName (key) to a ';'-separated list of RICs (value)
    Iterator<Map.Entry<String,String>> it = args.entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<String,String> entry = it.next();
        List<String> list = Arrays.asList(entry.getValue().split(";"));
        for (String ric : list) {
            // one request per RIC; 0 is passed as the closure object
            consumer.registerClient(EmaFactory.createReqMsg().serviceName(entry.getKey()).name(ric), appClient, 0);
        }
    }
}
catch (OmmException excp)
{
    System.out.println(excp.getMessage());
}
The appClient and the OmmConsumer are created when the project starts:
@Bean
public AppClient creatOmmconsumerClient(){
AppClient appClient = new AppClient();
return appClient;
}
@Bean
public OmmConsumer creatOmmConsumer(){
try{
OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig();
OmmConsumer consumer = EmaFactory.createOmmConsumer(config.host(hostName).username(userName));
return consumer;
}catch (Exception e){
e.printStackTrace();
OmmConsumer consumer = new OmmConsumer() {
@Override
public String consumerName() {
return null;
}
@Override
public long registerClient(ReqMsg reqMsg, OmmConsumerClient client) {
return 0;
}
@Override
public long registerClient(ReqMsg reqMsg, OmmConsumerClient client, Object closure) {
return 0;
}
@Override
public long registerClient(ReqMsg reqMsg, OmmConsumerClient client, Object closure, long parentHandle) {
return 0;
}
@Override
public long registerClient(TunnelStreamRequest tunnelStreamRequest, OmmConsumerClient client) {
return 0;
}
@Override
public long registerClient(TunnelStreamRequest tunnelStreamRequest, OmmConsumerClient client, Object closure) {
return 0;
}
@Override
public void reissue(ReqMsg reqMsg, long handle) {
}
@Override
public void submit(GenericMsg genericMsg, long handle) {
}
@Override
public void submit(PostMsg postMsg, long handle) {
}
@Override
public long dispatch() {
return 0;
}
@Override
public long dispatch(long timeOut) {
return 0;
}
@Override
public void unregister(long handle) {
}
@Override
public void uninitialize() {
}
@Override
public void channelInformation(ChannelInformation ci) {
}
@Override
public void modifyIOCtl(int code, int value) {
}
@Override
public void renewOAuthCredentials(OAuth2CredentialRenewal credentials) {
}
};
// fall back to the no-op consumer above when the real one cannot be created
return consumer;
}
}
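The EMA code that reports the error is shown below (the failing line was marked in bold italics in the original post); the exception capture is shown in the screenshot above.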
protected long registerClient(ReqMsg reqMsg, T client, Object closure)
{
if(checkClient(client))
return 0;
try
{
_userLock.lock();
long handle = _itemCallbackClient != null ? _itemCallbackClient.registerClient(reqMsg, client, closure, 0) : 0;
return handle;
}
finally
{
_userLock.unlock();
}
}
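As a side note on the method above, here is a minimal sketch of the same lock / finally / unlock shape. It is not EMA code: the class LockReleaseDemo, the method names, and the return value 42 are hypothetical and chosen only for illustration. It shows that with this shape the lock is released even when the guarded call throws, which suggests that a thread stuck here is more likely waiting for a lock held by another thread than for a lock that was never released.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; it only imitates the lock/finally/unlock shape.
class LockReleaseDemo {
    private static final ReentrantLock userLock = new ReentrantLock();

    // Acquire the lock, do some work, and always release it in finally.
    static long guardedCall(boolean fail) {
        userLock.lock();
        try {
            if (fail) {
                throw new IllegalStateException("simulated failure while holding the lock");
            }
            return 42L; // stand-in for a stream handle
        } finally {
            userLock.unlock();
        }
    }

    // Returns true if the lock is free again after a call that threw.
    static boolean lockReleasedAfterException() {
        try {
            guardedCall(true);
        } catch (IllegalStateException expected) {
            // the exception propagates, but finally has already run
        }
        return !userLock.isLocked();
    }

    public static void main(String[] args) {
        System.out.println("normal result: " + guardedCall(false));
        System.out.println("released after exception: " + lockReleasedAfterException());
    }
}
```

The sketch acquires the lock just before the try block, which is the usual idiom with java.util.concurrent.locks; the behaviour of finally is the same either way.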
Best Answer
-
I found a deadlock in EMA 3.6.7.L2 but the call stack is different from yours.
I modified the ex100_MP_Streaming to register items in a loop.
for (int i = 0; i <= 10000; i++) {
    reqMsg.clear();
    System.out.println("A.L" + String.valueOf(i));
    consumer.registerClient(reqMsg.serviceName("ELEKTRON_DD").name("A.L" + String.valueOf(i)), appClient);
}
Then, I got this deadlock.
Found one Java-level deadlock:
=============================
"main":
waiting for ownable synchronizer 0x00000006036bea58, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "pool-2-thread-1"
"pool-2-thread-1":
waiting for ownable synchronizer 0x00000006036bf7c0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "main"
Java stack information for the threads listed above:
===================================================
"main":
at jdk.internal.misc.Unsafe.park(java.base@11.0.11/Native Method)
- parking to wait for <0x00000006036bea58> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(java.base@11.0.11/LockSupport.java:194)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.11/AbstractQueuedSynchronizer.java:885)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(java.base@11.0.11/AbstractQueuedSynchronizer.java:917)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@11.0.11/AbstractQueuedSynchronizer.java:1240)
at java.util.concurrent.locks.ReentrantLock.lock(java.base@11.0.11/ReentrantLock.java:267)
at com.refinitiv.eta.valueadd.reactor.ReactorChannel.submit(ReactorChannel.java:767)
at com.refinitiv.ema.access.SingleItem.rsslSubmit(ItemCallbackClient.java:3009)
at com.refinitiv.ema.access.SingleItem.open(ItemCallbackClient.java:2869)
at com.refinitiv.ema.access.ItemCallbackClient.registerClient(ItemCallbackClient.java:2192)
at com.refinitiv.ema.access.OmmBaseImpl.registerClient(OmmBaseImpl.java:532)
at com.refinitiv.ema.access.OmmConsumerImpl.registerClient(OmmConsumerImpl.java:255)
at com.refinitiv.ema.examples.training.consumer.series100.ex100_MP_Streaming.Consumer.main(Consumer.java:65)
"pool-2-thread-1":
at jdk.internal.misc.Unsafe.park(java.base@11.0.11/Native Method)
- parking to wait for <0x00000006036bf7c0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(java.base@11.0.11/LockSupport.java:194)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.11/AbstractQueuedSynchronizer.java:885)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(java.base@11.0.11/AbstractQueuedSynchronizer.java:917)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@11.0.11/AbstractQueuedSynchronizer.java:1240)
at java.util.concurrent.locks.ReentrantLock.lock(java.base@11.0.11/ReentrantLock.java:267)
at com.refinitiv.ema.access.ItemCallbackClient.removeFromMap(ItemCallbackClient.java:2477)
at com.refinitiv.ema.access.SingleItem.remove(ItemCallbackClient.java:2932)
at com.refinitiv.ema.access.ItemCallbackClient.processStatusMsg(ItemCallbackClient.java:1893)
at com.refinitiv.ema.access.ItemCallbackClient.defaultMsgCallback(ItemCallbackClient.java:1630)
at com.refinitiv.eta.valueadd.reactor.Reactor.sendDefaultMsgCallback(Reactor.java:2787)
at com.refinitiv.eta.valueadd.reactor.Reactor.sendAndHandleDefaultMsgCallback(Reactor.java:2896)
at com.refinitiv.eta.valueadd.reactor.WlItemHandler.callbackUser(WlItemHandler.java:3029)
at com.refinitiv.eta.valueadd.reactor.WlItemHandler.readMsg(WlItemHandler.java:2002)
at com.refinitiv.eta.valueadd.reactor.Watchlist.readMsg(Watchlist.java:302)
at com.refinitiv.eta.valueadd.reactor.Reactor.processRwfMessage(Reactor.java:4662)
at com.refinitiv.eta.valueadd.reactor.Reactor.performChannelRead(Reactor.java:5001)
at com.refinitiv.eta.valueadd.reactor.Reactor.dispatchAll(Reactor.java:7674)
at com.refinitiv.ema.access.OmmBaseImpl.rsslReactorDispatchLoop(OmmBaseImpl.java:1759)
at com.refinitiv.ema.access.OmmBaseImpl.run(OmmBaseImpl.java:1889)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.11/ThreadPoolExecutor.java:1128)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.11/ThreadPoolExecutor.java:628)
at java.lang.Thread.run(java.base@11.0.11/Thread.java:834)
Found 1 deadlock.
I used the jps command to list all Java processes, and then used the jstack -l <pid> command to print the call stack.
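If running jstack against the container is not practical, the same check can be made from inside the JVM. The following is a minimal sketch, assuming a standard HotSpot JVM; the class DeadlockProbe, the thread names demo-1 and demo-2, and the two locks are hypothetical and have nothing to do with EMA. It builds the same kind of cycle as in the dump above (each thread holds one ReentrantLock and waits for the other) and then asks ThreadMXBean.findDeadlockedThreads() to report it.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: creates a two-thread lock cycle and detects it.
class DeadlockProbe {

    // Takes `first`, waits until both workers hold their first lock, then requests `second`.
    static Thread startWorker(String name, ReentrantLock first, ReentrantLock second,
                              CountDownLatch bothHoldFirst) {
        Thread t = new Thread(() -> {
            try {
                first.lockInterruptibly();
                try {
                    bothHoldFirst.countDown();
                    bothHoldFirst.await();
                    second.lockInterruptibly(); // blocks: the other worker holds it
                    second.unlock();
                } finally {
                    first.unlock();
                }
            } catch (InterruptedException e) {
                // interrupted by the probe to break the cycle
            }
        }, name);
        t.setDaemon(true);
        t.start();
        return t;
    }

    // Returns how many of the demo threads the JVM reports as deadlocked.
    static int detectDemoDeadlock() {
        ReentrantLock a = new ReentrantLock();
        ReentrantLock b = new ReentrantLock();
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        Thread t1 = startWorker("demo-1", a, b, bothHoldFirst);
        Thread t2 = startWorker("demo-2", b, a, bothHoldFirst);

        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long[] ids = null;
        try {
            // Poll for up to about five seconds until the cycle is visible.
            for (int i = 0; i < 100 && ids == null; i++) {
                Thread.sleep(50);
                ids = mx.findDeadlockedThreads(); // covers ReentrantLock, not only monitors
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        int found = 0;
        if (ids != null) {
            for (ThreadInfo info : mx.getThreadInfo(ids)) {
                if (info != null && info.getThreadName().startsWith("demo-")) {
                    found++;
                    System.out.println(info.getThreadName() + " waits for " + info.getLockName()
                            + " held by " + info.getLockOwnerName());
                }
            }
        }
        // lockInterruptibly() lets the demo threads be released again.
        t1.interrupt();
        t2.interrupt();
        return found;
    }

    public static void main(String[] args) {
        System.out.println("deadlocked demo threads: " + detectDemoDeadlock());
    }
}
```

In a real application the polling part could run on a background thread and log the ThreadInfo entries when a deadlock is found, which would give the same information as the dump above without attaching to the process.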
This issue has been reported on GitHub.
However, in my case the main thread is blocked in com.refinitiv.eta.valueadd.reactor.ReactorChannel.submit, not in com.refinitiv.ema.access.OmmBaseImpl.registerClient, so it differs from your call stack.
Please use the jstack -l <pid> command to generate the call stack when the deadlock occurs, or modify the example in the package to replicate the issue.
Answers
-
Thanks for reaching out to us.
Typically, deadlock relates to two resources and two threads.
Please share the call stack of another thread that locks this resource and confirm the version of EMA that you are using.
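-
First of all, this problem is thrown from inside the EMA jar. I cannot identify the other thread. EMA takes a lock on every request, but I cannot tell which lock is involved. The EMA jar version is 3.6.7.0.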
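-
We all know the necessary conditions for a deadlock. My side only depends on the EMA jar to send requests, and all I do is send data requests in a loop. What EMA does once it receives a request, and what the other thread is, I cannot answer. Moreover, this exception was caught by the container. Looking at the EMA source code, many of its methods do not throw exceptions.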
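-
One piece of information may help you: after the problem occurred, the Reuters engineer said it might have been caused by incomplete data. After the Reuters engineer completed the data, the deadlock never occurred again, so I can no longer reproduce it. The risk still exists, however, so the API needs to be investigated and fixed on your side.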