diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 658c3a5b607..137b193aaaf 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -510,14 +510,20 @@ public class TransportOpenJobAction extends TransportMasterNodeAction listener) { + private void waitForJobStarted(String taskId, OpenJobAction.JobParams jobParams, ActionListener listener) { JobPredicate predicate = new JobPredicate(); persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, jobParams.getTimeout(), new PersistentTasksService.WaitForPersistentTaskStatusListener() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { if (predicate.exception != null) { - listener.onFailure(predicate.exception); + if (predicate.shouldCancel) { + // We want to return to the caller without leaving an unassigned persistent task, to match + // what would have happened if the error had been detected in the "fast fail" validation + cancelJobStart(persistentTask, predicate.exception, listener); + } else { + listener.onFailure(predicate.exception); + } } else { listener.onResponse(new OpenJobAction.Response(predicate.opened)); } @@ -536,6 +542,27 @@ public class TransportOpenJobAction extends TransportMasterNodeAction persistentTask, Exception exception, + ActionListener listener) { + persistentTasksService.cancelPersistentTask(persistentTask.getId(), + new ActionListener>() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { + // We succeeded in cancelling the persistent task, but the + // problem that caused us to cancel it is the overall result + listener.onFailure(exception); + } + + @Override + public void onFailure(Exception e) { + logger.error("[" + persistentTask.getParams().getJobId() + "] Failed to cancel persistent task that could " + + "not be assigned due to [" + exception.getMessage() + "]", e); + listener.onFailure(exception); + } + } + ); + } + private void addDocMappingIfMissing(String alias, CheckedSupplier mappingSupplier, ClusterState state, ActionListener listener) { AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias); @@ -699,6 +726,10 @@ public class TransportOpenJobAction extends TransportMasterNodeAction persistentTask) { @@ -713,6 +745,20 @@ public class TransportOpenJobAction extends TransportMasterNodeAction listener) { - Predicate> predicate = persistentTask -> { - if (persistentTask == null) { - return false; - } - DatafeedState datafeedState = (DatafeedState) persistentTask.getStatus(); - return datafeedState == DatafeedState.STARTED; - }; + private void waitForDatafeedStarted(String taskId, StartDatafeedAction.DatafeedParams params, + ActionListener listener) { + DatafeedPredicate predicate = new DatafeedPredicate(); persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, params.getTimeout(), new PersistentTasksService.WaitForPersistentTaskStatusListener() { @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { - listener.onResponse(new StartDatafeedAction.Response(true)); + public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { + if (predicate.exception != null) { + // We want to return to the caller without leaving an unassigned persistent task, to match + // what would have happened if the error had been detected in the "fast fail" validation + cancelDatafeedStart(persistentTask, predicate.exception, listener); + } else { + listener.onResponse(new StartDatafeedAction.Response(true)); + } } @Override @@ -182,6 +182,27 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction persistentTask, + Exception exception, ActionListener listener) { + persistentTasksService.cancelPersistentTask(persistentTask.getId(), + new ActionListener>() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { + // We succeeded in cancelling the persistent task, but the + // problem that caused us to cancel it is the overall result + listener.onFailure(exception); + } + + @Override + public void onFailure(Exception e) { + logger.error("[" + persistentTask.getParams().getDatafeedId() + "] Failed to cancel persistent task that could " + + "not be assigned due to [" + exception.getMessage() + "]", e); + listener.onFailure(exception); + } + } + ); + } + public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor { private final DatafeedManager datafeedManager; private final IndexNameExpressionResolver resolver; @@ -284,4 +305,30 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction> { + + private volatile Exception exception; + + @Override + public boolean test(PersistentTasksCustomMetaData.PersistentTask persistentTask) { + if (persistentTask == null) { + return false; + } + PersistentTasksCustomMetaData.Assignment assignment = persistentTask.getAssignment(); + if (assignment != null && assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false && + assignment.isAssigned() == false) { + // Assignment has failed despite passing our "fast fail" validation + exception = new ElasticsearchStatusException("Could not start datafeed, allocation explanation [" + + assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS); + return true; + } + DatafeedState datafeedState = (DatafeedState) persistentTask.getStatus(); + return datafeedState == DatafeedState.STARTED; + } + } } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml index 92e68329ece..9ed14c2f860 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml @@ -1061,8 +1061,7 @@ body: transient: xpack.ml.max_model_memory_limit: "9g" - flat_settings: true - - match: {transient: {xpack.ml.max_model_memory_limit: "9g"}} + - match: {transient.xpack.ml.max_model_memory_limit: "9g"} - do: xpack.ml.put_job: @@ -1104,7 +1103,6 @@ body: transient: xpack.ml.max_model_memory_limit: null - flat_settings: true - match: {transient: {}} - do: @@ -1353,3 +1351,50 @@ } - match: { job_id: "jobs-function-shortcut-expansion" } - match: { analysis_config.detectors.0.function: "non_zero_count"} + +--- +"Test open job when persistent task allocation disabled": + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + cluster.put_settings: + body: + transient: + cluster.persistent_tasks.allocation.enable: "none" + - match: {transient.cluster.persistent_tasks.allocation.enable: "none"} + + - do: + xpack.ml.put_job: + job_id: persistent-task-allocation-allowed-test + body: > + { + "analysis_config" : { + "detectors" :[{"function":"count"}] + }, + "data_description" : { + }, + "analysis_limits": { + "model_memory_limit": "10m" + } + } + - match: { job_id: "persistent-task-allocation-allowed-test" } + + - do: + catch: /no persistent task assignments are allowed due to cluster settings/ + xpack.ml.open_job: + job_id: persistent-task-allocation-allowed-test + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + cluster.put_settings: + body: + transient: + cluster.persistent_tasks.allocation.enable: "all" + - match: {transient.cluster.persistent_tasks.allocation.enable: "all"} + + - do: + xpack.ml.open_job: + job_id: persistent-task-allocation-allowed-test + - match: { opened: true } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml index 55938068cfc..d216ecfe13e 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml @@ -393,3 +393,54 @@ setup: xpack.ml.get_datafeed_stats: datafeed_id: "start-stop-datafeed-job-bar-1-feed" - match: { datafeeds.0.state: "started"} + +--- +"Test start datafeed when persistent task allocation disabled": + + - do: + xpack.ml.open_job: + job_id: "start-stop-datafeed-job" + - match: { opened: true } + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + cluster.put_settings: + body: + transient: + cluster.persistent_tasks.allocation.enable: "none" + - match: {transient.cluster.persistent_tasks.allocation.enable: "none"} + + - do: + catch: /no persistent task assignments are allowed due to cluster settings/ + xpack.ml.start_datafeed: + datafeed_id: "start-stop-datafeed-datafeed-1" + start: 0 + + - do: + xpack.ml.get_datafeed_stats: + datafeed_id: "start-stop-datafeed-datafeed-1" + - match: { datafeeds.0.state: stopped } + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + cluster.put_settings: + body: + transient: + cluster.persistent_tasks.allocation.enable: "all" + - match: {transient.cluster.persistent_tasks.allocation.enable: "all"} + + - do: + xpack.ml.start_datafeed: + datafeed_id: "start-stop-datafeed-datafeed-1" + start: 0 + + - do: + xpack.ml.get_datafeed_stats: + datafeed_id: "start-stop-datafeed-datafeed-1" + - match: { datafeeds.0.state: started } + + - do: + xpack.ml.stop_datafeed: + datafeed_id: "start-stop-datafeed-datafeed-1"