ARTEMIS-877 STOMP tests + routing header

This commit is contained in:
jbertram 2016-12-13 17:59:09 -06:00
parent 2290ab40c3
commit 30b1335614
2 changed files with 73 additions and 0 deletions

View File

@ -178,6 +178,9 @@ public abstract class VersionedStompFrameHandler {
long timestamp = System.currentTimeMillis();
ServerMessageImpl message = connection.createServerMessage();
if (routingType != null) {
message.putByteProperty(Message.HDR_ROUTING_TYPE, routingType.getType());
}
message.setTimestamp(timestamp);
message.setAddress(SimpleString.toSimpleString(destination));
StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);

View File

@ -40,12 +40,14 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@ -1472,4 +1474,72 @@ public class StompTest extends StompTestBase {
conn.disconnect();
}
@Test
public void testAnycastMessageRoutingExclusivity() throws Exception {
conn.connect(defUser, defPass);
final String addressA = "addressA";
final String queueA = "queueA";
final String queueB = "queueB";
final String queueC = "queueC";
ActiveMQServer activeMQServer = server.getActiveMQServer();
ActiveMQServerControl serverControl = server.getActiveMQServer().getActiveMQServerControl();
serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
send(conn, addressA, null, "Hello World!", true, RoutingType.ANYCAST);
assertEquals(1, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
assertEquals(0, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
}
@Test
public void testMulticastMessageRoutingExclusivity() throws Exception {
conn.connect(defUser, defPass);
final String addressA = "addressA";
final String queueA = "queueA";
final String queueB = "queueB";
final String queueC = "queueC";
ActiveMQServer activeMQServer = server.getActiveMQServer();
ActiveMQServerControl serverControl = server.getActiveMQServer().getActiveMQServerControl();
serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
send(conn, addressA, null, "Hello World!", true, RoutingType.MULTICAST);
assertEquals(0, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
assertEquals(2, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
}
@Test
public void testAmbiguousMessageRouting() throws Exception {
conn.connect(defUser, defPass);
final String addressA = "addressA";
final String queueA = "queueA";
final String queueB = "queueB";
final String queueC = "queueC";
final String queueD = "queueD";
ActiveMQServer activeMQServer = server.getActiveMQServer();
ActiveMQServerControl serverControl = server.getActiveMQServer().getActiveMQServerControl();
serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueD, RoutingType.MULTICAST.toString());
send(conn, addressA, null, "Hello World!", true);
assertEquals(1, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
assertEquals(2, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount());
}
}