diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 5a92e5e5ea..5e9ad5afe1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -183,6 +183,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private long timeCreated; private ConnectionAudit connectionAudit = new ConnectionAudit(); private DestinationSource destinationSource; + private final Object ensureConnectionInfoSentMutex = new Object(); /** * Construct an ActiveMQConnection @@ -524,9 +525,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public void stop() throws JMSException { checkClosedOrFailed(); if (started.compareAndSet(true, false)) { - for (Iterator i = sessions.iterator(); i.hasNext();) { - ActiveMQSession s = i.next(); - s.stop(); + synchronized(sessions) { + for (Iterator i = sessions.iterator(); i.hasNext();) { + ActiveMQSession s = i.next(); + s.stop(); + } } } } @@ -577,7 +580,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public void close() throws JMSException { try { // If we were running, lets stop first. - stop(); + if (!closed.get() && !transportFailed.get()) { + stop(); + } synchronized (this) { if (!closed.get()) { @@ -626,16 +631,18 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon // then we may need to call // factory.onConnectionClose(this); sessionTaskRunner.shutdown(); - - if (asyncConnectionThread != null){ - asyncConnectionThread.shutdown(); - } - closed.set(true); closing.set(false); } } } finally { + try { + if (asyncConnectionThread != null){ + asyncConnectionThread.shutdown(); + } + }catch(Throwable e) { + LOG.error("Error shutting down thread pool " + e,e); + } factoryStats.removeConnection(this); } } @@ -1226,6 +1233,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon if (er.getException() instanceof JMSException) { throw (JMSException)er.getException(); } else { + if (isClosed()||closing.get()) { + LOG.debug("Received an exception but connection is closing"); + } JMSException jmsEx = null; try { jmsEx = JMSExceptionSupport.create(er.getException()); @@ -1313,25 +1323,27 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * * @throws JMSException */ - protected synchronized void ensureConnectionInfoSent() throws JMSException { - // Can we skip sending the ConnectionInfo packet?? - if (isConnectionInfoSentToBroker || closed.get()) { - return; - } - - if (info.getClientId() == null || info.getClientId().trim().length() == 0) { - info.setClientId(clientIdGenerator.generateId()); - } - syncSendPacket(info); - - this.isConnectionInfoSentToBroker = true; - // Add a temp destination advisory consumer so that - // We know what the valid temporary destinations are on the - // broker without having to do an RPC to the broker. - - ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId()); - if (watchTopicAdvisories) { - advisoryConsumer = new AdvisoryConsumer(this, consumerId); + protected void ensureConnectionInfoSent() throws JMSException { + synchronized(this.ensureConnectionInfoSentMutex) { + // Can we skip sending the ConnectionInfo packet?? + if (isConnectionInfoSentToBroker || closed.get()) { + return; + } + + if (info.getClientId() == null || info.getClientId().trim().length() == 0) { + info.setClientId(clientIdGenerator.generateId()); + } + syncSendPacket(info); + + this.isConnectionInfoSentToBroker = true; + // Add a temp destination advisory consumer so that + // We know what the valid temporary destinations are on the + // broker without having to do an RPC to the broker. + + ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId()); + if (watchTopicAdvisories) { + advisoryConsumer = new AdvisoryConsumer(this, consumerId); + } } }