diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 3e7d5085de..cf8b079c80 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -1411,7 +1411,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon */ public void cleanup() throws JMSException { - if (advisoryConsumer != null) { + if (advisoryConsumer != null && !isTransportFailed()) { advisoryConsumer.dispose(); advisoryConsumer = null; } @@ -1805,7 +1805,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon transportFailed(error); ServiceSupport.dispose(ActiveMQConnection.this.transport); brokerInfoReceived.countDown(); - + try { + cleanup(); + } catch (JMSException e) { + LOG.warn("Exception during connection cleanup, " + e, e); + } for (Iterator iter = transportListeners .iterator(); iter.hasNext();) { TransportListener listener = iter.next(); @@ -2215,4 +2219,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) { connectionAudit.rollbackDuplicate(dispatcher, message); } + + public IOException getFirstFailureError() { + return firstFailureError; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index daeb1e0b84..9d264dae52 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -16,6 +16,7 @@ */ package org.apache.activemq; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -32,6 +33,7 @@ import javax.jms.IllegalStateException; import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.MessageListener; import org.apache.activemq.command.ActiveMQDestination; @@ -129,6 +131,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC private MessageAck pendingAck; private long lastDeliveredSequenceId; + + private IOException failureError; /** * Create a MessageConsumer @@ -417,7 +421,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (timeout > 0 && !unconsumedMessages.isClosed()) { timeout = Math.max(deadline - System.currentTimeMillis(), 0); } else { - return null; + if (failureError != null) { + throw JMSExceptionSupport.create(failureError); + } else { + return null; + } } } else if (md.getMessage() == null) { return null; @@ -1136,4 +1144,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC return lastDeliveredSequenceId; } + public IOException getFailureError() { + return failureError; + } + + public void setFailureError(IOException failureError) { + this.failureError = failureError; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index cc5f9a821c..d35737553a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -609,6 +609,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta for (Iterator iter = consumers.iterator(); iter.hasNext();) { ActiveMQMessageConsumer consumer = iter.next(); + consumer.setFailureError(connection.getFirstFailureError()); consumer.dispose(); lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId()); }