From 1878e951c614944b2dcde8a90e31accf8ef88dd6 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Thu, 29 Sep 2011 19:45:21 +0000 Subject: [PATCH] Use a Wait condition instead of sleeps to detect completion. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1177411 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/failover/ReconnectTest.java | 71 ++++++++----------- 1 file changed, 28 insertions(+), 43 deletions(-) diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java index a717d2f978..28031578e0 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java @@ -37,12 +37,10 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.mock.MockTransport; import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.Wait; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * - */ public class ReconnectTest extends TestCase { public static final int MESSAGES_PER_ITTERATION = 10; @@ -67,10 +65,10 @@ public class ReconnectTest extends TestCase { private final String name; public Worker(final String name) throws URISyntaxException, JMSException { - this.name=name; + this.name = name; URI uri = new URI("failover://(mock://(" + tcpUri + "))?updateURIsSupported=false"); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri); - connection = (ActiveMQConnection)factory.createConnection(); + connection = (ActiveMQConnection) factory.createConnection(); connection.addTransportListener(new TransportListener() { public void onCommand(Object command) { } @@ -80,12 +78,12 @@ public class ReconnectTest extends TestCase { } public void transportInterupted() { - LOG.info("Worker "+name+" was interrupted..."); + LOG.info("Worker " + name + " was interrupted..."); interruptedCount.incrementAndGet(); } public void transportResumed() { - LOG.info("Worker "+name+" was resummed..."); + LOG.info("Worker " + name + " was resummed..."); resumedCount.incrementAndGet(); } }); @@ -117,7 +115,7 @@ public class ReconnectTest extends TestCase { public void run() { try { - ActiveMQQueue queue = new ActiveMQQueue("FOO_"+name); + ActiveMQQueue queue = new ActiveMQQueue("FOO_" + name); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(queue); MessageProducer producer = session.createProducer(queue); @@ -150,31 +148,29 @@ public class ReconnectTest extends TestCase { public synchronized void assertNoErrors() { if (error != null) { error.printStackTrace(); - fail("Worker "+name+" got Exception: " + error); + fail("Worker " + name + " got Exception: " + error); } } - } public void testReconnects() throws Exception { for (int k = 1; k < 10; k++) { - LOG.info("Test run: " + k); // Wait for at least one iteration to occur... for (int i = 0; i < WORKER_COUNT; i++) { - int c=0; + int c = 0; for (int j = 0; j < 30; j++) { - c = workers[i].iterations.getAndSet(0); - if( c != 0 ) { - break; - } + c = workers[i].iterations.getAndSet(0); + if (c != 0) { + break; + } workers[i].assertNoErrors(); - LOG.info("Test run "+k+": Waiting for worker " + i + " to finish an iteration."); + LOG.info("Test run " + k + ": Waiting for worker " + i + " to finish an iteration."); Thread.sleep(1000); } - assertTrue("Test run "+k+": Worker " + i + " never completed an interation.", c != 0); + assertTrue("Test run " + k + ": Worker " + i + " never completed an interation.", c != 0); workers[i].assertNoErrors(); } @@ -185,26 +181,20 @@ public class ReconnectTest extends TestCase { workers[i].failConnection(); } - long start; - // Wait for the connections to get interrupted... - start = System.currentTimeMillis(); - while (interruptedCount.get() < WORKER_COUNT) { - if( System.currentTimeMillis()-start > 1000*60 ) { - fail("Timed out waiting for all connections to be interrupted."); - } - LOG.info("Test run "+k+": Waiting for connections to get interrupted.. at: " + interruptedCount.get()); - Thread.sleep(1000); - } + assertTrue("Timed out waiting for all connections to be interrupted.", Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + LOG.debug("Test run waiting for connections to get interrupted.. at: " + interruptedCount.get()); + return interruptedCount.get() == WORKER_COUNT; + } + }, TimeUnit.SECONDS.toMillis(60))); // Wait for the connections to re-establish... - start = System.currentTimeMillis(); - while (resumedCount.get() < WORKER_COUNT) { - if( System.currentTimeMillis()-start > 1000*60 ) { - fail("Timed out waiting for all connections to be resumed."); - } - LOG.info("Test run "+k+": Waiting for connections to get resumed.. at: " + resumedCount.get()); - Thread.sleep(1000); - } + assertTrue("Timed out waiting for all connections to be resumed.", Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + LOG.debug("Test run waiting for connections to get resumed.. at: " + resumedCount.get()); + return resumedCount.get() >= WORKER_COUNT; + } + }, TimeUnit.SECONDS.toMillis(60))); // Reset the counters.. interruptedCount.set(0); @@ -212,11 +202,9 @@ public class ReconnectTest extends TestCase { for (int i = 0; i < WORKER_COUNT; i++) { workers[i].iterations.set(0); } - + Thread.sleep(1000); - } - } @Override @@ -227,13 +215,11 @@ public class ReconnectTest extends TestCase { TransportConnector connector = bs.addConnector("tcp://localhost:0"); bs.start(); tcpUri = connector.getConnectUri(); - workers = new Worker[WORKER_COUNT]; for (int i = 0; i < WORKER_COUNT; i++) { - workers[i] = new Worker(""+i); + workers[i] = new Worker("" + i); workers[i].start(); } - } @Override @@ -243,5 +229,4 @@ public class ReconnectTest extends TestCase { } new ServiceStopper().stop(bs); } - }