ARTEMIS-1111 Fixing deadlock
There is a deadlock on flow controlling the lock is using the wrong method and that is causing some issues under perf load.
This commit is contained in:
parent
ee261e736c
commit
bfc07a7e01
|
@ -472,22 +472,25 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
connection.lock();
|
||||
try {
|
||||
receiver.flow(credits);
|
||||
connection.flush();
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
connection.flush();
|
||||
return;
|
||||
}
|
||||
final PagingStore store = manager.getServer().getPagingManager().getPageStore(new SimpleString(address));
|
||||
store.checkMemory(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
if (receiver.getRemoteCredit() <= threshold) {
|
||||
receiver.flow(credits);
|
||||
connection.flush();
|
||||
}
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
connection.flush();
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -24,7 +24,6 @@ import java.util.UUID;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||
|
@ -129,16 +128,16 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
return false;
|
||||
}
|
||||
|
||||
public ReentrantLock getLock() {
|
||||
return handler.getLock();
|
||||
public boolean tryLock(long time, TimeUnit timeUnit) {
|
||||
return handler.tryLock(time, timeUnit);
|
||||
}
|
||||
|
||||
public void lock() {
|
||||
handler.getLock().lock();
|
||||
handler.lock();
|
||||
}
|
||||
|
||||
public void unlock() {
|
||||
handler.getLock().unlock();
|
||||
handler.unlock();
|
||||
}
|
||||
|
||||
public int capacity() {
|
||||
|
|
|
@ -630,7 +630,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
|
||||
int size = nettyBuffer.writerIndex();
|
||||
|
||||
while (!connection.getLock().tryLock(1, TimeUnit.SECONDS)) {
|
||||
while (!connection.tryLock(1, TimeUnit.SECONDS)) {
|
||||
if (closed || sender.getLocalState() == EndpointState.CLOSED) {
|
||||
// If we're waiting on the connection lock, the link might be in the process of closing. If this happens
|
||||
// we return.
|
||||
|
|
|
@ -114,8 +114,23 @@ public class ProtonHandler extends ProtonInitializable {
|
|||
}
|
||||
}
|
||||
|
||||
public ReentrantLock getLock() {
|
||||
return lock;
|
||||
public void lock() {
|
||||
lock.lock();
|
||||
}
|
||||
|
||||
public void unlock() {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
public boolean tryLock(long time, TimeUnit timeUnit) {
|
||||
try {
|
||||
return lock.tryLock(time, timeUnit);
|
||||
} catch (InterruptedException e) {
|
||||
|
||||
Thread.currentThread().interrupt();
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public Transport getTransport() {
|
||||
|
|
Loading…
Reference in New Issue