diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index e7cb4cb1a3..8760c9c2ad 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
/**
@@ -367,7 +367,10 @@ public final class ActiveMQDefaultConfiguration {
private static boolean DEFAULT_DIVERT_EXCLUSIVE = false;
// how the divert should handle the message's routing type
- private static String DEFAULT_DIVERT_ROUTING_TYPE = DivertConfigurationRoutingType.STRIP.toString();
+ private static String DEFAULT_DIVERT_ROUTING_TYPE = ComponentConfigurationRoutingType.STRIP.toString();
+
+ // how the bridge should handle the message's routing type
+ private static String DEFAULT_BRIDGE_ROUTING_TYPE = ComponentConfigurationRoutingType.PASS.toString();
// If true then the server will request a backup on another node
private static boolean DEFAULT_HAPOLICY_REQUEST_BACKUP = false;
@@ -1089,6 +1092,13 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_DIVERT_ROUTING_TYPE;
}
+ /**
+ * how the bridge should handle the message's routing type
+ */
+ public static String getDefaultBridgeRoutingType() {
+ return DEFAULT_BRIDGE_ROUTING_TYPE;
+ }
+
/**
* If true then the server will request a backup on another node
*/
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/DivertConfigurationRoutingType.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/ComponentConfigurationRoutingType.java
similarity index 85%
rename from artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/DivertConfigurationRoutingType.java
rename to artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/ComponentConfigurationRoutingType.java
index 84f45f9e89..780572e574 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/DivertConfigurationRoutingType.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/ComponentConfigurationRoutingType.java
@@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.server;
/**
* This class essentially mirrors {@code RoutingType} except it has some additional members to support special
- * configuration semantics for diverts. These additional members weren't put in {@code RoutingType} so as to not
- * confuse users.
+ * configuration semantics for diverts & bridges. These additional members weren't put in {@code RoutingType}
+ * so as to not confuse users.
*/
-public enum DivertConfigurationRoutingType {
+public enum ComponentConfigurationRoutingType {
MULTICAST, ANYCAST, STRIP, PASS;
@@ -40,7 +40,7 @@ public enum DivertConfigurationRoutingType {
}
}
- public static DivertConfigurationRoutingType getType(byte type) {
+ public static ComponentConfigurationRoutingType getType(byte type) {
switch (type) {
case 0:
return MULTICAST;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
index 99933c3fd9..d7b388d6b8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
public final class BridgeConfiguration implements Serializable {
@@ -75,6 +76,8 @@ public final class BridgeConfiguration implements Serializable {
// The bridge shouldn't be sending blocking anyways
private long callTimeout = ActiveMQClient.DEFAULT_CALL_TIMEOUT;
+ private ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType());
+
public BridgeConfiguration() {
}
@@ -337,6 +340,15 @@ public final class BridgeConfiguration implements Serializable {
return this;
}
+ public ComponentConfigurationRoutingType getRoutingType() {
+ return routingType;
+ }
+
+ public BridgeConfiguration setRoutingType(ComponentConfigurationRoutingType routingType) {
+ this.routingType = routingType;
+ return this;
+ }
+
/**
* At this point this is only changed on testcases
* The bridge shouldn't be sending blocking anyways
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java
index cb914d0ef7..c88191fd6c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java
@@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.config;
import java.io.Serializable;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
-import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.utils.UUIDGenerator;
public class DivertConfiguration implements Serializable {
@@ -40,7 +40,7 @@ public class DivertConfiguration implements Serializable {
private TransformerConfiguration transformerConfiguration = null;
- private DivertConfigurationRoutingType routingType = DivertConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType());
+ private ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType());
public DivertConfiguration() {
}
@@ -73,7 +73,7 @@ public class DivertConfiguration implements Serializable {
return transformerConfiguration;
}
- public DivertConfigurationRoutingType getRoutingType() {
+ public ComponentConfigurationRoutingType getRoutingType() {
return routingType;
}
@@ -140,7 +140,7 @@ public class DivertConfiguration implements Serializable {
/**
* @param routingType the routingType to set
*/
- public DivertConfiguration setRoutingType(final DivertConfigurationRoutingType routingType) {
+ public DivertConfiguration setRoutingType(final ComponentConfigurationRoutingType routingType) {
this.routingType = routingType;
return this;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
index 2f28e49716..7883013740 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
@@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.config.impl;
import java.util.EnumSet;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
-import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
@@ -200,14 +200,14 @@ public final class Validators {
}
};
- public static final Validator DIVERT_ROUTING_TYPE = new Validator() {
+ public static final Validator COMPONENT_ROUTING_TYPE = new Validator() {
@Override
public void validate(final String name, final Object value) {
String val = (String) value;
- if (val == null || !val.equals(DivertConfigurationRoutingType.ANYCAST.toString()) &&
- !val.equals(DivertConfigurationRoutingType.MULTICAST.toString()) &&
- !val.equals(DivertConfigurationRoutingType.PASS.toString()) &&
- !val.equals(DivertConfigurationRoutingType.STRIP.toString())) {
+ if (val == null || !val.equals(ComponentConfigurationRoutingType.ANYCAST.toString()) &&
+ !val.equals(ComponentConfigurationRoutingType.MULTICAST.toString()) &&
+ !val.equals(ComponentConfigurationRoutingType.PASS.toString()) &&
+ !val.equals(ComponentConfigurationRoutingType.STRIP.toString())) {
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingType(val);
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 00db3f83d7..3303b576ae 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -65,7 +65,7 @@ import org.apache.activemq.artemis.core.config.storage.FileStorageConfiguration;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
@@ -1781,6 +1781,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
String user = getString(brNode, "user", ActiveMQDefaultConfiguration.getDefaultClusterUser(), Validators.NO_CHECK);
+ ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(getString(brNode, "routing-type", ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType(), Validators.COMPONENT_ROUTING_TYPE));
+
+
NodeList clusterPassNodes = brNode.getElementsByTagName("password");
String password = null;
@@ -1825,7 +1828,28 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
transformerConfiguration = getTransformerConfiguration(transformerClassName);
}
- BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setMaxRetryInterval(maxRetryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setHA(ha).setUser(user).setPassword(password);
+ BridgeConfiguration config = new BridgeConfiguration()
+ .setName(name)
+ .setQueueName(queueName)
+ .setForwardingAddress(forwardingAddress)
+ .setFilterString(filterString)
+ .setTransformerConfiguration(transformerConfiguration)
+ .setMinLargeMessageSize(minLargeMessageSize)
+ .setClientFailureCheckPeriod(clientFailureCheckPeriod)
+ .setConnectionTTL(connectionTTL)
+ .setRetryInterval(retryInterval)
+ .setMaxRetryInterval(maxRetryInterval)
+ .setRetryIntervalMultiplier(retryIntervalMultiplier)
+ .setInitialConnectAttempts(initialConnectAttempts)
+ .setReconnectAttempts(reconnectAttempts)
+ .setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode)
+ .setUseDuplicateDetection(useDuplicateDetection)
+ .setConfirmationWindowSize(confirmationWindowSize)
+ .setProducerWindowSize(producerWindowSize)
+ .setHA(ha)
+ .setUser(user)
+ .setPassword(password)
+ .setRoutingType(routingType);
if (!staticConnectorNames.isEmpty()) {
config.setStaticConnectors(staticConnectorNames);
@@ -1861,7 +1885,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
String transformerClassName = getString(e, "transformer-class-name", null, Validators.NO_CHECK);
- DivertConfigurationRoutingType routingType = DivertConfigurationRoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), Validators.DIVERT_ROUTING_TYPE));
+ ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), Validators.COMPONENT_ROUTING_TYPE));
TransformerConfiguration transformerConfiguration = null;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 96f352e8fc..ec0098ed0a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -92,7 +92,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
import org.apache.activemq.artemis.core.server.Consumer;
-import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -2441,7 +2441,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
TransformerConfiguration transformerConfiguration = transformerClassName == null ? null : new TransformerConfiguration(transformerClassName).setProperties(transformerProperties);
- DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setRoutingType(DivertConfigurationRoutingType.valueOf(routingType));
+ DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setRoutingType(ComponentConfigurationRoutingType.valueOf(routingType));
server.deployDivert(config);
} finally {
blockOnIO();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
index a354033227..522c2d21b6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
@@ -475,7 +475,7 @@ public final class ClusterManager implements ActiveMQComponent {
clusterLocators.add(serverLocator);
- Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(config.getName()), queue, executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()), SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer, config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server);
+ Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(config.getName()), queue, executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()), SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer, config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server, config.getRoutingType());
bridges.put(config.getName(), bridge);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 20b5ac96be..9f28452b45 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
@@ -50,6 +51,7 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -161,6 +163,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
private final BridgeMetrics metrics = new BridgeMetrics();
+ private final ComponentConfigurationRoutingType routingType;
+
public BridgeImpl(final ServerLocatorInternal serverLocator,
final int initialConnectAttempts,
final int reconnectAttempts,
@@ -179,7 +183,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
final boolean useDuplicateDetection,
final String user,
final String password,
- final ActiveMQServer server) {
+ final ActiveMQServer server,
+ final ComponentConfigurationRoutingType routingType) {
this.sequentialID = server.getStorageManager().generateID();
@@ -220,6 +225,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
this.password = password;
this.server = server;
+
+ this.routingType = routingType;
}
/** For tests mainly */
@@ -550,6 +557,20 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
message.setAddress(forwardingAddress);
}
+ switch (routingType) {
+ case ANYCAST:
+ message.setRoutingType(RoutingType.ANYCAST);
+ break;
+ case MULTICAST:
+ message.setRoutingType(RoutingType.MULTICAST);
+ break;
+ case STRIP:
+ message.setRoutingType(null);
+ break;
+ case PASS:
+ break;
+ }
+
if (transformer != null) {
final Message transformedMessage = transformer.transform(message);
if (transformedMessage != message) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index 1a642fec96..20f83ec41a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -23,6 +23,7 @@ import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Message;
@@ -42,6 +43,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@@ -111,7 +113,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
final TransportConfiguration connector,
final String storeAndForwardPrefix) {
super(targetLocator, initialConnectAttempts, reconnectAttempts, 0, // reconnectAttemptsOnSameNode means nothing on the clustering bridge since we always try the same
- retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, server);
+ retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, server, ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType()));
this.discoveryLocator = discoveryLocator;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index 953756f668..e6aa210e0e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -23,7 +23,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.Divert;
-import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.jboss.logging.Logger;
@@ -51,7 +51,7 @@ public class DivertImpl implements Divert {
private final StorageManager storageManager;
- private final DivertConfigurationRoutingType routingType;
+ private final ComponentConfigurationRoutingType routingType;
public DivertImpl(final SimpleString forwardAddress,
final SimpleString uniqueName,
@@ -61,7 +61,7 @@ public class DivertImpl implements Divert {
final Transformer transformer,
final PostOffice postOffice,
final StorageManager storageManager,
- final DivertConfigurationRoutingType routingType) {
+ final ComponentConfigurationRoutingType routingType) {
this.forwardAddress = forwardAddress;
this.uniqueName = uniqueName;
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index cd0d9c9a08..671bd2612d 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -1460,6 +1460,14 @@
+
+
+
+ how should the routing-type on the bridged messages be set?
+
+
+
+
@@ -1868,7 +1876,7 @@
-
+
how should the routing-type on the diverted messages be set?
@@ -3183,7 +3191,7 @@
-
+
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 66d1e9ef50..85d0ddf93c 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -31,6 +31,7 @@ import java.util.Set;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
@@ -246,6 +247,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(null, bc.getDiscoveryGroupName());
Assert.assertEquals(444, bc.getProducerWindowSize());
Assert.assertEquals(1073741824, bc.getConfirmationWindowSize());
+ Assert.assertEquals(ComponentConfigurationRoutingType.STRIP, bc.getRoutingType());
} else if (bc.getName().equals("bridge2")) {
Assert.assertEquals("bridge2", bc.getName());
Assert.assertEquals("queue2", bc.getQueueName());
@@ -255,6 +257,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(null, bc.getStaticConnectors());
Assert.assertEquals("dg1", bc.getDiscoveryGroupName());
Assert.assertEquals(568320, bc.getProducerWindowSize());
+ Assert.assertEquals(ComponentConfigurationRoutingType.PASS, bc.getRoutingType());
} else {
Assert.assertEquals("bridge3", bc.getName());
Assert.assertEquals("org.foo.BridgeTransformer3", bc.getTransformerConfiguration().getClassName());
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 7603a2aeaa..8e3204a78c 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -178,6 +178,7 @@
true
1G
444
+ STRIP
connector1
diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index d560379cce..c99b4ebe28 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -170,6 +170,7 @@
true
1G
444
+ STRIP
connector1
diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md
index 67b9823661..d698bd85b9 100644
--- a/docs/user-manual/en/configuration-index.md
+++ b/docs/user-manual/en/configuration-index.md
@@ -254,6 +254,7 @@ Name | Description | Default
[user](core-bridges.md) | Username for the bridge, the default is the cluster username. | n/a
[password](core-bridges.md)| Password for the bridge, default is the cluster password. | n/a
[reconnect-attempts-same-node](core-bridges.md) | Number of retries before trying another node. | 10
+[routing-type](core-bridges.md) | how to set the routing-type on the bridged message | `PASS`
## broadcast-group type
diff --git a/docs/user-manual/en/core-bridges.md b/docs/user-manual/en/core-bridges.md
index ede12abefa..2e488da33f 100644
--- a/docs/user-manual/en/core-bridges.md
+++ b/docs/user-manual/en/core-bridges.md
@@ -60,6 +60,7 @@ actually from the bridge example):
10000000
foouser
foopassword
+ PASS
remote-connector
@@ -194,6 +195,16 @@ Let's take a look at all the parameters in turn:
the default cluster password specified by `cluster-password` in `broker.xml`
will be used.
+- `routing-type`. Bridges can apply a particular routing-type to the messages it
+ forwards, strip the existing routing type, or simply pass the existing
+ routing-type through. This is useful in situations where the message may have
+ its routing-type set but you want to bridge it to an address using a different
+ routing-type. It's important to keep in mind that a message with the `anycast`
+ routing-type will not actually be routed to queues using `multicast` and
+ vice-versa. By configuring the `routing-type` of the bridge you have the
+ flexibility to deal with any situation. Valid values are `ANYCAST`,
+ `MULTICAST`, `PASS`, & `STRIP`. The default is `PASS`.
+
- `static-connectors` or `discovery-group-ref`. Pick either of these options to
connect the bridge to the target server.
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java
index ed7b110f38..0613801489 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -33,7 +33,7 @@ public class AmqpMessageDivertsTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testQueueReceiverReadMessageWithDivert() throws Exception {
- runQueueReceiverReadMessageWithDivert(DivertConfigurationRoutingType.ANYCAST.toString());
+ runQueueReceiverReadMessageWithDivert(ComponentConfigurationRoutingType.ANYCAST.toString());
}
@Test(timeout = 60000)
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertTopicToQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertTopicToQueueTest.java
index 560a88b0c7..aef9e2b97e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertTopicToQueueTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertTopicToQueueTest.java
@@ -25,7 +25,7 @@ import javax.jms.TextMessage;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Test;
@@ -38,8 +38,8 @@ public class DivertTopicToQueueTest extends JMSClientTestSupport {
final String address2 = "bss.order.Consumer.cbma.workorderchanges.v1.queue";
final String address3 = "bss.order.Consumer.pinpoint.workorderchanges.v1.queue";
- DivertConfiguration dc1 = new DivertConfiguration().setName("WorkOrderChangesCBMA-Divert").setRoutingName("WorkOrderChangesCBMA-Divert").setAddress(address1).setForwardingAddress(address2).setExclusive(false).setRoutingType(DivertConfigurationRoutingType.ANYCAST);
- DivertConfiguration dc2 = new DivertConfiguration().setName("WorkOrderChangesPinpoint-Divert").setRoutingName("WorkOrderChangesPinpoint-Divert").setAddress(address1).setForwardingAddress(address3).setExclusive(false).setRoutingType(DivertConfigurationRoutingType.ANYCAST);
+ DivertConfiguration dc1 = new DivertConfiguration().setName("WorkOrderChangesCBMA-Divert").setRoutingName("WorkOrderChangesCBMA-Divert").setAddress(address1).setForwardingAddress(address2).setExclusive(false).setRoutingType(ComponentConfigurationRoutingType.ANYCAST);
+ DivertConfiguration dc2 = new DivertConfiguration().setName("WorkOrderChangesPinpoint-Divert").setRoutingName("WorkOrderChangesPinpoint-Divert").setAddress(address1).setForwardingAddress(address3).setExclusive(false).setRoutingType(ComponentConfigurationRoutingType.ANYCAST);
server.deployDivert(dc1);
server.deployDivert(dc2);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/bridge/BridgeRoutingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/bridge/BridgeRoutingTest.java
new file mode 100644
index 0000000000..394dc3680d
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/bridge/BridgeRoutingTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.bridge;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+public class BridgeRoutingTest extends ActiveMQTestBase {
+
+ private ActiveMQServer server0;
+ private ActiveMQServer server1;
+
+ private final boolean netty;
+
+ @Parameterized.Parameters(name = "isNetty={0}")
+ public static Collection getParameters() {
+ return Arrays.asList(new Object[][]{{true}, {false}});
+ }
+
+ public BridgeRoutingTest(boolean isNetty) {
+ this.netty = isNetty;
+ }
+
+ protected boolean isNetty() {
+ return netty;
+ }
+
+ private String getServer0URL() {
+ return isNetty() ? "tcp://localhost:61616" : "vm://0";
+ }
+
+ private String getServer1URL() {
+ return isNetty() ? "tcp://localhost:61617" : "vm://1";
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ server0 = createServer(false, createBasicConfig());
+ server1 = createServer(false, createBasicConfig());
+ server0.getConfiguration().addAcceptorConfiguration("acceptor", getServer0URL());
+ server0.getConfiguration().addConnectorConfiguration("connector", getServer1URL());
+ server1.getConfiguration().addAcceptorConfiguration("acceptor", getServer1URL());
+ server0.start();
+ server1.start();
+ }
+
+ @Test
+ public void testAnycastBridge() throws Exception {
+ testBridgeInternal(RoutingType.MULTICAST, RoutingType.ANYCAST, ComponentConfigurationRoutingType.ANYCAST, 0, 1);
+ }
+
+ @Test
+ public void testAnycastBridgeNegative() throws Exception {
+ testBridgeInternal(RoutingType.MULTICAST, RoutingType.ANYCAST, ComponentConfigurationRoutingType.PASS, 500, 0);
+ }
+
+ @Test
+ public void testMulticastBridge() throws Exception {
+ testBridgeInternal(RoutingType.ANYCAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.MULTICAST, 0, 1);
+ }
+
+ @Test
+ public void testMulticastBridgeNegative() throws Exception {
+ testBridgeInternal(RoutingType.ANYCAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.PASS, 500, 0);
+ }
+
+ @Test
+ public void testPassBridge() throws Exception {
+ testBridgeInternal(RoutingType.MULTICAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.PASS, 0, 1);
+ }
+
+ @Test
+ public void testPassBridge2() throws Exception {
+ testBridgeInternal(RoutingType.ANYCAST, RoutingType.ANYCAST, ComponentConfigurationRoutingType.PASS, 0, 1);
+ }
+
+ @Test
+ public void testPassBridgeNegative() throws Exception {
+ testBridgeInternal(RoutingType.ANYCAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.PASS, 500, 0);
+ }
+
+ @Test
+ public void testStripBridge() throws Exception {
+ testBridgeInternal(RoutingType.MULTICAST, RoutingType.ANYCAST, ComponentConfigurationRoutingType.STRIP, 0, 1);
+ }
+
+ @Test
+ public void testStripBridge2() throws Exception {
+ testBridgeInternal(RoutingType.ANYCAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.STRIP, 0, 1);
+ }
+
+ private void testBridgeInternal(RoutingType sourceRoutingType,
+ RoutingType destinationRoutingType,
+ ComponentConfigurationRoutingType bridgeRoutingType,
+ long sleepTime,
+ int destinationMessageCount) throws Exception {
+ SimpleString source = SimpleString.toSimpleString("source");
+ SimpleString destination = SimpleString.toSimpleString("destination");
+
+ server0.createQueue(source, sourceRoutingType, source, null, true, false);
+ server1.createQueue(destination, destinationRoutingType, destination, null, true, false);
+
+ server0.deployBridge(new BridgeConfiguration()
+ .setRoutingType(bridgeRoutingType)
+ .setName("bridge")
+ .setForwardingAddress(destination.toString())
+ .setQueueName(source.toString())
+ .setConfirmationWindowSize(10)
+ .setStaticConnectors(Arrays.asList("connector")));
+
+ try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer0URL());
+ ClientSessionFactory sessionFactory = locator.createSessionFactory();
+ ClientSession session = sessionFactory.createSession();
+ ClientProducer producer = session.createProducer(source)) {
+ producer.send(session.createMessage(true).setRoutingType(sourceRoutingType));
+ }
+
+ Wait.waitFor(() -> server0.locateQueue(source).getMessageCount() == 0, 2000, 100);
+ Wait.waitFor(() -> server0.getClusterManager().getBridges().get("bridge").getMetrics().getMessagesAcknowledged() == 1, 2000, 100);
+ Thread.sleep(sleepTime);
+ assertTrue(Wait.waitFor(() -> server1.locateQueue(destination).getMessageCount() == destinationMessageCount, 2000, 100));
+ }
+}
\ No newline at end of file