ARTEMIS-3685 support reloading bridges

This commit is contained in:
Justin Bertram 2022-02-14 11:49:11 -06:00
parent d5b6bfb0cd
commit c51fda09cd
No known key found for this signature in database
GPG Key ID: F41830B875BB8633
11 changed files with 355 additions and 119 deletions

View File

@ -119,6 +119,34 @@ public final class BridgeConfiguration implements Serializable {
public BridgeConfiguration() {
}
public BridgeConfiguration(BridgeConfiguration other) {
name = other.name;
queueName = other.queueName;
forwardingAddress = other.forwardingAddress;
filterString = other.filterString;
staticConnectors = other.staticConnectors;
discoveryGroupName = other.discoveryGroupName;
ha = other.ha;
transformerConfiguration = other.transformerConfiguration;
retryInterval = other.retryInterval;
retryIntervalMultiplier = other.retryIntervalMultiplier;
initialConnectAttempts = other.initialConnectAttempts;
reconnectAttempts = other.reconnectAttempts;
reconnectAttemptsOnSameNode = other.reconnectAttemptsOnSameNode;
useDuplicateDetection = other.useDuplicateDetection;
confirmationWindowSize = other.confirmationWindowSize;
producerWindowSize = other.producerWindowSize;
clientFailureCheckPeriod = other.clientFailureCheckPeriod;
user = other.user;
password = other.password;
connectionTTL = other.connectionTTL;
maxRetryInterval = other.maxRetryInterval;
minLargeMessageSize = other.minLargeMessageSize;
callTimeout = other.callTimeout;
routingType = other.routingType;
concurrency = other.concurrency;
}
public BridgeConfiguration(String name) {
setName(name);
}

View File

@ -216,11 +216,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.INFO)
@Message(id = 221029, value = "stopped bridge {0}", format = Message.Format.MESSAGE_FORMAT)
void bridgeStopped(SimpleString name);
void bridgeStopped(String name);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 221030, value = "paused bridge {0}", format = Message.Format.MESSAGE_FORMAT)
void bridgePaused(SimpleString name);
void bridgePaused(String name);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 221031, value = "backup announced", format = Message.Format.MESSAGE_FORMAT)
@ -843,16 +843,16 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222096, value = "Error on querying binding on bridge {0}. Retrying in 100 milliseconds", format = Message.Format.MESSAGE_FORMAT)
void errorQueryingBridge(@Cause Throwable t, SimpleString name);
void errorQueryingBridge(@Cause Throwable t, String name);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222097, value = "Address {0} does not have any bindings, retry #({1})",
format = Message.Format.MESSAGE_FORMAT)
void errorQueryingBridge(SimpleString address, Integer retryCount);
void errorQueryingBridge(String address, Integer retryCount);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222098, value = "Server is starting, retry to create the session for bridge {0}", format = Message.Format.MESSAGE_FORMAT)
void errorStartingBridge(SimpleString name);
void errorStartingBridge(String name);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222099, value = "Bridge {0} is unable to connect to destination. It will be disabled.", format = Message.Format.MESSAGE_FORMAT)
@ -865,7 +865,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222101, value = "Bridge {0} achieved {1} maxattempts={2} it will stop retrying to reconnect", format = Message.Format.MESSAGE_FORMAT)
void bridgeAbortStart(SimpleString name, Integer retryCount, Integer reconnectAttempts);
void bridgeAbortStart(String name, Integer retryCount, Integer reconnectAttempts);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222102, value = "Unexpected exception while trying to reconnect", format = Message.Format.MESSAGE_FORMAT)

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.cluster;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.Queue;
@ -55,4 +56,6 @@ public interface Bridge extends Consumer, ActiveMQComponent {
boolean isConnected();
BridgeMetrics getMetrics();
BridgeConfiguration getConfiguration();
}

View File

@ -40,7 +40,6 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.protocol.core.Channel;
@ -53,7 +52,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.server.cluster.ha.HAManager;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.BroadcastGroupImpl;
@ -401,8 +399,6 @@ public class ClusterManager implements ActiveMQComponent {
return false;
}
Transformer transformer = server.getServiceRegistry().getBridgeTransformer(config.getName(), config.getTransformerConfiguration());
Binding binding = postOffice.getBinding(new SimpleString(config.getQueueName()));
if (binding == null) {
@ -482,13 +478,7 @@ public class ClusterManager implements ActiveMQComponent {
for (int i = 0; i < config.getConcurrency(); i++) {
String name = config.getConcurrency() > 1 ? (config.getName() + "-" + i) : config.getName();
Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(),
config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(),
config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(name), queue,
executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()),
SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer,
config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server,
config.getRoutingType());
Bridge bridge = new BridgeImpl(serverLocator, new BridgeConfiguration(config).setName(name), nodeManager.getUUID(), queue, executorFactory.getExecutor(), scheduledExecutor, server);
bridges.put(name, bridge);
managementService.registerBridge(bridge, config);
bridge.start();

View File

@ -50,10 +50,11 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -78,11 +79,8 @@ import org.jboss.logging.Logger;
public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener, ClientProducerFlowCallback {
private static final Logger logger = Logger.getLogger(BridgeImpl.class);
protected final ServerLocatorInternal serverLocator;
protected final Executor executor;
@ -95,36 +93,16 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
private final long sequentialID;
private final SimpleString name;
protected final Queue queue;
private final Filter filter;
private final SimpleString forwardingAddress;
final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
private final Transformer transformer;
private final Object connectionGuard = new Object();
private final boolean useDuplicateDetection;
private final String user;
private final String password;
private final int reconnectAttempts;
private final int reconnectAttemptsSameNode;
private final long retryInterval;
private final double retryMultiplier;
private final long maxRetryInterval;
private boolean blockedOnFlowControl;
/**
@ -170,70 +148,35 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
private final BridgeMetrics metrics = new BridgeMetrics();
private final ComponentConfigurationRoutingType routingType;
private BridgeConfiguration configuration;
public BridgeImpl(final ServerLocatorInternal serverLocator,
final int initialConnectAttempts,
final int reconnectAttempts,
final int reconnectAttemptsSameNode,
final long retryInterval,
final double retryMultiplier,
final long maxRetryInterval,
final BridgeConfiguration configuration,
final UUID nodeUUID,
final SimpleString name,
final Queue queue,
final Executor executor,
final Filter filter,
final SimpleString forwardingAddress,
final ScheduledExecutorService scheduledExecutor,
final Transformer transformer,
final boolean useDuplicateDetection,
final String user,
final String password,
final ActiveMQServer server,
final ComponentConfigurationRoutingType routingType) {
final ActiveMQServer server) throws ActiveMQException {
this.sequentialID = server.getStorageManager().generateID();
this.reconnectAttempts = reconnectAttempts;
this.reconnectAttemptsInUse = initialConnectAttempts;
this.reconnectAttemptsSameNode = reconnectAttemptsSameNode;
this.retryInterval = retryInterval;
this.retryMultiplier = retryMultiplier;
this.maxRetryInterval = maxRetryInterval;
this.configuration = configuration;
this.serverLocator = serverLocator;
this.nodeUUID = nodeUUID;
this.name = name;
this.queue = queue;
this.executor = executor;
this.scheduledExecutor = scheduledExecutor;
this.filter = filter;
this.transformer = server.getServiceRegistry().getBridgeTransformer(configuration.getName(), configuration.getTransformerConfiguration());
this.forwardingAddress = forwardingAddress;
this.transformer = transformer;
this.useDuplicateDetection = useDuplicateDetection;
this.user = user;
this.password = password;
this.filter = FilterImpl.createFilter(configuration.getFilterString());
this.server = server;
this.routingType = routingType;
}
/** For tests mainly */
@ -311,7 +254,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
if (notificationService != null) {
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(new SimpleString("name"), name);
props.putSimpleStringProperty(new SimpleString("name"), SimpleString.toSimpleString(configuration.getName()));
Notification notification = new Notification(nodeUUID.toString(), CoreNotificationType.BRIDGE_STARTED, props);
notificationService.sendNotification(notification);
}
@ -424,7 +367,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
stopping = true;
if (logger.isDebugEnabled()) {
logger.debug("Bridge " + this.name + " being stopped");
logger.debug("Bridge " + this.configuration.getName() + " being stopped");
}
if (futureScheduledReconnection != null) {
@ -435,7 +378,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
if (notificationService != null) {
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(new SimpleString("name"), name);
props.putSimpleStringProperty(new SimpleString("name"), SimpleString.toSimpleString(configuration.getName()));
Notification notification = new Notification(nodeUUID.toString(), CoreNotificationType.BRIDGE_STOPPED, props);
try {
notificationService.sendNotification(notification);
@ -448,14 +391,14 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
@Override
public void pause() throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("Bridge " + this.name + " being paused");
logger.debug("Bridge " + this.configuration.getName() + " being paused");
}
executor.execute(new PauseRunnable());
if (notificationService != null) {
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(new SimpleString("name"), name);
props.putSimpleStringProperty(new SimpleString("name"), SimpleString.toSimpleString(configuration.getName()));
Notification notification = new Notification(nodeUUID.toString(), CoreNotificationType.BRIDGE_STOPPED, props);
try {
notificationService.sendNotification(notification);
@ -482,7 +425,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
@Override
public SimpleString getName() {
return name;
return SimpleString.toSimpleString(configuration.getName());
}
@Override
@ -499,7 +442,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
@Override
public SimpleString getForwardingAddress() {
return forwardingAddress;
return SimpleString.toSimpleString(configuration.getForwardingAddress());
}
// For testing only
@ -561,7 +504,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
/** ClusterConnectionBridge already makes a copy of the message.
* So I needed I hook where the message is not copied. */
protected Message beforeForwardingNoCopy(Message message, SimpleString forwardingAddress) {
if (useDuplicateDetection) {
if (configuration.isUseDuplicateDetection()) {
// We keep our own DuplicateID for the Bridge, so bouncing back and forth will work fine
byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID());
@ -573,7 +516,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
message.setAddress(forwardingAddress);
}
switch (routingType) {
switch (configuration.getRoutingType()) {
case ANYCAST:
message.setRoutingType(RoutingType.ANYCAST);
break;
@ -645,8 +588,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
final SimpleString dest;
if (forwardingAddress != null) {
dest = forwardingAddress;
if (configuration.getForwardingAddress() != null) {
dest = SimpleString.toSimpleString(configuration.getForwardingAddress());
} else {
// Preserve the original address
dest = ref.getMessage().getAddressSimpleString();
@ -850,7 +793,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
return this.getClass().getSimpleName() + "@" +
Integer.toHexString(System.identityHashCode(this)) +
" [name=" +
name +
configuration.getName() +
", queue=" +
queue +
" targetConnector=" +
@ -862,7 +805,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
public String toManagementString() {
return this.getClass().getSimpleName() +
" [name=" +
name +
configuration.getName() +
", queue=" +
queue.getName() + "/" + queue.getID() + "]";
}
@ -875,6 +818,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
return transformer;
}
@Override
public BridgeConfiguration getConfiguration() {
return configuration;
}
protected void fail(final boolean permanently, boolean scaleDown) {
logger.debug(this + "\n\t::fail being called, permanently=" + permanently);
//we need to make sure we remove the node from the topology so any incoming quorum requests are voted correctly
@ -907,7 +855,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
disconnectedAndDown = false;
}
retryCount = 0;
reconnectAttemptsInUse = reconnectAttempts;
reconnectAttemptsInUse = configuration.getReconnectAttempts();
if (futureScheduledReconnection != null) {
futureScheduledReconnection.cancel(true);
futureScheduledReconnection = null;
@ -916,7 +864,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
/* Hook for creating session factory */
protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
if (targetNodeID != null && (this.reconnectAttemptsSameNode < 0 || retryCount <= this.reconnectAttemptsSameNode)) {
if (targetNodeID != null && (this.configuration.getReconnectAttemptsOnSameNode() < 0 || retryCount <= this.configuration.getReconnectAttemptsOnSameNode())) {
csf = reconnectOnOriginalNode();
} else {
serverLocator.resetToInitialConnectors();
@ -984,18 +932,18 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
return;
}
// Session is pre-acknowledge
session = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1);
session = (ClientSessionInternal) csf.createSession(configuration.getUser(), configuration.getPassword(), false, true, true, true, 1);
session.getProducerCreditManager().setCallback(this);
sessionConsumer = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1);
sessionConsumer = (ClientSessionInternal) csf.createSession(configuration.getUser(), configuration.getPassword(), false, true, true, true, 1);
}
if (forwardingAddress != null) {
if (configuration.getForwardingAddress() != null) {
ClientSession.AddressQuery query = null;
try {
query = session.addressQuery(forwardingAddress);
query = session.addressQuery(SimpleString.toSimpleString(configuration.getForwardingAddress()));
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.errorQueryingBridge(e, name);
ActiveMQServerLogger.LOGGER.errorQueryingBridge(e, configuration.getName());
// This was an issue during startup, we will not count this retry
retryCount--;
@ -1004,7 +952,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
if (!query.isExists()) {
ActiveMQServerLogger.LOGGER.errorQueryingBridge(forwardingAddress, retryCount);
ActiveMQServerLogger.LOGGER.errorQueryingBridge(configuration.getForwardingAddress(), retryCount);
scheduleRetryConnect();
return;
}
@ -1035,12 +983,12 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
} catch (ActiveMQException e) {
// the session was created while its server was starting, retry it:
if (e.getType() == ActiveMQExceptionType.SESSION_CREATION_REJECTED) {
ActiveMQServerLogger.LOGGER.errorStartingBridge(name);
ActiveMQServerLogger.LOGGER.errorStartingBridge(configuration.getName());
// We are not going to count this one as a retry
retryCount--;
scheduleRetryConnectFixedTimeout(this.retryInterval);
scheduleRetryConnectFixedTimeout(this.configuration.getRetryInterval());
return;
} else {
ActiveMQServerLogger.LOGGER.errorConnectingBridgeRetry(this);
@ -1078,17 +1026,17 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
if (reconnectAttemptsInUse >= 0 && retryCount > reconnectAttemptsInUse) {
ActiveMQServerLogger.LOGGER.bridgeAbortStart(name, retryCount, reconnectAttempts);
ActiveMQServerLogger.LOGGER.bridgeAbortStart(configuration.getName(), retryCount, configuration.getReconnectAttempts());
fail(true, false);
return;
}
long timeout = (long) (this.retryInterval * Math.pow(this.retryMultiplier, retryCount));
long timeout = (long) (this.configuration.getRetryInterval() * Math.pow(this.configuration.getRetryIntervalMultiplier(), retryCount));
if (timeout == 0) {
timeout = this.retryInterval;
timeout = this.configuration.getRetryInterval();
}
if (timeout > maxRetryInterval) {
timeout = maxRetryInterval;
if (timeout > configuration.getMaxRetryInterval()) {
timeout = configuration.getMaxRetryInterval();
}
logger.debug("Bridge " + this +
@ -1135,7 +1083,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
return;
if (logger.isDebugEnabled()) {
logger.debug("Scheduling retry for bridge " + this.name + " in " + milliseconds + " milliseconds");
logger.debug("Scheduling retry for bridge " + this.configuration.getName() + " in " + milliseconds + " milliseconds");
}
futureScheduledReconnection = scheduledExecutor.schedule(new FutureConnectRunnable(executor, this), milliseconds, TimeUnit.MILLISECONDS);
@ -1197,7 +1145,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
queue.removeConsumer(BridgeImpl.this);
synchronized (BridgeImpl.this) {
logger.debug("Closing Session for bridge " + BridgeImpl.this.name);
logger.debug("Closing Session for bridge " + BridgeImpl.this.configuration.getName());
started = false;
@ -1237,7 +1185,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
if (logger.isTraceEnabled()) {
logger.trace("Removing consumer on stopRunnable " + this + " from queue " + queue);
}
ActiveMQServerLogger.LOGGER.bridgeStopped(name);
ActiveMQServerLogger.LOGGER.bridgeStopped(configuration.getName());
}
}
@ -1259,7 +1207,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
internalCancelReferences();
ActiveMQServerLogger.LOGGER.bridgePaused(name);
ActiveMQServerLogger.LOGGER.bridgePaused(configuration.getName());
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorPausingBridge(e);
}

View File

@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.BindingType;
@ -118,9 +119,20 @@ public class ClusterConnectionBridge extends BridgeImpl {
final MessageFlowRecord flowRecord,
final TransportConfiguration connector,
final String storeAndForwardPrefix,
final StorageManager storageManager) {
super(targetLocator, initialConnectAttempts, reconnectAttempts, 0, // reconnectAttemptsOnSameNode means nothing on the clustering bridge since we always try the same
retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, server, ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType()));
final StorageManager storageManager) throws ActiveMQException {
super(targetLocator, new BridgeConfiguration()
.setInitialConnectAttempts(initialConnectAttempts)
.setReconnectAttempts(reconnectAttempts)
.setReconnectAttemptsOnSameNode(0) // reconnectAttemptsOnSameNode means nothing on the clustering bridge since we always try the same
.setRetryInterval(retryInterval)
.setRetryIntervalMultiplier(retryMultiplier)
.setMaxRetryInterval(maxRetryInterval)
.setFilterString(filterString.toString())
.setForwardingAddress(forwardingAddress.toString())
.setUseDuplicateDetection(useDuplicateDetection)
.setUser(user)
.setPassword(password)
.setRoutingType(ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType())), nodeUUID, queue, executor, scheduledExecutor, server);
this.discoveryLocator = discoveryLocator;

View File

@ -143,7 +143,9 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.ServiceComponent;
import org.apache.activemq.artemis.core.server.ServiceRegistry;
import org.apache.activemq.artemis.core.server.balancing.BrokerBalancerManager;
import org.apache.activemq.artemis.core.server.cluster.BackupManager;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
@ -172,7 +174,6 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugi
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
import org.apache.activemq.artemis.core.server.balancing.BrokerBalancerManager;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl;
import org.apache.activemq.artemis.core.server.replay.ReplayManager;
@ -4347,6 +4348,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
configuration.setDivertConfigurations(config.getDivertConfigurations());
configuration.setAddressConfigurations(config.getAddressConfigurations());
configuration.setQueueConfigs(config.getQueueConfigs());
configuration.setBridgeConfigurations(config.getBridgeConfigurations());
configurationReloadDeployed.set(false);
if (isActive()) {
configuration.parseProperties(propertiesFileUrl);
@ -4415,6 +4417,25 @@ public class ActiveMQServerImpl implements ActiveMQServer {
undeployAddressesAndQueueNotInConfiguration(configuration);
deployAddressesFromConfiguration(configuration);
deployQueuesFromListQueueConfiguration(configuration.getQueueConfigs());
ActiveMQServerLogger.LOGGER.reloadingConfiguration("bridges");
for (BridgeConfiguration newBridgeConfig : configuration.getBridgeConfigurations()) {
Bridge existingBridge = clusterManager.getBridges().get(newBridgeConfig.getName());
if (existingBridge != null && !existingBridge.getConfiguration().equals(newBridgeConfig)) {
// this is an existing bridge but the config changed so stop the current bridge and deploy the new one
destroyBridge(existingBridge.getName().toString());
deployBridge(newBridgeConfig);
} else if (existingBridge == null) {
// this is a new bridge
deployBridge(newBridgeConfig);
}
}
for (final Bridge runningBridge: clusterManager.getBridges().values()) {
if (!configuration.getBridgeConfigurations().contains(runningBridge.getConfiguration())) {
// this bridge is running but it isn't in the new config which means it was removed so destroy it
destroyBridge(runningBridge.getName().toString());
}
}
}
}

View File

@ -10,7 +10,8 @@ be reloaded automatically:
- Address Settings
- Security Settings
- Diverts
- Addresses & queues
- Addresses & Queues
- Bridges
If using [modulised broker.xml](configuration-index.md#modularising-broker.xml) ensure you also read [Reloading modular configuration files](configuration-index.md#reloading-modular-configuration-files)

View File

@ -489,6 +489,83 @@ public class RedeployTest extends ActiveMQTestBase {
}
}
@Test
public void testRedeployBridge() throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
URL url1 = RedeployTest.class.getClassLoader().getResource("reload-bridge.xml");
URL url2 = RedeployTest.class.getClassLoader().getResource("reload-bridge-updated.xml");
Files.copy(url1.openStream(), brokerXML);
EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
embeddedActiveMQ.start();
final ReusableLatch latch = new ReusableLatch(1);
Runnable tick = latch::countDown;
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
try {
latch.await(10, TimeUnit.SECONDS);
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
Queue queue = session.createQueue("a-from");
MessageProducer producer = session.createProducer(queue);
producer.send(session.createMessage());
Wait.assertEquals(1, () -> embeddedActiveMQ.getActiveMQServer().locateQueue("a-to").getMessageCount());
}
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
Queue queue = session.createQueue("b-from");
MessageProducer producer = session.createProducer(queue);
producer.send(session.createMessage());
Wait.assertEquals(1, () -> embeddedActiveMQ.getActiveMQServer().locateQueue("b-to").getMessageCount());
}
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
latch.setCount(1);
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
latch.await(10, TimeUnit.SECONDS);
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
Queue queue = session.createQueue("a-from");
MessageProducer producer = session.createProducer(queue);
producer.send(session.createMessage());
Wait.assertEquals(1, () -> embeddedActiveMQ.getActiveMQServer().locateQueue("a-new").getMessageCount());
Wait.assertEquals(1, () -> embeddedActiveMQ.getActiveMQServer().locateQueue("a-to").getMessageCount());
}
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
Queue queue = session.createQueue("b-from");
MessageProducer producer = session.createProducer(queue);
producer.send(session.createMessage());
assertFalse(Wait.waitFor(() -> embeddedActiveMQ.getActiveMQServer().locateQueue("b-to").getMessageCount() == 2, 2000, 100));
}
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
Queue queue = session.createQueue("c-from");
MessageProducer producer = session.createProducer(queue);
producer.send(session.createMessage());
Wait.assertEquals(1, () -> embeddedActiveMQ.getActiveMQServer().locateQueue("c-to").getMessageCount());
}
} finally {
embeddedActiveMQ.stop();
}
}
private void deployBrokerConfig(EmbeddedActiveMQ server, URL configFile) throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");

View File

@ -0,0 +1,78 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<security-enabled>false</security-enabled>
<persistence-enabled>false</persistence-enabled>
<configuration-file-refresh-period>100</configuration-file-refresh-period>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
</acceptors>
<connectors>
<connector name="connector">tcp://127.0.0.1:61616</connector>
</connectors>
<bridges>
<bridge name="a">
<queue-name>a-from</queue-name>
<forwarding-address>a-new</forwarding-address>
<static-connectors>
<connector-ref>connector</connector-ref>
</static-connectors>
</bridge>
<bridge name="c">
<queue-name>c-from</queue-name>
<forwarding-address>c-to</forwarding-address>
<static-connectors>
<connector-ref>connector</connector-ref>
</static-connectors>
</bridge>
</bridges>
<addresses>
<address name="a-new">
<anycast>
<queue name="a-new"/>
</anycast>
</address>
<address name="a-from">
<anycast>
<queue name="a-from"/>
</anycast>
</address>
<address name="c-to">
<anycast>
<queue name="c-to"/>
</anycast>
</address>
<address name="c-from">
<anycast>
<queue name="c-from"/>
</anycast>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,78 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<security-enabled>false</security-enabled>
<persistence-enabled>false</persistence-enabled>
<configuration-file-refresh-period>100</configuration-file-refresh-period>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
</acceptors>
<connectors>
<connector name="connector">tcp://127.0.0.1:61616</connector>
</connectors>
<bridges>
<bridge name="a">
<queue-name>a-from</queue-name>
<forwarding-address>a-to</forwarding-address>
<static-connectors>
<connector-ref>connector</connector-ref>
</static-connectors>
</bridge>
<bridge name="b">
<queue-name>b-from</queue-name>
<forwarding-address>b-to</forwarding-address>
<static-connectors>
<connector-ref>connector</connector-ref>
</static-connectors>
</bridge>
</bridges>
<addresses>
<address name="a-to">
<anycast>
<queue name="a-to"/>
</anycast>
</address>
<address name="a-from">
<anycast>
<queue name="a-from"/>
</anycast>
</address>
<address name="b-to">
<anycast>
<queue name="b-to"/>
</anycast>
</address>
<address name="b-from">
<anycast>
<queue name="b-from"/>
</anycast>
</address>
</addresses>
</core>
</configuration>