ARTEMIS-2264 PurgeOnNoConsumers prevent removal of messages with replication

Added test reproducer and changed Queue::isDurableMessage usages into
Queue::isDurable to allow acks to hit the journal and being
correctly replicated across nodes.
This commit is contained in:
Francesco Nigro 2019-02-26 18:34:27 +01:00 committed by Clebert Suconic
parent 2a92f2977a
commit 4ea9d25ca9
6 changed files with 22 additions and 10 deletions

View File

@ -1669,7 +1669,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
for (MessageReference ref : refs) {
Message message = ref.getMessage();
if (message.isDurable() && ref.getQueue().isDurableMessage()) {
if (message.isDurable() && ref.getQueue().isDurable()) {
message.decrementDurableRefCount();
}

View File

@ -1462,7 +1462,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} else {
Message message = ref.getMessage();
boolean durableRef = message.isDurable() && isDurableMessage();
boolean durableRef = message.isDurable() && isDurable();
if (durableRef) {
storageManager.storeAcknowledge(id, message.getMessageID());
@ -1500,7 +1500,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} else {
Message message = ref.getMessage();
boolean durableRef = message.isDurable() && isDurableMessage();
boolean durableRef = message.isDurable() && isDurable();
if (durableRef) {
storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
@ -1528,7 +1528,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public void reacknowledge(final Transaction tx, final MessageReference ref) throws Exception {
Message message = ref.getMessage();
if (message.isDurable() && isDurableMessage()) {
if (message.isDurable() && isDurable()) {
tx.setContainsPersistent();
}
@ -2760,7 +2760,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return true;
}
if (!internalQueue && reference.isDurable() && isDurableMessage() && !reference.isPaged()) {
if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) {
storageManager.updateDeliveryCount(reference);
}
@ -2789,7 +2789,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);
if (!reference.isPaged() && reference.isDurable() && isDurableMessage()) {
if (!reference.isPaged() && reference.isDurable() && isDurable()) {
storageManager.updateScheduledDeliveryTime(reference);
}
}
@ -3252,7 +3252,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (message == null)
return;
boolean durableRef = message.isDurable() && queue.isDurableMessage();
boolean durableRef = message.isDurable() && queue.isDurable();
try {
message.decrementRefCount();

View File

@ -119,7 +119,7 @@ public final class RoutingContextImpl implements RoutingContext {
RouteContextList listing = getContextListing(address);
if (queue.isDurableMessage()) {
if (queue.isDurable()) {
listing.getDurableQueues().add(queue);
} else {
listing.getNonDurableQueues().add(queue);

View File

@ -457,7 +457,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// If updateDeliveries = false (set by strict-update),
// the updateDeliveryCountAfterCancel would still be updated after c
if (strictUpdateDeliveryCount && !ref.isPaged()) {
if (ref.getMessage().isDurable() && ref.getQueue().isDurableMessage() &&
if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
!ref.getQueue().isInternalQueue() &&
!ref.isPaged()) {
storageManager.updateDeliveryCount(ref);

View File

@ -248,7 +248,7 @@ public class ManagementServiceImpl implements ManagementService {
QueueControlImpl queueControl = new QueueControlImpl(queue, addressInfo.getName().toString(), messagingServer, storageManager, securityStore, addressSettingsRepository);
if (messageCounterManager != null) {
MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue, false, queue.isDurableMessage(), messageCounterManager.getMaxDayCount());
MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue, false, queue.isDurable(), messageCounterManager.getMaxDayCount());
queueControl.setMessageCounter(counter);
messageCounterManager.registerMessageCounter(queue.getName().toString(), counter);
}

View File

@ -35,6 +35,8 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.Configuration;
@ -326,6 +328,16 @@ public class BackupSyncJournalTest extends FailoverTestBase {
assertNoMoreMessages();
}
@Test
public void testRemoveAllMessageWithPurgeOnNoConsumers() throws Exception {
final boolean purgeOnNoConsumers = true;
createProducerSendSomeMessages();
liveServer.getServer().locateQueue(ADDRESS).setPurgeOnNoConsumers(purgeOnNoConsumers);
assertEquals(n_msgs, ((QueueControl) liveServer.getServer().getManagementService().getResource(ResourceNames.QUEUE + ADDRESS.toString())).removeAllMessages());
startBackupCrashLive();
assertNoMoreMessages();
}
private void startBackupCrashLive() throws Exception {
assertFalse("backup is started?", backupServer.isStarted());
liveServer.removeInterceptor(syncDelay);