Some new tests showing existing problems with AMQP transacted consumer handling. 

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1507146 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-07-25 22:32:56 +00:00
parent 563cc1bc6a
commit c5c4caadcf
2 changed files with 157 additions and 1 deletions

View File

@ -19,9 +19,20 @@ package org.apache.activemq.transport.amqp;
import java.io.File;
import java.util.Vector;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.spring.SpringSslContext;
import org.junit.After;
import org.junit.Before;
@ -92,4 +103,39 @@ public class AmqpTestSupport {
}
autoFailTestSupport.stopAutoFailThread();
}
public void sendMessages(Connection connection, Destination destination, int count) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer p = session.createProducer(destination);
for (int i = 0; i < count; i++) {
TextMessage message = session.createTextMessage();
message.setText("TextMessage: " + i);
p.send(message);
}
p.close();
}
protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException {
ObjectName brokerViewMBean = new ObjectName(
"org.apache.activemq:type=Broker,brokerName=localhost");
BrokerViewMBean proxy = (BrokerViewMBean) brokerService.getManagementContext()
.newProxyInstance(brokerViewMBean, BrokerViewMBean.class, true);
return proxy;
}
protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
return proxy;
}
protected QueueViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name);
QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
return proxy;
}
}

View File

@ -33,6 +33,7 @@ import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
@ -46,7 +47,7 @@ public class JMSClientTest extends AmqpTestSupport {
@SuppressWarnings("rawtypes")
@Test
public void testTransactions() throws Exception {
public void testProducerConsume() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://txqueue");
@ -72,7 +73,116 @@ public class JMSClientTest extends AmqpTestSupport {
assertTrue(msg instanceof TextMessage);
}
connection.close();
}
@Test
public void testTransactedConsumer() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://txqueue");
final int msgCount = 10;
Connection connection = createConnection();
sendMessages(connection, queue, msgCount);
QueueViewMBean queueView = getProxyToQueue("txqueue");
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(msgCount, queueView.getQueueSize());
// Consumer all in TX and commit.
{
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < msgCount; ++i) {
Message msg = consumer.receive(TestConfig.TIMEOUT);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
}
consumer.close();
session.commit();
}
LOG.info("Queue size after consumer commit is: {}", queueView.getQueueSize());
assertEquals(0, queueView.getQueueSize());
connection.close();
}
@Test
public void testRollbackRececeivedMessage() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://txqueue");
final int msgCount = 1;
Connection connection = createConnection();
sendMessages(connection, queue, msgCount);
QueueViewMBean queueView = getProxyToQueue("txqueue");
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(msgCount, queueView.getQueueSize());
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
// Receive and roll back, first receive should not show redelivered.
Message msg = consumer.receive(TestConfig.TIMEOUT);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals(false, msg.getJMSRedelivered());
session.rollback();
// Receive and roll back, first receive should not show redelivered.
msg = consumer.receive(TestConfig.TIMEOUT);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals(true, msg.getJMSRedelivered());
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(msgCount, queueView.getQueueSize());
session.commit();
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(0, queueView.getQueueSize());
}
@Test
public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://txqueue");
final int msgCount = 500;
Connection connection = createConnection();
sendMessages(connection, queue, msgCount);
QueueViewMBean queueView = getProxyToQueue("txqueue");
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(msgCount, queueView.getQueueSize());
// Consumer all in TX and commit.
{
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < msgCount; ++i) {
if ((i % 100) == 0) {
LOG.info("Attempting receive of Message #{}", i);
}
Message msg = consumer.receive(TestConfig.TIMEOUT);
assertNotNull("Should receive message: " + i, msg);
assertTrue(msg instanceof TextMessage);
}
consumer.close();
session.commit();
}
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(0, queueView.getQueueSize());
}
@SuppressWarnings("rawtypes")