diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java index d43d4070ed..578dad9bf7 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java @@ -16,8 +16,8 @@ */ package org.apache.activemq; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; import javax.jms.ConnectionConsumer; @@ -36,6 +36,7 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.util.Wait; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,11 +44,11 @@ import org.slf4j.LoggerFactory; public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport { private static final Logger LOG = LoggerFactory.getLogger(OnePrefetchAsyncConsumerTest.class); - private TestMutex testMutex; - protected Connection connection; - protected ConnectionConsumer connectionConsumer; - protected Queue queue; - protected CountDownLatch messageTwoDelay = new CountDownLatch(1); + private Connection connection; + private ConnectionConsumer connectionConsumer; + private Queue queue; + private final AtomicBoolean completed = new AtomicBoolean(); + private final AtomicBoolean success = new AtomicBoolean(); public void testPrefetchExtension() throws Exception { Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); @@ -60,23 +61,21 @@ public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport { producer.send(session.createTextMessage("Msg2")); // Msg3 will cause the test to fail as it will attempt to retrieve an additional ServerSession from - // an exhausted ServerSessionPool due to the (incorrectly?) incremented prefetchExtension in the PrefetchSubscription + // an exhausted ServerSessionPool due to the (incorrectly?) incremented prefetchExtension in the + // PrefetchSubscription producer.send(session.createTextMessage("Msg3")); session.commit(); - // wait for test to complete and the test result to get set - // this happens asynchronously since the messages are delivered asynchronously - long done = System.currentTimeMillis() + getMaxTestTime(); - synchronized (testMutex) { - while (!testMutex.testCompleted && System.currentTimeMillis() < done) { - testMutex.wait(TimeUnit.SECONDS.toMillis(10)); - } - } + assertTrue("test completed on time", Wait.waitFor(new Wait.Condition() { - assertTrue("completed on time", testMutex.testCompleted); - //test completed, result is ready - assertTrue("Attempted to retrieve more than one ServerSession at a time", testMutex.testSuccessful); + @Override + public boolean isSatisified() throws Exception { + return completed.get(); + } + })); + + assertTrue("Attempted to retrieve more than one ServerSession at a time", success.get()); } @Override @@ -90,12 +89,10 @@ public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport { bindAddress = "tcp://localhost:0"; super.setUp(); - testMutex = new TestMutex(); connection = createConnection(); queue = createQueue(); // note the last arg of 1, this becomes the prefetchSize in PrefetchSubscription - connectionConsumer = connection.createConnectionConsumer( - queue, null, new TestServerSessionPool(connection), 1); + connectionConsumer = connection.createConnectionConsumer(queue, null, new TestServerSessionPool(connection), 1); connection.start(); } @@ -124,108 +121,87 @@ public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport { // simulates a ServerSessionPool with only 1 ServerSession private class TestServerSessionPool implements ServerSessionPool { - Connection connection; - TestServerSession serverSession; - boolean serverSessionInUse = false; + Connection connection; + TestServerSession serverSession; + boolean serverSessionInUse = false; - public TestServerSessionPool(Connection connection) throws JMSException { - this.connection = connection; - serverSession = new TestServerSession(this); - } + public TestServerSessionPool(Connection connection) throws JMSException { + this.connection = connection; + this.serverSession = new TestServerSession(this); + } - @Override + @Override public ServerSession getServerSession() throws JMSException { - synchronized (this) { - if (serverSessionInUse) { - LOG.info("asked for session while in use, not serialised delivery"); - synchronized (testMutex) { - testMutex.testSuccessful = false; - testMutex.testCompleted = true; - } - } - serverSessionInUse = true; - return serverSession; - } - } + synchronized (this) { + if (serverSessionInUse) { + LOG.info("asked for session while in use, not serialised delivery"); + success.set(false); + completed.set(true); + } + serverSessionInUse = true; + return serverSession; + } + } } private class TestServerSession implements ServerSession { - TestServerSessionPool pool; - Session session; + TestServerSessionPool pool; + Session session; - public TestServerSession(TestServerSessionPool pool) throws JMSException { - this.pool = pool; - session = pool.connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - session.setMessageListener(new TestMessageListener()); - } + public TestServerSession(TestServerSessionPool pool) throws JMSException { + this.pool = pool; + session = pool.connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + session.setMessageListener(new TestMessageListener()); + } - @Override + @Override public Session getSession() throws JMSException { - return session; - } + return session; + } - @Override + @Override public void start() throws JMSException { - // use a separate thread to process the message asynchronously - new Thread() { - @Override + // use a separate thread to process the message asynchronously + new Thread() { + @Override public void run() { - // let the session deliver the message - session.run(); + // let the session deliver the message + session.run(); - // commit the tx and - // return ServerSession to pool - synchronized (pool) { + // commit the tx and return ServerSession to pool + synchronized (pool) { try { session.commit(); - } - catch (JMSException e) { + } catch (JMSException e) { } pool.serverSessionInUse = false; - } - - // let the test check if the test was completed - synchronized (testMutex) { - testMutex.notifyAll(); - } - } - }.start(); - } + } + } + }.start(); + } } private class TestMessageListener implements MessageListener { @Override public void onMessage(Message message) { try { - String text = ((TextMessage)message).getText(); - LOG.info("got message: " + text); - if (text.equals("Msg3")) { - // if we get here, Exception in getServerSession() was not thrown, test is successful - // this obviously doesn't happen now, - // need to fix prefetchExtension computation logic in PrefetchSubscription to get here - synchronized (testMutex) { - if (!testMutex.testCompleted) { - testMutex.testSuccessful = true; - testMutex.testCompleted = true; - testMutex.notifyAll(); - } - } - } - else if (text.equals("Msg2")) { - // simulate long message processing so that Msg3 comes when Msg2 is still being processed - // and thus the single ServerSession is in use - TimeUnit.SECONDS.sleep(4); - } - } - catch (JMSException e) { - } - catch (InterruptedException e) { + String text = ((TextMessage) message).getText(); + LOG.info("got message: " + text); + if (text.equals("Msg3")) { + // if we get here, Exception in getServerSession() was not thrown, test is + // successful this obviously doesn't happen now, need to fix prefetchExtension + // computation logic in PrefetchSubscription to get here + success.set(true); + completed.set(true); + } else if (text.equals("Msg2")) { + // simulate long message processing so that Msg3 comes when Msg2 is still being + // processed and thus the single ServerSession is in use + TimeUnit.SECONDS.sleep(4); + } + } catch (JMSException e) { + } catch (InterruptedException e) { } } } - private class TestMutex { - boolean testCompleted = false; - boolean testSuccessful = true; - } }