Computing messageSize for a durable subscription in KahaDB now runs much
faster (n vs n^2) which is noticable when there are a large number of
pending messages for a durable subscription.

(cherry picked from commit 25ff5699f1)
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-02-04 15:16:11 +00:00
parent 04b191ceb4
commit 23e9ecaec1
3 changed files with 53 additions and 14 deletions

View File

@ -2588,31 +2588,28 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
} }
public long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { public long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
//grab the messages attached to this subscription
SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
long locationSize = 0; long locationSize = 0;
if (messageSequences != null) { if (messageSequences != null) {
Iterator<Long> sequences = messageSequences.iterator(); Sequence head = messageSequences.getHead();
if (head != null) {
//get an iterator over the order index starting at the first unacked message
//and go over each message to add up the size
Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
new MessageOrderCursor(head.getFirst()));
while (sequences.hasNext()) {
Long sequenceId = sequences.next();
//the last item is the next marker
if (!sequences.hasNext()) {
break;
}
Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx);
while (iterator.hasNext()) { while (iterator.hasNext()) {
Entry<Location, Long> entry = iterator.next(); Entry<Long, MessageKeys> entry = iterator.next();
if (entry.getValue() == sequenceId - 1) { locationSize += entry.getValue().location.getSize();
locationSize += entry.getKey().getSize();
break;
}
} }
} }
} }
return locationSize; return locationSize;
} }
protected String key(KahaDestination destination) { protected String key(KahaDestination destination) {
return destination.getType().getNumber() + ":" + destination.getName(); return destination.getType().getNumber() + ":" + destination.getName();
} }

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.activemq.broker.region.cursors; package org.apache.activemq.broker.region.cursors;
import static org.junit.Assert.assertEquals;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -276,6 +278,10 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
verifyPendingStats(dest, subKey, 200, publishedMessageSize.get()); verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
verifyStoreStats(dest, 200, publishedMessageSize.get()); verifyStoreStats(dest, 200, publishedMessageSize.get());
//should be equal in this case
assertEquals(dest.getDurableTopicSubs().get(subKey).getPendingMessageSize(),
dest.getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize());
//consume all messages //consume all messages
consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize); consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.activemq.broker.region.cursors; package org.apache.activemq.broker.region.cursors;
import static org.junit.Assert.assertEquals;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -84,6 +86,10 @@ public class KahaDBPendingMessageCursorTest extends
verifyPendingStats(topic, subKey, 200, publishedMessageSize.get()); verifyPendingStats(topic, subKey, 200, publishedMessageSize.get());
verifyStoreStats(topic, 200, publishedMessageSize.get()); verifyStoreStats(topic, 200, publishedMessageSize.get());
//should be equal in this case
assertEquals(topic.getDurableTopicSubs().get(subKey).getPendingMessageSize(),
topic.getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize());
// stop, restart broker and publish more messages // stop, restart broker and publish more messages
stopBroker(); stopBroker();
this.setUpBroker(false); this.setUpBroker(false);
@ -101,6 +107,36 @@ public class KahaDBPendingMessageCursorTest extends
} }
@Test(timeout=60000)
public void testMessageSizeTwoDurablesPartialConsumption() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable(
connection, new String[] {"sub1", "sub2"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT);
//verify the count and size - durable is offline so all 200 should be pending since none are in prefetch
verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
verifyStoreStats(dest, 200, publishedMessageSize.get());
//consume all messages
consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize);
//150 should be left
verifyPendingStats(dest, subKey, 150, publishedMessageSize.get());
//200 should be left
verifyPendingStats(dest, subKey2, 200, publishedMessageSize.get());
verifyStoreStats(dest, 200, publishedMessageSize.get());
connection.close();
}
/** /**
* Test that the the counter restores size and works after restart and more * Test that the the counter restores size and works after restart and more
* messages are published * messages are published