diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 92b421595f..6d5817bdf7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -195,9 +195,17 @@ public class FailoverTransport implements CompositeTransport { public final void handleTransportFailure(IOException e) throws InterruptedException { - + if (LOG.isTraceEnabled()) { + LOG.trace(this + " handleTransportFailure: " + e); + } Transport transport = connectedTransport.getAndSet(null); - if( transport!=null ) { + if (transport == null) { + // sync with possible in progress reconnect + synchronized (reconnectMutex) { + transport = connectedTransport.getAndSet(null); + } + } + if (transport != null) { transport.setTransportListener(disposedListener); ServiceSupport.dispose(transport); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java index f84d9d157b..980ce2e502 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java @@ -17,10 +17,15 @@ package org.apache.activemq.bugs; +import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.net.URI; +import java.net.URISyntaxException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.JMSException; @@ -29,8 +34,6 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; -import junit.framework.TestCase; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.AutoFailTestSupport; import org.apache.activemq.broker.BrokerService; @@ -79,7 +82,9 @@ public class AMQ2183Test extends AutoFailTestSupport implements UncaughtExceptio t.start(); Thread.sleep(2000); masterUrl = master.getTransportConnectors().get(0).getConnectUri(); - + } + + private void startSlave() throws IOException, Exception, URISyntaxException { slave.setBrokerName("Slave"); slave.deleteAllMessages(); slave.addConnector("tcp://localhost:0"); @@ -111,8 +116,27 @@ public class AMQ2183Test extends AutoFailTestSupport implements UncaughtExceptio ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "failover:(" + masterUrl + ")?randomize=false"); - Connection connection = connectionFactory.createConnection(); - connection.start(); + final Connection connection = connectionFactory.createConnection(); + final CountDownLatch startCommenced = new CountDownLatch(1); + final CountDownLatch startDone = new CountDownLatch(1); + + // start will be blocked pending slave connection but should resume after slave started + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + startCommenced.countDown(); + try { + connection.start(); + startDone.countDown(); + } catch (Exception e) { + exceptions.put(Thread.currentThread(), e); + } + }}); + + + assertTrue("connection.start has commenced", startCommenced.await(10, TimeUnit.SECONDS)); + startSlave(); + assertTrue("connection.start done", startDone.await(70, TimeUnit.SECONDS)); + final MessageCounter counterA = new MessageCounter(); connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.T")).setMessageListener(counterA);