ARTEMIS-2178 routing-type config for core bridge
MULTICAST messages forwarded by a core bridge will not be routed to any ANYCAST queues and vice-versa. Diverts have the ability to configure how routing-type is treated. Core bridges now support this same kind of functionality. By default the bridge does not alter the routing-type of forwarded messages to maintain compatibility with existing behavior.
This commit is contained in:
parent
4695bfb34a
commit
47aa25933f
|
@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import org.apache.activemq.artemis.ArtemisConstants;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
|
||||
|
||||
/**
|
||||
|
@ -367,7 +367,10 @@ public final class ActiveMQDefaultConfiguration {
|
|||
private static boolean DEFAULT_DIVERT_EXCLUSIVE = false;
|
||||
|
||||
// how the divert should handle the message's routing type
|
||||
private static String DEFAULT_DIVERT_ROUTING_TYPE = DivertConfigurationRoutingType.STRIP.toString();
|
||||
private static String DEFAULT_DIVERT_ROUTING_TYPE = ComponentConfigurationRoutingType.STRIP.toString();
|
||||
|
||||
// how the bridge should handle the message's routing type
|
||||
private static String DEFAULT_BRIDGE_ROUTING_TYPE = ComponentConfigurationRoutingType.PASS.toString();
|
||||
|
||||
// If true then the server will request a backup on another node
|
||||
private static boolean DEFAULT_HAPOLICY_REQUEST_BACKUP = false;
|
||||
|
@ -1089,6 +1092,13 @@ public final class ActiveMQDefaultConfiguration {
|
|||
return DEFAULT_DIVERT_ROUTING_TYPE;
|
||||
}
|
||||
|
||||
/**
|
||||
* how the bridge should handle the message's routing type
|
||||
*/
|
||||
public static String getDefaultBridgeRoutingType() {
|
||||
return DEFAULT_BRIDGE_ROUTING_TYPE;
|
||||
}
|
||||
|
||||
/**
|
||||
* If true then the server will request a backup on another node
|
||||
*/
|
||||
|
|
|
@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.server;
|
|||
|
||||
/**
|
||||
* This class essentially mirrors {@code RoutingType} except it has some additional members to support special
|
||||
* configuration semantics for diverts. These additional members weren't put in {@code RoutingType} so as to not
|
||||
* confuse users.
|
||||
* configuration semantics for diverts & bridges. These additional members weren't put in {@code RoutingType}
|
||||
* so as to not confuse users.
|
||||
*/
|
||||
public enum DivertConfigurationRoutingType {
|
||||
public enum ComponentConfigurationRoutingType {
|
||||
|
||||
MULTICAST, ANYCAST, STRIP, PASS;
|
||||
|
||||
|
@ -40,7 +40,7 @@ public enum DivertConfigurationRoutingType {
|
|||
}
|
||||
}
|
||||
|
||||
public static DivertConfigurationRoutingType getType(byte type) {
|
||||
public static ComponentConfigurationRoutingType getType(byte type) {
|
||||
switch (type) {
|
||||
case 0:
|
||||
return MULTICAST;
|
|
@ -21,6 +21,7 @@ import java.util.List;
|
|||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||
|
||||
public final class BridgeConfiguration implements Serializable {
|
||||
|
||||
|
@ -75,6 +76,8 @@ public final class BridgeConfiguration implements Serializable {
|
|||
// The bridge shouldn't be sending blocking anyways
|
||||
private long callTimeout = ActiveMQClient.DEFAULT_CALL_TIMEOUT;
|
||||
|
||||
private ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType());
|
||||
|
||||
public BridgeConfiguration() {
|
||||
}
|
||||
|
||||
|
@ -337,6 +340,15 @@ public final class BridgeConfiguration implements Serializable {
|
|||
return this;
|
||||
}
|
||||
|
||||
public ComponentConfigurationRoutingType getRoutingType() {
|
||||
return routingType;
|
||||
}
|
||||
|
||||
public BridgeConfiguration setRoutingType(ComponentConfigurationRoutingType routingType) {
|
||||
this.routingType = routingType;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* At this point this is only changed on testcases
|
||||
* The bridge shouldn't be sending blocking anyways
|
||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.config;
|
|||
import java.io.Serializable;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
|
||||
public class DivertConfiguration implements Serializable {
|
||||
|
@ -40,7 +40,7 @@ public class DivertConfiguration implements Serializable {
|
|||
|
||||
private TransformerConfiguration transformerConfiguration = null;
|
||||
|
||||
private DivertConfigurationRoutingType routingType = DivertConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType());
|
||||
private ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType());
|
||||
|
||||
public DivertConfiguration() {
|
||||
}
|
||||
|
@ -73,7 +73,7 @@ public class DivertConfiguration implements Serializable {
|
|||
return transformerConfiguration;
|
||||
}
|
||||
|
||||
public DivertConfigurationRoutingType getRoutingType() {
|
||||
public ComponentConfigurationRoutingType getRoutingType() {
|
||||
return routingType;
|
||||
}
|
||||
|
||||
|
@ -140,7 +140,7 @@ public class DivertConfiguration implements Serializable {
|
|||
/**
|
||||
* @param routingType the routingType to set
|
||||
*/
|
||||
public DivertConfiguration setRoutingType(final DivertConfigurationRoutingType routingType) {
|
||||
public DivertConfiguration setRoutingType(final ComponentConfigurationRoutingType routingType) {
|
||||
this.routingType = routingType;
|
||||
return this;
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.config.impl;
|
|||
import java.util.EnumSet;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.core.server.JournalType;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
|
@ -200,14 +200,14 @@ public final class Validators {
|
|||
}
|
||||
};
|
||||
|
||||
public static final Validator DIVERT_ROUTING_TYPE = new Validator() {
|
||||
public static final Validator COMPONENT_ROUTING_TYPE = new Validator() {
|
||||
@Override
|
||||
public void validate(final String name, final Object value) {
|
||||
String val = (String) value;
|
||||
if (val == null || !val.equals(DivertConfigurationRoutingType.ANYCAST.toString()) &&
|
||||
!val.equals(DivertConfigurationRoutingType.MULTICAST.toString()) &&
|
||||
!val.equals(DivertConfigurationRoutingType.PASS.toString()) &&
|
||||
!val.equals(DivertConfigurationRoutingType.STRIP.toString())) {
|
||||
if (val == null || !val.equals(ComponentConfigurationRoutingType.ANYCAST.toString()) &&
|
||||
!val.equals(ComponentConfigurationRoutingType.MULTICAST.toString()) &&
|
||||
!val.equals(ComponentConfigurationRoutingType.PASS.toString()) &&
|
||||
!val.equals(ComponentConfigurationRoutingType.STRIP.toString())) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingType(val);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ import org.apache.activemq.artemis.core.config.storage.FileStorageConfiguration;
|
|||
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.security.Role;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.core.server.JournalType;
|
||||
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
|
@ -1781,6 +1781,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
String user = getString(brNode, "user", ActiveMQDefaultConfiguration.getDefaultClusterUser(), Validators.NO_CHECK);
|
||||
|
||||
ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(getString(brNode, "routing-type", ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType(), Validators.COMPONENT_ROUTING_TYPE));
|
||||
|
||||
|
||||
NodeList clusterPassNodes = brNode.getElementsByTagName("password");
|
||||
String password = null;
|
||||
|
||||
|
@ -1825,7 +1828,28 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
transformerConfiguration = getTransformerConfiguration(transformerClassName);
|
||||
}
|
||||
|
||||
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setMaxRetryInterval(maxRetryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setHA(ha).setUser(user).setPassword(password);
|
||||
BridgeConfiguration config = new BridgeConfiguration()
|
||||
.setName(name)
|
||||
.setQueueName(queueName)
|
||||
.setForwardingAddress(forwardingAddress)
|
||||
.setFilterString(filterString)
|
||||
.setTransformerConfiguration(transformerConfiguration)
|
||||
.setMinLargeMessageSize(minLargeMessageSize)
|
||||
.setClientFailureCheckPeriod(clientFailureCheckPeriod)
|
||||
.setConnectionTTL(connectionTTL)
|
||||
.setRetryInterval(retryInterval)
|
||||
.setMaxRetryInterval(maxRetryInterval)
|
||||
.setRetryIntervalMultiplier(retryIntervalMultiplier)
|
||||
.setInitialConnectAttempts(initialConnectAttempts)
|
||||
.setReconnectAttempts(reconnectAttempts)
|
||||
.setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode)
|
||||
.setUseDuplicateDetection(useDuplicateDetection)
|
||||
.setConfirmationWindowSize(confirmationWindowSize)
|
||||
.setProducerWindowSize(producerWindowSize)
|
||||
.setHA(ha)
|
||||
.setUser(user)
|
||||
.setPassword(password)
|
||||
.setRoutingType(routingType);
|
||||
|
||||
if (!staticConnectorNames.isEmpty()) {
|
||||
config.setStaticConnectors(staticConnectorNames);
|
||||
|
@ -1861,7 +1885,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
String transformerClassName = getString(e, "transformer-class-name", null, Validators.NO_CHECK);
|
||||
|
||||
DivertConfigurationRoutingType routingType = DivertConfigurationRoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), Validators.DIVERT_ROUTING_TYPE));
|
||||
ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), Validators.COMPONENT_ROUTING_TYPE));
|
||||
|
||||
TransformerConfiguration transformerConfiguration = null;
|
||||
|
||||
|
|
|
@ -92,7 +92,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
|||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
|
||||
import org.apache.activemq.artemis.core.server.Consumer;
|
||||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.core.server.JournalType;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||
|
@ -2441,7 +2441,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
|||
clearIO();
|
||||
try {
|
||||
TransformerConfiguration transformerConfiguration = transformerClassName == null ? null : new TransformerConfiguration(transformerClassName).setProperties(transformerProperties);
|
||||
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setRoutingType(DivertConfigurationRoutingType.valueOf(routingType));
|
||||
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setRoutingType(ComponentConfigurationRoutingType.valueOf(routingType));
|
||||
server.deployDivert(config);
|
||||
} finally {
|
||||
blockOnIO();
|
||||
|
|
|
@ -475,7 +475,7 @@ public final class ClusterManager implements ActiveMQComponent {
|
|||
|
||||
clusterLocators.add(serverLocator);
|
||||
|
||||
Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(config.getName()), queue, executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()), SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer, config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server);
|
||||
Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(config.getName()), queue, executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()), SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer, config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server, config.getRoutingType());
|
||||
|
||||
bridges.put(config.getName(), bridge);
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
|
@ -50,6 +51,7 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
|
|||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.core.server.HandleStatus;
|
||||
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
|
@ -161,6 +163,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
|
||||
private final BridgeMetrics metrics = new BridgeMetrics();
|
||||
|
||||
private final ComponentConfigurationRoutingType routingType;
|
||||
|
||||
public BridgeImpl(final ServerLocatorInternal serverLocator,
|
||||
final int initialConnectAttempts,
|
||||
final int reconnectAttempts,
|
||||
|
@ -179,7 +183,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
final boolean useDuplicateDetection,
|
||||
final String user,
|
||||
final String password,
|
||||
final ActiveMQServer server) {
|
||||
final ActiveMQServer server,
|
||||
final ComponentConfigurationRoutingType routingType) {
|
||||
|
||||
this.sequentialID = server.getStorageManager().generateID();
|
||||
|
||||
|
@ -220,6 +225,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
this.password = password;
|
||||
|
||||
this.server = server;
|
||||
|
||||
this.routingType = routingType;
|
||||
}
|
||||
|
||||
/** For tests mainly */
|
||||
|
@ -550,6 +557,20 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
message.setAddress(forwardingAddress);
|
||||
}
|
||||
|
||||
switch (routingType) {
|
||||
case ANYCAST:
|
||||
message.setRoutingType(RoutingType.ANYCAST);
|
||||
break;
|
||||
case MULTICAST:
|
||||
message.setRoutingType(RoutingType.MULTICAST);
|
||||
break;
|
||||
case STRIP:
|
||||
message.setRoutingType(null);
|
||||
break;
|
||||
case PASS:
|
||||
break;
|
||||
}
|
||||
|
||||
if (transformer != null) {
|
||||
final Message transformedMessage = transformer.transform(message);
|
||||
if (transformedMessage != message) {
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
|
@ -42,6 +43,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
|
|||
import org.apache.activemq.artemis.core.postoffice.BindingType;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
||||
|
@ -111,7 +113,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
|
|||
final TransportConfiguration connector,
|
||||
final String storeAndForwardPrefix) {
|
||||
super(targetLocator, initialConnectAttempts, reconnectAttempts, 0, // reconnectAttemptsOnSameNode means nothing on the clustering bridge since we always try the same
|
||||
retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, server);
|
||||
retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, server, ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType()));
|
||||
|
||||
this.discoveryLocator = discoveryLocator;
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
|
|||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||
import org.apache.activemq.artemis.core.server.Divert;
|
||||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
import org.jboss.logging.Logger;
|
||||
|
@ -51,7 +51,7 @@ public class DivertImpl implements Divert {
|
|||
|
||||
private final StorageManager storageManager;
|
||||
|
||||
private final DivertConfigurationRoutingType routingType;
|
||||
private final ComponentConfigurationRoutingType routingType;
|
||||
|
||||
public DivertImpl(final SimpleString forwardAddress,
|
||||
final SimpleString uniqueName,
|
||||
|
@ -61,7 +61,7 @@ public class DivertImpl implements Divert {
|
|||
final Transformer transformer,
|
||||
final PostOffice postOffice,
|
||||
final StorageManager storageManager,
|
||||
final DivertConfigurationRoutingType routingType) {
|
||||
final ComponentConfigurationRoutingType routingType) {
|
||||
this.forwardAddress = forwardAddress;
|
||||
|
||||
this.uniqueName = uniqueName;
|
||||
|
|
|
@ -1460,6 +1460,14 @@
|
|||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="routing-type" type="component-routing-type" default="PASS" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
how should the routing-type on the bridged messages be set?
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:choice>
|
||||
<xsd:element name="static-connectors" maxOccurs="1" minOccurs="1">
|
||||
<xsd:complexType>
|
||||
|
@ -1868,7 +1876,7 @@
|
|||
|
||||
<xsd:element ref="filter" maxOccurs="1" minOccurs="0"/>
|
||||
|
||||
<xsd:element name="routing-type" type="divert-routing-type" default="STRIP" maxOccurs="1" minOccurs="0">
|
||||
<xsd:element name="routing-type" type="component-routing-type" default="STRIP" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
how should the routing-type on the diverted messages be set?
|
||||
|
@ -3183,7 +3191,7 @@
|
|||
</xsd:restriction>
|
||||
</xsd:simpleType>
|
||||
|
||||
<xsd:simpleType name="divert-routing-type">
|
||||
<xsd:simpleType name="component-routing-type">
|
||||
<xsd:restriction base="xsd:string">
|
||||
<xsd:enumeration value="ANYCAST"/>
|
||||
<xsd:enumeration value="MULTICAST"/>
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.Set;
|
|||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
|
||||
|
@ -246,6 +247,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
Assert.assertEquals(null, bc.getDiscoveryGroupName());
|
||||
Assert.assertEquals(444, bc.getProducerWindowSize());
|
||||
Assert.assertEquals(1073741824, bc.getConfirmationWindowSize());
|
||||
Assert.assertEquals(ComponentConfigurationRoutingType.STRIP, bc.getRoutingType());
|
||||
} else if (bc.getName().equals("bridge2")) {
|
||||
Assert.assertEquals("bridge2", bc.getName());
|
||||
Assert.assertEquals("queue2", bc.getQueueName());
|
||||
|
@ -255,6 +257,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
Assert.assertEquals(null, bc.getStaticConnectors());
|
||||
Assert.assertEquals("dg1", bc.getDiscoveryGroupName());
|
||||
Assert.assertEquals(568320, bc.getProducerWindowSize());
|
||||
Assert.assertEquals(ComponentConfigurationRoutingType.PASS, bc.getRoutingType());
|
||||
} else {
|
||||
Assert.assertEquals("bridge3", bc.getName());
|
||||
Assert.assertEquals("org.foo.BridgeTransformer3", bc.getTransformerConfiguration().getClassName());
|
||||
|
|
|
@ -178,6 +178,7 @@
|
|||
<use-duplicate-detection>true</use-duplicate-detection>
|
||||
<confirmation-window-size>1G</confirmation-window-size>
|
||||
<producer-window-size>444</producer-window-size>
|
||||
<routing-type>STRIP</routing-type>
|
||||
<static-connectors>
|
||||
<connector-ref>connector1</connector-ref>
|
||||
</static-connectors>
|
||||
|
|
|
@ -170,6 +170,7 @@
|
|||
<use-duplicate-detection>true</use-duplicate-detection>
|
||||
<confirmation-window-size>1G</confirmation-window-size>
|
||||
<producer-window-size>444</producer-window-size>
|
||||
<routing-type>STRIP</routing-type>
|
||||
<static-connectors>
|
||||
<connector-ref>connector1</connector-ref>
|
||||
</static-connectors>
|
||||
|
|
|
@ -254,6 +254,7 @@ Name | Description | Default
|
|||
[user](core-bridges.md) | Username for the bridge, the default is the cluster username. | n/a
|
||||
[password](core-bridges.md)| Password for the bridge, default is the cluster password. | n/a
|
||||
[reconnect-attempts-same-node](core-bridges.md) | Number of retries before trying another node. | 10
|
||||
[routing-type](core-bridges.md) | how to set the routing-type on the bridged message | `PASS`
|
||||
|
||||
## broadcast-group type
|
||||
|
||||
|
|
|
@ -60,6 +60,7 @@ actually from the bridge example):
|
|||
<confirmation-window-size>10000000</confirmation-window-size>
|
||||
<user>foouser</user>
|
||||
<password>foopassword</password>
|
||||
<routing-type>PASS</routing-type>
|
||||
<static-connectors>
|
||||
<connector-ref>remote-connector</connector-ref>
|
||||
</static-connectors>
|
||||
|
@ -194,6 +195,16 @@ Let's take a look at all the parameters in turn:
|
|||
the default cluster password specified by `cluster-password` in `broker.xml`
|
||||
will be used.
|
||||
|
||||
- `routing-type`. Bridges can apply a particular routing-type to the messages it
|
||||
forwards, strip the existing routing type, or simply pass the existing
|
||||
routing-type through. This is useful in situations where the message may have
|
||||
its routing-type set but you want to bridge it to an address using a different
|
||||
routing-type. It's important to keep in mind that a message with the `anycast`
|
||||
routing-type will not actually be routed to queues using `multicast` and
|
||||
vice-versa. By configuring the `routing-type` of the bridge you have the
|
||||
flexibility to deal with any situation. Valid values are `ANYCAST`,
|
||||
`MULTICAST`, `PASS`, & `STRIP`. The default is `PASS`.
|
||||
|
||||
- `static-connectors` or `discovery-group-ref`. Pick either of these options to
|
||||
connect the bridge to the target server.
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
|
@ -33,7 +33,7 @@ public class AmqpMessageDivertsTest extends AmqpClientTestSupport {
|
|||
|
||||
@Test(timeout = 60000)
|
||||
public void testQueueReceiverReadMessageWithDivert() throws Exception {
|
||||
runQueueReceiverReadMessageWithDivert(DivertConfigurationRoutingType.ANYCAST.toString());
|
||||
runQueueReceiverReadMessageWithDivert(ComponentConfigurationRoutingType.ANYCAST.toString());
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
|
|
|
@ -25,7 +25,7 @@ import javax.jms.TextMessage;
|
|||
|
||||
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -38,8 +38,8 @@ public class DivertTopicToQueueTest extends JMSClientTestSupport {
|
|||
final String address2 = "bss.order.Consumer.cbma.workorderchanges.v1.queue";
|
||||
final String address3 = "bss.order.Consumer.pinpoint.workorderchanges.v1.queue";
|
||||
|
||||
DivertConfiguration dc1 = new DivertConfiguration().setName("WorkOrderChangesCBMA-Divert").setRoutingName("WorkOrderChangesCBMA-Divert").setAddress(address1).setForwardingAddress(address2).setExclusive(false).setRoutingType(DivertConfigurationRoutingType.ANYCAST);
|
||||
DivertConfiguration dc2 = new DivertConfiguration().setName("WorkOrderChangesPinpoint-Divert").setRoutingName("WorkOrderChangesPinpoint-Divert").setAddress(address1).setForwardingAddress(address3).setExclusive(false).setRoutingType(DivertConfigurationRoutingType.ANYCAST);
|
||||
DivertConfiguration dc1 = new DivertConfiguration().setName("WorkOrderChangesCBMA-Divert").setRoutingName("WorkOrderChangesCBMA-Divert").setAddress(address1).setForwardingAddress(address2).setExclusive(false).setRoutingType(ComponentConfigurationRoutingType.ANYCAST);
|
||||
DivertConfiguration dc2 = new DivertConfiguration().setName("WorkOrderChangesPinpoint-Divert").setRoutingName("WorkOrderChangesPinpoint-Divert").setAddress(address1).setForwardingAddress(address3).setExclusive(false).setRoutingType(ComponentConfigurationRoutingType.ANYCAST);
|
||||
|
||||
server.deployDivert(dc1);
|
||||
server.deployDivert(dc2);
|
||||
|
|
|
@ -0,0 +1,157 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.bridge;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class BridgeRoutingTest extends ActiveMQTestBase {
|
||||
|
||||
private ActiveMQServer server0;
|
||||
private ActiveMQServer server1;
|
||||
|
||||
private final boolean netty;
|
||||
|
||||
@Parameterized.Parameters(name = "isNetty={0}")
|
||||
public static Collection getParameters() {
|
||||
return Arrays.asList(new Object[][]{{true}, {false}});
|
||||
}
|
||||
|
||||
public BridgeRoutingTest(boolean isNetty) {
|
||||
this.netty = isNetty;
|
||||
}
|
||||
|
||||
protected boolean isNetty() {
|
||||
return netty;
|
||||
}
|
||||
|
||||
private String getServer0URL() {
|
||||
return isNetty() ? "tcp://localhost:61616" : "vm://0";
|
||||
}
|
||||
|
||||
private String getServer1URL() {
|
||||
return isNetty() ? "tcp://localhost:61617" : "vm://1";
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
server0 = createServer(false, createBasicConfig());
|
||||
server1 = createServer(false, createBasicConfig());
|
||||
server0.getConfiguration().addAcceptorConfiguration("acceptor", getServer0URL());
|
||||
server0.getConfiguration().addConnectorConfiguration("connector", getServer1URL());
|
||||
server1.getConfiguration().addAcceptorConfiguration("acceptor", getServer1URL());
|
||||
server0.start();
|
||||
server1.start();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAnycastBridge() throws Exception {
|
||||
testBridgeInternal(RoutingType.MULTICAST, RoutingType.ANYCAST, ComponentConfigurationRoutingType.ANYCAST, 0, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAnycastBridgeNegative() throws Exception {
|
||||
testBridgeInternal(RoutingType.MULTICAST, RoutingType.ANYCAST, ComponentConfigurationRoutingType.PASS, 500, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMulticastBridge() throws Exception {
|
||||
testBridgeInternal(RoutingType.ANYCAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.MULTICAST, 0, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMulticastBridgeNegative() throws Exception {
|
||||
testBridgeInternal(RoutingType.ANYCAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.PASS, 500, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPassBridge() throws Exception {
|
||||
testBridgeInternal(RoutingType.MULTICAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.PASS, 0, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPassBridge2() throws Exception {
|
||||
testBridgeInternal(RoutingType.ANYCAST, RoutingType.ANYCAST, ComponentConfigurationRoutingType.PASS, 0, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPassBridgeNegative() throws Exception {
|
||||
testBridgeInternal(RoutingType.ANYCAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.PASS, 500, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStripBridge() throws Exception {
|
||||
testBridgeInternal(RoutingType.MULTICAST, RoutingType.ANYCAST, ComponentConfigurationRoutingType.STRIP, 0, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStripBridge2() throws Exception {
|
||||
testBridgeInternal(RoutingType.ANYCAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.STRIP, 0, 1);
|
||||
}
|
||||
|
||||
private void testBridgeInternal(RoutingType sourceRoutingType,
|
||||
RoutingType destinationRoutingType,
|
||||
ComponentConfigurationRoutingType bridgeRoutingType,
|
||||
long sleepTime,
|
||||
int destinationMessageCount) throws Exception {
|
||||
SimpleString source = SimpleString.toSimpleString("source");
|
||||
SimpleString destination = SimpleString.toSimpleString("destination");
|
||||
|
||||
server0.createQueue(source, sourceRoutingType, source, null, true, false);
|
||||
server1.createQueue(destination, destinationRoutingType, destination, null, true, false);
|
||||
|
||||
server0.deployBridge(new BridgeConfiguration()
|
||||
.setRoutingType(bridgeRoutingType)
|
||||
.setName("bridge")
|
||||
.setForwardingAddress(destination.toString())
|
||||
.setQueueName(source.toString())
|
||||
.setConfirmationWindowSize(10)
|
||||
.setStaticConnectors(Arrays.asList("connector")));
|
||||
|
||||
try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer0URL());
|
||||
ClientSessionFactory sessionFactory = locator.createSessionFactory();
|
||||
ClientSession session = sessionFactory.createSession();
|
||||
ClientProducer producer = session.createProducer(source)) {
|
||||
producer.send(session.createMessage(true).setRoutingType(sourceRoutingType));
|
||||
}
|
||||
|
||||
Wait.waitFor(() -> server0.locateQueue(source).getMessageCount() == 0, 2000, 100);
|
||||
Wait.waitFor(() -> server0.getClusterManager().getBridges().get("bridge").getMetrics().getMessagesAcknowledged() == 1, 2000, 100);
|
||||
Thread.sleep(sleepTime);
|
||||
assertTrue(Wait.waitFor(() -> server1.locateQueue(destination).getMessageCount() == destinationMessageCount, 2000, 100));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue