From 4ea9d25ca9461fdbae8d5550f081198ca215a199 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Tue, 26 Feb 2019 18:34:27 +0100 Subject: [PATCH] ARTEMIS-2264 PurgeOnNoConsumers prevent removal of messages with replication Added test reproducer and changed Queue::isDurableMessage usages into Queue::isDurable to allow acks to hit the journal and being correctly replicated across nodes. --- .../artemis/core/postoffice/impl/PostOfficeImpl.java | 2 +- .../activemq/artemis/core/server/impl/QueueImpl.java | 12 ++++++------ .../artemis/core/server/impl/RoutingContextImpl.java | 2 +- .../artemis/core/server/impl/ServerConsumerImpl.java | 2 +- .../management/impl/ManagementServiceImpl.java | 2 +- .../cluster/failover/BackupSyncJournalTest.java | 12 ++++++++++++ 6 files changed, 22 insertions(+), 10 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index ec7675e547..671d4f0e33 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1669,7 +1669,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding for (MessageReference ref : refs) { Message message = ref.getMessage(); - if (message.isDurable() && ref.getQueue().isDurableMessage()) { + if (message.isDurable() && ref.getQueue().isDurable()) { message.decrementDurableRefCount(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 79e53a92c3..57c33adce2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1462,7 +1462,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } else { Message message = ref.getMessage(); - boolean durableRef = message.isDurable() && isDurableMessage(); + boolean durableRef = message.isDurable() && isDurable(); if (durableRef) { storageManager.storeAcknowledge(id, message.getMessageID()); @@ -1500,7 +1500,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } else { Message message = ref.getMessage(); - boolean durableRef = message.isDurable() && isDurableMessage(); + boolean durableRef = message.isDurable() && isDurable(); if (durableRef) { storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID()); @@ -1528,7 +1528,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { public void reacknowledge(final Transaction tx, final MessageReference ref) throws Exception { Message message = ref.getMessage(); - if (message.isDurable() && isDurableMessage()) { + if (message.isDurable() && isDurable()) { tx.setContainsPersistent(); } @@ -2760,7 +2760,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return true; } - if (!internalQueue && reference.isDurable() && isDurableMessage() && !reference.isPaged()) { + if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) { storageManager.updateDeliveryCount(reference); } @@ -2789,7 +2789,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { reference.setScheduledDeliveryTime(timeBase + redeliveryDelay); - if (!reference.isPaged() && reference.isDurable() && isDurableMessage()) { + if (!reference.isPaged() && reference.isDurable() && isDurable()) { storageManager.updateScheduledDeliveryTime(reference); } } @@ -3252,7 +3252,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (message == null) return; - boolean durableRef = message.isDurable() && queue.isDurableMessage(); + boolean durableRef = message.isDurable() && queue.isDurable(); try { message.decrementRefCount(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java index e9df830ab6..2c9276304c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java @@ -119,7 +119,7 @@ public final class RoutingContextImpl implements RoutingContext { RouteContextList listing = getContextListing(address); - if (queue.isDurableMessage()) { + if (queue.isDurable()) { listing.getDurableQueues().add(queue); } else { listing.getNonDurableQueues().add(queue); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 48e2f06bf3..2dc834bac6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -457,7 +457,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { // If updateDeliveries = false (set by strict-update), // the updateDeliveryCountAfterCancel would still be updated after c if (strictUpdateDeliveryCount && !ref.isPaged()) { - if (ref.getMessage().isDurable() && ref.getQueue().isDurableMessage() && + if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && !ref.getQueue().isInternalQueue() && !ref.isPaged()) { storageManager.updateDeliveryCount(ref); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java index 25354da495..b0e3a84f4d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java @@ -248,7 +248,7 @@ public class ManagementServiceImpl implements ManagementService { QueueControlImpl queueControl = new QueueControlImpl(queue, addressInfo.getName().toString(), messagingServer, storageManager, securityStore, addressSettingsRepository); if (messageCounterManager != null) { - MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue, false, queue.isDurableMessage(), messageCounterManager.getMaxDayCount()); + MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue, false, queue.isDurable(), messageCounterManager.getMaxDayCount()); queueControl.setMessageCounter(counter); messageCounterManager.registerMessageCounter(queue.getName().toString(), counter); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java index 8342c622cd..f1d23bebf3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java @@ -35,6 +35,8 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.FailoverEventListener; import org.apache.activemq.artemis.api.core.client.FailoverEventType; +import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.config.Configuration; @@ -326,6 +328,16 @@ public class BackupSyncJournalTest extends FailoverTestBase { assertNoMoreMessages(); } + @Test + public void testRemoveAllMessageWithPurgeOnNoConsumers() throws Exception { + final boolean purgeOnNoConsumers = true; + createProducerSendSomeMessages(); + liveServer.getServer().locateQueue(ADDRESS).setPurgeOnNoConsumers(purgeOnNoConsumers); + assertEquals(n_msgs, ((QueueControl) liveServer.getServer().getManagementService().getResource(ResourceNames.QUEUE + ADDRESS.toString())).removeAllMessages()); + startBackupCrashLive(); + assertNoMoreMessages(); + } + private void startBackupCrashLive() throws Exception { assertFalse("backup is started?", backupServer.isStarted()); liveServer.removeInterceptor(syncDelay);