From 2b856f4da6de0395619bc5b87366d1400dcbb49d Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon" Date: Thu, 15 Feb 2024 09:39:48 -0500 Subject: [PATCH] AMQ-9435 - Ensure orderIndex next id is rolled back on duplicates This commit fixes a bug in KahaDB that caused gaps in sequence ack tracking for durables that would lead to the appearance of stuck messages on durable subs if duplicate messages were detected. The sequence is now correctly rolled back so that there is no gap if the message is not added to the order index (cherry picked from commit 10d94bd16585df7bf9c266958c5e45bb556e8c14) --- .../store/kahadb/MessageDatabase.java | 3 +- .../KahaDBDurableMessageRecoveryTest.java | 41 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index bb28221a9b..dbab306fc7 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1552,7 +1552,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } metadata.lastUpdate = location; } else { - MessageKeys messageKeys = sd.orderIndex.get(tx, previous); if (messageKeys != null && messageKeys.location.compareTo(location) < 0) { // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt @@ -1560,6 +1559,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } sd.messageIdIndex.put(tx, command.getMessageId(), previous); sd.locationIndex.remove(tx, location); + // ensure sequence is not broken + sd.orderIndex.revertNextMessageId(); id = -1; } } else { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java index 58cac05dfd..041fce238f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java @@ -36,11 +36,14 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.util.Wait; import org.junit.After; @@ -357,6 +360,44 @@ public class KahaDBDurableMessageRecoveryTest { assertEquals(10, sub2Recovered.get()); } + /** + * AMQ-9453 Validates that the order index doesn't have gaps in sequence + * tracking on duplicates + */ + @Test + public void durableRecoveryDuplicates() throws Exception { + String testTopic = "test.topic.duplicates"; + + Session session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE); + ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic); + MessageProducer producer = session.createProducer(topic); + TopicSubscriber subscriber = session.createDurableSubscriber(topic, "sub1"); + + final Destination brokerTopic = broker.getDestination(topic); + final MessageStore store = brokerTopic.getMessageStore(); + for (int i = 1; i <= 10; i++) { + TextMessage message = session.createTextMessage("msg: " + i); + producer.send(message); + // For each message try to add it twice to the store, the store should detect + // the duplicate and prevent it. This used to break the order index cursor + // which would cause the ack sequences to have gaps so the metrics reported + // more messages than existed which leads to the appearance of stuck messages. + // This has been fixed in AMQ-9453 + store.addMessage(broker.getAdminConnectionContext(), (ActiveMQMessage)message); + } + producer.close(); + + // Verify we only have 10 messages still and then consume them all + assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500)); + for (int i = 1; i <= 10; i++) { + TextMessage received = (TextMessage) subscriber.receive(1000); + assertNotNull(received); + } + + subscriber.close(); + assertTrue(Wait.waitFor(() -> 0 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500)); + } + protected long getPendingMessageCount(ActiveMQTopic topic, String clientId, String subId) throws Exception { final Topic brokerTopic = (Topic) broker.getDestination(topic); final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();