From 4f19f31a37ca719a92967272cd751ce0c3a539b4 Mon Sep 17 00:00:00 2001 From: gtully Date: Fri, 4 Oct 2013 11:34:53 +0100 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4785 - fix and test --- .../transport/failover/FailoverTransport.java | 10 +-- .../failover/InitalReconnectDelayTest.java | 87 ++++++++++++++++--- 2 files changed, 79 insertions(+), 18 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index df669bf2ae..fc81f9460f 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -257,12 +257,12 @@ public class FailoverTransport implements CompositeTransport { connected = false; connectedToPriority = false; - // notify before any reconnect attempt so ack state can be whacked - if (transportListener != null) { - transportListener.transportInterupted(); - } - if (reconnectOk) { + // notify before any reconnect attempt so ack state can be whacked + if (transportListener != null) { + transportListener.transportInterupted(); + } + updated.remove(failedConnectTransportURI); reconnectTask.wakeup(); } else if (!isDisposed()) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java index 5e728cba25..8fe8c08f53 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java @@ -16,35 +16,41 @@ */ package org.apache.activemq.transport.failover; -import static org.junit.Assert.assertTrue; - +import java.io.IOException; import java.util.Date; -import java.util.concurrent.CountDownLatch; - +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; +import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.transport.TransportListener; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import static org.junit.Assert.*; + public class InitalReconnectDelayTest { private static final transient Logger LOG = LoggerFactory.getLogger(InitalReconnectDelayTest.class); protected BrokerService broker1; protected BrokerService broker2; - protected CountDownLatch broker2Started = new CountDownLatch(1); - protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false&initialReconnectDelay=15000"; @Test public void testInitialReconnectDelay() throws Exception { + String uriString = "failover://(tcp://localhost:" + + broker1.getTransportConnectors().get(0).getConnectUri().getPort() + + ",tcp://localhost:" + + broker2.getTransportConnectors().get(0).getConnectUri().getPort() + + ")?randomize=false&initialReconnectDelay=15000"; + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -72,6 +78,65 @@ public class InitalReconnectDelayTest { assertTrue("Failover took " + (end - start) + " ms and should be > 14000.", (end - start) > 14000); } + @Test + public void testNoSuspendedCallbackOnNoReconnect() throws Exception { + + String uriString = "failover://(tcp://localhost:" + + broker1.getTransportConnectors().get(0).getConnectUri().getPort() + + ",tcp://localhost:" + + broker2.getTransportConnectors().get(0).getConnectUri().getPort() + + ")?randomize=false&maxReconnectAttempts=0"; + + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString); + final AtomicInteger calls = new AtomicInteger(0); + connectionFactory.setTransportListener(new TransportListener() { + @Override + public void onCommand(Object command) { + } + + @Override + public void onException(IOException error) { + LOG.info("on exception: " + error); + calls.set(0x01 | calls.intValue()); + } + + @Override + public void transportInterupted() { + LOG.info("on transportInterupted"); + calls.set(0x02 | calls.intValue()); + } + + @Override + public void transportResumed() { + LOG.info("on transportResumed"); + calls.set(0x04 | calls.intValue()); + } + }); + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue destination = session.createQueue("foo"); + MessageProducer producer = session.createProducer(destination); + + final Message message = session.createTextMessage("TEST"); + producer.send(message); + + // clear listener state + calls.set(0); + + LOG.info("Stopping the Broker1..."); + broker1.stop(); + + LOG.info("Attempting to send... failover should throw on disconnect"); + try { + producer.send(destination, message); + fail("Expect IOException to bubble up on send"); + } catch (javax.jms.IllegalStateException producerClosed) { + } + + assertEquals("Only an exception is reported to the listener", 0x1, calls.get()); + } + @Before public void setUp() throws Exception { @@ -82,7 +147,7 @@ public class InitalReconnectDelayTest { broker1.setBrokerName("broker1"); broker1.setDeleteAllMessagesOnStartup(true); broker1.setDataDirectory(dataDir); - broker1.addConnector("tcp://localhost:62001"); + broker1.addConnector("tcp://localhost:0"); broker1.setUseJmx(false); broker1.start(); broker1.waitUntilStarted(); @@ -91,7 +156,7 @@ public class InitalReconnectDelayTest { broker2.setBrokerName("broker2"); broker2.setDataDirectory(dataDir); broker2.setUseJmx(false); - broker2.addConnector("tcp://localhost:62002"); + broker2.addConnector("tcp://localhost:0"); broker2.start(); broker2.waitUntilStarted(); @@ -119,8 +184,4 @@ public class InitalReconnectDelayTest { } } - protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory(uriString); - } - }