https://issues.apache.org/activemq/browse/AMQ-2195 - receive should throw an exception if the connection is lost

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@761301 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-04-02 14:03:54 +00:00
parent cd174e7829
commit f8ef7ff31f
3 changed files with 28 additions and 3 deletions

View File

@ -1411,7 +1411,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
*/ */
public void cleanup() throws JMSException { public void cleanup() throws JMSException {
if (advisoryConsumer != null) { if (advisoryConsumer != null && !isTransportFailed()) {
advisoryConsumer.dispose(); advisoryConsumer.dispose();
advisoryConsumer = null; advisoryConsumer = null;
} }
@ -1805,7 +1805,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
transportFailed(error); transportFailed(error);
ServiceSupport.dispose(ActiveMQConnection.this.transport); ServiceSupport.dispose(ActiveMQConnection.this.transport);
brokerInfoReceived.countDown(); brokerInfoReceived.countDown();
try {
cleanup();
} catch (JMSException e) {
LOG.warn("Exception during connection cleanup, " + e, e);
}
for (Iterator<TransportListener> iter = transportListeners for (Iterator<TransportListener> iter = transportListeners
.iterator(); iter.hasNext();) { .iterator(); iter.hasNext();) {
TransportListener listener = iter.next(); TransportListener listener = iter.next();
@ -2215,4 +2219,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) { protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
connectionAudit.rollbackDuplicate(dispatcher, message); connectionAudit.rollbackDuplicate(dispatcher, message);
} }
public IOException getFirstFailureError() {
return firstFailureError;
}
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@ -32,6 +33,7 @@ import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException; import javax.jms.InvalidDestinationException;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -130,6 +132,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private MessageAck pendingAck; private MessageAck pendingAck;
private long lastDeliveredSequenceId; private long lastDeliveredSequenceId;
private IOException failureError;
/** /**
* Create a MessageConsumer * Create a MessageConsumer
* *
@ -416,9 +420,13 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (md == null) { if (md == null) {
if (timeout > 0 && !unconsumedMessages.isClosed()) { if (timeout > 0 && !unconsumedMessages.isClosed()) {
timeout = Math.max(deadline - System.currentTimeMillis(), 0); timeout = Math.max(deadline - System.currentTimeMillis(), 0);
} else {
if (failureError != null) {
throw JMSExceptionSupport.create(failureError);
} else { } else {
return null; return null;
} }
}
} else if (md.getMessage() == null) { } else if (md.getMessage() == null) {
return null; return null;
} else if (md.getMessage().isExpired()) { } else if (md.getMessage().isExpired()) {
@ -1136,4 +1144,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
return lastDeliveredSequenceId; return lastDeliveredSequenceId;
} }
public IOException getFailureError() {
return failureError;
}
public void setFailureError(IOException failureError) {
this.failureError = failureError;
}
} }

View File

@ -609,6 +609,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
ActiveMQMessageConsumer consumer = iter.next(); ActiveMQMessageConsumer consumer = iter.next();
consumer.setFailureError(connection.getFirstFailureError());
consumer.dispose(); consumer.dispose();
lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId()); lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
} }