From 418823a7e1ef46d2ce26c94eeb616cd7d0ca57a8 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Wed, 30 Jan 2008 12:10:22 +0000 Subject: [PATCH] Further enhancement for https://issues.apache.org/activemq/browse/AMQ-1572 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@616733 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/transport/Transport.java | 2 +- .../transport/failover/FailoverTransport.java | 282 ++++++++++-------- 2 files changed, 161 insertions(+), 123 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java b/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java index ff3ce08fc2..497adea52e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java @@ -132,7 +132,7 @@ public interface Transport extends Service { /** * Indicates if the transport can handle faults * - * @return tru if fault tolerant + * @return true if fault tolerant */ boolean isFaultTolerant(); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 512c0cb85f..8b6e63c2c0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -83,8 +83,8 @@ public class FailoverTransport implements CompositeTransport { private boolean firstConnection = true; //optionally always have a backup created private boolean backup=false; - private URI backupTransportURI; - private Transport backupTransport; + private List backups=new CopyOnWriteArrayList(); + private int backupPoolSize=2; private final TransportListener myTransportListener = createTransportListener(); @@ -95,127 +95,12 @@ public class FailoverTransport implements CompositeTransport { // Setup a task that is used to reconnect the a connection async. reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() { - public boolean iterate() { - - Exception failure = null; - synchronized (reconnectMutex) { - - if (disposed || connectionFailure != null) { - reconnectMutex.notifyAll(); - } - - if (connectedTransport != null || disposed || connectionFailure != null) { - return false; - } else { - List connectList = getConnectList(); - if (connectList.isEmpty()) { - failure = new IOException("No uris available to connect to."); - } else { - if (!useExponentialBackOff) { - reconnectDelay = initialReconnectDelay; - } - if (backup && backupTransport != null) { - Transport t = backupTransport; - URI uri = backupTransportURI; - backupTransport=null; - backupTransportURI=null; - t.setTransportListener(myTransportListener); - try { - if (started) { - restoreTransport(t); - } - reconnectDelay = initialReconnectDelay; - connectedTransportURI = uri; - connectedTransport = t; - reconnectMutex.notifyAll(); - connectFailures = 0; - LOG.info("Successfully reconnected to backup " + uri); - return false; - }catch (Exception e) { - LOG.debug("Backup transport failed",e); - } - } - - Iterator iter = connectList.iterator(); - while(iter.hasNext() && connectedTransport == null && !disposed) { - URI uri = iter.next(); - try { - LOG.debug("Attempting connect to: " + uri); - Transport t = TransportFactory.compositeConnect(uri); - t.setTransportListener(myTransportListener); - t.start(); - - if (started) { - restoreTransport(t); - } - - LOG.debug("Connection established"); - reconnectDelay = initialReconnectDelay; - connectedTransportURI = uri; - connectedTransport = t; - reconnectMutex.notifyAll(); - connectFailures = 0; - if (transportListener != null) { - transportListener.transportResumed(); - } - if (firstConnection) { - firstConnection=false; - LOG.info("Successfully connected to " + uri); - if(backup) { - while(iter.hasNext() && backupTransport==null){ - uri = iter.next(); - try { - t = TransportFactory.compositeConnect(uri); - t.setTransportListener(new DefaultTransportListener()); - t.start(); - backupTransport=t; - backupTransportURI=uri; - }catch(Exception e) { - LOG.debug("Failed to create backup to " + uri,e); - } - } - } - }else { - LOG.info("Successfully reconnected to " + uri); - } - return false; - } catch (Exception e) { - failure = e; - LOG.debug("Connect fail to: " + uri + ", reason: " + e); - } - } - } - } - - if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) { - LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)"); - connectionFailure = failure; - reconnectMutex.notifyAll(); - return false; - } - } - - if (!disposed) { - - LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); - synchronized (sleepMutex) { - try { - sleepMutex.wait(reconnectDelay); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - if (useExponentialBackOff) { - // Exponential increment of reconnect delay. - reconnectDelay *= backOffMultiplier; - if (reconnectDelay > maxReconnectDelay) { - reconnectDelay = maxReconnectDelay; - } - } - } - return !disposed; + boolean result = doReconnect(); + if(!result) { + buildBackups(); + } + return result; } }, "ActiveMQ Failover Worker: " + System.identityHashCode(this)); @@ -394,6 +279,22 @@ public class FailoverTransport implements CompositeTransport { public void setRandomize(boolean randomize) { this.randomize = randomize; } + + public boolean isBackup() { + return backup; + } + + public void setBackup(boolean backup) { + this.backup = backup; + } + + public int getBackupPoolSize() { + return backupPoolSize; + } + + public void setBackupPoolSize(int backupPoolSize) { + this.backupPoolSize = backupPoolSize; + } public void oneway(Object o) throws IOException { Command command = (Command)o; @@ -613,5 +514,142 @@ public class FailoverTransport implements CompositeTransport { public boolean isFaultTolerant() { return true; } + + final boolean doReconnect() { + + Exception failure = null; + synchronized (reconnectMutex) { + + if (disposed || connectionFailure != null) { + reconnectMutex.notifyAll(); + } + + if (connectedTransport != null || disposed || connectionFailure != null) { + return false; + } else { + List connectList = getConnectList(); + if (connectList.isEmpty()) { + failure = new IOException("No uris available to connect to."); + } else { + if (!useExponentialBackOff) { + reconnectDelay = initialReconnectDelay; + } + if (backup && !backups.isEmpty()) { + BackupTransport bt = backups.remove(0); + Transport t = bt.getTransport(); + URI uri = bt.getUri(); + t.setTransportListener(myTransportListener); + try { + if (started) { + restoreTransport(t); + } + reconnectDelay = initialReconnectDelay; + connectedTransportURI = uri; + connectedTransport = t; + reconnectMutex.notifyAll(); + connectFailures = 0; + LOG.info("Successfully reconnected to backup " + uri); + return false; + }catch (Exception e) { + LOG.debug("Backup transport failed",e); + } + } + + Iterator iter = connectList.iterator(); + while(iter.hasNext() && connectedTransport == null && !disposed) { + URI uri = iter.next(); + try { + LOG.debug("Attempting connect to: " + uri); + Transport t = TransportFactory.compositeConnect(uri); + t.setTransportListener(myTransportListener); + t.start(); + + if (started) { + restoreTransport(t); + } + + LOG.debug("Connection established"); + reconnectDelay = initialReconnectDelay; + connectedTransportURI = uri; + connectedTransport = t; + reconnectMutex.notifyAll(); + connectFailures = 0; + if (transportListener != null) { + transportListener.transportResumed(); + } + if (firstConnection) { + firstConnection=false; + LOG.info("Successfully connected to " + uri); + }else { + LOG.info("Successfully reconnected to " + uri); + } + return false; + } catch (Exception e) { + failure = e; + LOG.debug("Connect fail to: " + uri + ", reason: " + e); + } + } + } + } + + if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) { + LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)"); + connectionFailure = failure; + reconnectMutex.notifyAll(); + return false; + } + } + + if (!disposed) { + + LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); + synchronized (sleepMutex) { + try { + sleepMutex.wait(reconnectDelay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + if (useExponentialBackOff) { + // Exponential increment of reconnect delay. + reconnectDelay *= backOffMultiplier; + if (reconnectDelay > maxReconnectDelay) { + reconnectDelay = maxReconnectDelay; + } + } + } + return !disposed; + } + + + final boolean buildBackups() { + synchronized (reconnectMutex) { + if (backup && backups.size() < backupPoolSize) { + List connectList = getConnectList(); + for (Iteratoriter = connectList.iterator();iter.hasNext() && backups.size() < backupPoolSize;) { + URI uri = iter.next(); + if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) { + try { + BackupTransport bt = new BackupTransport(); + bt.setUri(uri); + if (!backups.contains(bt)) { + Transport t = TransportFactory.compositeConnect(uri); + t.setTransportListener(new DefaultTransportListener()); + t.start(); + bt.setTransport(t); + backups.add(bt); + } + }catch(Exception e) { + LOG.debug("Failed to build backup ",e); + } + } + } + } + } + return false; + } + + }