ARTEMIS-5017 bridge leaks ClientSessionFactory instance on reconnect attempt

This commit is contained in:
Justin Bertram 2024-08-28 11:14:38 -05:00 committed by clebertsuconic
parent 898c09cb93
commit d0c83af40e
3 changed files with 46 additions and 0 deletions

View File

@ -1914,6 +1914,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return receivedTopology;
}
public int getClientSessionFactoryCount() {
return factories.size();
}
private String fromInterceptors(final List<Interceptor> interceptors) {
StringBuffer buffer = new StringBuffer();
boolean first = true;

View File

@ -205,6 +205,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
return csf;
}
// for tests
public ServerLocatorInternal getServerLocator() {
return serverLocator;
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.server.Consumer#getDeliveringMessages()
*/
@ -1024,6 +1029,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
if (csf == null || csf.isClosed()) {
if (state == State.STOPPING || state == State.PAUSING)
return;
if (csf != null && csf.isClosed()) {
// ensure we release any references to the existing ClientSessionFactory before creating a new one otherwise we will leak
serverLocator.factoryClosed(csf);
}
csf = createSessionFactory();
if (csf == null) {
// Retrying. This probably means the node is not available (for the cluster connection case)

View File

@ -58,6 +58,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
@ -521,6 +522,38 @@ public class BridgeTest extends ActiveMQTestBase {
}
}
@TestTemplate
public void testClientSessionFactoryLeak() throws Exception {
Map<String, Object> server0Params = new HashMap<>();
server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
Map<String, Object> server1Params = new HashMap<>();
addTargetParameters(server1Params);
server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
final String testAddress = "testAddress";
final String queueName0 = "queue0";
final String forwardAddress = "forwardAddress";
TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
HashMap<String, TransportConfiguration> connectors = new HashMap<>();
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
List<String> connectorConfig = List.of(server1tc.getName());
// intentionally configure a bridge that will fail to connect and attempt to reconnect multiple times quickly
server0.getConfiguration().setBridgeConfigurations(List.of(new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(0).setReconnectAttempts(-1).setStaticConnectors(connectorConfig)));
server0.getConfiguration().setQueueConfigs(List.of(QueueConfiguration.of(queueName0).setAddress(testAddress)));
server1.start();
server0.start();
ServerLocatorImpl serverLocator = (ServerLocatorImpl) ((BridgeImpl)server0.getClusterManager().getBridges().get("bridge1")).getServerLocator();
Wait.waitFor(() -> serverLocator.getClientSessionFactoryCount() > 1, 500, 10);
assertTrue(serverLocator.getClientSessionFactoryCount() <= 1);
}
/**
* @param server1Params
*/