diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index 3a3b349f55..349289270a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -98,6 +98,12 @@ public interface QueueControl { @Attribute(desc = "number of messages acknowledged from this queue since it was created") long getMessagesAcknowledged(); + /** + * Returns the number of messages expired from this queue since it was created. + */ + @Attribute(desc = "number of messages expired from this queue since it was created") + long getMessagesExpired(); + /** * Returns the first message on the queue as JSON */ @@ -434,6 +440,12 @@ public interface QueueControl { @Operation(desc = "Resets the MessagesAcknowledged property", impact = MBeanOperationInfo.ACTION) void resetMessagesAcknowledged() throws Exception; + /** + * Resets the MessagesExpired property + */ + @Operation(desc = "Resets the MessagesExpired property", impact = MBeanOperationInfo.ACTION) + void resetMessagesExpired() throws Exception; + /** * it will flush one cycle on internal executors, so you would be sure that any pending tasks are done before you call * any other measure. diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java index 837ec68db6..c13e3b9930 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java @@ -55,6 +55,12 @@ public interface JMSQueueControl extends DestinationControl { @Attribute(desc = "number of consumers consuming messages from this queue") int getConsumerCount(); + /** + * Returns the number of messages expired from this queue since it was created. + */ + @Attribute(desc = "the number of messages expired from this queue since it was created") + long getMessagesExpired(); + /** * returns the selector for the queue */ diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java index 0516182c96..b037d72391 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java @@ -124,6 +124,11 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro return coreQueueControl.getMessagesAdded(); } + @Override + public long getMessagesExpired() { + return coreQueueControl.getMessagesExpired(); + } + @Override public int getConsumerCount() { return coreQueueControl.getConsumerCount(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index de2459f48c..8f1d6e68fa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -256,6 +256,19 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override + public long getMessagesExpired() { + checkStarted(); + + clearIO(); + try { + return queue.getMessagesExpired(); + } + finally { + blockOnIO(); + } + } + @Override public long getID() { checkStarted(); @@ -1011,6 +1024,20 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } + @Override + public void resetMessagesExpired() throws Exception { + checkStarted(); + + clearIO(); + try { + queue.resetMessagesExpired(); + } + finally { + blockOnIO(); + } + + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index fa39c221e2..6645d3606f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -121,6 +121,8 @@ public interface Queue extends Bindable { long getMessagesAcknowledged(); + long getMessagesExpired(); + MessageReference removeReferenceWithID(long id) throws Exception; MessageReference getReference(long id) throws ActiveMQException; @@ -234,6 +236,8 @@ public interface Queue extends Bindable { void resetMessagesAcknowledged(); + void resetMessagesExpired(); + void incrementMesssagesAdded(); /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index c809abff7e..724da5befa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -167,6 +167,8 @@ public class QueueImpl implements Queue { private long messagesAcknowledged; + private long messagesExpired; + protected final AtomicInteger deliveringCount = new AtomicInteger(0); private boolean paused; @@ -962,6 +964,10 @@ public class QueueImpl implements Queue { @Override public void acknowledge(final MessageReference ref) throws Exception { + acknowledge(ref, OperationType.NORMAL); + } + + private void acknowledge(final MessageReference ref, OperationType type) throws Exception { if (ref.isPaged()) { pageSubscription.ack((PagedReference) ref); postAcknowledge(ref); @@ -977,12 +983,21 @@ public class QueueImpl implements Queue { postAcknowledge(ref); } - messagesAcknowledged++; + if (type == OperationType.EXPIRED) { + messagesExpired++; + } + else { + messagesAcknowledged++; + } } @Override public void acknowledge(final Transaction tx, final MessageReference ref) throws Exception { + acknowledge(tx, ref, OperationType.NORMAL); + } + + private void acknowledge(final Transaction tx, final MessageReference ref, OperationType type) throws Exception { if (ref.isPaged()) { pageSubscription.ackTx(tx, (PagedReference) ref); @@ -1002,7 +1017,12 @@ public class QueueImpl implements Queue { getRefsOperation(tx).addAck(ref); } - messagesAcknowledged++; + if (type == OperationType.EXPIRED) { + messagesExpired++; + } + else { + messagesAcknowledged++; + } } @Override @@ -1075,13 +1095,13 @@ public class QueueImpl implements Queue { if (logger.isTraceEnabled()) { logger.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName()); } - move(null, expiryAddress, ref, true, false); + move(null, expiryAddress, ref, true, false, OperationType.EXPIRED); } else { if (logger.isTraceEnabled()) { logger.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName()); } - acknowledge(ref); + acknowledge(ref, OperationType.EXPIRED); } } @@ -1127,6 +1147,11 @@ public class QueueImpl implements Queue { return messagesAcknowledged; } + @Override + public long getMessagesExpired() { + return messagesExpired; + } + @Override public int deleteAllReferences() throws Exception { return deleteAllReferences(DEFAULT_FLUSH_LIMIT); @@ -1508,7 +1533,7 @@ public class QueueImpl implements Queue { refRemoved(ref); incDelivering(); try { - move(null, toAddress, ref, false, rejectDuplicate); + move(null, toAddress, ref, false, rejectDuplicate, OperationType.NORMAL); } catch (Exception e) { decDelivering(); @@ -2353,7 +2378,7 @@ public class QueueImpl implements Queue { } else { ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name); - move(tx, deadLetterAddress, ref, false, false); + move(tx, deadLetterAddress, ref, false, false, OperationType.NORMAL); } } else { @@ -2367,7 +2392,8 @@ public class QueueImpl implements Queue { final SimpleString address, final MessageReference ref, final boolean expiry, - final boolean rejectDuplicate) throws Exception { + final boolean rejectDuplicate, + final OperationType type) throws Exception { Transaction tx; if (originalTX != null) { @@ -2384,7 +2410,7 @@ public class QueueImpl implements Queue { postOffice.route(copyMessage, null, tx, false, rejectDuplicate); - acknowledge(tx, ref); + acknowledge(tx, ref, type); if (originalTX == null) { tx.commit(); @@ -2633,6 +2659,11 @@ public class QueueImpl implements Queue { messagesAcknowledged = 0; } + @Override + public synchronized void resetMessagesExpired() { + messagesExpired = 0; + } + @Override public float getRate() { float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f); @@ -2988,5 +3019,9 @@ public class QueueImpl implements Queue { } } } + + private enum OperationType { + EXPIRED, NORMAL + } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index c96606d76a..301833e1c0 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1046,6 +1046,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { return 0; } + @Override + public long getMessagesExpired() { + return 0; + } + @Override public MessageReference removeReferenceWithID(long id) throws Exception { return null; @@ -1255,6 +1260,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { } + @Override + public void resetMessagesExpired() { + + } + @Override public void incrementMesssagesAdded() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java index 2a966cf3b3..8c14ff283e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java @@ -119,6 +119,11 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest { return (Integer) proxy.retrieveAttributeValue("consumerCount"); } + @Override + public long getMessagesExpired() { + return ((Number) proxy.retrieveAttributeValue("getMessagesExpired")).longValue(); + } + @Override public String getDeadLetterAddress() { return (String) proxy.retrieveAttributeValue("deadLetterAddress"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 0073c8d0fa..693b556554 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -1984,6 +1984,46 @@ public class QueueControlTest extends ManagementTestBase { session.deleteQueue(queue); } + @Test + public void testResetMessagesExpired() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(address, queue, null, false); + + QueueControl queueControl = createManagementControl(address, queue); + Assert.assertEquals(0, queueControl.getMessagesExpired()); + + ClientProducer producer = session.createProducer(address); + ClientMessage message = session.createMessage(false); + producer.send(message); + + // the message IDs are set on the server + Map[] messages = queueControl.listMessages(null); + Assert.assertEquals(1, messages.length); + long messageID = (Long) messages[0].get("messageID"); + + queueControl.expireMessage(messageID); + Assert.assertEquals(1, queueControl.getMessagesExpired()); + + message = session.createMessage(false); + producer.send(message); + + // the message IDs are set on the server + messages = queueControl.listMessages(null); + Assert.assertEquals(1, messages.length); + messageID = (Long) messages[0].get("messageID"); + + queueControl.expireMessage(messageID); + Assert.assertEquals(2, queueControl.getMessagesExpired()); + + queueControl.resetMessagesExpired(); + + Assert.assertEquals(0, queueControl.getMessagesExpired()); + + session.deleteQueue(queue); + } + //make sure notifications are always received no matter whether //a Queue is created via QueueControl or by JMSServerManager directly. @Test diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index 68dfd48a9d..f27eaf1328 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -120,6 +120,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest { return (Integer) proxy.retrieveAttributeValue("messagesAcknowledged"); } + @Override + public long getMessagesExpired() { + return ((Number) proxy.retrieveAttributeValue("messagesExpired")).longValue(); + } + @Override public void resetMessagesAdded() throws Exception { proxy.invokeOperation("resetMessagesAdded"); @@ -130,6 +135,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest { proxy.invokeOperation("resetMessagesAcknowledged"); } + @Override + public void resetMessagesExpired() throws Exception { + proxy.invokeOperation("resetMessagesExpired"); + } + @Override public String getName() { return (String) proxy.retrieveAttributeValue("name"); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 250d211a1a..0633bfb971 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -311,6 +311,12 @@ public class FakeQueue implements Queue { return 0; } + @Override + public long getMessagesExpired() { + // no-op + return 0; + } + @Override public void resetMessagesAdded() { // no-op @@ -323,6 +329,12 @@ public class FakeQueue implements Queue { } + @Override + public void resetMessagesExpired() { + // no-op + + } + @Override public void incrementMesssagesAdded() {