From 47aa25933f020882442cf5f96b09bd3448bc17f5 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Mon, 19 Nov 2018 21:24:35 -0600 Subject: [PATCH] ARTEMIS-2178 routing-type config for core bridge MULTICAST messages forwarded by a core bridge will not be routed to any ANYCAST queues and vice-versa. Diverts have the ability to configure how routing-type is treated. Core bridges now support this same kind of functionality. By default the bridge does not alter the routing-type of forwarded messages to maintain compatibility with existing behavior. --- .../config/ActiveMQDefaultConfiguration.java | 14 +- ...=> ComponentConfigurationRoutingType.java} | 8 +- .../core/config/BridgeConfiguration.java | 12 ++ .../core/config/DivertConfiguration.java | 8 +- .../artemis/core/config/impl/Validators.java | 12 +- .../impl/FileConfigurationParser.java | 30 +++- .../impl/ActiveMQServerControlImpl.java | 4 +- .../core/server/cluster/ClusterManager.java | 2 +- .../core/server/cluster/impl/BridgeImpl.java | 23 ++- .../cluster/impl/ClusterConnectionBridge.java | 4 +- .../artemis/core/server/impl/DivertImpl.java | 6 +- .../schema/artemis-configuration.xsd | 12 +- .../config/impl/FileConfigurationTest.java | 3 + .../ConfigurationTest-full-config.xml | 1 + .../ConfigurationTest-xinclude-config.xml | 1 + docs/user-manual/en/configuration-index.md | 1 + docs/user-manual/en/core-bridges.md | 11 ++ .../amqp/AmqpMessageDivertsTest.java | 4 +- .../amqp/DivertTopicToQueueTest.java | 6 +- .../integration/bridge/BridgeRoutingTest.java | 157 ++++++++++++++++++ 20 files changed, 285 insertions(+), 34 deletions(-) rename artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/{DivertConfigurationRoutingType.java => ComponentConfigurationRoutingType.java} (85%) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/bridge/BridgeRoutingTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index e7cb4cb1a3..8760c9c2ad 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.ArtemisConstants; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy; /** @@ -367,7 +367,10 @@ public final class ActiveMQDefaultConfiguration { private static boolean DEFAULT_DIVERT_EXCLUSIVE = false; // how the divert should handle the message's routing type - private static String DEFAULT_DIVERT_ROUTING_TYPE = DivertConfigurationRoutingType.STRIP.toString(); + private static String DEFAULT_DIVERT_ROUTING_TYPE = ComponentConfigurationRoutingType.STRIP.toString(); + + // how the bridge should handle the message's routing type + private static String DEFAULT_BRIDGE_ROUTING_TYPE = ComponentConfigurationRoutingType.PASS.toString(); // If true then the server will request a backup on another node private static boolean DEFAULT_HAPOLICY_REQUEST_BACKUP = false; @@ -1089,6 +1092,13 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_DIVERT_ROUTING_TYPE; } + /** + * how the bridge should handle the message's routing type + */ + public static String getDefaultBridgeRoutingType() { + return DEFAULT_BRIDGE_ROUTING_TYPE; + } + /** * If true then the server will request a backup on another node */ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/DivertConfigurationRoutingType.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/ComponentConfigurationRoutingType.java similarity index 85% rename from artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/DivertConfigurationRoutingType.java rename to artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/ComponentConfigurationRoutingType.java index 84f45f9e89..780572e574 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/DivertConfigurationRoutingType.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/ComponentConfigurationRoutingType.java @@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.server; /** * This class essentially mirrors {@code RoutingType} except it has some additional members to support special - * configuration semantics for diverts. These additional members weren't put in {@code RoutingType} so as to not - * confuse users. + * configuration semantics for diverts & bridges. These additional members weren't put in {@code RoutingType} + * so as to not confuse users. */ -public enum DivertConfigurationRoutingType { +public enum ComponentConfigurationRoutingType { MULTICAST, ANYCAST, STRIP, PASS; @@ -40,7 +40,7 @@ public enum DivertConfigurationRoutingType { } } - public static DivertConfigurationRoutingType getType(byte type) { + public static ComponentConfigurationRoutingType getType(byte type) { switch (type) { case 0: return MULTICAST; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java index 99933c3fd9..d7b388d6b8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; public final class BridgeConfiguration implements Serializable { @@ -75,6 +76,8 @@ public final class BridgeConfiguration implements Serializable { // The bridge shouldn't be sending blocking anyways private long callTimeout = ActiveMQClient.DEFAULT_CALL_TIMEOUT; + private ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType()); + public BridgeConfiguration() { } @@ -337,6 +340,15 @@ public final class BridgeConfiguration implements Serializable { return this; } + public ComponentConfigurationRoutingType getRoutingType() { + return routingType; + } + + public BridgeConfiguration setRoutingType(ComponentConfigurationRoutingType routingType) { + this.routingType = routingType; + return this; + } + /** * At this point this is only changed on testcases * The bridge shouldn't be sending blocking anyways diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java index cb914d0ef7..c88191fd6c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java @@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.config; import java.io.Serializable; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; -import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.utils.UUIDGenerator; public class DivertConfiguration implements Serializable { @@ -40,7 +40,7 @@ public class DivertConfiguration implements Serializable { private TransformerConfiguration transformerConfiguration = null; - private DivertConfigurationRoutingType routingType = DivertConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType()); + private ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType()); public DivertConfiguration() { } @@ -73,7 +73,7 @@ public class DivertConfiguration implements Serializable { return transformerConfiguration; } - public DivertConfigurationRoutingType getRoutingType() { + public ComponentConfigurationRoutingType getRoutingType() { return routingType; } @@ -140,7 +140,7 @@ public class DivertConfiguration implements Serializable { /** * @param routingType the routingType to set */ - public DivertConfiguration setRoutingType(final DivertConfigurationRoutingType routingType) { + public DivertConfiguration setRoutingType(final ComponentConfigurationRoutingType routingType) { this.routingType = routingType; return this; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java index 2f28e49716..7883013740 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java @@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.config.impl; import java.util.EnumSet; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; -import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; @@ -200,14 +200,14 @@ public final class Validators { } }; - public static final Validator DIVERT_ROUTING_TYPE = new Validator() { + public static final Validator COMPONENT_ROUTING_TYPE = new Validator() { @Override public void validate(final String name, final Object value) { String val = (String) value; - if (val == null || !val.equals(DivertConfigurationRoutingType.ANYCAST.toString()) && - !val.equals(DivertConfigurationRoutingType.MULTICAST.toString()) && - !val.equals(DivertConfigurationRoutingType.PASS.toString()) && - !val.equals(DivertConfigurationRoutingType.STRIP.toString())) { + if (val == null || !val.equals(ComponentConfigurationRoutingType.ANYCAST.toString()) && + !val.equals(ComponentConfigurationRoutingType.MULTICAST.toString()) && + !val.equals(ComponentConfigurationRoutingType.PASS.toString()) && + !val.equals(ComponentConfigurationRoutingType.STRIP.toString())) { throw ActiveMQMessageBundle.BUNDLE.invalidRoutingType(val); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 00db3f83d7..3303b576ae 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -65,7 +65,7 @@ import org.apache.activemq.artemis.core.config.storage.FileStorageConfiguration; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; @@ -1781,6 +1781,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { String user = getString(brNode, "user", ActiveMQDefaultConfiguration.getDefaultClusterUser(), Validators.NO_CHECK); + ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(getString(brNode, "routing-type", ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType(), Validators.COMPONENT_ROUTING_TYPE)); + + NodeList clusterPassNodes = brNode.getElementsByTagName("password"); String password = null; @@ -1825,7 +1828,28 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { transformerConfiguration = getTransformerConfiguration(transformerClassName); } - BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setMaxRetryInterval(maxRetryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setHA(ha).setUser(user).setPassword(password); + BridgeConfiguration config = new BridgeConfiguration() + .setName(name) + .setQueueName(queueName) + .setForwardingAddress(forwardingAddress) + .setFilterString(filterString) + .setTransformerConfiguration(transformerConfiguration) + .setMinLargeMessageSize(minLargeMessageSize) + .setClientFailureCheckPeriod(clientFailureCheckPeriod) + .setConnectionTTL(connectionTTL) + .setRetryInterval(retryInterval) + .setMaxRetryInterval(maxRetryInterval) + .setRetryIntervalMultiplier(retryIntervalMultiplier) + .setInitialConnectAttempts(initialConnectAttempts) + .setReconnectAttempts(reconnectAttempts) + .setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode) + .setUseDuplicateDetection(useDuplicateDetection) + .setConfirmationWindowSize(confirmationWindowSize) + .setProducerWindowSize(producerWindowSize) + .setHA(ha) + .setUser(user) + .setPassword(password) + .setRoutingType(routingType); if (!staticConnectorNames.isEmpty()) { config.setStaticConnectors(staticConnectorNames); @@ -1861,7 +1885,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { String transformerClassName = getString(e, "transformer-class-name", null, Validators.NO_CHECK); - DivertConfigurationRoutingType routingType = DivertConfigurationRoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), Validators.DIVERT_ROUTING_TYPE)); + ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), Validators.COMPONENT_ROUTING_TYPE)); TransformerConfiguration transformerConfiguration = null; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 96f352e8fc..ec0098ed0a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -92,7 +92,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ConnectorServiceFactory; import org.apache.activemq.artemis.core.server.Consumer; -import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; @@ -2441,7 +2441,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active clearIO(); try { TransformerConfiguration transformerConfiguration = transformerClassName == null ? null : new TransformerConfiguration(transformerClassName).setProperties(transformerProperties); - DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setRoutingType(DivertConfigurationRoutingType.valueOf(routingType)); + DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setRoutingType(ComponentConfigurationRoutingType.valueOf(routingType)); server.deployDivert(config); } finally { blockOnIO(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java index a354033227..522c2d21b6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java @@ -475,7 +475,7 @@ public final class ClusterManager implements ActiveMQComponent { clusterLocators.add(serverLocator); - Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(config.getName()), queue, executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()), SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer, config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server); + Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(config.getName()), queue, executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()), SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer, config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server, config.getRoutingType()); bridges.put(config.getName(), bridge); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 20b5ac96be..9f28452b45 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientProducer; @@ -50,6 +51,7 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; @@ -161,6 +163,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled private final BridgeMetrics metrics = new BridgeMetrics(); + private final ComponentConfigurationRoutingType routingType; + public BridgeImpl(final ServerLocatorInternal serverLocator, final int initialConnectAttempts, final int reconnectAttempts, @@ -179,7 +183,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled final boolean useDuplicateDetection, final String user, final String password, - final ActiveMQServer server) { + final ActiveMQServer server, + final ComponentConfigurationRoutingType routingType) { this.sequentialID = server.getStorageManager().generateID(); @@ -220,6 +225,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled this.password = password; this.server = server; + + this.routingType = routingType; } /** For tests mainly */ @@ -550,6 +557,20 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled message.setAddress(forwardingAddress); } + switch (routingType) { + case ANYCAST: + message.setRoutingType(RoutingType.ANYCAST); + break; + case MULTICAST: + message.setRoutingType(RoutingType.MULTICAST); + break; + case STRIP: + message.setRoutingType(null); + break; + case PASS: + break; + } + if (transformer != null) { final Message transformedMessage = transformer.transform(message); if (transformedMessage != message) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java index 1a642fec96..20f83ec41a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.Message; @@ -42,6 +43,7 @@ import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; @@ -111,7 +113,7 @@ public class ClusterConnectionBridge extends BridgeImpl { final TransportConfiguration connector, final String storeAndForwardPrefix) { super(targetLocator, initialConnectAttempts, reconnectAttempts, 0, // reconnectAttemptsOnSameNode means nothing on the clustering bridge since we always try the same - retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, server); + retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, server, ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType())); this.discoveryLocator = discoveryLocator; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java index 953756f668..e6aa210e0e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java @@ -23,7 +23,7 @@ import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.server.Divert; -import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.jboss.logging.Logger; @@ -51,7 +51,7 @@ public class DivertImpl implements Divert { private final StorageManager storageManager; - private final DivertConfigurationRoutingType routingType; + private final ComponentConfigurationRoutingType routingType; public DivertImpl(final SimpleString forwardAddress, final SimpleString uniqueName, @@ -61,7 +61,7 @@ public class DivertImpl implements Divert { final Transformer transformer, final PostOffice postOffice, final StorageManager storageManager, - final DivertConfigurationRoutingType routingType) { + final ComponentConfigurationRoutingType routingType) { this.forwardAddress = forwardAddress; this.uniqueName = uniqueName; diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index cd0d9c9a08..671bd2612d 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -1460,6 +1460,14 @@ + + + + how should the routing-type on the bridged messages be set? + + + + @@ -1868,7 +1876,7 @@ - + how should the routing-type on the diverted messages be set? @@ -3183,7 +3191,7 @@ - + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 66d1e9ef50..85d0ddf93c 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -31,6 +31,7 @@ import java.util.Set; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy; @@ -246,6 +247,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { Assert.assertEquals(null, bc.getDiscoveryGroupName()); Assert.assertEquals(444, bc.getProducerWindowSize()); Assert.assertEquals(1073741824, bc.getConfirmationWindowSize()); + Assert.assertEquals(ComponentConfigurationRoutingType.STRIP, bc.getRoutingType()); } else if (bc.getName().equals("bridge2")) { Assert.assertEquals("bridge2", bc.getName()); Assert.assertEquals("queue2", bc.getQueueName()); @@ -255,6 +257,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { Assert.assertEquals(null, bc.getStaticConnectors()); Assert.assertEquals("dg1", bc.getDiscoveryGroupName()); Assert.assertEquals(568320, bc.getProducerWindowSize()); + Assert.assertEquals(ComponentConfigurationRoutingType.PASS, bc.getRoutingType()); } else { Assert.assertEquals("bridge3", bc.getName()); Assert.assertEquals("org.foo.BridgeTransformer3", bc.getTransformerConfiguration().getClassName()); diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 7603a2aeaa..8e3204a78c 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -178,6 +178,7 @@ true 1G 444 + STRIP connector1 diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml index d560379cce..c99b4ebe28 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml @@ -170,6 +170,7 @@ true 1G 444 + STRIP connector1 diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md index 67b9823661..d698bd85b9 100644 --- a/docs/user-manual/en/configuration-index.md +++ b/docs/user-manual/en/configuration-index.md @@ -254,6 +254,7 @@ Name | Description | Default [user](core-bridges.md) | Username for the bridge, the default is the cluster username. | n/a [password](core-bridges.md)| Password for the bridge, default is the cluster password. | n/a [reconnect-attempts-same-node](core-bridges.md) | Number of retries before trying another node. | 10 +[routing-type](core-bridges.md) | how to set the routing-type on the bridged message | `PASS` ## broadcast-group type diff --git a/docs/user-manual/en/core-bridges.md b/docs/user-manual/en/core-bridges.md index ede12abefa..2e488da33f 100644 --- a/docs/user-manual/en/core-bridges.md +++ b/docs/user-manual/en/core-bridges.md @@ -60,6 +60,7 @@ actually from the bridge example): 10000000 foouser foopassword + PASS remote-connector @@ -194,6 +195,16 @@ Let's take a look at all the parameters in turn: the default cluster password specified by `cluster-password` in `broker.xml` will be used. +- `routing-type`. Bridges can apply a particular routing-type to the messages it + forwards, strip the existing routing type, or simply pass the existing + routing-type through. This is useful in situations where the message may have + its routing-type set but you want to bridge it to an address using a different + routing-type. It's important to keep in mind that a message with the `anycast` + routing-type will not actually be routed to queues using `multicast` and + vice-versa. By configuring the `routing-type` of the bridge you have the + flexibility to deal with any situation. Valid values are `ANYCAST`, + `MULTICAST`, `PASS`, & `STRIP`. The default is `PASS`. + - `static-connectors` or `discovery-group-ref`. Pick either of these options to connect the bridge to the target server. diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java index ed7b110f38..0613801489 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; @@ -33,7 +33,7 @@ public class AmqpMessageDivertsTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testQueueReceiverReadMessageWithDivert() throws Exception { - runQueueReceiverReadMessageWithDivert(DivertConfigurationRoutingType.ANYCAST.toString()); + runQueueReceiverReadMessageWithDivert(ComponentConfigurationRoutingType.ANYCAST.toString()); } @Test(timeout = 60000) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertTopicToQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertTopicToQueueTest.java index 560a88b0c7..aef9e2b97e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertTopicToQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertTopicToQueueTest.java @@ -25,7 +25,7 @@ import javax.jms.TextMessage; import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.Assert; import org.junit.Test; @@ -38,8 +38,8 @@ public class DivertTopicToQueueTest extends JMSClientTestSupport { final String address2 = "bss.order.Consumer.cbma.workorderchanges.v1.queue"; final String address3 = "bss.order.Consumer.pinpoint.workorderchanges.v1.queue"; - DivertConfiguration dc1 = new DivertConfiguration().setName("WorkOrderChangesCBMA-Divert").setRoutingName("WorkOrderChangesCBMA-Divert").setAddress(address1).setForwardingAddress(address2).setExclusive(false).setRoutingType(DivertConfigurationRoutingType.ANYCAST); - DivertConfiguration dc2 = new DivertConfiguration().setName("WorkOrderChangesPinpoint-Divert").setRoutingName("WorkOrderChangesPinpoint-Divert").setAddress(address1).setForwardingAddress(address3).setExclusive(false).setRoutingType(DivertConfigurationRoutingType.ANYCAST); + DivertConfiguration dc1 = new DivertConfiguration().setName("WorkOrderChangesCBMA-Divert").setRoutingName("WorkOrderChangesCBMA-Divert").setAddress(address1).setForwardingAddress(address2).setExclusive(false).setRoutingType(ComponentConfigurationRoutingType.ANYCAST); + DivertConfiguration dc2 = new DivertConfiguration().setName("WorkOrderChangesPinpoint-Divert").setRoutingName("WorkOrderChangesPinpoint-Divert").setAddress(address1).setForwardingAddress(address3).setExclusive(false).setRoutingType(ComponentConfigurationRoutingType.ANYCAST); server.deployDivert(dc1); server.deployDivert(dc2); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/bridge/BridgeRoutingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/bridge/BridgeRoutingTest.java new file mode 100644 index 0000000000..394dc3680d --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/bridge/BridgeRoutingTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.bridge; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.config.BridgeConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(value = Parameterized.class) +public class BridgeRoutingTest extends ActiveMQTestBase { + + private ActiveMQServer server0; + private ActiveMQServer server1; + + private final boolean netty; + + @Parameterized.Parameters(name = "isNetty={0}") + public static Collection getParameters() { + return Arrays.asList(new Object[][]{{true}, {false}}); + } + + public BridgeRoutingTest(boolean isNetty) { + this.netty = isNetty; + } + + protected boolean isNetty() { + return netty; + } + + private String getServer0URL() { + return isNetty() ? "tcp://localhost:61616" : "vm://0"; + } + + private String getServer1URL() { + return isNetty() ? "tcp://localhost:61617" : "vm://1"; + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + server0 = createServer(false, createBasicConfig()); + server1 = createServer(false, createBasicConfig()); + server0.getConfiguration().addAcceptorConfiguration("acceptor", getServer0URL()); + server0.getConfiguration().addConnectorConfiguration("connector", getServer1URL()); + server1.getConfiguration().addAcceptorConfiguration("acceptor", getServer1URL()); + server0.start(); + server1.start(); + } + + @Test + public void testAnycastBridge() throws Exception { + testBridgeInternal(RoutingType.MULTICAST, RoutingType.ANYCAST, ComponentConfigurationRoutingType.ANYCAST, 0, 1); + } + + @Test + public void testAnycastBridgeNegative() throws Exception { + testBridgeInternal(RoutingType.MULTICAST, RoutingType.ANYCAST, ComponentConfigurationRoutingType.PASS, 500, 0); + } + + @Test + public void testMulticastBridge() throws Exception { + testBridgeInternal(RoutingType.ANYCAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.MULTICAST, 0, 1); + } + + @Test + public void testMulticastBridgeNegative() throws Exception { + testBridgeInternal(RoutingType.ANYCAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.PASS, 500, 0); + } + + @Test + public void testPassBridge() throws Exception { + testBridgeInternal(RoutingType.MULTICAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.PASS, 0, 1); + } + + @Test + public void testPassBridge2() throws Exception { + testBridgeInternal(RoutingType.ANYCAST, RoutingType.ANYCAST, ComponentConfigurationRoutingType.PASS, 0, 1); + } + + @Test + public void testPassBridgeNegative() throws Exception { + testBridgeInternal(RoutingType.ANYCAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.PASS, 500, 0); + } + + @Test + public void testStripBridge() throws Exception { + testBridgeInternal(RoutingType.MULTICAST, RoutingType.ANYCAST, ComponentConfigurationRoutingType.STRIP, 0, 1); + } + + @Test + public void testStripBridge2() throws Exception { + testBridgeInternal(RoutingType.ANYCAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.STRIP, 0, 1); + } + + private void testBridgeInternal(RoutingType sourceRoutingType, + RoutingType destinationRoutingType, + ComponentConfigurationRoutingType bridgeRoutingType, + long sleepTime, + int destinationMessageCount) throws Exception { + SimpleString source = SimpleString.toSimpleString("source"); + SimpleString destination = SimpleString.toSimpleString("destination"); + + server0.createQueue(source, sourceRoutingType, source, null, true, false); + server1.createQueue(destination, destinationRoutingType, destination, null, true, false); + + server0.deployBridge(new BridgeConfiguration() + .setRoutingType(bridgeRoutingType) + .setName("bridge") + .setForwardingAddress(destination.toString()) + .setQueueName(source.toString()) + .setConfirmationWindowSize(10) + .setStaticConnectors(Arrays.asList("connector"))); + + try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer0URL()); + ClientSessionFactory sessionFactory = locator.createSessionFactory(); + ClientSession session = sessionFactory.createSession(); + ClientProducer producer = session.createProducer(source)) { + producer.send(session.createMessage(true).setRoutingType(sourceRoutingType)); + } + + Wait.waitFor(() -> server0.locateQueue(source).getMessageCount() == 0, 2000, 100); + Wait.waitFor(() -> server0.getClusterManager().getBridges().get("bridge").getMetrics().getMessagesAcknowledged() == 1, 2000, 100); + Thread.sleep(sleepTime); + assertTrue(Wait.waitFor(() -> server1.locateQueue(destination).getMessageCount() == destinationMessageCount, 2000, 100)); + } +} \ No newline at end of file