diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java index e936787ff8..8c1ed75b8d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java @@ -130,9 +130,14 @@ public class AMQ4351Test extends BrokerTestSupport { final AtomicLong size = new AtomicLong(); final AtomicBoolean done = new AtomicBoolean(); CountDownLatch doneLatch = new CountDownLatch(1); + CountDownLatch started; + CountDownLatch finished; - public ConsumingClient(String name) { + + public ConsumingClient(String name, CountDownLatch started, CountDownLatch finished) { this.name = name; + this.started = started; + this.finished = finished; } public void start() { @@ -141,6 +146,7 @@ public class AMQ4351Test extends BrokerTestSupport { } public void stopAsync() { + finished.countDown(); done.set(true); } @@ -158,6 +164,7 @@ public class AMQ4351Test extends BrokerTestSupport { try { Session session = connection.createSession(true, Session.SESSION_TRANSACTED); MessageConsumer consumer = session.createDurableSubscriber(destination, name, null, false); + started.countDown(); while( !done.get() ) { Message msg = consumer.receive(100); if(msg!=null ) { @@ -181,24 +188,28 @@ public class AMQ4351Test extends BrokerTestSupport { public void testAMQ4351() throws InterruptedException, JMSException { LOG.info("Start test."); + int subs = 100; + CountDownLatch startedLatch = new CountDownLatch(subs - 1); + CountDownLatch shutdownLatch = new CountDownLatch(subs - 4); + ProducingClient producer = new ProducingClient(); - ConsumingClient listener1 = new ConsumingClient("subscriber-1"); - ConsumingClient listener2 = new ConsumingClient("subscriber-2"); - ConsumingClient listener3 = new ConsumingClient("subscriber-3"); + ConsumingClient listener1 = new ConsumingClient("subscriber-1", startedLatch, shutdownLatch); + ConsumingClient listener2 = new ConsumingClient("subscriber-2", startedLatch, shutdownLatch); + ConsumingClient listener3 = new ConsumingClient("subscriber-3", startedLatch, shutdownLatch); try { listener1.start(); listener2.start(); listener3.start(); - int subs = 100; List subscribers = new ArrayList(subs); for (int i = 4; i < subs; i++) { - ConsumingClient client = new ConsumingClient("subscriber-" + i); + ConsumingClient client = new ConsumingClient("subscriber-" + i, startedLatch, shutdownLatch); subscribers.add(client); client.start(); } + startedLatch.await(10, TimeUnit.SECONDS); LOG.info("All subscribers started."); producer.sendMessage(); @@ -207,12 +218,12 @@ public class AMQ4351Test extends BrokerTestSupport { for (ConsumingClient client : subscribers) { client.stopAsync(); } + shutdownLatch.await(10, TimeUnit.SECONDS); // Start producing messages for 10 minutes, at high rate LOG.info("Starting mass message producer..."); producer.start(); - long lastSize = listener1.size.get(); for( int i=0 ; i < 10; i++ ) { Thread.sleep(1000);