diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index c5cb20a25d..bb28221a9b 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -3148,18 +3148,30 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe if (messageSequence != null) { SequenceSet range = sd.ackPositions.get(tx, subscriptionKey); if (range != null && !range.isEmpty()) { - range.remove(messageSequence); + boolean removed = range.remove(messageSequence); if (!range.isEmpty()) { sd.ackPositions.put(tx, subscriptionKey, range); } else { sd.ackPositions.remove(tx, subscriptionKey); } - MessageKeys key = sd.orderIndex.get(tx, messageSequence); - decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey, + // Only decrement the statistics if the message was removed + // from the ack set for the subscription + // Fix for AMQ-9420 + if (removed) { + MessageKeys key = sd.orderIndex.get(tx, messageSequence); + decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey, key.location.getSize()); + } else { + LOG.warn("Received unexpected duplicate ack: messageId: {}, Sub: {}, Dest: {}", + command.getMessageId(), subscriptionKey, command.getDestination()); + } // Check if the message is reference by any other subscription. + // If removed was previously false then we could return before + // this check as this should always return true (should still be + // a reference) but removed being false is unexpected in the first + // place so this is a good second check to verify. if (isSequenceReferenced(tx, sd, messageSequence)) { return; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java index d2ac17b5e0..0ddea1ed73 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java @@ -533,7 +533,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat return publishTestMessagesDurable(connection, subNames, defaultTopicName, publishSize, 0, AbstractStoreStatTestSupport.defaultMessageSize, - publishedMessageSize, false, deliveryMode); + publishedMessageSize, null, false, deliveryMode); } protected org.apache.activemq.broker.region.Topic publishTestTopicMessages(int publishSize, diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java index 98dbaf4f05..d0dbca4eb7 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java @@ -18,19 +18,22 @@ package org.apache.activemq.store; import static org.junit.Assert.assertTrue; -import java.io.IOException; -import java.net.URI; -import java.util.concurrent.atomic.AtomicLong; - import jakarta.jms.Connection; import jakarta.jms.DeliveryMode; - +import java.io.IOException; +import java.net.URI; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageId; import org.apache.activemq.util.Wait; -import org.apache.activemq.util.Wait.Condition; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -47,11 +50,20 @@ public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStat protected static final Logger LOG = LoggerFactory .getLogger(AbstractMessageStoreSizeStatTest.class); - protected BrokerService broker; protected URI brokerConnectURI; protected String defaultQueueName = "test.queue"; protected String defaultTopicName = "test.topic"; + // Only applies to KahaDB + protected final boolean subStatsEnabled; + + protected AbstractMessageStoreSizeStatTest(boolean subStatsEnabled) { + this.subStatsEnabled = subStatsEnabled; + } + + protected AbstractMessageStoreSizeStatTest() { + this.subStatsEnabled = false; + } @Before public void startBroker() throws Exception { @@ -119,17 +131,40 @@ public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStat connection.setClientID("clientId"); connection.start(); - Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200, publishedMessageSize); + ActiveMQTopic topic = new ActiveMQTopic(defaultTopicName); + Set publishedMessages = new HashSet<>(); + Topic dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200, + publishedMessageSize, publishedMessages); + + PersistenceAdapter adapter = this.broker.getPersistenceAdapter(); + TopicMessageStore store = adapter.createTopicMessageStore(topic); //verify the count and size verifyStats(dest, 200, publishedMessageSize.get()); + if (subStatsEnabled) { + verifyDurableStats(dest, "clientId:sub1", 200, publishedMessageSize.get()); + } + //consume all messages consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize); //All messages should now be gone verifyStats(dest, 0, 0); + if (subStatsEnabled) { + verifyDurableStats(dest, "clientId:sub1", 0, publishedMessageSize.get()); + } + + // Send 10 duplicates to verify our metrics are not broken + sendAcks(store, publishedMessages); + + // Stats should still show 0 after duplicates + verifyStats(dest, 0, publishedMessageSize.get()); + if (subStatsEnabled) { + verifyDurableStats(dest, "clientId:sub1", 0, publishedMessageSize.get()); + } + connection.close(); } @@ -140,17 +175,46 @@ public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStat connection.setClientID("clientId"); connection.start(); - Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200, publishedMessageSize); + ActiveMQTopic topic = new ActiveMQTopic(defaultTopicName); + Set publishedMessages = new HashSet<>(); + Topic dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200, + publishedMessageSize, publishedMessages); + + PersistenceAdapter adapter = this.broker.getPersistenceAdapter(); + TopicMessageStore store = adapter.createTopicMessageStore(topic); //verify the count and size verifyStats(dest, 200, publishedMessageSize.get()); + // Verify each subscription counter is correct + if (subStatsEnabled) { + verifyDurableStats(dest, "clientId:sub1", 200, publishedMessageSize.get()); + verifyDurableStats(dest, "clientId:sub2", 200, publishedMessageSize.get()); + } + //consume messages just for sub1 consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize); //There is still a durable that hasn't consumed so the messages should exist verifyStats(dest, 200, publishedMessageSize.get()); + if (subStatsEnabled) { + verifyDurableStats(dest, "clientId:sub1", 0, publishedMessageSize.get()); + verifyDurableStats(dest, "clientId:sub2", 200, publishedMessageSize.get()); + } + // Send 10 duplicates to verify our metrics are not broken + // This used to break in memory subscription statistics before AMQ-9420 + sendAcks(store, publishedMessages); + + // Stats should still show 200 after duplicates + verifyStats(dest, 200, publishedMessageSize.get()); + + // Verify each subscription counter is correct + // Sub 2 should still have 200 messages + if (subStatsEnabled) { + verifyDurableStats(dest, "clientId:sub1", 0, publishedMessageSize.get()); + verifyDurableStats(dest, "clientId:sub2", 200, publishedMessageSize.get()); + } connection.stop(); } @@ -179,30 +243,30 @@ public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStat final MessageStore messageStore = dest.getMessageStore(); final MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics(); - assertTrue(Wait.waitFor(new Condition() { - @Override - public boolean isSatisified() throws Exception { - return (count == messageStore.getMessageCount()) && (messageStore.getMessageCount() == - storeStats.getMessageCount().getCount()) && (messageStore.getMessageSize() == - messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize()); - } - })); + assertTrue(Wait.waitFor( + () -> (count == messageStore.getMessageCount()) && (messageStore.getMessageCount() == + storeStats.getMessageCount().getCount()) && (messageStore.getMessageSize() == + messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize()))); if (count > 0) { assertTrue(storeStats.getMessageSize().getTotalSize() > minimumSize); - assertTrue(Wait.waitFor(new Condition() { - @Override - public boolean isSatisified() throws Exception { - return storeStats.getMessageSize().getTotalSize() > minimumSize; - } - })); + assertTrue(Wait.waitFor(() -> storeStats.getMessageSize().getTotalSize() > minimumSize)); } else { - assertTrue(Wait.waitFor(new Condition() { - @Override - public boolean isSatisified() throws Exception { - return storeStats.getMessageSize().getTotalSize() == 0; - } - })); + assertTrue(Wait.waitFor(() -> storeStats.getMessageSize().getTotalSize() == 0)); + } + } + + protected void verifyDurableStats(Topic dest, String subKey, final int count, final long minimumSize) throws Exception { + final TopicMessageStore messageStore = (TopicMessageStore) dest.getMessageStore(); + MessageStoreSubscriptionStatistics subStats = messageStore.getMessageStoreSubStatistics(); + + assertTrue(Wait.waitFor( + () -> count == subStats.getMessageCount(subKey).getCount())); + + if (count > 0) { + assertTrue(subStats.getMessageSize(subKey).getTotalSize() > minimumSize); + } else { + assertTrue(Wait.waitFor(() -> subStats.getMessageSize(subKey).getTotalSize() == 0)); } } @@ -226,11 +290,27 @@ public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStat return consumeDurableTestMessages(connection, sub, size, defaultTopicName, publishedMessageSize); } - protected Destination publishTestMessagesDurable(Connection connection, String[] subNames, - int publishSize, int expectedSize, AtomicLong publishedMessageSize) throws Exception { + protected Topic publishTestMessagesDurable(Connection connection, String[] subNames, + int publishSize, int expectedSize, AtomicLong publishedMessageSize, Set publishedMessages) throws Exception { return publishTestMessagesDurable(connection, subNames, defaultTopicName, publishSize, expectedSize, AbstractStoreStatTestSupport.defaultMessageSize, - publishedMessageSize, true); + publishedMessageSize, publishedMessages, true); + } + + protected void sendAcks(TopicMessageStore store, Set publishedMessages) { + publishedMessages.stream().limit(10).forEach(id -> { + try { + MessageId messageId = new MessageId(id); + MessageAck ack = new MessageAck(); + ack.setMessageID(messageId); + ack.setDestination(store.getDestination()); + ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE); + ack.setMessageCount(1); + store.acknowledge(broker.getAdminConnectionContext(), "clientId", "sub1", messageId, ack); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java index d900415daa..19167d4342 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull; import java.net.URI; import java.util.Enumeration; import java.util.Random; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import jakarta.jms.BytesMessage; @@ -189,14 +190,14 @@ public abstract class AbstractStoreStatTestSupport { } protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, String[] subNames, String topicName, - int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize, + int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize, Set publishedMessages, boolean verifyBrowsing) throws Exception { return this.publishTestMessagesDurable(connection, subNames, topicName, publishSize, expectedSize, messageSize, - publishedMessageSize, verifyBrowsing, DeliveryMode.PERSISTENT); + publishedMessageSize, publishedMessages, verifyBrowsing, DeliveryMode.PERSISTENT); } protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, String[] subNames, String topicName, - int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize, + int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize, Set publishedMessages, boolean verifyBrowsing, int deliveryMode) throws Exception { // create a new queue final ActiveMQDestination activeMqTopic = new ActiveMQTopic( @@ -228,7 +229,11 @@ public abstract class AbstractStoreStatTestSupport { MessageProducer prod = session.createProducer(topic); prod.setDeliveryMode(deliveryMode); for (int i = 0; i < publishSize; i++) { - prod.send(createMessage(i, session, messageSize, publishedMessageSize)); + Message message = createMessage(i, session, messageSize, publishedMessageSize); + prod.send(message); + if (publishedMessages != null) { + publishedMessages.add(message.getJMSMessageID()); + } } //verify the view has expected messages diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java index 1efb59e403..5e09389852 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java @@ -18,6 +18,8 @@ package org.apache.activemq.store.kahadb; import java.io.File; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.broker.BrokerService; @@ -27,6 +29,9 @@ import org.apache.commons.io.FileUtils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,14 +42,29 @@ import org.slf4j.LoggerFactory; * AMQ-5748 * */ +@RunWith(Parameterized.class) public class KahaDBMessageStoreSizeStatTest extends AbstractMessageStoreSizeStatTest { protected static final Logger LOG = LoggerFactory .getLogger(KahaDBMessageStoreSizeStatTest.class); + @Parameters(name = "subStatsEnabled={0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + // Subscription stats on + {true}, + // Subscription stats off + {false} + }); + } + @Rule public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); + public KahaDBMessageStoreSizeStatTest(boolean subStatsEnabled) { + super(subStatsEnabled); + } + @Override protected void setUpBroker(boolean clearDataDir) throws Exception { if (clearDataDir && dataFileDir.getRoot().exists()) @@ -57,6 +77,8 @@ public class KahaDBMessageStoreSizeStatTest extends throws IOException { broker.setPersistent(true); broker.setDataDirectoryFile(dataFileDir.getRoot()); + KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + adapter.setEnableSubscriptionStatistics(subStatsEnabled); } /** diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java index 0c4fc763e6..8da8d9c16b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -31,6 +33,9 @@ import org.apache.commons.io.FileUtils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,14 +46,29 @@ import org.slf4j.LoggerFactory; * AMQ-5748 * */ +@RunWith(Parameterized.class) public class MultiKahaDBMessageStoreSizeStatTest extends AbstractMessageStoreSizeStatTest { protected static final Logger LOG = LoggerFactory .getLogger(MultiKahaDBMessageStoreSizeStatTest.class); + @Parameters(name = "subStatsEnabled={0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + // Subscription stats on + {true}, + // Subscription stats off + {false} + }); + } + @Rule public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); + public MultiKahaDBMessageStoreSizeStatTest(boolean subStatsEnabled) { + super(subStatsEnabled); + } + @Override protected void setUpBroker(boolean clearDataDir) throws Exception { if (clearDataDir && dataFileDir.getRoot().exists()) @@ -67,6 +87,7 @@ public class MultiKahaDBMessageStoreSizeStatTest extends KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter(); kahaStore.setJournalMaxFileLength(1024 * 512); + kahaStore.setEnableSubscriptionStatistics(subStatsEnabled); //set up a store per destination FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java index 61ff28a9b0..0f608d003a 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java @@ -55,7 +55,8 @@ public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStat //The expected value is only 100 because for durables a LRUCache is being used //with a max size of 100 - Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 100, publishedMessageSize); + Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 100, + publishedMessageSize, null); //verify the count and size, should be 100 because of the LRUCache //verify size is at least the minimum of 100 messages times 100 bytes @@ -80,7 +81,8 @@ public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStat //The expected value is only 100 because for durables a LRUCache is being used //with a max size of 100, so only 100 messages are kept - Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 100, publishedMessageSize); + Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 100, + publishedMessageSize, null); //verify the count and size //verify size is at least the minimum of 100 messages times 100 bytes