ACTIVEMQ-77 auto queue creation on AMQP send/rec
Implements a new feature for the broker whereby it may automatically create and delete queues which are not explicitly defined through the management API or file-based configuration when a client sends a message to or receives from a queue via the AMQP protocol. Note, the destination has to be named like "jms.queue.*" to be auto- created. The queue may subsequently be deleted when it no longer has any messages and consumers. Auto-creation and auto-deletion can both be turned on/off via address-setting.
This commit is contained in:
parent
87966029c7
commit
2cbef28cc7
|
@ -150,8 +150,28 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
|
|||
@Override
|
||||
public boolean queueQuery(String queueName) throws Exception
|
||||
{
|
||||
boolean queryResult = false;
|
||||
|
||||
QueueQueryResult queueQuery = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
|
||||
return queueQuery.isExists();
|
||||
|
||||
if (queueQuery.isExists())
|
||||
{
|
||||
queryResult = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (queueQuery.isAutoCreateJmsQueues())
|
||||
{
|
||||
serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true);
|
||||
queryResult = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
queryResult = false;
|
||||
}
|
||||
}
|
||||
|
||||
return queryResult;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -40,6 +40,8 @@ import java.util.Enumeration;
|
|||
import java.util.HashMap;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.activemq.api.core.management.ResourceNames;
|
||||
import org.apache.activemq.tests.util.RandomUtil;
|
||||
import org.apache.qpid.amqp_1_0.client.Receiver;
|
||||
import org.apache.qpid.amqp_1_0.client.Sender;
|
||||
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
|
||||
|
@ -886,6 +888,110 @@ public class ProtonTest extends ServiceTestBase
|
|||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testUsingPlainAMQPSenderWithNonExistantQueue() throws Exception
|
||||
{
|
||||
if (this.protocol != 0 && protocol != 3)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
String queue = ResourceNames.JMS_QUEUE + RandomUtil.randomString();
|
||||
|
||||
org.apache.qpid.amqp_1_0.client.Connection connection = null;
|
||||
|
||||
try
|
||||
{
|
||||
// Step 1. Create an amqp qpid 1.0 connection
|
||||
connection = new org.apache.qpid.amqp_1_0.client.Connection("localhost", 5672, null, null);
|
||||
|
||||
// Step 2. Create a session
|
||||
org.apache.qpid.amqp_1_0.client.Session session = connection.createSession();
|
||||
|
||||
// Step 3. Create a sender
|
||||
Sender sender = session.createSender(queue);
|
||||
|
||||
assertNotNull(server.locateQueue(new SimpleString(queue)));
|
||||
|
||||
// Step 4. send a simple message
|
||||
sender.send(new org.apache.qpid.amqp_1_0.client.Message("I am an amqp message"));
|
||||
|
||||
// Step 5. create a moving receiver, this means the message will be removed from the queue
|
||||
Receiver rec = session.createMovingReceiver(queue);
|
||||
|
||||
// Step 6. set some credit so we can receive
|
||||
rec.setCredit(UnsignedInteger.valueOf(1), false);
|
||||
|
||||
// Step 7. receive the simple message
|
||||
org.apache.qpid.amqp_1_0.client.Message m = rec.receive(5000);
|
||||
System.out.println("message = " + m.getPayload());
|
||||
|
||||
// Step 8. acknowledge the message
|
||||
rec.acknowledge(m);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (connection != null)
|
||||
{
|
||||
// Step 9. close the connection
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testUsingPlainAMQPReceiverWithNonExistantQueue() throws Exception
|
||||
{
|
||||
if (this.protocol != 0 && protocol != 3)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
String queue = ResourceNames.JMS_QUEUE + RandomUtil.randomString();
|
||||
|
||||
org.apache.qpid.amqp_1_0.client.Connection connection = null;
|
||||
|
||||
try
|
||||
{
|
||||
// Step 1. Create an amqp qpid 1.0 connection
|
||||
connection = new org.apache.qpid.amqp_1_0.client.Connection("localhost", 5672, null, null);
|
||||
|
||||
// Step 2. Create a session
|
||||
org.apache.qpid.amqp_1_0.client.Session session = connection.createSession();
|
||||
|
||||
// Step 3. create a moving receiver, this means the message will be removed from the queue
|
||||
Receiver rec = session.createMovingReceiver(queue);
|
||||
|
||||
assertNotNull(server.locateQueue(new SimpleString(queue)));
|
||||
|
||||
// Step 4. Create a sender
|
||||
Sender sender = session.createSender(queue);
|
||||
|
||||
// Step 5. send a simple message
|
||||
sender.send(new org.apache.qpid.amqp_1_0.client.Message("I am an amqp message"));
|
||||
|
||||
// Step 6. set some credit so we can receive
|
||||
rec.setCredit(UnsignedInteger.valueOf(1), false);
|
||||
|
||||
// Step 7. receive the simple message
|
||||
org.apache.qpid.amqp_1_0.client.Message m = rec.receive(5000);
|
||||
System.out.println("message = " + m.getPayload());
|
||||
|
||||
// Step 8. acknowledge the message
|
||||
rec.acknowledge(m);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (connection != null)
|
||||
{
|
||||
// Step 9. close the connection
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private javax.jms.Queue createQueue(String address)
|
||||
{
|
||||
if (protocol == 0 || protocol == 3)
|
||||
|
|
Loading…
Reference in New Issue