ARTEMIS-4171 Messages leaking thorugh AMQP Delivery

there are two leaks here:

* QueueImpl::delivery might create a new iterator if a delivery happens right after a consumer was removed, and that iterator might belog to a consumer that was already closed
             as a result of that, the iterator may leak messages and hold references until a reboot is done. I have seen scenarios where messages would not be dleivered because of this.

* ProtonTransaction holding references: the last transaction might hold messages in the memory longer than expected. In tests I have performed the messages were accumulating in memory. and I cleared it here.
This commit is contained in:
Clebert Suconic 2023-02-20 13:18:18 -05:00
parent 9e524d978d
commit 8078dd098c
11 changed files with 231 additions and 45 deletions

View File

@ -350,6 +350,9 @@ public class LinkedListImpl<E> implements LinkedList<E> {
@Override
public LinkedListIterator<E> iterator() {
if (logger.isTraceEnabled()) {
logger.trace("Creating new iterator at", new Exception("trace location"));
}
return new Iterator();
}
@ -434,6 +437,9 @@ public class LinkedListImpl<E> implements LinkedList<E> {
}
private synchronized void removeIter(Iterator iter) {
if (logger.isTraceEnabled()) {
logger.trace("Removing iterator at", new Exception("trace location"));
}
for (int i = 0; i < numIters; i++) {
if (iter == iters[i]) {
iters[i] = null;
@ -449,8 +455,10 @@ public class LinkedListImpl<E> implements LinkedList<E> {
if (numIters >= INITIAL_ITERATOR_ARRAY_SIZE && numIters == iters.length / 2) {
resize(numIters);
}
nextIndex--;
if (nextIndex < iters.length) {
iters[nextIndex] = null;
}
return;
}
@ -515,8 +523,7 @@ public class LinkedListImpl<E> implements LinkedList<E> {
}
}
private class Iterator implements LinkedListIterator<E> {
public class Iterator implements LinkedListIterator<E> {
Node<E> last;
Node<E> current = head.next;

View File

@ -743,14 +743,9 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
}
}
/// we have to perform the link.close after the linkContext.close is finished.
// linkeContext.close will perform a few executions on the netty loop,
// this has to come next
runLater(() -> {
link.close();
link.free();
flush();
});
link.close();
link.free();
flush();
}
@Override

View File

@ -370,7 +370,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
OperationContext oldContext = sessionSPI.recoverContext();
try {
Message message = ((MessageReference) delivery.getContext()).getMessage();
MessageReference reference = (MessageReference) delivery.getContext();
Message message = reference != null ? reference.getMessage() : null;
DeliveryState remoteState = delivery.getRemoteState();
if (remoteState != null && remoteState.getType() == DeliveryStateType.Accepted) {

View File

@ -45,26 +45,38 @@ public class ProtonTransactionImpl extends TransactionImpl {
deliveries have been settled. We also need to ensure we are settling on the correct link. Hence why we keep a ref
to the ProtonServerSenderContext here.
*/
private final Map<MessageReference, Pair<Delivery, ProtonServerSenderContext>> deliveries = new HashMap<>();
final Map<MessageReference, Pair<Delivery, ProtonServerSenderContext>> deliveries = new HashMap<>();
private boolean discharged;
private static class TXOperations extends TransactionOperationAbstract {
final ProtonTransactionImpl protonTransaction;
final AMQPConnectionContext connection;
TXOperations(AMQPConnectionContext connection, ProtonTransactionImpl tx) {
this.protonTransaction = tx;
this.connection = connection;
}
@Override
public void afterCommit(Transaction tx) {
super.afterCommit(tx);
connection.runNow(() -> {
// Settle all unsettled deliveries if commit is successful
for (Pair<Delivery, ProtonServerSenderContext> p : protonTransaction.deliveries.values()) {
if (!p.getA().isSettled())
p.getB().settle(p.getA());
}
connection.flush();
protonTransaction.deliveries.forEach((a, b) -> b.getA().setContext(null));
protonTransaction.deliveries.clear();
});
}
}
public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) {
super(xid, storageManager, timeoutSeconds);
addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
super.afterCommit(tx);
connection.runNow(() -> {
// Settle all unsettled deliveries if commit is successful
for (Pair<Delivery, ProtonServerSenderContext> p : deliveries.values()) {
if (!p.getA().isSettled())
p.getB().settle(p.getA());
}
connection.flush();
});
}
});
addOperation(new TXOperations(connection, this));
}
@Override

View File

@ -53,6 +53,10 @@ public interface Consumer extends PriorityAware {
default void promptDelivery() {
}
default boolean isClosed() {
return false;
}
/**
* This will proceed with the actual delivery.
* Notice that handle should hold a readLock and proceedDelivery should release the readLock

View File

@ -93,9 +93,9 @@ public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsume
@Override
public boolean remove(T t) {
iterator.removed(t);
boolean result = consumers.remove(t);
if (result) {
iterator.removed(t);
iterator.update(consumers.resettableIterator());
if (consumers.isEmpty()) {
reset();

View File

@ -380,10 +380,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
boolean foundRef = false;
synchronized (this) {
Iterator<MessageReference> iter = messageReferences.iterator();
while (iter.hasNext()) {
foundRef = true;
out.println("reference = " + iter.next());
try (LinkedListIterator<MessageReference> iter = messageReferences.iterator()) {
while (iter.hasNext()) {
foundRef = true;
out.println("reference = " + iter.next());
}
}
}
@ -1483,7 +1484,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
logger.debug("Removing consumer {}", consumer);
try (ArtemisCloseable metric = measureCritical(CRITICAL_CONSUMER)) {
synchronized (this) {
synchronized (QueueImpl.this) {
boolean consumerRemoved = false;
for (ConsumerHolder holder : consumers) {
@ -3060,7 +3061,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
MessageReference ref;
Consumer handledconsumer = null;
synchronized (this) {
synchronized (QueueImpl.this) {
if (queueDestroyed) {
if (messageReferences.size() == 0) {
@ -3094,6 +3095,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
Consumer consumer = holder.consumer;
Consumer groupConsumer = null;
// we remove the consumerHolder when the Consumer is closed
// however the QueueConsumerIterator may hold a reference until the reset is called, which
// could happen a little later.
if (consumer.isClosed()) {
deliverAsync(true);
return false;
}
if (holder.iter == null) {
holder.iter = messageReferences.iterator();
}

View File

@ -163,6 +163,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private boolean isClosed = false;
@Override
public boolean isClosed() {
return isClosed;
}
ServerConsumerMetrics metrics = new ServerConsumerMetrics();
@ -618,11 +623,14 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
server.callBrokerConsumerPlugins(plugin -> plugin.afterCloseConsumer(this, failed));
}
protocolContext = null;
messageQueue.getExecutor().execute(() -> {
protocolContext = null;
callback = null;
callback = null;
session = null;
});
session = null;
}
private void addLingerRefs() throws Exception {
@ -1116,7 +1124,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
*/
@Override
public String toString() {
return "ServerConsumerImpl [id=" + id + ", filter=" + filter + ", binding=" + binding + "]";
return "ServerConsumerImpl [id=" + id + ", filter=" + filter + ", binding=" + binding + ", closed=" + isClosed + "]";
}
@Override

View File

@ -24,19 +24,23 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import io.github.checkleak.core.CheckLeak;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.server.impl.ServerStatus;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.Wait;
import io.github.checkleak.core.CheckLeak;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
import org.apache.qpid.proton.engine.impl.DeliveryImpl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@ -81,7 +85,7 @@ public class ConnectionLeakTest extends ActiveMQTestBase {
@Override
@Before
public void setUp() throws Exception {
server = createServer(true, createDefaultConfig(1, true));
server = createServer(false, createDefaultConfig(1, true));
server.getConfiguration().setJournalPoolFiles(4).setJournalMinFiles(2);
server.start();
}
@ -102,6 +106,9 @@ public class ConnectionLeakTest extends ActiveMQTestBase {
}
private void doTest(String protocol) throws Exception {
CheckLeak checkLeak = new CheckLeak();
// Some protocols may create ServerConsumers
int originalConsumers = checkLeak.getAllObjects(ServerConsumerImpl.class).length;
int REPEATS = 100;
int MESSAGES = 20;
basicMemoryAsserts();
@ -143,12 +150,17 @@ public class ConnectionLeakTest extends ActiveMQTestBase {
targetProducer.send(m);
}
Assert.assertNull(sourceConsumer.receiveNoWait());
consumerSession.commit();
Wait.assertTrue(() -> validateClosedConsumers(checkLeak));
}
consumerSession.commit();
}
}
}
assertMemory(new CheckLeak(), 0, ServerConsumerImpl.class.getName());
// this is just to drain the messages
try (Connection targetConnection = cf.createConnection(); Connection consumerConnection = cf.createConnection()) {
Session targetSession = targetConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -160,6 +172,9 @@ public class ConnectionLeakTest extends ActiveMQTestBase {
}
Assert.assertNull(consumer.receiveNoWait());
assertMemory(new CheckLeak(), 0, DeliveryImpl.class.getName());
Wait.assertTrue(() -> validateClosedConsumers(checkLeak));
consumer = null;
}
Queue sourceQueue = server.locateQueue("source");
@ -173,6 +188,65 @@ public class ConnectionLeakTest extends ActiveMQTestBase {
}
basicMemoryAsserts();
}
@Test
public void testCheckIteratorsAMQP() throws Exception {
testCheckIterators("AMQP");
}
@Test
public void testCheckIteratorsOpenWire() throws Exception {
testCheckIterators("OPENWIRE");
}
@Test
public void testCheckIteratorsCORE() throws Exception {
testCheckIterators("CORE");
}
public void testCheckIterators(String protocol) throws Exception {
CheckLeak checkLeak = new CheckLeak();
String queueName = getName();
Queue queue = server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
for (int i = 0; i < 10; i++) {
Connection connection = cf.createConnection();
connection.start();
for (int j = 0; j < 10; j++) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue(queueName));
producer.send(session.createTextMessage("test"));
session.commit();
session.close();
}
for (int j = 0; j < 10; j++) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
consumer.receiveNoWait(); // it doesn't matter if it received or not, just doing something in the queue to kick the iterators
session.commit();
}
connection.close();
assertMemory(checkLeak, 0, 1, 1, ServerConsumerImpl.class.getName());
assertMemory(checkLeak, 0, 2, 1, LinkedListImpl.Iterator.class.getName());
}
}
private boolean validateClosedConsumers(CheckLeak checkLeak) throws Exception {
Object[] objecs = checkLeak.getAllObjects(ServerConsumerImpl.class);
for (Object obj : objecs) {
ServerConsumerImpl consumer = (ServerConsumerImpl) obj;
if (consumer.isClosed()) {
logger.info("References to closedConsumer {}\n{}", consumer, checkLeak.exploreObjectReferences(3, 1, true, consumer));
return false;
}
}
return true;
}
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.leak;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Random;
import io.github.checkleak.core.CheckLeak;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LinkedListMemoryTest {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
Random random = new Random();
CheckLeak checkLeak = new CheckLeak();
public int randomInt(int x, int y) {
int randomNumber = random.nextInt(y - x + 1) + x;
return randomNumber;
}
@Test
public void testRemoveIteratorsRandom() throws Exception {
LinkedListImpl<String> linkedList = new LinkedListImpl<>((a, b) -> a.compareTo(b));
linkedList.addSorted("Test");
int iterators = 100;
ArrayList<LinkedListIterator> listIerators = new ArrayList();
for (int i = 0; i < iterators; i++) {
listIerators.add(linkedList.iterator());
}
int countRemoved = 0;
while (listIerators.size() > 0) {
int removeElement = randomInt(0, listIerators.size() - 1);
countRemoved++;
LinkedListIterator toRemove = listIerators.remove(removeElement);
toRemove.close();
toRemove = null;
MemoryAssertions.assertMemory(checkLeak, iterators - countRemoved, LinkedListImpl.Iterator.class.getName());
}
MemoryAssertions.assertMemory(checkLeak, 0, LinkedListImpl.Iterator.class.getName());
}
}

View File

@ -49,6 +49,10 @@ public class MemoryAssertions {
}
public static void assertMemory(CheckLeak checkLeak, int maxExpected, String clazz) throws Exception {
assertMemory(checkLeak, maxExpected, 10, 10, clazz);
}
public static void assertMemory(CheckLeak checkLeak, int maxExpected, int maxLevel, int maxObjects, String clazz) throws Exception {
Wait.waitFor(() -> checkLeak.getAllObjects(clazz).length <= maxExpected, 5000, 100);
Object[] objects = checkLeak.getAllObjects(clazz);
@ -56,7 +60,7 @@ public class MemoryAssertions {
for (Object obj : objects) {
logger.warn("Object {} still in the heap", obj);
}
String report = checkLeak.exploreObjectReferences(10, 10, true, objects);
String report = checkLeak.exploreObjectReferences(maxLevel, maxObjects, true, objects);
logger.info(report);
Assert.fail("Class " + clazz + " has leaked " + objects.length + " objects\n" + report);