mirror of https://github.com/apache/activemq.git
AMQ-9435 - Ensure orderIndex next id is rolled back on duplicates
This commit fixes a bug in KahaDB that caused gaps in sequence ack
tracking for durables that would lead to the appearance of stuck
messages on durable subs if duplicate messages were detected. The
sequence is now correctly rolled back so that there is no gap if the
message is not added to the order index
(cherry picked from commit 10d94bd165
)
This commit is contained in:
parent
682c2035ad
commit
2b856f4da6
|
@ -1552,7 +1552,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
}
|
||||
metadata.lastUpdate = location;
|
||||
} else {
|
||||
|
||||
MessageKeys messageKeys = sd.orderIndex.get(tx, previous);
|
||||
if (messageKeys != null && messageKeys.location.compareTo(location) < 0) {
|
||||
// If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt
|
||||
|
@ -1560,6 +1559,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
}
|
||||
sd.messageIdIndex.put(tx, command.getMessageId(), previous);
|
||||
sd.locationIndex.remove(tx, location);
|
||||
// ensure sequence is not broken
|
||||
sd.orderIndex.revertNextMessageId();
|
||||
id = -1;
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -36,11 +36,14 @@ import org.apache.activemq.ActiveMQConnectionFactory;
|
|||
import org.apache.activemq.ActiveMQSession;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.Topic;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.store.MessageRecoveryListener;
|
||||
import org.apache.activemq.store.MessageStore;
|
||||
import org.apache.activemq.store.TopicMessageStore;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.junit.After;
|
||||
|
@ -357,6 +360,44 @@ public class KahaDBDurableMessageRecoveryTest {
|
|||
assertEquals(10, sub2Recovered.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* AMQ-9453 Validates that the order index doesn't have gaps in sequence
|
||||
* tracking on duplicates
|
||||
*/
|
||||
@Test
|
||||
public void durableRecoveryDuplicates() throws Exception {
|
||||
String testTopic = "test.topic.duplicates";
|
||||
|
||||
Session session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
|
||||
ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic);
|
||||
MessageProducer producer = session.createProducer(topic);
|
||||
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "sub1");
|
||||
|
||||
final Destination brokerTopic = broker.getDestination(topic);
|
||||
final MessageStore store = brokerTopic.getMessageStore();
|
||||
for (int i = 1; i <= 10; i++) {
|
||||
TextMessage message = session.createTextMessage("msg: " + i);
|
||||
producer.send(message);
|
||||
// For each message try to add it twice to the store, the store should detect
|
||||
// the duplicate and prevent it. This used to break the order index cursor
|
||||
// which would cause the ack sequences to have gaps so the metrics reported
|
||||
// more messages than existed which leads to the appearance of stuck messages.
|
||||
// This has been fixed in AMQ-9453
|
||||
store.addMessage(broker.getAdminConnectionContext(), (ActiveMQMessage)message);
|
||||
}
|
||||
producer.close();
|
||||
|
||||
// Verify we only have 10 messages still and then consume them all
|
||||
assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
|
||||
for (int i = 1; i <= 10; i++) {
|
||||
TextMessage received = (TextMessage) subscriber.receive(1000);
|
||||
assertNotNull(received);
|
||||
}
|
||||
|
||||
subscriber.close();
|
||||
assertTrue(Wait.waitFor(() -> 0 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
|
||||
}
|
||||
|
||||
protected long getPendingMessageCount(ActiveMQTopic topic, String clientId, String subId) throws Exception {
|
||||
final Topic brokerTopic = (Topic) broker.getDestination(topic);
|
||||
final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();
|
||||
|
|
Loading…
Reference in New Issue