From 19d8059a4eca5b7296ec9711a439ea3a129242df Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Tue, 25 Jun 2024 22:52:19 -0500 Subject: [PATCH] ARTEMIS-4794 configure pending ack behavior for bridge When a bridge is stopped it doesn't wait for pending send acknowledgements to arrive. However, when a bridge is paused it does wait. The behavior should be consistent and more importantly configurable. This commit implements these improvements and generally refactors BridgeImpl to clarify and simplify the code. In total, this commit includes the follow changes: - Removes the hard-coded 60-second timeout for pending acks when pausing the bridge and adds a new config parameter (i.e. "pending-ack-timeout"). - Applies the new pending-ack-timeout when the bridge is stopped. - Updates existing and adds new logging messages for clarity. - De-duplicates code for sending bridge-related notifications. - Avoids converting bridge name to/from SimpleString. - Removes unnecessary comments. - Renames variables & functions for clarity. - Replaces the `started`, `stopping`, & `active` booleans with a single `state` variable which is an enum. - Adds `final` to a few variables that were functionally final. - Synchronizes `stop` & `pause` methods to add safety when invoked concurrently with `handle` (since both deal with `state` and execute runnables on the ordered executor). - Reorganizes and removes a few methods for clarity. - Relocates `connect` method directly into `ConnectRunnable` (mirroring the structure of the `StopRunnable` and `PauseRunnable`). - Eliminates unnecessary variables in `ConnectRunnable` and `ScheduledConnectRunnable`. - Adds test to verify pending ack timeout works as expected with both `stop` & `pause` with both regular and large messages. --- .../config/ActiveMQDefaultConfiguration.java | 7 + .../core/config/BridgeConfiguration.java | 30 + .../impl/FileConfigurationParser.java | 5 +- .../core/server/ActiveMQMessageBundle.java | 4 + .../core/server/ActiveMQServerLogger.java | 24 +- .../core/server/cluster/ClusterManager.java | 10 +- .../core/server/cluster/impl/BridgeImpl.java | 562 +++++++++--------- .../impl/ManagementServiceImpl.java | 2 +- .../schema/artemis-configuration.xsd | 8 + .../core/config/BridgeConfigurationTest.java | 3 + .../config/impl/FileConfigurationTest.java | 2 +- .../ConfigurationTest-full-config.xml | 1 + .../ConfigurationTest-xinclude-config.xml | 1 + ...ionTest-xinclude-schema-config-bridges.xml | 1 + .../cluster/bridge/BridgeTest.java | 213 +++++++ .../BridgeConfigurationStorageTest.java | 2 + 16 files changed, 561 insertions(+), 314 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 3b580f40b6..2813154014 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -664,6 +664,9 @@ public final class ActiveMQDefaultConfiguration { // Number of concurrent workers for a core bridge public static int DEFAULT_BRIDGE_CONCURRENCY = 1; + // How long to wait for acknowledgements to arrive from the bridge's target while stopping or pausing the bridge + public static long DEFAULT_BRIDGE_PENDING_ACK_TIMEOUT = 60000; + // Whether or not to report Netty pool metrics private static final boolean DEFAULT_NETTY_POOL_METRICS = false; @@ -1860,6 +1863,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_BRIDGE_CONCURRENCY; } + public static long getDefaultBridgePendingAckTimeout() { + return DEFAULT_BRIDGE_PENDING_ACK_TIMEOUT; + } + /** * Whether or not to report Netty pool metrics */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java index 7cfb861d70..f4e3a4e0af 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java @@ -66,6 +66,7 @@ public final class BridgeConfiguration implements Serializable { public static String ROUTING_TYPE = "routing-type"; public static String CONCURRENCY = "concurrency"; public static String CONFIGURATION_MANAGED = "configuration-managed"; + public static String PENDING_ACK_TIMEOUT = "pending-ack-timeout"; private String name = null; @@ -120,6 +121,8 @@ public final class BridgeConfiguration implements Serializable { private int concurrency = ActiveMQDefaultConfiguration.getDefaultBridgeConcurrency(); + private long pendingAckTimeout = ActiveMQDefaultConfiguration.getDefaultBridgePendingAckTimeout(); + private String parentName = null; private boolean configurationManaged = true; @@ -155,6 +158,7 @@ public final class BridgeConfiguration implements Serializable { routingType = other.routingType; concurrency = other.concurrency; configurationManaged = other.configurationManaged; + pendingAckTimeout = other.pendingAckTimeout; } public BridgeConfiguration(String name) { @@ -261,6 +265,8 @@ public final class BridgeConfiguration implements Serializable { setRoutingType(ComponentConfigurationRoutingType.valueOf(value)); } else if (key.equals(CONCURRENCY)) { setConcurrency(Integer.parseInt(value)); + } else if (key.equals(PENDING_ACK_TIMEOUT)) { + setPendingAckTimeout(Long.parseLong(value)); } } return this; @@ -570,6 +576,21 @@ public final class BridgeConfiguration implements Serializable { return this; } + /** + * @return the bridge pending ack timeout + */ + public long getPendingAckTimeout() { + return pendingAckTimeout; + } + + /** + * @param pendingAckTimeout the bridge pending ack timeout to set + */ + public BridgeConfiguration setPendingAckTimeout(long pendingAckTimeout) { + this.pendingAckTimeout = pendingAckTimeout; + return this; + } + /** * At this point this is only changed on testcases * The bridge shouldn't be sending blocking anyways @@ -631,6 +652,7 @@ public final class BridgeConfiguration implements Serializable { builder.add(CALL_TIMEOUT, getCallTimeout()); builder.add(CONCURRENCY, getConcurrency()); builder.add(CONFIGURATION_MANAGED, isConfigurationManaged()); + builder.add(PENDING_ACK_TIMEOUT, getPendingAckTimeout()); // complex fields (only serialize if value is not null) @@ -725,6 +747,7 @@ public final class BridgeConfiguration implements Serializable { result = prime * result + (useDuplicateDetection ? 1231 : 1237); result = prime * result + ((user == null) ? 0 : user.hashCode()); result = prime * result + concurrency; + result = prime * result + (int) (pendingAckTimeout ^ (pendingAckTimeout >>> 32)); result = prime * result + (configurationManaged ? 1231 : 1237); return result; } @@ -811,6 +834,8 @@ public final class BridgeConfiguration implements Serializable { return false; if (concurrency != other.concurrency) return false; + if (pendingAckTimeout != other.pendingAckTimeout) + return false; if (configurationManaged != other.configurationManaged) return false; return true; @@ -857,6 +882,7 @@ public final class BridgeConfiguration implements Serializable { BufferHelper.sizeOfNullableInteger(minLargeMessageSize) + BufferHelper.sizeOfNullableLong(callTimeout) + BufferHelper.sizeOfNullableInteger(concurrency) + + BufferHelper.sizeOfNullableLong(pendingAckTimeout) + BufferHelper.sizeOfNullableBoolean(configurationManaged) + DataConstants.SIZE_BYTE + transformerSize + @@ -909,6 +935,7 @@ public final class BridgeConfiguration implements Serializable { } else { buffer.writeInt(0); } + buffer.writeNullableLong(pendingAckTimeout); } public void decode(ActiveMQBuffer buffer) { @@ -952,6 +979,9 @@ public final class BridgeConfiguration implements Serializable { staticConnectors.add(buffer.readNullableString()); } } + if (buffer.readable()) { + pendingAckTimeout = buffer.readNullableLong(); + } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index c423be3e8a..e12dc13fce 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -2475,6 +2475,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { int concurrency = getInteger(brNode, "concurrency", ActiveMQDefaultConfiguration.getDefaultBridgeConcurrency(), GT_ZERO); + long pendingAckTimeout = getLong(brNode, "pending-ack-timeout", ActiveMQDefaultConfiguration.getDefaultBridgePendingAckTimeout(), GT_ZERO); + NodeList clusterPassNodes = brNode.getElementsByTagName("password"); String password = null; @@ -2541,7 +2543,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { .setUser(user) .setPassword(password) .setRoutingType(routingType) - .setConcurrency(concurrency); + .setConcurrency(concurrency) + .setPendingAckTimeout(pendingAckTimeout); if (!staticConnectorNames.isEmpty()) { config.setStaticConnectors(staticConnectorNames); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index 2cbeb11a76..0064bac1d1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage; import org.apache.activemq.artemis.core.security.CheckType; +import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl; import org.apache.activemq.artemis.logs.annotation.LogBundle; import org.apache.activemq.artemis.logs.annotation.Message; import org.apache.activemq.artemis.logs.BundleFactory; @@ -555,4 +556,7 @@ public interface ActiveMQMessageBundle { @Message(id = 229254, value = "Already replicating, started={}") ActiveMQIllegalStateException alreadyReplicating(boolean status); + @Message(id = 229255, value = "Bridge {} cannot be {}. Current state: {}") + ActiveMQIllegalStateException bridgeOperationCannotBeExecuted(String bridgeName, String failedOp, BridgeImpl.State currentState); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index b9fba03ffe..a29ad5f8df 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -30,6 +30,7 @@ import io.netty.channel.Channel; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.io.IOCallback; @@ -152,13 +153,13 @@ public interface ActiveMQServerLogger { @LogMessage(id = 221027, value = "Bridge {} is connected", level = LogMessage.Level.INFO) void bridgeConnected(BridgeImpl name); - @LogMessage(id = 221028, value = "Bridge is stopping, will not retry", level = LogMessage.Level.INFO) - void bridgeStopping(); + @LogMessage(id = 221028, value = "Bridge is {}, will not retry", level = LogMessage.Level.INFO) + void bridgeWillNotRetry(String operation); - @LogMessage(id = 221029, value = "stopped bridge {}", level = LogMessage.Level.INFO) + @LogMessage(id = 221029, value = "Stopped bridge {}", level = LogMessage.Level.INFO) void bridgeStopped(String name); - @LogMessage(id = 221030, value = "paused bridge {}", level = LogMessage.Level.INFO) + @LogMessage(id = 221030, value = "Paused bridge {}", level = LogMessage.Level.INFO) void bridgePaused(String name); @LogMessage(id = 221031, value = "backup announced", level = LogMessage.Level.INFO) @@ -197,8 +198,8 @@ public interface ActiveMQServerLogger { @LogMessage(id = 221041, value = "Cannot find queue {} while reloading PAGE_CURSOR_COMPLETE, deleting record now", level = LogMessage.Level.INFO) void cantFindQueueOnPageComplete(long queueID); - @LogMessage(id = 221042, value = "Bridge {} timed out waiting for the completion of {} messages, we will just shutdown the bridge after 10 seconds wait", level = LogMessage.Level.INFO) - void timedOutWaitingCompletions(String bridgeName, long numberOfMessages); + @LogMessage(id = 221042, value = "{} bridge {} timed out waiting for the send acknowledgement of {} messages. Messages may be duplicated between the bridge's source and the target.", level = LogMessage.Level.INFO) + void timedOutWaitingForSendAcks(String operation, String bridgeName, long numberOfMessages); @LogMessage(id = 221043, value = "Protocol module found: [{}]. Adding protocol support for: {}", level = LogMessage.Level.INFO) void addingProtocolSupport(String moduleName, String protocolKey); @@ -789,8 +790,8 @@ public interface ActiveMQServerLogger { @LogMessage(id = 222159, value = "unable to send notification when broadcast group is stopped", level = LogMessage.Level.WARN) void broadcastBridgeStoppedError(Exception e); - @LogMessage(id = 222160, value = "unable to send notification when broadcast group is stopped", level = LogMessage.Level.WARN) - void notificationBridgeStoppedError(Exception e); + @LogMessage(id = 222160, value = "unable to send notification for bridge {}: {}", level = LogMessage.Level.WARN) + void notificationBridgeError(String bridge, CoreNotificationType type, Exception e); @LogMessage(id = 222161, value = "Group Handler timed-out waiting for sendCondition", level = LogMessage.Level.WARN) void groupHandlerSendTimeout(); @@ -1302,8 +1303,8 @@ public interface ActiveMQServerLogger { @LogMessage(id = 224030, value = "Could not cancel reference {}", level = LogMessage.Level.ERROR) void errorCancellingRefOnBridge(MessageReference ref2, Exception e); - @LogMessage(id = 224032, value = "Failed to pause bridge", level = LogMessage.Level.ERROR) - void errorPausingBridge(Exception e); + @LogMessage(id = 224032, value = "Failed to pause bridge: {}", level = LogMessage.Level.ERROR) + void errorPausingBridge(String bridgeName, Exception e); @LogMessage(id = 224033, value = "Failed to broadcast connector configs", level = LogMessage.Level.ERROR) void errorBroadcastingConnectorConfigs(Exception e); @@ -1617,4 +1618,7 @@ public interface ActiveMQServerLogger { @LogMessage(id = 224138, value = "Error Registering DuplicateCacheSize on namespace {}", level = LogMessage.Level.WARN) void errorRegisteringDuplicateCacheSize(String address, Exception reason); + + @LogMessage(id = 224139, value = "Failed to stop bridge: {}", level = LogMessage.Level.ERROR) + void errorStoppingBridge(String bridgeName, Exception e); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java index 68bf794c65..4cabb0abb4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java @@ -324,7 +324,7 @@ public class ClusterManager implements ActiveMQComponent { for (Bridge bridge : bridges.values()) { bridge.stop(); - managementService.unregisterBridge(bridge.getName().toString()); + managementService.unregisterBridge(bridge.getConfiguration().getName()); } bridges.clear(); @@ -532,17 +532,17 @@ public class ClusterManager implements ActiveMQComponent { synchronized (this) { for (Bridge bridge : bridges.values()) { - if (bridge.getName().toString().matches(name + "|" + name + "-\\d+")) { - bridge = bridges.get(bridge.getName().toString()); + if (bridge.getConfiguration().getName().matches(name + "|" + name + "-\\d+")) { + bridge = bridges.get(bridge.getConfiguration().getName()); if (bridge != null) { bridgesToRemove.add(bridge); } } } for (Bridge bridgeToRemove : bridgesToRemove) { - bridges.remove(bridgeToRemove.getName().toString()); + bridges.remove(bridgeToRemove.getConfiguration().getName()); bridgeToRemove.stop(); - managementService.unregisterBridge(bridgeToRemove.getName().toString()); + managementService.unregisterBridge(bridgeToRemove.getConfiguration().getName()); } } for (Bridge bridge : bridgesToRemove) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index f63944a141..71b09e4310 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.server.cluster.impl; +import java.lang.invoke.MethodHandles; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -28,6 +29,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; +import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; @@ -47,13 +49,13 @@ import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits; import org.apache.activemq.artemis.core.client.impl.ClientProducerFlowCallback; -import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; +import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.HandleStatus; @@ -74,15 +76,13 @@ import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; -import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException; - -/** - * A Core BridgeImpl - */ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener, ClientProducerFlowCallback { + public enum State { + STARTING, STARTED, PAUSING, PAUSED, STOPPING, STOPPED + } + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); protected final ServerLocatorInternal serverLocator; @@ -109,17 +109,13 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled private boolean blockedOnFlowControl; - /** - * Used when there's a scheduled reconnection - */ - protected ScheduledFuture futureScheduledReconnection; + protected ScheduledFuture scheduledReconnection; protected volatile ClientSessionInternal session; // on cases where sub-classes need a consumer protected volatile ClientSessionInternal sessionConsumer; - // this will happen if a disconnect happened // upon reconnection we need to send the nodeUP back into the topology protected volatile boolean disconnectedAndDown = false; @@ -132,11 +128,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled private volatile ClientProducer producer; - private volatile boolean started; - - private volatile boolean stopping = false; - - private volatile boolean active; + private volatile State state = State.STOPPED; private boolean deliveringLargeMessage; @@ -148,11 +140,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled private boolean keepConnecting = true; - private ActiveMQServer server; + private final ActiveMQServer server; private final BridgeMetrics metrics = new BridgeMetrics(); - private BridgeConfiguration configuration; + private final BridgeConfiguration configuration; public BridgeImpl(final ServerLocatorInternal serverLocator, final BridgeConfiguration configuration, @@ -185,11 +177,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled this.server = server; } - /** For tests mainly */ - public boolean isBlockedOnFlowControl() { - return blockedOnFlowControl; - } - public static final byte[] getDuplicateBytes(final UUID nodeUUID, final long messageID) { byte[] bytes = new byte[24]; @@ -202,6 +189,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled return bytes; } + // for tests + public boolean isBlockedOnFlowControl() { + return blockedOnFlowControl; + } + // for tests public ClientSessionFactory getSessionFactory() { return csf; @@ -230,7 +222,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled @Override public void onCreditsFlow(boolean blocked, ClientProducerCredits producerCredits) { if (logger.isTraceEnabled()) { - logger.trace("Bridge {} received credits, with blocked = {}", this.getName(), blocked); + logger.trace("Bridge {} received credits, with blocked = {}", configuration.getName(), blocked); } this.blockedOnFlowControl = blocked; if (!blocked) { @@ -240,7 +232,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled @Override public void onCreditsFail(ClientProducerCredits producerCredits) { - ActiveMQServerLogger.LOGGER.bridgeAddressFull(String.valueOf(producerCredits.getAddress()), String.valueOf(this.getName())); + ActiveMQServerLogger.LOGGER.bridgeAddressFull(String.valueOf(producerCredits.getAddress()), configuration.getName()); disconnect(); } @@ -251,22 +243,23 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled @Override public synchronized void start() throws Exception { - if (started) { - return; + State localState = this.state; + if (localState == State.STARTING || localState == State.STARTED || localState == State.STOPPING || localState == State.PAUSING) { + logger.debug("Bridge {} state is {}. Ignoring call to start.", configuration.getName(), localState); + if (localState == State.STOPPING || localState == State.PAUSING) { + throw ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(), "started", localState); + } else { + return; + } } - started = true; + state = State.STARTING; - stopping = false; + logger.debug("Bridge {} is starting", configuration.getName()); - activate(); + executor.execute(new ConnectRunnable()); - if (notificationService != null) { - TypedProperties props = new TypedProperties(); - props.putSimpleStringProperty(SimpleString.of("name"), SimpleString.of(configuration.getName())); - Notification notification = new Notification(nodeUUID.toString(), CoreNotificationType.BRIDGE_STARTED, props); - notificationService.sendNotification(notification); - } + sendNotification(CoreNotificationType.BRIDGE_STARTED); } @Override @@ -275,10 +268,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } private void cancelRefs() { - LinkedList list = new LinkedList<>(); + LinkedList list; synchronized (refs) { - list.addAll(refs.values()); + list = new LinkedList<>(refs.values()); refs.clear(); } @@ -363,49 +356,45 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } @Override - public void stop() throws Exception { - if (stopping) { - return; + public synchronized void stop() throws Exception { + State localState = state; + if (localState == State.STOPPING || localState == State.STOPPED || localState == State.PAUSING) { + logger.debug("Bridge {} state is {}. Ignoring call to stop.", configuration.getName(), localState); + if (localState == State.PAUSING) { + throw ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(), "stopped", localState); + } else { + return; + } } - stopping = true; + state = State.STOPPING; - logger.debug("Bridge {} being stopped", configuration.getName()); + logger.debug("Bridge {} is stopping", configuration.getName()); - if (futureScheduledReconnection != null) { - futureScheduledReconnection.cancel(true); + if (scheduledReconnection != null) { + scheduledReconnection.cancel(true); } executor.execute(new StopRunnable()); - - if (notificationService != null) { - TypedProperties props = new TypedProperties(); - props.putSimpleStringProperty(SimpleString.of("name"), SimpleString.of(configuration.getName())); - Notification notification = new Notification(nodeUUID.toString(), CoreNotificationType.BRIDGE_STOPPED, props); - try { - notificationService.sendNotification(notification); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.broadcastBridgeStoppedError(e); - } - } } @Override - public void pause() throws Exception { - logger.debug("Bridge {} being paused", configuration.getName()); - - executor.execute(new PauseRunnable()); - - if (notificationService != null) { - TypedProperties props = new TypedProperties(); - props.putSimpleStringProperty(SimpleString.of("name"), SimpleString.of(configuration.getName())); - Notification notification = new Notification(nodeUUID.toString(), CoreNotificationType.BRIDGE_STOPPED, props); - try { - notificationService.sendNotification(notification); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.notificationBridgeStoppedError(e); + public synchronized void pause() throws Exception { + State localState = state; + if (localState == State.STOPPING || localState == State.STOPPED || localState == State.PAUSING || localState == State.PAUSED) { + logger.debug("Bridge {} state is {}. Ignoring call to pause.", configuration.getName(), localState); + if (localState == State.STOPPING || localState == State.STOPPED) { + throw ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(), "paused", localState); + } else { + return; } } + + state = State.PAUSING; + + logger.info("Bridge {} is pausing", configuration.getName()); + + executor.execute(new PauseRunnable()); } @Override @@ -416,11 +405,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled @Override public boolean isStarted() { - return started; - } - - public synchronized void activate() { - executor.execute(new ConnectRunnable(this)); + return state == State.STARTING || state == State.STARTED; } @Override @@ -438,14 +423,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled return filter; } - // SendAcknowledgementHandler implementation --------------------- - @Override public SimpleString getForwardingAddress() { return SimpleString.of(configuration.getForwardingAddress()); } - // For testing only @Override public RemotingConnection getForwardingConnection() { if (session == null) { @@ -465,9 +447,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled @Override public void sendAcknowledged(final Message message) { - logger.trace("BridgeImpl::sendAcknowledged received confirmation for message {}", message); + logger.debug("Bridge {} received confirmation for message {}", configuration.getName(), message); - if (active) { + State localState = state; + if (localState == State.STARTED || localState == State.STOPPING || localState == State.PAUSING) { try { final MessageReference ref; @@ -493,6 +476,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } catch (Exception e) { ActiveMQServerLogger.LOGGER.bridgeFailedToAck(e); } + } else { + logger.debug("Bridge {} state is {}. Ignoring call to sendAcknowledged.", configuration.getName(), localState); } } @@ -562,7 +547,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled queue.deliverAsync(); } - @Override public HandleStatus handle(final MessageReference ref) throws Exception { if (RefCountMessage.isRefTraceEnabled() && ref.getMessage() instanceof RefCountMessage) { @@ -574,9 +558,9 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } synchronized (this) { - if (!active || !session.isWritable(this)) { + if (state != State.STARTED || !session.isWritable(this)) { if (logger.isDebugEnabled()) { - logger.debug("{}::Ignoring reference on bridge as it is set to inactive ref {}, active = {}", this, ref, active); + logger.debug("{}::Ignoring reference on bridge as it is set to inactive ref {}, active = false", this, ref); } return HandleStatus.BUSY; } @@ -591,7 +575,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled return HandleStatus.BUSY; } - logger.trace("Bridge {} is handling reference {} ", ref); + logger.trace("Bridge {} is handling reference {} ", configuration.getName(), ref); ref.handled(); @@ -620,7 +604,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled final HandleStatus status; if (message.isLargeMessage()) { deliveringLargeMessage = true; - deliverLargeMessage(dest, ref, (LargeServerMessage) message, ref.getMessage()); + deliverLargeMessage(dest, ref, (LargeServerMessage) message); status = HandleStatus.HANDLED; } else { status = deliverStandardMessage(dest, ref, message, ref.getMessage()); @@ -676,12 +660,12 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } cleanUpSessionFactory(csf); - } catch (Throwable dontCare) { + } catch (Throwable ignored) { } try { session.cleanUp(false); - } catch (Throwable dontCare) { + } catch (Throwable ignored) { } if (scaleDownTargetNodeID != null && !scaleDownTargetNodeID.equals(nodeUUID.toString())) { @@ -691,8 +675,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled logger.debug("Received scaleDownTargetNodeID: {}; cancelling reconnect.", scaleDownTargetNodeID); fail(true, true); } else { - logger.debug("Received invalid scaleDownTargetNodeID: {}", scaleDownTargetNodeID); - + logger.debug("Received null scaleDownTargetNodeID"); fail(me.getType() == ActiveMQExceptionType.DISCONNECTED, false); } @@ -726,9 +709,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled private void deliverLargeMessage(final SimpleString dest, final MessageReference ref, - final LargeServerMessage message, - final Message originalMessage) { + final LargeServerMessage message) { executor.execute(() -> { + logger.trace("going to send large message: {} from {}", message, queue); + try { producer.send(dest, message.toMessage()); @@ -749,11 +733,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled }); } - /** - * @param ref - * @param message - * @return - */ private HandleStatus deliverStandardMessage(SimpleString dest, final MessageReference ref, Message message, Message originalMessage) { // if we failover during send then there is a chance that the // that this will throw a disconnect, we need to remove the message @@ -788,8 +767,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled /** * for use in tests mainly - * - * @return */ public TopologyMember getTargetNodeFromTopology() { return this.targetNode; @@ -822,10 +799,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled queue.getName() + "/" + queue.getID() + "]"; } - public ClientSessionFactoryImpl getCSF() { - return (ClientSessionFactoryImpl) csf; - } - public Transformer getTransformer() { return transformer; } @@ -835,6 +808,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled return configuration; } + public State getState() { + return state; + } + protected void fail(final boolean permanently, boolean scaleDown) { logger.debug("{}\n\t::fail being called, permanently={}", this, permanently); //we need to make sure we remove the node from the topology so any incoming quorum requests are voted correctly @@ -867,9 +844,9 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } retryCount = 0; reconnectAttemptsInUse = configuration.getReconnectAttempts(); - if (futureScheduledReconnection != null) { - futureScheduledReconnection.cancel(true); - futureScheduledReconnection = null; + if (scheduledReconnection != null) { + scheduledReconnection.cancel(true); + scheduledReconnection = null; } } @@ -919,121 +896,14 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled csf = sfi; } - /* This is called only when the bridge is activated */ - protected void connect() { - if (stopping) - return; - - synchronized (connectionGuard) { - if (!keepConnecting) { - return; - } - - if (logger.isDebugEnabled()) { - logger.debug("Connecting {} to its destination [{}], csf={}", this, nodeUUID, csf); - } - retryCount++; - - try { - if (csf == null || csf.isClosed()) { - if (stopping) - return; - csf = createSessionFactory(); - if (csf == null) { - // Retrying. This probably means the node is not available (for the cluster connection case) - scheduleRetryConnect(); - return; - } - // Session is pre-acknowledge - session = (ClientSessionInternal) csf.createSession(configuration.getUser(), configuration.getPassword(), false, true, true, true, 1); - session.getProducerCreditManager().setCallback(this); - sessionConsumer = (ClientSessionInternal) csf.createSession(configuration.getUser(), configuration.getPassword(), false, true, true, true, 1); - } - - if (configuration.getForwardingAddress() != null) { - ClientSession.AddressQuery query = null; - - try { - query = session.addressQuery(SimpleString.of(configuration.getForwardingAddress())); - } catch (Throwable e) { - ActiveMQServerLogger.LOGGER.errorQueryingBridge(configuration.getName(), e); - // This was an issue during startup, we will not count this retry - retryCount--; - - scheduleRetryConnectFixedTimeout(100); - return; - } - - if (!query.isExists()) { - ActiveMQServerLogger.LOGGER.errorQueryingBridge(configuration.getForwardingAddress(), retryCount); - scheduleRetryConnect(); - return; - } - } - - // need to reset blockedOnFlowControl after creating a new producer - // otherwise in case the bridge was blocked before a previous failure - // this would never resume - blockedOnFlowControl = false; - producer = session.createProducer(); - session.addFailureListener(BridgeImpl.this); - - session.setSendAcknowledgementHandler(BridgeImpl.this); - - afterConnect(); - - active = true; - - queue.addConsumer(BridgeImpl.this); - queue.deliverAsync(); - - ActiveMQServerLogger.LOGGER.bridgeConnected(this); - - serverLocator.addClusterTopologyListener(new TopologyListener()); - - keepConnecting = false; - return; - } catch (ActiveMQException e) { - // the session was created while its server was starting, retry it: - if (e.getType() == ActiveMQExceptionType.SESSION_CREATION_REJECTED) { - ActiveMQServerLogger.LOGGER.errorStartingBridge(configuration.getName()); - - // We are not going to count this one as a retry - retryCount--; - - scheduleRetryConnectFixedTimeout(this.configuration.getRetryInterval()); - return; - } else { - ActiveMQServerLogger.LOGGER.errorConnectingBridgeRetry(this); - logger.debug("Underlying bridge connection failure", e); - - scheduleRetryConnect(); - } - } catch (ActiveMQInterruptedException | InterruptedException e) { - ActiveMQServerLogger.LOGGER.errorConnectingBridge(this, e); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorConnectingBridge(this, e); - if (csf != null) { - try { - csf.close(); - csf = null; - } catch (Throwable ignored) { - } - } - fail(false, false); - scheduleRetryConnect(); - } - } - } - protected void scheduleRetryConnect() { if (serverLocator.isClosed()) { ActiveMQServerLogger.LOGGER.bridgeLocatorShutdown(); return; } - if (stopping) { - ActiveMQServerLogger.LOGGER.bridgeStopping(); + if (state == State.STOPPING || state == State.PAUSING) { + ActiveMQServerLogger.LOGGER.bridgeWillNotRetry(state == State.STOPPING ? "stopping" : "pausing"); return; } @@ -1076,25 +946,23 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } } } - } - - protected void scheduleRetryConnectFixedTimeout(final long milliseconds) { try { cleanUpSessionFactory(csf); } catch (Throwable ignored) { } - if (stopping) + if (state == State.STOPPING || state == State.STOPPED || state == State.PAUSING || state == State.PAUSED) { return; + } if (logger.isDebugEnabled()) { logger.debug("Scheduling retry for bridge {} in {} milliseconds", configuration.getName(), milliseconds); } - futureScheduledReconnection = scheduledExecutor.schedule(new FutureConnectRunnable(executor, this), milliseconds, TimeUnit.MILLISECONDS); + scheduledReconnection = scheduledExecutor.schedule(new ScheduledConnectRunnable(), milliseconds, TimeUnit.MILLISECONDS); } private void internalCancelReferences() { @@ -1105,43 +973,137 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } } - /** - * just set deliveringLargeMessage to false - */ private synchronized void unsetLargeMessageDelivery() { deliveringLargeMessage = false; } - // The scheduling will still use the main executor here - private static class FutureConnectRunnable implements Runnable { - - private final BridgeImpl bridge; - - private final Executor executor; - - private FutureConnectRunnable(Executor exe, BridgeImpl bridge) { - executor = exe; - this.bridge = bridge; - } + private class ScheduledConnectRunnable implements Runnable { @Override public void run() { - if (bridge.isStarted()) - executor.execute(new ConnectRunnable(bridge)); + if (isStarted()) { + // The scheduling will still use the main executor here + executor.execute(new ConnectRunnable()); + } } } - private static final class ConnectRunnable implements Runnable { - - private final BridgeImpl bridge; - - private ConnectRunnable(BridgeImpl bridge2) { - bridge = bridge2; - } + private class ConnectRunnable implements Runnable { @Override public void run() { - bridge.connect(); + if (state == State.STOPPING || state == State.PAUSING) { + logger.debug("Bridge {} state is {}. Ignoring call to connect.", configuration.getName(), state); + return; + } + + synchronized (connectionGuard) { + if (!keepConnecting) { + return; + } + + if (logger.isDebugEnabled()) { + logger.debug("Connecting {} to its destination [{}], csf={}", this, nodeUUID, csf); + } + retryCount++; + + try { + if (csf == null || csf.isClosed()) { + if (state == State.STOPPING || state == State.PAUSING) + return; + csf = createSessionFactory(); + if (csf == null) { + // Retrying. This probably means the node is not available (for the cluster connection case) + scheduleRetryConnect(); + return; + } + // Session is pre-acknowledge + session = (ClientSessionInternal) csf.createSession(configuration.getUser(), configuration.getPassword(), false, true, true, true, 1); + session.getProducerCreditManager().setCallback(BridgeImpl.this); + sessionConsumer = (ClientSessionInternal) csf.createSession(configuration.getUser(), configuration.getPassword(), false, true, true, true, 1); + } + + if (configuration.getForwardingAddress() != null) { + ClientSession.AddressQuery query = null; + + try { + query = session.addressQuery(SimpleString.of(configuration.getForwardingAddress())); + } catch (Throwable e) { + ActiveMQServerLogger.LOGGER.errorQueryingBridge(configuration.getName(), e); + // This was an issue during startup, we will not count this retry + retryCount--; + + scheduleRetryConnectFixedTimeout(100); + return; + } + + if (!query.isExists()) { + ActiveMQServerLogger.LOGGER.errorQueryingBridge(configuration.getForwardingAddress(), retryCount); + scheduleRetryConnect(); + return; + } + } + + // need to reset blockedOnFlowControl after creating a new producer + // otherwise in case the bridge was blocked before a previous failure + // this would never resume + blockedOnFlowControl = false; + producer = session.createProducer(); + session.addFailureListener(BridgeImpl.this); + + session.setSendAcknowledgementHandler(BridgeImpl.this); + + afterConnect(); + + state = State.STARTED; + + queue.addConsumer(BridgeImpl.this); + queue.deliverAsync(); + + ActiveMQServerLogger.LOGGER.bridgeConnected(BridgeImpl.this); + + serverLocator.addClusterTopologyListener(new ClusterTopologyListener() { + @Override + public void nodeUP(TopologyMember member, boolean last) { + BridgeImpl.this.nodeUP(member, last); + } + + @Override + public void nodeDown(long eventUID, String nodeID) { + } + }); + + keepConnecting = false; + } catch (ActiveMQException e) { + // the session was created while its server was starting, retry it: + if (e.getType() == ActiveMQExceptionType.SESSION_CREATION_REJECTED) { + ActiveMQServerLogger.LOGGER.errorStartingBridge(configuration.getName()); + + // We are not going to count this one as a retry + retryCount--; + + scheduleRetryConnectFixedTimeout(configuration.getRetryInterval()); + } else { + ActiveMQServerLogger.LOGGER.errorConnectingBridgeRetry(BridgeImpl.this); + logger.debug("Underlying bridge connection failure", e); + + scheduleRetryConnect(); + } + } catch (ActiveMQInterruptedException | InterruptedException e) { + ActiveMQServerLogger.LOGGER.errorConnectingBridge(BridgeImpl.this, e); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorConnectingBridge(BridgeImpl.this, e); + if (csf != null) { + try { + csf.close(); + csf = null; + } catch (Throwable ignored) { + } + } + fail(false, false); + scheduleRetryConnect(); + } + } } } @@ -1149,47 +1111,54 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled @Override public void run() { - logger.debug("stopping bridge {}", BridgeImpl.this); - queue.removeConsumer(BridgeImpl.this); + try { + logger.debug("stopping bridge {}", BridgeImpl.this); + logger.trace("Removing consumer on stopRunnable {} from queue {}", this, queue); + queue.removeConsumer(BridgeImpl.this); - synchronized (BridgeImpl.this) { - logger.debug("Closing Session for bridge {}", configuration.getName()); - started = false; - active = false; - } - - if (session != null) { - logger.debug("Cleaning up session {}", session); - session.removeFailureListener(BridgeImpl.this); - try { - session.close(); - session = null; - } catch (ActiveMQException dontcare) { + if (!pendingAcks.await(configuration.getPendingAckTimeout(), TimeUnit.MILLISECONDS)) { + ActiveMQServerLogger.LOGGER.timedOutWaitingForSendAcks("Stopping", configuration.getName(), pendingAcks.getCount()); } - } - if (sessionConsumer != null) { - logger.debug("Cleaning up session {}", session); - try { - sessionConsumer.close(); - sessionConsumer = null; - } catch (ActiveMQException dontcare) { + synchronized (BridgeImpl.this) { + state = State.STOPPED; } + + if (session != null) { + logger.debug("Cleaning up session {} for bridge {}", session, configuration.getName()); + session.removeFailureListener(BridgeImpl.this); + try { + session.close(); + session = null; + } catch (ActiveMQException ignored) { + } + } + + if (sessionConsumer != null) { + logger.debug("Cleaning up session {}", session); + try { + sessionConsumer.close(); + sessionConsumer = null; + } catch (ActiveMQException ignored) { + } + } + + internalCancelReferences(); + + if (csf != null) { + csf.cleanup(); + } + + synchronized (connectionGuard) { + keepConnecting = true; + } + + sendNotification(CoreNotificationType.BRIDGE_STOPPED); + + ActiveMQServerLogger.LOGGER.bridgeStopped(configuration.getName()); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorStoppingBridge(configuration.getName(), e); } - - internalCancelReferences(); - - if (csf != null) { - csf.cleanup(); - } - - synchronized (connectionGuard) { - keepConnecting = true; - } - - logger.trace("Removing consumer on stopRunnable {} from queue {}", this, queue); - - ActiveMQServerLogger.LOGGER.bridgeStopped(configuration.getName()); } } @@ -1198,38 +1167,39 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled @Override public void run() { try { + logger.debug("pausing bridge {}", BridgeImpl.this); + logger.trace("Removing consumer on pauseRunnable {} from queue {}", this, queue); queue.removeConsumer(BridgeImpl.this); - if (!pendingAcks.await(60, TimeUnit.SECONDS)) { - ActiveMQServerLogger.LOGGER.timedOutWaitingCompletions(BridgeImpl.this.toString(), pendingAcks.getCount()); + if (!pendingAcks.await(configuration.getPendingAckTimeout(), TimeUnit.MILLISECONDS)) { + ActiveMQServerLogger.LOGGER.timedOutWaitingForSendAcks("Pausing", configuration.getName(), pendingAcks.getCount()); } synchronized (BridgeImpl.this) { - started = false; - active = false; + state = State.PAUSED; } internalCancelReferences(); + sendNotification(CoreNotificationType.BRIDGE_STOPPED); + ActiveMQServerLogger.LOGGER.bridgePaused(configuration.getName()); } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorPausingBridge(e); + ActiveMQServerLogger.LOGGER.errorPausingBridge(configuration.getName(), e); } } - } - private class TopologyListener implements ClusterTopologyListener { - - // ClusterListener - @Override - public void nodeUP(TopologyMember member, boolean last) { - BridgeImpl.this.nodeUP(member, last); - } - - @Override - public void nodeDown(long eventUID, String nodeID) { - + private void sendNotification(CoreNotificationType type) { + if (notificationService != null) { + TypedProperties props = new TypedProperties(); + props.putSimpleStringProperty(SimpleString.of("name"), getName()); + Notification notification = new Notification(nodeUUID.toString(), type, props); + try { + notificationService.sendNotification(notification); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.notificationBridgeError(configuration.getName(), type, e); + } } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java index d5529a6baa..ff5fceb39f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java @@ -460,7 +460,7 @@ public class ManagementServiceImpl implements ManagementService { @Override public synchronized void registerBridge(final Bridge bridge) throws Exception { bridge.setNotificationService(this); - ObjectName objectName = objectNameBuilder.getBridgeObjectName(bridge.getName().toString()); + ObjectName objectName = objectNameBuilder.getBridgeObjectName(bridge.getConfiguration().getName()); BridgeControl control = new BridgeControlImpl(bridge, storageManager); registerInJMX(objectName, control); registerInRegistry(ResourceNames.BRIDGE + bridge.getName(), control); diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 88ecad9be9..01eee16306 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -1562,6 +1562,14 @@ + + + + how long to wait for acknowledgements to arrive from the bridge's target while stopping or pausing the bridge + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java index ce8351d0ed..6bde568d5b 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java @@ -73,6 +73,7 @@ public class BridgeConfigurationTest { assertEquals(12, bridgeConfiguration.getCallTimeout()); assertEquals(ComponentConfigurationRoutingType.MULTICAST, bridgeConfiguration.getRoutingType()); assertEquals(1, bridgeConfiguration.getConcurrency()); + assertEquals(321, bridgeConfiguration.getPendingAckTimeout()); } @Test @@ -112,6 +113,7 @@ public class BridgeConfigurationTest { assertEquals("102400", jsonObject.get(BridgeConfiguration.MIN_LARGE_MESSAGE_SIZE).toString()); assertEquals("30000", jsonObject.get(BridgeConfiguration.CALL_TIMEOUT).toString()); assertEquals("1", jsonObject.get(BridgeConfiguration.CONCURRENCY).toString()); + assertEquals("60000", jsonObject.get(BridgeConfiguration.PENDING_ACK_TIMEOUT).toString()); // also should contain default non-null values of string fields assertEquals("\"ACTIVEMQ.CLUSTER.ADMIN.USER\"", jsonObject.get(BridgeConfiguration.USER).toString()); @@ -199,6 +201,7 @@ public class BridgeConfigurationTest { objectBuilder.add(BridgeConfiguration.ROUTING_TYPE, "MULTICAST"); objectBuilder.add(BridgeConfiguration.CONCURRENCY, 1); objectBuilder.add(BridgeConfiguration.CONFIGURATION_MANAGED, true); + objectBuilder.add(BridgeConfiguration.PENDING_ACK_TIMEOUT, 321); return objectBuilder.build(); } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 31060d31ac..bc8fad3ce4 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -428,7 +428,7 @@ public class FileConfigurationTest extends AbstractConfigurationTestBase { assertEquals("org.foo.BridgeTransformer3", bc.getTransformerConfiguration().getClassName()); assertEquals("bridgeTransformerValue1", bc.getTransformerConfiguration().getProperties().get("bridgeTransformerKey1")); assertEquals("bridgeTransformerValue2", bc.getTransformerConfiguration().getProperties().get("bridgeTransformerKey2")); - + assertEquals(123456, bc.getPendingAckTimeout()); } } diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 456a7bf272..c57fcf9af8 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -255,6 +255,7 @@ 555k + 123456 bridge-forwarding-address2 diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml index 436d2fb819..9022a6af9f 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml @@ -250,6 +250,7 @@ 555k + 123456 diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-bridges.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-bridges.xml index 289716712a..c4138a868c 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-bridges.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-bridges.xml @@ -51,6 +51,7 @@ 555k + 123456 diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java index f73047d732..23dd1c63a8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java @@ -37,6 +37,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -70,6 +71,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordId import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -2084,6 +2086,217 @@ public class BridgeTest extends ActiveMQTestBase { assertEquals(0, server.getManagementService().getResources(BridgeControl.class).length); } + @TestTemplate + public void testPendingAcksNeverArriveOnStop() throws Exception { + testPendingAcksNeverArrive(true, false); + } + + @TestTemplate + public void testPendingAcksNeverArriveOnPause() throws Exception { + testPendingAcksNeverArrive(false, false); + } + + @TestTemplate + public void testPendingAcksNeverArriveOnStopWithLargeMessages() throws Exception { + testPendingAcksNeverArrive(true, true); + } + + @TestTemplate + public void testPendingAcksNeverArriveOnPauseWithLargeMessages() throws Exception { + testPendingAcksNeverArrive(false, true); + } + + private void testPendingAcksNeverArrive(boolean stop, boolean large) throws Exception { + server0 = createClusteredServerWithParams(isNetty(), 0, true, null); + + Map server1Params = new HashMap<>(); + addTargetParameters(server1Params); + server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params); + + final String testAddress = "testAddress"; + final String queueName0 = "queue0"; + final String forwardAddress = "forwardAddress"; + final String queueName1 = "queue1"; + final long pendingAckTimeout = 2000; + final int messageSize = 1024; + final int numMessages = 10; + + TransportConfiguration server0tc = new TransportConfiguration(getConnector(), null); + TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params); + + server0.getConfiguration() + .setConnectorConfigurations(Map.of(server1tc.getName(), server1tc)) + .setBridgeConfigurations(Arrays.asList(new BridgeConfiguration() + .setName("bridge1") + .setQueueName(queueName0) + .setForwardingAddress(forwardAddress) + .setRetryInterval(1000) + .setReconnectAttemptsOnSameNode(-1) + .setUseDuplicateDetection(false) + .setConfirmationWindowSize(numMessages * messageSize / 2) + .setMinLargeMessageSize(large ? (messageSize / 2) : (messageSize * 2)) + .setPendingAckTimeout(pendingAckTimeout) + .setStaticConnectors(Arrays.asList(server1tc.getName())))); + server0.getConfiguration().setQueueConfigs(Arrays.asList(QueueConfiguration.of(queueName0).setAddress(testAddress))); + server0.start(); + + // this interceptor will prevent the target from returning any send acknowledgements + Interceptor sendBlockingInterceptor = (packet, connection) -> { + if (packet.getType() == PacketImpl.SESS_SEND || packet.getType() == PacketImpl.SESS_SEND_LARGE) { + return false; + } + return true; + }; + + server1.getConfiguration().setQueueConfigs(Arrays.asList(QueueConfiguration.of(queueName1).setAddress(forwardAddress))); + server1.start(); + server1.getRemotingService().addIncomingInterceptor(sendBlockingInterceptor); + Bridge bridge = server0.getClusterManager().getBridges().get("bridge1"); + Wait.assertTrue(() -> (bridge.isConnected()), 2000, 100); + + locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc)); + ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc)); + ClientSession session0 = sf0.createSession(false, true, true); + ClientProducer producer0 = session0.createProducer(SimpleString.of(testAddress)); + final byte[] bytes = new byte[messageSize]; + + final SimpleString propKey = SimpleString.of("testkey"); + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = session0.createMessage(true); + message.putIntProperty(propKey, i); + message.getBodyBuffer().writeBytes(bytes); + producer0.send(message); + } + + session0.close(); + sf0.close(); + + Wait.assertEquals((long) numMessages, () -> bridge.getMetrics().getMessagesPendingAcknowledgement(), 2000, 100); + long start = System.currentTimeMillis(); + BridgeImpl.State desiredState; + if (stop) { + bridge.stop(); + desiredState = BridgeImpl.State.STOPPED; + } else { + bridge.pause(); + desiredState = BridgeImpl.State.PAUSED; + } + Wait.assertEquals(desiredState, () -> ((BridgeImpl)bridge).getState(), pendingAckTimeout, 25); + assertTrue(System.currentTimeMillis() - start >= pendingAckTimeout); + Wait.assertEquals((long) numMessages, () -> server0.locateQueue(queueName0).getMessageCount(), 2000, 100); + Wait.assertEquals(0L, () -> server0.locateQueue(queueName0).getDeliveringCount(), 2000, 100); + } + + @TestTemplate + public void testPendingAcksEventuallyArriveOnStop() throws Exception { + testPendingAcksEventuallyArrive(true, false); + } + + @TestTemplate + public void testPendingAcksEventuallyArriveOnPause() throws Exception { + testPendingAcksEventuallyArrive(false, false); + } + + @TestTemplate + public void testPendingAcksEventuallyArriveOnStopWithLargeMessages() throws Exception { + testPendingAcksEventuallyArrive(true, true); + } + + @TestTemplate + public void testPendingAcksEventuallyArriveOnPauseWithLargeMessages() throws Exception { + testPendingAcksEventuallyArrive(false, true); + } + + private void testPendingAcksEventuallyArrive(boolean stop, boolean large) throws Exception { + server0 = createClusteredServerWithParams(isNetty(), 0, true, null); + + Map server1Params = new HashMap<>(); + addTargetParameters(server1Params); + server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params); + + final String testAddress = "testAddress"; + final String queueName0 = "queue0"; + final String forwardAddress = "forwardAddress"; + final String queueName1 = "queue1"; + final long pendingAckTimeout = 2000; + final int messageSize = 1024; + final int numMessages = 10; + + TransportConfiguration server0tc = new TransportConfiguration(getConnector(), null); + TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params); + + server0.getConfiguration() + .setConnectorConfigurations(Map.of(server1tc.getName(), server1tc)) + .setBridgeConfigurations(Arrays.asList(new BridgeConfiguration() + .setName("bridge1") + .setQueueName(queueName0) + .setForwardingAddress(forwardAddress) + .setRetryInterval(1000) + .setReconnectAttemptsOnSameNode(-1) + .setUseDuplicateDetection(false) + .setConfirmationWindowSize(numMessages * messageSize / 2) + .setMinLargeMessageSize(large ? (messageSize / 2) : (messageSize * 2)) + .setPendingAckTimeout(pendingAckTimeout) + .setStaticConnectors(Arrays.asList(server1tc.getName())))); + server0.getConfiguration().setQueueConfigs(Arrays.asList(QueueConfiguration.of(queueName0).setAddress(testAddress))); + server0.start(); + + // this interceptor will prevent the target from returning any send acks until a certain amount of time has elapsed + final CountDownLatch opLatch = new CountDownLatch(1); + Interceptor sendBlockingInterceptor = (packet, connection) -> { + if (packet.getType() == PacketImpl.SESS_SEND || packet.getType() == PacketImpl.SESS_SEND_LARGE) { + try { + opLatch.await(pendingAckTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return true; + }; + + server1.getConfiguration().setQueueConfigs(Arrays.asList(QueueConfiguration.of(queueName1).setAddress(forwardAddress))); + server1.start(); + server1.getRemotingService().addIncomingInterceptor(sendBlockingInterceptor); + Bridge bridge = server0.getClusterManager().getBridges().get("bridge1"); + Wait.assertTrue(() -> (bridge.isConnected()), 2000, 100); + + locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc)); + ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc)); + ClientSession session0 = sf0.createSession(false, true, true); + ClientProducer producer0 = session0.createProducer(SimpleString.of(testAddress)); + final byte[] bytes = new byte[messageSize]; + + final SimpleString propKey = SimpleString.of("testkey"); + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = session0.createMessage(true); + message.putIntProperty(propKey, i); + message.getBodyBuffer().writeBytes(bytes); + producer0.send(message); + } + + session0.close(); + sf0.close(); + + Wait.assertEquals((long) numMessages, () -> bridge.getMetrics().getMessagesPendingAcknowledgement(), 2000, 100); + assertEquals((long) numMessages, server0.locateQueue(queueName0).getDeliveringCount()); + BridgeImpl.State desiredState; + if (stop) { + bridge.stop(); + desiredState = BridgeImpl.State.STOPPED; + } else { + bridge.pause(); + desiredState = BridgeImpl.State.PAUSED; + } + Thread.sleep(pendingAckTimeout / 2); + opLatch.countDown(); + Wait.assertEquals(desiredState, () -> ((BridgeImpl)bridge).getState(), pendingAckTimeout, 25); + Wait.assertEquals(0L, () -> server0.locateQueue(queueName0).getMessageCount(), 2000, 100); + Wait.assertEquals(0L, () -> server0.locateQueue(queueName0).getDeliveringCount(), 2000, 100); + Wait.assertEquals((long) numMessages, () -> server1.locateQueue(queueName1).getMessageCount(), 2000, 100); + } + /** * It will inspect the journal directly and determine if there are queues on this journal, * diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/BridgeConfigurationStorageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/BridgeConfigurationStorageTest.java index c4de23d8d0..48d5844bfd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/BridgeConfigurationStorageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/BridgeConfigurationStorageTest.java @@ -56,6 +56,7 @@ public class BridgeConfigurationStorageTest extends StorageManagerTestBase { configuration.setParentName("name"); configuration.setQueueName("QueueName"); configuration.setConcurrency(2); + configuration.setPendingAckTimeout(9876); configuration.setForwardingAddress("forward"); configuration.setProducerWindowSize(123123); configuration.setConfirmationWindowSize(123123); @@ -79,6 +80,7 @@ public class BridgeConfigurationStorageTest extends StorageManagerTestBase { assertEquals(configuration.getName(), persistedBridgeConfiguration.getBridgeConfiguration().getName()); assertEquals(configuration.getQueueName(), persistedBridgeConfiguration.getBridgeConfiguration().getQueueName()); assertEquals(configuration.getConcurrency(), persistedBridgeConfiguration.getBridgeConfiguration().getConcurrency()); + assertEquals(configuration.getPendingAckTimeout(), persistedBridgeConfiguration.getBridgeConfiguration().getPendingAckTimeout()); assertEquals(configuration.getForwardingAddress(), persistedBridgeConfiguration.getBridgeConfiguration().getForwardingAddress()); assertEquals(configuration.getStaticConnectors(), persistedBridgeConfiguration.getBridgeConfiguration().getStaticConnectors()); assertNotNull(persistedBridgeConfiguration.getBridgeConfiguration().getTransformerConfiguration());