This commit is contained in:
gtully 2013-10-04 11:34:53 +01:00
parent 8572bada7d
commit 4f19f31a37
2 changed files with 79 additions and 18 deletions

View File

@ -257,12 +257,12 @@ public class FailoverTransport implements CompositeTransport {
connected = false;
connectedToPriority = false;
// notify before any reconnect attempt so ack state can be whacked
if (transportListener != null) {
transportListener.transportInterupted();
}
if (reconnectOk) {
// notify before any reconnect attempt so ack state can be whacked
if (transportListener != null) {
transportListener.transportInterupted();
}
updated.remove(failedConnectTransportURI);
reconnectTask.wakeup();
} else if (!isDisposed()) {

View File

@ -16,35 +16,41 @@
*/
package org.apache.activemq.transport.failover;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.transport.TransportListener;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
public class InitalReconnectDelayTest {
private static final transient Logger LOG = LoggerFactory.getLogger(InitalReconnectDelayTest.class);
protected BrokerService broker1;
protected BrokerService broker2;
protected CountDownLatch broker2Started = new CountDownLatch(1);
protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false&initialReconnectDelay=15000";
@Test
public void testInitialReconnectDelay() throws Exception {
String uriString = "failover://(tcp://localhost:" +
broker1.getTransportConnectors().get(0).getConnectUri().getPort() +
",tcp://localhost:" +
broker2.getTransportConnectors().get(0).getConnectUri().getPort() +
")?randomize=false&initialReconnectDelay=15000";
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -72,6 +78,65 @@ public class InitalReconnectDelayTest {
assertTrue("Failover took " + (end - start) + " ms and should be > 14000.", (end - start) > 14000);
}
@Test
public void testNoSuspendedCallbackOnNoReconnect() throws Exception {
String uriString = "failover://(tcp://localhost:" +
broker1.getTransportConnectors().get(0).getConnectUri().getPort() +
",tcp://localhost:" +
broker2.getTransportConnectors().get(0).getConnectUri().getPort() +
")?randomize=false&maxReconnectAttempts=0";
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString);
final AtomicInteger calls = new AtomicInteger(0);
connectionFactory.setTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) {
}
@Override
public void onException(IOException error) {
LOG.info("on exception: " + error);
calls.set(0x01 | calls.intValue());
}
@Override
public void transportInterupted() {
LOG.info("on transportInterupted");
calls.set(0x02 | calls.intValue());
}
@Override
public void transportResumed() {
LOG.info("on transportResumed");
calls.set(0x04 | calls.intValue());
}
});
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue("foo");
MessageProducer producer = session.createProducer(destination);
final Message message = session.createTextMessage("TEST");
producer.send(message);
// clear listener state
calls.set(0);
LOG.info("Stopping the Broker1...");
broker1.stop();
LOG.info("Attempting to send... failover should throw on disconnect");
try {
producer.send(destination, message);
fail("Expect IOException to bubble up on send");
} catch (javax.jms.IllegalStateException producerClosed) {
}
assertEquals("Only an exception is reported to the listener", 0x1, calls.get());
}
@Before
public void setUp() throws Exception {
@ -82,7 +147,7 @@ public class InitalReconnectDelayTest {
broker1.setBrokerName("broker1");
broker1.setDeleteAllMessagesOnStartup(true);
broker1.setDataDirectory(dataDir);
broker1.addConnector("tcp://localhost:62001");
broker1.addConnector("tcp://localhost:0");
broker1.setUseJmx(false);
broker1.start();
broker1.waitUntilStarted();
@ -91,7 +156,7 @@ public class InitalReconnectDelayTest {
broker2.setBrokerName("broker2");
broker2.setDataDirectory(dataDir);
broker2.setUseJmx(false);
broker2.addConnector("tcp://localhost:62002");
broker2.addConnector("tcp://localhost:0");
broker2.start();
broker2.waitUntilStarted();
@ -119,8 +184,4 @@ public class InitalReconnectDelayTest {
}
}
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(uriString);
}
}