From 2cbef28cc727dc002031d01c209d89725fba1147 Mon Sep 17 00:00:00 2001 From: jbertram Date: Tue, 10 Feb 2015 15:18:44 -0600 Subject: [PATCH] ACTIVEMQ-77 auto queue creation on AMQP send/rec Implements a new feature for the broker whereby it may automatically create and delete queues which are not explicitly defined through the management API or file-based configuration when a client sends a message to or receives from a queue via the AMQP protocol. Note, the destination has to be named like "jms.queue.*" to be auto- created. The queue may subsequently be deleted when it no longer has any messages and consumers. Auto-creation and auto-deletion can both be turned on/off via address-setting. --- .../ProtonSessionIntegrationCallback.java | 22 +++- .../tests/integration/proton/ProtonTest.java | 106 ++++++++++++++++++ 2 files changed, 127 insertions(+), 1 deletion(-) diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 5fe44729ae..5e0a9dea19 100644 --- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -150,8 +150,28 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se @Override public boolean queueQuery(String queueName) throws Exception { + boolean queryResult = false; + QueueQueryResult queueQuery = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); - return queueQuery.isExists(); + + if (queueQuery.isExists()) + { + queryResult = true; + } + else + { + if (queueQuery.isAutoCreateJmsQueues()) + { + serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true); + queryResult = true; + } + else + { + queryResult = false; + } + } + + return queryResult; } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/proton/ProtonTest.java index 79cb3b50f7..3665c73bad 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/proton/ProtonTest.java @@ -40,6 +40,8 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.Random; +import org.apache.activemq.api.core.management.ResourceNames; +import org.apache.activemq.tests.util.RandomUtil; import org.apache.qpid.amqp_1_0.client.Receiver; import org.apache.qpid.amqp_1_0.client.Sender; import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; @@ -886,6 +888,110 @@ public class ProtonTest extends ServiceTestBase } + @Test + public void testUsingPlainAMQPSenderWithNonExistantQueue() throws Exception + { + if (this.protocol != 0 && protocol != 3) + { + return; + } + + String queue = ResourceNames.JMS_QUEUE + RandomUtil.randomString(); + + org.apache.qpid.amqp_1_0.client.Connection connection = null; + + try + { + // Step 1. Create an amqp qpid 1.0 connection + connection = new org.apache.qpid.amqp_1_0.client.Connection("localhost", 5672, null, null); + + // Step 2. Create a session + org.apache.qpid.amqp_1_0.client.Session session = connection.createSession(); + + // Step 3. Create a sender + Sender sender = session.createSender(queue); + + assertNotNull(server.locateQueue(new SimpleString(queue))); + + // Step 4. send a simple message + sender.send(new org.apache.qpid.amqp_1_0.client.Message("I am an amqp message")); + + // Step 5. create a moving receiver, this means the message will be removed from the queue + Receiver rec = session.createMovingReceiver(queue); + + // Step 6. set some credit so we can receive + rec.setCredit(UnsignedInteger.valueOf(1), false); + + // Step 7. receive the simple message + org.apache.qpid.amqp_1_0.client.Message m = rec.receive(5000); + System.out.println("message = " + m.getPayload()); + + // Step 8. acknowledge the message + rec.acknowledge(m); + } + finally + { + if (connection != null) + { + // Step 9. close the connection + connection.close(); + } + } + } + + + @Test + public void testUsingPlainAMQPReceiverWithNonExistantQueue() throws Exception + { + if (this.protocol != 0 && protocol != 3) + { + return; + } + + String queue = ResourceNames.JMS_QUEUE + RandomUtil.randomString(); + + org.apache.qpid.amqp_1_0.client.Connection connection = null; + + try + { + // Step 1. Create an amqp qpid 1.0 connection + connection = new org.apache.qpid.amqp_1_0.client.Connection("localhost", 5672, null, null); + + // Step 2. Create a session + org.apache.qpid.amqp_1_0.client.Session session = connection.createSession(); + + // Step 3. create a moving receiver, this means the message will be removed from the queue + Receiver rec = session.createMovingReceiver(queue); + + assertNotNull(server.locateQueue(new SimpleString(queue))); + + // Step 4. Create a sender + Sender sender = session.createSender(queue); + + // Step 5. send a simple message + sender.send(new org.apache.qpid.amqp_1_0.client.Message("I am an amqp message")); + + // Step 6. set some credit so we can receive + rec.setCredit(UnsignedInteger.valueOf(1), false); + + // Step 7. receive the simple message + org.apache.qpid.amqp_1_0.client.Message m = rec.receive(5000); + System.out.println("message = " + m.getPayload()); + + // Step 8. acknowledge the message + rec.acknowledge(m); + } + finally + { + if (connection != null) + { + // Step 9. close the connection + connection.close(); + } + } + } + + private javax.jms.Queue createQueue(String address) { if (protocol == 0 || protocol == 3)