From c04d8c548d0f0cd1fe4b25b3d6aaf5b68b25ec20 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Mon, 24 Mar 2008 23:19:50 +0000 Subject: [PATCH] Better failover error handling and now we pass on the max initial inactivity timeout to the timeout used by the intial wire format negociation. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@640641 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/WireFormatNegotiator.java | 7 +++ .../transport/failover/FailoverTransport.java | 49 ++++++++----------- 2 files changed, 27 insertions(+), 29 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java b/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java index dc9910620e..7c7ba2e069 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java @@ -57,6 +57,13 @@ public class WireFormatNegotiator extends TransportFilter { minimumVersion = 1; } this.minimumVersion = minimumVersion; + + // Setup the initial negociation timeout to be the same as the inital max inactivity delay specified on the wireformat + // Does not make sense for us to take longer. + try { + setNegotiateTimeout(wireFormat.getPreferedWireFormatInfo().getMaxInactivityDurationInitalDelay()); + } catch (IOException e) { + } } public void start() throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index a78792485a..0f1df8299e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -94,7 +94,7 @@ public class FailoverTransport implements CompositeTransport { private int backupPoolSize=1; private boolean trackMessages = false; private int maxCacheSize = 128 * 1024; - private TransportListener disposedListener = new DefaultTransportListener(); + private TransportListener disposedListener = new DefaultTransportListener() {}; private final TransportListener myTransportListener = createTransportListener(); @@ -189,42 +189,33 @@ public class FailoverTransport implements CompositeTransport { public final void handleTransportFailure(IOException e) throws InterruptedException { - Transport transport = connectedTransport.get(); + Transport transport = connectedTransport.getAndSet(null); if( transport!=null ) { - ServiceSupport.dispose(transport); - } - - boolean wasConnected=false; - synchronized (reconnectMutex) { - boolean reconnectOk = false; - if(started) { - LOG.warn("Transport failed, attempting to automatically reconnect due to: " + e); - LOG.debug("Transport failed with the following exception:", e); - reconnectOk = true; - } - if (connectedTransport.get() != null) { - wasConnected=true; + transport.setTransportListener(disposedListener); + ServiceSupport.dispose(transport); + + synchronized (reconnectMutex) { + boolean reconnectOk = false; + if(started) { + LOG.warn("Transport failed, attempting to automatically reconnect due to: " + e); + LOG.debug("Transport failed with the following exception:", e); + reconnectOk = true; + } + initialized = false; failedConnectTransportURI=connectedTransportURI; - Transport old = connectedTransport.get(); - if(old != null) { - //don't want errors from old transport - old.setTransportListener(disposedListener); - } - connectedTransport.set(null); connectedTransportURI = null; connected=false; + + if(reconnectOk) { + reconnectTask.wakeup(); + } } - - if(reconnectOk) { - reconnectTask.wakeup(); - } - } - // Avoid double firing a transportInterupted() event due to an extra IOException - if (transportListener != null && wasConnected) { - transportListener.transportInterupted(); + if (transportListener != null) { + transportListener.transportInterupted(); + } } }