This commit is contained in:
Clebert Suconic 2019-07-26 13:00:57 -04:00
commit 549e167a5a
1 changed files with 45 additions and 34 deletions

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -729,58 +730,68 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override @Override
public void setStarted(final boolean started) { public void setStarted(final boolean started) {
synchronized (lock) { lockDelivery(locked -> {
boolean locked = lockDelivery();
// This is to make sure nothing would sneak to the client while started = false // This is to make sure nothing would sneak to the client while started = false
// the client will stop the session and perform a rollback in certain cases. // the client will stop the session and perform a rollback in certain cases.
// in case something sneaks to the client you could get to messaging delivering forever until // in case something sneaks to the client you could get to messaging delivering forever until
// you restart the server // you restart the server
try { this.started = browseOnly || started;
this.started = browseOnly || started; });
} finally {
if (locked) {
lockDelivery.writeLock().unlock();
}
}
}
// Outside the lock // Outside the lock
if (started) { if (started) {
promptDelivery(); promptDelivery();
} }
} }
private boolean lockDelivery() { private static final long LOCK_DELIVERY_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
try { private static final long TRY_LOCK_NS = TimeUnit.MILLISECONDS.toNanos(100);
if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) {
ActiveMQServerLogger.LOGGER.timeoutLockingConsumer(); private boolean lockDelivery(java.util.function.Consumer<Boolean> task) {
if (server != null) { final long startWait = System.nanoTime();
server.threadDump(); long now;
while (((now = System.nanoTime()) - startWait) < LOCK_DELIVERY_TIMEOUT_NS) {
try {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e);
synchronized (lock) {
task.accept(false);
} }
return false; return false;
} }
return true; synchronized (lock) {
} catch (Exception e) { if (lockDelivery.writeLock().tryLock()) {
ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e); try {
return false; task.accept(true);
} finally {
lockDelivery.writeLock().unlock();
}
return true;
}
}
//entering the lock can take some time: discount that time from the
//time before attempting to lock delivery
final long timeToLock = System.nanoTime() - now;
if (timeToLock < TRY_LOCK_NS) {
final long timeToWait = TRY_LOCK_NS - timeToLock;
LockSupport.parkNanos(timeToWait);
}
} }
ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
if (server != null) {
server.threadDump();
}
synchronized (lock) {
task.accept(false);
}
return false;
} }
@Override @Override
public void setTransferring(final boolean transferring) { public void setTransferring(final boolean transferring) {
synchronized (lock) { lockDelivery(locked -> this.transferring = transferring);
// This is to make sure that the delivery process has finished any pending delivery
// otherwise a message may sneak in on the client while we are trying to stop the consumer
boolean locked = lockDelivery();
try {
this.transferring = transferring;
} finally {
if (locked) {
lockDelivery.writeLock().unlock();
}
}
}
// Outside the lock // Outside the lock
if (transferring) { if (transferring) {