diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index d26c7bd101..00b9bce25b 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1552,7 +1552,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } metadata.lastUpdate = location; } else { - MessageKeys messageKeys = sd.orderIndex.get(tx, previous); if (messageKeys != null && messageKeys.location.compareTo(location) < 0) { // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt @@ -1560,6 +1559,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } sd.messageIdIndex.put(tx, command.getMessageId(), previous); sd.locationIndex.remove(tx, location); + // ensure sequence is not broken + sd.orderIndex.revertNextMessageId(); id = -1; } } else { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java index cf22b272ad..cea5ca0194 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java @@ -36,11 +36,14 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.util.Wait; import org.junit.After; @@ -357,6 +360,44 @@ public class KahaDBDurableMessageRecoveryTest { assertEquals(10, sub2Recovered.get()); } + /** + * AMQ-9453 Validates that the order index doesn't have gaps in sequence + * tracking on duplicates + */ + @Test + public void durableRecoveryDuplicates() throws Exception { + String testTopic = "test.topic.duplicates"; + + Session session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE); + ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic); + MessageProducer producer = session.createProducer(topic); + TopicSubscriber subscriber = session.createDurableSubscriber(topic, "sub1"); + + final Destination brokerTopic = broker.getDestination(topic); + final MessageStore store = brokerTopic.getMessageStore(); + for (int i = 1; i <= 10; i++) { + TextMessage message = session.createTextMessage("msg: " + i); + producer.send(message); + // For each message try to add it twice to the store, the store should detect + // the duplicate and prevent it. This used to break the order index cursor + // which would cause the ack sequences to have gaps so the metrics reported + // more messages than existed which leads to the appearance of stuck messages. + // This has been fixed in AMQ-9453 + store.addMessage(broker.getAdminConnectionContext(), (ActiveMQMessage)message); + } + producer.close(); + + // Verify we only have 10 messages still and then consume them all + assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500)); + for (int i = 1; i <= 10; i++) { + TextMessage received = (TextMessage) subscriber.receive(1000); + assertNotNull(received); + } + + subscriber.close(); + assertTrue(Wait.waitFor(() -> 0 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500)); + } + protected long getPendingMessageCount(ActiveMQTopic topic, String clientId, String subId) throws Exception { final Topic brokerTopic = (Topic) broker.getDestination(topic); final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();