From d0c83af40e59ac5ae64a20fbbd0506336c24319f Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Wed, 28 Aug 2024 11:14:38 -0500 Subject: [PATCH] ARTEMIS-5017 bridge leaks ClientSessionFactory instance on reconnect attempt --- .../core/client/impl/ServerLocatorImpl.java | 4 +++ .../core/server/cluster/impl/BridgeImpl.java | 9 +++++ .../cluster/bridge/BridgeTest.java | 33 +++++++++++++++++++ 3 files changed, 46 insertions(+) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index c7eff111ca..dd250a9b58 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -1914,6 +1914,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return receivedTopology; } + public int getClientSessionFactoryCount() { + return factories.size(); + } + private String fromInterceptors(final List interceptors) { StringBuffer buffer = new StringBuffer(); boolean first = true; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 01ded75a40..e8659b0cb1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -205,6 +205,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled return csf; } + // for tests + public ServerLocatorInternal getServerLocator() { + return serverLocator; + } + /* (non-Javadoc) * @see org.apache.activemq.artemis.core.server.Consumer#getDeliveringMessages() */ @@ -1024,6 +1029,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled if (csf == null || csf.isClosed()) { if (state == State.STOPPING || state == State.PAUSING) return; + if (csf != null && csf.isClosed()) { + // ensure we release any references to the existing ClientSessionFactory before creating a new one otherwise we will leak + serverLocator.factoryClosed(csf); + } csf = createSessionFactory(); if (csf == null) { // Retrying. This probably means the node is not available (for the cluster connection case) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java index 23dd1c63a8..72a87c8b97 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java @@ -58,6 +58,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.management.BridgeControl; +import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.TransformerConfiguration; @@ -521,6 +522,38 @@ public class BridgeTest extends ActiveMQTestBase { } } + @TestTemplate + public void testClientSessionFactoryLeak() throws Exception { + Map server0Params = new HashMap<>(); + server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params); + + Map server1Params = new HashMap<>(); + addTargetParameters(server1Params); + server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params); + + final String testAddress = "testAddress"; + final String queueName0 = "queue0"; + final String forwardAddress = "forwardAddress"; + + TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params); + + HashMap connectors = new HashMap<>(); + connectors.put(server1tc.getName(), server1tc); + server0.getConfiguration().setConnectorConfigurations(connectors); + + List connectorConfig = List.of(server1tc.getName()); + // intentionally configure a bridge that will fail to connect and attempt to reconnect multiple times quickly + server0.getConfiguration().setBridgeConfigurations(List.of(new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(0).setReconnectAttempts(-1).setStaticConnectors(connectorConfig))); + server0.getConfiguration().setQueueConfigs(List.of(QueueConfiguration.of(queueName0).setAddress(testAddress))); + + server1.start(); + server0.start(); + + ServerLocatorImpl serverLocator = (ServerLocatorImpl) ((BridgeImpl)server0.getClusterManager().getBridges().get("bridge1")).getServerLocator(); + Wait.waitFor(() -> serverLocator.getClientSessionFactoryCount() > 1, 500, 10); + assertTrue(serverLocator.getClientSessionFactoryCount() <= 1); + } + /** * @param server1Params */