git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@733761 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-01-12 14:05:57 +00:00
parent 3967b81cc7
commit 4a1279639f
2 changed files with 83 additions and 7 deletions

View File

@ -82,6 +82,7 @@ public class FailoverTransport implements CompositeTransport {
private long initialReconnectDelay = 10;
private long maxReconnectDelay = 1000 * 30;
private long backOffMultiplier = 2;
private long timeout = -1;
private boolean useExponentialBackOff = true;
private boolean randomize = true;
private boolean initialized;
@ -318,7 +319,15 @@ public class FailoverTransport implements CompositeTransport {
this.maxReconnectAttempts = maxReconnectAttempts;
}
/**
public long getTimeout() {
return timeout;
}
public void setTimeout(long timeout) {
this.timeout = timeout;
}
/**
* @return Returns the randomize.
*/
public boolean isRandomize() {
@ -380,7 +389,7 @@ public class FailoverTransport implements CompositeTransport {
try {
synchronized (reconnectMutex) {
if (isShutdownCommand(command) && connectedTransport.get() == null) {
if(command.isShutdownInfo()) {
// Skipping send of ShutdownInfo command when not connected.
@ -393,19 +402,27 @@ public class FailoverTransport implements CompositeTransport {
myTransportListener.onCommand(response);
return;
}
}
}
// Keep trying until the message is sent.
for (int i = 0; !disposed; i++) {
try {
// Wait for transport to be connected.
Transport transport = connectedTransport.get();
long start = System.currentTimeMillis();
boolean timedout = false;
while (transport == null && !disposed
&& connectionFailure == null
&& !Thread.currentThread().isInterrupted()) {
LOG.trace("Waiting for transport to reconnect.");
long end = System.currentTimeMillis();
if (timeout > 0 && (end - start > timeout)) {
timedout = true;
LOG.info("Failover timed out after " + (end - start) + "ms");
break;
}
try {
reconnectMutex.wait(1000);
reconnectMutex.wait(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.debug("Interupted: " + e, e);
@ -420,7 +437,9 @@ public class FailoverTransport implements CompositeTransport {
error = new IOException("Transport disposed.");
} else if (connectionFailure != null) {
error = connectionFailure;
} else {
} else if (timedout == true) {
error = new IOException("Failover timeout of " + timeout + " ms reached.");
}else {
error = new IOException("Unexpected failure.");
}
break;
@ -632,7 +651,6 @@ public class FailoverTransport implements CompositeTransport {
}
final boolean doReconnect() {
Exception failure = null;
synchronized (reconnectMutex) {
@ -724,7 +742,7 @@ public class FailoverTransport implements CompositeTransport {
}
}
}
if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) {
LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
connectionFailure = failure;

View File

@ -0,0 +1,58 @@
package org.apache.activemq.transport.failover;
import java.net.URI;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
public class FailoverTimeoutTest extends TestCase {
private static final String QUEUE_NAME = "test.failovertimeout";
public void testTimeout() throws Exception {
long timeout = 1000;
URI tcpUri = new URI("tcp://localhost:61616");
BrokerService bs = new BrokerService();
bs.setUseJmx(false);
bs.addConnector(tcpUri);
bs.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?timeout=" + timeout);
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session
.createQueue(QUEUE_NAME));
TextMessage message = session.createTextMessage("Test message");
producer.send(message);
bs.stop();
try {
producer.send(message);
} catch (JMSException jmse) {
jmse.printStackTrace();
assertEquals("Failover timeout of " + timeout + " ms reached.", jmse.getMessage());
}
bs = new BrokerService();
bs.setUseJmx(false);
bs.addConnector(tcpUri);
bs.start();
producer.send(message);
bs.stop();
}
}