Pull out JMS client common test bits into a test support class.

This commit is contained in:
Timothy Bish 2014-05-12 15:16:03 -04:00
parent 683fcda55d
commit 1dd34a13b2
3 changed files with 144 additions and 137 deletions

View File

@ -47,11 +47,15 @@ import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.spring.SpringSslContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AmqpTestSupport {
@Rule public TestName name = new TestName();
protected static final Logger LOG = LoggerFactory.getLogger(AmqpTestSupport.class);
protected BrokerService brokerService;
protected Vector<Throwable> exceptions = new Vector<Throwable>();

View File

@ -46,21 +46,18 @@ import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.activemq.util.Wait;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.objectweb.jtests.jms.framework.TestConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JMSClientTest extends AmqpTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class);
@Rule public TestName name = new TestName();
java.util.logging.Logger frameLoggger = java.util.logging.Logger.getLogger("FRM");
public class JMSClientTest extends JMSClientTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class);
protected java.util.logging.Logger frameLoggger = java.util.logging.Logger.getLogger("FRM");
@Override
@Before
@ -82,10 +79,10 @@ public class JMSClientTest extends AmqpTestSupport {
public void testProducerConsume() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
Connection connection = createConnection();
connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
Queue queue = session.createQueue(getDestinationName());
MessageProducer p = session.createProducer(queue);
TextMessage message = session.createTextMessage();
@ -104,18 +101,17 @@ public class JMSClientTest extends AmqpTestSupport {
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
}
connection.close();
}
@Test(timeout=30000)
public void testAnonymousProducerConsume() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
Connection connection = createConnection();
connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue1 = session.createQueue(name.toString() + "1");
Queue queue2 = session.createQueue(name.toString() + "2");
Queue queue1 = session.createQueue(getDestinationName() + "1");
Queue queue2 = session.createQueue(getDestinationName() + "2");
MessageProducer p = session.createProducer(null);
TextMessage message = session.createTextMessage();
@ -138,7 +134,6 @@ public class JMSClientTest extends AmqpTestSupport {
consumer.close();
}
}
connection.close();
}
@Test
@ -146,12 +141,12 @@ public class JMSClientTest extends AmqpTestSupport {
ActiveMQAdmin.enableJMSFrameTracing();
final int msgCount = 1;
Connection connection = createConnection();
connection = createConnection();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
Queue queue = session.createQueue(getDestinationName());
sendMessages(connection, queue, msgCount);
QueueViewMBean queueView = getProxyToQueue(name.toString());
QueueViewMBean queueView = getProxyToQueue(getDestinationName());
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(msgCount, queueView.getQueueSize());
@ -168,8 +163,6 @@ public class JMSClientTest extends AmqpTestSupport {
LOG.info("Queue size after session commit is: {}", queueView.getQueueSize());
assertEquals(0, queueView.getQueueSize());
connection.close();
}
@Test(timeout=30000)
@ -178,12 +171,12 @@ public class JMSClientTest extends AmqpTestSupport {
ActiveMQAdmin.enableJMSFrameTracing();
final int msgCount = 1;
Connection connection = createConnection();
connection = createConnection();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
Queue queue = session.createQueue(getDestinationName());
sendMessages(connection, queue, msgCount);
QueueViewMBean queueView = getProxyToQueue(name.toString());
QueueViewMBean queueView = getProxyToQueue(getDestinationName());
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(msgCount, queueView.getQueueSize());
@ -213,7 +206,6 @@ public class JMSClientTest extends AmqpTestSupport {
assertEquals(0, queueView.getQueueSize());
session.close();
connection.close();
}
@Test(timeout=60000)
@ -222,12 +214,12 @@ public class JMSClientTest extends AmqpTestSupport {
ActiveMQAdmin.enableJMSFrameTracing();
final int msgCount = 500;
Connection connection = createConnection();
connection = createConnection();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
Queue queue = session.createQueue(getDestinationName());
sendMessages(connection, queue, msgCount);
QueueViewMBean queueView = getProxyToQueue(name.toString());
QueueViewMBean queueView = getProxyToQueue(getDestinationName());
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(msgCount, queueView.getQueueSize());
@ -249,8 +241,6 @@ public class JMSClientTest extends AmqpTestSupport {
session.close();
}
connection.close();
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(0, queueView.getQueueSize());
}
@ -260,10 +250,10 @@ public class JMSClientTest extends AmqpTestSupport {
public void testSelectors() throws Exception{
ActiveMQAdmin.enableJMSFrameTracing();
Connection connection = createConnection();
connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
Queue queue = session.createQueue(getDestinationName());
MessageProducer p = session.createProducer(queue);
TextMessage message = session.createTextMessage();
@ -291,15 +281,14 @@ public class JMSClientTest extends AmqpTestSupport {
assertTrue(msg instanceof TextMessage);
assertEquals("hello + 9", ((TextMessage) msg).getText());
}
connection.close();
}
@Test(timeout=30000)
public void testProducerThrowsWhenBrokerStops() throws Exception {
Connection connection = createConnection();
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
Queue queue = session.createQueue(getDestinationName());
connection.start();
MessageProducer producer = session.createProducer(queue);
@ -332,10 +321,9 @@ public class JMSClientTest extends AmqpTestSupport {
@Test(timeout=30000)
public void testProducerCreateThrowsWhenBrokerStops() throws Exception {
Connection connection = createConnection();
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
Queue queue = session.createQueue(getDestinationName());
connection.start();
Thread stopper = new Thread(new Runnable() {
@ -364,10 +352,9 @@ public class JMSClientTest extends AmqpTestSupport {
@Test(timeout=30000)
public void testConsumerCreateThrowsWhenBrokerStops() throws Exception {
Connection connection = createConnection();
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
Queue queue = session.createQueue(getDestinationName());
connection.start();
MessageProducer producer = session.createProducer(queue);
@ -387,10 +374,9 @@ public class JMSClientTest extends AmqpTestSupport {
@Test(timeout=90000)
public void testConsumerReceiveNoWaitThrowsWhenBrokerStops() throws Exception {
Connection connection = createConnection();
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
Queue queue = session.createQueue(getDestinationName());
connection.start();
MessageConsumer consumer=session.createConsumer(queue);
@ -419,10 +405,9 @@ public class JMSClientTest extends AmqpTestSupport {
@Test(timeout=60000)
public void testConsumerReceiveTimedThrowsWhenBrokerStops() throws Exception {
Connection connection = createConnection();
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
Queue queue = session.createQueue(getDestinationName());
connection.start();
MessageConsumer consumer=session.createConsumer(queue);
@ -450,10 +435,9 @@ public class JMSClientTest extends AmqpTestSupport {
@Test(timeout=30000)
public void testConsumerReceiveReturnsBrokerStops() throws Exception {
Connection connection = createConnection();
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
Queue queue = session.createQueue(getDestinationName());
connection.start();
MessageConsumer consumer=session.createConsumer(queue);
@ -479,10 +463,9 @@ public class JMSClientTest extends AmqpTestSupport {
@Test(timeout=30000)
public void testBrokerRestartWontHangConnectionClose() throws Exception {
Connection connection = createConnection();
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
Queue queue = session.createQueue(getDestinationName());
connection.start();
MessageProducer producer = session.createProducer(queue);
@ -503,12 +486,10 @@ public class JMSClientTest extends AmqpTestSupport {
@Test(timeout=120000)
public void testProduceAndConsumeLargeNumbersOfMessages() throws JMSException {
int count = 2000;
Connection connection = createConnection();
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
Queue queue = session.createQueue(getDestinationName());
connection.start();
MessageProducer producer= session.createProducer(queue);
@ -532,22 +513,17 @@ public class JMSClientTest extends AmqpTestSupport {
@Test(timeout=30000)
public void testSyncSends() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
Connection connection = null;
try {
connection = createConnection(true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
connection.start();
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message toSend = session.createTextMessage("Sample text");
producer.send(toSend);
MessageConsumer consumer = session.createConsumer(queue);
Message received = consumer.receive(5000);
assertNotNull(received);
} finally {
connection.close();
}
connection = createConnection(true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getDestinationName());
connection.start();
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message toSend = session.createTextMessage("Sample text");
producer.send(toSend);
MessageConsumer consumer = session.createConsumer(queue);
Message received = consumer.receive(5000);
assertNotNull(received);
}
@Test(timeout=30000)
@ -556,10 +532,10 @@ public class JMSClientTest extends AmqpTestSupport {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Message> received = new AtomicReference<Message>();
Connection connection = createConnection();
connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(name.toString());
Topic topic = session.createTopic(getDestinationName());
MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic");
consumer.setMessageListener(new MessageListener() {
@ -582,17 +558,16 @@ public class JMSClientTest extends AmqpTestSupport {
assertNotNull("Should have received a message by now.", received.get());
assertTrue("Should be an instance of TextMessage", received.get() instanceof TextMessage);
}
connection.close();
}
@Test(timeout=30000)
public void testDurableConsumerSync() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
Connection connection = createConnection();
connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(name.toString());
Topic topic = session.createTopic(getDestinationName());
final MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
@ -615,7 +590,6 @@ public class JMSClientTest extends AmqpTestSupport {
assertNotNull("Should have received a message by now.", msg.get());
assertTrue("Should be an instance of TextMessage", msg.get() instanceof TextMessage);
}
connection.close();
}
@Test(timeout=30000)
@ -624,10 +598,10 @@ public class JMSClientTest extends AmqpTestSupport {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Message> received = new AtomicReference<Message>();
Connection connection = createConnection();
connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(name.toString());
Topic topic = session.createTopic(getDestinationName());
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
@ -657,10 +631,10 @@ public class JMSClientTest extends AmqpTestSupport {
public void testTopicConsumerSync() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
Connection connection = createConnection();
connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(name.toString());
Topic topic = session.createTopic(getDestinationName());
final MessageConsumer consumer = session.createConsumer(topic);
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
@ -683,7 +657,6 @@ public class JMSClientTest extends AmqpTestSupport {
assertNotNull("Should have received a message by now.", msg.get());
assertTrue("Should be an instance of TextMessage", msg.get() instanceof TextMessage);
}
connection.close();
}
@Test(timeout=60000)
@ -723,7 +696,7 @@ public class JMSClientTest extends AmqpTestSupport {
public void testExecptionListenerCalledOnBrokerStop() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
Connection connection = createConnection();
connection = createConnection();
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
@ -757,9 +730,9 @@ public class JMSClientTest extends AmqpTestSupport {
public void testSessionTransactedCommit() throws JMSException, InterruptedException {
ActiveMQAdmin.enableJMSFrameTracing();
Connection connection = createConnection();
connection = createConnection();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
Queue queue = session.createQueue(getDestinationName());
connection.start();
@ -785,16 +758,15 @@ public class JMSClientTest extends AmqpTestSupport {
}
session.close();
connection.close();
}
@Test(timeout=30000)
public void testSessionTransactedRollback() throws JMSException, InterruptedException {
ActiveMQAdmin.enableJMSFrameTracing();
Connection connection = createConnection();
connection = createConnection();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
Queue queue = session.createQueue(getDestinationName());
connection.start();
@ -813,7 +785,6 @@ public class JMSClientTest extends AmqpTestSupport {
assertNull(m);
session.close();
connection.close();
}
private String createLargeString(int sizeInBytes) {
@ -829,9 +800,9 @@ public class JMSClientTest extends AmqpTestSupport {
@Test(timeout = 60 * 1000)
public void testSendLargeMessage() throws JMSException, InterruptedException {
Connection connection = createConnection();
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = name.toString();
String queueName = getDestinationName();
Queue queue = session.createQueue(queueName);
MessageProducer producer=session.createProducer(queue);
@ -842,7 +813,6 @@ public class JMSClientTest extends AmqpTestSupport {
producer.send(m);
MessageConsumer consumer=session.createConsumer(queue);
Message message = consumer.receive();
assertNotNull(message);
assertTrue(message instanceof TextMessage);
@ -851,49 +821,4 @@ public class JMSClientTest extends AmqpTestSupport {
assertEquals(messageSize, textMessage.getText().length());
assertEquals(messageText, textMessage.getText());
}
private Connection createConnection() throws JMSException {
return createConnection(name.toString(), false, false);
}
private Connection createConnection(boolean syncPublish) throws JMSException {
return createConnection(name.toString(), syncPublish, false);
}
private Connection createConnection(String clientId) throws JMSException {
return createConnection(clientId, false, false);
}
/**
* Can be overridden in subclasses to test against a different transport suchs as NIO.
*
* @return the port to connect to on the Broker.
*/
protected int getBrokerPort() {
return port;
}
protected Connection createConnection(String clientId, boolean syncPublish, boolean useSsl) throws JMSException {
int brokerPort = getBrokerPort();
LOG.debug("Creating connection on port {}", brokerPort);
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", brokerPort, "admin", "password", null, useSsl);
factory.setSyncPublish(syncPublish);
factory.setTopicPrefix("topic://");
factory.setQueuePrefix("queue://");
final Connection connection = factory.createConnection();
if (clientId != null && !clientId.isEmpty()) {
connection.setClientID(clientId);
}
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
connection.start();
return connection;
}
}

View File

@ -0,0 +1,78 @@
package org.apache.activemq.transport.amqp;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.junit.After;
public class JMSClientTestSupport extends AmqpTestSupport {
protected Connection connection;
@Override
@After
public void tearDown() throws Exception {
if (connection != null) {
try {
connection.close();
} catch (Exception e) {
}
}
super.tearDown();
}
/**
* @return the proper destination name to use for each test method invocation.
*/
protected String getDestinationName() {
return name.getMethodName();
}
/**
* Can be overridden in subclasses to test against a different transport suchs as NIO.
*
* @return the port to connect to on the Broker.
*/
protected int getBrokerPort() {
return port;
}
protected Connection createConnection() throws JMSException {
return createConnection(name.toString(), false, false);
}
protected Connection createConnection(boolean syncPublish) throws JMSException {
return createConnection(name.toString(), syncPublish, false);
}
protected Connection createConnection(String clientId) throws JMSException {
return createConnection(clientId, false, false);
}
protected Connection createConnection(String clientId, boolean syncPublish, boolean useSsl) throws JMSException {
int brokerPort = getBrokerPort();
LOG.debug("Creating connection on port {}", brokerPort);
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", brokerPort, "admin", "password", null, useSsl);
factory.setSyncPublish(syncPublish);
factory.setTopicPrefix("topic://");
factory.setQueuePrefix("queue://");
final Connection connection = factory.createConnection();
if (clientId != null && !clientId.isEmpty()) {
connection.setClientID(clientId);
}
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
connection.start();
return connection;
}
}