From a1c427e0a3591fdbefe180bb7f6fff96e3433309 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Mon, 15 Mar 2010 14:49:18 +0000 Subject: [PATCH] Fix failing ReconnectTest git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@923272 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/failover/FailoverTransport.java | 58 +++++++++++-------- .../transport/failover/ReconnectTest.java | 19 +++--- 2 files changed, 44 insertions(+), 33 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index e4af1b3c13..3eac611aba 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -104,9 +104,11 @@ public class FailoverTransport implements CompositeTransport { private int maxCacheSize = 128 * 1024; private final TransportListener disposedListener = new DefaultTransportListener() { }; - private boolean connectionInterruptProcessingComplete; + //private boolean connectionInterruptProcessingComplete; private final TransportListener myTransportListener = createTransportListener(); + private boolean updateURIsSupported=true; + private boolean reconnectSupported=true; public FailoverTransport() throws InterruptedIOException { @@ -951,36 +953,46 @@ public class FailoverTransport implements CompositeTransport { } public boolean isReconnectSupported() { - return true; + return this.reconnectSupported; } - + + public void setReconnectSupported(boolean value) { + this.reconnectSupported=value; + } + public boolean isUpdateURIsSupported() { - return true; + return this.updateURIsSupported; + } + + public void setUpdateURIsSupported(boolean value) { + this.updateURIsSupported=value; } public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException { - List copy = new ArrayList(this.updated); - List add = new ArrayList(); - if (updatedURIs != null && updatedURIs.length > 0) { - Set set = new HashSet(); - for (int i = 0; i < updatedURIs.length; i++) { - URI uri = updatedURIs[i]; - if (uri != null) { - set.add(uri); + if (isUpdateURIsSupported()) { + List copy = new ArrayList(this.updated); + List add = new ArrayList(); + if (updatedURIs != null && updatedURIs.length > 0) { + Set set = new HashSet(); + for (int i = 0; i < updatedURIs.length; i++) { + URI uri = updatedURIs[i]; + if (uri != null) { + set.add(uri); + } } - } - for (URI uri : set) { - if (copy.remove(uri) == false) { - add.add(uri); + for (URI uri : set) { + if (copy.remove(uri) == false) { + add.add(uri); + } } - } - synchronized (reconnectMutex) { - this.updated.clear(); - this.updated.addAll(add); - for (URI uri : copy) { - this.uris.remove(uri); + synchronized (reconnectMutex) { + this.updated.clear(); + this.updated.addAll(add); + for (URI uri : copy) { + this.uris.remove(uri); + } + add(rebalance, add.toArray(new URI[add.size()])); } - add(rebalance, add.toArray(new URI[add.size()])); } } } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java index feaa313eb0..f16cf013b0 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java @@ -23,15 +23,12 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; - import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; - import junit.framework.TestCase; - import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; @@ -55,8 +52,8 @@ public class ReconnectTest extends TestCase { private BrokerService bs; private URI tcpUri; - private AtomicInteger resumedCount = new AtomicInteger(); - private AtomicInteger interruptedCount = new AtomicInteger(); + private final AtomicInteger resumedCount = new AtomicInteger(); + private final AtomicInteger interruptedCount = new AtomicInteger(); private Worker[] workers; class Worker implements Runnable { @@ -64,14 +61,14 @@ public class ReconnectTest extends TestCase { public AtomicInteger iterations = new AtomicInteger(); public CountDownLatch stopped = new CountDownLatch(1); - private ActiveMQConnection connection; - private AtomicBoolean stop = new AtomicBoolean(false); + private final ActiveMQConnection connection; + private final AtomicBoolean stop = new AtomicBoolean(false); private Throwable error; - private String name; + private final String name; public Worker(final String name) throws URISyntaxException, JMSException { this.name=name; - URI uri = new URI("failover://(mock://(" + tcpUri + "))"); + URI uri = new URI("failover://(mock://(" + tcpUri + "))?updateURIsSupported=false"); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri); connection = (ActiveMQConnection)factory.createConnection(); connection.addTransportListener(new TransportListener() { @@ -96,7 +93,7 @@ public class ReconnectTest extends TestCase { } public void failConnection() { - MockTransport mockTransport = (MockTransport)connection.getTransportChannel().narrow(MockTransport.class); + MockTransport mockTransport = connection.getTransportChannel().narrow(MockTransport.class); mockTransport.onException(new IOException("Simulated error")); } @@ -222,6 +219,7 @@ public class ReconnectTest extends TestCase { } + @Override protected void setUp() throws Exception { bs = new BrokerService(); bs.setPersistent(false); @@ -238,6 +236,7 @@ public class ReconnectTest extends TestCase { } + @Override protected void tearDown() throws Exception { for (int i = 0; i < WORKER_COUNT; i++) { workers[i].stop();