diff --git a/activemq-stomp/pom.xml b/activemq-stomp/pom.xml
index c51ff7098f..69d1c5fe17 100755
--- a/activemq-stomp/pom.xml
+++ b/activemq-stomp/pom.xml
@@ -84,6 +84,11 @@
junit
test
+
+ org.mockito
+ mockito-core
+ test
+
org.slf4j
slf4j-log4j12
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
index 8cfa1219e4..013c1ec7dc 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
@@ -16,25 +16,31 @@
*/
package org.apache.activemq.transport.stomp;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implements ActiveMQ 4.0 translations
*/
public class LegacyFrameTranslator implements FrameTranslator {
+ private static final Logger LOG = LoggerFactory.getLogger(LegacyFrameTranslator.class);
+
+ @Override
public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
final Map, ?> headers = command.getHeaders();
final ActiveMQMessage msg;
@@ -87,6 +93,7 @@ public class LegacyFrameTranslator implements FrameTranslator {
return msg;
}
+ @Override
public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException {
StompFrame command = new StompFrame();
command.setAction(Stomp.Responses.MESSAGE);
@@ -126,6 +133,7 @@ public class LegacyFrameTranslator implements FrameTranslator {
return command;
}
+ @Override
public String convertDestination(ProtocolConverter converter, Destination d) {
if (d == null) {
return null;
@@ -156,6 +164,7 @@ public class LegacyFrameTranslator implements FrameTranslator {
return buffer.toString();
}
+ @Override
public ActiveMQDestination convertDestination(ProtocolConverter converter, String name, boolean forceFallback) throws ProtocolException {
if (name == null) {
return null;
@@ -166,38 +175,64 @@ public class LegacyFrameTranslator implements FrameTranslator {
String originalName = name;
name = name.trim();
- if (name.startsWith("/queue/")) {
- String qName = name.substring("/queue/".length(), name.length());
- return ActiveMQDestination.createDestination(qName, ActiveMQDestination.QUEUE_TYPE);
- } else if (name.startsWith("/topic/")) {
- String tName = name.substring("/topic/".length(), name.length());
- return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE);
- } else if (name.startsWith("/remote-temp-queue/")) {
- String tName = name.substring("/remote-temp-queue/".length(), name.length());
- return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE);
- } else if (name.startsWith("/remote-temp-topic/")) {
- String tName = name.substring("/remote-temp-topic/".length(), name.length());
- return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE);
- } else if (name.startsWith("/temp-queue/")) {
- return converter.createTempDestination(name, false);
- } else if (name.startsWith("/temp-topic/")) {
- return converter.createTempDestination(name, true);
- } else {
- if (forceFallback) {
- try {
- ActiveMQDestination fallback = ActiveMQDestination.getUnresolvableDestinationTransformer().transform(originalName);
- if (fallback != null) {
- return fallback;
- }
- } catch (JMSException e) {
- throw new ProtocolException("Illegal destination name: [" + originalName + "] -- ActiveMQ STOMP destinations "
- + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/", false, e);
- }
- }
- throw new ProtocolException("Illegal destination name: [" + originalName + "] -- ActiveMQ STOMP destinations "
- + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+ String[] destinations = name.split(",");
+ if (destinations == null || destinations.length == 0) {
+ destinations = new String[] { name };
}
+
+ StringBuilder destinationBuilder = new StringBuilder();
+ for (int i = 0; i < destinations.length; ++i) {
+ String destinationName = destinations[i];
+
+ if (destinationName.startsWith("/queue/")) {
+ destinationName = destinationName.substring("/queue/".length(), destinationName.length());
+ destinationBuilder.append(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + destinationName);
+ } else if (destinationName.startsWith("/topic/")) {
+ destinationName = destinationName.substring("/topic/".length(), destinationName.length());
+ destinationBuilder.append(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + destinationName);
+ } else if (destinationName.startsWith("/remote-temp-queue/")) {
+ destinationName = destinationName.substring("/remote-temp-queue/".length(), destinationName.length());
+ destinationBuilder.append(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX + destinationName);
+ } else if (destinationName.startsWith("/remote-temp-topic/")) {
+ destinationName = destinationName.substring("/remote-temp-topic/".length(), destinationName.length());
+ destinationBuilder.append(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX + destinationName);
+ } else if (destinationName.startsWith("/temp-queue/")) {
+ ActiveMQDestination converted = converter.createTempDestination(destinationName, false);
+ destinationBuilder.append(converted.getQualifiedName());
+ } else if (destinationName.startsWith("/temp-topic/")) {
+ ActiveMQDestination converted = converter.createTempDestination(destinationName, true);
+ destinationBuilder.append(converted.getQualifiedName());
+ } else {
+ if (forceFallback) {
+ String fallbackName = destinationName;
+ if (destinationName.length() == 1) {
+ // Use the original non-trimmed name instead
+ fallbackName = originalName;
+ }
+
+ try {
+ ActiveMQDestination fallback = ActiveMQDestination.getUnresolvableDestinationTransformer().transform(fallbackName);
+ if (fallback != null) {
+ destinationBuilder.append(fallback.getQualifiedName());
+ continue;
+ }
+ } catch (JMSException e) {
+ throw new ProtocolException("Illegal destination name: [" + fallbackName + "] -- ActiveMQ STOMP destinations "
+ + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/", false, e);
+ }
+ }
+
+ throw new ProtocolException("Illegal destination name: [" + originalName + "] -- ActiveMQ STOMP destinations "
+ + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+ }
+
+ if (i < destinations.length - 1) {
+ destinationBuilder.append(",");
+ }
+ }
+
+ LOG.trace("New Composite Destination name: {}", destinationBuilder);
+
+ return ActiveMQDestination.createDestination(destinationBuilder.toString(), ActiveMQDestination.QUEUE_TYPE);
}
-
-
}
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
index cfeff9eb71..bcf4714523 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
@@ -600,7 +600,7 @@ public class ProtocolConverter {
throw new ProtocolException("Invalid Subscription: cannot durably subscribe to a Queue destination!");
}
- consumerInfo.setDestination(translator.convertDestination(this, destination, true));
+ consumerInfo.setDestination(actualDest);
StompSubscription stompSubscription;
if (!consumerInfo.isBrowser()) {
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/LegacyFrameTranslatorTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/LegacyFrameTranslatorTest.java
new file mode 100644
index 0000000000..85b51dba0b
--- /dev/null
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/LegacyFrameTranslatorTest.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.UUID;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests for conversion capabilities of LegacyFrameTranslator
+ */
+public class LegacyFrameTranslatorTest {
+
+ private ProtocolConverter converter;
+ private LegacyFrameTranslator translator;
+
+ @Before
+ public void setUp() {
+ converter = Mockito.mock(ProtocolConverter.class);
+
+ // Stub out a temp destination creation
+ Mockito.when(converter.createTempDestination(Mockito.anyString(), Mockito.anyBoolean())).thenAnswer(new Answer() {
+
+ @Override
+ public ActiveMQDestination answer(InvocationOnMock invocation) throws Throwable {
+
+ String name = invocation.getArgumentAt(0, String.class);
+ boolean topic = invocation.getArgumentAt(1, Boolean.class);
+
+ name = "temp-" + (topic ? "topic://" : "queue://X:") + UUID.randomUUID().toString();
+
+ return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE);
+ }
+ });
+
+ translator = new LegacyFrameTranslator();
+ }
+
+ @Test(timeout = 10000)
+ public void testConvertQueue() throws Exception {
+ ActiveMQDestination destination = translator.convertDestination(converter, "/queue/test", false);
+
+ assertFalse(destination.isComposite());
+ assertEquals("test", destination.getPhysicalName());
+ assertEquals(ActiveMQDestination.QUEUE_TYPE, destination.getDestinationType());
+ }
+
+ @Test(timeout = 10000)
+ public void testConvertTopic() throws Exception {
+ ActiveMQDestination destination = translator.convertDestination(converter, "/topic/test", false);
+
+ assertFalse(destination.isComposite());
+ assertEquals("test", destination.getPhysicalName());
+ assertEquals(ActiveMQDestination.TOPIC_TYPE, destination.getDestinationType());
+ }
+
+ @Test(timeout = 10000)
+ public void testConvertTemporaryQueue() throws Exception {
+ ActiveMQDestination destination = translator.convertDestination(converter, "/temp-queue/test", false);
+
+ assertFalse(destination.isComposite());
+ assertEquals(ActiveMQDestination.TEMP_QUEUE_TYPE, destination.getDestinationType());
+ }
+
+ @Test(timeout = 10000)
+ public void testConvertTemporaryTopic() throws Exception {
+ ActiveMQDestination destination = translator.convertDestination(converter, "/temp-topic/test", false);
+
+ assertFalse(destination.isComposite());
+ assertEquals(ActiveMQDestination.TEMP_TOPIC_TYPE, destination.getDestinationType());
+ }
+
+ @Test(timeout = 10000)
+ public void testConvertRemoteTempQueue() throws Exception {
+ ActiveMQDestination destination = translator.convertDestination(converter, "/remote-temp-queue/test", false);
+
+ assertFalse(destination.isComposite());
+ assertEquals("test", destination.getPhysicalName());
+ assertEquals(ActiveMQDestination.TEMP_QUEUE_TYPE, destination.getDestinationType());
+ }
+
+ @Test(timeout = 10000)
+ public void testConvertRemoteTempTopic() throws Exception {
+ ActiveMQDestination destination = translator.convertDestination(converter, "/remote-temp-topic/test", false);
+
+ assertFalse(destination.isComposite());
+ assertEquals("test", destination.getPhysicalName());
+ assertEquals(ActiveMQDestination.TEMP_TOPIC_TYPE, destination.getDestinationType());
+ }
+
+ @Test(timeout = 10000)
+ public void testConvertCompositeQueues() throws Exception {
+ String destinationA = "destinationA";
+ String destinationB = "destinationB";
+
+ String composite = "/queue/" + destinationA + ",/queue/" + destinationB;
+
+ ActiveMQDestination destination = translator.convertDestination(converter, composite, false);
+
+ assertEquals(ActiveMQDestination.QUEUE_TYPE, destination.getDestinationType());
+ assertTrue(destination.isComposite());
+ ActiveMQDestination[] composites = destination.getCompositeDestinations();
+ assertEquals(2, composites.length);
+
+ Arrays.sort(composites);
+
+ assertEquals(ActiveMQDestination.QUEUE_TYPE, composites[0].getDestinationType());
+ assertEquals(ActiveMQDestination.QUEUE_TYPE, composites[1].getDestinationType());
+
+ assertEquals(destinationA, composites[0].getPhysicalName());
+ assertEquals(destinationB, composites[1].getPhysicalName());
+ }
+
+ @Test(timeout = 10000)
+ public void testConvertCompositeTopics() throws Exception {
+ String destinationA = "destinationA";
+ String destinationB = "destinationB";
+
+ String composite = "/topic/" + destinationA + ",/topic/" + destinationB;
+
+ ActiveMQDestination destination = translator.convertDestination(converter, composite, false);
+
+ assertEquals(ActiveMQDestination.TOPIC_TYPE, destination.getDestinationType());
+ assertTrue(destination.isComposite());
+ ActiveMQDestination[] composites = destination.getCompositeDestinations();
+ assertEquals(2, composites.length);
+
+ Arrays.sort(composites);
+
+ assertEquals(ActiveMQDestination.TOPIC_TYPE, composites[0].getDestinationType());
+ assertEquals(ActiveMQDestination.TOPIC_TYPE, composites[1].getDestinationType());
+
+ assertEquals(destinationA, composites[0].getPhysicalName());
+ assertEquals(destinationB, composites[1].getPhysicalName());
+ }
+
+ @Test(timeout = 10000)
+ public void testConvertCompositeQueueAndTopic() throws Exception {
+ String destinationA = "destinationA";
+ String destinationB = "destinationB";
+
+ String composite = "/queue/" + destinationA + ",/topic/" + destinationB;
+
+ ActiveMQDestination destination = translator.convertDestination(converter, composite, false);
+
+ assertEquals(ActiveMQDestination.QUEUE_TYPE, destination.getDestinationType());
+ assertTrue(destination.isComposite());
+ ActiveMQDestination[] composites = destination.getCompositeDestinations();
+ assertEquals(2, composites.length);
+
+ Arrays.sort(composites);
+
+ assertEquals(ActiveMQDestination.QUEUE_TYPE, composites[0].getDestinationType());
+ assertEquals(ActiveMQDestination.TOPIC_TYPE, composites[1].getDestinationType());
+
+ assertEquals(destinationA, composites[0].getPhysicalName());
+ assertEquals(destinationB, composites[1].getPhysicalName());
+ }
+
+ @Test(timeout = 10000)
+ public void testConvertCompositeMixture() throws Exception {
+ String destinationA = "destinationA";
+ String destinationB = "destinationB";
+ String destinationC = "destinationC";
+ String destinationD = "destinationD";
+
+ String composite = "/queue/" + destinationA + ",/topic/" + destinationB +
+ ",/temp-queue/" + destinationC + ",/temp-topic/" + destinationD;
+
+ ActiveMQDestination destination = translator.convertDestination(converter, composite, false);
+
+ assertEquals(ActiveMQDestination.QUEUE_TYPE, destination.getDestinationType());
+ assertTrue(destination.isComposite());
+ ActiveMQDestination[] composites = destination.getCompositeDestinations();
+ assertEquals(4, composites.length);
+
+ Arrays.sort(composites);
+
+ boolean foundQueue = false;
+ boolean foundTopic = false;
+ boolean foundTempTopic = false;
+ boolean foundTempQueue = false;
+
+ for (ActiveMQDestination dest : composites) {
+ if (dest.getDestinationType() == ActiveMQDestination.QUEUE_TYPE) {
+ foundQueue = true;
+ } else if (dest.getDestinationType() == ActiveMQDestination.TOPIC_TYPE) {
+ foundTopic = true;
+ } else if (dest.getDestinationType() == ActiveMQDestination.TEMP_TOPIC_TYPE) {
+ foundTempTopic = true;
+ } else if (dest.getDestinationType() == ActiveMQDestination.TEMP_QUEUE_TYPE) {
+ foundTempQueue = true;
+ }
+ }
+
+ assertTrue(foundQueue);
+ assertTrue(foundTopic);
+ assertTrue(foundTempTopic);
+ assertTrue(foundTempQueue);
+ }
+}
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompCompositeDestinationTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompCompositeDestinationTest.java
new file mode 100644
index 0000000000..223a84a2b6
--- /dev/null
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompCompositeDestinationTest.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for support of composite destination support over STOMP
+ */
+public class StompCompositeDestinationTest extends StompTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StompCompositeDestinationTest.class);
+
+ protected ActiveMQConnection connection;
+
+ @Override
+ public void tearDown() throws Exception {
+ try {
+ connection.close();
+ } catch (Exception ex) {}
+
+ super.tearDown();
+ }
+
+ @Test(timeout = 20000)
+ public void testSubscribeToCompositeQueue() throws Exception {
+ stompConnect();
+
+ String destinationA = "StompA";
+ String destinationB = "StompB";
+
+ String frame = "CONNECT\n" +
+ "login:system\n" +
+ "passcode:manager\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ LOG.info("Subscribing to destination: {},{}", destinationA, destinationB);
+
+ frame = "SUBSCRIBE\n" +
+ "destination:/queue/" + destinationA + ",/queue/" + destinationB + "\n" +
+ "ack:auto\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ // Test in same order as the subscribe command
+
+ sendMessage(destinationA, false);
+ sendMessage(destinationB, false);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("MESSAGE"));
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("MESSAGE"));
+
+ // Test the reverse ordering
+
+ sendMessage(destinationB, false);
+ sendMessage(destinationA, false);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("MESSAGE"));
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("MESSAGE"));
+
+ stompConnection.disconnect();
+ }
+
+ @Test(timeout = 20000)
+ public void testSubscribeToCompositeQueueTrailersDefault() throws Exception {
+ stompConnect();
+
+ String destinationA = "StompA";
+ String destinationB = "StompB";
+
+ String frame = "CONNECT\n" +
+ "login:system\n" +
+ "passcode:manager\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ LOG.info("Subscribing to destination: {},{}", destinationA, destinationB);
+
+ frame = "SUBSCRIBE\n" +
+ "destination:/queue/" + destinationA + "," + destinationB + "\n" +
+ "ack:auto\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ // Test in same order as the subscribe command
+
+ sendMessage(destinationA, false);
+ sendMessage(destinationB, false);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("MESSAGE"));
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("MESSAGE"));
+
+ // Test the reverse ordering
+
+ sendMessage(destinationB, false);
+ sendMessage(destinationA, false);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("MESSAGE"));
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("MESSAGE"));
+
+ stompConnection.disconnect();
+ }
+
+ @Test(timeout = 20000)
+ public void testSubscribeToCompositeTopics() throws Exception {
+ stompConnect();
+
+ String destinationA = "StompA";
+ String destinationB = "StompB";
+
+ String frame = "CONNECT\n" +
+ "login:system\n" +
+ "passcode:manager\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ LOG.info("Subscribing to destination: {},{}", destinationA, destinationB);
+
+ frame = "SUBSCRIBE\n" +
+ "destination:/topic/" + destinationA + ",/topic/" + destinationB + "\n" +
+ "ack:auto\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ // Test in same order as the subscribe command
+
+ sendMessage(destinationA, true);
+ sendMessage(destinationB, true);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("MESSAGE"));
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("MESSAGE"));
+
+ // Test the reverse ordering
+
+ sendMessage(destinationB, true);
+ sendMessage(destinationA, true);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("MESSAGE"));
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("MESSAGE"));
+
+ stompConnection.disconnect();
+ }
+
+ @Test(timeout = 60000)
+ public void testSendMessageToCompositeQueue() throws Exception {
+ stompConnect();
+
+ String destinationA = "StompA";
+ String destinationB = "StompB";
+
+ String frame = "CONNECT\n" +
+ "login:system\n" +
+ "passcode:manager\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SEND\n" +
+ "destination:/queue/" + destinationA + ",/queue/" + destinationB +
+ "\n\n" + "Hello World" + Stomp.NULL;
+
+ stompConnection.sendFrame(frame);
+
+ final BrokerViewMBean brokerView = getProxyToBroker();
+ assertTrue("Should be two destinations for the dispatch", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return brokerView.getQueues().length == 2;
+ }
+ }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(150)));
+
+ QueueViewMBean viewOfA = getProxyToQueue(destinationA);
+ QueueViewMBean viewOfB = getProxyToQueue(destinationB);
+
+ assertNotNull(viewOfA);
+ assertNotNull(viewOfB);
+
+ assertEquals(1, viewOfA.getQueueSize());
+ assertEquals(1, viewOfB.getQueueSize());
+
+ stompConnection.disconnect();
+ }
+
+ @Test(timeout = 60000)
+ public void testSendMessageToCompositeTopic() throws Exception {
+ stompConnect();
+
+ String destinationA = "StompA";
+ String destinationB = "StompB";
+
+ String frame = "CONNECT\n" +
+ "login:system\n" +
+ "passcode:manager\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SEND\n" +
+ "destination:/topic/" + destinationA + ",/topic/" + destinationB +
+ "\n\n" + "Hello World" + Stomp.NULL;
+
+ stompConnection.sendFrame(frame);
+
+ final BrokerViewMBean brokerView = getProxyToBroker();
+ assertTrue("Should be two destinations for the dispatch", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return brokerView.getTopics().length == 2;
+ }
+ }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(150)));
+
+ TopicViewMBean viewOfA = getProxyToTopic(destinationA);
+ TopicViewMBean viewOfB = getProxyToTopic(destinationB);
+
+ assertNotNull(viewOfA);
+ assertNotNull(viewOfB);
+
+ assertEquals(1, viewOfA.getEnqueueCount());
+ assertEquals(1, viewOfB.getEnqueueCount());
+
+ stompConnection.disconnect();
+ }
+
+ private void sendMessage(String destinationName, boolean topic) throws JMSException {
+ Connection connection = cf.createConnection("system", "manager");
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = null;
+
+ if (topic) {
+ destination = session.createTopic(destinationName);
+ } else {
+ destination = session.createQueue(destinationName);
+ }
+
+ MessageProducer producer = session.createProducer(destination);
+ TextMessage message = session.createTextMessage("test");
+ producer.send(message);
+
+ connection.close();
+ }
+}