diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java index 9418342846..4aeed08eb2 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java @@ -1,10 +1,8 @@ package org.apache.activemq.bugs; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import java.net.URI; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; @@ -18,77 +16,63 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import javax.management.ObjectName; -import junit.framework.Assert; - -import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQMessageConsumer; -import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.DestinationViewMBean; -import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.util.Wait; - import org.junit.After; import org.junit.Before; import org.junit.Test; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Test for AMQ-3965. - * A consumer may be stalled in case it uses optimizeAcknowledge and receives - * a number of messages that expire before being dispatched to application code. - * See AMQ-3965 for more details. + * A consumer may be stalled in case it uses optimizeAcknowledge and receives + * a number of messages that expire before being dispatched to application code. + * See for more details. + * */ public class OptimizeAcknowledgeWithExpiredMsgsTest { - + private final static Logger LOG = LoggerFactory.getLogger(OptimizeAcknowledgeWithExpiredMsgsTest.class); - private static BrokerService broker = null; - protected static final String DATA_DIR = "target/activemq-data/"; - public final String brokerUrl = "tcp://localhost:61614"; + private BrokerService broker = null; - - /** - * Creates a broker instance and starts it. - * + private String connectionUri; + + /** + * Creates a broker instance but does not start it. + * * @param brokerUri - transport uri of broker * @param brokerName - name for the broker * @return a BrokerService instance with transport uri and broker name set * @throws Exception */ - protected BrokerService createBroker(URI brokerUri, String brokerName) throws Exception { - BrokerService broker = BrokerFactory.createBroker(brokerUri); - broker.setBrokerName(brokerName); - broker.setBrokerId(brokerName); - broker.setDataDirectory(DATA_DIR); - broker.setEnableStatistics(true); + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setDeleteAllMessagesOnStartup(true); broker.setUseJmx(false); + connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); return broker; } - - + @Before public void setUp() throws Exception { - final String options = "?persistent=false&useJmx=false&deleteAllMessagesOnStartup=true"; - - broker = createBroker(new URI("broker:(" + brokerUrl + ")" + options), "localhost"); + broker = createBroker(); broker.start(); - broker.waitUntilStarted(); - + broker.waitUntilStarted(); } - - + @After - public void tearDown() throws Exception { - if (broker != null) - broker.stop(); + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } } - /** * Tests for AMQ-3965 @@ -96,18 +80,79 @@ public class OptimizeAcknowledgeWithExpiredMsgsTest { * Creates producer and consumer. Producer sends 45 msgs that will expire * at consumer (but before being dispatched to app code). * Producer then sends 60 msgs without expiry. - * + * * Consumer receives msgs using a MessageListener and increments a counter. - * Main thread sleeps for 5 seconds and checks the counter value. + * Main thread sleeps for 5 seconds and checks the counter value. * If counter != 60 msgs (the number of msgs that should get dispatched - * to consumer) the test fails. + * to consumer) the test fails. */ @Test public void testOptimizedAckWithExpiredMsgs() throws Exception { - - ActiveMQConnectionFactory connectionFactory = - new ActiveMQConnectionFactory(brokerUrl + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100"); + ActiveMQConnectionFactory connectionFactory = + new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100"); + + // Create JMS resources + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue("TEST.FOO"); + + // ***** Consumer code ***** + MessageConsumer consumer = session.createConsumer(destination); + + final MyMessageListener listener = new MyMessageListener(); + connection.setExceptionListener((ExceptionListener) listener); + + // ***** Producer Code ***** + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode(); + TextMessage message; + + // Produce msgs that will expire quickly + for (int i=0; i<45; i++) { + message = session.createTextMessage(text); + producer.send(message,1,1,100); + LOG.trace("Sent message: "+ message.getJMSMessageID() + + " with expiry 10 msec"); + } + // Produce msgs that don't expire + for (int i=0; i<60; i++) { + message = session.createTextMessage(text); + producer.send(message,1,1,60000); + // producer.send(message); + LOG.trace("Sent message: "+ message.getJMSMessageID() + + " with expiry 30 sec"); + } + consumer.setMessageListener(listener); + + sleep(1000); // let the batch of 45 expire. + + connection.start(); + + assertTrue("Should receive all expected messages, counter at " + listener.getCounter(), Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return listener.getCounter() == 60; + } + })); + + LOG.info("Received all expected messages with counter at: " + listener.getCounter()); + + // Cleanup + producer.close(); + consumer.close(); + session.close(); + connection.close(); + } + + @Test + public void testOptimizedAckWithExpiredMsgsSync() throws Exception + { + ActiveMQConnectionFactory connectionFactory = + new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100"); // Create JMS resources Connection connection = connectionFactory.createConnection(); @@ -115,117 +160,137 @@ public class OptimizeAcknowledgeWithExpiredMsgsTest { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("TEST.FOO"); - // ***** Consumer code ***** - MessageConsumer consumer = session.createConsumer(destination); - - MyMessageListener listener = new MyMessageListener(); - connection.setExceptionListener((ExceptionListener) listener); - + // ***** Consumer code ***** + MessageConsumer consumer = session.createConsumer(destination); + // ***** Producer Code ***** MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode(); - TextMessage message; - - consumer.setMessageListener(listener); - listener.setDelay(100); - + TextMessage message; + // Produce msgs that will expire quickly for (int i=0; i<45; i++) { message = session.createTextMessage(text); - producer.send(message,1,1,30); - LOG.trace("Sent message: "+ message.getJMSMessageID() + - " with expiry 30 msec"); + producer.send(message,1,1,10); + LOG.trace("Sent message: "+ message.getJMSMessageID() + + " with expiry 10 msec"); } // Produce msgs that don't expire for (int i=0; i<60; i++) { message = session.createTextMessage(text); - producer.send(message); - LOG.trace("Sent message: "+ message.getJMSMessageID() + - " with no expiry."); + producer.send(message,1,1,30000); + // producer.send(message); + LOG.trace("Sent message: "+ message.getJMSMessageID() + + " with expiry 30 sec"); } - listener.setDelay(0); + sleep(200); + + int counter = 1; + for (; counter <= 60; ++counter) { + assertNotNull(consumer.receive(2000)); + LOG.info("counter at " + counter); + } + LOG.info("Received all expected messages with counter at: " + counter); + + // Cleanup + producer.close(); + consumer.close(); + session.close(); + connection.close(); + } + + @Test + public void testOptimizedAckWithExpiredMsgsSync2() throws Exception + { + ActiveMQConnectionFactory connectionFactory = + new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100"); + + // Create JMS resources + Connection connection = connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue("TEST.FOO"); + + // ***** Consumer code ***** + MessageConsumer consumer = session.createConsumer(destination); + + // ***** Producer Code ***** + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode(); + TextMessage message; + + // Produce msgs that don't expire + for (int i=0; i<56; i++) { + message = session.createTextMessage(text); + producer.send(message,1,1,30000); + // producer.send(message); + LOG.trace("Sent message: "+ message.getJMSMessageID() + + " with expiry 30 sec"); + } + // Produce msgs that will expire quickly + for (int i=0; i<44; i++) { + message = session.createTextMessage(text); + producer.send(message,1,1,10); + LOG.trace("Sent message: "+ message.getJMSMessageID() + + " with expiry 10 msec"); + } + // Produce some moremsgs that don't expire + for (int i=0; i<4; i++) { + message = session.createTextMessage(text); + producer.send(message,1,1,30000); + // producer.send(message); + LOG.trace("Sent message: "+ message.getJMSMessageID() + + " with expiry 30 sec"); + } + + sleep(200); + + int counter = 1; + for (; counter <= 60; ++counter) { + assertNotNull(consumer.receive(2000)); + LOG.info("counter at " + counter); + } + LOG.info("Received all expected messages with counter at: " + counter); - // set exit condition - TestExitCondition cond = new TestExitCondition(listener); - Wait.waitFor(cond, 5000); - - Assert.assertTrue("Error: Some non-expired messages were not received.", listener.getCounter() >= 60); - - LOG.info("Received all expected messages with counter at " + listener.getCounter()); - // Cleanup - LOG.info("Cleaning up."); producer.close(); consumer.close(); session.close(); connection.close(); - listener = null; } - private void sleep(int milliSecondTime) { try { Thread.sleep(milliSecondTime); } catch (InterruptedException igonred) { - } + } } - - - /** - * Defines the exit condition for the test. - */ - private class TestExitCondition implements Wait.Condition { - private MyMessageListener listener; - - public TestExitCondition(MyMessageListener l) { - this.listener = l; - } - - public boolean isSatisified() throws Exception { - return listener.getCounter() == 36; - } - - } - - - /** + /** * Standard JMS MessageListener */ private class MyMessageListener implements MessageListener, ExceptionListener { - - private AtomicInteger counter = new AtomicInteger(0); - private int delay = 0; - - public void onMessage(final Message message) { - try { - LOG.trace("Got Message " + message.getJMSMessageID()); - LOG.debug("counter at " + counter.incrementAndGet()); - if (delay>0) { - sleep(delay); - } - } catch (final Exception e) { - e.printStackTrace(); + + private AtomicInteger counter = new AtomicInteger(0); + + public void onMessage(final Message message) { + try { + LOG.trace("Got Message " + message.getJMSMessageID()); + LOG.info("counter at " + counter.incrementAndGet()); + } catch (final Exception e) { } - } - - public int getCounter() { - return counter.get(); - } - - public int getDelay() { - return delay; - } - - public void setDelay(int newDelay) { - this.delay = newDelay; - } - - public synchronized void onException(JMSException ex) { + } + + public int getCounter() { + return counter.get(); + } + + public synchronized void onException(JMSException ex) { LOG.error("JMS Exception occured. Shutting down client."); } } } - \ No newline at end of file