Fix failing ReconnectTest

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@923272 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-03-15 14:49:18 +00:00
parent 909c1c3d18
commit a1c427e0a3
2 changed files with 44 additions and 33 deletions

View File

@ -104,9 +104,11 @@ public class FailoverTransport implements CompositeTransport {
private int maxCacheSize = 128 * 1024; private int maxCacheSize = 128 * 1024;
private final TransportListener disposedListener = new DefaultTransportListener() { private final TransportListener disposedListener = new DefaultTransportListener() {
}; };
private boolean connectionInterruptProcessingComplete; //private boolean connectionInterruptProcessingComplete;
private final TransportListener myTransportListener = createTransportListener(); private final TransportListener myTransportListener = createTransportListener();
private boolean updateURIsSupported=true;
private boolean reconnectSupported=true;
public FailoverTransport() throws InterruptedIOException { public FailoverTransport() throws InterruptedIOException {
@ -951,36 +953,46 @@ public class FailoverTransport implements CompositeTransport {
} }
public boolean isReconnectSupported() { public boolean isReconnectSupported() {
return true; return this.reconnectSupported;
} }
public void setReconnectSupported(boolean value) {
this.reconnectSupported=value;
}
public boolean isUpdateURIsSupported() { public boolean isUpdateURIsSupported() {
return true; return this.updateURIsSupported;
}
public void setUpdateURIsSupported(boolean value) {
this.updateURIsSupported=value;
} }
public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException { public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
List<URI> copy = new ArrayList<URI>(this.updated); if (isUpdateURIsSupported()) {
List<URI> add = new ArrayList<URI>(); List<URI> copy = new ArrayList<URI>(this.updated);
if (updatedURIs != null && updatedURIs.length > 0) { List<URI> add = new ArrayList<URI>();
Set<URI> set = new HashSet<URI>(); if (updatedURIs != null && updatedURIs.length > 0) {
for (int i = 0; i < updatedURIs.length; i++) { Set<URI> set = new HashSet<URI>();
URI uri = updatedURIs[i]; for (int i = 0; i < updatedURIs.length; i++) {
if (uri != null) { URI uri = updatedURIs[i];
set.add(uri); if (uri != null) {
set.add(uri);
}
} }
} for (URI uri : set) {
for (URI uri : set) { if (copy.remove(uri) == false) {
if (copy.remove(uri) == false) { add.add(uri);
add.add(uri); }
} }
} synchronized (reconnectMutex) {
synchronized (reconnectMutex) { this.updated.clear();
this.updated.clear(); this.updated.addAll(add);
this.updated.addAll(add); for (URI uri : copy) {
for (URI uri : copy) { this.uris.remove(uri);
this.uris.remove(uri); }
add(rebalance, add.toArray(new URI[add.size()]));
} }
add(rebalance, add.toArray(new URI[add.size()]));
} }
} }
} }

View File

@ -23,15 +23,12 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.DeliveryMode; import javax.jms.DeliveryMode;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -55,8 +52,8 @@ public class ReconnectTest extends TestCase {
private BrokerService bs; private BrokerService bs;
private URI tcpUri; private URI tcpUri;
private AtomicInteger resumedCount = new AtomicInteger(); private final AtomicInteger resumedCount = new AtomicInteger();
private AtomicInteger interruptedCount = new AtomicInteger(); private final AtomicInteger interruptedCount = new AtomicInteger();
private Worker[] workers; private Worker[] workers;
class Worker implements Runnable { class Worker implements Runnable {
@ -64,14 +61,14 @@ public class ReconnectTest extends TestCase {
public AtomicInteger iterations = new AtomicInteger(); public AtomicInteger iterations = new AtomicInteger();
public CountDownLatch stopped = new CountDownLatch(1); public CountDownLatch stopped = new CountDownLatch(1);
private ActiveMQConnection connection; private final ActiveMQConnection connection;
private AtomicBoolean stop = new AtomicBoolean(false); private final AtomicBoolean stop = new AtomicBoolean(false);
private Throwable error; private Throwable error;
private String name; private final String name;
public Worker(final String name) throws URISyntaxException, JMSException { public Worker(final String name) throws URISyntaxException, JMSException {
this.name=name; this.name=name;
URI uri = new URI("failover://(mock://(" + tcpUri + "))"); URI uri = new URI("failover://(mock://(" + tcpUri + "))?updateURIsSupported=false");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
connection = (ActiveMQConnection)factory.createConnection(); connection = (ActiveMQConnection)factory.createConnection();
connection.addTransportListener(new TransportListener() { connection.addTransportListener(new TransportListener() {
@ -96,7 +93,7 @@ public class ReconnectTest extends TestCase {
} }
public void failConnection() { public void failConnection() {
MockTransport mockTransport = (MockTransport)connection.getTransportChannel().narrow(MockTransport.class); MockTransport mockTransport = connection.getTransportChannel().narrow(MockTransport.class);
mockTransport.onException(new IOException("Simulated error")); mockTransport.onException(new IOException("Simulated error"));
} }
@ -222,6 +219,7 @@ public class ReconnectTest extends TestCase {
} }
@Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
bs = new BrokerService(); bs = new BrokerService();
bs.setPersistent(false); bs.setPersistent(false);
@ -238,6 +236,7 @@ public class ReconnectTest extends TestCase {
} }
@Override
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
for (int i = 0; i < WORKER_COUNT; i++) { for (int i = 0; i < WORKER_COUNT; i++) {
workers[i].stop(); workers[i].stop();