This closes #2438
This commit is contained in:
commit
6e0dc8163d
|
@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.activemq.artemis.ArtemisConstants;
|
import org.apache.activemq.artemis.ArtemisConstants;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||||
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
|
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -367,7 +367,10 @@ public final class ActiveMQDefaultConfiguration {
|
||||||
private static boolean DEFAULT_DIVERT_EXCLUSIVE = false;
|
private static boolean DEFAULT_DIVERT_EXCLUSIVE = false;
|
||||||
|
|
||||||
// how the divert should handle the message's routing type
|
// how the divert should handle the message's routing type
|
||||||
private static String DEFAULT_DIVERT_ROUTING_TYPE = DivertConfigurationRoutingType.STRIP.toString();
|
private static String DEFAULT_DIVERT_ROUTING_TYPE = ComponentConfigurationRoutingType.STRIP.toString();
|
||||||
|
|
||||||
|
// how the bridge should handle the message's routing type
|
||||||
|
private static String DEFAULT_BRIDGE_ROUTING_TYPE = ComponentConfigurationRoutingType.PASS.toString();
|
||||||
|
|
||||||
// If true then the server will request a backup on another node
|
// If true then the server will request a backup on another node
|
||||||
private static boolean DEFAULT_HAPOLICY_REQUEST_BACKUP = false;
|
private static boolean DEFAULT_HAPOLICY_REQUEST_BACKUP = false;
|
||||||
|
@ -1089,6 +1092,13 @@ public final class ActiveMQDefaultConfiguration {
|
||||||
return DEFAULT_DIVERT_ROUTING_TYPE;
|
return DEFAULT_DIVERT_ROUTING_TYPE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* how the bridge should handle the message's routing type
|
||||||
|
*/
|
||||||
|
public static String getDefaultBridgeRoutingType() {
|
||||||
|
return DEFAULT_BRIDGE_ROUTING_TYPE;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If true then the server will request a backup on another node
|
* If true then the server will request a backup on another node
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.server;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class essentially mirrors {@code RoutingType} except it has some additional members to support special
|
* This class essentially mirrors {@code RoutingType} except it has some additional members to support special
|
||||||
* configuration semantics for diverts. These additional members weren't put in {@code RoutingType} so as to not
|
* configuration semantics for diverts & bridges. These additional members weren't put in {@code RoutingType}
|
||||||
* confuse users.
|
* so as to not confuse users.
|
||||||
*/
|
*/
|
||||||
public enum DivertConfigurationRoutingType {
|
public enum ComponentConfigurationRoutingType {
|
||||||
|
|
||||||
MULTICAST, ANYCAST, STRIP, PASS;
|
MULTICAST, ANYCAST, STRIP, PASS;
|
||||||
|
|
||||||
|
@ -40,7 +40,7 @@ public enum DivertConfigurationRoutingType {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static DivertConfigurationRoutingType getType(byte type) {
|
public static ComponentConfigurationRoutingType getType(byte type) {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case 0:
|
case 0:
|
||||||
return MULTICAST;
|
return MULTICAST;
|
|
@ -21,6 +21,7 @@ import java.util.List;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
|
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||||
|
|
||||||
public final class BridgeConfiguration implements Serializable {
|
public final class BridgeConfiguration implements Serializable {
|
||||||
|
|
||||||
|
@ -75,6 +76,8 @@ public final class BridgeConfiguration implements Serializable {
|
||||||
// The bridge shouldn't be sending blocking anyways
|
// The bridge shouldn't be sending blocking anyways
|
||||||
private long callTimeout = ActiveMQClient.DEFAULT_CALL_TIMEOUT;
|
private long callTimeout = ActiveMQClient.DEFAULT_CALL_TIMEOUT;
|
||||||
|
|
||||||
|
private ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType());
|
||||||
|
|
||||||
public BridgeConfiguration() {
|
public BridgeConfiguration() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -337,6 +340,15 @@ public final class BridgeConfiguration implements Serializable {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ComponentConfigurationRoutingType getRoutingType() {
|
||||||
|
return routingType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public BridgeConfiguration setRoutingType(ComponentConfigurationRoutingType routingType) {
|
||||||
|
this.routingType = routingType;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* At this point this is only changed on testcases
|
* At this point this is only changed on testcases
|
||||||
* The bridge shouldn't be sending blocking anyways
|
* The bridge shouldn't be sending blocking anyways
|
||||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.config;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||||
|
|
||||||
public class DivertConfiguration implements Serializable {
|
public class DivertConfiguration implements Serializable {
|
||||||
|
@ -40,7 +40,7 @@ public class DivertConfiguration implements Serializable {
|
||||||
|
|
||||||
private TransformerConfiguration transformerConfiguration = null;
|
private TransformerConfiguration transformerConfiguration = null;
|
||||||
|
|
||||||
private DivertConfigurationRoutingType routingType = DivertConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType());
|
private ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType());
|
||||||
|
|
||||||
public DivertConfiguration() {
|
public DivertConfiguration() {
|
||||||
}
|
}
|
||||||
|
@ -73,7 +73,7 @@ public class DivertConfiguration implements Serializable {
|
||||||
return transformerConfiguration;
|
return transformerConfiguration;
|
||||||
}
|
}
|
||||||
|
|
||||||
public DivertConfigurationRoutingType getRoutingType() {
|
public ComponentConfigurationRoutingType getRoutingType() {
|
||||||
return routingType;
|
return routingType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -140,7 +140,7 @@ public class DivertConfiguration implements Serializable {
|
||||||
/**
|
/**
|
||||||
* @param routingType the routingType to set
|
* @param routingType the routingType to set
|
||||||
*/
|
*/
|
||||||
public DivertConfiguration setRoutingType(final DivertConfigurationRoutingType routingType) {
|
public DivertConfiguration setRoutingType(final ComponentConfigurationRoutingType routingType) {
|
||||||
this.routingType = routingType;
|
this.routingType = routingType;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.config.impl;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||||
import org.apache.activemq.artemis.core.server.JournalType;
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||||
|
@ -200,14 +200,14 @@ public final class Validators {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
public static final Validator DIVERT_ROUTING_TYPE = new Validator() {
|
public static final Validator COMPONENT_ROUTING_TYPE = new Validator() {
|
||||||
@Override
|
@Override
|
||||||
public void validate(final String name, final Object value) {
|
public void validate(final String name, final Object value) {
|
||||||
String val = (String) value;
|
String val = (String) value;
|
||||||
if (val == null || !val.equals(DivertConfigurationRoutingType.ANYCAST.toString()) &&
|
if (val == null || !val.equals(ComponentConfigurationRoutingType.ANYCAST.toString()) &&
|
||||||
!val.equals(DivertConfigurationRoutingType.MULTICAST.toString()) &&
|
!val.equals(ComponentConfigurationRoutingType.MULTICAST.toString()) &&
|
||||||
!val.equals(DivertConfigurationRoutingType.PASS.toString()) &&
|
!val.equals(ComponentConfigurationRoutingType.PASS.toString()) &&
|
||||||
!val.equals(DivertConfigurationRoutingType.STRIP.toString())) {
|
!val.equals(ComponentConfigurationRoutingType.STRIP.toString())) {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingType(val);
|
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingType(val);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,7 +65,7 @@ import org.apache.activemq.artemis.core.config.storage.FileStorageConfiguration;
|
||||||
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.core.security.Role;
|
import org.apache.activemq.artemis.core.security.Role;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||||
import org.apache.activemq.artemis.core.server.JournalType;
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
|
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||||
|
@ -1781,6 +1781,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
|
|
||||||
String user = getString(brNode, "user", ActiveMQDefaultConfiguration.getDefaultClusterUser(), Validators.NO_CHECK);
|
String user = getString(brNode, "user", ActiveMQDefaultConfiguration.getDefaultClusterUser(), Validators.NO_CHECK);
|
||||||
|
|
||||||
|
ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(getString(brNode, "routing-type", ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType(), Validators.COMPONENT_ROUTING_TYPE));
|
||||||
|
|
||||||
|
|
||||||
NodeList clusterPassNodes = brNode.getElementsByTagName("password");
|
NodeList clusterPassNodes = brNode.getElementsByTagName("password");
|
||||||
String password = null;
|
String password = null;
|
||||||
|
|
||||||
|
@ -1825,7 +1828,28 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
transformerConfiguration = getTransformerConfiguration(transformerClassName);
|
transformerConfiguration = getTransformerConfiguration(transformerClassName);
|
||||||
}
|
}
|
||||||
|
|
||||||
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setMaxRetryInterval(maxRetryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setHA(ha).setUser(user).setPassword(password);
|
BridgeConfiguration config = new BridgeConfiguration()
|
||||||
|
.setName(name)
|
||||||
|
.setQueueName(queueName)
|
||||||
|
.setForwardingAddress(forwardingAddress)
|
||||||
|
.setFilterString(filterString)
|
||||||
|
.setTransformerConfiguration(transformerConfiguration)
|
||||||
|
.setMinLargeMessageSize(minLargeMessageSize)
|
||||||
|
.setClientFailureCheckPeriod(clientFailureCheckPeriod)
|
||||||
|
.setConnectionTTL(connectionTTL)
|
||||||
|
.setRetryInterval(retryInterval)
|
||||||
|
.setMaxRetryInterval(maxRetryInterval)
|
||||||
|
.setRetryIntervalMultiplier(retryIntervalMultiplier)
|
||||||
|
.setInitialConnectAttempts(initialConnectAttempts)
|
||||||
|
.setReconnectAttempts(reconnectAttempts)
|
||||||
|
.setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode)
|
||||||
|
.setUseDuplicateDetection(useDuplicateDetection)
|
||||||
|
.setConfirmationWindowSize(confirmationWindowSize)
|
||||||
|
.setProducerWindowSize(producerWindowSize)
|
||||||
|
.setHA(ha)
|
||||||
|
.setUser(user)
|
||||||
|
.setPassword(password)
|
||||||
|
.setRoutingType(routingType);
|
||||||
|
|
||||||
if (!staticConnectorNames.isEmpty()) {
|
if (!staticConnectorNames.isEmpty()) {
|
||||||
config.setStaticConnectors(staticConnectorNames);
|
config.setStaticConnectors(staticConnectorNames);
|
||||||
|
@ -1861,7 +1885,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
|
|
||||||
String transformerClassName = getString(e, "transformer-class-name", null, Validators.NO_CHECK);
|
String transformerClassName = getString(e, "transformer-class-name", null, Validators.NO_CHECK);
|
||||||
|
|
||||||
DivertConfigurationRoutingType routingType = DivertConfigurationRoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), Validators.DIVERT_ROUTING_TYPE));
|
ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), Validators.COMPONENT_ROUTING_TYPE));
|
||||||
|
|
||||||
TransformerConfiguration transformerConfiguration = null;
|
TransformerConfiguration transformerConfiguration = null;
|
||||||
|
|
||||||
|
|
|
@ -92,7 +92,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
|
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
|
||||||
import org.apache.activemq.artemis.core.server.Consumer;
|
import org.apache.activemq.artemis.core.server.Consumer;
|
||||||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||||
import org.apache.activemq.artemis.core.server.JournalType;
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||||
|
@ -2441,7 +2441,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
||||||
clearIO();
|
clearIO();
|
||||||
try {
|
try {
|
||||||
TransformerConfiguration transformerConfiguration = transformerClassName == null ? null : new TransformerConfiguration(transformerClassName).setProperties(transformerProperties);
|
TransformerConfiguration transformerConfiguration = transformerClassName == null ? null : new TransformerConfiguration(transformerClassName).setProperties(transformerProperties);
|
||||||
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setRoutingType(DivertConfigurationRoutingType.valueOf(routingType));
|
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setRoutingType(ComponentConfigurationRoutingType.valueOf(routingType));
|
||||||
server.deployDivert(config);
|
server.deployDivert(config);
|
||||||
} finally {
|
} finally {
|
||||||
blockOnIO();
|
blockOnIO();
|
||||||
|
|
|
@ -475,7 +475,7 @@ public final class ClusterManager implements ActiveMQComponent {
|
||||||
|
|
||||||
clusterLocators.add(serverLocator);
|
clusterLocators.add(serverLocator);
|
||||||
|
|
||||||
Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(config.getName()), queue, executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()), SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer, config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server);
|
Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(config.getName()), queue, executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()), SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer, config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server, config.getRoutingType());
|
||||||
|
|
||||||
bridges.put(config.getName(), bridge);
|
bridges.put(config.getName(), bridge);
|
||||||
|
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||||
|
@ -50,6 +51,7 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
|
||||||
import org.apache.activemq.artemis.core.filter.Filter;
|
import org.apache.activemq.artemis.core.filter.Filter;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
|
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||||
import org.apache.activemq.artemis.core.server.HandleStatus;
|
import org.apache.activemq.artemis.core.server.HandleStatus;
|
||||||
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
|
@ -161,6 +163,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
||||||
|
|
||||||
private final BridgeMetrics metrics = new BridgeMetrics();
|
private final BridgeMetrics metrics = new BridgeMetrics();
|
||||||
|
|
||||||
|
private final ComponentConfigurationRoutingType routingType;
|
||||||
|
|
||||||
public BridgeImpl(final ServerLocatorInternal serverLocator,
|
public BridgeImpl(final ServerLocatorInternal serverLocator,
|
||||||
final int initialConnectAttempts,
|
final int initialConnectAttempts,
|
||||||
final int reconnectAttempts,
|
final int reconnectAttempts,
|
||||||
|
@ -179,7 +183,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
||||||
final boolean useDuplicateDetection,
|
final boolean useDuplicateDetection,
|
||||||
final String user,
|
final String user,
|
||||||
final String password,
|
final String password,
|
||||||
final ActiveMQServer server) {
|
final ActiveMQServer server,
|
||||||
|
final ComponentConfigurationRoutingType routingType) {
|
||||||
|
|
||||||
this.sequentialID = server.getStorageManager().generateID();
|
this.sequentialID = server.getStorageManager().generateID();
|
||||||
|
|
||||||
|
@ -220,6 +225,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
||||||
this.password = password;
|
this.password = password;
|
||||||
|
|
||||||
this.server = server;
|
this.server = server;
|
||||||
|
|
||||||
|
this.routingType = routingType;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** For tests mainly */
|
/** For tests mainly */
|
||||||
|
@ -550,6 +557,20 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
||||||
message.setAddress(forwardingAddress);
|
message.setAddress(forwardingAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch (routingType) {
|
||||||
|
case ANYCAST:
|
||||||
|
message.setRoutingType(RoutingType.ANYCAST);
|
||||||
|
break;
|
||||||
|
case MULTICAST:
|
||||||
|
message.setRoutingType(RoutingType.MULTICAST);
|
||||||
|
break;
|
||||||
|
case STRIP:
|
||||||
|
message.setRoutingType(null);
|
||||||
|
break;
|
||||||
|
case PASS:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (transformer != null) {
|
if (transformer != null) {
|
||||||
final Message transformedMessage = transformer.transform(message);
|
final Message transformedMessage = transformer.transform(message);
|
||||||
if (transformedMessage != message) {
|
if (transformedMessage != message) {
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Set;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
|
@ -42,6 +43,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
|
||||||
import org.apache.activemq.artemis.core.postoffice.BindingType;
|
import org.apache.activemq.artemis.core.postoffice.BindingType;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
|
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
|
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
||||||
|
@ -111,7 +113,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
|
||||||
final TransportConfiguration connector,
|
final TransportConfiguration connector,
|
||||||
final String storeAndForwardPrefix) {
|
final String storeAndForwardPrefix) {
|
||||||
super(targetLocator, initialConnectAttempts, reconnectAttempts, 0, // reconnectAttemptsOnSameNode means nothing on the clustering bridge since we always try the same
|
super(targetLocator, initialConnectAttempts, reconnectAttempts, 0, // reconnectAttemptsOnSameNode means nothing on the clustering bridge since we always try the same
|
||||||
retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, server);
|
retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, server, ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType()));
|
||||||
|
|
||||||
this.discoveryLocator = discoveryLocator;
|
this.discoveryLocator = discoveryLocator;
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
|
||||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||||
import org.apache.activemq.artemis.core.server.Divert;
|
import org.apache.activemq.artemis.core.server.Divert;
|
||||||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
@ -51,7 +51,7 @@ public class DivertImpl implements Divert {
|
||||||
|
|
||||||
private final StorageManager storageManager;
|
private final StorageManager storageManager;
|
||||||
|
|
||||||
private final DivertConfigurationRoutingType routingType;
|
private final ComponentConfigurationRoutingType routingType;
|
||||||
|
|
||||||
public DivertImpl(final SimpleString forwardAddress,
|
public DivertImpl(final SimpleString forwardAddress,
|
||||||
final SimpleString uniqueName,
|
final SimpleString uniqueName,
|
||||||
|
@ -61,7 +61,7 @@ public class DivertImpl implements Divert {
|
||||||
final Transformer transformer,
|
final Transformer transformer,
|
||||||
final PostOffice postOffice,
|
final PostOffice postOffice,
|
||||||
final StorageManager storageManager,
|
final StorageManager storageManager,
|
||||||
final DivertConfigurationRoutingType routingType) {
|
final ComponentConfigurationRoutingType routingType) {
|
||||||
this.forwardAddress = forwardAddress;
|
this.forwardAddress = forwardAddress;
|
||||||
|
|
||||||
this.uniqueName = uniqueName;
|
this.uniqueName = uniqueName;
|
||||||
|
|
|
@ -1460,6 +1460,14 @@
|
||||||
</xsd:annotation>
|
</xsd:annotation>
|
||||||
</xsd:element>
|
</xsd:element>
|
||||||
|
|
||||||
|
<xsd:element name="routing-type" type="component-routing-type" default="PASS" maxOccurs="1" minOccurs="0">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
how should the routing-type on the bridged messages be set?
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:element>
|
||||||
|
|
||||||
<xsd:choice>
|
<xsd:choice>
|
||||||
<xsd:element name="static-connectors" maxOccurs="1" minOccurs="1">
|
<xsd:element name="static-connectors" maxOccurs="1" minOccurs="1">
|
||||||
<xsd:complexType>
|
<xsd:complexType>
|
||||||
|
@ -1868,7 +1876,7 @@
|
||||||
|
|
||||||
<xsd:element ref="filter" maxOccurs="1" minOccurs="0"/>
|
<xsd:element ref="filter" maxOccurs="1" minOccurs="0"/>
|
||||||
|
|
||||||
<xsd:element name="routing-type" type="divert-routing-type" default="STRIP" maxOccurs="1" minOccurs="0">
|
<xsd:element name="routing-type" type="component-routing-type" default="STRIP" maxOccurs="1" minOccurs="0">
|
||||||
<xsd:annotation>
|
<xsd:annotation>
|
||||||
<xsd:documentation>
|
<xsd:documentation>
|
||||||
how should the routing-type on the diverted messages be set?
|
how should the routing-type on the diverted messages be set?
|
||||||
|
@ -3183,7 +3191,7 @@
|
||||||
</xsd:restriction>
|
</xsd:restriction>
|
||||||
</xsd:simpleType>
|
</xsd:simpleType>
|
||||||
|
|
||||||
<xsd:simpleType name="divert-routing-type">
|
<xsd:simpleType name="component-routing-type">
|
||||||
<xsd:restriction base="xsd:string">
|
<xsd:restriction base="xsd:string">
|
||||||
<xsd:enumeration value="ANYCAST"/>
|
<xsd:enumeration value="ANYCAST"/>
|
||||||
<xsd:enumeration value="MULTICAST"/>
|
<xsd:enumeration value="MULTICAST"/>
|
||||||
|
|
|
@ -31,6 +31,7 @@ import java.util.Set;
|
||||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||||
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||||
|
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
|
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
|
||||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
|
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
|
||||||
|
@ -246,6 +247,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
||||||
Assert.assertEquals(null, bc.getDiscoveryGroupName());
|
Assert.assertEquals(null, bc.getDiscoveryGroupName());
|
||||||
Assert.assertEquals(444, bc.getProducerWindowSize());
|
Assert.assertEquals(444, bc.getProducerWindowSize());
|
||||||
Assert.assertEquals(1073741824, bc.getConfirmationWindowSize());
|
Assert.assertEquals(1073741824, bc.getConfirmationWindowSize());
|
||||||
|
Assert.assertEquals(ComponentConfigurationRoutingType.STRIP, bc.getRoutingType());
|
||||||
} else if (bc.getName().equals("bridge2")) {
|
} else if (bc.getName().equals("bridge2")) {
|
||||||
Assert.assertEquals("bridge2", bc.getName());
|
Assert.assertEquals("bridge2", bc.getName());
|
||||||
Assert.assertEquals("queue2", bc.getQueueName());
|
Assert.assertEquals("queue2", bc.getQueueName());
|
||||||
|
@ -255,6 +257,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
||||||
Assert.assertEquals(null, bc.getStaticConnectors());
|
Assert.assertEquals(null, bc.getStaticConnectors());
|
||||||
Assert.assertEquals("dg1", bc.getDiscoveryGroupName());
|
Assert.assertEquals("dg1", bc.getDiscoveryGroupName());
|
||||||
Assert.assertEquals(568320, bc.getProducerWindowSize());
|
Assert.assertEquals(568320, bc.getProducerWindowSize());
|
||||||
|
Assert.assertEquals(ComponentConfigurationRoutingType.PASS, bc.getRoutingType());
|
||||||
} else {
|
} else {
|
||||||
Assert.assertEquals("bridge3", bc.getName());
|
Assert.assertEquals("bridge3", bc.getName());
|
||||||
Assert.assertEquals("org.foo.BridgeTransformer3", bc.getTransformerConfiguration().getClassName());
|
Assert.assertEquals("org.foo.BridgeTransformer3", bc.getTransformerConfiguration().getClassName());
|
||||||
|
|
|
@ -178,6 +178,7 @@
|
||||||
<use-duplicate-detection>true</use-duplicate-detection>
|
<use-duplicate-detection>true</use-duplicate-detection>
|
||||||
<confirmation-window-size>1G</confirmation-window-size>
|
<confirmation-window-size>1G</confirmation-window-size>
|
||||||
<producer-window-size>444</producer-window-size>
|
<producer-window-size>444</producer-window-size>
|
||||||
|
<routing-type>STRIP</routing-type>
|
||||||
<static-connectors>
|
<static-connectors>
|
||||||
<connector-ref>connector1</connector-ref>
|
<connector-ref>connector1</connector-ref>
|
||||||
</static-connectors>
|
</static-connectors>
|
||||||
|
|
|
@ -170,6 +170,7 @@
|
||||||
<use-duplicate-detection>true</use-duplicate-detection>
|
<use-duplicate-detection>true</use-duplicate-detection>
|
||||||
<confirmation-window-size>1G</confirmation-window-size>
|
<confirmation-window-size>1G</confirmation-window-size>
|
||||||
<producer-window-size>444</producer-window-size>
|
<producer-window-size>444</producer-window-size>
|
||||||
|
<routing-type>STRIP</routing-type>
|
||||||
<static-connectors>
|
<static-connectors>
|
||||||
<connector-ref>connector1</connector-ref>
|
<connector-ref>connector1</connector-ref>
|
||||||
</static-connectors>
|
</static-connectors>
|
||||||
|
|
|
@ -254,6 +254,7 @@ Name | Description | Default
|
||||||
[user](core-bridges.md) | Username for the bridge, the default is the cluster username. | n/a
|
[user](core-bridges.md) | Username for the bridge, the default is the cluster username. | n/a
|
||||||
[password](core-bridges.md)| Password for the bridge, default is the cluster password. | n/a
|
[password](core-bridges.md)| Password for the bridge, default is the cluster password. | n/a
|
||||||
[reconnect-attempts-same-node](core-bridges.md) | Number of retries before trying another node. | 10
|
[reconnect-attempts-same-node](core-bridges.md) | Number of retries before trying another node. | 10
|
||||||
|
[routing-type](core-bridges.md) | how to set the routing-type on the bridged message | `PASS`
|
||||||
|
|
||||||
## broadcast-group type
|
## broadcast-group type
|
||||||
|
|
||||||
|
|
|
@ -60,6 +60,7 @@ actually from the bridge example):
|
||||||
<confirmation-window-size>10000000</confirmation-window-size>
|
<confirmation-window-size>10000000</confirmation-window-size>
|
||||||
<user>foouser</user>
|
<user>foouser</user>
|
||||||
<password>foopassword</password>
|
<password>foopassword</password>
|
||||||
|
<routing-type>PASS</routing-type>
|
||||||
<static-connectors>
|
<static-connectors>
|
||||||
<connector-ref>remote-connector</connector-ref>
|
<connector-ref>remote-connector</connector-ref>
|
||||||
</static-connectors>
|
</static-connectors>
|
||||||
|
@ -194,6 +195,16 @@ Let's take a look at all the parameters in turn:
|
||||||
the default cluster password specified by `cluster-password` in `broker.xml`
|
the default cluster password specified by `cluster-password` in `broker.xml`
|
||||||
will be used.
|
will be used.
|
||||||
|
|
||||||
|
- `routing-type`. Bridges can apply a particular routing-type to the messages it
|
||||||
|
forwards, strip the existing routing type, or simply pass the existing
|
||||||
|
routing-type through. This is useful in situations where the message may have
|
||||||
|
its routing-type set but you want to bridge it to an address using a different
|
||||||
|
routing-type. It's important to keep in mind that a message with the `anycast`
|
||||||
|
routing-type will not actually be routed to queues using `multicast` and
|
||||||
|
vice-versa. By configuring the `routing-type` of the bridge you have the
|
||||||
|
flexibility to deal with any situation. Valid values are `ANYCAST`,
|
||||||
|
`MULTICAST`, `PASS`, & `STRIP`. The default is `PASS`.
|
||||||
|
|
||||||
- `static-connectors` or `discovery-group-ref`. Pick either of these options to
|
- `static-connectors` or `discovery-group-ref`. Pick either of these options to
|
||||||
connect the bridge to the target server.
|
connect the bridge to the target server.
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||||
|
@ -33,7 +33,7 @@ public class AmqpMessageDivertsTest extends AmqpClientTestSupport {
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testQueueReceiverReadMessageWithDivert() throws Exception {
|
public void testQueueReceiverReadMessageWithDivert() throws Exception {
|
||||||
runQueueReceiverReadMessageWithDivert(DivertConfigurationRoutingType.ANYCAST.toString());
|
runQueueReceiverReadMessageWithDivert(ComponentConfigurationRoutingType.ANYCAST.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
|
|
|
@ -25,7 +25,7 @@ import javax.jms.TextMessage;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -38,8 +38,8 @@ public class DivertTopicToQueueTest extends JMSClientTestSupport {
|
||||||
final String address2 = "bss.order.Consumer.cbma.workorderchanges.v1.queue";
|
final String address2 = "bss.order.Consumer.cbma.workorderchanges.v1.queue";
|
||||||
final String address3 = "bss.order.Consumer.pinpoint.workorderchanges.v1.queue";
|
final String address3 = "bss.order.Consumer.pinpoint.workorderchanges.v1.queue";
|
||||||
|
|
||||||
DivertConfiguration dc1 = new DivertConfiguration().setName("WorkOrderChangesCBMA-Divert").setRoutingName("WorkOrderChangesCBMA-Divert").setAddress(address1).setForwardingAddress(address2).setExclusive(false).setRoutingType(DivertConfigurationRoutingType.ANYCAST);
|
DivertConfiguration dc1 = new DivertConfiguration().setName("WorkOrderChangesCBMA-Divert").setRoutingName("WorkOrderChangesCBMA-Divert").setAddress(address1).setForwardingAddress(address2).setExclusive(false).setRoutingType(ComponentConfigurationRoutingType.ANYCAST);
|
||||||
DivertConfiguration dc2 = new DivertConfiguration().setName("WorkOrderChangesPinpoint-Divert").setRoutingName("WorkOrderChangesPinpoint-Divert").setAddress(address1).setForwardingAddress(address3).setExclusive(false).setRoutingType(DivertConfigurationRoutingType.ANYCAST);
|
DivertConfiguration dc2 = new DivertConfiguration().setName("WorkOrderChangesPinpoint-Divert").setRoutingName("WorkOrderChangesPinpoint-Divert").setAddress(address1).setForwardingAddress(address3).setExclusive(false).setRoutingType(ComponentConfigurationRoutingType.ANYCAST);
|
||||||
|
|
||||||
server.deployDivert(dc1);
|
server.deployDivert(dc1);
|
||||||
server.deployDivert(dc2);
|
server.deployDivert(dc2);
|
||||||
|
|
|
@ -0,0 +1,157 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.bridge;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
|
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
@RunWith(value = Parameterized.class)
|
||||||
|
public class BridgeRoutingTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
private ActiveMQServer server0;
|
||||||
|
private ActiveMQServer server1;
|
||||||
|
|
||||||
|
private final boolean netty;
|
||||||
|
|
||||||
|
@Parameterized.Parameters(name = "isNetty={0}")
|
||||||
|
public static Collection getParameters() {
|
||||||
|
return Arrays.asList(new Object[][]{{true}, {false}});
|
||||||
|
}
|
||||||
|
|
||||||
|
public BridgeRoutingTest(boolean isNetty) {
|
||||||
|
this.netty = isNetty;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean isNetty() {
|
||||||
|
return netty;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getServer0URL() {
|
||||||
|
return isNetty() ? "tcp://localhost:61616" : "vm://0";
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getServer1URL() {
|
||||||
|
return isNetty() ? "tcp://localhost:61617" : "vm://1";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
server0 = createServer(false, createBasicConfig());
|
||||||
|
server1 = createServer(false, createBasicConfig());
|
||||||
|
server0.getConfiguration().addAcceptorConfiguration("acceptor", getServer0URL());
|
||||||
|
server0.getConfiguration().addConnectorConfiguration("connector", getServer1URL());
|
||||||
|
server1.getConfiguration().addAcceptorConfiguration("acceptor", getServer1URL());
|
||||||
|
server0.start();
|
||||||
|
server1.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAnycastBridge() throws Exception {
|
||||||
|
testBridgeInternal(RoutingType.MULTICAST, RoutingType.ANYCAST, ComponentConfigurationRoutingType.ANYCAST, 0, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAnycastBridgeNegative() throws Exception {
|
||||||
|
testBridgeInternal(RoutingType.MULTICAST, RoutingType.ANYCAST, ComponentConfigurationRoutingType.PASS, 500, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMulticastBridge() throws Exception {
|
||||||
|
testBridgeInternal(RoutingType.ANYCAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.MULTICAST, 0, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMulticastBridgeNegative() throws Exception {
|
||||||
|
testBridgeInternal(RoutingType.ANYCAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.PASS, 500, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPassBridge() throws Exception {
|
||||||
|
testBridgeInternal(RoutingType.MULTICAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.PASS, 0, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPassBridge2() throws Exception {
|
||||||
|
testBridgeInternal(RoutingType.ANYCAST, RoutingType.ANYCAST, ComponentConfigurationRoutingType.PASS, 0, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPassBridgeNegative() throws Exception {
|
||||||
|
testBridgeInternal(RoutingType.ANYCAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.PASS, 500, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStripBridge() throws Exception {
|
||||||
|
testBridgeInternal(RoutingType.MULTICAST, RoutingType.ANYCAST, ComponentConfigurationRoutingType.STRIP, 0, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStripBridge2() throws Exception {
|
||||||
|
testBridgeInternal(RoutingType.ANYCAST, RoutingType.MULTICAST, ComponentConfigurationRoutingType.STRIP, 0, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testBridgeInternal(RoutingType sourceRoutingType,
|
||||||
|
RoutingType destinationRoutingType,
|
||||||
|
ComponentConfigurationRoutingType bridgeRoutingType,
|
||||||
|
long sleepTime,
|
||||||
|
int destinationMessageCount) throws Exception {
|
||||||
|
SimpleString source = SimpleString.toSimpleString("source");
|
||||||
|
SimpleString destination = SimpleString.toSimpleString("destination");
|
||||||
|
|
||||||
|
server0.createQueue(source, sourceRoutingType, source, null, true, false);
|
||||||
|
server1.createQueue(destination, destinationRoutingType, destination, null, true, false);
|
||||||
|
|
||||||
|
server0.deployBridge(new BridgeConfiguration()
|
||||||
|
.setRoutingType(bridgeRoutingType)
|
||||||
|
.setName("bridge")
|
||||||
|
.setForwardingAddress(destination.toString())
|
||||||
|
.setQueueName(source.toString())
|
||||||
|
.setConfirmationWindowSize(10)
|
||||||
|
.setStaticConnectors(Arrays.asList("connector")));
|
||||||
|
|
||||||
|
try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer0URL());
|
||||||
|
ClientSessionFactory sessionFactory = locator.createSessionFactory();
|
||||||
|
ClientSession session = sessionFactory.createSession();
|
||||||
|
ClientProducer producer = session.createProducer(source)) {
|
||||||
|
producer.send(session.createMessage(true).setRoutingType(sourceRoutingType));
|
||||||
|
}
|
||||||
|
|
||||||
|
Wait.waitFor(() -> server0.locateQueue(source).getMessageCount() == 0, 2000, 100);
|
||||||
|
Wait.waitFor(() -> server0.getClusterManager().getBridges().get("bridge").getMetrics().getMessagesAcknowledged() == 1, 2000, 100);
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
assertTrue(Wait.waitFor(() -> server1.locateQueue(destination).getMessageCount() == destinationMessageCount, 2000, 100));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue