mirror of https://github.com/apache/activemq.git
Computing messageSize for a durable subscription in KahaDB now runs much faster (n vs n^2) which is noticable when there are a large number of pending messages for a durable subscription.
This commit is contained in:
parent
c17b7fdc7f
commit
25ff5699f1
|
@ -2588,31 +2588,28 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
}
|
||||
|
||||
public long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
|
||||
//grab the messages attached to this subscription
|
||||
SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
|
||||
|
||||
long locationSize = 0;
|
||||
if (messageSequences != null) {
|
||||
Iterator<Long> sequences = messageSequences.iterator();
|
||||
Sequence head = messageSequences.getHead();
|
||||
if (head != null) {
|
||||
//get an iterator over the order index starting at the first unacked message
|
||||
//and go over each message to add up the size
|
||||
Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
|
||||
new MessageOrderCursor(head.getFirst()));
|
||||
|
||||
while (sequences.hasNext()) {
|
||||
Long sequenceId = sequences.next();
|
||||
//the last item is the next marker
|
||||
if (!sequences.hasNext()) {
|
||||
break;
|
||||
}
|
||||
Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx);
|
||||
while (iterator.hasNext()) {
|
||||
Entry<Location, Long> entry = iterator.next();
|
||||
if (entry.getValue() == sequenceId - 1) {
|
||||
locationSize += entry.getKey().getSize();
|
||||
break;
|
||||
}
|
||||
|
||||
Entry<Long, MessageKeys> entry = iterator.next();
|
||||
locationSize += entry.getValue().location.getSize();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return locationSize;
|
||||
}
|
||||
|
||||
protected String key(KahaDestination destination) {
|
||||
return destination.getType().getNumber() + ":" + destination.getName();
|
||||
}
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.activemq.broker.region.cursors;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
@ -276,6 +278,10 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
|
|||
verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
|
||||
verifyStoreStats(dest, 200, publishedMessageSize.get());
|
||||
|
||||
//should be equal in this case
|
||||
assertEquals(dest.getDurableTopicSubs().get(subKey).getPendingMessageSize(),
|
||||
dest.getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize());
|
||||
|
||||
//consume all messages
|
||||
consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
|
||||
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.activemq.broker.region.cursors;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
@ -84,6 +86,10 @@ public class KahaDBPendingMessageCursorTest extends
|
|||
verifyPendingStats(topic, subKey, 200, publishedMessageSize.get());
|
||||
verifyStoreStats(topic, 200, publishedMessageSize.get());
|
||||
|
||||
//should be equal in this case
|
||||
assertEquals(topic.getDurableTopicSubs().get(subKey).getPendingMessageSize(),
|
||||
topic.getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize());
|
||||
|
||||
// stop, restart broker and publish more messages
|
||||
stopBroker();
|
||||
this.setUpBroker(false);
|
||||
|
@ -101,6 +107,36 @@ public class KahaDBPendingMessageCursorTest extends
|
|||
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testMessageSizeTwoDurablesPartialConsumption() throws Exception {
|
||||
AtomicLong publishedMessageSize = new AtomicLong();
|
||||
|
||||
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
|
||||
connection.setClientID("clientId");
|
||||
connection.start();
|
||||
|
||||
SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
|
||||
SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
|
||||
org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable(
|
||||
connection, new String[] {"sub1", "sub2"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT);
|
||||
|
||||
//verify the count and size - durable is offline so all 200 should be pending since none are in prefetch
|
||||
verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
|
||||
verifyStoreStats(dest, 200, publishedMessageSize.get());
|
||||
|
||||
//consume all messages
|
||||
consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize);
|
||||
|
||||
//150 should be left
|
||||
verifyPendingStats(dest, subKey, 150, publishedMessageSize.get());
|
||||
|
||||
//200 should be left
|
||||
verifyPendingStats(dest, subKey2, 200, publishedMessageSize.get());
|
||||
verifyStoreStats(dest, 200, publishedMessageSize.get());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the the counter restores size and works after restart and more
|
||||
* messages are published
|
||||
|
|
Loading…
Reference in New Issue