From eccbd87156c141194313e54a8a1bc24c206e6723 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Fri, 30 Oct 2015 11:02:27 -0400 Subject: [PATCH] AMQ-6030 Add support for composite destinations to STOMP. (cherry picked from commit c360c3e4a38b51f72b109a095a648433754acc2d) --- activemq-stomp/pom.xml | 5 + .../stomp/LegacyFrameTranslator.java | 113 ++++--- .../transport/stomp/ProtocolConverter.java | 2 +- .../stomp/LegacyFrameTranslatorTest.java | 225 +++++++++++++ .../stomp/StompCompositeDestinationTest.java | 295 ++++++++++++++++++ 5 files changed, 600 insertions(+), 40 deletions(-) create mode 100644 activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/LegacyFrameTranslatorTest.java create mode 100644 activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompCompositeDestinationTest.java diff --git a/activemq-stomp/pom.xml b/activemq-stomp/pom.xml index f2f9ab3834..3a837e069f 100755 --- a/activemq-stomp/pom.xml +++ b/activemq-stomp/pom.xml @@ -84,6 +84,11 @@ junit test + + org.mockito + mockito-core + test + org.slf4j slf4j-log4j12 diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java index 8cfa1219e4..013c1ec7dc 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java @@ -16,25 +16,31 @@ */ package org.apache.activemq.transport.stomp; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Destination; +import javax.jms.JMSException; + import org.apache.activemq.command.ActiveMQBytesMessage; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.ByteSequence; - -import javax.jms.Destination; -import javax.jms.JMSException; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implements ActiveMQ 4.0 translations */ public class LegacyFrameTranslator implements FrameTranslator { + private static final Logger LOG = LoggerFactory.getLogger(LegacyFrameTranslator.class); + + @Override public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException { final Map headers = command.getHeaders(); final ActiveMQMessage msg; @@ -87,6 +93,7 @@ public class LegacyFrameTranslator implements FrameTranslator { return msg; } + @Override public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException { StompFrame command = new StompFrame(); command.setAction(Stomp.Responses.MESSAGE); @@ -126,6 +133,7 @@ public class LegacyFrameTranslator implements FrameTranslator { return command; } + @Override public String convertDestination(ProtocolConverter converter, Destination d) { if (d == null) { return null; @@ -156,6 +164,7 @@ public class LegacyFrameTranslator implements FrameTranslator { return buffer.toString(); } + @Override public ActiveMQDestination convertDestination(ProtocolConverter converter, String name, boolean forceFallback) throws ProtocolException { if (name == null) { return null; @@ -166,38 +175,64 @@ public class LegacyFrameTranslator implements FrameTranslator { String originalName = name; name = name.trim(); - if (name.startsWith("/queue/")) { - String qName = name.substring("/queue/".length(), name.length()); - return ActiveMQDestination.createDestination(qName, ActiveMQDestination.QUEUE_TYPE); - } else if (name.startsWith("/topic/")) { - String tName = name.substring("/topic/".length(), name.length()); - return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE); - } else if (name.startsWith("/remote-temp-queue/")) { - String tName = name.substring("/remote-temp-queue/".length(), name.length()); - return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE); - } else if (name.startsWith("/remote-temp-topic/")) { - String tName = name.substring("/remote-temp-topic/".length(), name.length()); - return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE); - } else if (name.startsWith("/temp-queue/")) { - return converter.createTempDestination(name, false); - } else if (name.startsWith("/temp-topic/")) { - return converter.createTempDestination(name, true); - } else { - if (forceFallback) { - try { - ActiveMQDestination fallback = ActiveMQDestination.getUnresolvableDestinationTransformer().transform(originalName); - if (fallback != null) { - return fallback; - } - } catch (JMSException e) { - throw new ProtocolException("Illegal destination name: [" + originalName + "] -- ActiveMQ STOMP destinations " - + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/", false, e); - } - } - throw new ProtocolException("Illegal destination name: [" + originalName + "] -- ActiveMQ STOMP destinations " - + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/"); + String[] destinations = name.split(","); + if (destinations == null || destinations.length == 0) { + destinations = new String[] { name }; } + + StringBuilder destinationBuilder = new StringBuilder(); + for (int i = 0; i < destinations.length; ++i) { + String destinationName = destinations[i]; + + if (destinationName.startsWith("/queue/")) { + destinationName = destinationName.substring("/queue/".length(), destinationName.length()); + destinationBuilder.append(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + destinationName); + } else if (destinationName.startsWith("/topic/")) { + destinationName = destinationName.substring("/topic/".length(), destinationName.length()); + destinationBuilder.append(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + destinationName); + } else if (destinationName.startsWith("/remote-temp-queue/")) { + destinationName = destinationName.substring("/remote-temp-queue/".length(), destinationName.length()); + destinationBuilder.append(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX + destinationName); + } else if (destinationName.startsWith("/remote-temp-topic/")) { + destinationName = destinationName.substring("/remote-temp-topic/".length(), destinationName.length()); + destinationBuilder.append(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX + destinationName); + } else if (destinationName.startsWith("/temp-queue/")) { + ActiveMQDestination converted = converter.createTempDestination(destinationName, false); + destinationBuilder.append(converted.getQualifiedName()); + } else if (destinationName.startsWith("/temp-topic/")) { + ActiveMQDestination converted = converter.createTempDestination(destinationName, true); + destinationBuilder.append(converted.getQualifiedName()); + } else { + if (forceFallback) { + String fallbackName = destinationName; + if (destinationName.length() == 1) { + // Use the original non-trimmed name instead + fallbackName = originalName; + } + + try { + ActiveMQDestination fallback = ActiveMQDestination.getUnresolvableDestinationTransformer().transform(fallbackName); + if (fallback != null) { + destinationBuilder.append(fallback.getQualifiedName()); + continue; + } + } catch (JMSException e) { + throw new ProtocolException("Illegal destination name: [" + fallbackName + "] -- ActiveMQ STOMP destinations " + + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/", false, e); + } + } + + throw new ProtocolException("Illegal destination name: [" + originalName + "] -- ActiveMQ STOMP destinations " + + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/"); + } + + if (i < destinations.length - 1) { + destinationBuilder.append(","); + } + } + + LOG.trace("New Composite Destination name: {}", destinationBuilder); + + return ActiveMQDestination.createDestination(destinationBuilder.toString(), ActiveMQDestination.QUEUE_TYPE); } - - } diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index cfeff9eb71..bcf4714523 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -600,7 +600,7 @@ public class ProtocolConverter { throw new ProtocolException("Invalid Subscription: cannot durably subscribe to a Queue destination!"); } - consumerInfo.setDestination(translator.convertDestination(this, destination, true)); + consumerInfo.setDestination(actualDest); StompSubscription stompSubscription; if (!consumerInfo.isBrowser()) { diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/LegacyFrameTranslatorTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/LegacyFrameTranslatorTest.java new file mode 100644 index 0000000000..85b51dba0b --- /dev/null +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/LegacyFrameTranslatorTest.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.UUID; + +import org.apache.activemq.command.ActiveMQDestination; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Tests for conversion capabilities of LegacyFrameTranslator + */ +public class LegacyFrameTranslatorTest { + + private ProtocolConverter converter; + private LegacyFrameTranslator translator; + + @Before + public void setUp() { + converter = Mockito.mock(ProtocolConverter.class); + + // Stub out a temp destination creation + Mockito.when(converter.createTempDestination(Mockito.anyString(), Mockito.anyBoolean())).thenAnswer(new Answer() { + + @Override + public ActiveMQDestination answer(InvocationOnMock invocation) throws Throwable { + + String name = invocation.getArgumentAt(0, String.class); + boolean topic = invocation.getArgumentAt(1, Boolean.class); + + name = "temp-" + (topic ? "topic://" : "queue://X:") + UUID.randomUUID().toString(); + + return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE); + } + }); + + translator = new LegacyFrameTranslator(); + } + + @Test(timeout = 10000) + public void testConvertQueue() throws Exception { + ActiveMQDestination destination = translator.convertDestination(converter, "/queue/test", false); + + assertFalse(destination.isComposite()); + assertEquals("test", destination.getPhysicalName()); + assertEquals(ActiveMQDestination.QUEUE_TYPE, destination.getDestinationType()); + } + + @Test(timeout = 10000) + public void testConvertTopic() throws Exception { + ActiveMQDestination destination = translator.convertDestination(converter, "/topic/test", false); + + assertFalse(destination.isComposite()); + assertEquals("test", destination.getPhysicalName()); + assertEquals(ActiveMQDestination.TOPIC_TYPE, destination.getDestinationType()); + } + + @Test(timeout = 10000) + public void testConvertTemporaryQueue() throws Exception { + ActiveMQDestination destination = translator.convertDestination(converter, "/temp-queue/test", false); + + assertFalse(destination.isComposite()); + assertEquals(ActiveMQDestination.TEMP_QUEUE_TYPE, destination.getDestinationType()); + } + + @Test(timeout = 10000) + public void testConvertTemporaryTopic() throws Exception { + ActiveMQDestination destination = translator.convertDestination(converter, "/temp-topic/test", false); + + assertFalse(destination.isComposite()); + assertEquals(ActiveMQDestination.TEMP_TOPIC_TYPE, destination.getDestinationType()); + } + + @Test(timeout = 10000) + public void testConvertRemoteTempQueue() throws Exception { + ActiveMQDestination destination = translator.convertDestination(converter, "/remote-temp-queue/test", false); + + assertFalse(destination.isComposite()); + assertEquals("test", destination.getPhysicalName()); + assertEquals(ActiveMQDestination.TEMP_QUEUE_TYPE, destination.getDestinationType()); + } + + @Test(timeout = 10000) + public void testConvertRemoteTempTopic() throws Exception { + ActiveMQDestination destination = translator.convertDestination(converter, "/remote-temp-topic/test", false); + + assertFalse(destination.isComposite()); + assertEquals("test", destination.getPhysicalName()); + assertEquals(ActiveMQDestination.TEMP_TOPIC_TYPE, destination.getDestinationType()); + } + + @Test(timeout = 10000) + public void testConvertCompositeQueues() throws Exception { + String destinationA = "destinationA"; + String destinationB = "destinationB"; + + String composite = "/queue/" + destinationA + ",/queue/" + destinationB; + + ActiveMQDestination destination = translator.convertDestination(converter, composite, false); + + assertEquals(ActiveMQDestination.QUEUE_TYPE, destination.getDestinationType()); + assertTrue(destination.isComposite()); + ActiveMQDestination[] composites = destination.getCompositeDestinations(); + assertEquals(2, composites.length); + + Arrays.sort(composites); + + assertEquals(ActiveMQDestination.QUEUE_TYPE, composites[0].getDestinationType()); + assertEquals(ActiveMQDestination.QUEUE_TYPE, composites[1].getDestinationType()); + + assertEquals(destinationA, composites[0].getPhysicalName()); + assertEquals(destinationB, composites[1].getPhysicalName()); + } + + @Test(timeout = 10000) + public void testConvertCompositeTopics() throws Exception { + String destinationA = "destinationA"; + String destinationB = "destinationB"; + + String composite = "/topic/" + destinationA + ",/topic/" + destinationB; + + ActiveMQDestination destination = translator.convertDestination(converter, composite, false); + + assertEquals(ActiveMQDestination.TOPIC_TYPE, destination.getDestinationType()); + assertTrue(destination.isComposite()); + ActiveMQDestination[] composites = destination.getCompositeDestinations(); + assertEquals(2, composites.length); + + Arrays.sort(composites); + + assertEquals(ActiveMQDestination.TOPIC_TYPE, composites[0].getDestinationType()); + assertEquals(ActiveMQDestination.TOPIC_TYPE, composites[1].getDestinationType()); + + assertEquals(destinationA, composites[0].getPhysicalName()); + assertEquals(destinationB, composites[1].getPhysicalName()); + } + + @Test(timeout = 10000) + public void testConvertCompositeQueueAndTopic() throws Exception { + String destinationA = "destinationA"; + String destinationB = "destinationB"; + + String composite = "/queue/" + destinationA + ",/topic/" + destinationB; + + ActiveMQDestination destination = translator.convertDestination(converter, composite, false); + + assertEquals(ActiveMQDestination.QUEUE_TYPE, destination.getDestinationType()); + assertTrue(destination.isComposite()); + ActiveMQDestination[] composites = destination.getCompositeDestinations(); + assertEquals(2, composites.length); + + Arrays.sort(composites); + + assertEquals(ActiveMQDestination.QUEUE_TYPE, composites[0].getDestinationType()); + assertEquals(ActiveMQDestination.TOPIC_TYPE, composites[1].getDestinationType()); + + assertEquals(destinationA, composites[0].getPhysicalName()); + assertEquals(destinationB, composites[1].getPhysicalName()); + } + + @Test(timeout = 10000) + public void testConvertCompositeMixture() throws Exception { + String destinationA = "destinationA"; + String destinationB = "destinationB"; + String destinationC = "destinationC"; + String destinationD = "destinationD"; + + String composite = "/queue/" + destinationA + ",/topic/" + destinationB + + ",/temp-queue/" + destinationC + ",/temp-topic/" + destinationD; + + ActiveMQDestination destination = translator.convertDestination(converter, composite, false); + + assertEquals(ActiveMQDestination.QUEUE_TYPE, destination.getDestinationType()); + assertTrue(destination.isComposite()); + ActiveMQDestination[] composites = destination.getCompositeDestinations(); + assertEquals(4, composites.length); + + Arrays.sort(composites); + + boolean foundQueue = false; + boolean foundTopic = false; + boolean foundTempTopic = false; + boolean foundTempQueue = false; + + for (ActiveMQDestination dest : composites) { + if (dest.getDestinationType() == ActiveMQDestination.QUEUE_TYPE) { + foundQueue = true; + } else if (dest.getDestinationType() == ActiveMQDestination.TOPIC_TYPE) { + foundTopic = true; + } else if (dest.getDestinationType() == ActiveMQDestination.TEMP_TOPIC_TYPE) { + foundTempTopic = true; + } else if (dest.getDestinationType() == ActiveMQDestination.TEMP_QUEUE_TYPE) { + foundTempQueue = true; + } + } + + assertTrue(foundQueue); + assertTrue(foundTopic); + assertTrue(foundTempTopic); + assertTrue(foundTempQueue); + } +} diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompCompositeDestinationTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompCompositeDestinationTest.java new file mode 100644 index 0000000000..223a84a2b6 --- /dev/null +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompCompositeDestinationTest.java @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.jmx.TopicViewMBean; +import org.apache.activemq.util.Wait; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests for support of composite destination support over STOMP + */ +public class StompCompositeDestinationTest extends StompTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(StompCompositeDestinationTest.class); + + protected ActiveMQConnection connection; + + @Override + public void tearDown() throws Exception { + try { + connection.close(); + } catch (Exception ex) {} + + super.tearDown(); + } + + @Test(timeout = 20000) + public void testSubscribeToCompositeQueue() throws Exception { + stompConnect(); + + String destinationA = "StompA"; + String destinationB = "StompB"; + + String frame = "CONNECT\n" + + "login:system\n" + + "passcode:manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + LOG.info("Subscribing to destination: {},{}", destinationA, destinationB); + + frame = "SUBSCRIBE\n" + + "destination:/queue/" + destinationA + ",/queue/" + destinationB + "\n" + + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + // Test in same order as the subscribe command + + sendMessage(destinationA, false); + sendMessage(destinationB, false); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + + // Test the reverse ordering + + sendMessage(destinationB, false); + sendMessage(destinationA, false); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + + stompConnection.disconnect(); + } + + @Test(timeout = 20000) + public void testSubscribeToCompositeQueueTrailersDefault() throws Exception { + stompConnect(); + + String destinationA = "StompA"; + String destinationB = "StompB"; + + String frame = "CONNECT\n" + + "login:system\n" + + "passcode:manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + LOG.info("Subscribing to destination: {},{}", destinationA, destinationB); + + frame = "SUBSCRIBE\n" + + "destination:/queue/" + destinationA + "," + destinationB + "\n" + + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + // Test in same order as the subscribe command + + sendMessage(destinationA, false); + sendMessage(destinationB, false); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + + // Test the reverse ordering + + sendMessage(destinationB, false); + sendMessage(destinationA, false); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + + stompConnection.disconnect(); + } + + @Test(timeout = 20000) + public void testSubscribeToCompositeTopics() throws Exception { + stompConnect(); + + String destinationA = "StompA"; + String destinationB = "StompB"; + + String frame = "CONNECT\n" + + "login:system\n" + + "passcode:manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + LOG.info("Subscribing to destination: {},{}", destinationA, destinationB); + + frame = "SUBSCRIBE\n" + + "destination:/topic/" + destinationA + ",/topic/" + destinationB + "\n" + + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + // Test in same order as the subscribe command + + sendMessage(destinationA, true); + sendMessage(destinationB, true); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + + // Test the reverse ordering + + sendMessage(destinationB, true); + sendMessage(destinationA, true); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + + stompConnection.disconnect(); + } + + @Test(timeout = 60000) + public void testSendMessageToCompositeQueue() throws Exception { + stompConnect(); + + String destinationA = "StompA"; + String destinationB = "StompB"; + + String frame = "CONNECT\n" + + "login:system\n" + + "passcode:manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + + "destination:/queue/" + destinationA + ",/queue/" + destinationB + + "\n\n" + "Hello World" + Stomp.NULL; + + stompConnection.sendFrame(frame); + + final BrokerViewMBean brokerView = getProxyToBroker(); + assertTrue("Should be two destinations for the dispatch", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerView.getQueues().length == 2; + } + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(150))); + + QueueViewMBean viewOfA = getProxyToQueue(destinationA); + QueueViewMBean viewOfB = getProxyToQueue(destinationB); + + assertNotNull(viewOfA); + assertNotNull(viewOfB); + + assertEquals(1, viewOfA.getQueueSize()); + assertEquals(1, viewOfB.getQueueSize()); + + stompConnection.disconnect(); + } + + @Test(timeout = 60000) + public void testSendMessageToCompositeTopic() throws Exception { + stompConnect(); + + String destinationA = "StompA"; + String destinationB = "StompB"; + + String frame = "CONNECT\n" + + "login:system\n" + + "passcode:manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + + "destination:/topic/" + destinationA + ",/topic/" + destinationB + + "\n\n" + "Hello World" + Stomp.NULL; + + stompConnection.sendFrame(frame); + + final BrokerViewMBean brokerView = getProxyToBroker(); + assertTrue("Should be two destinations for the dispatch", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerView.getTopics().length == 2; + } + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(150))); + + TopicViewMBean viewOfA = getProxyToTopic(destinationA); + TopicViewMBean viewOfB = getProxyToTopic(destinationB); + + assertNotNull(viewOfA); + assertNotNull(viewOfB); + + assertEquals(1, viewOfA.getEnqueueCount()); + assertEquals(1, viewOfB.getEnqueueCount()); + + stompConnection.disconnect(); + } + + private void sendMessage(String destinationName, boolean topic) throws JMSException { + Connection connection = cf.createConnection("system", "manager"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = null; + + if (topic) { + destination = session.createTopic(destinationName); + } else { + destination = session.createQueue(destinationName); + } + + MessageProducer producer = session.createProducer(destination); + TextMessage message = session.createTextMessage("test"); + producer.send(message); + + connection.close(); + } +}