Made the ReconnectTest a little more robust.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@636106 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-03-11 21:17:51 +00:00
parent 2185cdb6e0
commit c6b88f1ecc
2 changed files with 43 additions and 21 deletions

View File

@ -194,15 +194,13 @@ public class FailoverTransport implements CompositeTransport {
public final void handleTransportFailure(IOException e) throws InterruptedException { public final void handleTransportFailure(IOException e) throws InterruptedException {
if (transportListener != null) {
transportListener.transportInterupted();
}
Transport transport = connectedTransport.get(); Transport transport = connectedTransport.get();
if( transport!=null ) { if( transport!=null ) {
ServiceSupport.dispose(transport); ServiceSupport.dispose(transport);
} }
boolean wasConnected=false;
synchronized (reconnectMutex) { synchronized (reconnectMutex) {
boolean reconnectOk = false; boolean reconnectOk = false;
if(started) { if(started) {
@ -212,6 +210,7 @@ public class FailoverTransport implements CompositeTransport {
} }
if (connectedTransport.get() != null) { if (connectedTransport.get() != null) {
wasConnected=true;
initialized = false; initialized = false;
failedConnectTransportURI=connectedTransportURI; failedConnectTransportURI=connectedTransportURI;
connectedTransport.set(null); connectedTransport.set(null);
@ -223,6 +222,12 @@ public class FailoverTransport implements CompositeTransport {
reconnectTask.wakeup(); reconnectTask.wakeup();
} }
} }
// Avoid double firing a transportInterupted() event due to an extra IOException
if (transportListener != null && wasConnected) {
transportListener.transportInterupted();
}
} }
public void start() throws Exception { public void start() throws Exception {

View File

@ -25,13 +25,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.DeliveryMode; import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -55,10 +55,11 @@ public class ReconnectTest extends TestCase {
private BrokerService bs; private BrokerService bs;
private URI tcpUri; private URI tcpUri;
private AtomicInteger resumedCount = new AtomicInteger();
private AtomicInteger interruptedCount = new AtomicInteger(); private AtomicInteger interruptedCount = new AtomicInteger();
private Worker[] workers; private Worker[] workers;
class Worker implements Runnable, ExceptionListener { class Worker implements Runnable {
public AtomicInteger iterations = new AtomicInteger(); public AtomicInteger iterations = new AtomicInteger();
public CountDownLatch stopped = new CountDownLatch(1); public CountDownLatch stopped = new CountDownLatch(1);
@ -68,12 +69,11 @@ public class ReconnectTest extends TestCase {
private Throwable error; private Throwable error;
private String name; private String name;
public Worker(String name) throws URISyntaxException, JMSException { public Worker(final String name) throws URISyntaxException, JMSException {
this.name=name; this.name=name;
URI uri = new URI("failover://(mock://(" + tcpUri + "))"); URI uri = new URI("failover://(mock://(" + tcpUri + "))");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
connection = (ActiveMQConnection)factory.createConnection(); connection = (ActiveMQConnection)factory.createConnection();
connection.setExceptionListener(this);
connection.addTransportListener(new TransportListener() { connection.addTransportListener(new TransportListener() {
public void onCommand(Object command) { public void onCommand(Object command) {
} }
@ -83,10 +83,13 @@ public class ReconnectTest extends TestCase {
} }
public void transportInterupted() { public void transportInterupted() {
LOG.info("Worker "+name+" was interrupted...");
interruptedCount.incrementAndGet(); interruptedCount.incrementAndGet();
} }
public void transportResumed() { public void transportResumed() {
LOG.info("Worker "+name+" was resummed...");
resumedCount.incrementAndGet();
} }
}); });
connection.start(); connection.start();
@ -139,11 +142,6 @@ public class ReconnectTest extends TestCase {
} }
} }
public void onException(JMSException error) {
setError(error);
stop();
}
public synchronized Throwable getError() { public synchronized Throwable getError() {
return error; return error;
} }
@ -155,7 +153,7 @@ public class ReconnectTest extends TestCase {
public synchronized void assertNoErrors() { public synchronized void assertNoErrors() {
if (error != null) { if (error != null) {
error.printStackTrace(); error.printStackTrace();
fail("Got Exception: " + error); fail("Worker "+name+" got Exception: " + error);
} }
} }
@ -163,18 +161,23 @@ public class ReconnectTest extends TestCase {
public void testReconnects() throws Exception { public void testReconnects() throws Exception {
for (int k = 1; k < 5; k++) { for (int k = 1; k < 10; k++) {
LOG.info("Test run: " + k); LOG.info("Test run: " + k);
// Wait for at least one iteration to occur... // Wait for at least one iteration to occur...
for (int i = 0; i < WORKER_COUNT; i++) { for (int i = 0; i < WORKER_COUNT; i++) {
for (int j = 0; workers[i].iterations.get() == 0 && j < 5; j++) { int c=0;
for (int j = 0; j < 30; j++) {
c = workers[i].iterations.getAndSet(0);
if( c != 0 ) {
break;
}
workers[i].assertNoErrors(); workers[i].assertNoErrors();
LOG.info("Waiting for worker " + i + " to finish an iteration."); LOG.info("Test run "+k+": Waiting for worker " + i + " to finish an iteration.");
Thread.sleep(1000); Thread.sleep(1000);
} }
assertTrue("Worker " + i + " never completed an interation.", workers[i].iterations.get() != 0); assertTrue("Test run "+k+": Worker " + i + " never completed an interation.", c != 0);
workers[i].assertNoErrors(); workers[i].assertNoErrors();
} }
@ -185,21 +188,35 @@ public class ReconnectTest extends TestCase {
workers[i].failConnection(); workers[i].failConnection();
} }
long start;
// Wait for the connections to get interrupted... // Wait for the connections to get interrupted...
start = System.currentTimeMillis();
while (interruptedCount.get() < WORKER_COUNT) { while (interruptedCount.get() < WORKER_COUNT) {
LOG.info("Waiting for connections to get interrupted.. at: " + interruptedCount.get()); if( System.currentTimeMillis()-start > 1000*60 ) {
fail("Timed out waiting for all connections to be interrupted.");
}
LOG.info("Test run "+k+": Waiting for connections to get interrupted.. at: " + interruptedCount.get());
Thread.sleep(1000); Thread.sleep(1000);
} }
// let things stablize.. // Wait for the connections to re-establish...
LOG.info("Pausing before starting next iterations..."); start = System.currentTimeMillis();
Thread.sleep(1000); while (resumedCount.get() < WORKER_COUNT) {
if( System.currentTimeMillis()-start > 1000*60 ) {
fail("Timed out waiting for all connections to be resumed.");
}
LOG.info("Test run "+k+": Waiting for connections to get resumed.. at: " + resumedCount.get());
Thread.sleep(1000);
}
// Reset the counters.. // Reset the counters..
interruptedCount.set(0); interruptedCount.set(0);
resumedCount.set(0);
for (int i = 0; i < WORKER_COUNT; i++) { for (int i = 0; i < WORKER_COUNT; i++) {
workers[i].iterations.set(0); workers[i].iterations.set(0);
} }
Thread.sleep(1000);
} }