AMQ-7091 - O(n) Memory consumption when broker has inactive durable subscribes causing OOM

This commit is contained in:
Alan Protasio 2018-11-06 16:49:52 -08:00 committed by jgoodyear
parent 98dc99e984
commit 9012a7871b
4 changed files with 82 additions and 86 deletions

View File

@ -2365,7 +2365,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
ListIndex<String, Location> subLocations;
// Transient data used to track which Messages are no longer needed.
final TreeMap<Long, Long> messageReferences = new TreeMap<>();
final HashSet<String> subscriptionCache = new LinkedHashSet<>();
public void trackPendingAdd(Long seq) {
@ -2635,30 +2634,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
// Configure the message references index
Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
while (subscriptions.hasNext()) {
Entry<String, SequenceSet> subscription = subscriptions.next();
SequenceSet pendingAcks = subscription.getValue();
if (pendingAcks != null && !pendingAcks.isEmpty()) {
Long lastPendingAck = pendingAcks.getTail().getLast();
for (Long sequenceId : pendingAcks) {
Long current = rc.messageReferences.get(sequenceId);
if (current == null) {
current = new Long(0);
}
// We always add a trailing empty entry for the next position to start from
// so we need to ensure we don't count that as a message reference on reload.
if (!sequenceId.equals(lastPendingAck)) {
current = current.longValue() + 1;
} else {
current = Long.valueOf(0L);
}
rc.messageReferences.put(sequenceId, current);
}
}
}
// Configure the subscription cache
for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
@ -2677,10 +2653,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
} else {
// update based on ackPositions for unmatched, last entry is always the next
if (!rc.messageReferences.isEmpty()) {
Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
rc.orderIndex.nextMessageId =
Math.max(rc.orderIndex.nextMessageId, nextMessageId);
Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
while (subscriptions.hasNext()) {
Entry<String, SequenceSet> subscription = subscriptions.next();
SequenceSet pendingAcks = subscription.getValue();
if (pendingAcks != null && !pendingAcks.isEmpty()) {
for (Long sequenceId : pendingAcks) {
rc.orderIndex.nextMessageId = Math.max(rc.orderIndex.nextMessageId, sequenceId);
}
}
}
}
}
@ -2884,13 +2865,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sequences.add(messageSequence);
sd.ackPositions.put(tx, subscriptionKey, sequences);
}
Long count = sd.messageReferences.get(messageSequence);
if (count == null) {
count = Long.valueOf(0L);
}
count = count.longValue() + 1;
sd.messageReferences.put(messageSequence, count);
}
// new sub is interested in potentially all existing messages
@ -2904,18 +2878,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
for (Long ackPosition : allOutstanding) {
Long count = sd.messageReferences.get(ackPosition);
// There might not be a reference if the ackLocation was the last
// one which is a placeholder for the next incoming message and
// no value was added to the message references table.
if (count != null) {
count = count.longValue() + 1;
sd.messageReferences.put(ackPosition, count);
}
}
}
// on a new message add, all existing subs are interested in this message
@ -2933,16 +2895,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
MessageKeys key = sd.orderIndex.get(tx, messageSequence);
incrementAndAddSizeToStoreStat(kahaDest, subscriptionKey,
key.location.getSize());
Long count = sd.messageReferences.get(messageSequence);
if (count == null) {
count = Long.valueOf(0L);
}
count = count.longValue() + 1;
sd.messageReferences.put(messageSequence, count);
sd.messageReferences.put(messageSequence + 1, Long.valueOf(0L));
incrementAndAddSizeToStoreStat(kahaDest, subscriptionKey, key.location.getSize());
}
}
@ -2957,16 +2910,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
ArrayList<Long> unreferenced = new ArrayList<>();
for(Long sequenceId : sequences) {
Long references = sd.messageReferences.get(sequenceId);
if (references != null) {
references = references.longValue() - 1;
if (references.longValue() > 0) {
sd.messageReferences.put(sequenceId, references);
} else {
sd.messageReferences.remove(sequenceId);
unreferenced.add(sequenceId);
}
if(!isSequenceReferenced(tx, sd, sequenceId)) {
unreferenced.add(sequenceId);
}
}
@ -2986,6 +2931,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
private boolean isSequenceReferenced(final Transaction tx, final StoredDestination sd, final Long sequenceId) throws IOException {
for(String subscriptionKey : sd.subscriptionCache) {
SequenceSet sequence = sd.ackPositions.get(tx, subscriptionKey);
if (sequence != null && sequence.contains(sequenceId)) {
return true;
}
}
return false;
}
/**
* @param tx
* @param sd
@ -3012,17 +2967,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
key.location.getSize());
// Check if the message is reference by any other subscription.
Long count = sd.messageReferences.get(messageSequence);
if (count != null) {
long references = count.longValue() - 1;
if (references > 0) {
sd.messageReferences.put(messageSequence, Long.valueOf(references));
return;
} else {
sd.messageReferences.remove(messageSequence);
}
if (isSequenceReferenced(tx, sd, messageSequence)) {
return;
}
// Find all the entries that need to get deleted.
ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<>();
sd.orderIndex.getDeleteList(tx, deletes, messageSequence);

View File

@ -175,7 +175,6 @@ public class KahaDBStoreOpenWireVersionTest {
entry.getValue().orderIndex.defaultPriorityIndex.clear(tx);
entry.getValue().orderIndex.lowPriorityIndex.clear(tx);
entry.getValue().orderIndex.highPriorityIndex.clear(tx);
entry.getValue().messageReferences.clear();
}
}
});

View File

@ -16,23 +16,19 @@
*/
package org.apache.activemq.usecases;
import javax.management.openmbean.TabularData;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.transport.vm.VMTransport;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.transport.vm.VMTransportServer;
import org.junit.Ignore;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@ -40,12 +36,16 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class DurableSubscriptionOfflineTest extends DurableSubscriptionOfflineTestBase {
@ -765,6 +765,54 @@ public class DurableSubscriptionOfflineTest extends DurableSubscriptionOfflineTe
con.close();
}
@org.junit.Test(timeout = 640000)
public void testInactiveSubscribeAfterBrokerRestart() throws Exception {
final int messageCount = 20;
Connection alwaysOnCon = createConnection("subs1");
Connection tearDownFacCon = createConnection("subs2");
Session awaysOnCon = alwaysOnCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session tearDownCon = tearDownFacCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQTopic topic = new ActiveMQTopic("TEST.FOO");
String consumerName = "consumerName";
String tearDownconsumerName = "tearDownconsumerName";
// Setup consumers
MessageConsumer remoteConsumer = awaysOnCon.createDurableSubscriber(topic, consumerName);
MessageConsumer remoteConsumer2 = tearDownCon.createDurableSubscriber(topic, tearDownconsumerName);
DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("listener");
remoteConsumer.setMessageListener(listener);
remoteConsumer2.setMessageListener(listener);
// Setup producer
MessageProducer localProducer = awaysOnCon.createProducer(topic);
localProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
// Send messages
for (int i = 0; i < messageCount; i++) {
if (i == 10) {
remoteConsumer2.close();
tearDownFacCon.close();
}
Message test = awaysOnCon.createTextMessage("test-" + i);
localProducer.send(test);
}
destroyBroker();
createBroker(false);
Connection reconnectCon = createConnection("subs2");
Session reconnectSession = reconnectCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
remoteConsumer2 = reconnectSession.createDurableSubscriber(topic, tearDownconsumerName);
remoteConsumer2.setMessageListener(listener);
LOG.info("waiting for messages to flow");
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.count >= messageCount * 2;
}
});
assertTrue("At least message " + messageCount * 2 +
" must be received, count=" + listener.count,
messageCount * 2 <= listener.count);
awaysOnCon.close();
reconnectCon.close();
}
// // https://issues.apache.org/jira/browse/AMQ-3768
// public void testPageReuse() throws Exception {

View File

@ -216,7 +216,9 @@ class DurableSubscriptionOfflineTestListener implements MessageListener {
}
@Override
public void onMessage(javax.jms.Message message) {
count++;
synchronized (this) {
count++;
}
if (id != null) {
try {
LOG.info(id + ", " + message.getJMSMessageID());