Some additional JMS Tests focused on Topics. Useful when updating the

AMQP JMS Client version as it shows some new problem in the latest
SNAPSHOT builds.
This commit is contained in:
Timothy Bish 2013-12-11 16:58:08 -05:00
parent 78c4e43372
commit e1e8c5b083
1 changed files with 168 additions and 14 deletions

View File

@ -23,7 +23,9 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Enumeration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@ -31,6 +33,7 @@ import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
@ -38,8 +41,10 @@ import javax.jms.TextMessage;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.activemq.util.Wait;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@ -347,26 +352,175 @@ public class JMSClientTest extends AmqpTestSupport {
@Test(timeout=30000)
public void testTTL() throws Exception {
QueueImpl queue = new QueueImpl("queue://" + name);
Connection connection = null;
try {
QueueImpl queue = new QueueImpl("queue://" + name);
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
MessageProducer producer = session.createProducer(queue);
producer.setTimeToLive(1000);
Message toSend = session.createTextMessage("Sample text");
producer.send(toSend);
MessageConsumer consumer = session.createConsumer(queue);
Message received = consumer.receive(5000);
assertNotNull(received);
LOG.info("Message JMSExpiration = {}", received.getJMSExpiration());
producer.setTimeToLive(100);
producer.send(toSend);
TimeUnit.SECONDS.sleep(2);
received = consumer.receive(5000);
if (received != null) {
LOG.info("Message JMSExpiration = {} JMSTimeStamp = {} TTL = {}",
new Object[] { received.getJMSExpiration(), received.getJMSTimestamp(),
received.getJMSExpiration() - received.getJMSTimestamp()});
}
assertNull(received);
} finally {
connection.close();
}
}
@Test(timeout=30000)
public void testDurableConsumerAsync() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
TopicImpl topic = new TopicImpl("topic://"+name);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Message> received = new AtomicReference<Message>();
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
MessageProducer producer = session.createProducer(queue);
producer.setTimeToLive(1000);
Message toSend = session.createTextMessage("Sample text");
producer.send(toSend);
MessageConsumer consumer = session.createConsumer(queue);
Message received = consumer.receive(5000);
assertNotNull(received);
producer.setTimeToLive(100);
producer.send(toSend);
TimeUnit.SECONDS.sleep(1);
assertNull(consumer.receive(5000));
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic");
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
received.set(message);
latch.countDown();
}
});
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
TextMessage message = session.createTextMessage();
message.setText("hello");
producer.send(message);
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertNotNull("Should have received a message by now.", received.get());
assertTrue("Should be an instance of TextMessage", received.get() instanceof TextMessage);
}
connection.close();
}
@Test(timeout=30000)
public void testDurableConsumerSync() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
TopicImpl topic = new TopicImpl("topic://"+name);
Connection connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
TextMessage message = session.createTextMessage();
message.setText("hello");
producer.send(message);
final AtomicReference<Message> msg = new AtomicReference<Message>();
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
msg.set(consumer.receiveNoWait());
return msg.get() != null;
}
}));
assertNotNull("Should have received a message by now.", msg.get());
assertTrue("Should be an instance of TextMessage", msg.get() instanceof TextMessage);
}
connection.close();
}
@Test(timeout=30000)
public void testTopicConsumerAsync() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
TopicImpl topic = new TopicImpl("topic://"+name);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Message> received = new AtomicReference<Message>();
Connection connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
received.set(message);
latch.countDown();
}
});
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
TextMessage message = session.createTextMessage();
message.setText("hello");
producer.send(message);
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertNotNull("Should have received a message by now.", received.get());
assertTrue("Should be an instance of TextMessage", received.get() instanceof TextMessage);
}
connection.close();
}
@Test(timeout=45000)
public void testTopicConsumerSync() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
TopicImpl topic = new TopicImpl("topic://"+name);
Connection connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = session.createConsumer(topic);
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
TextMessage message = session.createTextMessage();
message.setText("hello");
producer.send(message);
final AtomicReference<Message> msg = new AtomicReference<Message>();
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
msg.set(consumer.receiveNoWait());
return msg.get() != null;
}
}));
assertNotNull("Should have received a message by now.", msg.get());
assertTrue("Should be an instance of TextMessage", msg.get() instanceof TextMessage);
}
connection.close();
}
private Connection createConnection() throws JMSException {
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
final Connection connection = factory.createConnection();
connection.setClientID(name.toString());
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {