Use a Wait condition instead of sleeps to detect completion.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1177411 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-09-29 19:45:21 +00:00
parent ad76330eed
commit 1878e951c6
1 changed files with 28 additions and 43 deletions

View File

@ -37,12 +37,10 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.mock.MockTransport; import org.apache.activemq.transport.mock.MockTransport;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/**
*
*/
public class ReconnectTest extends TestCase { public class ReconnectTest extends TestCase {
public static final int MESSAGES_PER_ITTERATION = 10; public static final int MESSAGES_PER_ITTERATION = 10;
@ -67,10 +65,10 @@ public class ReconnectTest extends TestCase {
private final String name; private final String name;
public Worker(final String name) throws URISyntaxException, JMSException { public Worker(final String name) throws URISyntaxException, JMSException {
this.name=name; this.name = name;
URI uri = new URI("failover://(mock://(" + tcpUri + "))?updateURIsSupported=false"); URI uri = new URI("failover://(mock://(" + tcpUri + "))?updateURIsSupported=false");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
connection = (ActiveMQConnection)factory.createConnection(); connection = (ActiveMQConnection) factory.createConnection();
connection.addTransportListener(new TransportListener() { connection.addTransportListener(new TransportListener() {
public void onCommand(Object command) { public void onCommand(Object command) {
} }
@ -80,12 +78,12 @@ public class ReconnectTest extends TestCase {
} }
public void transportInterupted() { public void transportInterupted() {
LOG.info("Worker "+name+" was interrupted..."); LOG.info("Worker " + name + " was interrupted...");
interruptedCount.incrementAndGet(); interruptedCount.incrementAndGet();
} }
public void transportResumed() { public void transportResumed() {
LOG.info("Worker "+name+" was resummed..."); LOG.info("Worker " + name + " was resummed...");
resumedCount.incrementAndGet(); resumedCount.incrementAndGet();
} }
}); });
@ -117,7 +115,7 @@ public class ReconnectTest extends TestCase {
public void run() { public void run() {
try { try {
ActiveMQQueue queue = new ActiveMQQueue("FOO_"+name); ActiveMQQueue queue = new ActiveMQQueue("FOO_" + name);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
@ -150,31 +148,29 @@ public class ReconnectTest extends TestCase {
public synchronized void assertNoErrors() { public synchronized void assertNoErrors() {
if (error != null) { if (error != null) {
error.printStackTrace(); error.printStackTrace();
fail("Worker "+name+" got Exception: " + error); fail("Worker " + name + " got Exception: " + error);
} }
} }
} }
public void testReconnects() throws Exception { public void testReconnects() throws Exception {
for (int k = 1; k < 10; k++) { for (int k = 1; k < 10; k++) {
LOG.info("Test run: " + k); LOG.info("Test run: " + k);
// Wait for at least one iteration to occur... // Wait for at least one iteration to occur...
for (int i = 0; i < WORKER_COUNT; i++) { for (int i = 0; i < WORKER_COUNT; i++) {
int c=0; int c = 0;
for (int j = 0; j < 30; j++) { for (int j = 0; j < 30; j++) {
c = workers[i].iterations.getAndSet(0); c = workers[i].iterations.getAndSet(0);
if( c != 0 ) { if (c != 0) {
break; break;
} }
workers[i].assertNoErrors(); workers[i].assertNoErrors();
LOG.info("Test run "+k+": Waiting for worker " + i + " to finish an iteration."); LOG.info("Test run " + k + ": Waiting for worker " + i + " to finish an iteration.");
Thread.sleep(1000); Thread.sleep(1000);
} }
assertTrue("Test run "+k+": Worker " + i + " never completed an interation.", c != 0); assertTrue("Test run " + k + ": Worker " + i + " never completed an interation.", c != 0);
workers[i].assertNoErrors(); workers[i].assertNoErrors();
} }
@ -185,26 +181,20 @@ public class ReconnectTest extends TestCase {
workers[i].failConnection(); workers[i].failConnection();
} }
long start; assertTrue("Timed out waiting for all connections to be interrupted.", Wait.waitFor(new Wait.Condition(){
// Wait for the connections to get interrupted... public boolean isSatisified() throws Exception {
start = System.currentTimeMillis(); LOG.debug("Test run waiting for connections to get interrupted.. at: " + interruptedCount.get());
while (interruptedCount.get() < WORKER_COUNT) { return interruptedCount.get() == WORKER_COUNT;
if( System.currentTimeMillis()-start > 1000*60 ) { }
fail("Timed out waiting for all connections to be interrupted."); }, TimeUnit.SECONDS.toMillis(60)));
}
LOG.info("Test run "+k+": Waiting for connections to get interrupted.. at: " + interruptedCount.get());
Thread.sleep(1000);
}
// Wait for the connections to re-establish... // Wait for the connections to re-establish...
start = System.currentTimeMillis(); assertTrue("Timed out waiting for all connections to be resumed.", Wait.waitFor(new Wait.Condition(){
while (resumedCount.get() < WORKER_COUNT) { public boolean isSatisified() throws Exception {
if( System.currentTimeMillis()-start > 1000*60 ) { LOG.debug("Test run waiting for connections to get resumed.. at: " + resumedCount.get());
fail("Timed out waiting for all connections to be resumed."); return resumedCount.get() >= WORKER_COUNT;
} }
LOG.info("Test run "+k+": Waiting for connections to get resumed.. at: " + resumedCount.get()); }, TimeUnit.SECONDS.toMillis(60)));
Thread.sleep(1000);
}
// Reset the counters.. // Reset the counters..
interruptedCount.set(0); interruptedCount.set(0);
@ -214,9 +204,7 @@ public class ReconnectTest extends TestCase {
} }
Thread.sleep(1000); Thread.sleep(1000);
} }
} }
@Override @Override
@ -227,13 +215,11 @@ public class ReconnectTest extends TestCase {
TransportConnector connector = bs.addConnector("tcp://localhost:0"); TransportConnector connector = bs.addConnector("tcp://localhost:0");
bs.start(); bs.start();
tcpUri = connector.getConnectUri(); tcpUri = connector.getConnectUri();
workers = new Worker[WORKER_COUNT]; workers = new Worker[WORKER_COUNT];
for (int i = 0; i < WORKER_COUNT; i++) { for (int i = 0; i < WORKER_COUNT; i++) {
workers[i] = new Worker(""+i); workers[i] = new Worker("" + i);
workers[i].start(); workers[i].start();
} }
} }
@Override @Override
@ -243,5 +229,4 @@ public class ReconnectTest extends TestCase {
} }
new ServiceStopper().stop(bs); new ServiceStopper().stop(bs);
} }
} }