mirror of https://github.com/apache/activemq.git
AMQ-9435 - Ensure orderIndex next id is rolled back on duplicates
This commit fixes a bug in KahaDB that caused gaps in sequence ack tracking for durables that would lead to the appearance of stuck messages on durable subs if duplicate messages were detected. The sequence is now correctly rolled back so that there is no gap if the message is not added to the order index (cherry picked from commit10d94bd165
) (cherry picked from commit2b856f4da6
)
This commit is contained in:
parent
97e379be8a
commit
722a075e2c
|
@ -1552,7 +1552,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
}
|
||||
metadata.lastUpdate = location;
|
||||
} else {
|
||||
|
||||
MessageKeys messageKeys = sd.orderIndex.get(tx, previous);
|
||||
if (messageKeys != null && messageKeys.location.compareTo(location) < 0) {
|
||||
// If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt
|
||||
|
@ -1560,6 +1559,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
}
|
||||
sd.messageIdIndex.put(tx, command.getMessageId(), previous);
|
||||
sd.locationIndex.remove(tx, location);
|
||||
// ensure sequence is not broken
|
||||
sd.orderIndex.revertNextMessageId();
|
||||
id = -1;
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -36,11 +36,14 @@ import org.apache.activemq.ActiveMQConnectionFactory;
|
|||
import org.apache.activemq.ActiveMQSession;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.Topic;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.store.MessageRecoveryListener;
|
||||
import org.apache.activemq.store.MessageStore;
|
||||
import org.apache.activemq.store.TopicMessageStore;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.junit.After;
|
||||
|
@ -357,6 +360,44 @@ public class KahaDBDurableMessageRecoveryTest {
|
|||
assertEquals(10, sub2Recovered.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* AMQ-9453 Validates that the order index doesn't have gaps in sequence
|
||||
* tracking on duplicates
|
||||
*/
|
||||
@Test
|
||||
public void durableRecoveryDuplicates() throws Exception {
|
||||
String testTopic = "test.topic.duplicates";
|
||||
|
||||
Session session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
|
||||
ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic);
|
||||
MessageProducer producer = session.createProducer(topic);
|
||||
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "sub1");
|
||||
|
||||
final Destination brokerTopic = broker.getDestination(topic);
|
||||
final MessageStore store = brokerTopic.getMessageStore();
|
||||
for (int i = 1; i <= 10; i++) {
|
||||
TextMessage message = session.createTextMessage("msg: " + i);
|
||||
producer.send(message);
|
||||
// For each message try to add it twice to the store, the store should detect
|
||||
// the duplicate and prevent it. This used to break the order index cursor
|
||||
// which would cause the ack sequences to have gaps so the metrics reported
|
||||
// more messages than existed which leads to the appearance of stuck messages.
|
||||
// This has been fixed in AMQ-9453
|
||||
store.addMessage(broker.getAdminConnectionContext(), (ActiveMQMessage)message);
|
||||
}
|
||||
producer.close();
|
||||
|
||||
// Verify we only have 10 messages still and then consume them all
|
||||
assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
|
||||
for (int i = 1; i <= 10; i++) {
|
||||
TextMessage received = (TextMessage) subscriber.receive(1000);
|
||||
assertNotNull(received);
|
||||
}
|
||||
|
||||
subscriber.close();
|
||||
assertTrue(Wait.waitFor(() -> 0 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
|
||||
}
|
||||
|
||||
protected long getPendingMessageCount(ActiveMQTopic topic, String clientId, String subId) throws Exception {
|
||||
final Topic brokerTopic = (Topic) broker.getDestination(topic);
|
||||
final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();
|
||||
|
|
Loading…
Reference in New Issue