mirror of https://github.com/apache/activemq.git
Better failover error handling and now we pass on the max initial inactivity timeout to the timeout used by the intial wire format negociation.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@640641 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0a4c8064e4
commit
c04d8c548d
|
@ -57,6 +57,13 @@ public class WireFormatNegotiator extends TransportFilter {
|
||||||
minimumVersion = 1;
|
minimumVersion = 1;
|
||||||
}
|
}
|
||||||
this.minimumVersion = minimumVersion;
|
this.minimumVersion = minimumVersion;
|
||||||
|
|
||||||
|
// Setup the initial negociation timeout to be the same as the inital max inactivity delay specified on the wireformat
|
||||||
|
// Does not make sense for us to take longer.
|
||||||
|
try {
|
||||||
|
setNegotiateTimeout(wireFormat.getPreferedWireFormatInfo().getMaxInactivityDurationInitalDelay());
|
||||||
|
} catch (IOException e) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() throws Exception {
|
public void start() throws Exception {
|
||||||
|
|
|
@ -94,7 +94,7 @@ public class FailoverTransport implements CompositeTransport {
|
||||||
private int backupPoolSize=1;
|
private int backupPoolSize=1;
|
||||||
private boolean trackMessages = false;
|
private boolean trackMessages = false;
|
||||||
private int maxCacheSize = 128 * 1024;
|
private int maxCacheSize = 128 * 1024;
|
||||||
private TransportListener disposedListener = new DefaultTransportListener();
|
private TransportListener disposedListener = new DefaultTransportListener() {};
|
||||||
|
|
||||||
|
|
||||||
private final TransportListener myTransportListener = createTransportListener();
|
private final TransportListener myTransportListener = createTransportListener();
|
||||||
|
@ -189,44 +189,35 @@ public class FailoverTransport implements CompositeTransport {
|
||||||
|
|
||||||
public final void handleTransportFailure(IOException e) throws InterruptedException {
|
public final void handleTransportFailure(IOException e) throws InterruptedException {
|
||||||
|
|
||||||
Transport transport = connectedTransport.get();
|
Transport transport = connectedTransport.getAndSet(null);
|
||||||
if( transport!=null ) {
|
if( transport!=null ) {
|
||||||
|
|
||||||
|
transport.setTransportListener(disposedListener);
|
||||||
ServiceSupport.dispose(transport);
|
ServiceSupport.dispose(transport);
|
||||||
}
|
|
||||||
|
|
||||||
boolean wasConnected=false;
|
synchronized (reconnectMutex) {
|
||||||
synchronized (reconnectMutex) {
|
boolean reconnectOk = false;
|
||||||
boolean reconnectOk = false;
|
if(started) {
|
||||||
if(started) {
|
LOG.warn("Transport failed, attempting to automatically reconnect due to: " + e);
|
||||||
LOG.warn("Transport failed, attempting to automatically reconnect due to: " + e);
|
LOG.debug("Transport failed with the following exception:", e);
|
||||||
LOG.debug("Transport failed with the following exception:", e);
|
reconnectOk = true;
|
||||||
reconnectOk = true;
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (connectedTransport.get() != null) {
|
|
||||||
wasConnected=true;
|
|
||||||
initialized = false;
|
initialized = false;
|
||||||
failedConnectTransportURI=connectedTransportURI;
|
failedConnectTransportURI=connectedTransportURI;
|
||||||
Transport old = connectedTransport.get();
|
|
||||||
if(old != null) {
|
|
||||||
//don't want errors from old transport
|
|
||||||
old.setTransportListener(disposedListener);
|
|
||||||
}
|
|
||||||
connectedTransport.set(null);
|
|
||||||
connectedTransportURI = null;
|
connectedTransportURI = null;
|
||||||
connected=false;
|
connected=false;
|
||||||
|
|
||||||
|
if(reconnectOk) {
|
||||||
|
reconnectTask.wakeup();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(reconnectOk) {
|
if (transportListener != null) {
|
||||||
reconnectTask.wakeup();
|
transportListener.transportInterupted();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Avoid double firing a transportInterupted() event due to an extra IOException
|
|
||||||
if (transportListener != null && wasConnected) {
|
|
||||||
transportListener.transportInterupted();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() throws Exception {
|
public void start() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue