AMQ-7136 - Improve recovery of durable subscription metrics in KahaDB

Updated metrics recovery to only have to iterate over the order index 1
time to recovery the pending metrics for the subscriptions instead of
making a pass over the index once per subscription
This commit is contained in:
Christopher L. Shannon (cshannon) 2019-01-15 14:10:11 -05:00
parent 5c23dd53ad
commit c3714457f1
3 changed files with 87 additions and 19 deletions

View File

@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
@ -1050,7 +1051,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
protected void recoverMessageStoreSubMetrics() throws IOException { protected void recoverMessageStoreSubMetrics() throws IOException {
if (isEnableSubscriptionStatistics()) { if (isEnableSubscriptionStatistics()) {
final MessageStoreSubscriptionStatistics statistics = getMessageStoreSubStatistics(); final MessageStoreSubscriptionStatistics statistics = getMessageStoreSubStatistics();
indexLock.writeLock().lock(); indexLock.writeLock().lock();
try { try {
@ -1058,19 +1058,29 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
@Override @Override
public void execute(Transaction tx) throws IOException { public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx); StoredDestination sd = getStoredDestination(dest, tx);
List<String> subscriptionKeys = new ArrayList<>();
for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions
.iterator(tx); iterator.hasNext();) { .iterator(tx); iterator.hasNext();) {
Entry<String, KahaSubscriptionCommand> entry = iterator.next(); Entry<String, KahaSubscriptionCommand> entry = iterator.next();
String subscriptionKey = entry.getKey(); final String subscriptionKey = entry.getKey();
LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); final LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
if (cursorPos != null) { if (cursorPos != null) {
long size = getStoredMessageSize(tx, sd, subscriptionKey); //add the subscriptions to a list for recovering pending sizes below
subscriptionKeys.add(subscriptionKey);
//recover just the count here as that is fast
statistics.getMessageCount(subscriptionKey) statistics.getMessageCount(subscriptionKey)
.setCount(getStoredMessageCount(tx, sd, subscriptionKey)); .setCount(getStoredMessageCount(tx, sd, subscriptionKey));
statistics.getMessageSize(subscriptionKey).addSize(size > 0 ? size : 0);
} }
} }
//Recover the message sizes for each subscription by iterating only 1 time over the order index
//to speed up recovery
final Map<String, AtomicLong> subPendingMessageSizes = getStoredMessageSize(tx, sd, subscriptionKeys);
subPendingMessageSizes.forEach((k,v) -> {
statistics.getMessageSize(k).addSize(v.get() > 0 ? v.get() : 0);
});
} }
}); });
} finally { } finally {

View File

@ -46,7 +46,6 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -3154,6 +3153,61 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return 0; return 0;
} }
/**
* Recovers durable subscription pending message size with only 1 pass over the order index on recovery
* instead of iterating over the index once per subscription
*
* @param tx
* @param sd
* @param subscriptionKeys
* @return
* @throws IOException
*/
protected Map<String, AtomicLong> getStoredMessageSize(Transaction tx, StoredDestination sd, List<String> subscriptionKeys) throws IOException {
final Map<String, AtomicLong> subPendingMessageSizes = new HashMap<>();
final Map<String, SequenceSet> messageSequencesMap = new HashMap<>();
if (sd.ackPositions != null) {
Long recoveryPosition = null;
//Go through each subscription and find matching ackPositions and their first
//position to find the initial recovery position which is the first message across all subs
//that needs to still be acked
for (String subscriptionKey : subscriptionKeys) {
subPendingMessageSizes.put(subscriptionKey, new AtomicLong());
final SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
if (messageSequences != null && !messageSequences.isEmpty()) {
final long head = messageSequences.getHead().getFirst();
recoveryPosition = recoveryPosition != null ? Math.min(recoveryPosition, head) : head;
//cache the SequenceSet to speed up recovery of metrics below and avoid a second index hit
messageSequencesMap.put(subscriptionKey, messageSequences);
}
}
recoveryPosition = recoveryPosition != null ? recoveryPosition : 0;
final Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
new MessageOrderCursor(recoveryPosition));
//iterate through all messages starting at the recovery position to recover metrics
while (iterator.hasNext()) {
final Entry<Long, MessageKeys> messageEntry = iterator.next();
//For each message in the index check if each subscription needs to ack the message still
//if the ackPositions SequenceSet contains the message then it has not been acked and should be
//added to the pending metrics for that subscription
for (Entry<String, SequenceSet> seqEntry : messageSequencesMap.entrySet()) {
final String subscriptionKey = seqEntry.getKey();
final SequenceSet messageSequences = messageSequencesMap.get(subscriptionKey);
if (messageSequences.contains(messageEntry.getKey())) {
subPendingMessageSizes.get(subscriptionKey).addAndGet(messageEntry.getValue().location.getSize());
}
}
}
}
return subPendingMessageSizes;
}
protected long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { protected long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
long locationSize = 0; long locationSize = 0;

View File

@ -55,9 +55,9 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class KahaDBDurableMessageRecoveryTest { public class KahaDBDurableMessageRecoveryTest {
@Parameters(name = "{0}") @Parameters(name = "recoverIndex={0},enableSubscriptionStats={1}")
public static Collection<Object[]> data() { public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] { { false }, { true } }); return Arrays.asList(new Object[][] { { false, false }, { false, true }, { true, false }, { true, true } });
} }
@Rule @Rule
@ -66,6 +66,7 @@ public class KahaDBDurableMessageRecoveryTest {
private URI brokerConnectURI; private URI brokerConnectURI;
private boolean recoverIndex; private boolean recoverIndex;
private boolean enableSubscriptionStats;
@Before @Before
public void setUpBroker() throws Exception { public void setUpBroker() throws Exception {
@ -81,9 +82,10 @@ public class KahaDBDurableMessageRecoveryTest {
/** /**
* @param deleteIndex * @param deleteIndex
*/ */
public KahaDBDurableMessageRecoveryTest(boolean recoverIndex) { public KahaDBDurableMessageRecoveryTest(boolean recoverIndex, boolean enableSubscriptionStats) {
super(); super();
this.recoverIndex = recoverIndex; this.recoverIndex = recoverIndex;
this.enableSubscriptionStats = enableSubscriptionStats;
} }
protected void startBroker(boolean recoverIndex) throws Exception { protected void startBroker(boolean recoverIndex) throws Exception {
@ -105,6 +107,7 @@ public class KahaDBDurableMessageRecoveryTest {
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter(); KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
adapter.setForceRecoverIndex(forceRecoverIndex); adapter.setForceRecoverIndex(forceRecoverIndex);
adapter.setEnableSubscriptionStatistics(enableSubscriptionStats);
// set smaller size for test // set smaller size for test
adapter.setJournalMaxFileLength(1024 * 20); adapter.setJournalMaxFileLength(1024 * 20);
@ -210,10 +213,12 @@ public class KahaDBDurableMessageRecoveryTest {
assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500)); assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500)); assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
//Verify the pending size is less for sub1 // Verify the pending size is less for sub1
assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") > 0); final long sub1PendingSizeBeforeRestart = getPendingMessageSize(topic, "clientId1", "sub1");
assertTrue(getPendingMessageSize(topic, "clientId1", "sub2") > 0); final long sub2PendingSizeBeforeRestart = getPendingMessageSize(topic, "clientId1", "sub2");
assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") < getPendingMessageSize(topic, "clientId1", "sub2")); assertTrue(sub1PendingSizeBeforeRestart > 0);
assertTrue(sub2PendingSizeBeforeRestart > 0);
assertTrue(sub1PendingSizeBeforeRestart < sub2PendingSizeBeforeRestart);
subscriber1.close(); subscriber1.close();
subscriber2.close(); subscriber2.close();
@ -223,10 +228,9 @@ public class KahaDBDurableMessageRecoveryTest {
assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500)); assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500)); assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
//Verify the pending size is less for sub1 // Verify the pending size is less for sub1
assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") > 0); assertEquals(sub1PendingSizeBeforeRestart, getPendingMessageSize(topic, "clientId1", "sub1"));
assertTrue(getPendingMessageSize(topic, "clientId1", "sub2") > 0); assertEquals(sub2PendingSizeBeforeRestart, getPendingMessageSize(topic, "clientId1", "sub2"));
assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") < getPendingMessageSize(topic, "clientId1", "sub2"));
// Recreate subscriber and try and receive the other 8 messages // Recreate subscriber and try and receive the other 8 messages
session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE); session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
@ -293,7 +297,7 @@ public class KahaDBDurableMessageRecoveryTest {
subscriber2.close(); subscriber2.close();
restartBroker(recoverIndex); restartBroker(recoverIndex);
//Manually recover subscription and verify proper messages are loaded // Manually recover subscription and verify proper messages are loaded
final Topic brokerTopic = (Topic) broker.getDestination(topic); final Topic brokerTopic = (Topic) broker.getDestination(topic);
final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore(); final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();
final AtomicInteger sub1Recovered = new AtomicInteger(); final AtomicInteger sub1Recovered = new AtomicInteger();
@ -348,7 +352,7 @@ public class KahaDBDurableMessageRecoveryTest {
} }
}); });
//Verify proper number of messages are recovered // Verify proper number of messages are recovered
assertEquals(8, sub1Recovered.get()); assertEquals(8, sub1Recovered.get());
assertEquals(10, sub2Recovered.get()); assertEquals(10, sub2Recovered.get());
} }