diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 23df9a5ddc..40222b8b51 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -187,12 +187,23 @@ public class LastValueQueue extends QueueImpl { @Override public void acknowledge(final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception { - if (isNonDestructive() && reason == AckReason.EXPIRED || reason == AckReason.KILLED ) { + if (isNonDestructive() && reason == AckReason.EXPIRED || reason == AckReason.KILLED) { removeIfCurrent(ref); } super.acknowledge(ref, reason, consumer); } + @Override + public void acknowledge(Transaction tx, + MessageReference ref, + AckReason reason, + ServerConsumer consumer) throws Exception { + if (isNonDestructive() && reason == AckReason.EXPIRED || reason == AckReason.KILLED) { + removeIfCurrent(ref); + } + super.acknowledge(tx, ref, reason, consumer); + } + private synchronized void removeIfCurrent(MessageReference ref) { SimpleString lastValueProp = ref.getLastValueProperty(); if (lastValueProp != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 090a83ee8a..15f76e19f2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1593,28 +1593,35 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception { RefsOperation refsOperation = getRefsOperation(tx, reason); - if (ref.isPaged()) { - pageSubscription.ackTx(tx, (PagedReference) ref); - - refsOperation.addAck(ref); + if (nonDestructive && reason == AckReason.NORMAL) { + refsOperation.addOnlyRefAck(ref); + if (logger.isDebugEnabled()) { + logger.debug("acknowledge tx ignored nonDestructive=true and reason=NORMAL"); + } } else { - Message message = ref.getMessage(); + if (ref.isPaged()) { + pageSubscription.ackTx(tx, (PagedReference) ref); - boolean durableRef = message.isDurable() && isDurable(); + refsOperation.addAck(ref); + } else { + Message message = ref.getMessage(); - if (durableRef) { - storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID()); + boolean durableRef = message.isDurable() && isDurable(); - tx.setContainsPersistent(); + if (durableRef) { + storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID()); + + tx.setContainsPersistent(); + } + + ackAttempts.incrementAndGet(); + + refsOperation.addAck(ref); } - ackAttempts.incrementAndGet(); - - refsOperation.addAck(ref); - } - - if (server != null && server.hasBrokerMessagePlugins()) { - server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer)); + if (server != null && server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer)); + } } } @@ -3435,6 +3442,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { QueueImpl queue = (QueueImpl) ref.getQueue(); queue.decDelivering(ref); + if (nonDestructive && reason == AckReason.NORMAL) { + return; + } if (reason == AckReason.EXPIRED) { messagesExpired.incrementAndGet(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java index 925f4390f6..f0b6d340bd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java @@ -64,6 +64,10 @@ public class RefsOperation extends TransactionOperationAbstract { ignoreRedeliveryCheck = true; } + synchronized void addOnlyRefAck(final MessageReference ref) { + refsToAck.add(ref); + } + synchronized void addAck(final MessageReference ref) { refsToAck.add(ref); if (ref.isPaged()) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java index 7b1f155f70..32dcbb84fd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java @@ -24,6 +24,9 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import java.util.Arrays; +import java.util.Collection; + import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -35,7 +38,10 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.LastValueQueue; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class JMSNonDestructiveTest extends JMSClientTestSupport { private static final String NON_DESTRUCTIVE_QUEUE_NAME = "NON_DESTRUCTIVE_QUEUE"; @@ -46,6 +52,18 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport { private ConnectionSupplier AMQPConnection = () -> createConnection(); private ConnectionSupplier CoreConnection = () -> createCoreConnection(); + protected final boolean persistenceEnabled; + + public JMSNonDestructiveTest(boolean persistenceEnabled) { + this.persistenceEnabled = persistenceEnabled; + } + + @Parameterized.Parameters(name = "persistenceEnabled={0}") + public static Collection data() { + Object[][] params = new Object[][]{{false}, {true}}; + return Arrays.asList(params); + } + @Override protected String getConfiguredProtocols() { return "AMQP,OPENWIRE,CORE"; @@ -53,7 +71,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport { @Override protected void addConfiguration(ActiveMQServer server) { - server.getConfiguration().setPersistenceEnabled(false); + server.getConfiguration().setPersistenceEnabled(persistenceEnabled); server.getConfiguration().setMessageExpiryScanPeriod(100); server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true)); server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true).setExpiryDelay(100L));