ARTEMIS-4184 Bidges with concurrency not cleared properly on config reload

This commit is contained in:
a181321 2023-06-15 12:18:01 +02:00 committed by clebertsuconic
parent b967e6d940
commit 68e400b45c
4 changed files with 37 additions and 16 deletions

View File

@ -275,6 +275,9 @@ public final class BridgeConfiguration implements Serializable {
*/
public BridgeConfiguration setName(final String name) {
this.name = name;
if (this.parentName == null) {
this.parentName = name;
}
return this;
}
@ -769,7 +772,7 @@ public final class BridgeConfiguration implements Serializable {
if (name == null) {
if (other.name != null)
return false;
} else if (!name.equals(other.name))
} else if (!parentName.equals(other.parentName))
return false;
if (password == null) {
if (other.password != null)

View File

@ -4619,34 +4619,49 @@ public class ActiveMQServerImpl implements ActiveMQServer {
deployQueuesFromListQueueConfiguration(configuration.getQueueConfigs());
ActiveMQServerLogger.LOGGER.reloadingConfiguration("bridges");
for (BridgeConfiguration newBridgeConfig : configuration.getBridgeConfigurations()) {
newBridgeConfig.setParentName(newBridgeConfig.getName());
Bridge existingBridge = clusterManager.getBridges().get(newBridgeConfig.getParentName());
if (existingBridge != null && !existingBridge.getConfiguration().equals(newBridgeConfig) && existingBridge.getConfiguration().isConfigurationManaged()) {
String bridgeName = newBridgeConfig.getName();
newBridgeConfig.setParentName(bridgeName);
//Look for bridges with matching parentName. Only need first match in case of concurrent bridges
Bridge existingBridge = clusterManager.getBridges().values().stream()
.filter(bridge -> bridge.getConfiguration().getParentName().equals(bridgeName))
.findFirst()
.orElse(null);
if (existingBridge != null && existingBridge.getConfiguration().isConfigurationManaged() && !existingBridge.getConfiguration().equals(newBridgeConfig)) {
// this is an existing bridge but the config changed so stop the current bridge and deploy the new one
destroyBridge(existingBridge.getName().toString());
destroyBridge(bridgeName);
deployBridge(newBridgeConfig);
} else if (existingBridge == null) {
// this is a new bridge
deployBridge(newBridgeConfig);
}
}
for (final Bridge existingBridge: clusterManager.getBridges().values()) {
//Look for already running bridges no longer in configuration, stop if found
for (final Bridge existingBridge : clusterManager.getBridges().values()) {
BridgeConfiguration existingBridgeConfig = existingBridge.getConfiguration();
boolean destroy = true;
for (final BridgeConfiguration newBridgeConfig : configuration.getBridgeConfigurations()) {
if (existingBridgeConfig.isConfigurationManaged() && (existingBridgeConfig.getParentName().equals(newBridgeConfig.getName()) || existingBridgeConfig.getName().equals(newBridgeConfig.getName()) )) {
destroy = false;
break;
if (existingBridgeConfig.isConfigurationManaged()) {
String existingBridgeName = existingBridgeConfig.getParentName();
boolean noLongerConfigured = configuration.getBridgeConfigurations().stream()
.noneMatch(bridge -> bridge.getParentName().equals(existingBridgeName));
if (noLongerConfigured) {
destroyBridge(existingBridgeName);
}
}
if (destroy) {
// this bridge is running but it isn't in the new config which means it was removed so destroy it
destroyBridge(existingBridge.getConfiguration().getParentName());
}
}
recoverStoredConnectors();
recoverStoredBridges();
recoverStoredConnectors();
}
}

View File

@ -518,6 +518,7 @@ public class RedeployTest extends ActiveMQTestBase {
MessageProducer producer = session.createProducer(queue);
producer.send(session.createMessage());
Wait.assertEquals(1, () -> embeddedActiveMQ.getActiveMQServer().locateQueue("a-to").getMessageCount());
Wait.assertEquals(3, () -> embeddedActiveMQ.getActiveMQServer().locateQueue("a-from").getConsumerCount());
}
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
@ -543,6 +544,7 @@ public class RedeployTest extends ActiveMQTestBase {
producer.send(session.createMessage());
Wait.assertEquals(1, () -> embeddedActiveMQ.getActiveMQServer().locateQueue("a-new").getMessageCount());
Wait.assertEquals(1, () -> embeddedActiveMQ.getActiveMQServer().locateQueue("a-to").getMessageCount());
Wait.assertEquals(2, () -> embeddedActiveMQ.getActiveMQServer().locateQueue("a-from").getConsumerCount());
}
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();

View File

@ -39,6 +39,7 @@ under the License.
<bridge name="a">
<queue-name>a-from</queue-name>
<forwarding-address>a-to</forwarding-address>
<concurrency>3</concurrency>
<static-connectors>
<connector-ref>connector</connector-ref>
</static-connectors>