From c63d32482fa7f5b4b5430fd6b84be5322f2ce0b3 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 28 Mar 2018 10:06:14 +0100 Subject: [PATCH] [ML] Avoid timeout if ML persistent task assignment fails on master node (elastic/x-pack-elasticsearch#4236) The ML open_job and start_datafeed endpoints start persistent tasks and wait for these to be successfully assigned before returning. Since the setup sequence is complex, they do a "fast fail" validation step on the coordinating node before the setup sequence. However, this leads to the possibility of the "fast fail" validation succeeding and the eventual persistent task assignment failing due to other changes during the setup sequence. Previously, when this happened, the endpoints would time out, which in the case of the open_job action takes 30 minutes by default. The start_datafeed endpoint has a shorter default timeout of 20 seconds, but in both cases the result of a timeout is an unfriendly HTTP 500 status. This change adjusts the criteria used to wait for the persistent tasks to be assigned to account for the possibility of assignment failure and, if this happens, return an error identical to what the "fast fail" validation would have returned. Additionally, in this case the unassigned persistent task is cancelled, leaving the system in the same state as if the "fast fail" validation had failed. 
Original commit: elastic/x-pack-elasticsearch@16916cbc13077169f9fe788a224eb3f958baa805 --- .../ml/action/TransportOpenJobAction.java | 50 +++++++++++++- .../action/TransportStartDatafeedAction.java | 69 ++++++++++++++++--- .../rest-api-spec/test/ml/jobs_crud.yml | 51 +++++++++++++- .../test/ml/start_stop_datafeed.yml | 51 ++++++++++++++ 4 files changed, 205 insertions(+), 16 deletions(-) diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 658c3a5b607..137b193aaaf 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -510,14 +510,20 @@ public class TransportOpenJobAction extends TransportMasterNodeAction listener) { + private void waitForJobStarted(String taskId, OpenJobAction.JobParams jobParams, ActionListener listener) { JobPredicate predicate = new JobPredicate(); persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, jobParams.getTimeout(), new PersistentTasksService.WaitForPersistentTaskStatusListener() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { if (predicate.exception != null) { - listener.onFailure(predicate.exception); + if (predicate.shouldCancel) { + // We want to return to the caller without leaving an unassigned persistent task, to match + // what would have happened if the error had been detected in the "fast fail" validation + cancelJobStart(persistentTask, predicate.exception, listener); + } else { + listener.onFailure(predicate.exception); + } } else { listener.onResponse(new OpenJobAction.Response(predicate.opened)); } @@ -536,6 +542,27 @@ public class TransportOpenJobAction extends TransportMasterNodeAction persistentTask, Exception exception, + ActionListener listener) { + 
persistentTasksService.cancelPersistentTask(persistentTask.getId(), + new ActionListener>() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { + // We succeeded in cancelling the persistent task, but the + // problem that caused us to cancel it is the overall result + listener.onFailure(exception); + } + + @Override + public void onFailure(Exception e) { + logger.error("[" + persistentTask.getParams().getJobId() + "] Failed to cancel persistent task that could " + + "not be assigned due to [" + exception.getMessage() + "]", e); + listener.onFailure(exception); + } + } + ); + } + private void addDocMappingIfMissing(String alias, CheckedSupplier mappingSupplier, ClusterState state, ActionListener listener) { AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias); @@ -699,6 +726,10 @@ public class TransportOpenJobAction extends TransportMasterNodeAction persistentTask) { @@ -713,6 +745,20 @@ public class TransportOpenJobAction extends TransportMasterNodeAction listener) { - Predicate> predicate = persistentTask -> { - if (persistentTask == null) { - return false; - } - DatafeedState datafeedState = (DatafeedState) persistentTask.getStatus(); - return datafeedState == DatafeedState.STARTED; - }; + private void waitForDatafeedStarted(String taskId, StartDatafeedAction.DatafeedParams params, + ActionListener listener) { + DatafeedPredicate predicate = new DatafeedPredicate(); persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, params.getTimeout(), new PersistentTasksService.WaitForPersistentTaskStatusListener() { @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { - listener.onResponse(new StartDatafeedAction.Response(true)); + public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { + if (predicate.exception != null) { + // We want to return to the caller without leaving an unassigned persistent task, to match + // what 
would have happened if the error had been detected in the "fast fail" validation + cancelDatafeedStart(persistentTask, predicate.exception, listener); + } else { + listener.onResponse(new StartDatafeedAction.Response(true)); + } } @Override @@ -182,6 +182,27 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction persistentTask, + Exception exception, ActionListener listener) { + persistentTasksService.cancelPersistentTask(persistentTask.getId(), + new ActionListener>() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { + // We succeeded in cancelling the persistent task, but the + // problem that caused us to cancel it is the overall result + listener.onFailure(exception); + } + + @Override + public void onFailure(Exception e) { + logger.error("[" + persistentTask.getParams().getDatafeedId() + "] Failed to cancel persistent task that could " + + "not be assigned due to [" + exception.getMessage() + "]", e); + listener.onFailure(exception); + } + } + ); + } + public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor { private final DatafeedManager datafeedManager; private final IndexNameExpressionResolver resolver; @@ -284,4 +305,30 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction> { + + private volatile Exception exception; + + @Override + public boolean test(PersistentTasksCustomMetaData.PersistentTask persistentTask) { + if (persistentTask == null) { + return false; + } + PersistentTasksCustomMetaData.Assignment assignment = persistentTask.getAssignment(); + if (assignment != null && assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false && + assignment.isAssigned() == false) { + // Assignment has failed despite passing our "fast fail" validation + exception = new ElasticsearchStatusException("Could not start datafeed, allocation explanation [" + + assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS); + 
return true; + } + DatafeedState datafeedState = (DatafeedState) persistentTask.getStatus(); + return datafeedState == DatafeedState.STARTED; + } + } } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml index 92e68329ece..9ed14c2f860 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml @@ -1061,8 +1061,7 @@ body: transient: xpack.ml.max_model_memory_limit: "9g" - flat_settings: true - - match: {transient: {xpack.ml.max_model_memory_limit: "9g"}} + - match: {transient.xpack.ml.max_model_memory_limit: "9g"} - do: xpack.ml.put_job: @@ -1104,7 +1103,6 @@ body: transient: xpack.ml.max_model_memory_limit: null - flat_settings: true - match: {transient: {}} - do: @@ -1353,3 +1351,50 @@ } - match: { job_id: "jobs-function-shortcut-expansion" } - match: { analysis_config.detectors.0.function: "non_zero_count"} + +--- +"Test open job when persistent task allocation disabled": + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + cluster.put_settings: + body: + transient: + cluster.persistent_tasks.allocation.enable: "none" + - match: {transient.cluster.persistent_tasks.allocation.enable: "none"} + + - do: + xpack.ml.put_job: + job_id: persistent-task-allocation-allowed-test + body: > + { + "analysis_config" : { + "detectors" :[{"function":"count"}] + }, + "data_description" : { + }, + "analysis_limits": { + "model_memory_limit": "10m" + } + } + - match: { job_id: "persistent-task-allocation-allowed-test" } + + - do: + catch: /no persistent task assignments are allowed due to cluster settings/ + xpack.ml.open_job: + job_id: persistent-task-allocation-allowed-test + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. 
the test setup superuser + cluster.put_settings: + body: + transient: + cluster.persistent_tasks.allocation.enable: "all" + - match: {transient.cluster.persistent_tasks.allocation.enable: "all"} + + - do: + xpack.ml.open_job: + job_id: persistent-task-allocation-allowed-test + - match: { opened: true } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml index 55938068cfc..d216ecfe13e 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml @@ -393,3 +393,54 @@ setup: xpack.ml.get_datafeed_stats: datafeed_id: "start-stop-datafeed-job-bar-1-feed" - match: { datafeeds.0.state: "started"} + +--- +"Test start datafeed when persistent task allocation disabled": + + - do: + xpack.ml.open_job: + job_id: "start-stop-datafeed-job" + - match: { opened: true } + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + cluster.put_settings: + body: + transient: + cluster.persistent_tasks.allocation.enable: "none" + - match: {transient.cluster.persistent_tasks.allocation.enable: "none"} + + - do: + catch: /no persistent task assignments are allowed due to cluster settings/ + xpack.ml.start_datafeed: + datafeed_id: "start-stop-datafeed-datafeed-1" + start: 0 + + - do: + xpack.ml.get_datafeed_stats: + datafeed_id: "start-stop-datafeed-datafeed-1" + - match: { datafeeds.0.state: stopped } + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. 
the test setup superuser + cluster.put_settings: + body: + transient: + cluster.persistent_tasks.allocation.enable: "all" + - match: {transient.cluster.persistent_tasks.allocation.enable: "all"} + + - do: + xpack.ml.start_datafeed: + datafeed_id: "start-stop-datafeed-datafeed-1" + start: 0 + + - do: + xpack.ml.get_datafeed_stats: + datafeed_id: "start-stop-datafeed-datafeed-1" + - match: { datafeeds.0.state: started } + + - do: + xpack.ml.stop_datafeed: + datafeed_id: "start-stop-datafeed-datafeed-1"