From 5a03c37faf0f75a9835039cb8f403d18e2a853b6 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Fri, 29 Jan 2010 17:39:20 +0000 Subject: [PATCH] resolve timing issue with reconnect in failover transport that can stall reconnect logic - resulted in intermittent failure of AMQ2183Test. reorg of test make it reproducable when master is waiting for slave to connect and its disconnects ocurr async so they can clash with inprogress reconnects - resolve https://issues.apache.org/activemq/browse/AMQ-2588 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@904568 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/failover/FailoverTransport.java | 12 +++++-- .../org/apache/activemq/bugs/AMQ2183Test.java | 34 ++++++++++++++++--- 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 92b421595f..6d5817bdf7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -195,9 +195,17 @@ public class FailoverTransport implements CompositeTransport { public final void handleTransportFailure(IOException e) throws InterruptedException { - + if (LOG.isTraceEnabled()) { + LOG.trace(this + " handleTransportFailure: " + e); + } Transport transport = connectedTransport.getAndSet(null); - if( transport!=null ) { + if (transport == null) { + // sync with possible in progress reconnect + synchronized (reconnectMutex) { + transport = connectedTransport.getAndSet(null); + } + } + if (transport != null) { transport.setTransportListener(disposedListener); ServiceSupport.dispose(transport); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java index f84d9d157b..980ce2e502 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java @@ -17,10 +17,15 @@ package org.apache.activemq.bugs; +import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.net.URI; +import java.net.URISyntaxException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.JMSException; @@ -29,8 +34,6 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; -import junit.framework.TestCase; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.AutoFailTestSupport; import org.apache.activemq.broker.BrokerService; @@ -79,7 +82,9 @@ public class AMQ2183Test extends AutoFailTestSupport implements UncaughtExceptio t.start(); Thread.sleep(2000); masterUrl = master.getTransportConnectors().get(0).getConnectUri(); - + } + + private void startSlave() throws IOException, Exception, URISyntaxException { slave.setBrokerName("Slave"); slave.deleteAllMessages(); slave.addConnector("tcp://localhost:0"); @@ -111,8 +116,27 @@ public class AMQ2183Test extends AutoFailTestSupport implements UncaughtExceptio ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "failover:(" + masterUrl + ")?randomize=false"); - Connection connection = connectionFactory.createConnection(); - connection.start(); + final Connection connection = connectionFactory.createConnection(); + final CountDownLatch startCommenced = new CountDownLatch(1); + final CountDownLatch startDone = new CountDownLatch(1); + + // start will be blocked pending slave connection but should resume after slave started + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + startCommenced.countDown(); + try { + connection.start(); + startDone.countDown(); + } catch (Exception e) { + exceptions.put(Thread.currentThread(), e); + } + }}); + + + assertTrue("connection.start has commenced", startCommenced.await(10, TimeUnit.SECONDS)); + startSlave(); + assertTrue("connection.start done", startDone.await(70, TimeUnit.SECONDS)); + final MessageCounter counterA = new MessageCounter(); connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.T")).setMessageListener(counterA);