diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index cbbf9b62a6..9fd03514e0 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.broker.ConnectionContext; @@ -960,7 +961,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, protected void recoverMessageStoreSubMetrics() throws IOException { if (isEnableSubscriptionStatistics()) { - final MessageStoreSubscriptionStatistics statistics = getMessageStoreSubStatistics(); indexLock.writeLock().lock(); try { @@ -968,19 +968,29 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, @Override public void execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); + + List subscriptionKeys = new ArrayList<>(); for (Iterator> iterator = sd.subscriptions .iterator(tx); iterator.hasNext();) { Entry entry = iterator.next(); - String subscriptionKey = entry.getKey(); - LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); + final String subscriptionKey = entry.getKey(); + final LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); if (cursorPos != null) { - long size = getStoredMessageSize(tx, sd, subscriptionKey); + //add the subscriptions to a list for recovering pending sizes below + subscriptionKeys.add(subscriptionKey); + //recover just the count here as that is fast statistics.getMessageCount(subscriptionKey) .setCount(getStoredMessageCount(tx, sd, subscriptionKey)); - statistics.getMessageSize(subscriptionKey).addSize(size > 0 ? size : 0); } } + + //Recover the message sizes for each subscription by iterating only 1 time over the order index + //to speed up recovery + final Map subPendingMessageSizes = getStoredMessageSize(tx, sd, subscriptionKeys); + subPendingMessageSizes.forEach((k,v) -> { + statistics.getMessageSize(k).addSize(v.get() > 0 ? v.get() : 0); + }); } }); } finally { diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 83650e84ec..83d3fffd53 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -46,7 +46,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.SortedSet; -import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -3003,6 +3002,61 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return 0; } + /** + * Recovers durable subscription pending message size with only 1 pass over the order index on recovery + * instead of iterating over the index once per subscription + * + * @param tx + * @param sd + * @param subscriptionKeys + * @return + * @throws IOException + */ + protected Map getStoredMessageSize(Transaction tx, StoredDestination sd, List subscriptionKeys) throws IOException { + + final Map subPendingMessageSizes = new HashMap<>(); + final Map messageSequencesMap = new HashMap<>(); + + if (sd.ackPositions != null) { + Long recoveryPosition = null; + //Go through each subscription and find matching ackPositions and their first + //position to find the initial recovery position which is the first message across all subs + //that needs to still be acked + for (String subscriptionKey : subscriptionKeys) { + subPendingMessageSizes.put(subscriptionKey, new AtomicLong()); + final SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); + if (messageSequences != null && !messageSequences.isEmpty()) { + final long head = messageSequences.getHead().getFirst(); + recoveryPosition = recoveryPosition != null ? Math.min(recoveryPosition, head) : head; + //cache the SequenceSet to speed up recovery of metrics below and avoid a second index hit + messageSequencesMap.put(subscriptionKey, messageSequences); + } + } + recoveryPosition = recoveryPosition != null ? recoveryPosition : 0; + + final Iterator> iterator = sd.orderIndex.iterator(tx, + new MessageOrderCursor(recoveryPosition)); + + //iterate through all messages starting at the recovery position to recover metrics + while (iterator.hasNext()) { + final Entry messageEntry = iterator.next(); + + //For each message in the index check if each subscription needs to ack the message still + //if the ackPositions SequenceSet contains the message then it has not been acked and should be + //added to the pending metrics for that subscription + for (Entry seqEntry : messageSequencesMap.entrySet()) { + final String subscriptionKey = seqEntry.getKey(); + final SequenceSet messageSequences = messageSequencesMap.get(subscriptionKey); + if (messageSequences.contains(messageEntry.getKey())) { + subPendingMessageSizes.get(subscriptionKey).addAndGet(messageEntry.getValue().location.getSize()); + } + } + } + } + + return subPendingMessageSizes; + } + protected long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { long locationSize = 0; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java index 519648ea9e..cf22b272ad 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java @@ -55,9 +55,9 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class KahaDBDurableMessageRecoveryTest { - @Parameters(name = "{0}") + @Parameters(name = "recoverIndex={0},enableSubscriptionStats={1}") public static Collection data() { - return Arrays.asList(new Object[][] { { false }, { true } }); + return Arrays.asList(new Object[][] { { false, false }, { false, true }, { true, false }, { true, true } }); } @Rule @@ -66,6 +66,7 @@ public class KahaDBDurableMessageRecoveryTest { private URI brokerConnectURI; private boolean recoverIndex; + private boolean enableSubscriptionStats; @Before public void setUpBroker() throws Exception { @@ -81,9 +82,10 @@ public class KahaDBDurableMessageRecoveryTest { /** * @param deleteIndex */ - public KahaDBDurableMessageRecoveryTest(boolean recoverIndex) { + public KahaDBDurableMessageRecoveryTest(boolean recoverIndex, boolean enableSubscriptionStats) { super(); this.recoverIndex = recoverIndex; + this.enableSubscriptionStats = enableSubscriptionStats; } protected void startBroker(boolean recoverIndex) throws Exception { @@ -105,6 +107,7 @@ public class KahaDBDurableMessageRecoveryTest { KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter(); adapter.setForceRecoverIndex(forceRecoverIndex); + adapter.setEnableSubscriptionStatistics(enableSubscriptionStats); // set smaller size for test adapter.setJournalMaxFileLength(1024 * 20); @@ -210,10 +213,12 @@ public class KahaDBDurableMessageRecoveryTest { assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500)); assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500)); - //Verify the pending size is less for sub1 - assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") > 0); - assertTrue(getPendingMessageSize(topic, "clientId1", "sub2") > 0); - assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") < getPendingMessageSize(topic, "clientId1", "sub2")); + // Verify the pending size is less for sub1 + final long sub1PendingSizeBeforeRestart = getPendingMessageSize(topic, "clientId1", "sub1"); + final long sub2PendingSizeBeforeRestart = getPendingMessageSize(topic, "clientId1", "sub2"); + assertTrue(sub1PendingSizeBeforeRestart > 0); + assertTrue(sub2PendingSizeBeforeRestart > 0); + assertTrue(sub1PendingSizeBeforeRestart < sub2PendingSizeBeforeRestart); subscriber1.close(); subscriber2.close(); @@ -223,10 +228,9 @@ public class KahaDBDurableMessageRecoveryTest { assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500)); assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500)); - //Verify the pending size is less for sub1 - assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") > 0); - assertTrue(getPendingMessageSize(topic, "clientId1", "sub2") > 0); - assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") < getPendingMessageSize(topic, "clientId1", "sub2")); + // Verify the pending size is less for sub1 + assertEquals(sub1PendingSizeBeforeRestart, getPendingMessageSize(topic, "clientId1", "sub1")); + assertEquals(sub2PendingSizeBeforeRestart, getPendingMessageSize(topic, "clientId1", "sub2")); // Recreate subscriber and try and receive the other 8 messages session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE); @@ -293,7 +297,7 @@ public class KahaDBDurableMessageRecoveryTest { subscriber2.close(); restartBroker(recoverIndex); - //Manually recover subscription and verify proper messages are loaded + // Manually recover subscription and verify proper messages are loaded final Topic brokerTopic = (Topic) broker.getDestination(topic); final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore(); final AtomicInteger sub1Recovered = new AtomicInteger(); @@ -348,7 +352,7 @@ public class KahaDBDurableMessageRecoveryTest { } }); - //Verify proper number of messages are recovered + // Verify proper number of messages are recovered assertEquals(8, sub1Recovered.get()); assertEquals(10, sub2Recovered.get()); }