mirror of https://github.com/apache/activemq.git
When the leveldb replicated master was shutting down the client would get notified of a failure and it would not be hidden from the client app. We now suppress sending failure messages to clients when a broker is shutting down so that the client failover logic can kick in an reconnect the client to another server gracefully.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1498978 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6167c49fc1
commit
34f3329370
|
@ -172,6 +172,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
this.taskRunnerFactory = taskRunnerFactory;
|
this.taskRunnerFactory = taskRunnerFactory;
|
||||||
this.stopTaskRunnerFactory = stopTaskRunnerFactory;
|
this.stopTaskRunnerFactory = stopTaskRunnerFactory;
|
||||||
this.transport = transport;
|
this.transport = transport;
|
||||||
|
final BrokerService brokerService = this.broker.getBrokerService();
|
||||||
this.transport.setTransportListener(new DefaultTransportListener() {
|
this.transport.setTransportListener(new DefaultTransportListener() {
|
||||||
@Override
|
@Override
|
||||||
public void onCommand(Object o) {
|
public void onCommand(Object o) {
|
||||||
|
@ -182,7 +183,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
}
|
}
|
||||||
Command command = (Command) o;
|
Command command = (Command) o;
|
||||||
Response response = service(command);
|
Response response = service(command);
|
||||||
if (response != null) {
|
if (response != null && !brokerService.isStopping() ) {
|
||||||
dispatchSync(response);
|
dispatchSync(response);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -696,7 +696,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (!sendProducerAck && !context.isInRecoveryMode()) {
|
if (!sendProducerAck && !context.isInRecoveryMode() && !brokerService.isStopping()) {
|
||||||
ExceptionResponse response = new ExceptionResponse(e);
|
ExceptionResponse response = new ExceptionResponse(e);
|
||||||
response.setCorrelationId(message.getCommandId());
|
response.setCorrelationId(message.getCommandId());
|
||||||
context.getConnection().dispatchAsync(response);
|
context.getConnection().dispatchAsync(response);
|
||||||
|
|
|
@ -508,10 +508,22 @@ class LevelDBClient(store: LevelDBStore) {
|
||||||
|
|
||||||
def might_fail[T](func : =>T):T = {
|
def might_fail[T](func : =>T):T = {
|
||||||
def handleFailure(e:IOException) = {
|
def handleFailure(e:IOException) = {
|
||||||
store.stop()
|
|
||||||
if( store.broker_service !=null ) {
|
if( store.broker_service !=null ) {
|
||||||
store.broker_service.handleIOException(e);
|
// This should start stopping the broker but it might block,
|
||||||
|
// so do it on another thread...
|
||||||
|
new Thread("LevelDB IOException handler.") {
|
||||||
|
override def run() {
|
||||||
|
store.broker_service.handleIOException(e);
|
||||||
|
}
|
||||||
|
}.start()
|
||||||
|
// Lets wait until the broker service has started stopping. Once the
|
||||||
|
// stopping flag is raised, errors caused by stopping the store should
|
||||||
|
// not get propagated to the client.
|
||||||
|
while( !store.broker_service.isStopping ) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
store.stop()
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue