Merge pull request #1153 from cshannon/AMQ-9435

AMQ-9435 - Ensure orderIndex next id is rolled back on duplicates
This commit is contained in:
Christopher L. Shannon 2024-02-15 10:37:39 -05:00 committed by GitHub
commit 30d54c4299
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 43 additions and 1 deletions

View File

@ -1552,7 +1552,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
metadata.lastUpdate = location;
} else {
MessageKeys messageKeys = sd.orderIndex.get(tx, previous);
if (messageKeys != null && messageKeys.location.compareTo(location) < 0) {
// If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt
@ -1560,6 +1559,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
sd.messageIdIndex.put(tx, command.getMessageId(), previous);
sd.locationIndex.remove(tx, location);
// ensure sequence is not broken
sd.orderIndex.revertNextMessageId();
id = -1;
}
} else {

View File

@ -36,11 +36,14 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.Wait;
import org.junit.After;
@ -357,6 +360,44 @@ public class KahaDBDurableMessageRecoveryTest {
assertEquals(10, sub2Recovered.get());
}
/**
* AMQ-9453 Validates that the order index doesn't have gaps in sequence
* tracking on duplicates
*/
@Test
public void durableRecoveryDuplicates() throws Exception {
String testTopic = "test.topic.duplicates";
Session session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic);
MessageProducer producer = session.createProducer(topic);
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "sub1");
final Destination brokerTopic = broker.getDestination(topic);
final MessageStore store = brokerTopic.getMessageStore();
for (int i = 1; i <= 10; i++) {
TextMessage message = session.createTextMessage("msg: " + i);
producer.send(message);
// For each message try to add it twice to the store, the store should detect
// the duplicate and prevent it. This used to break the order index cursor
// which would cause the ack sequences to have gaps so the metrics reported
// more messages than existed which leads to the appearance of stuck messages.
// This has been fixed in AMQ-9453
store.addMessage(broker.getAdminConnectionContext(), (ActiveMQMessage)message);
}
producer.close();
// Verify we only have 10 messages still and then consume them all
assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
for (int i = 1; i <= 10; i++) {
TextMessage received = (TextMessage) subscriber.receive(1000);
assertNotNull(received);
}
subscriber.close();
assertTrue(Wait.waitFor(() -> 0 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
}
protected long getPendingMessageCount(ActiveMQTopic topic, String clientId, String subId) throws Exception {
final Topic brokerTopic = (Topic) broker.getDestination(topic);
final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();