diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index a798e351a2..da8ee6602d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -580,8 +580,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon // If we announced ourselfs to the broker.. Try to let // the broker // know that the connection is being shutdown. - syncSendPacket(info.createRemoveCommand(), closeTimeout); - asyncSendPacket(new ShutdownInfo()); + doSyncSendPacket(info.createRemoveCommand(), closeTimeout); + doAsyncSendPacket(new ShutdownInfo()); } ServiceSupport.dispose(this.transport); @@ -1144,18 +1144,21 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @throws JMSException */ public void asyncSendPacket(Command command) throws JMSException { - if (isClosed()) { + if (isClosed() || closing.get()) { throw new ConnectionClosedException(); } else { - - try { - this.transport.oneway(command); - } catch (IOException e) { - throw JMSExceptionSupport.create(e); - } + doAsyncSendPacket(command); } } + private void doAsyncSendPacket(Command command) throws JMSException { + try { + this.transport.oneway(command); + } catch (IOException e) { + throw JMSExceptionSupport.create(e); + } + } + /** * Send a packet through a Connection - for internal use only * @@ -1193,27 +1196,31 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @throws JMSException */ public Response syncSendPacket(Command command, int timeout) throws JMSException { - if (isClosed()) { + if (isClosed() || closing.get()) { throw new ConnectionClosedException(); } else { - - try { - Response response = (Response)this.transport.request(command, timeout); - if (response != null && response.isException()) { - ExceptionResponse er = (ExceptionResponse)response; - if (er.getException() instanceof JMSException) { - throw (JMSException)er.getException(); - } else { - throw JMSExceptionSupport.create(er.getException()); - } - } - return response; - } catch (IOException e) { - throw JMSExceptionSupport.create(e); - } + return doSyncSendPacket(command, timeout); } } + private Response doSyncSendPacket(Command command, int timeout) + throws JMSException { + try { + Response response = (Response)this.transport.request(command, timeout); + if (response != null && response.isException()) { + ExceptionResponse er = (ExceptionResponse)response; + if (er.getException() instanceof JMSException) { + throw (JMSException)er.getException(); + } else { + throw JMSExceptionSupport.create(er.getException()); + } + } + return response; + } catch (IOException e) { + throw JMSExceptionSupport.create(e); + } + } + /** * @return statistics for this Connection */ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index e12bb1b684..cab251dc91 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -139,7 +139,6 @@ public class TransportConnection implements Service, Connection, Task, CommandVi private long timeStamp; private final AtomicBoolean stopped = new AtomicBoolean(false); private final AtomicBoolean transportDisposed = new AtomicBoolean(); - private final AtomicBoolean disposed = new AtomicBoolean(false); private CountDownLatch stopLatch = new CountDownLatch(1); private final AtomicBoolean asyncException = new AtomicBoolean(false); private final Map producerExchanges = new HashMap(); @@ -198,7 +197,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi } public void serviceTransportException(IOException e) { - if (!disposed.get()) { + if (!stopped.get()) { transportException.set(e); if (TRANSPORTLOG.isDebugEnabled()) { TRANSPORTLOG.debug("Transport failed: " + e, e); @@ -240,7 +239,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi // Handle the case where the broker is stopped // But the client is still connected. - if (!disposed.get()) { + if (!stopped.get()) { if (SERVICELOG.isDebugEnabled()) { SERVICELOG .debug("Broker has been stopped. Notifying client and closing his connection."); @@ -260,7 +259,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi // notification gets to him. ServiceSupport.dispose(this); } - } else if (!disposed.get() && !inServiceException) { + } else if (!stopped.get() && !inServiceException) { inServiceException = true; try { SERVICELOG.error("Async error occurred: " + e, e); @@ -669,11 +668,12 @@ public class TransportConnection implements Service, Connection, Task, CommandVi return null; } - public Response processRemoveConnection(ConnectionId id) { + public Response processRemoveConnection(ConnectionId id) throws InterruptedException { TransportConnectionState cs = lookupConnectionState(id); // Don't allow things to be added to the connection state while we are // shutting down. cs.shutdown(); + // Cascade the connection stop to the sessions. for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) { SessionId sessionId = (SessionId)iter.next(); @@ -731,7 +731,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi } public void dispatchAsync(Command message) { - if (!disposed.get()) { + if (!stopped.get()) { getStatistics().getEnqueues().increment(); if (taskRunner == null) { dispatchSync(message); @@ -759,7 +759,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi final MessageDispatch messageDispatch = (MessageDispatch)(command.isMessageDispatch() ? command : null); try { - if (!disposed.get()) { + if (!stopped.get()) { if (messageDispatch != null) { broker.preProcessDispatch(messageDispatch); } @@ -779,7 +779,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi public boolean iterate() { try { - if (disposed.get()) { + if (stopped.get()) { if (dispatchStopped.compareAndSet(false, true)) { if (transportException.get() == null) { try { @@ -901,64 +901,67 @@ public class TransportConnection implements Service, Connection, Task, CommandVi } catch (Exception ignore) { LOG.trace("Exception caught stopping", ignore); } - if (disposed.compareAndSet(false, true)) { - // Let all the connection contexts know we are shutting down - // so that in progress operations can notice and unblock. - List connectionStates = listConnectionStates(); + // Let all the connection contexts know we are shutting down + // so that in progress operations can notice and unblock. + List connectionStates = listConnectionStates(); + for (TransportConnectionState cs : connectionStates) { + cs.getContext().getStopping().set(true); + } + + if (taskRunner != null) { + taskRunner.wakeup(); + // Give it a change to stop gracefully. + dispatchStoppedLatch.await(5, TimeUnit.SECONDS); + } + + try { + transport.stop(); + LOG.debug("Stopped connection: " + transport.getRemoteAddress()); + } catch (Exception e) { + LOG.debug("Could not stop transport: " + e, e); + } + + if (taskRunner != null) { + taskRunner.shutdown(); + } + + active = false; + + // Run the MessageDispatch callbacks so that message references get + // cleaned up. + for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) { + Command command = iter.next(); + if (command.isMessageDispatch()) { + MessageDispatch md = (MessageDispatch)command; + Runnable sub = md.getTransmitCallback(); + broker.postProcessDispatch(md); + if (sub != null) { + sub.run(); + } + } + } + // + // Remove all logical connection associated with this connection + // from the broker. + + if (!broker.isStopped()) { + connectionStates = listConnectionStates(); for (TransportConnectionState cs : connectionStates) { cs.getContext().getStopping().set(true); - } - - if (taskRunner != null) { - taskRunner.wakeup(); - // Give it a change to stop gracefully. - dispatchStoppedLatch.await(5, TimeUnit.SECONDS); - disposeTransport(); - taskRunner.shutdown(); - } else { - disposeTransport(); - } - - if (taskRunner != null) { - taskRunner.shutdown(); - } - - // Run the MessageDispatch callbacks so that message references get - // cleaned up. - for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) { - Command command = iter.next(); - if (command.isMessageDispatch()) { - MessageDispatch md = (MessageDispatch)command; - Runnable sub = md.getTransmitCallback(); - broker.postProcessDispatch(md); - if (sub != null) { - sub.run(); - } + try { + LOG.debug("Cleaning up connection resources: " + getRemoteAddress()); + processRemoveConnection(cs.getInfo().getConnectionId()); + } catch (Throwable ignore) { + ignore.printStackTrace(); } } - // - // Remove all logical connection associated with this connection - // from the broker. - if (!broker.isStopped()) { - connectionStates = listConnectionStates(); - for (TransportConnectionState cs : connectionStates) { - cs.getContext().getStopping().set(true); - try { - LOG.debug("Cleaning up connection resources: " + getRemoteAddress()); - processRemoveConnection(cs.getInfo().getConnectionId()); - } catch (Throwable ignore) { - ignore.printStackTrace(); - } - } - - if (brokerInfo != null) { - broker.removeBroker(this, brokerInfo); - } + if (brokerInfo != null) { + broker.removeBroker(this, brokerInfo); } - LOG.debug("Connection Stopped: " + getRemoteAddress()); } + LOG.debug("Connection Stopped: " + getRemoteAddress()); } /** @@ -1233,18 +1236,6 @@ public class TransportConnection implements Service, Connection, Task, CommandVi } } - protected void disposeTransport() { - if (transportDisposed.compareAndSet(false, true)) { - try { - transport.stop(); - active = false; - LOG.debug("Stopped connection: " + transport.getRemoteAddress()); - } catch (Exception e) { - LOG.debug("Could not stop transport: " + e, e); - } - } - } - public int getProtocolVersion() { return protocolVersion.get(); }