ARTEMIS-3687 bridges with concurrency > 1 can leak

This commit is contained in:
Justin Bertram 2022-02-16 17:06:58 -06:00
parent c51fda09cd
commit bc65438344
No known key found for this signature in database
GPG Key ID: F41830B875BB8633
7 changed files with 87 additions and 45 deletions

View File

@ -24,7 +24,6 @@ import javax.management.MBeanOperationInfo;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.logs.AuditLogger;
@ -34,15 +33,11 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
private final Bridge bridge;
private final BridgeConfiguration configuration;
public BridgeControlImpl(final Bridge bridge,
final StorageManager storageManager,
final BridgeConfiguration configuration) throws Exception {
final StorageManager storageManager) throws Exception {
super(BridgeControl.class, storageManager);
this.bridge = bridge;
this.configuration = configuration;
}
// BridgeControlMBean implementation ---------------------------
@ -54,7 +49,7 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
}
clearIO();
try {
List<String> staticConnectors = configuration.getStaticConnectors();
List<String> staticConnectors = bridge.getConfiguration().getStaticConnectors();
return staticConnectors.toArray(new String[staticConnectors.size()]);
} finally {
blockOnIO();
@ -68,7 +63,7 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
}
clearIO();
try {
return configuration.getForwardingAddress();
return bridge.getConfiguration().getForwardingAddress();
} finally {
blockOnIO();
}
@ -81,7 +76,7 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
}
clearIO();
try {
return configuration.getQueueName();
return bridge.getConfiguration().getQueueName();
} finally {
blockOnIO();
}
@ -94,7 +89,7 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
}
clearIO();
try {
return configuration.getDiscoveryGroupName();
return bridge.getConfiguration().getDiscoveryGroupName();
} finally {
blockOnIO();
}
@ -107,7 +102,7 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
}
clearIO();
try {
return configuration.getFilterString();
return bridge.getConfiguration().getFilterString();
} finally {
blockOnIO();
}
@ -120,7 +115,7 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
}
clearIO();
try {
return configuration.getReconnectAttempts();
return bridge.getConfiguration().getReconnectAttempts();
} finally {
blockOnIO();
}
@ -133,7 +128,7 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
}
clearIO();
try {
return configuration.getName();
return bridge.getConfiguration().getName();
} finally {
blockOnIO();
}
@ -146,7 +141,7 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
}
clearIO();
try {
return configuration.getRetryInterval();
return bridge.getConfiguration().getRetryInterval();
} finally {
blockOnIO();
}
@ -159,7 +154,7 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
}
clearIO();
try {
return configuration.getRetryIntervalMultiplier();
return bridge.getConfiguration().getRetryIntervalMultiplier();
} finally {
blockOnIO();
}
@ -172,7 +167,7 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
}
clearIO();
try {
return configuration.getMaxRetryInterval();
return bridge.getConfiguration().getMaxRetryInterval();
} finally {
blockOnIO();
}
@ -185,7 +180,7 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
}
clearIO();
try {
return configuration.getTransformerConfiguration() == null ? null : configuration.getTransformerConfiguration().getClassName();
return bridge.getConfiguration().getTransformerConfiguration() == null ? null : bridge.getConfiguration().getTransformerConfiguration().getClassName();
} finally {
blockOnIO();
}
@ -206,7 +201,7 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
}
clearIO();
try {
return configuration.getTransformerConfiguration() == null ? null : configuration.getTransformerConfiguration().getProperties();
return bridge.getConfiguration().getTransformerConfiguration() == null ? null : bridge.getConfiguration().getTransformerConfiguration().getProperties();
} finally {
blockOnIO();
}
@ -232,7 +227,7 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
}
clearIO();
try {
return configuration.isUseDuplicateDetection();
return bridge.getConfiguration().isUseDuplicateDetection();
} finally {
blockOnIO();
}
@ -245,7 +240,7 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
}
clearIO();
try {
return configuration.isHA();
return bridge.getConfiguration().isHA();
} finally {
blockOnIO();
}

View File

@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
@ -480,7 +481,7 @@ public class ClusterManager implements ActiveMQComponent {
String name = config.getConcurrency() > 1 ? (config.getName() + "-" + i) : config.getName();
Bridge bridge = new BridgeImpl(serverLocator, new BridgeConfiguration(config).setName(name), nodeManager.getUUID(), queue, executorFactory.getExecutor(), scheduledExecutor, server);
bridges.put(name, bridge);
managementService.registerBridge(bridge, config);
managementService.registerBridge(bridge);
bridge.start();
if (server.hasBrokerBridgePlugins()) {
@ -529,16 +530,24 @@ public class ClusterManager implements ActiveMQComponent {
}
public void destroyBridge(final String name) throws Exception {
Bridge bridge;
List<Bridge> bridgesToRemove = new ArrayList<>();
synchronized (this) {
bridge = bridges.remove(name);
if (bridge != null) {
bridge.stop();
managementService.unregisterBridge(name);
for (Bridge bridge : bridges.values()) {
if (bridge.getName().toString().matches(name + "|" + name + "-\\d+")) {
bridge = bridges.get(bridge.getName().toString());
if (bridge != null) {
bridgesToRemove.add(bridge);
}
}
}
for (Bridge bridgeToRemove : bridgesToRemove) {
bridges.remove(bridgeToRemove.getName().toString());
bridgeToRemove.stop();
managementService.unregisterBridge(bridgeToRemove.getName().toString());
}
}
if (bridge != null) {
for (Bridge bridge : bridgesToRemove) {
bridge.flushExecutor();
}
}

View File

@ -28,7 +28,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
@ -120,7 +119,7 @@ public interface ManagementService extends NotificationService, ActiveMQComponen
//void unregisterDiscoveryGroup(String name) throws Exception;
void registerBridge(Bridge bridge, BridgeConfiguration configuration) throws Exception;
void registerBridge(Bridge bridge) throws Exception;
void unregisterBridge(String name) throws Exception;

View File

@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.core.server.management.impl;
import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
@ -57,7 +55,6 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.management.impl.AcceptorControlImpl;
@ -111,6 +108,8 @@ import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID;
public class ManagementServiceImpl implements ManagementService {
private static final Logger logger = Logger.getLogger(ManagementServiceImpl.class);
@ -455,13 +454,12 @@ public class ManagementServiceImpl implements ManagementService {
}
@Override
public synchronized void registerBridge(final Bridge bridge,
final BridgeConfiguration configuration) throws Exception {
public synchronized void registerBridge(final Bridge bridge) throws Exception {
bridge.setNotificationService(this);
ObjectName objectName = objectNameBuilder.getBridgeObjectName(configuration.getName());
BridgeControl control = new BridgeControlImpl(bridge, storageManager, configuration);
ObjectName objectName = objectNameBuilder.getBridgeObjectName(bridge.getName().toString());
BridgeControl control = new BridgeControlImpl(bridge, storageManager);
registerInJMX(objectName, control);
registerInRegistry(ResourceNames.BRIDGE + configuration.getName(), control);
registerInRegistry(ResourceNames.BRIDGE + bridge.getName(), control);
}
@Override

View File

@ -31,7 +31,6 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
@ -306,7 +305,7 @@ public class ClusteredResetMockTest extends ActiveMQTestBase {
}
@Override
public void registerBridge(Bridge bridge, BridgeConfiguration configuration) throws Exception {
public void registerBridge(Bridge bridge) throws Exception {
}

View File

@ -186,13 +186,19 @@ Let's take a look at all the parameters in turn:
flexibility to deal with any situation. Valid values are `ANYCAST`,
`MULTICAST`, `PASS`, & `STRIP`. The default is `PASS`.
- `concurrency`. For bridging high latency networks, and particularly for destinations
with a high throughput, more workers might have to be commited to the bridge. This is
done with the concurrency parameter. Increasing the concurrency will get reflected
by more consumers and producers showing up on the bridged destination, allowing
for increased parallelism across high latency networks.
- `concurrency`. For bridging high latency networks, and particularly for
destinations with a high throughput, more workers might have to be commited
to the bridge. This is done with the `concurrency` parameter. Increasing the
concurrency will get reflected by more consumers and producers showing up on
the bridged destination, allowing for increased parallelism across high
latency networks. The default is `1`.
Default=1
When using a `concurrency` value greater than 1 multiple bridges will be
created and named with an index. For example, if a bridge named `myBridge`
was configured with a `concurrency` of `3` then actually 3 bridges would
be created named `myBridge-0`, `myBridge-1`, and `myBridge-2`. This is
important to note for management operations as each bridge will have its
own associated `BridgeControl`.
- `static-connectors` or `discovery-group-ref`. Pick either of these options to
connect the bridge to the target server.

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
@ -2044,6 +2045,41 @@ public class BridgeTest extends ActiveMQTestBase {
Assert.assertNotNull(consumer.receive(200));
}
@Test
public void testManagementLeak() throws Exception {
final SimpleString ADDRESS = new SimpleString("myAddress");
final SimpleString QUEUE = new SimpleString("myQueue");
final SimpleString FORWARDING_ADDRESS = new SimpleString("myForwardingAddress");
final SimpleString FORWARDING_QUEUE = new SimpleString("myForwardingQueue");
final String BRIDGE = "myBridge";
ActiveMQServer server = addServer(new ActiveMQServerImpl(createDefaultConfig(0, isNetty()).addConnectorConfiguration("myConnector", new TransportConfiguration(getConnector()))));
server.start();
server.waitForActivation(100, TimeUnit.MILLISECONDS);
server.createQueue(new QueueConfiguration(QUEUE)
.setAddress(ADDRESS)
.setRoutingType(RoutingType.ANYCAST)
.setDurable(false));
server.createQueue(new QueueConfiguration(FORWARDING_QUEUE)
.setAddress(FORWARDING_ADDRESS)
.setRoutingType(RoutingType.ANYCAST)
.setDurable(false));
ArrayList<String> connectors = new ArrayList<>();
connectors.add("myConnector");
final int concurrency = 20;
BridgeConfiguration config = new BridgeConfiguration()
.setName(BRIDGE)
.setQueueName(QUEUE.toString())
.setForwardingAddress(FORWARDING_ADDRESS.toString())
.setStaticConnectors(connectors)
.setConcurrency(concurrency);
server.deployBridge(config);
assertEquals(concurrency, server.getManagementService().getResources(BridgeControl.class).length);
server.destroyBridge(config.getName());
assertEquals(0, server.getManagementService().getResources(BridgeControl.class).length);
}
/**
* It will inspect the journal directly and determine if there are queues on this journal,
*