diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index f505b6c0d3..1c88c4df10 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -172,6 +172,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { this.taskRunnerFactory = taskRunnerFactory; this.stopTaskRunnerFactory = stopTaskRunnerFactory; this.transport = transport; + final BrokerService brokerService = this.broker.getBrokerService(); this.transport.setTransportListener(new DefaultTransportListener() { @Override public void onCommand(Object o) { @@ -182,7 +183,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } Command command = (Command) o; Response response = service(command); - if (response != null) { + if (response != null && !brokerService.isStopping() ) { dispatchSync(response); } } finally { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index b38d385c95..14d7d29264 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -696,7 +696,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } catch (Exception e) { - if (!sendProducerAck && !context.isInRecoveryMode()) { + if (!sendProducerAck && !context.isInRecoveryMode() && !brokerService.isStopping()) { ExceptionResponse response = new ExceptionResponse(e); response.setCorrelationId(message.getCommandId()); context.getConnection().dispatchAsync(response); diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index 13181883b6..05a885a0f6 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -508,10 +508,22 @@ class LevelDBClient(store: LevelDBStore) { def might_fail[T](func : =>T):T = { def handleFailure(e:IOException) = { - store.stop() if( store.broker_service !=null ) { - store.broker_service.handleIOException(e); + // This should start stopping the broker but it might block, + // so do it on another thread... + new Thread("LevelDB IOException handler.") { + override def run() { + store.broker_service.handleIOException(e); + } + }.start() + // Lets wait until the broker service has started stopping. Once the + // stopping flag is raised, errors caused by stopping the store should + // not get propagated to the client. + while( !store.broker_service.isStopping ) { + Thread.sleep(100); + } } + store.stop() throw e; } try {