From 34f33293703d6274f45d4c90dcbc65c723adc787 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Tue, 2 Jul 2013 15:28:42 +0000 Subject: [PATCH] When the leveldb replicated master was shutting down the client would get notified of a failure and it would not be hidden from the client app. We now suppress sending failure messages to clients when a broker is shutting down so that the client failover logic can kick in an reconnect the client to another server gracefully. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1498978 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/TransportConnection.java | 3 ++- .../org/apache/activemq/broker/region/Queue.java | 2 +- .../apache/activemq/leveldb/LevelDBClient.scala | 16 ++++++++++++++-- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index f505b6c0d3..1c88c4df10 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -172,6 +172,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { this.taskRunnerFactory = taskRunnerFactory; this.stopTaskRunnerFactory = stopTaskRunnerFactory; this.transport = transport; + final BrokerService brokerService = this.broker.getBrokerService(); this.transport.setTransportListener(new DefaultTransportListener() { @Override public void onCommand(Object o) { @@ -182,7 +183,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } Command command = (Command) o; Response response = service(command); - if (response != null) { + if (response != null && !brokerService.isStopping() ) { dispatchSync(response); } } finally { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index b38d385c95..14d7d29264 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -696,7 +696,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } catch (Exception e) { - if (!sendProducerAck && !context.isInRecoveryMode()) { + if (!sendProducerAck && !context.isInRecoveryMode() && !brokerService.isStopping()) { ExceptionResponse response = new ExceptionResponse(e); response.setCorrelationId(message.getCommandId()); context.getConnection().dispatchAsync(response); diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index 13181883b6..05a885a0f6 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -508,10 +508,22 @@ class LevelDBClient(store: LevelDBStore) { def might_fail[T](func : =>T):T = { def handleFailure(e:IOException) = { - store.stop() if( store.broker_service !=null ) { - store.broker_service.handleIOException(e); + // This should start stopping the broker but it might block, + // so do it on another thread... + new Thread("LevelDB IOException handler.") { + override def run() { + store.broker_service.handleIOException(e); + } + }.start() + // Lets wait until the broker service has started stopping. Once the + // stopping flag is raised, errors caused by stopping the store should + // not get propagated to the client. + while( !store.broker_service.isStopping ) { + Thread.sleep(100); + } } + store.stop() throw e; } try {