This closes #851

This commit is contained in:
Clebert Suconic 2016-10-19 15:35:12 -04:00
commit ad602780c7
1 changed files with 13 additions and 11 deletions

View File

@ -768,11 +768,7 @@ public class QueueImpl implements Queue {
@Override
public synchronized void addRedistributor(final long delay) {
if (redistributorFuture != null) {
redistributorFuture.cancel(false);
futures.remove(redistributorFuture);
}
clearRedistributorFuture();
if (redistributor != null) {
// Just prompt delivery
@ -792,6 +788,16 @@ public class QueueImpl implements Queue {
}
}
private void clearRedistributorFuture() {
ScheduledFuture<?> future = redistributorFuture;
redistributorFuture = null;
if (future != null) {
future.cancel(false);
futures.remove(future);
}
}
@Override
public synchronized void cancelRedistributor() throws Exception {
if (redistributor != null) {
@ -802,11 +808,7 @@ public class QueueImpl implements Queue {
removeConsumer(redistributorToRemove);
}
if (redistributorFuture != null) {
redistributorFuture.cancel(false);
redistributorFuture = null;
}
clearRedistributorFuture();
}
@Override
@ -2709,7 +2711,7 @@ public class QueueImpl implements Queue {
synchronized (QueueImpl.this) {
internalAddRedistributor(executor1);
futures.remove(this);
clearRedistributorFuture();
}
}
}