mirror of https://github.com/apache/activemq.git
Test didn't need to depend on hard coded port so use :0 instead. Add some additional test cases.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1371809 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
39e20d0415
commit
60383038d3
|
@ -1,10 +1,8 @@
|
|||
package org.apache.activemq.bugs;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.jms.Connection;
|
||||
|
@ -18,77 +16,63 @@ import javax.jms.MessageListener;
|
|||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.ActiveMQMessageConsumer;
|
||||
import org.apache.activemq.broker.BrokerFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.jmx.DestinationViewMBean;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.util.Wait;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Test for AMQ-3965.
|
||||
* A consumer may be stalled in case it uses optimizeAcknowledge and receives
|
||||
* a number of messages that expire before being dispatched to application code.
|
||||
* See AMQ-3965 for more details.
|
||||
* A consumer may be stalled in case it uses optimizeAcknowledge and receives
|
||||
* a number of messages that expire before being dispatched to application code.
|
||||
* See for more details.
|
||||
*
|
||||
*/
|
||||
public class OptimizeAcknowledgeWithExpiredMsgsTest {
|
||||
|
||||
|
||||
private final static Logger LOG = LoggerFactory.getLogger(OptimizeAcknowledgeWithExpiredMsgsTest.class);
|
||||
|
||||
private static BrokerService broker = null;
|
||||
protected static final String DATA_DIR = "target/activemq-data/";
|
||||
public final String brokerUrl = "tcp://localhost:61614";
|
||||
private BrokerService broker = null;
|
||||
|
||||
|
||||
/**
|
||||
* Creates a broker instance and starts it.
|
||||
*
|
||||
private String connectionUri;
|
||||
|
||||
/**
|
||||
* Creates a broker instance but does not start it.
|
||||
*
|
||||
* @param brokerUri - transport uri of broker
|
||||
* @param brokerName - name for the broker
|
||||
* @return a BrokerService instance with transport uri and broker name set
|
||||
* @throws Exception
|
||||
*/
|
||||
protected BrokerService createBroker(URI brokerUri, String brokerName) throws Exception {
|
||||
BrokerService broker = BrokerFactory.createBroker(brokerUri);
|
||||
broker.setBrokerName(brokerName);
|
||||
broker.setBrokerId(brokerName);
|
||||
broker.setDataDirectory(DATA_DIR);
|
||||
broker.setEnableStatistics(true);
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService broker = new BrokerService();
|
||||
broker.setPersistent(false);
|
||||
broker.setDeleteAllMessagesOnStartup(true);
|
||||
broker.setUseJmx(false);
|
||||
connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
|
||||
return broker;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
final String options = "?persistent=false&useJmx=false&deleteAllMessagesOnStartup=true";
|
||||
|
||||
broker = createBroker(new URI("broker:(" + brokerUrl + ")" + options), "localhost");
|
||||
broker = createBroker();
|
||||
broker.start();
|
||||
broker.waitUntilStarted();
|
||||
|
||||
broker.waitUntilStarted();
|
||||
}
|
||||
|
||||
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (broker != null)
|
||||
broker.stop();
|
||||
public void tearDown() throws Exception {
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
broker = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Tests for AMQ-3965
|
||||
|
@ -96,18 +80,79 @@ public class OptimizeAcknowledgeWithExpiredMsgsTest {
|
|||
* Creates producer and consumer. Producer sends 45 msgs that will expire
|
||||
* at consumer (but before being dispatched to app code).
|
||||
* Producer then sends 60 msgs without expiry.
|
||||
*
|
||||
*
|
||||
* Consumer receives msgs using a MessageListener and increments a counter.
|
||||
* Main thread sleeps for 5 seconds and checks the counter value.
|
||||
* Main thread sleeps for 5 seconds and checks the counter value.
|
||||
* If counter != 60 msgs (the number of msgs that should get dispatched
|
||||
* to consumer) the test fails.
|
||||
* to consumer) the test fails.
|
||||
*/
|
||||
@Test
|
||||
public void testOptimizedAckWithExpiredMsgs() throws Exception
|
||||
{
|
||||
|
||||
ActiveMQConnectionFactory connectionFactory =
|
||||
new ActiveMQConnectionFactory(brokerUrl + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
|
||||
ActiveMQConnectionFactory connectionFactory =
|
||||
new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
|
||||
|
||||
// Create JMS resources
|
||||
Connection connection = connectionFactory.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = session.createQueue("TEST.FOO");
|
||||
|
||||
// ***** Consumer code *****
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
|
||||
final MyMessageListener listener = new MyMessageListener();
|
||||
connection.setExceptionListener((ExceptionListener) listener);
|
||||
|
||||
// ***** Producer Code *****
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
|
||||
String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
|
||||
TextMessage message;
|
||||
|
||||
// Produce msgs that will expire quickly
|
||||
for (int i=0; i<45; i++) {
|
||||
message = session.createTextMessage(text);
|
||||
producer.send(message,1,1,100);
|
||||
LOG.trace("Sent message: "+ message.getJMSMessageID() +
|
||||
" with expiry 10 msec");
|
||||
}
|
||||
// Produce msgs that don't expire
|
||||
for (int i=0; i<60; i++) {
|
||||
message = session.createTextMessage(text);
|
||||
producer.send(message,1,1,60000);
|
||||
// producer.send(message);
|
||||
LOG.trace("Sent message: "+ message.getJMSMessageID() +
|
||||
" with expiry 30 sec");
|
||||
}
|
||||
consumer.setMessageListener(listener);
|
||||
|
||||
sleep(1000); // let the batch of 45 expire.
|
||||
|
||||
connection.start();
|
||||
|
||||
assertTrue("Should receive all expected messages, counter at " + listener.getCounter(), Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return listener.getCounter() == 60;
|
||||
}
|
||||
}));
|
||||
|
||||
LOG.info("Received all expected messages with counter at: " + listener.getCounter());
|
||||
|
||||
// Cleanup
|
||||
producer.close();
|
||||
consumer.close();
|
||||
session.close();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOptimizedAckWithExpiredMsgsSync() throws Exception
|
||||
{
|
||||
ActiveMQConnectionFactory connectionFactory =
|
||||
new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
|
||||
|
||||
// Create JMS resources
|
||||
Connection connection = connectionFactory.createConnection();
|
||||
|
@ -115,117 +160,137 @@ public class OptimizeAcknowledgeWithExpiredMsgsTest {
|
|||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = session.createQueue("TEST.FOO");
|
||||
|
||||
// ***** Consumer code *****
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
|
||||
MyMessageListener listener = new MyMessageListener();
|
||||
connection.setExceptionListener((ExceptionListener) listener);
|
||||
|
||||
// ***** Consumer code *****
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
|
||||
// ***** Producer Code *****
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
|
||||
String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
|
||||
TextMessage message;
|
||||
|
||||
consumer.setMessageListener(listener);
|
||||
listener.setDelay(100);
|
||||
|
||||
TextMessage message;
|
||||
|
||||
// Produce msgs that will expire quickly
|
||||
for (int i=0; i<45; i++) {
|
||||
message = session.createTextMessage(text);
|
||||
producer.send(message,1,1,30);
|
||||
LOG.trace("Sent message: "+ message.getJMSMessageID() +
|
||||
" with expiry 30 msec");
|
||||
producer.send(message,1,1,10);
|
||||
LOG.trace("Sent message: "+ message.getJMSMessageID() +
|
||||
" with expiry 10 msec");
|
||||
}
|
||||
// Produce msgs that don't expire
|
||||
for (int i=0; i<60; i++) {
|
||||
message = session.createTextMessage(text);
|
||||
producer.send(message);
|
||||
LOG.trace("Sent message: "+ message.getJMSMessageID() +
|
||||
" with no expiry.");
|
||||
producer.send(message,1,1,30000);
|
||||
// producer.send(message);
|
||||
LOG.trace("Sent message: "+ message.getJMSMessageID() +
|
||||
" with expiry 30 sec");
|
||||
}
|
||||
listener.setDelay(0);
|
||||
sleep(200);
|
||||
|
||||
int counter = 1;
|
||||
for (; counter <= 60; ++counter) {
|
||||
assertNotNull(consumer.receive(2000));
|
||||
LOG.info("counter at " + counter);
|
||||
}
|
||||
LOG.info("Received all expected messages with counter at: " + counter);
|
||||
|
||||
// Cleanup
|
||||
producer.close();
|
||||
consumer.close();
|
||||
session.close();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOptimizedAckWithExpiredMsgsSync2() throws Exception
|
||||
{
|
||||
ActiveMQConnectionFactory connectionFactory =
|
||||
new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
|
||||
|
||||
// Create JMS resources
|
||||
Connection connection = connectionFactory.createConnection();
|
||||
connection.start();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = session.createQueue("TEST.FOO");
|
||||
|
||||
// ***** Consumer code *****
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
|
||||
// ***** Producer Code *****
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
|
||||
String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
|
||||
TextMessage message;
|
||||
|
||||
// Produce msgs that don't expire
|
||||
for (int i=0; i<56; i++) {
|
||||
message = session.createTextMessage(text);
|
||||
producer.send(message,1,1,30000);
|
||||
// producer.send(message);
|
||||
LOG.trace("Sent message: "+ message.getJMSMessageID() +
|
||||
" with expiry 30 sec");
|
||||
}
|
||||
// Produce msgs that will expire quickly
|
||||
for (int i=0; i<44; i++) {
|
||||
message = session.createTextMessage(text);
|
||||
producer.send(message,1,1,10);
|
||||
LOG.trace("Sent message: "+ message.getJMSMessageID() +
|
||||
" with expiry 10 msec");
|
||||
}
|
||||
// Produce some moremsgs that don't expire
|
||||
for (int i=0; i<4; i++) {
|
||||
message = session.createTextMessage(text);
|
||||
producer.send(message,1,1,30000);
|
||||
// producer.send(message);
|
||||
LOG.trace("Sent message: "+ message.getJMSMessageID() +
|
||||
" with expiry 30 sec");
|
||||
}
|
||||
|
||||
sleep(200);
|
||||
|
||||
int counter = 1;
|
||||
for (; counter <= 60; ++counter) {
|
||||
assertNotNull(consumer.receive(2000));
|
||||
LOG.info("counter at " + counter);
|
||||
}
|
||||
LOG.info("Received all expected messages with counter at: " + counter);
|
||||
|
||||
// set exit condition
|
||||
TestExitCondition cond = new TestExitCondition(listener);
|
||||
Wait.waitFor(cond, 5000);
|
||||
|
||||
Assert.assertTrue("Error: Some non-expired messages were not received.", listener.getCounter() >= 60);
|
||||
|
||||
LOG.info("Received all expected messages with counter at " + listener.getCounter());
|
||||
|
||||
// Cleanup
|
||||
LOG.info("Cleaning up.");
|
||||
producer.close();
|
||||
consumer.close();
|
||||
session.close();
|
||||
connection.close();
|
||||
listener = null;
|
||||
}
|
||||
|
||||
|
||||
private void sleep(int milliSecondTime) {
|
||||
try {
|
||||
Thread.sleep(milliSecondTime);
|
||||
} catch (InterruptedException igonred) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Defines the exit condition for the test.
|
||||
*/
|
||||
private class TestExitCondition implements Wait.Condition {
|
||||
|
||||
private MyMessageListener listener;
|
||||
|
||||
public TestExitCondition(MyMessageListener l) {
|
||||
this.listener = l;
|
||||
}
|
||||
|
||||
public boolean isSatisified() throws Exception {
|
||||
return listener.getCounter() == 36;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
/**
|
||||
* Standard JMS MessageListener
|
||||
*/
|
||||
private class MyMessageListener implements MessageListener, ExceptionListener {
|
||||
|
||||
private AtomicInteger counter = new AtomicInteger(0);
|
||||
private int delay = 0;
|
||||
|
||||
public void onMessage(final Message message) {
|
||||
try {
|
||||
LOG.trace("Got Message " + message.getJMSMessageID());
|
||||
LOG.debug("counter at " + counter.incrementAndGet());
|
||||
if (delay>0) {
|
||||
sleep(delay);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
e.printStackTrace();
|
||||
|
||||
private AtomicInteger counter = new AtomicInteger(0);
|
||||
|
||||
public void onMessage(final Message message) {
|
||||
try {
|
||||
LOG.trace("Got Message " + message.getJMSMessageID());
|
||||
LOG.info("counter at " + counter.incrementAndGet());
|
||||
} catch (final Exception e) {
|
||||
}
|
||||
}
|
||||
|
||||
public int getCounter() {
|
||||
return counter.get();
|
||||
}
|
||||
|
||||
public int getDelay() {
|
||||
return delay;
|
||||
}
|
||||
|
||||
public void setDelay(int newDelay) {
|
||||
this.delay = newDelay;
|
||||
}
|
||||
|
||||
public synchronized void onException(JMSException ex) {
|
||||
}
|
||||
|
||||
public int getCounter() {
|
||||
return counter.get();
|
||||
}
|
||||
|
||||
public synchronized void onException(JMSException ex) {
|
||||
LOG.error("JMS Exception occured. Shutting down client.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue