Use the AMQP JMS client's setTopicPrefix and setQueuePrefix on the

ConnectionFactory to make the JMS client usage simpler.
This commit is contained in:
Timothy Bish 2014-01-15 09:46:43 -05:00
parent 6044c4c059
commit 3d63ca75fb
1 changed files with 22 additions and 19 deletions

View File

@ -36,17 +36,17 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.activemq.util.Wait;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@ -60,11 +60,11 @@ public class JMSClientTest extends AmqpTestSupport {
@Test
public void testProducerConsume() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://" + name);
Connection connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
MessageProducer p = session.createProducer(queue);
TextMessage message = session.createTextMessage();
@ -89,17 +89,17 @@ public class JMSClientTest extends AmqpTestSupport {
@Test
public void testTransactedConsumer() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://" + name);
final int msgCount = 1;
Connection connection = createConnection();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
sendMessages(connection, queue, msgCount);
QueueViewMBean queueView = getProxyToQueue(name.toString());
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(msgCount, queueView.getQueueSize());
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
Message msg = consumer.receive(TestConfig.TIMEOUT);
@ -121,17 +121,17 @@ public class JMSClientTest extends AmqpTestSupport {
public void testRollbackRececeivedMessage() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://" + name);
final int msgCount = 1;
Connection connection = createConnection();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
sendMessages(connection, queue, msgCount);
QueueViewMBean queueView = getProxyToQueue(name.toString());
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(msgCount, queueView.getQueueSize());
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
// Receive and roll back, first receive should not show redelivered.
@ -165,10 +165,11 @@ public class JMSClientTest extends AmqpTestSupport {
public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://" + name);
final int msgCount = 500;
Connection connection = createConnection();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
sendMessages(connection, queue, msgCount);
QueueViewMBean queueView = getProxyToQueue(name.toString());
@ -177,7 +178,6 @@ public class JMSClientTest extends AmqpTestSupport {
// Consumer all in TX and commit.
{
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < msgCount; ++i) {
@ -204,11 +204,11 @@ public class JMSClientTest extends AmqpTestSupport {
@Test
public void testSelectors() throws Exception{
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://" + name);
Connection connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
MessageProducer p = session.createProducer(queue);
TextMessage message = session.createTextMessage();
@ -242,10 +242,10 @@ public class JMSClientTest extends AmqpTestSupport {
//should through exception IllegalStateException:The session is closed
@Test(timeout=30000)
public void testBrokerRestartPersistentQueueException() throws Exception {
QueueImpl queue = new QueueImpl("queue://" + name);
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
connection.start();
MessageProducer producer = session.createProducer(queue);
@ -266,10 +266,10 @@ public class JMSClientTest extends AmqpTestSupport {
@Test(timeout=30000)
public void testProducerThrowsWhenBrokerRestarted() throws Exception {
QueueImpl queue = new QueueImpl("queue://" + name);
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
connection.start();
MessageProducer producer = session.createProducer(queue);
@ -302,10 +302,10 @@ public class JMSClientTest extends AmqpTestSupport {
@Test(timeout=30000)
public void testBrokerRestartWontHangConnectionClose() throws Exception {
QueueImpl queue = new QueueImpl("queue://" + name);
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
connection.start();
MessageProducer producer = session.createProducer(queue);
@ -329,9 +329,9 @@ public class JMSClientTest extends AmqpTestSupport {
int count = 2000;
QueueImpl queue = new QueueImpl("queue://" + name);
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
connection.start();
MessageProducer producer= session.createProducer(queue);
@ -357,9 +357,9 @@ public class JMSClientTest extends AmqpTestSupport {
ActiveMQAdmin.enableJMSFrameTracing();
Connection connection = null;
try {
QueueImpl queue = new QueueImpl("queue://" + name);
connection = createConnection(true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
connection.start();
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
@ -376,13 +376,13 @@ public class JMSClientTest extends AmqpTestSupport {
@Test(timeout=30000)
public void testDurableConsumerAsync() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
TopicImpl topic = new TopicImpl("topic://"+name);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Message> received = new AtomicReference<Message>();
Connection connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(name.toString());
MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic");
consumer.setMessageListener(new MessageListener() {
@ -411,11 +411,11 @@ public class JMSClientTest extends AmqpTestSupport {
@Test(timeout=30000)
public void testDurableConsumerSync() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
TopicImpl topic = new TopicImpl("topic://"+name);
Connection connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(name.toString());
final MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
@ -444,13 +444,13 @@ public class JMSClientTest extends AmqpTestSupport {
@Test(timeout=30000)
public void testTopicConsumerAsync() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
TopicImpl topic = new TopicImpl("topic://"+name);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Message> received = new AtomicReference<Message>();
Connection connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(name.toString());
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
@ -479,11 +479,11 @@ public class JMSClientTest extends AmqpTestSupport {
@Test(timeout=45000)
public void testTopicConsumerSync() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
TopicImpl topic = new TopicImpl("topic://"+name);
Connection connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(name.toString());
final MessageConsumer consumer = session.createConsumer(topic);
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
@ -553,7 +553,10 @@ public class JMSClientTest extends AmqpTestSupport {
private Connection createConnection(String clientId, boolean syncPublish) throws JMSException {
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
factory.setSyncPublish(syncPublish);
factory.setTopicPrefix("topic://");
factory.setQueuePrefix("queue://");
final Connection connection = factory.createConnection();
if (clientId != null && !clientId.isEmpty()) {