ARTEMIS-4161 AMQP and OpenWire leaking leaking objects in certain conditions

The issue identified with AMQP was under Transaction usage, and while opening and closing sessions.
It seems the leak would be released once the connection is closed.

We added a new testsuite under ./tests/leak-tests To fix and validate these issues
This commit is contained in:
Clebert Suconic 2023-02-07 17:46:02 -05:00 committed by clebertsuconic
parent 49cc8ba1c8
commit a2ba6ed298
16 changed files with 715 additions and 13 deletions

View File

@ -24,4 +24,6 @@ public interface RepeatableIterator<E> extends Iterator<E> {
* If the current value should repeat.
*/
void repeat();
void removed(E itemRemoved);
}

View File

@ -35,6 +35,13 @@ public class RepeatableIteratorWrapper<E> implements RepeatableIterator<E>, Rese
}
}
@Override
public void removed(E removed) {
if (last == removed) {
last = null;
}
}
@Override
public boolean hasNext() {
return repeat || iterator.hasNext();

View File

@ -87,4 +87,9 @@ public class UpdatableIterator<E> implements ResettableIterator<E>, RepeatableIt
public void repeat() {
currentIterator.repeat();
}
@Override
public void removed(E removed) {
currentIterator.removed(removed);
}
}

View File

@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@ -154,6 +155,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private boolean destroyed = false;
private volatile ScheduledFuture ttlCheck;
//separated in/out wireFormats allow deliveries (eg async and consumers) to not slow down bufferReceived
private final OpenWireFormat inWireFormat;
@ -686,10 +689,18 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private void shutdown(boolean fail) {
if (fail) {
transportConnection.forceClose();
} else {
transportConnection.close();
try {
if (fail) {
transportConnection.forceClose();
} else {
transportConnection.close();
}
} finally {
ScheduledFuture ttlCheckToCancel = this.ttlCheck;
this.ttlCheck = null;
if (ttlCheckToCancel != null) {
ttlCheckToCancel.cancel(true);
}
}
}
@ -1012,7 +1023,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
this.maxInactivityDuration = inactivityDuration;
if (this.useKeepAlive) {
protocolManager.getScheduledPool().schedule(new Runnable() {
ttlCheck = protocolManager.getScheduledPool().schedule(new Runnable() {
@Override
public void run() {
if (inactivityDuration >= 0) {

View File

@ -86,11 +86,20 @@ public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsume
return result;
}
@Override
public void removed(T t) {
iterator.removed(t);
}
@Override
public boolean remove(T t) {
iterator.removed(t);
boolean result = consumers.remove(t);
if (result) {
iterator.update(consumers.resettableIterator());
if (consumers.isEmpty()) {
reset();
}
}
return result;
}

View File

@ -1457,7 +1457,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
groups.removeAll();
}
ConsumerHolder<Consumer> newConsumerHolder = new ConsumerHolder<>(consumer);
ConsumerHolder<Consumer> newConsumerHolder = new ConsumerHolder<>(consumer, this);
if (consumers.add(newConsumerHolder)) {
if (delayBeforeDispatch >= 0) {
dispatchStartTimeUpdater.compareAndSet(this,-1, delayBeforeDispatch + System.currentTimeMillis());
@ -1480,6 +1480,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void removeConsumer(final Consumer consumer) {
logger.debug("Removing consumer {}", consumer);
try (ArtemisCloseable metric = measureCritical(CRITICAL_CONSUMER)) {
synchronized (this) {
@ -1489,6 +1490,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (holder.consumer == consumer) {
if (holder.iter != null) {
holder.iter.close();
holder.iter = null;
}
consumers.remove(holder);
consumerRemoved = true;
@ -3393,7 +3395,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (redistributor == null && (consumers.isEmpty() || hasUnMatchedPending)) {
logger.trace("QueueImpl::Adding redistributor on queue {}", this);
redistributor = new ConsumerHolder(new Redistributor(this, storageManager, postOffice));
redistributor = new ConsumerHolder(new Redistributor(this, storageManager, postOffice), this);
redistributor.consumer.start();
consumers.add(redistributor);
hasUnMatchedPending = false;
@ -4220,11 +4222,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
protected static class ConsumerHolder<T extends Consumer> implements PriorityAware {
ConsumerHolder(final T consumer) {
ConsumerHolder(final T consumer, final QueueImpl queue) {
this.consumer = consumer;
this.queue = queue;
}
final T consumer;
final QueueImpl queue;
LinkedListIterator<MessageReference> iter;
@ -4256,6 +4260,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public int getPriority() {
return consumer.getPriority();
}
@Override
public String toString() {
return "ConsumerHolder::queue=" + queue + ", consumer=" + consumer;
}
}
private class DelayedAddRedistributor implements Runnable {

View File

@ -97,7 +97,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private final int minLargeMessageSize;
private final ServerSession session;
private ServerSession session;
protected final Object lock = new Object();
@ -141,7 +141,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private final java.util.Deque<MessageReference> deliveringRefs = new ArrayDeque<>();
private final SessionCallback callback;
private SessionCallback callback;
private boolean preAcknowledge;
@ -617,6 +617,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
if (server.hasBrokerConsumerPlugins()) {
server.callBrokerConsumerPlugins(plugin -> plugin.afterCloseConsumer(this, failed));
}
protocolContext = null;
callback = null;
session = null;
}
private void addLingerRefs() throws Exception {

View File

@ -33,6 +33,11 @@ public class ServerStatus {
private static final ServerStatus instance = new ServerStatus();
public static void clear() {
instance.server = null;
instance.immutableStateValues.clear();
}
public static synchronized ServerStatus getInstanceFor(ActiveMQServerImpl activeMQServer) {
if (instance.server == null) {
instance.server = activeMQServer;

View File

@ -251,6 +251,9 @@ public abstract class ActiveMQTestBase extends Assert {
}
protected void clearServers() {
servers.clear();
}
private final Collection<ActiveMQServer> servers = new ArrayList<>();
private final Collection<ServerLocator> locators = new ArrayList<>();
@ -364,7 +367,7 @@ public abstract class ActiveMQTestBase extends Assert {
}
stopComponentOutputExceptions(server);
}
servers.clear();
clearServers();
}
closeAllOtherComponents();

View File

@ -190,6 +190,7 @@
<skipJmsTests>true</skipJmsTests>
<skipExtraTests>true</skipExtraTests>
<skipIntegrationTests>true</skipIntegrationTests>
<skipLeakTests>true</skipLeakTests>
<skipCompatibilityTests>true</skipCompatibilityTests>
<skipSmokeTests>true</skipSmokeTests>
<skipJoramTests>true</skipJoramTests>
@ -1277,6 +1278,7 @@
<skipSoakTests>true</skipSoakTests>
<skipPerformanceTests>true</skipPerformanceTests>
<skipExtraTests>false</skipExtraTests>
<skipLeakTests>false</skipLeakTests>
</properties>
</profile>
<profile>
@ -1289,6 +1291,7 @@
<skipJmsTests>false</skipJmsTests>
<skipJoramTests>false</skipJoramTests>
<skipCompatibilityTests>false</skipCompatibilityTests>
<skipLeakTests>false</skipLeakTests>
<testFailureIgnore>false</testFailureIgnore>
<!-- This enables the karaf-<foo>-integration-tests and
integration-tests module tests, see fast-tests profile

View File

@ -363,14 +363,17 @@ public class ConsumerTest extends ActiveMQTestBase {
}
}
consumer.close();
Assert.assertTrue(serverConsumer.getProtocolContext() instanceof ProtonServerSenderContext);
final AMQPSessionContext sessionContext = ((ProtonServerSenderContext)
serverConsumer.getProtocolContext()).getSessionContext();
consumer.close();
final ServerConsumer lambdaServerConsumer = serverConsumer;
Wait.assertTrue(() -> lambdaServerConsumer.getProtocolContext() == null);
Wait.assertEquals(0, () -> sessionContext.getSenderCount(), 1000, 10);
} finally {
context.stop();
context.close();

255
tests/leak-tests/pom.xml Normal file
View File

@ -0,0 +1,255 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq.tests</groupId>
<artifactId>artemis-tests-pom</artifactId>
<version>2.29.0-SNAPSHOT</version>
</parent>
<artifactId>leak-tests</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis Leak Tests</name>
<properties>
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
<check-leak-version>0.7</check-leak-version>
<!-- whatever name you choose it has to match between the check-leak-maven-plugin install call and the JVM argument -->
<check-leak-deploy-name>checkleak.lib</check-leak-deploy-name>
</properties>
<dependencies>
<!-- this is needed for ActivemqTestBase.
TODO: We should eventually move ActiveMQTestBase into artemis-unit-test-support -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<!-- this is needed for Wait -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-unit-test-support</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<!-- this is needed for CFUtil -->
<dependency>
<groupId>org.apache.activemq.tests</groupId>
<artifactId>artemis-test-support</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Dependencies for the ActiveMQ Artemis Server and its protocols -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-journal</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-amqp-protocol</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-stomp-protocol</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-openwire-protocol</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-artemis-native</artifactId>
<version>${activemq-artemis-native-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-mqtt-protocol</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<!-- openwire client -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<scope>test</scope>
<exclusions>
<!-- we are replacing this dependency by jakarta JMS, which is being brought by Artemis JMS Client -->
<exclusion>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
</exclusion>
<!-- we are replacing this dependency by jakarta.management.j2ee-api, otherwise it would clash with Core JMS client -->
<exclusion>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-j2ee-management_1.1_spec</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- this is required for openwire clients since we remove the management dependency on the exclusions for activemq-client -->
<dependency>
<groupId>jakarta.management.j2ee</groupId>
<artifactId>jakarta.management.j2ee-api</artifactId>
<scope>test</scope>
</dependency>
<!-- Core Client -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<!-- AMQP Client -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<scope>test</scope>
</dependency>
<!-- logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
<!-- The johnzon-core and json-api contents are repackaged in -commons,
However maven can still need them during tests, which run against
the original -commons classes when built+run in the same reactor,
and not the jar containing the shaded bits. -->
<dependency>
<groupId>org.apache.johnzon</groupId>
<artifactId>johnzon-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
<scope>test</scope>
</dependency>
<!-- check-leak, which is a library to inspect the VM using JVMTI and assert successful garbage cleanups.
https://github.com/check-leak/check-leak -->
<dependency>
<groupId>io.github.check-leak</groupId>
<artifactId>core</artifactId>
<version>${check-leak-version}</version>
<scope>test</scope>
</dependency>
<!-- junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<testResources>
<testResource>
<directory>src/test/resources</directory>
<filtering>true</filtering>
</testResource>
</testResources>
<plugins>
<plugin>
<!-- It will install the proper native library required for the current platform. -->
<groupId>io.github.check-leak</groupId>
<artifactId>checkleak-maven-plugin</artifactId>
<version>${check-leak-version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<id>find-native</id>
<goals>
<goal>install</goal>
</goals>
<configuration>
<target>${project.basedir}/target/lib</target>
<!-- I'm selecting a name for the deployed file here. it can be any name and extension you choose as long as it matches the one passed on the JDK argument -->
<lib>${check-leak-deploy-name}</lib>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<configuration>
<nonFilteredFileExtensions>
<nonFilteredFileExtension>jks</nonFilteredFileExtension>
</nonFilteredFileExtensions>
</configuration>
<executions>
<execution>
<id>copy-security-resources</id>
<phase>validate</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${basedir}/target/test-classes</outputDirectory>
<resources>
<resource>
<directory>../security-resources</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>${skipLeakTests}</skipTests>
<argLine>-agentpath:${project.basedir}/target/lib/${check-leak-deploy-name} -Djgroups.bind_addr=::1 ${activemq-surefire-argline} -Dorg.apache.activemq.SERIALIZABLE_PACKAGES="java.lang,javax.security,java.util,org.apache.activemq,org.fusesource.hawtbuf"</argLine>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.leak;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServerStatus;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.Wait;
import io.github.checkleak.core.CheckLeak;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.activemq.artemis.tests.leak.MemoryAssertions.assertMemory;
import static org.apache.activemq.artemis.tests.leak.MemoryAssertions.basicMemoryAsserts;
public class ConnectionLeakTest extends ActiveMQTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
ActiveMQServer server;
@BeforeClass
public static void beforeClass() throws Exception {
Assume.assumeTrue(CheckLeak.isLoaded());
}
@After
public void validateServer() throws Exception {
CheckLeak checkLeak = new CheckLeak();
// I am doing this check here because the test method might hold a client connection
// so this check has to be done after the test, and before the server is stopped
assertMemory(checkLeak, 0, RemotingConnectionImpl.class.getName());
server.stop();
server = null;
clearServers();
ServerStatus.clear();
assertMemory(checkLeak, 0, ActiveMQServerImpl.class.getName());
}
@Override
@Before
public void setUp() throws Exception {
server = createServer(true, createDefaultConfig(1, true));
server.start();
}
@Test
public void testAMQP() throws Exception {
doTest("AMQP");
}
@Test
public void testCore() throws Exception {
doTest("CORE");
}
@Test
public void testOpenWire() throws Exception {
doTest("OPENWIRE");
}
private void doTest(String protocol) throws Exception {
int REPEATS = 100;
int MESSAGES = 20;
basicMemoryAsserts();
ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
try (Connection producerConnection = cf.createConnection(); Connection consumerConnection = cf.createConnection()) {
Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
consumerConnection.start();
for (int i = 0; i < REPEATS; i++) {
{
Destination source = producerSession.createQueue("source");
try (MessageProducer sourceProducer = producerSession.createProducer(source)) {
for (int msg = 0; msg < MESSAGES; msg++) {
Message message = producerSession.createTextMessage("hello " + msg);
message.setIntProperty("i", msg);
sourceProducer.send(message);
}
producerSession.commit();
}
}
{
Destination source = consumerSession.createQueue("source");
Destination target = consumerSession.createQueue("target");
// notice I am not closing the consumer directly, just relying on the connection closing
MessageProducer targetProducer = consumerSession.createProducer(target);
// I am receiving messages, and pushing them to a different queue
try (MessageConsumer sourceConsumer = consumerSession.createConsumer(source)) {
for (int msg = 0; msg < MESSAGES; msg++) {
TextMessage m = (TextMessage) sourceConsumer.receive(5000);
Assert.assertNotNull(m);
Assert.assertEquals("hello " + msg, m.getText());
Assert.assertEquals(msg, m.getIntProperty("i"));
targetProducer.send(m);
}
Assert.assertNull(sourceConsumer.receiveNoWait());
}
consumerSession.commit();
}
}
}
// this is just to drain the messages
try (Connection targetConnection = cf.createConnection(); Connection consumerConnection = cf.createConnection()) {
Session targetSession = targetConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = targetSession.createConsumer(targetSession.createQueue("target"));
targetConnection.start();
for (int msgI = 0; msgI < REPEATS * MESSAGES; msgI++) {
Assert.assertNotNull(consumer.receive(5000));
}
Assert.assertNull(consumer.receiveNoWait());
}
Queue sourceQueue = server.locateQueue("source");
Queue targetQueue = server.locateQueue("target");
Wait.assertEquals(0, sourceQueue::getMessageCount);
Wait.assertEquals(0, targetQueue::getMessageCount);
if (cf instanceof ActiveMQConnectionFactory) {
((ActiveMQConnectionFactory)cf).close();
}
basicMemoryAsserts();
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.leak;
import java.lang.invoke.MethodHandles;
import io.github.checkleak.core.CheckLeak;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MemoryAssertions {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/** most tests should have these as 0 after execution. */
public static void basicMemoryAsserts() throws Exception {
CheckLeak checkLeak = new CheckLeak();
assertMemory(checkLeak, 0, OpenWireConnection.class.getName());
assertMemory(checkLeak, 0, ProtonServerSenderContext.class.getName());
assertMemory(checkLeak, 0, ProtonServerReceiverContext.class.getName());
assertMemory(checkLeak, 0, AMQPSessionContext.class.getName());
assertMemory(checkLeak, 0, ServerConsumerImpl.class.getName());
assertMemory(checkLeak, 0, RoutingContextImpl.class.getName());
assertMemory(checkLeak, 0, MessageReferenceImpl.class.getName());
}
public static void assertMemory(CheckLeak checkLeak, int maxExpected, String clazz) throws Exception {
Wait.waitFor(() -> checkLeak.getAllObjects(clazz).length <= maxExpected, 5000, 100);
Object[] objects = checkLeak.getAllObjects(clazz);
if (objects.length > maxExpected) {
for (Object obj : objects) {
logger.warn("Object {} still in the heap", obj);
}
String report = checkLeak.exploreObjectReferences(5, 10, true, objects);
logger.info(report);
Assert.fail("Class " + clazz + " has leaked " + objects.length + " objects\n" + report);
}
}
}

140
tests/leak-tests/test.log Normal file
View File

@ -0,0 +1,140 @@
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Detecting the operating system and CPU architecture
[INFO] ------------------------------------------------------------------------
[INFO] os.detected.name: osx
[INFO] os.detected.arch: x86_64
[INFO] os.detected.bitness: 64
[INFO] os.detected.version: 10.16
[INFO] os.detected.version.major: 10
[INFO] os.detected.version.minor: 16
[INFO] os.detected.classifier: osx-x86_64
[INFO]
[INFO] ----------------< org.apache.activemq.tests:leak-tests >----------------
[INFO] Building ActiveMQ Artemis Leak Tests 2.29.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-clean-plugin:3.2.0:clean (default-clean) @ leak-tests ---
[INFO] Deleting /Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/target
[INFO]
[INFO] --- maven-enforcer-plugin:3.1.0:enforce (enforce-maven-version) @ leak-tests ---
[INFO]
[INFO] --- maven-enforcer-plugin:3.1.0:enforce (enforce-java-version) @ leak-tests ---
[INFO]
[INFO] --- maven-resources-plugin:3.2.0:copy-resources (copy-security-resources) @ leak-tests ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Using 'UTF-8' encoding to copy filtered properties files.
[INFO] Copying 35 resources
[INFO]
[INFO] --- checkleak-maven-plugin:0.7:install (find-native) @ leak-tests ---
[INFO]
[INFO] --- maven-remote-resources-plugin:1.7.0:process (process-resource-bundles) @ leak-tests ---
[INFO] Preparing remote bundle org.apache:apache-jar-resource-bundle:1.4
[INFO] Copying 3 resources from 1 bundle.
[INFO]
[INFO] --- maven-resources-plugin:3.2.0:resources (default-resources) @ leak-tests ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Using 'UTF-8' encoding to copy filtered properties files.
[INFO] skip non existing resourceDirectory /Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/src/main/resources
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.10.1:compile (default-compile) @ leak-tests ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-resources-plugin:3.2.0:testResources (default-testResources) @ leak-tests ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Using 'UTF-8' encoding to copy filtered properties files.
[INFO] skip non existing resourceDirectory /Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/src/test/resources
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.10.1:testCompile (default-testCompile) @ leak-tests ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 2 source files to /Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/target/test-classes
[INFO]
[INFO] --- maven-surefire-plugin:2.22.2:test (default-test) @ leak-tests ---
[WARNING] The system property basedir is configured twice! The property appears in <argLine/> and any of <systemPropertyVariables/>, <systemProperties/> or user property.
[INFO]
[INFO] -------------------------------------------------------
[INFO] T E S T S
[INFO] -------------------------------------------------------
[INFO] Running org.apache.activemq.artemis.tests.leak.ConnectionLeakTest
[main] 14:39:26,380 INFO [org.apache.activemq.artemis.tests.util.ActiveMQTestBase] **** start #test testAMQP() ***
[main] 14:39:26,650 INFO [org.apache.activemq.artemis.core.server] AMQ221000: live Message Broker is starting with configuration Broker Configuration (clustered=false,journalDirectory=/Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/./target/tmp/ConnectionLeakTest/junit16008448135972800424/journal1-L,bindingsDirectory=/Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/./target/tmp/ConnectionLeakTest/junit16008448135972800424/bindings1-L,largeMessagesDirectory=/Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/./target/tmp/ConnectionLeakTest/junit16008448135972800424/large-msg1-L,pagingDirectory=/Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/./target/tmp/ConnectionLeakTest/junit16008448135972800424/page1-L)
[main] 14:39:26,679 INFO [org.apache.activemq.artemis.core.server] AMQ221013: Using NIO Journal
[main] 14:39:26,791 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-server]. Adding protocol support for: CORE
[main] 14:39:26,792 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP
[main] 14:39:26,792 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-stomp-protocol]. Adding protocol support for: STOMP
[main] 14:39:26,793 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-openwire-protocol]. Adding protocol support for: OPENWIRE
[main] 14:39:26,793 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-mqtt-protocol]. Adding protocol support for: MQTT
[main] 14:39:26,857 INFO [org.apache.activemq.artemis.core.server] AMQ221034: Waiting indefinitely to obtain live lock
[main] 14:39:26,857 INFO [org.apache.activemq.artemis.core.server] AMQ221035: Live Server Obtained live lock
[main] 14:39:27,445 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started KQUEUE Acceptor at localhost:61616 for protocols [CORE,MQTT,AMQP,STOMP,OPENWIRE]
[main] 14:39:27,464 INFO [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
[main] 14:39:27,464 INFO [org.apache.activemq.artemis.core.server] AMQ221001: Apache ActiveMQ Artemis Message Broker version 2.29.0-SNAPSHOT [localhost, nodeID=9f35d8ec-ae31-11ed-a464-3c22fb430ab3]
[AmqpProvider :(1):[amqp://localhost:61616]] 14:39:28,238 INFO [org.apache.qpid.jms.JmsConnection] Connection ID:e2a47639-2cff-419e-a456-6ebec189d2fd:1 connected to server: amqp://localhost:61616
[AmqpProvider :(2):[amqp://localhost:61616]] 14:39:28,266 INFO [org.apache.qpid.jms.JmsConnection] Connection ID:7f0b3201-7ab3-48d4-b834-f026f6890c29:2 connected to server: amqp://localhost:61616
[AmqpProvider :(3):[amqp://localhost:61616]] 14:39:29,587 INFO [org.apache.qpid.jms.JmsConnection] Connection ID:e840b858-2b58-4813-a749-f486f3a2f3f6:3 connected to server: amqp://localhost:61616
[AmqpProvider :(4):[amqp://localhost:61616]] 14:39:29,704 WARN [org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder] Open of resource:(JmsConnectionInfo { ID:950c1cc8-36a5-4a56-a283-c800eaf43da2:4, configuredURI = amqp://localhost:61616, connectedURI = null }) failed: Open failed unexpectedly.
[Thread-3 (activemq-netty-threads)] 14:39:29,706 WARN [org.apache.activemq.artemis.core.server] AMQ222061: Client connection failed, clearing up resources for session a0fc19cd-ae31-11ed-a464-3c22fb430ab3
[Thread-3 (activemq-netty-threads)] 14:39:29,706 WARN [org.apache.activemq.artemis.core.server] AMQ222107: Cleared up resources for session a0fc19cd-ae31-11ed-a464-3c22fb430ab3
[main] 14:39:30,644 INFO [org.apache.activemq.artemis.core.server] AMQ221002: Apache ActiveMQ Artemis Message Broker version 2.29.0-SNAPSHOT [9f35d8ec-ae31-11ed-a464-3c22fb430ab3] stopped, uptime 4.065 seconds
[main] 14:39:30,766 INFO [org.apache.activemq.artemis.tests.util.ActiveMQTestBase] **** end #test testAMQP() ***
[main] 14:39:30,767 INFO [org.apache.activemq.artemis.tests.util.ActiveMQTestBase] **** start #test testCore() ***
[main] 14:39:30,812 INFO [org.apache.activemq.artemis.core.server] AMQ221000: live Message Broker is starting with configuration Broker Configuration (clustered=false,journalDirectory=/Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/./target/tmp/ConnectionLeakTest/junit8509090511689323465/journal1-L,bindingsDirectory=/Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/./target/tmp/ConnectionLeakTest/junit8509090511689323465/bindings1-L,largeMessagesDirectory=/Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/./target/tmp/ConnectionLeakTest/junit8509090511689323465/large-msg1-L,pagingDirectory=/Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/./target/tmp/ConnectionLeakTest/junit8509090511689323465/page1-L)
[main] 14:39:30,813 INFO [org.apache.activemq.artemis.core.server] AMQ221013: Using NIO Journal
[main] 14:39:30,813 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-server]. Adding protocol support for: CORE
[main] 14:39:30,814 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP
[main] 14:39:30,814 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-stomp-protocol]. Adding protocol support for: STOMP
[main] 14:39:30,815 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-openwire-protocol]. Adding protocol support for: OPENWIRE
[main] 14:39:30,815 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-mqtt-protocol]. Adding protocol support for: MQTT
[main] 14:39:30,844 INFO [org.apache.activemq.artemis.core.server] AMQ221034: Waiting indefinitely to obtain live lock
[main] 14:39:30,844 INFO [org.apache.activemq.artemis.core.server] AMQ221035: Live Server Obtained live lock
[main] 14:39:30,871 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started KQUEUE Acceptor at localhost:61616 for protocols [CORE,MQTT,AMQP,STOMP,OPENWIRE]
[main] 14:39:30,890 INFO [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
[main] 14:39:30,890 INFO [org.apache.activemq.artemis.core.server] AMQ221001: Apache ActiveMQ Artemis Message Broker version 2.29.0-SNAPSHOT [localhost, nodeID=a1b41f2f-ae31-11ed-a464-3c22fb430ab3]
[main] 14:39:33,361 INFO [org.apache.activemq.artemis.core.server] AMQ221002: Apache ActiveMQ Artemis Message Broker version 2.29.0-SNAPSHOT [a1b41f2f-ae31-11ed-a464-3c22fb430ab3] stopped, uptime 2.591 seconds
[main] 14:39:33,495 INFO [org.apache.activemq.artemis.tests.util.ActiveMQTestBase] **** end #test testCore() ***
[main] 14:39:33,496 INFO [org.apache.activemq.artemis.tests.util.ActiveMQTestBase] **** start #test testOpenWire() ***
[main] 14:39:33,538 INFO [org.apache.activemq.artemis.core.server] AMQ221000: live Message Broker is starting with configuration Broker Configuration (clustered=false,journalDirectory=/Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/./target/tmp/ConnectionLeakTest/junit11559466446765387077/journal1-L,bindingsDirectory=/Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/./target/tmp/ConnectionLeakTest/junit11559466446765387077/bindings1-L,largeMessagesDirectory=/Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/./target/tmp/ConnectionLeakTest/junit11559466446765387077/large-msg1-L,pagingDirectory=/Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/./target/tmp/ConnectionLeakTest/junit11559466446765387077/page1-L)
[main] 14:39:33,538 INFO [org.apache.activemq.artemis.core.server] AMQ221013: Using NIO Journal
[main] 14:39:33,539 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-server]. Adding protocol support for: CORE
[main] 14:39:33,539 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP
[main] 14:39:33,540 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-stomp-protocol]. Adding protocol support for: STOMP
[main] 14:39:33,541 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-openwire-protocol]. Adding protocol support for: OPENWIRE
[main] 14:39:33,541 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-mqtt-protocol]. Adding protocol support for: MQTT
[main] 14:39:33,566 INFO [org.apache.activemq.artemis.core.server] AMQ221034: Waiting indefinitely to obtain live lock
[main] 14:39:33,566 INFO [org.apache.activemq.artemis.core.server] AMQ221035: Live Server Obtained live lock
[main] 14:39:33,590 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started KQUEUE Acceptor at localhost:61616 for protocols [CORE,MQTT,AMQP,STOMP,OPENWIRE]
[main] 14:39:33,610 INFO [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
[main] 14:39:33,610 INFO [org.apache.activemq.artemis.core.server] AMQ221001: Apache ActiveMQ Artemis Message Broker version 2.29.0-SNAPSHOT [localhost, nodeID=a354130a-ae31-11ed-a464-3c22fb430ab3]
[main] 14:39:36,178 INFO [org.apache.activemq.artemis.core.server] AMQ221002: Apache ActiveMQ Artemis Message Broker version 2.29.0-SNAPSHOT [a354130a-ae31-11ed-a464-3c22fb430ab3] stopped, uptime 2.680 seconds
[main] 14:39:36,318 INFO [org.apache.activemq.artemis.tests.util.ActiveMQTestBase] **** end #test testOpenWire() ***
[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 10.423 s - in org.apache.activemq.artemis.tests.leak.ConnectionLeakTest
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO]
[INFO] --- maven-jar-plugin:3.2.2:jar (default-jar) @ leak-tests ---
[INFO] Building jar: /Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/target/leak-tests-2.29.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-site-plugin:3.12.0:attach-descriptor (attach-descriptor) @ leak-tests ---
[INFO] Skipping because packaging 'jar' is not pom.
[INFO]
[INFO] --- maven-source-plugin:3.2.1:jar-no-fork (attach-sources) @ leak-tests ---
[INFO] Building jar: /Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/target/leak-tests-2.29.0-SNAPSHOT-sources.jar
[INFO]
[INFO] --- dependency-check-maven:6.1.0:check (default) @ leak-tests ---
[INFO] Skipping dependency-check
[INFO]
[INFO] --- maven-install-plugin:2.5.2:install (default-install) @ leak-tests ---
[INFO] Installing /Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/target/leak-tests-2.29.0-SNAPSHOT.jar to /Users/clebertsuconic/.m2/repository/org/apache/activemq/tests/leak-tests/2.29.0-SNAPSHOT/leak-tests-2.29.0-SNAPSHOT.jar
[INFO] Installing /Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/pom.xml to /Users/clebertsuconic/.m2/repository/org/apache/activemq/tests/leak-tests/2.29.0-SNAPSHOT/leak-tests-2.29.0-SNAPSHOT.pom
[INFO] Installing /Users/clebertsuconic/work/apache/activemq-artemis/tests/leak-tests/target/leak-tests-2.29.0-SNAPSHOT-sources.jar to /Users/clebertsuconic/.m2/repository/org/apache/activemq/tests/leak-tests/2.29.0-SNAPSHOT/leak-tests-2.29.0-SNAPSHOT-sources.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 16.564 s
[INFO] Finished at: 2023-02-16T14:39:37-05:00
[INFO] ------------------------------------------------------------------------

View File

@ -139,5 +139,6 @@
<module>artemis-test-support</module>
<module>smoke-tests</module>
<module>e2e-tests</module>
<module>leak-tests</module>
</modules>
</project>