resolve timing issue with reconnect in failover transport that can stall reconnect logic - resulted in intermittent failure of AMQ2183Test. reorg of test make it reproducable when master is waiting for slave to connect and its disconnects ocurr async so they can clash with inprogress reconnects - resolve https://issues.apache.org/activemq/browse/AMQ-2588

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@904568 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-01-29 17:39:20 +00:00
parent fed9ac7955
commit 5a03c37faf
2 changed files with 39 additions and 7 deletions

View File

@ -195,9 +195,17 @@ public class FailoverTransport implements CompositeTransport {
public final void handleTransportFailure(IOException e) throws InterruptedException { public final void handleTransportFailure(IOException e) throws InterruptedException {
if (LOG.isTraceEnabled()) {
LOG.trace(this + " handleTransportFailure: " + e);
}
Transport transport = connectedTransport.getAndSet(null); Transport transport = connectedTransport.getAndSet(null);
if( transport!=null ) { if (transport == null) {
// sync with possible in progress reconnect
synchronized (reconnectMutex) {
transport = connectedTransport.getAndSet(null);
}
}
if (transport != null) {
transport.setTransportListener(disposedListener); transport.setTransportListener(disposedListener);
ServiceSupport.dispose(transport); ServiceSupport.dispose(transport);

View File

@ -17,10 +17,15 @@
package org.apache.activemq.bugs; package org.apache.activemq.bugs;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler; import java.lang.Thread.UncaughtExceptionHandler;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -29,8 +34,6 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport; import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -79,7 +82,9 @@ public class AMQ2183Test extends AutoFailTestSupport implements UncaughtExceptio
t.start(); t.start();
Thread.sleep(2000); Thread.sleep(2000);
masterUrl = master.getTransportConnectors().get(0).getConnectUri(); masterUrl = master.getTransportConnectors().get(0).getConnectUri();
}
private void startSlave() throws IOException, Exception, URISyntaxException {
slave.setBrokerName("Slave"); slave.setBrokerName("Slave");
slave.deleteAllMessages(); slave.deleteAllMessages();
slave.addConnector("tcp://localhost:0"); slave.addConnector("tcp://localhost:0");
@ -111,8 +116,27 @@ public class AMQ2183Test extends AutoFailTestSupport implements UncaughtExceptio
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"failover:(" + masterUrl + ")?randomize=false"); "failover:(" + masterUrl + ")?randomize=false");
Connection connection = connectionFactory.createConnection(); final Connection connection = connectionFactory.createConnection();
final CountDownLatch startCommenced = new CountDownLatch(1);
final CountDownLatch startDone = new CountDownLatch(1);
// start will be blocked pending slave connection but should resume after slave started
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
startCommenced.countDown();
try {
connection.start(); connection.start();
startDone.countDown();
} catch (Exception e) {
exceptions.put(Thread.currentThread(), e);
}
}});
assertTrue("connection.start has commenced", startCommenced.await(10, TimeUnit.SECONDS));
startSlave();
assertTrue("connection.start done", startDone.await(70, TimeUnit.SECONDS));
final MessageCounter counterA = new MessageCounter(); final MessageCounter counterA = new MessageCounter();
connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.T")).setMessageListener(counterA); connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.T")).setMessageListener(counterA);