This closes #914

This commit is contained in:
Clebert Suconic 2016-12-14 09:23:41 -05:00
commit d49a5178ca
12 changed files with 163 additions and 86 deletions

View File

@ -203,14 +203,6 @@ public class TransportConstants {
public static final String STOMP_MIN_LARGE_MESSAGE_SIZE = "stomp-min-large-message-size";
public static final String STOMP_ANYCAST_PREFIX = "stompAnycastPrefix";
public static final String DEFAULT_STOMP_ANYCAST_PREFIX = "";
public static final String STOMP_MULTICAST_PREFIX = "stompMulticastPrefix";
public static final String DEFAULT_STOMP_MULTICAST_PREFIX = "";
public static final String NETTY_CONNECT_TIMEOUT = "connect-timeout-millis";
public static final int DEFAULT_NETTY_CONNECT_TIMEOUT = -1;
@ -250,8 +242,6 @@ public class TransportConstants {
allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION);
allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT);
allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE);
allowableAcceptorKeys.add(TransportConstants.STOMP_ANYCAST_PREFIX);
allowableAcceptorKeys.add(TransportConstants.STOMP_MULTICAST_PREFIX);
allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL);
allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MAX);
allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MIN);

View File

@ -90,10 +90,6 @@ public final class StompConnection implements RemotingConnection {
private final int minLargeMessageSize;
private final String anycastPrefix;
private final String multicastPrefix;
private StompVersions version;
private VersionedStompFrameHandler frameHandler;
@ -168,8 +164,6 @@ public final class StompConnection implements RemotingConnection {
this.enableMessageID = ConfigurationHelper.getBooleanProperty(TransportConstants.STOMP_ENABLE_MESSAGE_ID, false, acceptorUsed.getConfiguration());
this.minLargeMessageSize = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, acceptorUsed.getConfiguration());
this.anycastPrefix = ConfigurationHelper.getStringProperty(TransportConstants.STOMP_ANYCAST_PREFIX, TransportConstants.DEFAULT_STOMP_ANYCAST_PREFIX, acceptorUsed.getConfiguration());
this.multicastPrefix = ConfigurationHelper.getStringProperty(TransportConstants.STOMP_MULTICAST_PREFIX, TransportConstants.DEFAULT_STOMP_MULTICAST_PREFIX, acceptorUsed.getConfiguration());
}
@Override
@ -255,14 +249,14 @@ public final class StompConnection implements RemotingConnection {
// TODO this should take a type - send or receive so it knows whether to check the address or the queue
public void checkDestination(String destination) throws ActiveMQStompException {
if (!manager.destinationExists(destination)) {
if (!manager.destinationExists(getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString())) {
throw BUNDLE.destinationNotExist(destination).setHandler(frameHandler);
}
}
public boolean autoCreateDestinationIfPossible(String queue, RoutingType routingType) throws ActiveMQStompException {
boolean result = false;
ServerSession session = getSession().getSession();
ServerSession session = getSession().getCoreSession();
try {
if (manager.getServer().getAddressInfo(SimpleString.toSimpleString(queue)) == null) {
@ -291,9 +285,9 @@ public final class StompConnection implements RemotingConnection {
}
public void checkRoutingSemantics(String destination, RoutingType routingType) throws ActiveMQStompException {
Set<RoutingType> actualDeliveryModesOfAddres = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingTypes();
if (routingType != null && !actualDeliveryModesOfAddres.contains(routingType)) {
throw BUNDLE.illegalSemantics(routingType.toString(), actualDeliveryModesOfAddres.toString());
Set<RoutingType> actualDeliveryModesOfAddress = manager.getServer().getAddressInfo(getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination))).getRoutingTypes();
if (routingType != null && !actualDeliveryModesOfAddress.contains(routingType)) {
throw BUNDLE.illegalSemantics(routingType.toString(), actualDeliveryModesOfAddress.toString());
}
}
@ -757,14 +751,6 @@ public final class StompConnection implements RemotingConnection {
return minLargeMessageSize;
}
public String getAnycastPrefix() {
return anycastPrefix;
}
public String getMulticastPrefix() {
return multicastPrefix;
}
public StompProtocolManager getManager() {
return manager;
}

View File

@ -261,9 +261,9 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
StompSession session = sessions.remove(connection.getID());
if (session != null) {
try {
session.getSession().stop();
session.getSession().rollback(true);
session.getSession().close(false);
session.getCoreSession().stop();
session.getCoreSession().rollback(true);
session.getCoreSession().close(false);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorCleaningStompConn(e);
}
@ -274,7 +274,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
while (iterator.hasNext()) {
Map.Entry<String, StompSession> entry = iterator.next();
if (entry.getValue().getConnection() == connection) {
ServerSession serverSession = entry.getValue().getSession();
ServerSession serverSession = entry.getValue().getCoreSession();
try {
serverSession.rollback(true);
serverSession.close(false);
@ -355,7 +355,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
throw new ActiveMQStompException(connection, "No transaction started: " + txID);
}
transactedSessions.remove(txID);
session.getSession().commit();
session.getCoreSession().commit();
}
public void abortTransaction(StompConnection connection, String txID) throws Exception {
@ -364,7 +364,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
throw new ActiveMQStompException(connection, "No transaction started: " + txID);
}
transactedSessions.remove(txID);
session.getSession().rollback(false);
session.getCoreSession().rollback(false);
}
// Inner classes -------------------------------------------------

View File

@ -90,7 +90,7 @@ public class StompSession implements SessionCallback {
this.session = session;
}
public ServerSession getSession() {
public ServerSession getCoreSession() {
return session;
}
@ -287,7 +287,7 @@ public class StompSession implements SessionCallback {
receiveCredits = -1;
}
Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingTypes();
Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(SimpleString.toSimpleString(destination))).getRoutingTypes();
if (routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST)) {
// subscribes to a topic
pubSub = true;

View File

@ -178,6 +178,9 @@ public abstract class VersionedStompFrameHandler {
long timestamp = System.currentTimeMillis();
ServerMessageImpl message = connection.createServerMessage();
if (routingType != null) {
message.putByteProperty(Message.HDR_ROUTING_TYPE, routingType.getType());
}
message.setTimestamp(timestamp);
message.setAddress(SimpleString.toSimpleString(destination));
StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
@ -236,25 +239,25 @@ public abstract class VersionedStompFrameHandler {
return response;
}
public StompFrame onSubscribe(StompFrame request) {
public StompFrame onSubscribe(StompFrame frame) {
StompFrame response = null;
String destination = getDestination(request);
String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR);
String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
String id = request.getHeader(Stomp.Headers.Subscribe.ID);
String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
if (durableSubscriptionName == null) {
durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
}
RoutingType routingType = getRoutingType(request.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), request.getHeader(Headers.Subscribe.DESTINATION));
boolean noLocal = false;
if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) {
noLocal = Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
}
try {
String destination = getDestination(frame);
String selector = frame.getHeader(Stomp.Headers.Subscribe.SELECTOR);
String ack = frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
String id = frame.getHeader(Stomp.Headers.Subscribe.ID);
String durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
if (durableSubscriptionName == null) {
durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
}
RoutingType routingType = getRoutingType(frame.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), frame.getHeader(Headers.Subscribe.DESTINATION));
boolean noLocal = false;
if (frame.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) {
noLocal = Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
}
connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType);
} catch (ActiveMQStompException e) {
response = e.getFrame();
@ -264,14 +267,7 @@ public abstract class VersionedStompFrameHandler {
}
public String getDestination(StompFrame request) {
String destination = request.getHeader(Headers.Subscribe.DESTINATION);
if (connection.getMulticastPrefix().length() > 0 && destination.startsWith(connection.getMulticastPrefix())) {
destination = destination.substring(connection.getMulticastPrefix().length());
} else if (connection.getAnycastPrefix().length() > 0 && destination.startsWith(connection.getAnycastPrefix())) {
destination = destination.substring(connection.getAnycastPrefix().length());
}
return destination;
return request.getHeader(Headers.Subscribe.DESTINATION);
}
public StompFrame postprocess(StompFrame request) {
@ -344,17 +340,13 @@ public abstract class VersionedStompFrameHandler {
connection.destroy();
}
private RoutingType getRoutingType(String typeHeader, String destination) {
private RoutingType getRoutingType(String typeHeader, String destination) throws ActiveMQStompException {
// null is valid to return here so we know when the user didn't provide any routing info
RoutingType routingType = null;
RoutingType routingType;
if (typeHeader != null) {
routingType = RoutingType.valueOf(typeHeader);
} else if (destination != null && !connection.getAnycastPrefix().equals(connection.getMulticastPrefix())) {
if (connection.getMulticastPrefix().length() > 0 && destination.startsWith(connection.getMulticastPrefix())) {
routingType = RoutingType.MULTICAST;
} else if (connection.getAnycastPrefix().length() > 0 && destination.startsWith(connection.getAnycastPrefix())) {
routingType = RoutingType.ANYCAST;
}
} else {
routingType = connection.getSession().getCoreSession().getAddressAndRoutingType(SimpleString.toSimpleString(destination), null).getB();
}
return routingType;
}

View File

@ -646,7 +646,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
@Override
public void createQueue(final String address, final String name, final String routingType) throws Exception {
createQueue(address, name, routingType, true);
createQueue(address, name, true, routingType);
}
@Override

View File

@ -21,6 +21,7 @@ import javax.transaction.xa.Xid;
import java.util.List;
import java.util.Set;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.persistence.OperationContext;
@ -250,4 +251,36 @@ public interface ServerSession extends SecurityAuth {
SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
AddressInfo getAddress(SimpleString address);
/**
* Strip the prefix (if it exists) from the address based on the prefixes provided to the ServerSession constructor.
*
* @param address the address to inspect
* @return the canonical (i.e. non-prefixed) address name
*/
SimpleString removePrefix(SimpleString address);
/**
* Get the canonical (i.e. non-prefixed) address and the corresponding routing-type.
*
* @param address the address to inspect
* @param defaultRoutingType the {@code org.apache.activemq.artemis.core.server.RoutingType} to return if no prefix
* match is found.
* @return a {@code org.apache.activemq.artemis.api.core.Pair} representing the canonical (i.e. non-prefixed) address
* name and the {@code org.apache.activemq.artemis.core.server.RoutingType} corresponding to the that prefix.
*/
Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address, RoutingType defaultRoutingType);
/**
* Get the canonical (i.e. non-prefixed) address and the corresponding routing-type.
*
* @param address the address to inspect
* @param defaultRoutingTypes a the {@code java.util.Set} of {@code org.apache.activemq.artemis.core.server.RoutingType}
* objects to return if no prefix match is found.
* @return a {@code org.apache.activemq.artemis.api.core.Pair} representing the canonical (i.e. non-prefixed) address
* name and the {@code java.util.Set} of {@code org.apache.activemq.artemis.core.server.RoutingType} objects
* corresponding to the that prefix.
*/
Pair<SimpleString, Set<RoutingType>> getAddressAndRoutingTypes(SimpleString address,
Set<RoutingType> defaultRoutingTypes);
}

View File

@ -601,7 +601,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
public void resetNodeManager() throws Exception {
nodeManager.stop();
if (nodeManager != null) {
nodeManager.stop();
}
nodeManager = createNodeManager(configuration.getJournalLocation(), true);
}

View File

@ -1726,14 +1726,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
}
private SimpleString removePrefix(SimpleString address) {
@Override
public SimpleString removePrefix(SimpleString address) {
if (prefixEnabled) {
return PrefixUtil.getAddress(address, prefixes);
}
return address;
}
private Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address,
@Override
public Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address,
RoutingType defaultRoutingType) {
if (prefixEnabled) {
return PrefixUtil.getAddressAndRoutingType(address, defaultRoutingType, prefixes);
@ -1741,7 +1743,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return new Pair<>(address, defaultRoutingType);
}
private Pair<SimpleString, Set<RoutingType>> getAddressAndRoutingTypes(SimpleString address,
@Override
public Pair<SimpleString, Set<RoutingType>> getAddressAndRoutingTypes(SimpleString address,
Set<RoutingType> defaultRoutingTypes) {
if (prefixEnabled) {
return PrefixUtil.getAddressAndRoutingTypes(address, defaultRoutingTypes, prefixes);

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -98,14 +99,14 @@ public class ReplicationWithDivertTest extends ActiveMQTestBase {
backupConfig = createDefaultInVMConfig().setBindingsDirectory(getBindingsDir(0, true)).
setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).
setLargeMessagesDirectory(getLargeMessagesDir(0, true));
backupConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(SOURCE_QUEUE).setName(SOURCE_QUEUE));
backupConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(TARGET_QUEUE).setName(TARGET_QUEUE));
backupConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(SOURCE_QUEUE).setName(SOURCE_QUEUE).setRoutingType(RoutingType.ANYCAST));
backupConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(TARGET_QUEUE).setName(TARGET_QUEUE).setRoutingType(RoutingType.ANYCAST));
DivertConfiguration divertConfiguration = new DivertConfiguration().setName("Test").setAddress(SOURCE_QUEUE).setForwardingAddress(TARGET_QUEUE).setRoutingName("Test");
liveConfig = createDefaultInVMConfig();
liveConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(SOURCE_QUEUE).setName(SOURCE_QUEUE).setDurable(true));
liveConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(TARGET_QUEUE).setName(TARGET_QUEUE).setDurable(true));
liveConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(SOURCE_QUEUE).setName(SOURCE_QUEUE).setDurable(true).setRoutingType(RoutingType.ANYCAST));
liveConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(TARGET_QUEUE).setName(TARGET_QUEUE).setDurable(true).setRoutingType(RoutingType.ANYCAST));
liveConfig.addDivertConfiguration(divertConfiguration);
backupConfig.addDivertConfiguration(divertConfiguration);

View File

@ -30,6 +30,7 @@ import java.util.Random;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSContext;
import org.apache.activemq.artemis.jms.client.ActiveMQSession;
@ -95,7 +96,7 @@ public class JmsProducerTest extends JMSTestBase {
@Test
public void multipleSendsUsingSetters() throws Exception {
server.createQueue(SimpleString.toSimpleString("q1"), SimpleString.toSimpleString("q1"), null, true, false);
server.createQueue(SimpleString.toSimpleString("q1"), RoutingType.ANYCAST, SimpleString.toSimpleString("q1"), null, true, false);
Queue q1 = context.createQueue("q1");

View File

@ -40,12 +40,14 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@ -1306,7 +1308,7 @@ public class StompTest extends StompTestBase {
final String ADDRESS = UUID.randomUUID().toString();
final String PREFIXED_ADDRESS = prefix + ADDRESS;
String param = routingType.toString();
String urlParam = "stomp" + param.substring(0, 1) + param.substring(1).toLowerCase() + "Prefix";
String urlParam = param.toLowerCase() + "Prefix";
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + "&" + urlParam + "=" + prefix).start();
conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
conn.connect(defUser, defPass);
@ -1329,9 +1331,9 @@ public class StompTest extends StompTestBase {
AddressInfo addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS));
assertNotNull("No address was created with the name " + ADDRESS, addressInfo);
Set<RoutingType> deliveryModest = new HashSet<>();
deliveryModest.add(RoutingType.valueOf(param));
assertEquals(deliveryModest, addressInfo.getRoutingTypes());
Set<RoutingType> routingTypes = new HashSet<>();
routingTypes.add(RoutingType.valueOf(param));
assertEquals(routingTypes, addressInfo.getRoutingTypes());
conn.disconnect();
}
@ -1360,8 +1362,7 @@ public class StompTest extends StompTestBase {
int port = 61614;
final String ADDRESS = UUID.randomUUID().toString();
final String PREFIXED_ADDRESS = prefix + ADDRESS;
String param = routingType.toString();
String urlParam = "stomp" + param.substring(0, 1) + param.substring(1).toLowerCase() + "Prefix";
String urlParam = routingType.toString().toLowerCase() + "Prefix";
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + "&" + urlParam + "=" + prefix).start();
conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
conn.connect(defUser, defPass);
@ -1473,4 +1474,72 @@ public class StompTest extends StompTestBase {
conn.disconnect();
}
@Test
public void testAnycastMessageRoutingExclusivity() throws Exception {
conn.connect(defUser, defPass);
final String addressA = "addressA";
final String queueA = "queueA";
final String queueB = "queueB";
final String queueC = "queueC";
ActiveMQServer activeMQServer = server.getActiveMQServer();
ActiveMQServerControl serverControl = server.getActiveMQServer().getActiveMQServerControl();
serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
send(conn, addressA, null, "Hello World!", true, RoutingType.ANYCAST);
assertEquals(1, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
assertEquals(0, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
}
@Test
public void testMulticastMessageRoutingExclusivity() throws Exception {
conn.connect(defUser, defPass);
final String addressA = "addressA";
final String queueA = "queueA";
final String queueB = "queueB";
final String queueC = "queueC";
ActiveMQServer activeMQServer = server.getActiveMQServer();
ActiveMQServerControl serverControl = server.getActiveMQServer().getActiveMQServerControl();
serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
send(conn, addressA, null, "Hello World!", true, RoutingType.MULTICAST);
assertEquals(0, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
assertEquals(2, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
}
@Test
public void testAmbiguousMessageRouting() throws Exception {
conn.connect(defUser, defPass);
final String addressA = "addressA";
final String queueA = "queueA";
final String queueB = "queueB";
final String queueC = "queueC";
final String queueD = "queueD";
ActiveMQServer activeMQServer = server.getActiveMQServer();
ActiveMQServerControl serverControl = server.getActiveMQServer().getActiveMQServerControl();
serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueD, RoutingType.MULTICAST.toString());
send(conn, addressA, null, "Hello World!", true);
assertEquals(1, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
assertEquals(2, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount());
}
}