AMQ-6030 Add support for composite destinations to STOMP.

This commit is contained in:
Timothy Bish 2015-10-30 11:02:27 -04:00
parent 8136e67b40
commit c360c3e4a3
5 changed files with 600 additions and 40 deletions

View File

@ -84,6 +84,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>

View File

@ -16,25 +16,31 @@
*/
package org.apache.activemq.transport.stomp;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import javax.jms.Destination;
import javax.jms.JMSException;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implements ActiveMQ 4.0 translations
*/
public class LegacyFrameTranslator implements FrameTranslator {
private static final Logger LOG = LoggerFactory.getLogger(LegacyFrameTranslator.class);
@Override
public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
final Map<?, ?> headers = command.getHeaders();
final ActiveMQMessage msg;
@ -87,6 +93,7 @@ public class LegacyFrameTranslator implements FrameTranslator {
return msg;
}
@Override
public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException {
StompFrame command = new StompFrame();
command.setAction(Stomp.Responses.MESSAGE);
@ -126,6 +133,7 @@ public class LegacyFrameTranslator implements FrameTranslator {
return command;
}
@Override
public String convertDestination(ProtocolConverter converter, Destination d) {
if (d == null) {
return null;
@ -156,6 +164,7 @@ public class LegacyFrameTranslator implements FrameTranslator {
return buffer.toString();
}
@Override
public ActiveMQDestination convertDestination(ProtocolConverter converter, String name, boolean forceFallback) throws ProtocolException {
if (name == null) {
return null;
@ -166,38 +175,64 @@ public class LegacyFrameTranslator implements FrameTranslator {
String originalName = name;
name = name.trim();
if (name.startsWith("/queue/")) {
String qName = name.substring("/queue/".length(), name.length());
return ActiveMQDestination.createDestination(qName, ActiveMQDestination.QUEUE_TYPE);
} else if (name.startsWith("/topic/")) {
String tName = name.substring("/topic/".length(), name.length());
return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE);
} else if (name.startsWith("/remote-temp-queue/")) {
String tName = name.substring("/remote-temp-queue/".length(), name.length());
return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE);
} else if (name.startsWith("/remote-temp-topic/")) {
String tName = name.substring("/remote-temp-topic/".length(), name.length());
return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE);
} else if (name.startsWith("/temp-queue/")) {
return converter.createTempDestination(name, false);
} else if (name.startsWith("/temp-topic/")) {
return converter.createTempDestination(name, true);
String[] destinations = name.split(",");
if (destinations == null || destinations.length == 0) {
destinations = new String[] { name };
}
StringBuilder destinationBuilder = new StringBuilder();
for (int i = 0; i < destinations.length; ++i) {
String destinationName = destinations[i];
if (destinationName.startsWith("/queue/")) {
destinationName = destinationName.substring("/queue/".length(), destinationName.length());
destinationBuilder.append(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + destinationName);
} else if (destinationName.startsWith("/topic/")) {
destinationName = destinationName.substring("/topic/".length(), destinationName.length());
destinationBuilder.append(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + destinationName);
} else if (destinationName.startsWith("/remote-temp-queue/")) {
destinationName = destinationName.substring("/remote-temp-queue/".length(), destinationName.length());
destinationBuilder.append(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX + destinationName);
} else if (destinationName.startsWith("/remote-temp-topic/")) {
destinationName = destinationName.substring("/remote-temp-topic/".length(), destinationName.length());
destinationBuilder.append(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX + destinationName);
} else if (destinationName.startsWith("/temp-queue/")) {
ActiveMQDestination converted = converter.createTempDestination(destinationName, false);
destinationBuilder.append(converted.getQualifiedName());
} else if (destinationName.startsWith("/temp-topic/")) {
ActiveMQDestination converted = converter.createTempDestination(destinationName, true);
destinationBuilder.append(converted.getQualifiedName());
} else {
if (forceFallback) {
String fallbackName = destinationName;
if (destinationName.length() == 1) {
// Use the original non-trimmed name instead
fallbackName = originalName;
}
try {
ActiveMQDestination fallback = ActiveMQDestination.getUnresolvableDestinationTransformer().transform(originalName);
ActiveMQDestination fallback = ActiveMQDestination.getUnresolvableDestinationTransformer().transform(fallbackName);
if (fallback != null) {
return fallback;
destinationBuilder.append(fallback.getQualifiedName());
continue;
}
} catch (JMSException e) {
throw new ProtocolException("Illegal destination name: [" + originalName + "] -- ActiveMQ STOMP destinations "
throw new ProtocolException("Illegal destination name: [" + fallbackName + "] -- ActiveMQ STOMP destinations "
+ "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/", false, e);
}
}
throw new ProtocolException("Illegal destination name: [" + originalName + "] -- ActiveMQ STOMP destinations "
+ "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
}
if (i < destinations.length - 1) {
destinationBuilder.append(",");
}
}
LOG.trace("New Composite Destination name: {}", destinationBuilder);
return ActiveMQDestination.createDestination(destinationBuilder.toString(), ActiveMQDestination.QUEUE_TYPE);
}
}

View File

@ -600,7 +600,7 @@ public class ProtocolConverter {
throw new ProtocolException("Invalid Subscription: cannot durably subscribe to a Queue destination!");
}
consumerInfo.setDestination(translator.convertDestination(this, destination, true));
consumerInfo.setDestination(actualDest);
StompSubscription stompSubscription;
if (!consumerInfo.isBrowser()) {

View File

@ -0,0 +1,225 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.UUID;
import org.apache.activemq.command.ActiveMQDestination;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Tests for conversion capabilities of LegacyFrameTranslator
*/
public class LegacyFrameTranslatorTest {
private ProtocolConverter converter;
private LegacyFrameTranslator translator;
@Before
public void setUp() {
converter = Mockito.mock(ProtocolConverter.class);
// Stub out a temp destination creation
Mockito.when(converter.createTempDestination(Mockito.anyString(), Mockito.anyBoolean())).thenAnswer(new Answer<ActiveMQDestination>() {
@Override
public ActiveMQDestination answer(InvocationOnMock invocation) throws Throwable {
String name = invocation.getArgumentAt(0, String.class);
boolean topic = invocation.getArgumentAt(1, Boolean.class);
name = "temp-" + (topic ? "topic://" : "queue://X:") + UUID.randomUUID().toString();
return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE);
}
});
translator = new LegacyFrameTranslator();
}
@Test(timeout = 10000)
public void testConvertQueue() throws Exception {
ActiveMQDestination destination = translator.convertDestination(converter, "/queue/test", false);
assertFalse(destination.isComposite());
assertEquals("test", destination.getPhysicalName());
assertEquals(ActiveMQDestination.QUEUE_TYPE, destination.getDestinationType());
}
@Test(timeout = 10000)
public void testConvertTopic() throws Exception {
ActiveMQDestination destination = translator.convertDestination(converter, "/topic/test", false);
assertFalse(destination.isComposite());
assertEquals("test", destination.getPhysicalName());
assertEquals(ActiveMQDestination.TOPIC_TYPE, destination.getDestinationType());
}
@Test(timeout = 10000)
public void testConvertTemporaryQueue() throws Exception {
ActiveMQDestination destination = translator.convertDestination(converter, "/temp-queue/test", false);
assertFalse(destination.isComposite());
assertEquals(ActiveMQDestination.TEMP_QUEUE_TYPE, destination.getDestinationType());
}
@Test(timeout = 10000)
public void testConvertTemporaryTopic() throws Exception {
ActiveMQDestination destination = translator.convertDestination(converter, "/temp-topic/test", false);
assertFalse(destination.isComposite());
assertEquals(ActiveMQDestination.TEMP_TOPIC_TYPE, destination.getDestinationType());
}
@Test(timeout = 10000)
public void testConvertRemoteTempQueue() throws Exception {
ActiveMQDestination destination = translator.convertDestination(converter, "/remote-temp-queue/test", false);
assertFalse(destination.isComposite());
assertEquals("test", destination.getPhysicalName());
assertEquals(ActiveMQDestination.TEMP_QUEUE_TYPE, destination.getDestinationType());
}
@Test(timeout = 10000)
public void testConvertRemoteTempTopic() throws Exception {
ActiveMQDestination destination = translator.convertDestination(converter, "/remote-temp-topic/test", false);
assertFalse(destination.isComposite());
assertEquals("test", destination.getPhysicalName());
assertEquals(ActiveMQDestination.TEMP_TOPIC_TYPE, destination.getDestinationType());
}
@Test(timeout = 10000)
public void testConvertCompositeQueues() throws Exception {
String destinationA = "destinationA";
String destinationB = "destinationB";
String composite = "/queue/" + destinationA + ",/queue/" + destinationB;
ActiveMQDestination destination = translator.convertDestination(converter, composite, false);
assertEquals(ActiveMQDestination.QUEUE_TYPE, destination.getDestinationType());
assertTrue(destination.isComposite());
ActiveMQDestination[] composites = destination.getCompositeDestinations();
assertEquals(2, composites.length);
Arrays.sort(composites);
assertEquals(ActiveMQDestination.QUEUE_TYPE, composites[0].getDestinationType());
assertEquals(ActiveMQDestination.QUEUE_TYPE, composites[1].getDestinationType());
assertEquals(destinationA, composites[0].getPhysicalName());
assertEquals(destinationB, composites[1].getPhysicalName());
}
@Test(timeout = 10000)
public void testConvertCompositeTopics() throws Exception {
String destinationA = "destinationA";
String destinationB = "destinationB";
String composite = "/topic/" + destinationA + ",/topic/" + destinationB;
ActiveMQDestination destination = translator.convertDestination(converter, composite, false);
assertEquals(ActiveMQDestination.TOPIC_TYPE, destination.getDestinationType());
assertTrue(destination.isComposite());
ActiveMQDestination[] composites = destination.getCompositeDestinations();
assertEquals(2, composites.length);
Arrays.sort(composites);
assertEquals(ActiveMQDestination.TOPIC_TYPE, composites[0].getDestinationType());
assertEquals(ActiveMQDestination.TOPIC_TYPE, composites[1].getDestinationType());
assertEquals(destinationA, composites[0].getPhysicalName());
assertEquals(destinationB, composites[1].getPhysicalName());
}
@Test(timeout = 10000)
public void testConvertCompositeQueueAndTopic() throws Exception {
String destinationA = "destinationA";
String destinationB = "destinationB";
String composite = "/queue/" + destinationA + ",/topic/" + destinationB;
ActiveMQDestination destination = translator.convertDestination(converter, composite, false);
assertEquals(ActiveMQDestination.QUEUE_TYPE, destination.getDestinationType());
assertTrue(destination.isComposite());
ActiveMQDestination[] composites = destination.getCompositeDestinations();
assertEquals(2, composites.length);
Arrays.sort(composites);
assertEquals(ActiveMQDestination.QUEUE_TYPE, composites[0].getDestinationType());
assertEquals(ActiveMQDestination.TOPIC_TYPE, composites[1].getDestinationType());
assertEquals(destinationA, composites[0].getPhysicalName());
assertEquals(destinationB, composites[1].getPhysicalName());
}
@Test(timeout = 10000)
public void testConvertCompositeMixture() throws Exception {
String destinationA = "destinationA";
String destinationB = "destinationB";
String destinationC = "destinationC";
String destinationD = "destinationD";
String composite = "/queue/" + destinationA + ",/topic/" + destinationB +
",/temp-queue/" + destinationC + ",/temp-topic/" + destinationD;
ActiveMQDestination destination = translator.convertDestination(converter, composite, false);
assertEquals(ActiveMQDestination.QUEUE_TYPE, destination.getDestinationType());
assertTrue(destination.isComposite());
ActiveMQDestination[] composites = destination.getCompositeDestinations();
assertEquals(4, composites.length);
Arrays.sort(composites);
boolean foundQueue = false;
boolean foundTopic = false;
boolean foundTempTopic = false;
boolean foundTempQueue = false;
for (ActiveMQDestination dest : composites) {
if (dest.getDestinationType() == ActiveMQDestination.QUEUE_TYPE) {
foundQueue = true;
} else if (dest.getDestinationType() == ActiveMQDestination.TOPIC_TYPE) {
foundTopic = true;
} else if (dest.getDestinationType() == ActiveMQDestination.TEMP_TOPIC_TYPE) {
foundTempTopic = true;
} else if (dest.getDestinationType() == ActiveMQDestination.TEMP_QUEUE_TYPE) {
foundTempQueue = true;
}
}
assertTrue(foundQueue);
assertTrue(foundTopic);
assertTrue(foundTempTopic);
assertTrue(foundTempQueue);
}
}

View File

@ -0,0 +1,295 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests for support of composite destination support over STOMP
*/
public class StompCompositeDestinationTest extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(StompCompositeDestinationTest.class);
protected ActiveMQConnection connection;
@Override
public void tearDown() throws Exception {
try {
connection.close();
} catch (Exception ex) {}
super.tearDown();
}
@Test(timeout = 20000)
public void testSubscribeToCompositeQueue() throws Exception {
stompConnect();
String destinationA = "StompA";
String destinationB = "StompB";
String frame = "CONNECT\n" +
"login:system\n" +
"passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
LOG.info("Subscribing to destination: {},{}", destinationA, destinationB);
frame = "SUBSCRIBE\n" +
"destination:/queue/" + destinationA + ",/queue/" + destinationB + "\n" +
"ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// Test in same order as the subscribe command
sendMessage(destinationA, false);
sendMessage(destinationB, false);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
// Test the reverse ordering
sendMessage(destinationB, false);
sendMessage(destinationA, false);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
stompConnection.disconnect();
}
@Test(timeout = 20000)
public void testSubscribeToCompositeQueueTrailersDefault() throws Exception {
stompConnect();
String destinationA = "StompA";
String destinationB = "StompB";
String frame = "CONNECT\n" +
"login:system\n" +
"passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
LOG.info("Subscribing to destination: {},{}", destinationA, destinationB);
frame = "SUBSCRIBE\n" +
"destination:/queue/" + destinationA + "," + destinationB + "\n" +
"ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// Test in same order as the subscribe command
sendMessage(destinationA, false);
sendMessage(destinationB, false);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
// Test the reverse ordering
sendMessage(destinationB, false);
sendMessage(destinationA, false);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
stompConnection.disconnect();
}
@Test(timeout = 20000)
public void testSubscribeToCompositeTopics() throws Exception {
stompConnect();
String destinationA = "StompA";
String destinationB = "StompB";
String frame = "CONNECT\n" +
"login:system\n" +
"passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
LOG.info("Subscribing to destination: {},{}", destinationA, destinationB);
frame = "SUBSCRIBE\n" +
"destination:/topic/" + destinationA + ",/topic/" + destinationB + "\n" +
"ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// Test in same order as the subscribe command
sendMessage(destinationA, true);
sendMessage(destinationB, true);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
// Test the reverse ordering
sendMessage(destinationB, true);
sendMessage(destinationA, true);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
stompConnection.disconnect();
}
@Test(timeout = 60000)
public void testSendMessageToCompositeQueue() throws Exception {
stompConnect();
String destinationA = "StompA";
String destinationB = "StompB";
String frame = "CONNECT\n" +
"login:system\n" +
"passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" +
"destination:/queue/" + destinationA + ",/queue/" + destinationB +
"\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(frame);
final BrokerViewMBean brokerView = getProxyToBroker();
assertTrue("Should be two destinations for the dispatch", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerView.getQueues().length == 2;
}
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(150)));
QueueViewMBean viewOfA = getProxyToQueue(destinationA);
QueueViewMBean viewOfB = getProxyToQueue(destinationB);
assertNotNull(viewOfA);
assertNotNull(viewOfB);
assertEquals(1, viewOfA.getQueueSize());
assertEquals(1, viewOfB.getQueueSize());
stompConnection.disconnect();
}
@Test(timeout = 60000)
public void testSendMessageToCompositeTopic() throws Exception {
stompConnect();
String destinationA = "StompA";
String destinationB = "StompB";
String frame = "CONNECT\n" +
"login:system\n" +
"passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" +
"destination:/topic/" + destinationA + ",/topic/" + destinationB +
"\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(frame);
final BrokerViewMBean brokerView = getProxyToBroker();
assertTrue("Should be two destinations for the dispatch", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerView.getTopics().length == 2;
}
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(150)));
TopicViewMBean viewOfA = getProxyToTopic(destinationA);
TopicViewMBean viewOfB = getProxyToTopic(destinationB);
assertNotNull(viewOfA);
assertNotNull(viewOfB);
assertEquals(1, viewOfA.getEnqueueCount());
assertEquals(1, viewOfB.getEnqueueCount());
stompConnection.disconnect();
}
private void sendMessage(String destinationName, boolean topic) throws JMSException {
Connection connection = cf.createConnection("system", "manager");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = null;
if (topic) {
destination = session.createTopic(destinationName);
} else {
destination = session.createQueue(destinationName);
}
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test");
producer.send(message);
connection.close();
}
}