diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java index f807eb196c..9091a4eed3 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java @@ -21,7 +21,8 @@ import java.util.Iterator; public interface ResettableIterator extends Iterator { /** - * Resets the iterator so you can re-iterate over all elements. + * Resets the iterator so that you can iterate over all elements from your current position. + * Your current position (when reached again) signals the end of iteration as if the collection is circular. */ void reset(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index aa43170371..8bf2b31629 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -178,14 +178,12 @@ public interface Queue extends Bindable,CriticalComponent { } /** - * This will set a reference counter for every consumer present on the queue. + * This will hold a reference counter for every consumer present on the queue. * The ReferenceCounter will know what to do when the counter became zeroed. * This is used to control what to do with temporary queues, especially * on shared subscriptions where the queue needs to be deleted when all the * consumers are closed. */ - void setConsumersRefCount(ReferenceCounter referenceCounter); - ReferenceCounter getConsumersRefCount(); /* Called when a message is cancelled back into the queue */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index a66186dca0..8039a58a25 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -3805,12 +3805,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { final Queue queue = queueFactory.createQueueWith(queueConfiguration, pagingManager); - if (queueConfiguration.isTransient()) { - queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName())); - } else { - queue.setConsumersRefCount(new QueueManagerImpl(this, queue.getName())); - } - final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId()); long txID = 0; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 7701bbb788..f679188300 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -51,7 +51,6 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; -import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.NodeManager; @@ -162,7 +161,6 @@ public class PostOfficeJournalLoader implements JournalLoader { .setRingSize(queueBindingInfo.getRingSize()), pagingManager); - queue.setConsumersRefCount(new QueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName())); if (queueBindingInfo.getQueueStatusEncodings() != null) { for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 6909cf6b50..c0ed1bd911 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -179,7 +179,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { protected final PageSubscription pageSubscription; - private ReferenceCounter refCountForConsumers; + private final ReferenceCounter refCountForConsumers; private final PageIterator pageIterator; @@ -218,17 +218,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { protected final ScheduledDeliveryHandler scheduledDeliveryHandler; - private AtomicLong messagesAdded = new AtomicLong(0); + private final AtomicLong messagesAdded = new AtomicLong(0); - private AtomicLong messagesAcknowledged = new AtomicLong(0); + private final AtomicLong messagesAcknowledged = new AtomicLong(0); - private AtomicLong ackAttempts = new AtomicLong(0); + private final AtomicLong ackAttempts = new AtomicLong(0); - private AtomicLong messagesExpired = new AtomicLong(0); + private final AtomicLong messagesExpired = new AtomicLong(0); - private AtomicLong messagesKilled = new AtomicLong(0); + private final AtomicLong messagesKilled = new AtomicLong(0); - private AtomicLong messagesReplaced = new AtomicLong(0); + private final AtomicLong messagesReplaced = new AtomicLong(0); private boolean paused; @@ -261,8 +261,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final SimpleString address; - // redistributor goes in the consumers list, this signals its presence and allows for easy comparison/check - private volatile ConsumerHolder redistributor; + // redistributor singleton goes in the consumers list + private ConsumerHolder redistributor; private ScheduledFuture redistributorFuture; @@ -634,6 +634,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { this.id = queueConfiguration.getId(); this.address = queueConfiguration.getAddress(); + this.refCountForConsumers = queueConfiguration.isTransient() ? new TransientQueueManagerImpl(server, queueConfiguration.getName()) : new QueueManagerImpl(server, queueConfiguration.getName()); this.addressInfo = postOffice == null ? null : postOffice.getAddressInfo(address); @@ -861,13 +862,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } // Queue implementation ---------------------------------------------------------------------------------------- - @Override - public synchronized void setConsumersRefCount(final ReferenceCounter referenceCounter) { - if (refCountForConsumers == null) { - this.refCountForConsumers = referenceCounter; - } - } - @Override public ReferenceCounter getConsumersRefCount() { return refCountForConsumers; @@ -1442,13 +1436,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (delayBeforeDispatch >= 0) { dispatchStartTimeUpdater.compareAndSet(this,-1, delayBeforeDispatch + System.currentTimeMillis()); } - - } - - if (refCountForConsumers != null) { refCountForConsumers.increment(); } - } } } @@ -1485,7 +1474,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (consumerRemoved) { consumerRemovedTimestampUpdater.set(this, System.currentTimeMillis()); - if (getConsumerCount() == 0) { + if (refCountForConsumers.decrement() == 0) { stopDispatch(); } } @@ -1496,11 +1485,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { groups.removeIf(consumer::equals); - - if (refCountForConsumers != null) { - refCountForConsumers.decrement(); - } - } } } @@ -1557,7 +1541,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public synchronized void cancelRedistributor() { clearRedistributorFuture(); - + hasUnMatchedPending = false; if (redistributor != null) { try { redistributor.consumer.stop(); @@ -1572,18 +1556,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public int getConsumerCount() { - // we don't want to count the redistributor, it is an internal transient entry in the consumer list - if (redistributor != null) { - synchronized (this) { - final int size = consumers.size(); - if (size > 0 && redistributor != null) { - return size - 1; - } else { - return size; - } - } - } - return consumers.size(); + return refCountForConsumers.getCount(); } @Override @@ -3014,7 +2987,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { synchronized (this) { // Need to do these checks inside the synchronized - if (isPaused() || !canDispatch() && redistributor == null) { + if (isPaused() || !canDispatch()) { return false; } @@ -3082,9 +3055,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { numNoMatch = 0; numAttempts = 0; - if (consumer != redistributor) { - ref = handleMessageGroup(ref, consumer, groupConsumer, groupID); - } + ref = handleMessageGroup(ref, consumer, groupConsumer, groupID); deliveriesInTransit.countUp(); @@ -3118,7 +3089,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { consumers.reset(); numNoMatch++; // every attempt resulted in noMatch for number of consumers means we tried all consumers for a single message - if (numNoMatch == numAttempts && numAttempts == consumers.size()) { + if (numNoMatch == numAttempts && numAttempts == consumers.size() && redistributor == null) { hasUnMatchedPending = true; // one hit of unmatched message is enough, no need to reset counters } @@ -3753,7 +3724,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (!supportsDirectDeliver) { return false; } - if (isPaused() || !canDispatch() && redistributor == null) { + if (isPaused() || !canDispatch()) { return false; } @@ -3777,12 +3748,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { HandleStatus status = handle(ref, consumer); if (status == HandleStatus.HANDLED) { - final MessageReference reference; - if (consumer != redistributor) { - reference = handleMessageGroup(ref, consumer, groupConsumer, groupID); - } else { - reference = ref; - } + final MessageReference reference = handleMessageGroup(ref, consumer, groupConsumer, groupID); incrementMesssagesAdded(); @@ -3793,7 +3759,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return true; } - if (redistributor != null || groupConsumer != null) { + if (groupConsumer != null) { break; } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java index 1055af74eb..e599b883f9 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java @@ -39,6 +39,9 @@ public class QueueConsumersImplTest { assertFalse(queueConsumers.hasNext()); queueConsumers.add(testPriority); + // not visible till reset + assertFalse(queueConsumers.hasNext()); + queueConsumers.reset(); assertTrue(queueConsumers.hasNext()); @@ -109,6 +112,31 @@ public class QueueConsumersImplTest { } + @Test + public void roundRobinEqualPriorityResetTest() { + queueConsumers.add(new TestPriority("A", 0)); + queueConsumers.add(new TestPriority("B", 0)); + queueConsumers.add(new TestPriority("C", 0)); + queueConsumers.reset(); + assertTrue(queueConsumers.hasNext()); + + assertEquals("A", queueConsumers.next().getName()); + + //Reset iterator should mark start as current position + queueConsumers.reset(); + assertTrue(queueConsumers.hasNext()); + assertEquals("B", queueConsumers.next().getName()); + + assertTrue(queueConsumers.hasNext()); + assertEquals("C", queueConsumers.next().getName()); + + //Expect another A as after reset, we started at B so after A, we then expect the next level + assertTrue(queueConsumers.hasNext()); + assertEquals("A", queueConsumers.next().getName()); + + //We have iterated all. + assertFalse(queueConsumers.hasNext()); + } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueImplTest.java index b39c79a8a5..cac1039df1 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueImplTest.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.QueueFactory; +import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.junit.Assert; import org.junit.Test; @@ -50,6 +51,8 @@ public class QueueImplTest { PageSubscription pageSubscription = Mockito.mock(PageSubscription.class); ExecutorService executorService = Executors.newSingleThreadExecutor(); StorageManager storageManager = Mockito.mock(StorageManager.class); + ActiveMQServer server = Mockito.mock(ActiveMQServer.class); + ExecutorFactory executorFactory = Mockito.mock(ExecutorFactory.class); final int flushLimit = 100; final int pagedReferences = 5 * flushLimit; @@ -76,10 +79,13 @@ public class QueueImplTest { return null; }).when(storageManager).afterCompleteOperations(Mockito.any(IOCallback.class)); + // Mock server + Mockito.doReturn(executorFactory).when(server).getExecutorFactory(); + QueueImpl queue = new QueueImpl(0, address, address, null, null, pageSubscription, null, false, false, false, Mockito.mock(ScheduledExecutorService.class), Mockito.mock(PostOffice.class), storageManager, null, - Mockito.mock(ArtemisExecutor.class), Mockito.mock(ActiveMQServer.class), + Mockito.mock(ArtemisExecutor.class), server, Mockito.mock(QueueFactory.class)); Mockito.doReturn(queue).when(pageSubscription).getQueue(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index a9e506f095..90b14e704e 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1129,11 +1129,6 @@ public class ScheduledDeliveryHandlerTest extends Assert { return 0; } - @Override - public void setConsumersRefCount(ReferenceCounter referenceCounter) { - - } - @Override public ReferenceCounter getConsumersRefCount() { return null; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index c3fca7784a..794761c184 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -351,7 +351,7 @@ public class HangConsumerTest extends ActiveMQTestBase { // Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, - false, null, null, null, null, null, null, null), + false, null, null, null, null, null, server, null), server.getNodeID()); server.getStorageManager().addQueueBinding(txID, newBinding); server.getStorageManager().commitBindings(txID); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index fb2fdc572a..adb79f8857 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -267,11 +267,6 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { return 0; } - @Override - public void setConsumersRefCount(ReferenceCounter referenceCounter) { - - } - @Override public void setInternalQueue(boolean internalQueue) { // no-op diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java index 0493c8bfef..ae040686ff 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java @@ -71,12 +71,16 @@ public class QueueImplTest extends ActiveMQTestBase { private ExecutorService executor; + private ActiveMQServer defaultServer; + @Override @Before public void setUp() throws Exception { super.setUp(); scheduledExecutor = Executors.newSingleThreadScheduledExecutor(ActiveMQThreadFactory.defaultThreadFactory()); executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()); + defaultServer = createServer(createDefaultConfig(1, false)); + defaultServer.start(); } @Override @@ -1016,6 +1020,19 @@ public class QueueImplTest extends ActiveMQTestBase { Assert.assertEquals(0, consumer2.getReferences().size()); Assert.assertEquals(0, consumer3.getReferences().size()); + // verify redistributor not yet needed, only consumer3 gets to + // peek at pending + // should not attempt to add (and throw) due to unmatched not being set + queue.addRedistributor(0); + + // on new message dispatch, need for redistributor will kick in + MessageReference ref = generateReference(queue, numMessages); + ref.getMessage().putStringProperty("color", "red"); + refs.add(ref); + + queue.addTail(ref); + queue.deliverNow(); + // verify redistributor is doing some work.... try { // should attempt to add due to unmatched @@ -1024,7 +1041,7 @@ public class QueueImplTest extends ActiveMQTestBase { } catch (NullPointerException expected) { } - Assert.assertEquals(numMessages, getMessageCount(queue)); + Assert.assertEquals(numMessages + 1, getMessageCount(queue)); } @Test @@ -1450,7 +1467,7 @@ public class QueueImplTest extends ActiveMQTestBase { final QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, - ArtemisExecutor.delegate(executor), null, null); + ArtemisExecutor.delegate(executor), defaultServer, null); queue.addConsumer(groupConsumer); queue.addConsumer(noConsumer); final MessageReference firstMessageReference = generateReference(queue, 1); @@ -1490,6 +1507,6 @@ public class QueueImplTest extends ActiveMQTestBase { private QueueImpl getQueue(SimpleString name, boolean durable, boolean temporary, Filter filter) { return new QueueImpl(1, QueueImplTest.address1, name, filter, null, durable, temporary, false, scheduledExecutor, - new FakePostOffice(), null, null, ArtemisExecutor.delegate(executor), null, null); + new FakePostOffice(), null, null, ArtemisExecutor.delegate(executor), defaultServer, null); } }