ACTIVEMQ6-76 auto queue creation on STOMP send/sub

Implements a new feature for the broker whereby it may automatically
create and delete queues which are not explicitly defined through
the management API or file-based configuration when a client sends a
message to or consumes from a queue via the STOMP protocol. Note,
the destination has to be named like "jms.queue.*" to be auto-
created. The queue may subsequently be deleted when it no longer has
any messages and consumers. Auto-creation and auto-deletion can both
be turned on/off via address-setting.
This commit is contained in:
jbertram 2015-02-03 12:21:09 -06:00
parent 593ea2dde2
commit 1310442244
10 changed files with 190 additions and 8 deletions

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.core.protocol.stomp; package org.apache.activemq.core.protocol.stomp;
import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import org.jboss.logging.annotations.Cause; import org.jboss.logging.annotations.Cause;
import org.jboss.logging.annotations.LogMessage; import org.jboss.logging.annotations.LogMessage;
@ -41,7 +42,7 @@ import org.jboss.logging.annotations.MessageLogger;
*/ */
@MessageLogger(projectCode = "AMQ") @MessageLogger(projectCode = "AMQ")
public interface ActiveMQStompProtocolLogger public interface ActiveMQStompProtocolLogger extends BasicLogger
{ {
/** /**
* The default logger. * The default logger.

View File

@ -27,7 +27,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.ActiveMQBuffers; import org.apache.activemq.api.core.ActiveMQBuffers;
import org.apache.activemq.api.core.ActiveMQException; import org.apache.activemq.api.core.ActiveMQException;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.client.ActiveMQClient; import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.core.management.ResourceNames;
import org.apache.activemq.core.protocol.stomp.v10.StompFrameHandlerV10; import org.apache.activemq.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.apache.activemq.core.protocol.stomp.v12.StompFrameHandlerV12; import org.apache.activemq.core.protocol.stomp.v12.StompFrameHandlerV12;
import org.apache.activemq.core.remoting.CloseListener; import org.apache.activemq.core.remoting.CloseListener;
@ -245,12 +247,38 @@ public final class StompConnection implements RemotingConnection
public void checkDestination(String destination) throws ActiveMQStompException public void checkDestination(String destination) throws ActiveMQStompException
{ {
if (autoCreateQueueIfPossible(destination))
{
return;
}
if (!manager.destinationExists(destination)) if (!manager.destinationExists(destination))
{ {
throw BUNDLE.destinationNotExist(destination); throw BUNDLE.destinationNotExist(destination);
} }
} }
public boolean autoCreateQueueIfPossible(String queue) throws ActiveMQStompException
{
boolean autoCreated = false;
if (queue.startsWith(ResourceNames.JMS_QUEUE) && manager.getServer().getAddressSettingsRepository().getMatch(queue).isAutoCreateJmsQueues() && manager.getServer().locateQueue(new SimpleString(queue)) == null)
{
SimpleString queueName = new SimpleString(queue);
try
{
manager.getServer().createQueue(queueName, queueName, null, true, false, true);
}
catch (Exception e)
{
throw new ActiveMQStompException(e.getMessage(), e);
}
autoCreated = true;
}
return autoCreated;
}
@Override @Override
public ActiveMQBuffer createBuffer(int size) public ActiveMQBuffer createBuffer(int size)
{ {
@ -689,6 +717,7 @@ public final class StompConnection implements RemotingConnection
void subscribe(String destination, String selector, String ack, void subscribe(String destination, String selector, String ack,
String id, String durableSubscriptionName, boolean noLocal) throws ActiveMQStompException String id, String durableSubscriptionName, boolean noLocal) throws ActiveMQStompException
{ {
autoCreateQueueIfPossible(destination);
if (noLocal) if (noLocal)
{ {
String noLocalFilter = CONNECTION_ID_PROP + " <> '" + getID().toString() + "'"; String noLocalFilter = CONNECTION_ID_PROP + " <> '" + getID().toString() + "'";

View File

@ -500,4 +500,9 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
break; break;
} }
} }
public ActiveMQServer getServer()
{
return server;
}
} }

View File

@ -190,7 +190,7 @@ public interface ActiveMQServer extends ActiveMQComponent
boolean durable, boolean durable,
boolean temporary) throws Exception; boolean temporary) throws Exception;
Queue locateQueue(SimpleString queueName) throws Exception; Queue locateQueue(SimpleString queueName);
void destroyQueue(SimpleString queueName) throws Exception; void destroyQueue(SimpleString queueName) throws Exception;

View File

@ -1234,7 +1234,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
} }
public Queue locateQueue(SimpleString queueName) throws Exception public Queue locateQueue(SimpleString queueName)
{ {
Binding binding = postOffice.getBinding(queueName); Binding binding = postOffice.getBinding(queueName);

View File

@ -24,7 +24,7 @@ import org.apache.activemq.core.server.Queue;
import org.apache.activemq.utils.ReferenceCounterUtil; import org.apache.activemq.utils.ReferenceCounterUtil;
/** /**
* @author Clebert Suconic * @author Justin Bertram
*/ */
public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager

View File

@ -30,8 +30,12 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.management.ResourceNames;
import org.apache.activemq.api.jms.ActiveMQJMSClient;
import org.apache.activemq.core.protocol.stomp.Stomp; import org.apache.activemq.core.protocol.stomp.Stomp;
import org.apache.activemq.tests.integration.IntegrationTestLogger; import org.apache.activemq.tests.integration.IntegrationTestLogger;
import org.apache.activemq.tests.util.RandomUtil;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -147,6 +151,40 @@ public class StompTest extends StompTestBase
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000); Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
} }
@Test
public void testSendMessageToNonExistentQueue() throws Exception
{
String nonExistantQueue = RandomUtil.randomString();
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
sendFrame(frame);
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "destination:" + getQueuePrefix() + nonExistantQueue + "\n\n" + "Hello World" + Stomp.NULL;
sendFrame(frame);
receiveFrame(1000);
MessageConsumer consumer = session.createConsumer(ActiveMQJMSClient.createQueue(nonExistantQueue));
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("Hello World", message.getText());
// Assert default priority 4 is used when priority header is not set
Assert.assertEquals("getJMSPriority", 4, message.getJMSPriority());
// Make sure that the timestamp is valid - should
// be very close to the current time.
long tnow = System.currentTimeMillis();
long tmsg = message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1500);
// closing the consumer here should trigger auto-deletion
assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue)));
consumer.close();
assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue)));
}
/* /*
* Some STOMP clients erroneously put a new line \n *after* the terminating NUL char at the end of the frame * Some STOMP clients erroneously put a new line \n *after* the terminating NUL char at the end of the frame
* This means next frame read might have a \n a the beginning. * This means next frame read might have a \n a the beginning.
@ -1094,6 +1132,63 @@ public class StompTest extends StompTestBase
sendFrame(frame); sendFrame(frame);
} }
@Test
public void testSubscribeToNonExistantQueue() throws Exception
{
String nonExistantQueue = RandomUtil.randomString();
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
sendFrame(frame);
frame = receiveFrame(100000);
Assert.assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:" +
getQueuePrefix() +
nonExistantQueue +
"\n" +
"receipt: 12\n" +
"\n\n" +
Stomp.NULL;
sendFrame(frame);
// wait for SUBSCRIBE's receipt
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("RECEIPT"));
sendMessage(getName(), ActiveMQJMSClient.createQueue(nonExistantQueue));
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
Assert.assertTrue(frame.indexOf("destination:") > 0);
Assert.assertTrue(frame.indexOf(getName()) > 0);
assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue)));
frame = "UNSUBSCRIBE\n" + "destination:" +
getQueuePrefix() +
nonExistantQueue +
"\n" +
"receipt: 1234\n" +
"\n\n" +
Stomp.NULL;
sendFrame(frame);
// wait for UNSUBSCRIBE's receipt
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("RECEIPT"));
assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue)));
sendMessage(getName(), ActiveMQJMSClient.createQueue(nonExistantQueue));
frame = receiveFrame(1000);
log.info("Received frame: " + frame);
Assert.assertNull("No message should have been received since subscription was removed", frame);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
sendFrame(frame);
}
@Test @Test
public void testDurableSubscriberWithReconnection() throws Exception public void testDurableSubscriberWithReconnection() throws Exception
{ {

View File

@ -114,6 +114,7 @@ public abstract class StompTestBase extends UnitTestCase
if (autoCreateServer) if (autoCreateServer)
{ {
server = createServer(); server = createServer();
addServer(server.getActiveMQServer());
server.start(); server.start();
connectionFactory = createConnectionFactory(); connectionFactory = createConnectionFactory();
createBootstrap(); createBootstrap();
@ -231,9 +232,8 @@ public abstract class StompTestBase extends UnitTestCase
if (group != null) if (group != null)
{ {
channel.close(); channel.close();
group.shutdown(); group.shutdownGracefully(0, 5000, TimeUnit.MILLISECONDS);
} }
server.stop();
} }
super.tearDown(); super.tearDown();
} }

View File

@ -30,6 +30,7 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.core.settings.impl.AddressSettings;
import org.apache.activemq.tests.integration.IntegrationTestLogger; import org.apache.activemq.tests.integration.IntegrationTestLogger;
import org.apache.activemq.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.tests.integration.stomp.util.StompClientConnection; import org.apache.activemq.tests.integration.stomp.util.StompClientConnection;
@ -2354,8 +2355,11 @@ public class StompV11Test extends StompV11TestBase
} }
@Test @Test
public void testSendMessageToNonExistentJmsQueue() throws Exception public void testSendMessageToNonExistentJmsQueueWithoutAutoCreation() throws Exception
{ {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAutoCreateJmsQueues(false);
server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings);
connV11.connect(defUser, defPass); connV11.connect(defUser, defPass);
ClientStompFrame frame = connV11.createFrame("SEND"); ClientStompFrame frame = connV11.createFrame("SEND");
@ -2373,6 +2377,26 @@ public class StompV11Test extends StompV11TestBase
connV11.disconnect(); connV11.disconnect();
} }
@Test
public void testSendMessageToNonExistentJmsQueueWithAutoCreation() throws Exception
{
connV11.connect(defUser, defPass);
ClientStompFrame frame = connV11.createFrame("SEND");
String guid = UUID.randomUUID().toString();
frame.addHeader("destination", "jms.queue.NonExistentQueue" + guid);
frame.addHeader("receipt", "1234");
frame.setBody("Hello World");
frame = connV11.sendFrame(frame);
assertTrue(frame.getCommand().equals("RECEIPT"));
assertEquals("1234", frame.getHeader("receipt-id"));
System.out.println("message: " + frame.getHeader("message"));
connV11.disconnect();
}
@Test @Test
public void testSendAndReceiveWithEscapedCharactersInSenderId() throws Exception public void testSendAndReceiveWithEscapedCharactersInSenderId() throws Exception
{ {

View File

@ -31,6 +31,7 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.core.settings.impl.AddressSettings;
import org.apache.activemq.tests.integration.IntegrationTestLogger; import org.apache.activemq.tests.integration.IntegrationTestLogger;
import org.apache.activemq.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.tests.integration.stomp.util.StompClientConnection; import org.apache.activemq.tests.integration.stomp.util.StompClientConnection;
@ -400,6 +401,10 @@ public class StompV12Test extends StompV11TestBase
@Test @Test
public void testHeaderRepetitive() throws Exception public void testHeaderRepetitive() throws Exception
{ {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAutoCreateJmsQueues(false);
server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings);
connV12.connect(defUser, defPass); connV12.connect(defUser, defPass);
ClientStompFrame frame = connV12.createFrame("SEND"); ClientStompFrame frame = connV12.createFrame("SEND");
@ -2617,8 +2622,11 @@ public class StompV12Test extends StompV11TestBase
} }
@Test @Test
public void testSendMessageToNonExistentJmsQueue() throws Exception public void testSendMessageToNonExistentJmsQueueWithoutAutoCreation() throws Exception
{ {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAutoCreateJmsQueues(false);
server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings);
connV12.connect(defUser, defPass); connV12.connect(defUser, defPass);
ClientStompFrame frame = connV12.createFrame("SEND"); ClientStompFrame frame = connV12.createFrame("SEND");
@ -2636,6 +2644,26 @@ public class StompV12Test extends StompV11TestBase
connV12.disconnect(); connV12.disconnect();
} }
@Test
public void testSendMessageToNonExistentJmsQueueWithAutoCreation() throws Exception
{
connV12.connect(defUser, defPass);
ClientStompFrame frame = connV12.createFrame("SEND");
String guid = UUID.randomUUID().toString();
frame.addHeader("destination", "jms.queue.NonExistentQueue" + guid);
frame.addHeader("receipt", "1234");
frame.setBody("Hello World");
frame = connV12.sendFrame(frame);
assertTrue(frame.getCommand().equals("RECEIPT"));
assertEquals("1234", frame.getHeader("receipt-id"));
System.out.println("message: " + frame.getHeader("message"));
connV12.disconnect();
}
@Test @Test
public void testInvalidStompCommand() throws Exception public void testInvalidStompCommand() throws Exception
{ {