git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@669512 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-06-19 15:37:03 +00:00
parent e38da226ef
commit 1555260c6c
1 changed files with 40 additions and 28 deletions

View File

@ -183,6 +183,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private long timeCreated; private long timeCreated;
private ConnectionAudit connectionAudit = new ConnectionAudit(); private ConnectionAudit connectionAudit = new ConnectionAudit();
private DestinationSource destinationSource; private DestinationSource destinationSource;
private final Object ensureConnectionInfoSentMutex = new Object();
/** /**
* Construct an <code>ActiveMQConnection</code> * Construct an <code>ActiveMQConnection</code>
@ -524,9 +525,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void stop() throws JMSException { public void stop() throws JMSException {
checkClosedOrFailed(); checkClosedOrFailed();
if (started.compareAndSet(true, false)) { if (started.compareAndSet(true, false)) {
for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) { synchronized(sessions) {
ActiveMQSession s = i.next(); for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
s.stop(); ActiveMQSession s = i.next();
s.stop();
}
} }
} }
} }
@ -577,7 +580,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void close() throws JMSException { public void close() throws JMSException {
try { try {
// If we were running, lets stop first. // If we were running, lets stop first.
stop(); if (!closed.get() && !transportFailed.get()) {
stop();
}
synchronized (this) { synchronized (this) {
if (!closed.get()) { if (!closed.get()) {
@ -626,16 +631,18 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// then we may need to call // then we may need to call
// factory.onConnectionClose(this); // factory.onConnectionClose(this);
sessionTaskRunner.shutdown(); sessionTaskRunner.shutdown();
if (asyncConnectionThread != null){
asyncConnectionThread.shutdown();
}
closed.set(true); closed.set(true);
closing.set(false); closing.set(false);
} }
} }
} finally { } finally {
try {
if (asyncConnectionThread != null){
asyncConnectionThread.shutdown();
}
}catch(Throwable e) {
LOG.error("Error shutting down thread pool " + e,e);
}
factoryStats.removeConnection(this); factoryStats.removeConnection(this);
} }
} }
@ -1226,6 +1233,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
if (er.getException() instanceof JMSException) { if (er.getException() instanceof JMSException) {
throw (JMSException)er.getException(); throw (JMSException)er.getException();
} else { } else {
if (isClosed()||closing.get()) {
LOG.debug("Received an exception but connection is closing");
}
JMSException jmsEx = null; JMSException jmsEx = null;
try { try {
jmsEx = JMSExceptionSupport.create(er.getException()); jmsEx = JMSExceptionSupport.create(er.getException());
@ -1313,25 +1323,27 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* *
* @throws JMSException * @throws JMSException
*/ */
protected synchronized void ensureConnectionInfoSent() throws JMSException { protected void ensureConnectionInfoSent() throws JMSException {
// Can we skip sending the ConnectionInfo packet?? synchronized(this.ensureConnectionInfoSentMutex) {
if (isConnectionInfoSentToBroker || closed.get()) { // Can we skip sending the ConnectionInfo packet??
return; if (isConnectionInfoSentToBroker || closed.get()) {
} return;
}
if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
info.setClientId(clientIdGenerator.generateId()); if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
} info.setClientId(clientIdGenerator.generateId());
syncSendPacket(info); }
syncSendPacket(info);
this.isConnectionInfoSentToBroker = true;
// Add a temp destination advisory consumer so that this.isConnectionInfoSentToBroker = true;
// We know what the valid temporary destinations are on the // Add a temp destination advisory consumer so that
// broker without having to do an RPC to the broker. // We know what the valid temporary destinations are on the
// broker without having to do an RPC to the broker.
ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
if (watchTopicAdvisories) { ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
advisoryConsumer = new AdvisoryConsumer(this, consumerId); if (watchTopicAdvisories) {
advisoryConsumer = new AdvisoryConsumer(this, consumerId);
}
} }
} }