mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-13 13:35:47 +00:00
ARTEMIS-1280 Avoiding leak on Queue futures
This commit is contained in:
parent
311072b28b
commit
8326a24fd8
@ -83,7 +83,6 @@ import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
|
||||
import org.apache.activemq.artemis.core.transaction.impl.BindingsTransactionImpl;
|
||||
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
||||
import org.apache.activemq.artemis.utils.FutureLatch;
|
||||
import org.apache.activemq.artemis.utils.LinkedListIterator;
|
||||
import org.apache.activemq.artemis.utils.PriorityLinkedList;
|
||||
@ -200,12 +199,8 @@ public class QueueImpl implements Queue {
|
||||
|
||||
private Redistributor redistributor;
|
||||
|
||||
private final Set<ScheduledFuture<?>> futures = new ConcurrentHashSet<>();
|
||||
|
||||
private ScheduledFuture<?> redistributorFuture;
|
||||
|
||||
private ScheduledFuture<?> checkQueueSizeFuture;
|
||||
|
||||
// We cache the consumers here since we don't want to include the redistributor
|
||||
|
||||
private final AtomicInteger consumersCount = new AtomicInteger();
|
||||
@ -642,10 +637,6 @@ public class QueueImpl implements Queue {
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
if (checkQueueSizeFuture != null) {
|
||||
checkQueueSizeFuture.cancel(false);
|
||||
}
|
||||
|
||||
getExecutor().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
@ -793,8 +784,6 @@ public class QueueImpl implements Queue {
|
||||
DelayedAddRedistributor dar = new DelayedAddRedistributor(executor);
|
||||
|
||||
redistributorFuture = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
|
||||
|
||||
futures.add(redistributorFuture);
|
||||
}
|
||||
} else {
|
||||
internalAddRedistributor(executor);
|
||||
@ -806,8 +795,6 @@ public class QueueImpl implements Queue {
|
||||
redistributorFuture = null;
|
||||
if (future != null) {
|
||||
future.cancel(false);
|
||||
|
||||
futures.remove(future);
|
||||
}
|
||||
}
|
||||
|
||||
@ -826,10 +813,6 @@ public class QueueImpl implements Queue {
|
||||
|
||||
@Override
|
||||
protected void finalize() throws Throwable {
|
||||
if (checkQueueSizeFuture != null) {
|
||||
checkQueueSizeFuture.cancel(false);
|
||||
}
|
||||
|
||||
cancelRedistributor();
|
||||
|
||||
super.finalize();
|
||||
|
Loading…
x
Reference in New Issue
Block a user