diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java index e66d1ede4a..236b38c6b9 100644 --- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java +++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.core.protocol.stomp; +import org.jboss.logging.BasicLogger; import org.jboss.logging.Logger; import org.jboss.logging.annotations.Cause; import org.jboss.logging.annotations.LogMessage; @@ -41,7 +42,7 @@ import org.jboss.logging.annotations.MessageLogger; */ @MessageLogger(projectCode = "AMQ") -public interface ActiveMQStompProtocolLogger +public interface ActiveMQStompProtocolLogger extends BasicLogger { /** * The default logger. diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java index 23d399d8d4..16cf55e6c5 100644 --- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java +++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java @@ -27,7 +27,9 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQBuffers; import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.api.core.client.ActiveMQClient; +import org.apache.activemq.api.core.management.ResourceNames; import org.apache.activemq.core.protocol.stomp.v10.StompFrameHandlerV10; import org.apache.activemq.core.protocol.stomp.v12.StompFrameHandlerV12; import org.apache.activemq.core.remoting.CloseListener; @@ -245,12 +247,38 @@ public final class StompConnection implements RemotingConnection public void checkDestination(String destination) throws ActiveMQStompException { + if (autoCreateQueueIfPossible(destination)) + { + return; + } + if (!manager.destinationExists(destination)) { throw BUNDLE.destinationNotExist(destination); } } + public boolean autoCreateQueueIfPossible(String queue) throws ActiveMQStompException + { + boolean autoCreated = false; + + if (queue.startsWith(ResourceNames.JMS_QUEUE) && manager.getServer().getAddressSettingsRepository().getMatch(queue).isAutoCreateJmsQueues() && manager.getServer().locateQueue(new SimpleString(queue)) == null) + { + SimpleString queueName = new SimpleString(queue); + try + { + manager.getServer().createQueue(queueName, queueName, null, true, false, true); + } + catch (Exception e) + { + throw new ActiveMQStompException(e.getMessage(), e); + } + autoCreated = true; + } + + return autoCreated; + } + @Override public ActiveMQBuffer createBuffer(int size) { @@ -689,6 +717,7 @@ public final class StompConnection implements RemotingConnection void subscribe(String destination, String selector, String ack, String id, String durableSubscriptionName, boolean noLocal) throws ActiveMQStompException { + autoCreateQueueIfPossible(destination); if (noLocal) { String noLocalFilter = CONNECTION_ID_PROP + " <> '" + getID().toString() + "'"; diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java index e73e9e4a4b..6ce03db305 100644 --- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java +++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java @@ -500,4 +500,9 @@ class StompProtocolManager implements ProtocolManager, NotificationListener break; } } + + public ActiveMQServer getServer() + { + return server; + } } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java b/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java index 2054be14bc..21976906d3 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java @@ -190,7 +190,7 @@ public interface ActiveMQServer extends ActiveMQComponent boolean durable, boolean temporary) throws Exception; - Queue locateQueue(SimpleString queueName) throws Exception; + Queue locateQueue(SimpleString queueName); void destroyQueue(SimpleString queueName) throws Exception; diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java index cb91351221..469ece2742 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java @@ -1234,7 +1234,7 @@ public class ActiveMQServerImpl implements ActiveMQServer } - public Queue locateQueue(SimpleString queueName) throws Exception + public Queue locateQueue(SimpleString queueName) { Binding binding = postOffice.getBinding(queueName); diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java index 5bcd2c66b1..05cc9cf01b 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java @@ -24,7 +24,7 @@ import org.apache.activemq.core.server.Queue; import org.apache.activemq.utils.ReferenceCounterUtil; /** - * @author Clebert Suconic + * @author Justin Bertram */ public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java index 072b2e752c..b32e1bf944 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java @@ -30,8 +30,12 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.core.management.ResourceNames; +import org.apache.activemq.api.jms.ActiveMQJMSClient; import org.apache.activemq.core.protocol.stomp.Stomp; import org.apache.activemq.tests.integration.IntegrationTestLogger; +import org.apache.activemq.tests.util.RandomUtil; import org.junit.Assert; import org.junit.Test; @@ -147,6 +151,40 @@ public class StompTest extends StompTestBase Assert.assertTrue(Math.abs(tnow - tmsg) < 1000); } + @Test + public void testSendMessageToNonExistentQueue() throws Exception + { + String nonExistantQueue = RandomUtil.randomString(); + String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + sendFrame(frame); + + frame = receiveFrame(10000); + Assert.assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:" + getQueuePrefix() + nonExistantQueue + "\n\n" + "Hello World" + Stomp.NULL; + + sendFrame(frame); + receiveFrame(1000); + + MessageConsumer consumer = session.createConsumer(ActiveMQJMSClient.createQueue(nonExistantQueue)); + TextMessage message = (TextMessage) consumer.receive(1000); + Assert.assertNotNull(message); + Assert.assertEquals("Hello World", message.getText()); + // Assert default priority 4 is used when priority header is not set + Assert.assertEquals("getJMSPriority", 4, message.getJMSPriority()); + + // Make sure that the timestamp is valid - should + // be very close to the current time. + long tnow = System.currentTimeMillis(); + long tmsg = message.getJMSTimestamp(); + Assert.assertTrue(Math.abs(tnow - tmsg) < 1500); + + // closing the consumer here should trigger auto-deletion + assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue))); + consumer.close(); + assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue))); + } + /* * Some STOMP clients erroneously put a new line \n *after* the terminating NUL char at the end of the frame * This means next frame read might have a \n a the beginning. @@ -1094,6 +1132,63 @@ public class StompTest extends StompTestBase sendFrame(frame); } + @Test + public void testSubscribeToNonExistantQueue() throws Exception + { + String nonExistantQueue = RandomUtil.randomString(); + + String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + sendFrame(frame); + + frame = receiveFrame(100000); + Assert.assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:" + + getQueuePrefix() + + nonExistantQueue + + "\n" + + "receipt: 12\n" + + "\n\n" + + Stomp.NULL; + sendFrame(frame); + // wait for SUBSCRIBE's receipt + frame = receiveFrame(10000); + Assert.assertTrue(frame.startsWith("RECEIPT")); + + sendMessage(getName(), ActiveMQJMSClient.createQueue(nonExistantQueue)); + + frame = receiveFrame(10000); + Assert.assertTrue(frame.startsWith("MESSAGE")); + Assert.assertTrue(frame.indexOf("destination:") > 0); + Assert.assertTrue(frame.indexOf(getName()) > 0); + + assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue))); + + frame = "UNSUBSCRIBE\n" + "destination:" + + getQueuePrefix() + + nonExistantQueue + + "\n" + + "receipt: 1234\n" + + "\n\n" + + Stomp.NULL; + sendFrame(frame); + // wait for UNSUBSCRIBE's receipt + frame = receiveFrame(10000); + Assert.assertTrue(frame.startsWith("RECEIPT")); + + assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue))); + + sendMessage(getName(), ActiveMQJMSClient.createQueue(nonExistantQueue)); + + frame = receiveFrame(1000); + log.info("Received frame: " + frame); + Assert.assertNull("No message should have been received since subscription was removed", frame); + + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + sendFrame(frame); + } + @Test public void testDurableSubscriberWithReconnection() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java index 3aec0838c6..e7c781948e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java @@ -114,6 +114,7 @@ public abstract class StompTestBase extends UnitTestCase if (autoCreateServer) { server = createServer(); + addServer(server.getActiveMQServer()); server.start(); connectionFactory = createConnectionFactory(); createBootstrap(); @@ -231,9 +232,8 @@ public abstract class StompTestBase extends UnitTestCase if (group != null) { channel.close(); - group.shutdown(); + group.shutdownGracefully(0, 5000, TimeUnit.MILLISECONDS); } - server.stop(); } super.tearDown(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java index 0984353a50..5117952905 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java @@ -30,6 +30,7 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.core.settings.impl.AddressSettings; import org.apache.activemq.tests.integration.IntegrationTestLogger; import org.apache.activemq.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.tests.integration.stomp.util.StompClientConnection; @@ -2354,8 +2355,11 @@ public class StompV11Test extends StompV11TestBase } @Test - public void testSendMessageToNonExistentJmsQueue() throws Exception + public void testSendMessageToNonExistentJmsQueueWithoutAutoCreation() throws Exception { + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setAutoCreateJmsQueues(false); + server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings); connV11.connect(defUser, defPass); ClientStompFrame frame = connV11.createFrame("SEND"); @@ -2373,6 +2377,26 @@ public class StompV11Test extends StompV11TestBase connV11.disconnect(); } + @Test + public void testSendMessageToNonExistentJmsQueueWithAutoCreation() throws Exception + { + connV11.connect(defUser, defPass); + + ClientStompFrame frame = connV11.createFrame("SEND"); + String guid = UUID.randomUUID().toString(); + frame.addHeader("destination", "jms.queue.NonExistentQueue" + guid); + frame.addHeader("receipt", "1234"); + frame.setBody("Hello World"); + + frame = connV11.sendFrame(frame); + + assertTrue(frame.getCommand().equals("RECEIPT")); + assertEquals("1234", frame.getHeader("receipt-id")); + System.out.println("message: " + frame.getHeader("message")); + + connV11.disconnect(); + } + @Test public void testSendAndReceiveWithEscapedCharactersInSenderId() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java index 3b6e471003..7a308afdbc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java @@ -31,6 +31,7 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.core.settings.impl.AddressSettings; import org.apache.activemq.tests.integration.IntegrationTestLogger; import org.apache.activemq.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.tests.integration.stomp.util.StompClientConnection; @@ -400,6 +401,10 @@ public class StompV12Test extends StompV11TestBase @Test public void testHeaderRepetitive() throws Exception { + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setAutoCreateJmsQueues(false); + server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings); + connV12.connect(defUser, defPass); ClientStompFrame frame = connV12.createFrame("SEND"); @@ -2617,8 +2622,11 @@ public class StompV12Test extends StompV11TestBase } @Test - public void testSendMessageToNonExistentJmsQueue() throws Exception + public void testSendMessageToNonExistentJmsQueueWithoutAutoCreation() throws Exception { + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setAutoCreateJmsQueues(false); + server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings); connV12.connect(defUser, defPass); ClientStompFrame frame = connV12.createFrame("SEND"); @@ -2636,6 +2644,26 @@ public class StompV12Test extends StompV11TestBase connV12.disconnect(); } + @Test + public void testSendMessageToNonExistentJmsQueueWithAutoCreation() throws Exception + { + connV12.connect(defUser, defPass); + + ClientStompFrame frame = connV12.createFrame("SEND"); + String guid = UUID.randomUUID().toString(); + frame.addHeader("destination", "jms.queue.NonExistentQueue" + guid); + frame.addHeader("receipt", "1234"); + frame.setBody("Hello World"); + + frame = connV12.sendFrame(frame); + + assertTrue(frame.getCommand().equals("RECEIPT")); + assertEquals("1234", frame.getHeader("receipt-id")); + System.out.println("message: " + frame.getHeader("message")); + + connV12.disconnect(); + } + @Test public void testInvalidStompCommand() throws Exception {