diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index 0abc464360..873485b961 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -275,6 +275,11 @@ public class PagedReferenceImpl extends AbstractProtocolReference implements Pag getQueue().acknowledge(tx, this, reason, consumer, true); } + @Override + public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer, boolean delivering) throws Exception { + getQueue().acknowledge(tx, this, reason, consumer, delivering); + } + /* (non-Javadoc) * @see java.lang.Object#toString() */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java index 6c290610d4..a803d88d9e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java @@ -114,6 +114,8 @@ public interface MessageReference { void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception; + void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer, boolean delivering) throws Exception; + void emptyConsumerID(); void setConsumerId(long consumerID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java index 146bacf639..2f2dec6357 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java @@ -168,6 +168,11 @@ public class GroupFirstMessageReference implements MessageReference { messageReference.acknowledge(tx, reason, consumer); } + @Override + public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer, boolean delivering) throws Exception { + messageReference.acknowledge(tx, reason, consumer, delivering); + } + @Override public void emptyConsumerID() { messageReference.emptyConsumerID(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java index 5371b78947..313a23f6d2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java @@ -266,10 +266,15 @@ public class MessageReferenceImpl extends AbstractProtocolReference implements M @Override public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception { + acknowledge(tx, reason, consumer, true); + } + + @Override + public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer, boolean delivering) throws Exception { if (tx == null) { getQueue().acknowledge(this, reason, consumer); } else { - getQueue().acknowledge(tx, this, reason, consumer, true); + getQueue().acknowledge(tx, this, reason, consumer, delivering); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java index 1bc9d24b66..712068bf86 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java @@ -175,9 +175,10 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { while (iter.hasNext()) { MessageReference ref = iter.next().getRef(); if (ref.getMessage().getMessageID() == id) { - ref.acknowledge(tx); + ref.acknowledge(tx, AckReason.NORMAL, null, false); iter.remove(); notifyScheduledReferencesUpdated(); + metrics.decrementMetrics(ref); return ref; } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/scheduling/ScheduledMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/scheduling/ScheduledMessageTest.java index 6ae52b8245..ba03ff527a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/scheduling/ScheduledMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/scheduling/ScheduledMessageTest.java @@ -557,8 +557,10 @@ public class ScheduledMessageTest extends ActiveMQTestBase { QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + atestq); assertEquals(1, queueControl.getMessageCount()); + assertEquals(1, queueControl.getScheduledCount()); assertTrue(queueControl.removeMessage((long) queueControl.listScheduledMessages()[0].get("messageID"))); assertEquals(0, queueControl.getMessageCount()); + assertEquals(0, queueControl.getScheduledCount()); } @Test