[ML] Avoid timeout if ML persistent task assignment fails on master node (elastic/x-pack-elasticsearch#4236)

The ML open_job and start_datafeed endpoints start persistent tasks and
wait for these to be successfully assigned before returning.  Since the
setup sequence is complex they do a "fast fail" validation step on the
coordinating node before the setup sequence.  However, this leads to the
possibility of the "fast fail" validation succeeding and the eventual
persistent task assignment failing due to other changes during the setup
sequence.  Previously when this happened the endpoints would time out,
which in the case of the open_job action takes 30 minutes by default.
The start_datafeed endpoint has a shorter default timeout of 20 seconds,
but in both cases the result of a timeout is an unfriendly HTTP 500
status.

This change adjusts the criteria used to wait for the persistent tasks to
be assigned to account for the possibility of assignment failure and, if
this happens, return an error identical to what the "fast fail"
validation would have returned.  Additionally in this case the unassigned
persistent task is cancelled, leaving the system in the same state as if
the "fast fail" validation had failed.

Original commit: elastic/x-pack-elasticsearch@16916cbc13
This commit is contained in:
David Roberts 2018-03-28 10:06:14 +01:00 committed by GitHub
parent b7515f03cf
commit c63d32482f
4 changed files with 205 additions and 16 deletions

View File

@ -510,14 +510,20 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
}
}
void waitForJobStarted(String taskId, OpenJobAction.JobParams jobParams, ActionListener<OpenJobAction.Response> listener) {
private void waitForJobStarted(String taskId, OpenJobAction.JobParams jobParams, ActionListener<OpenJobAction.Response> listener) {
JobPredicate predicate = new JobPredicate();
persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, jobParams.getTimeout(),
new PersistentTasksService.WaitForPersistentTaskStatusListener<OpenJobAction.JobParams>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> persistentTask) {
if (predicate.exception != null) {
listener.onFailure(predicate.exception);
if (predicate.shouldCancel) {
// We want to return to the caller without leaving an unassigned persistent task, to match
// what would have happened if the error had been detected in the "fast fail" validation
cancelJobStart(persistentTask, predicate.exception, listener);
} else {
listener.onFailure(predicate.exception);
}
} else {
listener.onResponse(new OpenJobAction.Response(predicate.opened));
}
@ -536,6 +542,27 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
});
}
/**
 * Cancels the persistent task created for this job open attempt and then reports
 * {@code exception} (the reason for cancelling) to {@code listener}.  Cancelling ensures
 * no unassigned persistent task is left behind, so the observable outcome matches a
 * failure of the "fast fail" validation on the coordinating node.
 *
 * Note: even if cancellation itself fails, the original assignment problem is what is
 * reported to the caller; the cancellation failure is only logged.
 */
private void cancelJobStart(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> persistentTask, Exception exception,
                            ActionListener<OpenJobAction.Response> listener) {
    ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> cancelListener =
            new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
                @Override
                public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> cancelledTask) {
                    // Cancellation succeeded, but the problem that caused us to cancel
                    // is still the overall result of the operation
                    listener.onFailure(exception);
                }

                @Override
                public void onFailure(Exception e) {
                    // Surface the original assignment problem to the caller; the failure
                    // to clean up the task is logged rather than returned
                    logger.error("[" + persistentTask.getParams().getJobId() + "] Failed to cancel persistent task that could " +
                            "not be assigned due to [" + exception.getMessage() + "]", e);
                    listener.onFailure(exception);
                }
            };
    persistentTasksService.cancelPersistentTask(persistentTask.getId(), cancelListener);
}
private void addDocMappingIfMissing(String alias, CheckedSupplier<XContentBuilder, IOException> mappingSupplier, ClusterState state,
ActionListener<Boolean> listener) {
AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias);
@ -699,6 +726,10 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
}
/**
* This class contains the wait logic for waiting for a job's persistent task to be allocated on
* job opening. It should only be used in the open job action, and never at other times the job's
* persistent task may be assigned to a node, for example on recovery from node failures.
*
* Important: the methods of this class must NOT throw exceptions. If they did then the callers
* of endpoints waiting for a condition tested by this predicate would never get a response.
*/
@ -706,6 +737,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
private volatile boolean opened;
private volatile Exception exception;
private volatile boolean shouldCancel;
@Override
public boolean test(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
@ -713,6 +745,20 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
if (persistentTask != null) {
JobTaskStatus jobStateStatus = (JobTaskStatus) persistentTask.getStatus();
jobState = jobStateStatus == null ? JobState.OPENING : jobStateStatus.getState();
PersistentTasksCustomMetaData.Assignment assignment = persistentTask.getAssignment();
// This logic is only appropriate when opening a job, not when reallocating following a failure,
// and this is why this class must only be used when opening a job
if (assignment != null && assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false &&
assignment.isAssigned() == false) {
// Assignment has failed on the master node despite passing our "fast fail" validation
exception = new ElasticsearchStatusException("Could not open job because no suitable nodes were found, " +
"allocation explanation [" + assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS);
// The persistent task should be cancelled so that the observed outcome is the
// same as if the "fast fail" validation on the coordinating node had failed
shouldCancel = true;
return true;
}
}
switch (jobState) {
case OPENING:

View File

@ -153,20 +153,20 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
void waitForDatafeedStarted(String taskId, StartDatafeedAction.DatafeedParams params,
ActionListener<StartDatafeedAction.Response> listener) {
Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> predicate = persistentTask -> {
if (persistentTask == null) {
return false;
}
DatafeedState datafeedState = (DatafeedState) persistentTask.getStatus();
return datafeedState == DatafeedState.STARTED;
};
private void waitForDatafeedStarted(String taskId, StartDatafeedAction.DatafeedParams params,
ActionListener<StartDatafeedAction.Response> listener) {
DatafeedPredicate predicate = new DatafeedPredicate();
persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, params.getTimeout(),
new PersistentTasksService.WaitForPersistentTaskStatusListener<StartDatafeedAction.DatafeedParams>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams> task) {
listener.onResponse(new StartDatafeedAction.Response(true));
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask) {
if (predicate.exception != null) {
// We want to return to the caller without leaving an unassigned persistent task, to match
// what would have happened if the error had been detected in the "fast fail" validation
cancelDatafeedStart(persistentTask, predicate.exception, listener);
} else {
listener.onResponse(new StartDatafeedAction.Response(true));
}
}
@Override
@ -182,6 +182,27 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
});
}
/**
 * Cancels the persistent task created for this datafeed start attempt and then reports
 * {@code exception} (the reason for cancelling) to {@code listener}.  Cancelling ensures
 * no unassigned persistent task is left behind, so the observable outcome matches a
 * failure of the "fast fail" validation on the coordinating node.
 *
 * Note: even if cancellation itself fails, the original assignment problem is what is
 * reported to the caller; the cancellation failure is only logged.
 */
private void cancelDatafeedStart(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask,
                                 Exception exception, ActionListener<StartDatafeedAction.Response> listener) {
    ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> cancelListener =
            new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
                @Override
                public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> cancelledTask) {
                    // Cancellation succeeded, but the problem that caused us to cancel
                    // is still the overall result of the operation
                    listener.onFailure(exception);
                }

                @Override
                public void onFailure(Exception e) {
                    // Surface the original assignment problem to the caller; the failure
                    // to clean up the task is logged rather than returned
                    logger.error("[" + persistentTask.getParams().getDatafeedId() + "] Failed to cancel persistent task that could " +
                            "not be assigned due to [" + exception.getMessage() + "]", e);
                    listener.onFailure(exception);
                }
            };
    persistentTasksService.cancelPersistentTask(persistentTask.getId(), cancelListener);
}
public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor<StartDatafeedAction.DatafeedParams> {
private final DatafeedManager datafeedManager;
private final IndexNameExpressionResolver resolver;
@ -284,4 +305,30 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
}
}
}
/**
 * Wait-condition predicate used when starting a datafeed: the wait ends either when the
 * datafeed task reaches STARTED, or when the master node has definitively failed to
 * assign the task to any node (in which case {@link #exception} is populated so the
 * caller can report the assignment failure and cancel the task).
 *
 * Important: the methods of this class must NOT throw exceptions. If they did then the callers
 * of endpoints waiting for a condition tested by this predicate would never get a response.
 */
private class DatafeedPredicate implements Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> {

    // Populated when assignment has failed; read by the waiting caller after test() returns true
    private volatile Exception exception;

    @Override
    public boolean test(PersistentTasksCustomMetaData.PersistentTask<?> task) {
        if (task == null) {
            return false;
        }
        PersistentTasksCustomMetaData.Assignment assignment = task.getAssignment();
        // An assignment that has moved past the initial state yet is still unassigned
        // means the master could not place the task despite our "fast fail" validation
        // having passed earlier
        boolean assignmentFailed = assignment != null
                && assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false
                && assignment.isAssigned() == false;
        if (assignmentFailed) {
            exception = new ElasticsearchStatusException("Could not start datafeed, allocation explanation [" +
                    assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS);
            return true;
        }
        // NOTE(review): the cast assumes the task status is always a DatafeedState (or
        // null) here — matches the original code, but would throw on any other Status type
        DatafeedState state = (DatafeedState) task.getStatus();
        return state == DatafeedState.STARTED;
    }
}
}

View File

@ -1061,8 +1061,7 @@
body:
transient:
xpack.ml.max_model_memory_limit: "9g"
flat_settings: true
- match: {transient: {xpack.ml.max_model_memory_limit: "9g"}}
- match: {transient.xpack.ml.max_model_memory_limit: "9g"}
- do:
xpack.ml.put_job:
@ -1104,7 +1103,6 @@
body:
transient:
xpack.ml.max_model_memory_limit: null
flat_settings: true
- match: {transient: {}}
- do:
@ -1353,3 +1351,50 @@
}
- match: { job_id: "jobs-function-shortcut-expansion" }
- match: { analysis_config.detectors.0.function: "non_zero_count"}
---
# Verifies that opening a job while persistent task allocation is disabled fails fast
# with a clear error (rather than timing out), and that the job can be opened normally
# once allocation is re-enabled.
"Test open job when persistent task allocation disabled":
# Disable persistent task allocation cluster-wide so the open_job assignment must fail
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
cluster.put_settings:
body:
transient:
cluster.persistent_tasks.allocation.enable: "none"
- match: {transient.cluster.persistent_tasks.allocation.enable: "none"}
# Create the job; job creation itself does not require task allocation, so this succeeds
- do:
xpack.ml.put_job:
job_id: persistent-task-allocation-allowed-test
body: >
{
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"data_description" : {
},
"analysis_limits": {
"model_memory_limit": "10m"
}
}
- match: { job_id: "persistent-task-allocation-allowed-test" }
# Opening the job must fail promptly with the allocation explanation, not time out
- do:
catch: /no persistent task assignments are allowed due to cluster settings/
xpack.ml.open_job:
job_id: persistent-task-allocation-allowed-test
# Re-enable persistent task allocation
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
cluster.put_settings:
body:
transient:
cluster.persistent_tasks.allocation.enable: "all"
- match: {transient.cluster.persistent_tasks.allocation.enable: "all"}
# With allocation allowed again the same job opens successfully, proving the earlier
# failure cancelled the unassigned persistent task cleanly
- do:
xpack.ml.open_job:
job_id: persistent-task-allocation-allowed-test
- match: { opened: true }

View File

@ -393,3 +393,54 @@ setup:
xpack.ml.get_datafeed_stats:
datafeed_id: "start-stop-datafeed-job-bar-1-feed"
- match: { datafeeds.0.state: "started"}
---
# Verifies that starting a datafeed while persistent task allocation is disabled fails
# fast with a clear error and leaves the datafeed stopped, and that the datafeed starts
# normally once allocation is re-enabled.
"Test start datafeed when persistent task allocation disabled":
# Open the job first; the datafeed needs an opened job
- do:
xpack.ml.open_job:
job_id: "start-stop-datafeed-job"
- match: { opened: true }
# Disable persistent task allocation cluster-wide so the start_datafeed assignment must fail
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
cluster.put_settings:
body:
transient:
cluster.persistent_tasks.allocation.enable: "none"
- match: {transient.cluster.persistent_tasks.allocation.enable: "none"}
# Starting the datafeed must fail promptly with the allocation explanation, not time out
- do:
catch: /no persistent task assignments are allowed due to cluster settings/
xpack.ml.start_datafeed:
datafeed_id: "start-stop-datafeed-datafeed-1"
start: 0
# The failed start must leave the datafeed stopped (unassigned task was cancelled)
- do:
xpack.ml.get_datafeed_stats:
datafeed_id: "start-stop-datafeed-datafeed-1"
- match: { datafeeds.0.state: stopped }
# Re-enable persistent task allocation
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
cluster.put_settings:
body:
transient:
cluster.persistent_tasks.allocation.enable: "all"
- match: {transient.cluster.persistent_tasks.allocation.enable: "all"}
# With allocation allowed again the same datafeed starts successfully
- do:
xpack.ml.start_datafeed:
datafeed_id: "start-stop-datafeed-datafeed-1"
start: 0
- do:
xpack.ml.get_datafeed_stats:
datafeed_id: "start-stop-datafeed-datafeed-1"
- match: { datafeeds.0.state: started }
# Clean up so later tests are unaffected
- do:
xpack.ml.stop_datafeed:
datafeed_id: "start-stop-datafeed-datafeed-1"