ARTEMIS-789 AMQP tests for routing semantics

This commit is contained in:
jbertram 2016-12-14 09:30:43 -06:00
parent c18ee83190
commit 3af1e5c734
3 changed files with 78 additions and 8 deletions

View File

@ -418,10 +418,10 @@ public abstract class ActiveMQTestBase extends Assert {
}
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
ConfigurationImpl configuration = createBasicConfig(serverID).setJMXManagementEnabled(false).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, generateInVMParams(serverID)));
ConfigurationImpl configuration = createBasicConfig(serverID).setJMXManagementEnabled(false).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, generateInVMParams(serverID), "invm"));
if (netty) {
configuration.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY));
configuration.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, new HashMap<String, Object>(), "netty", new HashMap<String, Object>()));
}
return configuration;

View File

@ -19,8 +19,10 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import java.net.URI;
import java.util.LinkedList;
import java.util.Set;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
@ -105,6 +107,13 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
serverConfig.addAddressConfiguration(address);
serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
serverConfig.setSecurityEnabled(false);
Set<TransportConfiguration> acceptors = serverConfig.getAcceptorConfigurations();
for (TransportConfiguration tc : acceptors) {
if (tc.getName().equals("netty")) {
tc.getExtraParams().put("anycastPrefix", "anycast://");
tc.getExtraParams().put("multicastPrefix", "multicast://");
}
}
serverManager.start();
server.start();
return server;

View File

@ -16,11 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
import javax.jms.JMSException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@ -30,7 +26,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@ -48,7 +47,10 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
/**
* Test basic send and receive scenarios using only AMQP sender and receiver links.
@ -177,6 +179,65 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
public void testAnycastMessageRoutingExclusivity() throws Exception {
final String addressA = "addressA";
final String queueA = "queueA";
final String queueB = "queueB";
final String queueC = "queueC";
ActiveMQServerControl serverControl = server.getActiveMQServerControl();
serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
sendMessages("anycast://" + addressA, 1);
assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
}
@Test
public void testMulticastMessageRoutingExclusivity() throws Exception {
final String addressA = "addressA";
final String queueA = "queueA";
final String queueB = "queueB";
final String queueC = "queueC";
ActiveMQServerControl serverControl = server.getActiveMQServerControl();
serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
sendMessages("multicast://" + addressA, 1);
assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
}
@Test
public void testAmbiguousMessageRouting() throws Exception {
final String addressA = "addressA";
final String queueA = "queueA";
final String queueB = "queueB";
final String queueC = "queueC";
final String queueD = "queueD";
ActiveMQServerControl serverControl = server.getActiveMQServerControl();
serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueD, RoutingType.MULTICAST.toString());
sendMessages(addressA, 1);
assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount());
}
@Test(timeout = 60000)
public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
int MSG_COUNT = 4;