From 9012a7871b77da6ecdc403f6b44ef0221345bfb7 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 6 Nov 2018 16:49:52 -0800 Subject: [PATCH] AMQ-7091 - O(n) Memory consumption when broker has inactive durable subscribes causing OOM --- .../store/kahadb/MessageDatabase.java | 101 +++++------------- .../KahaDBStoreOpenWireVersionTest.java | 1 - .../DurableSubscriptionOfflineTest.java | 62 +++++++++-- .../DurableSubscriptionOfflineTestBase.java | 4 +- 4 files changed, 82 insertions(+), 86 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index db6239a164..21027c6ff0 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -2365,7 +2365,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe ListIndex subLocations; // Transient data used to track which Messages are no longer needed. - final TreeMap messageReferences = new TreeMap<>(); final HashSet subscriptionCache = new LinkedHashSet<>(); public void trackPendingAdd(Long seq) { @@ -2635,30 +2634,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } // Configure the message references index - Iterator> subscriptions = rc.ackPositions.iterator(tx); - while (subscriptions.hasNext()) { - Entry subscription = subscriptions.next(); - SequenceSet pendingAcks = subscription.getValue(); - if (pendingAcks != null && !pendingAcks.isEmpty()) { - Long lastPendingAck = pendingAcks.getTail().getLast(); - for (Long sequenceId : pendingAcks) { - Long current = rc.messageReferences.get(sequenceId); - if (current == null) { - current = new Long(0); - } - // We always add a trailing empty entry for the next position to start from - // so we need to ensure we don't count that as a message reference on reload. - if (!sequenceId.equals(lastPendingAck)) { - current = current.longValue() + 1; - } else { - current = Long.valueOf(0L); - } - - rc.messageReferences.put(sequenceId, current); - } - } - } // Configure the subscription cache for (Iterator> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { @@ -2677,10 +2653,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } else { // update based on ackPositions for unmatched, last entry is always the next - if (!rc.messageReferences.isEmpty()) { - Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1]; - rc.orderIndex.nextMessageId = - Math.max(rc.orderIndex.nextMessageId, nextMessageId); + Iterator> subscriptions = rc.ackPositions.iterator(tx); + while (subscriptions.hasNext()) { + Entry subscription = subscriptions.next(); + SequenceSet pendingAcks = subscription.getValue(); + if (pendingAcks != null && !pendingAcks.isEmpty()) { + for (Long sequenceId : pendingAcks) { + rc.orderIndex.nextMessageId = Math.max(rc.orderIndex.nextMessageId, sequenceId); + } + } } } } @@ -2884,13 +2865,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe sequences.add(messageSequence); sd.ackPositions.put(tx, subscriptionKey, sequences); } - - Long count = sd.messageReferences.get(messageSequence); - if (count == null) { - count = Long.valueOf(0L); - } - count = count.longValue() + 1; - sd.messageReferences.put(messageSequence, count); } // new sub is interested in potentially all existing messages @@ -2904,18 +2878,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } sd.ackPositions.put(tx, subscriptionKey, allOutstanding); - - for (Long ackPosition : allOutstanding) { - Long count = sd.messageReferences.get(ackPosition); - - // There might not be a reference if the ackLocation was the last - // one which is a placeholder for the next incoming message and - // no value was added to the message references table. - if (count != null) { - count = count.longValue() + 1; - sd.messageReferences.put(ackPosition, count); - } - } } // on a new message add, all existing subs are interested in this message @@ -2933,16 +2895,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } MessageKeys key = sd.orderIndex.get(tx, messageSequence); - incrementAndAddSizeToStoreStat(kahaDest, subscriptionKey, - key.location.getSize()); - - Long count = sd.messageReferences.get(messageSequence); - if (count == null) { - count = Long.valueOf(0L); - } - count = count.longValue() + 1; - sd.messageReferences.put(messageSequence, count); - sd.messageReferences.put(messageSequence + 1, Long.valueOf(0L)); + incrementAndAddSizeToStoreStat(kahaDest, subscriptionKey, key.location.getSize()); } } @@ -2957,16 +2910,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe ArrayList unreferenced = new ArrayList<>(); for(Long sequenceId : sequences) { - Long references = sd.messageReferences.get(sequenceId); - if (references != null) { - references = references.longValue() - 1; - - if (references.longValue() > 0) { - sd.messageReferences.put(sequenceId, references); - } else { - sd.messageReferences.remove(sequenceId); - unreferenced.add(sequenceId); - } + if(!isSequenceReferenced(tx, sd, sequenceId)) { + unreferenced.add(sequenceId); } } @@ -2986,6 +2931,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } + private boolean isSequenceReferenced(final Transaction tx, final StoredDestination sd, final Long sequenceId) throws IOException { + for(String subscriptionKey : sd.subscriptionCache) { + SequenceSet sequence = sd.ackPositions.get(tx, subscriptionKey); + if (sequence != null && sequence.contains(sequenceId)) { + return true; + } + } + return false; + } + /** * @param tx * @param sd @@ -3012,17 +2967,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe key.location.getSize()); // Check if the message is reference by any other subscription. - Long count = sd.messageReferences.get(messageSequence); - if (count != null) { - long references = count.longValue() - 1; - if (references > 0) { - sd.messageReferences.put(messageSequence, Long.valueOf(references)); - return; - } else { - sd.messageReferences.remove(messageSequence); - } + if (isSequenceReferenced(tx, sd, messageSequence)) { + return; } - // Find all the entries that need to get deleted. ArrayList> deletes = new ArrayList<>(); sd.orderIndex.getDeleteList(tx, deletes, messageSequence); diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java index e59767a3d4..13bfdd2d75 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java @@ -175,7 +175,6 @@ public class KahaDBStoreOpenWireVersionTest { entry.getValue().orderIndex.defaultPriorityIndex.clear(tx); entry.getValue().orderIndex.lowPriorityIndex.clear(tx); entry.getValue().orderIndex.highPriorityIndex.clear(tx); - entry.getValue().messageReferences.clear(); } } }); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java index 00fb7de2a5..aa12593852 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java @@ -16,23 +16,19 @@ */ package org.apache.activemq.usecases; -import javax.management.openmbean.TabularData; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.MessageId; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.store.kahadb.disk.page.PageFile; -import org.apache.activemq.transport.vm.VMTransport; -import org.apache.activemq.transport.vm.VMTransportFactory; -import org.apache.activemq.transport.vm.VMTransportServer; -import org.junit.Ignore; +import org.apache.activemq.util.Wait; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.Connection; +import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -40,12 +36,16 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.management.ObjectName; import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; import java.util.HashSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class DurableSubscriptionOfflineTest extends DurableSubscriptionOfflineTestBase { @@ -765,6 +765,54 @@ public class DurableSubscriptionOfflineTest extends DurableSubscriptionOfflineTe con.close(); } + @org.junit.Test(timeout = 640000) + public void testInactiveSubscribeAfterBrokerRestart() throws Exception { + final int messageCount = 20; + Connection alwaysOnCon = createConnection("subs1"); + Connection tearDownFacCon = createConnection("subs2"); + Session awaysOnCon = alwaysOnCon.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session tearDownCon = tearDownFacCon.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQTopic topic = new ActiveMQTopic("TEST.FOO"); + String consumerName = "consumerName"; + String tearDownconsumerName = "tearDownconsumerName"; + // Setup consumers + MessageConsumer remoteConsumer = awaysOnCon.createDurableSubscriber(topic, consumerName); + MessageConsumer remoteConsumer2 = tearDownCon.createDurableSubscriber(topic, tearDownconsumerName); + DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("listener"); + remoteConsumer.setMessageListener(listener); + remoteConsumer2.setMessageListener(listener); + // Setup producer + MessageProducer localProducer = awaysOnCon.createProducer(topic); + localProducer.setDeliveryMode(DeliveryMode.PERSISTENT); + // Send messages + for (int i = 0; i < messageCount; i++) { + if (i == 10) { + remoteConsumer2.close(); + tearDownFacCon.close(); + } + Message test = awaysOnCon.createTextMessage("test-" + i); + localProducer.send(test); + } + destroyBroker(); + createBroker(false); + Connection reconnectCon = createConnection("subs2"); + Session reconnectSession = reconnectCon.createSession(false, Session.AUTO_ACKNOWLEDGE); + remoteConsumer2 = reconnectSession.createDurableSubscriber(topic, tearDownconsumerName); + remoteConsumer2.setMessageListener(listener); + LOG.info("waiting for messages to flow"); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return listener.count >= messageCount * 2; + } + }); + assertTrue("At least message " + messageCount * 2 + + " must be received, count=" + listener.count, + messageCount * 2 <= listener.count); + awaysOnCon.close(); + reconnectCon.close(); + } + // // https://issues.apache.org/jira/browse/AMQ-3768 // public void testPageReuse() throws Exception { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java index 74abf8ae58..5eeccbd705 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java @@ -216,7 +216,9 @@ class DurableSubscriptionOfflineTestListener implements MessageListener { } @Override public void onMessage(javax.jms.Message message) { - count++; + synchronized (this) { + count++; + } if (id != null) { try { LOG.info(id + ", " + message.getJMSMessageID());