diff --git a/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java b/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java index c598dddbda..0adff4efa1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java +++ b/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.activemq.pool; import java.io.IOException; @@ -73,7 +74,14 @@ public class ConnectionPool { public void transportResumed() { } - }); + }); + // + // make sure that we set the hasFailed flag, in case the transport already failed + // prior to the addition of our new TransportListener + // + if(connection.isTransportFailed()) { + hasFailed = true; + } } public ConnectionPool(ActiveMQConnection connection, Map cache, ObjectPoolFactory poolFactory, @@ -218,5 +226,4 @@ public class ConnectionPool { } } } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index f0579269e3..d92a00e4ff 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.activemq.transport.failover; import java.io.IOException; @@ -29,6 +30,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; import org.apache.activemq.command.Response; +import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.state.ConnectionStateTracker; import org.apache.activemq.state.Tracked; import org.apache.activemq.thread.DefaultThreadPools; @@ -176,7 +178,12 @@ public class FailoverTransport implements CompositeTransport { transportListener.transportInterupted(); } synchronized (reconnectMutex) { - LOG.warn("Transport failed, attempting to automatically reconnect due to: " + e, e); + boolean reconnectOk = false; + if(started) { + LOG.warn("Transport failed, attempting to automatically reconnect due to: " + e, e); + reconnectOk = true; + } + if (connectedTransport != null) { initialized = false; ServiceSupport.dispose(connectedTransport); @@ -185,7 +192,10 @@ public class FailoverTransport implements CompositeTransport { connectedTransportURI = null; connected=false; } - reconnectTask.wakeup(); + + if(reconnectOk) { + reconnectTask.wakeup(); + } } } @@ -307,6 +317,21 @@ public class FailoverTransport implements CompositeTransport { public void setBackupPoolSize(int backupPoolSize) { this.backupPoolSize = backupPoolSize; } + +/* +* BEGIN Patch segment for issue AMQ-1116 +*/ + /** + * @return Returns true if the command is one sent when a connection + * is being closed. + */ + private boolean isShutdownCommand(Command command) { + return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo)); + } +/* +* END Patch segment +*/ + public void oneway(Object o) throws IOException { Command command = (Command)o; @@ -314,6 +339,20 @@ public class FailoverTransport implements CompositeTransport { try { synchronized (reconnectMutex) { + + if (isShutdownCommand(command) && connectedTransport == null) { + if(command.isShutdownInfo()) { + // Skipping send of ShutdownInfo command when not connected. + return; + } + if(command instanceof RemoveInfo) { + // Simulate response to RemoveInfo command + Response response = new Response(); + response.setCorrelationId(command.getCommandId()); + myTransportListener.onCommand(response); + return; + } + } // Keep trying until the message is sent. for (int i = 0; !disposed; i++) { try { @@ -609,11 +648,28 @@ public class FailoverTransport implements CompositeTransport { if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) { LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)"); connectionFailure = failure; + + // Make sure on initial startup, that the transportListener has been initialized + // for this instance. + while(transportListener == null) { + try { + Thread.sleep(100); + } + catch(InterruptedException iEx) {} + } + + + if(transportListener != null) { + if (connectionFailure instanceof IOException) { + transportListener.onException((IOException)connectionFailure); + } else { + transportListener.onException(IOExceptionSupport.create(connectionFailure)); + } + } reconnectMutex.notifyAll(); return false; } } - if (!disposed) { LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");