receive() returns null on connection transport failure

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@375721 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-02-07 21:40:23 +00:00
parent dbf3e5682b
commit 36a2bdc519
3 changed files with 47 additions and 17 deletions

View File

@ -126,6 +126,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean closing = new AtomicBoolean(false);
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean transportFailed = new AtomicBoolean(false);
private final CopyOnWriteArrayList sessions = new CopyOnWriteArrayList();
private final CopyOnWriteArrayList connectionConsumers = new CopyOnWriteArrayList();
private final CopyOnWriteArrayList inputStreams = new CopyOnWriteArrayList();
@ -246,7 +247,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @since 1.1
*/
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
checkClosed();
checkClosedOrFailed();
ensureConnectionInfoSent();
return new ActiveMQSession(this, getNextSessionId(), (transacted ? Session.SESSION_TRANSACTED
: (acknowledgeMode == Session.SESSION_TRANSACTED ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode)), asyncDispatch);
@ -273,7 +274,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* connection due to some internal error.
*/
public String getClientID() throws JMSException {
checkClosed();
checkClosedOrFailed();
return this.info.getClientId();
}
@ -319,7 +320,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* configured.
*/
public void setClientID(String newClientID) throws JMSException {
checkClosed();
checkClosedOrFailed();
if (this.clientIDSet) {
throw new IllegalStateException("The clientID has already been set");
@ -344,7 +345,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @see javax.jms.ConnectionMetaData
*/
public ConnectionMetaData getMetaData() throws JMSException {
checkClosed();
checkClosedOrFailed();
return ActiveMQConnectionMetaData.INSTANCE;
}
@ -362,7 +363,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @see javax.jms.Connection#setExceptionListener(ExceptionListener)
*/
public ExceptionListener getExceptionListener() throws JMSException {
checkClosed();
checkClosedOrFailed();
return this.exceptionListener;
}
@ -391,7 +392,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* this connection.
*/
public void setExceptionListener(ExceptionListener listener) throws JMSException {
checkClosed();
checkClosedOrFailed();
this.exceptionListener = listener;
}
@ -406,7 +407,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @see javax.jms.Connection#stop()
*/
public void start() throws JMSException {
checkClosed();
checkClosedOrFailed();
ensureConnectionInfoSent();
if (started.compareAndSet(false, true)) {
for (Iterator i = sessions.iterator(); i.hasNext();) {
@ -456,7 +457,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @see javax.jms.Connection#start()
*/
public void stop() throws JMSException {
checkClosed();
checkClosedOrFailed();
if (started.compareAndSet(true, false)) {
for (Iterator i = sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession) i.next();
@ -647,7 +648,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
throws JMSException {
checkClosed();
checkClosedOrFailed();
ensureConnectionInfoSent();
SessionId sessionId = new SessionId(info.getConnectionId(), -1);
ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator
@ -945,7 +946,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal) throws JMSException {
checkClosed();
checkClosedOrFailed();
ensureConnectionInfoSent();
ConsumerId consumerId = createConsumerId();
ConsumerInfo info = new ConsumerInfo(consumerId);
@ -1090,6 +1091,19 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
return stats;
}
/**
* simply throws an exception if the Connection is already closed
* or the Transport has failed
*
* @throws JMSException
*/
protected synchronized void checkClosedOrFailed() throws JMSException {
checkClosed();
if (transportFailed.get()){
throw new ConnectionFailedException();
}
}
/**
* simply throws an exception if the Connection is already closed
*
@ -1315,9 +1329,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} else {
log.warn("Async exception with no exception listener: " + error, error);
}
transportFailed(error);
}
}
public void onException(IOException error) {
onAsyncException(error);
ServiceSupport.dispose(this.transport);
@ -1359,7 +1375,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
*/
public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
checkClosed();
checkClosedOrFailed();
activeTempDestinations.remove(destination);
DestinationInfo info = new DestinationInfo();
@ -1394,7 +1410,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void destroyDestination(ActiveMQDestination destination) throws JMSException {
checkClosed();
checkClosedOrFailed();
ensureConnectionInfoSent();
DestinationInfo info = new DestinationInfo();
@ -1447,7 +1463,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName) throws JMSException {
checkClosed();
checkClosedOrFailed();
ensureConnectionInfoSent();
return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch());
}
@ -1458,7 +1474,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
public OutputStream createOutputStream(Destination dest, Map streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
checkClosed();
checkClosedOrFailed();
ensureConnectionInfoSent();
return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
}
@ -1484,7 +1500,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @since 1.1
*/
public void unsubscribe(String name) throws JMSException {
checkClosed();
checkClosedOrFailed();
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(getConnectionInfo().getConnectionId());
rsi.setSubcriptionName(name);
@ -1500,7 +1516,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* - Does not allow you to send /w a transaction.
*/
void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
checkClosed();
checkClosedOrFailed();
if( destination.isTemporary() && isDeleted(destination) ) {
throw new JMSException("Cannot publish to a deleted Destination: "+destination);
@ -1562,6 +1578,16 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
}
}
protected void transportFailed(Throwable error){
transportFailed.set(true);
try{
cleanup();
}catch(JMSException e){
log.warn("Cleanup failed",e);
}
}
public void setCopyMessageOnSend(boolean copyMessageOnSend) {
this.copyMessageOnSend = copyMessageOnSend;

View File

@ -73,7 +73,7 @@ public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicC
}
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
checkClosed();
checkClosedOrFailed();
ensureConnectionInfoSent();
return new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED, asyncDispatch);
}

View File

@ -35,6 +35,10 @@ public class ConnectionFailedException extends JMSException {
initCause(cause);
setLinkedException(cause);
}
public ConnectionFailedException() {
super("The JMS connection has failed due ti a Transport problem");
}
static private String extractMessage(IOException cause) {
String m = cause.getMessage();