ARTEMIS-2434 Don't lock ServerConsumerImpl for long period of time
This commit is contained in:
parent
8963cd91d2
commit
64ba930f43
|
@ -27,6 +27,7 @@ import java.util.List;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
|
@ -729,58 +730,68 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
|
||||
@Override
|
||||
public void setStarted(final boolean started) {
|
||||
synchronized (lock) {
|
||||
boolean locked = lockDelivery();
|
||||
|
||||
lockDelivery(locked -> {
|
||||
// This is to make sure nothing would sneak to the client while started = false
|
||||
// the client will stop the session and perform a rollback in certain cases.
|
||||
// in case something sneaks to the client you could get to messaging delivering forever until
|
||||
// you restart the server
|
||||
try {
|
||||
this.started = browseOnly || started;
|
||||
} finally {
|
||||
if (locked) {
|
||||
lockDelivery.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.started = browseOnly || started;
|
||||
});
|
||||
// Outside the lock
|
||||
if (started) {
|
||||
promptDelivery();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean lockDelivery() {
|
||||
try {
|
||||
if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) {
|
||||
ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
|
||||
if (server != null) {
|
||||
server.threadDump();
|
||||
private static final long LOCK_DELIVERY_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
|
||||
private static final long TRY_LOCK_NS = TimeUnit.MILLISECONDS.toNanos(100);
|
||||
|
||||
private boolean lockDelivery(java.util.function.Consumer<Boolean> task) {
|
||||
final long startWait = System.nanoTime();
|
||||
long now;
|
||||
while (((now = System.nanoTime()) - startWait) < LOCK_DELIVERY_TIMEOUT_NS) {
|
||||
try {
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
throw new InterruptedException();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e);
|
||||
synchronized (lock) {
|
||||
task.accept(false);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e);
|
||||
return false;
|
||||
synchronized (lock) {
|
||||
if (lockDelivery.writeLock().tryLock()) {
|
||||
try {
|
||||
task.accept(true);
|
||||
} finally {
|
||||
lockDelivery.writeLock().unlock();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
//entering the lock can take some time: discount that time from the
|
||||
//time before attempting to lock delivery
|
||||
final long timeToLock = System.nanoTime() - now;
|
||||
if (timeToLock < TRY_LOCK_NS) {
|
||||
final long timeToWait = TRY_LOCK_NS - timeToLock;
|
||||
LockSupport.parkNanos(timeToWait);
|
||||
}
|
||||
}
|
||||
ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
|
||||
if (server != null) {
|
||||
server.threadDump();
|
||||
}
|
||||
synchronized (lock) {
|
||||
task.accept(false);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTransferring(final boolean transferring) {
|
||||
synchronized (lock) {
|
||||
// This is to make sure that the delivery process has finished any pending delivery
|
||||
// otherwise a message may sneak in on the client while we are trying to stop the consumer
|
||||
boolean locked = lockDelivery();
|
||||
try {
|
||||
this.transferring = transferring;
|
||||
} finally {
|
||||
if (locked) {
|
||||
lockDelivery.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
lockDelivery(locked -> this.transferring = transferring);
|
||||
|
||||
// Outside the lock
|
||||
if (transferring) {
|
||||
|
|
Loading…
Reference in New Issue