added patch for AMQ-835 - many thanks Ozgur Cetinturk

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@423797 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-07-20 06:13:43 +00:00
parent 1bcd631bf9
commit ea35563b49
1 changed files with 43 additions and 39 deletions

View File

@ -529,53 +529,57 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void close() throws JMSException {
checkClosed();
// If we were running, lets stop first.
stop();
try {
// If we were running, lets stop first.
stop();
synchronized (this) {
if (!closed.get()) {
closing.set(true);
synchronized (this) {
if (!closed.get()) {
closing.set(true);
if( advisoryConsumer!=null ) {
advisoryConsumer.dispose();
advisoryConsumer=null;
}
if (advisoryConsumer != null) {
advisoryConsumer.dispose();
advisoryConsumer = null;
}
for (Iterator i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession) i.next();
s.dispose();
}
for (Iterator i = this.connectionConsumers.iterator(); i.hasNext();) {
ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i.next();
c.dispose();
}
for (Iterator i = this.inputStreams.iterator(); i.hasNext();) {
ActiveMQInputStream c = (ActiveMQInputStream) i.next();
c.dispose();
}
for (Iterator i = this.outputStreams.iterator(); i.hasNext();) {
ActiveMQOutputStream c = (ActiveMQOutputStream) i.next();
c.dispose();
}
if (isConnectionInfoSentToBroker) {
syncSendPacket(info.createRemoveCommand(),closeTimeout);
}
for (Iterator i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession) i.next();
s.dispose();
}
for (Iterator i = this.connectionConsumers.iterator(); i.hasNext();) {
ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i.next();
c.dispose();
}
for (Iterator i = this.inputStreams.iterator(); i.hasNext();) {
ActiveMQInputStream c = (ActiveMQInputStream) i.next();
c.dispose();
}
for (Iterator i = this.outputStreams.iterator(); i.hasNext();) {
ActiveMQOutputStream c = (ActiveMQOutputStream) i.next();
c.dispose();
}
asyncSendPacket(new ShutdownInfo());
ServiceSupport.dispose(this.transport);
if (isConnectionInfoSentToBroker) {
syncSendPacket(info.createRemoveCommand(), closeTimeout);
}
started.set(false);
asyncSendPacket(new ShutdownInfo());
ServiceSupport.dispose(this.transport);
// TODO : ActiveMQConnectionFactory.onConnectionClose() not
// yet implemented.
// factory.onConnectionClose(this);
closed.set(true);
closing.set(false);
started.set(false);
// TODO : ActiveMQConnectionFactory.onConnectionClose() not
// yet implemented.
// factory.onConnectionClose(this);
closed.set(true);
closing.set(false);
}
}
}
finally {
factoryStats.removeConnection(this);
}
}
/**