Improve exception handling on TransportMasterNodeAction (#29314)

We have seen exceptions bubble up to the uncaught exception handler. Checking the cluster blocks can,
for example, throw an IndexNotFoundException while the indices are resolved. To make
TransportMasterNodeAction more resilient against such expected exceptions, this change wraps the
execution of doStart() in a try/catch and notifies the listener in case of failures.
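
A minimal sketch of the pattern (the listener and class names below are illustrative, not the actual Elasticsearch types): any exception thrown while inspecting the cluster state is delivered through the listener rather than escaping to the thread's uncaught exception handler.

// Hypothetical, simplified types -- a sketch of the approach, not the Elasticsearch implementation.
interface FailureAwareListener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

abstract class MasterNodeActionSketch<Response> {

    // Entry point: any exception thrown while checking blocks or resolving indices
    // (e.g. an IndexNotFoundException) fails the listener instead of killing the thread.
    protected void doStart(FailureAwareListener<Response> listener) {
        try {
            checkBlocksAndExecute(listener);
        } catch (Exception e) {
            listener.onFailure(e);
        }
    }

    // May throw while resolving indices or evaluating cluster blocks.
    protected abstract void checkBlocksAndExecute(FailureAwareListener<Response> listener) throws Exception;
}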
Yannick Welsch 2018-04-03 11:57:58 +02:00 committed by GitHub
parent 2dc546ccec
commit d4538df893
2 changed files with 99 additions and 56 deletions

TransportMasterNodeAction.java

@@ -145,69 +145,79 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
}
protected void doStart(ClusterState clusterState) {
    try {
        final Predicate<ClusterState> masterChangePredicate = MasterNodeChangePredicate.build(clusterState);
        final DiscoveryNodes nodes = clusterState.nodes();
        if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
            // check for block, if blocked, retry, else, execute locally
            final ClusterBlockException blockException = checkBlock(request, clusterState);
            if (blockException != null) {
                if (!blockException.retryable()) {
                    listener.onFailure(blockException);
                } else {
                    logger.trace("can't execute due to a cluster block, retrying", blockException);
                    retry(blockException, newState -> {
                        try {
                            ClusterBlockException newException = checkBlock(request, newState);
                            return (newException == null || !newException.retryable());
                        } catch (Exception e) {
                            // accept state as block will be rechecked by doStart() and listener.onFailure() then called
                            logger.trace("exception occurred during cluster block checking, accepting state", e);
                            return true;
                        }
                    });
                }
            } else {
                ActionListener<Response> delegate = new ActionListener<Response>() {
                    @Override
                    public void onResponse(Response response) {
                        listener.onResponse(response);
                    }

                    @Override
                    public void onFailure(Exception t) {
                        if (t instanceof Discovery.FailedToCommitClusterStateException
                                || (t instanceof NotMasterException)) {
                            logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", actionName), t);
                            retry(t, masterChangePredicate);
                        } else {
                            listener.onFailure(t);
                        }
                    }
                };
                threadPool.executor(executor).execute(new ActionRunnable(delegate) {
                    @Override
                    protected void doRun() throws Exception {
                        masterOperation(task, request, clusterState, delegate);
                    }
                });
            }
        } else {
            if (nodes.getMasterNode() == null) {
                logger.debug("no known master node, scheduling a retry");
                retry(null, masterChangePredicate);
            } else {
                DiscoveryNode masterNode = nodes.getMasterNode();
                final String actionName = getMasterActionName(masterNode);
                transportService.sendRequest(masterNode, actionName, request, new ActionListenerResponseHandler<Response>(listener,
                    TransportMasterNodeAction.this::newResponse) {
                    @Override
                    public void handleException(final TransportException exp) {
                        Throwable cause = exp.unwrapCause();
                        if (cause instanceof ConnectTransportException) {
                            // we want to retry here a bit to see if a new master is elected
                            logger.debug("connection exception while trying to forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]",
                                actionName, nodes.getMasterNode(), exp.getDetailedMessage());
                            retry(cause, masterChangePredicate);
                        } else {
                            listener.onFailure(exp);
                        }
                    }
                });
            }
        }
    } catch (Exception e) {
        listener.onFailure(e);
    }
}
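
There are two layers of protection in the resulting method: the outer try/catch around the whole body of doStart(), and the inner try/catch inside the retry predicate. If checkBlock() throws while a new cluster state is being evaluated for retry, the predicate accepts that state, doStart() runs again against it, and the exception is then reported via listener.onFailure() instead of bubbling up to the uncaught exception handler.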

TransportMasterNodeActionTests.java

@@ -242,6 +242,39 @@ public class TransportMasterNodeActionTests extends ESTestCase {
}
}
public void testCheckBlockThrowsException() throws InterruptedException {
    boolean throwExceptionOnRetry = randomBoolean();
    Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(60));
    PlainActionFuture<Response> listener = new PlainActionFuture<>();

    ClusterBlock block = new ClusterBlock(1, "", true, true,
        false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
    ClusterState stateWithBlock = ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
        .blocks(ClusterBlocks.builder().addGlobalBlock(block)).build();
    setState(clusterService, stateWithBlock);

    new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
        @Override
        protected ClusterBlockException checkBlock(Request request, ClusterState state) {
            Set<ClusterBlock> blocks = state.blocks().global();
            if (throwExceptionOnRetry == false || blocks.isEmpty()) {
                throw new RuntimeException("checkBlock has thrown exception");
            }
            return new ClusterBlockException(blocks);
        }
    }.execute(request, listener);

    if (throwExceptionOnRetry == false) {
        assertListenerThrows("checkBlock has thrown exception", listener, RuntimeException.class);
    } else {
        assertFalse(listener.isDone());
        setState(clusterService, ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
            .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build());
        assertListenerThrows("checkBlock has thrown exception", listener, RuntimeException.class);
    }
}
public void testForceLocalOperation() throws ExecutionException, InterruptedException {
Request request = new Request();
PlainActionFuture<Response> listener = new PlainActionFuture<>();