ARTEMIS-2007 - refactor to make use of existing refCountForConsumers for tracking consumer count and remove need for volatile redistributor

This commit is contained in:
gtully 2021-09-13 11:48:55 +01:00 committed by Gary Tully
parent 28a10450b7
commit 72cfda6b1a
11 changed files with 78 additions and 80 deletions

View File

@ -21,7 +21,8 @@ import java.util.Iterator;
public interface ResettableIterator<E> extends Iterator<E> {
/**
* Resets the iterator so you can re-iterate over all elements.
* Resets the iterator so that you can iterate over all elements from your current position.
* Your current position (when reached again) signals the end of iteration as if the collection is circular.
*/
void reset();
}

View File

@ -178,14 +178,12 @@ public interface Queue extends Bindable,CriticalComponent {
}
/**
* This will set a reference counter for every consumer present on the queue.
* This will hold a reference counter for every consumer present on the queue.
* The ReferenceCounter will know what to do when the counter became zeroed.
* This is used to control what to do with temporary queues, especially
* on shared subscriptions where the queue needs to be deleted when all the
* consumers are closed.
*/
void setConsumersRefCount(ReferenceCounter referenceCounter);
ReferenceCounter getConsumersRefCount();
/* Called when a message is cancelled back into the queue */

View File

@ -3805,12 +3805,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final Queue queue = queueFactory.createQueueWith(queueConfiguration, pagingManager);
if (queueConfiguration.isTransient()) {
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
} else {
queue.setConsumersRefCount(new QueueManagerImpl(this, queue.getName()));
}
final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
long txID = 0;

View File

@ -51,7 +51,6 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.NodeManager;
@ -162,7 +161,6 @@ public class PostOfficeJournalLoader implements JournalLoader {
.setRingSize(queueBindingInfo.getRingSize()),
pagingManager);
queue.setConsumersRefCount(new QueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));
if (queueBindingInfo.getQueueStatusEncodings() != null) {
for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) {

View File

@ -179,7 +179,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
protected final PageSubscription pageSubscription;
private ReferenceCounter refCountForConsumers;
private final ReferenceCounter refCountForConsumers;
private final PageIterator pageIterator;
@ -218,17 +218,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
protected final ScheduledDeliveryHandler scheduledDeliveryHandler;
private AtomicLong messagesAdded = new AtomicLong(0);
private final AtomicLong messagesAdded = new AtomicLong(0);
private AtomicLong messagesAcknowledged = new AtomicLong(0);
private final AtomicLong messagesAcknowledged = new AtomicLong(0);
private AtomicLong ackAttempts = new AtomicLong(0);
private final AtomicLong ackAttempts = new AtomicLong(0);
private AtomicLong messagesExpired = new AtomicLong(0);
private final AtomicLong messagesExpired = new AtomicLong(0);
private AtomicLong messagesKilled = new AtomicLong(0);
private final AtomicLong messagesKilled = new AtomicLong(0);
private AtomicLong messagesReplaced = new AtomicLong(0);
private final AtomicLong messagesReplaced = new AtomicLong(0);
private boolean paused;
@ -261,8 +261,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final SimpleString address;
// redistributor goes in the consumers list, this signals its presence and allows for easy comparison/check
private volatile ConsumerHolder<Redistributor> redistributor;
// redistributor singleton goes in the consumers list
private ConsumerHolder<Redistributor> redistributor;
private ScheduledFuture<?> redistributorFuture;
@ -634,6 +634,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.id = queueConfiguration.getId();
this.address = queueConfiguration.getAddress();
this.refCountForConsumers = queueConfiguration.isTransient() ? new TransientQueueManagerImpl(server, queueConfiguration.getName()) : new QueueManagerImpl(server, queueConfiguration.getName());
this.addressInfo = postOffice == null ? null : postOffice.getAddressInfo(address);
@ -861,13 +862,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
// Queue implementation ----------------------------------------------------------------------------------------
@Override
public synchronized void setConsumersRefCount(final ReferenceCounter referenceCounter) {
if (refCountForConsumers == null) {
this.refCountForConsumers = referenceCounter;
}
}
@Override
public ReferenceCounter getConsumersRefCount() {
return refCountForConsumers;
@ -1442,13 +1436,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (delayBeforeDispatch >= 0) {
dispatchStartTimeUpdater.compareAndSet(this,-1, delayBeforeDispatch + System.currentTimeMillis());
}
}
if (refCountForConsumers != null) {
refCountForConsumers.increment();
}
}
}
}
@ -1485,7 +1474,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (consumerRemoved) {
consumerRemovedTimestampUpdater.set(this, System.currentTimeMillis());
if (getConsumerCount() == 0) {
if (refCountForConsumers.decrement() == 0) {
stopDispatch();
}
}
@ -1496,11 +1485,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
groups.removeIf(consumer::equals);
if (refCountForConsumers != null) {
refCountForConsumers.decrement();
}
}
}
}
@ -1557,7 +1541,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public synchronized void cancelRedistributor() {
clearRedistributorFuture();
hasUnMatchedPending = false;
if (redistributor != null) {
try {
redistributor.consumer.stop();
@ -1572,18 +1556,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public int getConsumerCount() {
// we don't want to count the redistributor, it is an internal transient entry in the consumer list
if (redistributor != null) {
synchronized (this) {
final int size = consumers.size();
if (size > 0 && redistributor != null) {
return size - 1;
} else {
return size;
}
}
}
return consumers.size();
return refCountForConsumers.getCount();
}
@Override
@ -3014,7 +2987,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
synchronized (this) {
// Need to do these checks inside the synchronized
if (isPaused() || !canDispatch() && redistributor == null) {
if (isPaused() || !canDispatch()) {
return false;
}
@ -3082,9 +3055,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
numNoMatch = 0;
numAttempts = 0;
if (consumer != redistributor) {
ref = handleMessageGroup(ref, consumer, groupConsumer, groupID);
}
ref = handleMessageGroup(ref, consumer, groupConsumer, groupID);
deliveriesInTransit.countUp();
@ -3118,7 +3089,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
consumers.reset();
numNoMatch++;
// every attempt resulted in noMatch for number of consumers means we tried all consumers for a single message
if (numNoMatch == numAttempts && numAttempts == consumers.size()) {
if (numNoMatch == numAttempts && numAttempts == consumers.size() && redistributor == null) {
hasUnMatchedPending = true;
// one hit of unmatched message is enough, no need to reset counters
}
@ -3753,7 +3724,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (!supportsDirectDeliver) {
return false;
}
if (isPaused() || !canDispatch() && redistributor == null) {
if (isPaused() || !canDispatch()) {
return false;
}
@ -3777,12 +3748,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
HandleStatus status = handle(ref, consumer);
if (status == HandleStatus.HANDLED) {
final MessageReference reference;
if (consumer != redistributor) {
reference = handleMessageGroup(ref, consumer, groupConsumer, groupID);
} else {
reference = ref;
}
final MessageReference reference = handleMessageGroup(ref, consumer, groupConsumer, groupID);
incrementMesssagesAdded();
@ -3793,7 +3759,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return true;
}
if (redistributor != null || groupConsumer != null) {
if (groupConsumer != null) {
break;
}
}

View File

@ -39,6 +39,9 @@ public class QueueConsumersImplTest {
assertFalse(queueConsumers.hasNext());
queueConsumers.add(testPriority);
// not visible till reset
assertFalse(queueConsumers.hasNext());
queueConsumers.reset();
assertTrue(queueConsumers.hasNext());
@ -109,6 +112,31 @@ public class QueueConsumersImplTest {
}
@Test
public void roundRobinEqualPriorityResetTest() {
queueConsumers.add(new TestPriority("A", 0));
queueConsumers.add(new TestPriority("B", 0));
queueConsumers.add(new TestPriority("C", 0));
queueConsumers.reset();
assertTrue(queueConsumers.hasNext());
assertEquals("A", queueConsumers.next().getName());
//Reset iterator should mark start as current position
queueConsumers.reset();
assertTrue(queueConsumers.hasNext());
assertEquals("B", queueConsumers.next().getName());
assertTrue(queueConsumers.hasNext());
assertEquals("C", queueConsumers.next().getName());
//Expect another A as after reset, we started at B so after A, we then expect the next level
assertTrue(queueConsumers.hasNext());
assertEquals("A", queueConsumers.next().getName());
//We have iterated all.
assertFalse(queueConsumers.hasNext());
}

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.junit.Assert;
import org.junit.Test;
@ -50,6 +51,8 @@ public class QueueImplTest {
PageSubscription pageSubscription = Mockito.mock(PageSubscription.class);
ExecutorService executorService = Executors.newSingleThreadExecutor();
StorageManager storageManager = Mockito.mock(StorageManager.class);
ActiveMQServer server = Mockito.mock(ActiveMQServer.class);
ExecutorFactory executorFactory = Mockito.mock(ExecutorFactory.class);
final int flushLimit = 100;
final int pagedReferences = 5 * flushLimit;
@ -76,10 +79,13 @@ public class QueueImplTest {
return null;
}).when(storageManager).afterCompleteOperations(Mockito.any(IOCallback.class));
// Mock server
Mockito.doReturn(executorFactory).when(server).getExecutorFactory();
QueueImpl queue = new QueueImpl(0, address, address, null, null, pageSubscription, null, false,
false, false, Mockito.mock(ScheduledExecutorService.class),
Mockito.mock(PostOffice.class), storageManager, null,
Mockito.mock(ArtemisExecutor.class), Mockito.mock(ActiveMQServer.class),
Mockito.mock(ArtemisExecutor.class), server,
Mockito.mock(QueueFactory.class));
Mockito.doReturn(queue).when(pageSubscription).getQueue();

View File

@ -1129,11 +1129,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return 0;
}
@Override
public void setConsumersRefCount(ReferenceCounter referenceCounter) {
}
@Override
public ReferenceCounter getConsumersRefCount() {
return null;

View File

@ -351,7 +351,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
// Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally
LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE,
new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false,
false, null, null, null, null, null, null, null),
false, null, null, null, null, null, server, null),
server.getNodeID());
server.getStorageManager().addQueueBinding(txID, newBinding);
server.getStorageManager().commitBindings(txID);

View File

@ -267,11 +267,6 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
return 0;
}
@Override
public void setConsumersRefCount(ReferenceCounter referenceCounter) {
}
@Override
public void setInternalQueue(boolean internalQueue) {
// no-op

View File

@ -71,12 +71,16 @@ public class QueueImplTest extends ActiveMQTestBase {
private ExecutorService executor;
private ActiveMQServer defaultServer;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
scheduledExecutor = Executors.newSingleThreadScheduledExecutor(ActiveMQThreadFactory.defaultThreadFactory());
executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
defaultServer = createServer(createDefaultConfig(1, false));
defaultServer.start();
}
@Override
@ -1016,6 +1020,19 @@ public class QueueImplTest extends ActiveMQTestBase {
Assert.assertEquals(0, consumer2.getReferences().size());
Assert.assertEquals(0, consumer3.getReferences().size());
// verify redistributor not yet needed, only consumer3 gets to
// peek at pending
// should not attempt to add (and throw) due to unmatched not being set
queue.addRedistributor(0);
// on new message dispatch, need for redistributor will kick in
MessageReference ref = generateReference(queue, numMessages);
ref.getMessage().putStringProperty("color", "red");
refs.add(ref);
queue.addTail(ref);
queue.deliverNow();
// verify redistributor is doing some work....
try {
// should attempt to add due to unmatched
@ -1024,7 +1041,7 @@ public class QueueImplTest extends ActiveMQTestBase {
} catch (NullPointerException expected) {
}
Assert.assertEquals(numMessages, getMessageCount(queue));
Assert.assertEquals(numMessages + 1, getMessageCount(queue));
}
@Test
@ -1450,7 +1467,7 @@ public class QueueImplTest extends ActiveMQTestBase {
final QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1,
null, null, false, true, false,
scheduledExecutor, null, null, null,
ArtemisExecutor.delegate(executor), null, null);
ArtemisExecutor.delegate(executor), defaultServer, null);
queue.addConsumer(groupConsumer);
queue.addConsumer(noConsumer);
final MessageReference firstMessageReference = generateReference(queue, 1);
@ -1490,6 +1507,6 @@ public class QueueImplTest extends ActiveMQTestBase {
private QueueImpl getQueue(SimpleString name, boolean durable, boolean temporary, Filter filter) {
return new QueueImpl(1, QueueImplTest.address1, name, filter, null, durable, temporary, false, scheduledExecutor,
new FakePostOffice(), null, null, ArtemisExecutor.delegate(executor), null, null);
new FakePostOffice(), null, null, ArtemisExecutor.delegate(executor), defaultServer, null);
}
}