ARTEMIS-1280 Avoiding leak on Queue futures

This commit is contained in:
Clebert Suconic 2017-07-11 10:55:04 -04:00
parent eab0cb587c
commit 82f071ff46
2 changed files with 2 additions and 15 deletions

View File

@ -88,7 +88,6 @@ import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.ReferenceCounter; import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList; import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl; import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
@ -206,8 +205,6 @@ public class QueueImpl implements Queue {
private Redistributor redistributor; private Redistributor redistributor;
private final Set<ScheduledFuture<?>> futures = new ConcurrentHashSet<>();
private ScheduledFuture<?> redistributorFuture; private ScheduledFuture<?> redistributorFuture;
private ScheduledFuture<?> checkQueueSizeFuture; private ScheduledFuture<?> checkQueueSizeFuture;
@ -887,8 +884,6 @@ public class QueueImpl implements Queue {
DelayedAddRedistributor dar = new DelayedAddRedistributor(executor); DelayedAddRedistributor dar = new DelayedAddRedistributor(executor);
redistributorFuture = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS); redistributorFuture = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
futures.add(redistributorFuture);
} }
} else { } else {
internalAddRedistributor(executor); internalAddRedistributor(executor);
@ -900,8 +895,6 @@ public class QueueImpl implements Queue {
redistributorFuture = null; redistributorFuture = null;
if (future != null) { if (future != null) {
future.cancel(false); future.cancel(false);
futures.remove(future);
} }
} }

View File

@ -135,10 +135,7 @@ public class QueueImplTest extends ActiveMQTestBase {
@Test @Test
public void testScheduled() throws Exception { public void testScheduled() throws Exception {
QueueImpl queue = QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true,
false, scheduledExecutor, null, null, null,
Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
FakeConsumer consumer = null; FakeConsumer consumer = null;
@ -236,10 +233,7 @@ public class QueueImplTest extends ActiveMQTestBase {
public void disconnect() { public void disconnect() {
} }
}; };
QueueImpl queue = QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false,
scheduledExecutor, null, null, null,
Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
MessageReference messageReference = generateReference(queue, 1); MessageReference messageReference = generateReference(queue, 1);
queue.addConsumer(consumer); queue.addConsumer(consumer);
messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000); messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);