mirror of https://github.com/apache/activemq.git
AMQ-7129 - fix durable message size statistics with individual ack
Make sure that the pending message size for a durable sub only includes messages part of the ack range
This commit is contained in:
parent
d29c1be66a
commit
fa2daa25e9
|
@ -3030,16 +3030,23 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
|
||||
|
||||
if (messageSequences != null) {
|
||||
Sequence head = messageSequences.getHead();
|
||||
if (head != null) {
|
||||
if (!messageSequences.isEmpty()) {
|
||||
final Sequence head = messageSequences.getHead();
|
||||
|
||||
//get an iterator over the order index starting at the first unacked message
|
||||
//and go over each message to add up the size
|
||||
Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
|
||||
new MessageOrderCursor(head.getFirst()));
|
||||
|
||||
final boolean contiguousRange = messageSequences.size() == 1;
|
||||
while (iterator.hasNext()) {
|
||||
Entry<Long, MessageKeys> entry = iterator.next();
|
||||
locationSize += entry.getValue().location.getSize();
|
||||
//Verify sequence contains the key
|
||||
//if contiguous we just add all starting with the first but if not
|
||||
//we need to check if the id is part of the range - could happen if individual ack mode was used
|
||||
if (contiguousRange || messageSequences.contains(entry.getKey())) {
|
||||
locationSize += entry.getValue().location.getSize();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -209,6 +209,12 @@ public class KahaDBDurableMessageRecoveryTest {
|
|||
// Verify there are 8 messages left still and restart broker
|
||||
assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
|
||||
assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
|
||||
|
||||
//Verify the pending size is less for sub1
|
||||
assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") > 0);
|
||||
assertTrue(getPendingMessageSize(topic, "clientId1", "sub2") > 0);
|
||||
assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") < getPendingMessageSize(topic, "clientId1", "sub2"));
|
||||
|
||||
subscriber1.close();
|
||||
subscriber2.close();
|
||||
restartBroker(recoverIndex);
|
||||
|
@ -217,6 +223,11 @@ public class KahaDBDurableMessageRecoveryTest {
|
|||
assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
|
||||
assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
|
||||
|
||||
//Verify the pending size is less for sub1
|
||||
assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") > 0);
|
||||
assertTrue(getPendingMessageSize(topic, "clientId1", "sub2") > 0);
|
||||
assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") < getPendingMessageSize(topic, "clientId1", "sub2"));
|
||||
|
||||
// Recreate subscriber and try and receive the other 8 messages
|
||||
session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
|
||||
subscriber1 = session.createDurableSubscriber(topic, "sub1");
|
||||
|
@ -347,4 +358,10 @@ public class KahaDBDurableMessageRecoveryTest {
|
|||
final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();
|
||||
return store.getMessageCount(clientId, subId);
|
||||
}
|
||||
|
||||
protected long getPendingMessageSize(ActiveMQTopic topic, String clientId, String subId) throws Exception {
|
||||
final Topic brokerTopic = (Topic) broker.getDestination(topic);
|
||||
final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();
|
||||
return store.getMessageSize(clientId, subId);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue