This closes #92 on auto-create queue for AMQP
This commit is contained in:
commit
5f65c07d31
|
@ -150,8 +150,28 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
|
||||||
@Override
|
@Override
|
||||||
public boolean queueQuery(String queueName) throws Exception
|
public boolean queueQuery(String queueName) throws Exception
|
||||||
{
|
{
|
||||||
|
boolean queryResult = false;
|
||||||
|
|
||||||
QueueQueryResult queueQuery = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
|
QueueQueryResult queueQuery = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
|
||||||
return queueQuery.isExists();
|
|
||||||
|
if (queueQuery.isExists())
|
||||||
|
{
|
||||||
|
queryResult = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (queueQuery.isAutoCreateJmsQueues())
|
||||||
|
{
|
||||||
|
serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true);
|
||||||
|
queryResult = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
queryResult = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return queryResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -40,6 +40,8 @@ import java.util.Enumeration;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
|
import org.apache.activemq.api.core.management.ResourceNames;
|
||||||
|
import org.apache.activemq.tests.util.RandomUtil;
|
||||||
import org.apache.qpid.amqp_1_0.client.Receiver;
|
import org.apache.qpid.amqp_1_0.client.Receiver;
|
||||||
import org.apache.qpid.amqp_1_0.client.Sender;
|
import org.apache.qpid.amqp_1_0.client.Sender;
|
||||||
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
|
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
|
||||||
|
@ -886,6 +888,110 @@ public class ProtonTest extends ServiceTestBase
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUsingPlainAMQPSenderWithNonExistantQueue() throws Exception
|
||||||
|
{
|
||||||
|
if (this.protocol != 0 && protocol != 3)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
String queue = ResourceNames.JMS_QUEUE + RandomUtil.randomString();
|
||||||
|
|
||||||
|
org.apache.qpid.amqp_1_0.client.Connection connection = null;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// Step 1. Create an amqp qpid 1.0 connection
|
||||||
|
connection = new org.apache.qpid.amqp_1_0.client.Connection("localhost", 5672, null, null);
|
||||||
|
|
||||||
|
// Step 2. Create a session
|
||||||
|
org.apache.qpid.amqp_1_0.client.Session session = connection.createSession();
|
||||||
|
|
||||||
|
// Step 3. Create a sender
|
||||||
|
Sender sender = session.createSender(queue);
|
||||||
|
|
||||||
|
assertNotNull(server.locateQueue(new SimpleString(queue)));
|
||||||
|
|
||||||
|
// Step 4. send a simple message
|
||||||
|
sender.send(new org.apache.qpid.amqp_1_0.client.Message("I am an amqp message"));
|
||||||
|
|
||||||
|
// Step 5. create a moving receiver, this means the message will be removed from the queue
|
||||||
|
Receiver rec = session.createMovingReceiver(queue);
|
||||||
|
|
||||||
|
// Step 6. set some credit so we can receive
|
||||||
|
rec.setCredit(UnsignedInteger.valueOf(1), false);
|
||||||
|
|
||||||
|
// Step 7. receive the simple message
|
||||||
|
org.apache.qpid.amqp_1_0.client.Message m = rec.receive(5000);
|
||||||
|
System.out.println("message = " + m.getPayload());
|
||||||
|
|
||||||
|
// Step 8. acknowledge the message
|
||||||
|
rec.acknowledge(m);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
if (connection != null)
|
||||||
|
{
|
||||||
|
// Step 9. close the connection
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUsingPlainAMQPReceiverWithNonExistantQueue() throws Exception
|
||||||
|
{
|
||||||
|
if (this.protocol != 0 && protocol != 3)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
String queue = ResourceNames.JMS_QUEUE + RandomUtil.randomString();
|
||||||
|
|
||||||
|
org.apache.qpid.amqp_1_0.client.Connection connection = null;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// Step 1. Create an amqp qpid 1.0 connection
|
||||||
|
connection = new org.apache.qpid.amqp_1_0.client.Connection("localhost", 5672, null, null);
|
||||||
|
|
||||||
|
// Step 2. Create a session
|
||||||
|
org.apache.qpid.amqp_1_0.client.Session session = connection.createSession();
|
||||||
|
|
||||||
|
// Step 3. create a moving receiver, this means the message will be removed from the queue
|
||||||
|
Receiver rec = session.createMovingReceiver(queue);
|
||||||
|
|
||||||
|
assertNotNull(server.locateQueue(new SimpleString(queue)));
|
||||||
|
|
||||||
|
// Step 4. Create a sender
|
||||||
|
Sender sender = session.createSender(queue);
|
||||||
|
|
||||||
|
// Step 5. send a simple message
|
||||||
|
sender.send(new org.apache.qpid.amqp_1_0.client.Message("I am an amqp message"));
|
||||||
|
|
||||||
|
// Step 6. set some credit so we can receive
|
||||||
|
rec.setCredit(UnsignedInteger.valueOf(1), false);
|
||||||
|
|
||||||
|
// Step 7. receive the simple message
|
||||||
|
org.apache.qpid.amqp_1_0.client.Message m = rec.receive(5000);
|
||||||
|
System.out.println("message = " + m.getPayload());
|
||||||
|
|
||||||
|
// Step 8. acknowledge the message
|
||||||
|
rec.acknowledge(m);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
if (connection != null)
|
||||||
|
{
|
||||||
|
// Step 9. close the connection
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private javax.jms.Queue createQueue(String address)
|
private javax.jms.Queue createQueue(String address)
|
||||||
{
|
{
|
||||||
if (protocol == 0 || protocol == 3)
|
if (protocol == 0 || protocol == 3)
|
||||||
|
|
Loading…
Reference in New Issue