From 8078dd098ccc38cd57eefa3e94ba1dfe50c6865b Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 20 Feb 2023 13:18:18 -0500 Subject: [PATCH] ARTEMIS-4171 Messages leaking thorugh AMQP Delivery there are two leaks here: * QueueImpl::delivery might create a new iterator if a delivery happens right after a consumer was removed, and that iterator might belog to a consumer that was already closed as a result of that, the iterator may leak messages and hold references until a reboot is done. I have seen scenarios where messages would not be dleivered because of this. * ProtonTransaction holding references: the last transaction might hold messages in the memory longer than expected. In tests I have performed the messages were accumulating in memory. and I cleared it here. --- .../utils/collections/LinkedListImpl.java | 13 ++- .../amqp/proton/AMQPConnectionContext.java | 11 +-- .../proton/ProtonServerSenderContext.java | 4 +- .../transaction/ProtonTransactionImpl.java | 42 +++++---- .../artemis/core/server/Consumer.java | 4 + .../core/server/impl/QueueConsumersImpl.java | 2 +- .../artemis/core/server/impl/QueueImpl.java | 21 +++-- .../core/server/impl/ServerConsumerImpl.java | 16 +++- .../tests/leak/ConnectionLeakTest.java | 86 +++++++++++++++++-- .../tests/leak/LinkedListMemoryTest.java | 71 +++++++++++++++ .../artemis/tests/leak/MemoryAssertions.java | 6 +- 11 files changed, 231 insertions(+), 45 deletions(-) create mode 100644 tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/LinkedListMemoryTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java index 5e36d333d2..121b5ba924 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java @@ -350,6 +350,9 @@ public class LinkedListImpl implements LinkedList { @Override public LinkedListIterator iterator() { + if (logger.isTraceEnabled()) { + logger.trace("Creating new iterator at", new Exception("trace location")); + } return new Iterator(); } @@ -434,6 +437,9 @@ public class LinkedListImpl implements LinkedList { } private synchronized void removeIter(Iterator iter) { + if (logger.isTraceEnabled()) { + logger.trace("Removing iterator at", new Exception("trace location")); + } for (int i = 0; i < numIters; i++) { if (iter == iters[i]) { iters[i] = null; @@ -449,8 +455,10 @@ public class LinkedListImpl implements LinkedList { if (numIters >= INITIAL_ITERATOR_ARRAY_SIZE && numIters == iters.length / 2) { resize(numIters); } - nextIndex--; + if (nextIndex < iters.length) { + iters[nextIndex] = null; + } return; } @@ -515,8 +523,7 @@ public class LinkedListImpl implements LinkedList { } } - private class Iterator implements LinkedListIterator { - + public class Iterator implements LinkedListIterator { Node last; Node current = head.next; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index a81f0d8c20..45ab664053 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -743,14 +743,9 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH } } - /// we have to perform the link.close after the linkContext.close is finished. - // linkeContext.close will perform a few executions on the netty loop, - // this has to come next - runLater(() -> { - link.close(); - link.free(); - flush(); - }); + link.close(); + link.free(); + flush(); } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 1068e5c044..312b9bda8d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -370,7 +370,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr OperationContext oldContext = sessionSPI.recoverContext(); try { - Message message = ((MessageReference) delivery.getContext()).getMessage(); + MessageReference reference = (MessageReference) delivery.getContext(); + Message message = reference != null ? reference.getMessage() : null; + DeliveryState remoteState = delivery.getRemoteState(); if (remoteState != null && remoteState.getType() == DeliveryStateType.Accepted) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java index 123dbb5d9b..0e3c4e63ea 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java @@ -45,26 +45,38 @@ public class ProtonTransactionImpl extends TransactionImpl { deliveries have been settled. We also need to ensure we are settling on the correct link. Hence why we keep a ref to the ProtonServerSenderContext here. */ - private final Map> deliveries = new HashMap<>(); + final Map> deliveries = new HashMap<>(); private boolean discharged; + private static class TXOperations extends TransactionOperationAbstract { + final ProtonTransactionImpl protonTransaction; + final AMQPConnectionContext connection; + + TXOperations(AMQPConnectionContext connection, ProtonTransactionImpl tx) { + this.protonTransaction = tx; + this.connection = connection; + } + + @Override + public void afterCommit(Transaction tx) { + super.afterCommit(tx); + connection.runNow(() -> { + // Settle all unsettled deliveries if commit is successful + for (Pair p : protonTransaction.deliveries.values()) { + if (!p.getA().isSettled()) + p.getB().settle(p.getA()); + } + connection.flush(); + protonTransaction.deliveries.forEach((a, b) -> b.getA().setContext(null)); + protonTransaction.deliveries.clear(); + }); + } + } + public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) { super(xid, storageManager, timeoutSeconds); - addOperation(new TransactionOperationAbstract() { - @Override - public void afterCommit(Transaction tx) { - super.afterCommit(tx); - connection.runNow(() -> { - // Settle all unsettled deliveries if commit is successful - for (Pair p : deliveries.values()) { - if (!p.getA().isSettled()) - p.getB().settle(p.getA()); - } - connection.flush(); - }); - } - }); + addOperation(new TXOperations(connection, this)); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java index f5178da111..7db22175da 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java @@ -53,6 +53,10 @@ public interface Consumer extends PriorityAware { default void promptDelivery() { } + default boolean isClosed() { + return false; + } + /** * This will proceed with the actual delivery. * Notice that handle should hold a readLock and proceedDelivery should release the readLock diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java index e32bbccac6..7f16ceea3a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java @@ -93,9 +93,9 @@ public class QueueConsumersImpl implements QueueConsume @Override public boolean remove(T t) { - iterator.removed(t); boolean result = consumers.remove(t); if (result) { + iterator.removed(t); iterator.update(consumers.resettableIterator()); if (consumers.isEmpty()) { reset(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 2aeb364588..738620b173 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -380,10 +380,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { boolean foundRef = false; synchronized (this) { - Iterator iter = messageReferences.iterator(); - while (iter.hasNext()) { - foundRef = true; - out.println("reference = " + iter.next()); + try (LinkedListIterator iter = messageReferences.iterator()) { + while (iter.hasNext()) { + foundRef = true; + out.println("reference = " + iter.next()); + } } } @@ -1483,7 +1484,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { logger.debug("Removing consumer {}", consumer); try (ArtemisCloseable metric = measureCritical(CRITICAL_CONSUMER)) { - synchronized (this) { + synchronized (QueueImpl.this) { boolean consumerRemoved = false; for (ConsumerHolder holder : consumers) { @@ -3060,7 +3061,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { MessageReference ref; Consumer handledconsumer = null; - synchronized (this) { + synchronized (QueueImpl.this) { if (queueDestroyed) { if (messageReferences.size() == 0) { @@ -3094,6 +3095,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { Consumer consumer = holder.consumer; Consumer groupConsumer = null; + // we remove the consumerHolder when the Consumer is closed + // however the QueueConsumerIterator may hold a reference until the reset is called, which + // could happen a little later. + if (consumer.isClosed()) { + deliverAsync(true); + return false; + } + if (holder.iter == null) { holder.iter = messageReferences.iterator(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index a13d362af1..d4c7e32272 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -163,6 +163,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private boolean isClosed = false; + @Override + public boolean isClosed() { + return isClosed; + } + ServerConsumerMetrics metrics = new ServerConsumerMetrics(); @@ -618,11 +623,14 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { server.callBrokerConsumerPlugins(plugin -> plugin.afterCloseConsumer(this, failed)); } - protocolContext = null; + messageQueue.getExecutor().execute(() -> { + protocolContext = null; - callback = null; + callback = null; + + session = null; + }); - session = null; } private void addLingerRefs() throws Exception { @@ -1116,7 +1124,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { */ @Override public String toString() { - return "ServerConsumerImpl [id=" + id + ", filter=" + filter + ", binding=" + binding + "]"; + return "ServerConsumerImpl [id=" + id + ", filter=" + filter + ", binding=" + binding + ", closed=" + isClosed + "]"; } @Override diff --git a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java index 9cbcda36f6..1af125e141 100644 --- a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java +++ b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java @@ -24,19 +24,23 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; - import java.lang.invoke.MethodHandles; +import io.github.checkleak.core.CheckLeak; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.server.impl.ServerStatus; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.utils.Wait; -import io.github.checkleak.core.CheckLeak; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.collections.LinkedListImpl; +import org.apache.qpid.proton.engine.impl.DeliveryImpl; import org.junit.After; import org.junit.Assert; import org.junit.Assume; @@ -81,7 +85,7 @@ public class ConnectionLeakTest extends ActiveMQTestBase { @Override @Before public void setUp() throws Exception { - server = createServer(true, createDefaultConfig(1, true)); + server = createServer(false, createDefaultConfig(1, true)); server.getConfiguration().setJournalPoolFiles(4).setJournalMinFiles(2); server.start(); } @@ -102,6 +106,9 @@ public class ConnectionLeakTest extends ActiveMQTestBase { } private void doTest(String protocol) throws Exception { + CheckLeak checkLeak = new CheckLeak(); + // Some protocols may create ServerConsumers + int originalConsumers = checkLeak.getAllObjects(ServerConsumerImpl.class).length; int REPEATS = 100; int MESSAGES = 20; basicMemoryAsserts(); @@ -143,12 +150,17 @@ public class ConnectionLeakTest extends ActiveMQTestBase { targetProducer.send(m); } Assert.assertNull(sourceConsumer.receiveNoWait()); + consumerSession.commit(); + + Wait.assertTrue(() -> validateClosedConsumers(checkLeak)); } - consumerSession.commit(); } } } + assertMemory(new CheckLeak(), 0, ServerConsumerImpl.class.getName()); + + // this is just to drain the messages try (Connection targetConnection = cf.createConnection(); Connection consumerConnection = cf.createConnection()) { Session targetSession = targetConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -160,6 +172,9 @@ public class ConnectionLeakTest extends ActiveMQTestBase { } Assert.assertNull(consumer.receiveNoWait()); + assertMemory(new CheckLeak(), 0, DeliveryImpl.class.getName()); + Wait.assertTrue(() -> validateClosedConsumers(checkLeak)); + consumer = null; } Queue sourceQueue = server.locateQueue("source"); @@ -173,6 +188,65 @@ public class ConnectionLeakTest extends ActiveMQTestBase { } basicMemoryAsserts(); + } + @Test + public void testCheckIteratorsAMQP() throws Exception { + testCheckIterators("AMQP"); + } + + @Test + public void testCheckIteratorsOpenWire() throws Exception { + testCheckIterators("OPENWIRE"); + } + + @Test + public void testCheckIteratorsCORE() throws Exception { + testCheckIterators("CORE"); + } + + public void testCheckIterators(String protocol) throws Exception { + CheckLeak checkLeak = new CheckLeak(); + + String queueName = getName(); + + Queue queue = server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST)); + + ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + for (int i = 0; i < 10; i++) { + Connection connection = cf.createConnection(); + connection.start(); + for (int j = 0; j < 10; j++) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createQueue(queueName)); + producer.send(session.createTextMessage("test")); + session.commit(); + session.close(); + } + + for (int j = 0; j < 10; j++) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); + consumer.receiveNoWait(); // it doesn't matter if it received or not, just doing something in the queue to kick the iterators + session.commit(); + } + connection.close(); + + assertMemory(checkLeak, 0, 1, 1, ServerConsumerImpl.class.getName()); + assertMemory(checkLeak, 0, 2, 1, LinkedListImpl.Iterator.class.getName()); + } + } + + + private boolean validateClosedConsumers(CheckLeak checkLeak) throws Exception { + Object[] objecs = checkLeak.getAllObjects(ServerConsumerImpl.class); + for (Object obj : objecs) { + ServerConsumerImpl consumer = (ServerConsumerImpl) obj; + if (consumer.isClosed()) { + logger.info("References to closedConsumer {}\n{}", consumer, checkLeak.exploreObjectReferences(3, 1, true, consumer)); + return false; + } + } + return true; } } \ No newline at end of file diff --git a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/LinkedListMemoryTest.java b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/LinkedListMemoryTest.java new file mode 100644 index 0000000000..2663d94d0c --- /dev/null +++ b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/LinkedListMemoryTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.leak; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Random; + +import io.github.checkleak.core.CheckLeak; +import org.apache.activemq.artemis.utils.collections.LinkedListImpl; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LinkedListMemoryTest { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + Random random = new Random(); + + CheckLeak checkLeak = new CheckLeak(); + + public int randomInt(int x, int y) { + + int randomNumber = random.nextInt(y - x + 1) + x; + + return randomNumber; + } + + @Test + public void testRemoveIteratorsRandom() throws Exception { + LinkedListImpl linkedList = new LinkedListImpl<>((a, b) -> a.compareTo(b)); + + linkedList.addSorted("Test"); + + int iterators = 100; + ArrayList listIerators = new ArrayList(); + + for (int i = 0; i < iterators; i++) { + listIerators.add(linkedList.iterator()); + } + + int countRemoved = 0; + + while (listIerators.size() > 0) { + int removeElement = randomInt(0, listIerators.size() - 1); + countRemoved++; + LinkedListIterator toRemove = listIerators.remove(removeElement); + toRemove.close(); + toRemove = null; + MemoryAssertions.assertMemory(checkLeak, iterators - countRemoved, LinkedListImpl.Iterator.class.getName()); + } + MemoryAssertions.assertMemory(checkLeak, 0, LinkedListImpl.Iterator.class.getName()); + } +} diff --git a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java index 3e5eac93ea..47e4cab2b9 100644 --- a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java +++ b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java @@ -49,6 +49,10 @@ public class MemoryAssertions { } public static void assertMemory(CheckLeak checkLeak, int maxExpected, String clazz) throws Exception { + assertMemory(checkLeak, maxExpected, 10, 10, clazz); + } + + public static void assertMemory(CheckLeak checkLeak, int maxExpected, int maxLevel, int maxObjects, String clazz) throws Exception { Wait.waitFor(() -> checkLeak.getAllObjects(clazz).length <= maxExpected, 5000, 100); Object[] objects = checkLeak.getAllObjects(clazz); @@ -56,7 +60,7 @@ public class MemoryAssertions { for (Object obj : objects) { logger.warn("Object {} still in the heap", obj); } - String report = checkLeak.exploreObjectReferences(10, 10, true, objects); + String report = checkLeak.exploreObjectReferences(maxLevel, maxObjects, true, objects); logger.info(report); Assert.fail("Class " + clazz + " has leaked " + objects.length + " objects\n" + report);