From 36a2bdc519ea449813995e2404fd5187f7c1ffea Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Tue, 7 Feb 2006 21:40:23 +0000 Subject: [PATCH] receive() returns null on connection transport failure git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@375721 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/ActiveMQConnection.java | 58 ++++++++++++++----- .../apache/activemq/ActiveMQXAConnection.java | 2 +- .../activemq/ConnectionFailedException.java | 4 ++ 3 files changed, 47 insertions(+), 17 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index ebce559ac5..feb4c8af5b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -126,6 +126,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean closing = new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false); + private final AtomicBoolean transportFailed = new AtomicBoolean(false); private final CopyOnWriteArrayList sessions = new CopyOnWriteArrayList(); private final CopyOnWriteArrayList connectionConsumers = new CopyOnWriteArrayList(); private final CopyOnWriteArrayList inputStreams = new CopyOnWriteArrayList(); @@ -246,7 +247,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @since 1.1 */ public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { - checkClosed(); + checkClosedOrFailed(); ensureConnectionInfoSent(); return new ActiveMQSession(this, getNextSessionId(), (transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode)), asyncDispatch); @@ -273,7 +274,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * connection due to some internal error. */ public String getClientID() throws JMSException { - checkClosed(); + checkClosedOrFailed(); return this.info.getClientId(); } @@ -319,7 +320,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * configured. */ public void setClientID(String newClientID) throws JMSException { - checkClosed(); + checkClosedOrFailed(); if (this.clientIDSet) { throw new IllegalStateException("The clientID has already been set"); @@ -344,7 +345,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @see javax.jms.ConnectionMetaData */ public ConnectionMetaData getMetaData() throws JMSException { - checkClosed(); + checkClosedOrFailed(); return ActiveMQConnectionMetaData.INSTANCE; } @@ -362,7 +363,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @see javax.jms.Connection#setExceptionListener(ExceptionListener) */ public ExceptionListener getExceptionListener() throws JMSException { - checkClosed(); + checkClosedOrFailed(); return this.exceptionListener; } @@ -391,7 +392,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * this connection. */ public void setExceptionListener(ExceptionListener listener) throws JMSException { - checkClosed(); + checkClosedOrFailed(); this.exceptionListener = listener; } @@ -406,7 +407,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @see javax.jms.Connection#stop() */ public void start() throws JMSException { - checkClosed(); + checkClosedOrFailed(); ensureConnectionInfoSent(); if (started.compareAndSet(false, true)) { for (Iterator i = sessions.iterator(); i.hasNext();) { @@ -456,7 +457,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @see javax.jms.Connection#start() */ public void stop() throws JMSException { - checkClosed(); + checkClosedOrFailed(); if (started.compareAndSet(true, false)) { for (Iterator i = sessions.iterator(); i.hasNext();) { ActiveMQSession s = (ActiveMQSession) i.next(); @@ -647,7 +648,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal) throws JMSException { - checkClosed(); + checkClosedOrFailed(); ensureConnectionInfoSent(); SessionId sessionId = new SessionId(info.getConnectionId(), -1); ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator @@ -945,7 +946,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal) throws JMSException { - checkClosed(); + checkClosedOrFailed(); ensureConnectionInfoSent(); ConsumerId consumerId = createConsumerId(); ConsumerInfo info = new ConsumerInfo(consumerId); @@ -1090,6 +1091,19 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon return stats; } + /** + * simply throws an exception if the Connection is already closed + * or the Transport has failed + * + * @throws JMSException + */ + protected synchronized void checkClosedOrFailed() throws JMSException { + checkClosed(); + if (transportFailed.get()){ + throw new ConnectionFailedException(); + } + } + /** * simply throws an exception if the Connection is already closed * @@ -1315,9 +1329,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } else { log.warn("Async exception with no exception listener: " + error, error); } + transportFailed(error); } } + public void onException(IOException error) { onAsyncException(error); ServiceSupport.dispose(this.transport); @@ -1359,7 +1375,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon */ public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException { - checkClosed(); + checkClosedOrFailed(); activeTempDestinations.remove(destination); DestinationInfo info = new DestinationInfo(); @@ -1394,7 +1410,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public void destroyDestination(ActiveMQDestination destination) throws JMSException { - checkClosed(); + checkClosedOrFailed(); ensureConnectionInfoSent(); DestinationInfo info = new DestinationInfo(); @@ -1447,7 +1463,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName) throws JMSException { - checkClosed(); + checkClosedOrFailed(); ensureConnectionInfoSent(); return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch()); } @@ -1458,7 +1474,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } public OutputStream createOutputStream(Destination dest, Map streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException { - checkClosed(); + checkClosedOrFailed(); ensureConnectionInfoSent(); return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive); } @@ -1484,7 +1500,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @since 1.1 */ public void unsubscribe(String name) throws JMSException { - checkClosed(); + checkClosedOrFailed(); RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); rsi.setConnectionId(getConnectionInfo().getConnectionId()); rsi.setSubcriptionName(name); @@ -1500,7 +1516,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * - Does not allow you to send /w a transaction. */ void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException { - checkClosed(); + checkClosedOrFailed(); if( destination.isTemporary() && isDeleted(destination) ) { throw new JMSException("Cannot publish to a deleted Destination: "+destination); @@ -1562,6 +1578,16 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } } } + + protected void transportFailed(Throwable error){ + transportFailed.set(true); + try{ + cleanup(); + }catch(JMSException e){ + log.warn("Cleanup failed",e); + } + + } public void setCopyMessageOnSend(boolean copyMessageOnSend) { this.copyMessageOnSend = copyMessageOnSend; diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java index 22c37f9859..bf5b8b7844 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java @@ -73,7 +73,7 @@ public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicC } public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { - checkClosed(); + checkClosedOrFailed(); ensureConnectionInfoSent(); return new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED, asyncDispatch); } diff --git a/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java b/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java index f61820a42f..caf5967ef3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java +++ b/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java @@ -35,6 +35,10 @@ public class ConnectionFailedException extends JMSException { initCause(cause); setLinkedException(cause); } + + public ConnectionFailedException() { + super("The JMS connection has failed due ti a Transport problem"); + } static private String extractMessage(IOException cause) { String m = cause.getMessage();