diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java index 5bf8bea76f..a07608ebd7 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java @@ -2252,4 +2252,14 @@ public interface AuditLogger extends BasicLogger { @Message(id = 601500, value = "User {0} is sending a core message on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT) void coreSendMessage(String user, Object source, Object... args); + + static void getAcknowledgeAttempts(Object source) { + LOGGER.getMessagesAcknowledged(getCaller(), source); + } + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 601501, value = "User {0} is getting messages acknowledged attemps on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT) + void getAcknowledgeAttempts(String user, Object source, Object... args); + + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index b57be21e6c..0d3ab0f945 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -168,6 +168,13 @@ public interface QueueControl { @Attribute(desc = "number of messages acknowledged from this queue since it was created") long getMessagesAcknowledged(); + /** + * Returns the number of messages added to this queue since it was created. + */ + @Attribute(desc = "number of messages acknowledged attempts from this queue since it was created") + long getAcknowledgeAttempts(); + + /** * Returns the number of messages expired from this queue since it was created. */ diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java index 4c5a887cf5..123dbb5d9b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.RefsOperation; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; @@ -67,8 +68,8 @@ public class ProtonTransactionImpl extends TransactionImpl { } @Override - public RefsOperation createRefsOperation(Queue queue) { - return new ProtonTransactionRefsOperation(queue, storageManager); + public RefsOperation createRefsOperation(Queue queue, AckReason reason) { + return new ProtonTransactionRefsOperation(queue, reason, storageManager); } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionRefsOperation.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionRefsOperation.java index 7b48ac0d0f..4bb00d2828 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionRefsOperation.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionRefsOperation.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.impl.RefsOperation; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -36,8 +37,8 @@ import org.apache.qpid.proton.engine.Delivery; */ public class ProtonTransactionRefsOperation extends RefsOperation { - public ProtonTransactionRefsOperation(final Queue queue, StorageManager storageManager) { - super(queue, storageManager); + public ProtonTransactionRefsOperation(final Queue queue, AckReason reason, StorageManager storageManager) { + super(queue, reason, storageManager); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 6270fe518a..b789347ba4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -400,6 +400,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override + public long getAcknowledgeAttempts() { + if (AuditLogger.isEnabled()) { + AuditLogger.getMessagesAcknowledged(queue); + } + checkStarted(); + + clearIO(); + try { + return queue.getAcknowledgeAttempts(); + } finally { + blockOnIO(); + } + } + @Override public long getMessagesExpired() { if (AuditLogger.isEnabled()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 2fcd7662b6..b722b5ff3a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -234,6 +234,8 @@ public interface Queue extends Bindable,CriticalComponent { long getMessagesAdded(); + long getAcknowledgeAttempts(); + long getMessagesAcknowledged(); long getMessagesExpired(); @@ -393,7 +395,7 @@ public interface Queue extends Bindable,CriticalComponent { */ void deliverScheduledMessages() throws ActiveMQException; - void postAcknowledge(MessageReference ref); + void postAcknowledge(MessageReference ref, AckReason reason); float getRate(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 9d04f5b3a4..44f938e33c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -193,6 +193,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private AtomicLong messagesAcknowledged = new AtomicLong(0); + private AtomicLong ackAttempts = new AtomicLong(0); + private AtomicLong messagesExpired = new AtomicLong(0); private AtomicLong messagesKilled = new AtomicLong(0); @@ -1473,7 +1475,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } else { if (ref.isPaged()) { pageSubscription.ack((PagedReference) ref); - postAcknowledge(ref); + postAcknowledge(ref, reason); } else { Message message = ref.getMessage(); @@ -1482,18 +1484,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (durableRef) { storageManager.storeAcknowledge(id, message.getMessageID()); } - postAcknowledge(ref); + postAcknowledge(ref, reason); } - if (reason == AckReason.EXPIRED) { - messagesExpired.incrementAndGet(); - } else if (reason == AckReason.KILLED) { - messagesKilled.incrementAndGet(); - } else if (reason == AckReason.REPLACED) { - messagesReplaced.incrementAndGet(); - } else { - messagesAcknowledged.incrementAndGet(); - } + ackAttempts.incrementAndGet(); if (server != null && server.hasBrokerMessagePlugins()) { server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer)); @@ -1508,10 +1502,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception { + RefsOperation refsOperation = getRefsOperation(tx, reason); + if (ref.isPaged()) { pageSubscription.ackTx(tx, (PagedReference) ref); - getRefsOperation(tx).addAck(ref); + refsOperation.addAck(ref); } else { Message message = ref.getMessage(); @@ -1523,15 +1519,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { tx.setContainsPersistent(); } - getRefsOperation(tx).addAck(ref); - } + ackAttempts.incrementAndGet(); - if (reason == AckReason.EXPIRED) { - messagesExpired.incrementAndGet(); - } else if (reason == AckReason.KILLED) { - messagesKilled.incrementAndGet(); - } else { - messagesAcknowledged.incrementAndGet(); + refsOperation.addAck(ref); } if (server != null && server.hasBrokerMessagePlugins()) { @@ -1547,7 +1537,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { tx.setContainsPersistent(); } - getRefsOperation(tx).addAck(ref); + getRefsOperation(tx, AckReason.NORMAL).addAck(ref); // https://issues.jboss.org/browse/HORNETQ-609 incDelivering(ref); @@ -1555,16 +1545,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { messagesAcknowledged.incrementAndGet(); } - private RefsOperation getRefsOperation(final Transaction tx) { - return getRefsOperation(tx, false); + private RefsOperation getRefsOperation(final Transaction tx, AckReason ackReason) { + return getRefsOperation(tx, ackReason, false); } - private RefsOperation getRefsOperation(final Transaction tx, boolean ignoreRedlieveryCheck) { + private RefsOperation getRefsOperation(final Transaction tx, AckReason ackReason, boolean ignoreRedlieveryCheck) { synchronized (tx) { RefsOperation oper = (RefsOperation) tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION); if (oper == null) { - oper = tx.createRefsOperation(this); + oper = tx.createRefsOperation(this, ackReason); tx.putProperty(TransactionPropertyIndexes.REFS_OPERATION, oper); @@ -1586,7 +1576,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public void cancel(final Transaction tx, final MessageReference reference, boolean ignoreRedeliveryCheck) { - getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference); + getRefsOperation(tx, AckReason.NORMAL, ignoreRedeliveryCheck).addAck(reference); } @Override @@ -1705,6 +1695,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return messagesAcknowledged.get(); } + @Override + public long getAcknowledgeAttempts() { + return ackAttempts.get(); + } + @Override public long getMessagesExpired() { return messagesExpired.get(); @@ -3300,11 +3295,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } @Override - public void postAcknowledge(final MessageReference ref) { + public void postAcknowledge(final MessageReference ref, AckReason reason) { QueueImpl queue = (QueueImpl) ref.getQueue(); queue.decDelivering(ref); + if (reason == AckReason.EXPIRED) { + messagesExpired.incrementAndGet(); + } else if (reason == AckReason.KILLED) { + messagesKilled.incrementAndGet(); + } else if (reason == AckReason.REPLACED) { + messagesReplaced.incrementAndGet(); + } else { + messagesAcknowledged.incrementAndGet(); + } + if (ref.isPaged()) { // nothing to be done return; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java index 6d21be2405..de52cc4bfb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java @@ -38,6 +38,8 @@ public class RefsOperation extends TransactionOperationAbstract { private static final Logger logger = Logger.getLogger(RefsOperation.class); + private final AckReason reason; + private final StorageManager storageManager; private Queue queue; List refsToAck = new ArrayList<>(); @@ -50,11 +52,13 @@ public class RefsOperation extends TransactionOperationAbstract { */ protected boolean ignoreRedeliveryCheck = false; - public RefsOperation(Queue queue, StorageManager storageManager) { + public RefsOperation(Queue queue, AckReason reason, StorageManager storageManager) { this.queue = queue; + this.reason = reason; this.storageManager = storageManager; } + // once turned on, we shouldn't turn it off, that's why no parameters public void setIgnoreRedeliveryCheck() { ignoreRedeliveryCheck = true; @@ -163,7 +167,7 @@ public class RefsOperation extends TransactionOperationAbstract { public void afterCommit(final Transaction tx) { for (MessageReference ref : refsToAck) { synchronized (ref.getQueue()) { - queue.postAcknowledge(ref); + queue.postAcknowledge(ref, reason); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java index 0ddc2cb564..6fa2c5f0d8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.RefsOperation; /** @@ -95,5 +96,5 @@ public interface Transaction { void setTimeout(int timeout); - RefsOperation createRefsOperation(Queue queue); + RefsOperation createRefsOperation(Queue queue, AckReason reason); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/BindingsTransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/BindingsTransactionImpl.java index 50dff64d70..5bb1acc393 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/BindingsTransactionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/BindingsTransactionImpl.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.transaction.impl; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.RefsOperation; public class BindingsTransactionImpl extends TransactionImpl { @@ -45,7 +46,7 @@ public class BindingsTransactionImpl extends TransactionImpl { } @Override - public RefsOperation createRefsOperation(Queue queue) { + public RefsOperation createRefsOperation(Queue queue, AckReason reason) { return null; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java index 9ef1554f00..d459975957 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.RefsOperation; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperation; @@ -162,8 +163,8 @@ public class TransactionImpl implements Transaction { } @Override - public RefsOperation createRefsOperation(Queue queue) { - return new RefsOperation(queue, storageManager); + public RefsOperation createRefsOperation(Queue queue, AckReason reason) { + return new RefsOperation(queue, reason, storageManager); } @Override diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 40ab28e88d..d241786656 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -793,6 +793,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { } + @Override + public long getAcknowledgeAttempts() { + return 0; + } + @Override public boolean allowsReferenceCallback() { return false; @@ -1477,7 +1482,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public void postAcknowledge(MessageReference ref) { + public void postAcknowledge(MessageReference ref, AckReason reason) { } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java index 6056fcb98e..1218852b9b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java @@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueFactory; +import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; @@ -523,7 +524,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase { } @Override - public void postAcknowledge(final MessageReference ref) { + public void postAcknowledge(final MessageReference ref, AckReason reason) { System.out.println("Ignoring postACK on message " + ref); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jmx/JmxConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jmx/JmxConnectionTest.java index 9fd2c3a4f6..416dfcccaf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jmx/JmxConnectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jmx/JmxConnectionTest.java @@ -105,7 +105,7 @@ public class JmxConnectionTest extends ActiveMQTestBase { logAndSystemOut("Successfully connected to: " + urlString); } catch (Exception e) { logAndSystemOut("JMX connection failed: " + urlString, e); - Assert.fail(); + Assert.fail(e.getMessage()); return; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 4dd95db42c..4fa03b2195 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -36,6 +36,7 @@ import javax.json.JsonArray; import javax.json.JsonObject; import javax.management.Notification; import javax.management.openmbean.CompositeData; +import javax.transaction.xa.XAResource; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -66,6 +67,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.junit.Wait; import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil; import org.apache.activemq.artemis.utils.Base64; @@ -370,6 +372,92 @@ public class QueueControlTest extends ManagementTestBase { session.deleteQueue(queue); } + @Test + public void testGetMessagesAcknowledgedOnXARollback() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); + + QueueControl queueControl = createManagementControl(address, queue); + Assert.assertEquals(0, queueControl.getMessagesAcknowledged()); + + ClientProducer producer = session.createProducer(address); + producer.send(session.createMessage(durable)); + + ClientSessionFactory xaFactory = createSessionFactory(locator); + ClientSession xaSession = addClientSession(xaFactory.createSession(true, false, false)); + xaSession.start(); + + ClientConsumer consumer = xaSession.createConsumer(queue); + + int tries = 10; + for (int i = 0; i < tries; i++) { + XidImpl xid = newXID(); + xaSession.start(xid, XAResource.TMNOFLAGS); + ClientMessage message = consumer.receive(1000); + Assert.assertNotNull(message); + message.acknowledge(); + Assert.assertEquals(0, queueControl.getMessagesAcknowledged()); + xaSession.end(xid, XAResource.TMSUCCESS); + Assert.assertEquals(0, queueControl.getMessagesAcknowledged()); + xaSession.prepare(xid); + Assert.assertEquals(0, queueControl.getMessagesAcknowledged()); + if (i + 1 == tries) { + xaSession.commit(xid, false); + } else { + xaSession.rollback(xid); + } + } + + Wait.assertEquals(1, queueControl::getMessagesAcknowledged); + Wait.assertEquals(10, queueControl::getAcknowledgeAttempts); + + consumer.close(); + + session.deleteQueue(queue); + } + + @Test + public void testGetMessagesAcknowledgedOnRegularRollback() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); + + QueueControl queueControl = createManagementControl(address, queue); + Assert.assertEquals(0, queueControl.getMessagesAcknowledged()); + + ClientProducer producer = session.createProducer(address); + producer.send(session.createMessage(durable)); + + ClientSessionFactory xaFactory = createSessionFactory(locator); + ClientSession txSession = addClientSession(xaFactory.createSession(false, false, false)); + txSession.start(); + + ClientConsumer consumer = txSession.createConsumer(queue); + + int tries = 10; + for (int i = 0; i < tries; i++) { + ClientMessage message = consumer.receive(1000); + Assert.assertNotNull(message); + message.acknowledge(); + Assert.assertEquals(0, queueControl.getMessagesAcknowledged()); + if (i + 1 == tries) { + txSession.commit(); + } else { + txSession.rollback(); + } + } + + Wait.assertEquals(1, queueControl::getMessagesAcknowledged); + Wait.assertEquals(10, queueControl::getAcknowledgeAttempts); + + consumer.close(); + + session.deleteQueue(queue); + } + @Test public void testGetScheduledCount() throws Exception { long delay = 500; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index c40f655895..254a28702e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -216,6 +216,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest { return (Integer) proxy.retrieveAttributeValue("messagesAcknowledged", Integer.class); } + @Override + public long getAcknowledgeAttempts() { + return (Integer) proxy.retrieveAttributeValue("acknowledgeAttempts", Integer.class); + } + @Override public long getMessagesExpired() { return (Long) proxy.retrieveAttributeValue("messagesExpired", Long.class); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java index 01a568b734..591f54afd2 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl; import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.RefsOperation; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -251,7 +252,7 @@ public class BindingsImplTest extends ActiveMQTestBase { } @Override - public RefsOperation createRefsOperation(Queue queue) { + public RefsOperation createRefsOperation(Queue queue, AckReason reason) { // TODO Auto-generated method stub return null; } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 0aa631b13d..9d43b26c69 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -206,6 +206,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } + @Override + public long getAcknowledgeAttempts() { + return 0; + } + @Override public void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck) { // no-op @@ -841,7 +846,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override - public void postAcknowledge(MessageReference ref) { + public void postAcknowledge(MessageReference ref, AckReason reason) { } @Override