diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index 349289270a..60957ad03b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -104,6 +104,12 @@ public interface QueueControl { @Attribute(desc = "number of messages expired from this queue since it was created") long getMessagesExpired(); + /** + * Returns the number of messages removed from this queue since it was created due to exceeding the max delivery attempts. + */ + @Attribute(desc = "number of messages removed from this queue since it was created due to exceeding the max delivery attempts") + long getMessagesKilled(); + /** * Returns the first message on the queue as JSON */ @@ -446,6 +452,12 @@ public interface QueueControl { @Operation(desc = "Resets the MessagesExpired property", impact = MBeanOperationInfo.ACTION) void resetMessagesExpired() throws Exception; + /** + * Resets the MessagesExpired property + */ + @Operation(desc = "Resets the MessagesKilled property", impact = MBeanOperationInfo.ACTION) + void resetMessagesKilled() throws Exception; + /** * it will flush one cycle on internal executors, so you would be sure that any pending tasks are done before you call * any other measure. diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java index c13e3b9930..3001ec539e 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java @@ -61,6 +61,12 @@ public interface JMSQueueControl extends DestinationControl { @Attribute(desc = "the number of messages expired from this queue since it was created") long getMessagesExpired(); + /** + * Returns the number of messages removed from this queue since it was created due to exceeding the max delivery attempts. + */ + @Attribute(desc = "number of messages removed from this queue since it was created due to exceeding the max delivery attempts") + long getMessagesKilled(); + /** * returns the selector for the queue */ diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java index b037d72391..0cb991a3b7 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java @@ -129,6 +129,11 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro return coreQueueControl.getMessagesExpired(); } + @Override + public long getMessagesKilled() { + return coreQueueControl.getMessagesKilled(); + } + @Override public int getConsumerCount() { return coreQueueControl.getConsumerCount(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 8f1d6e68fa..04baf1b547 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -269,6 +269,19 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override + public long getMessagesKilled() { + checkStarted(); + + clearIO(); + try { + return queue.getMessagesKilled(); + } + finally { + blockOnIO(); + } + } + @Override public long getID() { checkStarted(); @@ -1038,6 +1051,20 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } + @Override + public void resetMessagesKilled() throws Exception { + checkStarted(); + + clearIO(); + try { + queue.resetMessagesKilled(); + } + finally { + blockOnIO(); + } + + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index 33e1086d6f..a447569caa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.transaction.Transaction; import org.jboss.logging.Logger; @@ -215,11 +216,16 @@ public class PagedReferenceImpl implements PagedReference { @Override public void acknowledge(Transaction tx) throws Exception { + acknowledge(tx, AckReason.NORMAL); + } + + @Override + public void acknowledge(Transaction tx, AckReason reason) throws Exception { if (tx == null) { - getQueue().acknowledge(this); + getQueue().acknowledge(this, reason); } else { - getQueue().acknowledge(tx, this); + getQueue().acknowledge(tx, this, reason); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java index b1e0ddea78..8c80eb0471 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.server; +import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.transaction.Transaction; /** @@ -73,6 +74,8 @@ public interface MessageReference { void acknowledge(Transaction tx) throws Exception; + void acknowledge(Transaction tx, AckReason reason) throws Exception; + void setConsumerId(Long consumerID); Long getConsumerId(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 6645d3606f..a9a87c3f68 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; +import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.LinkedListIterator; import org.apache.activemq.artemis.utils.ReferenceCounter; @@ -74,8 +75,12 @@ public interface Queue extends Bindable { void acknowledge(MessageReference ref) throws Exception; + void acknowledge(final MessageReference ref, AckReason reason) throws Exception; + void acknowledge(Transaction tx, MessageReference ref) throws Exception; + void acknowledge(final Transaction tx, final MessageReference ref, AckReason reason) throws Exception; + void reacknowledge(Transaction tx, MessageReference ref) throws Exception; void cancel(Transaction tx, MessageReference ref); @@ -123,6 +128,8 @@ public interface Queue extends Bindable { long getMessagesExpired(); + long getMessagesKilled(); + MessageReference removeReferenceWithID(long id) throws Exception; MessageReference getReference(long id) throws ActiveMQException; @@ -238,6 +245,8 @@ public interface Queue extends Bindable { void resetMessagesExpired(); + void resetMessagesKilled(); + void incrementMesssagesAdded(); /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AckReason.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AckReason.java new file mode 100644 index 0000000000..06b3d851c7 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AckReason.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.impl; + +public enum AckReason { + KILLED, EXPIRED, NORMAL +} \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 932c260c4b..906a43e63b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -262,6 +262,11 @@ public class LastValueQueue extends QueueImpl { ref.acknowledge(tx); } + @Override + public void acknowledge(Transaction tx, AckReason reason) throws Exception { + ref.acknowledge(tx, reason); + } + @Override public void setPersistedCount(int count) { ref.setPersistedCount(count); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java index c5f8485cb3..866265ecb5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java @@ -192,11 +192,16 @@ public class MessageReferenceImpl implements MessageReference { @Override public void acknowledge(Transaction tx) throws Exception { + acknowledge(tx, AckReason.NORMAL); + } + + @Override + public void acknowledge(Transaction tx, AckReason reason) throws Exception { if (tx == null) { - getQueue().acknowledge(this); + getQueue().acknowledge(this, reason); } else { - getQueue().acknowledge(tx, this); + getQueue().acknowledge(tx, this, reason); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 724da5befa..b8d917afb1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -169,6 +169,8 @@ public class QueueImpl implements Queue { private long messagesExpired; + private long messagesKilled; + protected final AtomicInteger deliveringCount = new AtomicInteger(0); private boolean paused; @@ -964,10 +966,11 @@ public class QueueImpl implements Queue { @Override public void acknowledge(final MessageReference ref) throws Exception { - acknowledge(ref, OperationType.NORMAL); + acknowledge(ref, AckReason.NORMAL); } - private void acknowledge(final MessageReference ref, OperationType type) throws Exception { + @Override + public void acknowledge(final MessageReference ref, AckReason reason) throws Exception { if (ref.isPaged()) { pageSubscription.ack((PagedReference) ref); postAcknowledge(ref); @@ -983,9 +986,12 @@ public class QueueImpl implements Queue { postAcknowledge(ref); } - if (type == OperationType.EXPIRED) { + if (reason == AckReason.EXPIRED) { messagesExpired++; } + else if (reason == AckReason.KILLED) { + messagesKilled++; + } else { messagesAcknowledged++; } @@ -994,10 +1000,11 @@ public class QueueImpl implements Queue { @Override public void acknowledge(final Transaction tx, final MessageReference ref) throws Exception { - acknowledge(tx, ref, OperationType.NORMAL); + acknowledge(tx, ref, AckReason.NORMAL); } - private void acknowledge(final Transaction tx, final MessageReference ref, OperationType type) throws Exception { + @Override + public void acknowledge(final Transaction tx, final MessageReference ref, AckReason reason) throws Exception { if (ref.isPaged()) { pageSubscription.ackTx(tx, (PagedReference) ref); @@ -1017,9 +1024,12 @@ public class QueueImpl implements Queue { getRefsOperation(tx).addAck(ref); } - if (type == OperationType.EXPIRED) { + if (reason == AckReason.EXPIRED) { messagesExpired++; } + else if (reason == AckReason.KILLED) { + messagesKilled++; + } else { messagesAcknowledged++; } @@ -1095,13 +1105,13 @@ public class QueueImpl implements Queue { if (logger.isTraceEnabled()) { logger.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName()); } - move(null, expiryAddress, ref, true, false, OperationType.EXPIRED); + move(null, expiryAddress, ref, false, AckReason.EXPIRED); } else { if (logger.isTraceEnabled()) { logger.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName()); } - acknowledge(ref, OperationType.EXPIRED); + acknowledge(ref, AckReason.EXPIRED); } } @@ -1152,6 +1162,11 @@ public class QueueImpl implements Queue { return messagesExpired; } + @Override + public long getMessagesKilled() { + return messagesKilled; + } + @Override public int deleteAllReferences() throws Exception { return deleteAllReferences(DEFAULT_FLUSH_LIMIT); @@ -1533,7 +1548,7 @@ public class QueueImpl implements Queue { refRemoved(ref); incDelivering(); try { - move(null, toAddress, ref, false, rejectDuplicate, OperationType.NORMAL); + move(null, toAddress, ref, rejectDuplicate, AckReason.NORMAL); } catch (Exception e) { decDelivering(); @@ -2374,26 +2389,25 @@ public class QueueImpl implements Queue { if (bindingList.getBindings().isEmpty()) { ActiveMQServerLogger.LOGGER.messageExceededMaxDelivery(ref, deadLetterAddress); - ref.acknowledge(tx); + ref.acknowledge(tx, AckReason.KILLED); } else { ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name); - move(tx, deadLetterAddress, ref, false, false, OperationType.NORMAL); + move(tx, deadLetterAddress, ref, false, AckReason.KILLED); } } else { ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(name); - ref.acknowledge(tx); + ref.acknowledge(tx, AckReason.KILLED); } } private void move(final Transaction originalTX, final SimpleString address, final MessageReference ref, - final boolean expiry, final boolean rejectDuplicate, - final OperationType type) throws Exception { + final AckReason reason) throws Exception { Transaction tx; if (originalTX != null) { @@ -2404,13 +2418,13 @@ public class QueueImpl implements Queue { tx = new TransactionImpl(storageManager); } - ServerMessage copyMessage = makeCopy(ref, expiry); + ServerMessage copyMessage = makeCopy(ref, reason == AckReason.EXPIRED); copyMessage.setAddress(address); postOffice.route(copyMessage, null, tx, false, rejectDuplicate); - acknowledge(tx, ref, type); + acknowledge(tx, ref, reason); if (originalTX == null) { tx.commit(); @@ -2664,6 +2678,11 @@ public class QueueImpl implements Queue { messagesExpired = 0; } + @Override + public synchronized void resetMessagesKilled() { + messagesKilled = 0; + } + @Override public float getRate() { float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f); @@ -3019,9 +3038,5 @@ public class QueueImpl implements Queue { } } } - - private enum OperationType { - EXPIRED, NORMAL - } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 301833e1c0..4e05b3a1ab 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -956,11 +956,21 @@ public class ScheduledDeliveryHandlerTest extends Assert { } + @Override + public void acknowledge(MessageReference ref, AckReason reason) throws Exception { + + } + @Override public void acknowledge(Transaction tx, MessageReference ref) throws Exception { } + @Override + public void acknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception { + + } + @Override public void reacknowledge(Transaction tx, MessageReference ref) throws Exception { @@ -1051,6 +1061,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { return 0; } + @Override + public long getMessagesKilled() { + return 0; + } + @Override public MessageReference removeReferenceWithID(long id) throws Exception { return null; @@ -1265,6 +1280,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { } + @Override + public void resetMessagesKilled() { + + } + @Override public void incrementMesssagesAdded() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java index 8c14ff283e..1443a4f7d4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java @@ -124,6 +124,11 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest { return ((Number) proxy.retrieveAttributeValue("getMessagesExpired")).longValue(); } + @Override + public long getMessagesKilled() { + return ((Number) proxy.retrieveAttributeValue("messagesKilled")).longValue(); + } + @Override public String getDeadLetterAddress() { return (String) proxy.retrieveAttributeValue("deadLetterAddress"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 693b556554..95a08a1e9c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -2024,6 +2024,51 @@ public class QueueControlTest extends ManagementTestBase { session.deleteQueue(queue); } + @Test + public void testResetMessagesKilled() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(address, queue, null, false); + + QueueControl queueControl = createManagementControl(address, queue); + Assert.assertEquals(0, queueControl.getMessagesExpired()); + + ClientProducer producer = session.createProducer(address); + ClientMessage message = session.createMessage(false); + producer.send(message); + + // the message IDs are set on the server + Map[] messages = queueControl.listMessages(null); + Assert.assertEquals(1, messages.length); + long messageID = (Long) messages[0].get("messageID"); + + queueControl.sendMessageToDeadLetterAddress(messageID); + Assert.assertEquals(1, queueControl.getMessagesKilled()); + + message = session.createMessage(false); + producer.send(message); + + // send to DLA the old-fashioned way + ClientConsumer consumer = session.createConsumer(queue); + for (int i = 0; i < server.getAddressSettingsRepository().getMatch(queue.toString()).getMaxDeliveryAttempts(); i++) { + message = consumer.receive(500); + assertNotNull(message); + message.acknowledge(); + session.rollback(); + } + + consumer.close(); + + Assert.assertEquals(2, queueControl.getMessagesKilled()); + + queueControl.resetMessagesKilled(); + + Assert.assertEquals(0, queueControl.getMessagesKilled()); + + session.deleteQueue(queue); + } + //make sure notifications are always received no matter whether //a Queue is created via QueueControl or by JMSServerManager directly. @Test diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index 230941fce6..b0cc3157d4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -125,6 +125,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest { return (Long) proxy.retrieveAttributeValue("messagesExpired", Long.class); } + @Override + public long getMessagesKilled() { + return ((Number) proxy.retrieveAttributeValue("messagesKilled")).longValue(); + } + @Override public void resetMessagesAdded() throws Exception { proxy.invokeOperation("resetMessagesAdded"); @@ -140,6 +145,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest { proxy.invokeOperation("resetMessagesExpired"); } + @Override + public void resetMessagesKilled() throws Exception { + proxy.invokeOperation("resetMessagesKilled"); + } + @Override public String getName() { return (String) proxy.retrieveAttributeValue("name"); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 0633bfb971..8eae7d6ab1 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.LinkedListIterator; import org.apache.activemq.artemis.utils.ReferenceCounter; @@ -157,12 +158,24 @@ public class FakeQueue implements Queue { } + @Override + public void acknowledge(MessageReference ref, AckReason reason) throws Exception { + // no-op + + } + @Override public void acknowledge(final Transaction tx, final MessageReference ref) throws Exception { // no-op } + @Override + public void acknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception { + // no-op + + } + @Override public void addConsumer(final Consumer consumer) throws Exception { // no-op @@ -317,6 +330,12 @@ public class FakeQueue implements Queue { return 0; } + @Override + public long getMessagesKilled() { + // no-op + return 0; + } + @Override public void resetMessagesAdded() { // no-op @@ -335,6 +354,12 @@ public class FakeQueue implements Queue { } + @Override + public void resetMessagesKilled() { + // no-op + + } + @Override public void incrementMesssagesAdded() {