diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index f67a2bbb95a..d4661a45dba 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -418,47 +418,49 @@ public class CloseJobAction extends Action failures = new AtomicArray<>(numberOfJobs); for (String jobId : request.resolvedJobIds) { - auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING); - PersistentTask jobTask = validateAndReturnJobTask(jobId, currentState); - persistentTasksService.cancelPersistentTask(jobTask.getId(), - new ActionListener>() { - @Override - public void onResponse(PersistentTask task) { - if (counter.incrementAndGet() == numberOfJobs) { - sendResponseOrFailure(request.getJobId(), listener, failures); - } - } - - @Override - public void onFailure(Exception e) { - final int slot = counter.incrementAndGet(); - failures.set(slot - 1, e); - if (slot == numberOfJobs) { - sendResponseOrFailure(request.getJobId(), listener, failures); - } - } - - private void sendResponseOrFailure(String jobId, - ActionListener listener, - AtomicArray failures) { - List catchedExceptions = failures.asList(); - if (catchedExceptions.size() == 0) { - listener.onResponse(new Response(true)); - return; + Optional> jobTask = validateAndReturnJobTask(jobId, currentState); + if (jobTask.isPresent()) { + auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING); + persistentTasksService.cancelPersistentTask(jobTask.get().getId(), + new ActionListener>() { + @Override + public void onResponse(PersistentTask task) { + if (counter.incrementAndGet() == numberOfJobs) { + sendResponseOrFailure(request.getJobId(), listener, failures); + } } - String msg = "Failed to force close job [" + jobId + "] with [" - + catchedExceptions.size() - + "] failures, rethrowing last, all Exceptions: [" - + catchedExceptions.stream().map(Exception::getMessage) - .collect(Collectors.joining(", ")) - + "]"; + @Override + public void onFailure(Exception e) { + final int slot = counter.incrementAndGet(); + failures.set(slot - 1, e); + if (slot == numberOfJobs) { + sendResponseOrFailure(request.getJobId(), listener, failures); + } + } - ElasticsearchException e = new ElasticsearchException(msg, - catchedExceptions.get(0)); - listener.onFailure(e); - } - }); + private void sendResponseOrFailure(String jobId, + ActionListener listener, + AtomicArray failures) { + List catchedExceptions = failures.asList(); + if (catchedExceptions.size() == 0) { + listener.onResponse(new Response(true)); + return; + } + + String msg = "Failed to force close job [" + jobId + "] with [" + + catchedExceptions.size() + + "] failures, rethrowing last, all Exceptions: [" + + catchedExceptions.stream().map(Exception::getMessage) + .collect(Collectors.joining(", ")) + + "]"; + + ElasticsearchException e = new ElasticsearchException(msg, + catchedExceptions.get(0)); + listener.onFailure(e); + } + }); + } } } @@ -467,9 +469,16 @@ public class CloseJobAction extends Action jobIdToPersistentTaskId = new HashMap<>(); for (String jobId : request.resolvedJobIds) { - auditor.info(jobId, Messages.JOB_AUDIT_CLOSING); - PersistentTask jobTask = validateAndReturnJobTask(jobId, currentState); - jobIdToPersistentTaskId.put(jobId, jobTask.getId()); + Optional> jobTask = validateAndReturnJobTask(jobId, currentState); + if (jobTask.isPresent()) { + auditor.info(jobId, Messages.JOB_AUDIT_CLOSING); + jobIdToPersistentTaskId.put(jobId, jobTask.get().getId()); + } + } + + // An empty map means all the jobs in the request are currently closed. + if (jobIdToPersistentTaskId.isEmpty()) { + listener.onResponse(new Response(true)); } ActionListener finalListener = @@ -527,8 +536,11 @@ public class CloseJobAction extends Action jobTask = MlMetadata.getJobTask(resolvedJobId, tasks); - if (jobTask == null) { - continue; - } - if (MlMetadata.getJobState(resolvedJobId, tasks).isAnyOf(JobState.OPENED, JobState.FAILED) == false) { continue; } @@ -562,7 +569,22 @@ public class CloseJobAction extends Action validateAndReturnJobTask(String jobId, ClusterState state) { + /** + * Validate the close request. Throws an exception on any of these conditions: + *
    + *
  • If the job does not exist
  • + *
  • If the job has a data feed the feed must be closed first
  • + *
  • If the job is opening i.e. the job has a task but the task has null status
  • + *
  • If the job is not already closed, opened or failed + * i.e. the job is in the {@link JobState#CLOSING} state
  • + *
+ * + * If the job is already closed an empty Optional is returned. + * @param jobId Job Id + * @param state Current cluster state + * @return The Job PersistentTask or an empty optional if the job is closed. + */ + static Optional> validateAndReturnJobTask(String jobId, ClusterState state) { MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); Job job = mlMetadata.getJobs().get(jobId); if (job == null) { @@ -572,9 +594,12 @@ public class CloseJobAction extends Action jobTask = MlMetadata.getJobTask(jobId, tasks); - if (jobTask == null || jobTask.getStatus() == null) { - throw ExceptionsHelper.conflictStatusException("cannot close job, because job [" + jobId + "] is " + JobState.CLOSED); + if (jobTask == null) { + return Optional.empty(); + } else if (jobTask.getStatus() == null) { + throw ExceptionsHelper.conflictStatusException("cannot close job, because job [" + jobId + "] is opening"); } + JobTaskStatus jobTaskStatus = (JobTaskStatus) jobTask.getStatus(); if (jobTaskStatus.getState().isAnyOf(JobState.OPENED, JobState.FAILED) == false) { throw ExceptionsHelper.conflictStatusException("cannot close job, because job [" + jobId + "] is " + jobTaskStatus.getState()); @@ -587,7 +612,7 @@ public class CloseJobAction extends Action> persistentTask = + CloseJobAction.validateAndReturnJobTask("closed-job", cs1); + assertFalse(persistentTask.isPresent()); + } + + public void testValidate_jobIsOpening() { + MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); + mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("opening-job").build(new Date()), false); + + // An opening job has a null status field + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("opening-job", null, null, tasksBuilder); + + ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())) + .build(); + + ElasticsearchStatusException conflictException = + expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.validateAndReturnJobTask("opening-job", cs1)); + assertEquals(RestStatus.CONFLICT, conflictException.status()); + } + + public void testValidate_jobIsMissing() { + MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("missing-job", null, null, tasksBuilder); + + ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())) + .build(); + + expectThrows(ResourceNotFoundException.class, () -> CloseJobAction.validateAndReturnJobTask("missing-job", cs1)); + } + + public void testResolve_givenAll() { + MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); + mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()), false); + mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()), false); + mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date()), false); + mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_4").build(new Date()), false); + mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_5").build(new Date()), false); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id_1", null, JobState.OPENED, tasksBuilder); - addJobTask("job_id_2", null, JobState.CLOSED, tasksBuilder); + addJobTask("job_id_2", null, JobState.OPENED, tasksBuilder); addJobTask("job_id_3", null, JobState.FAILED, tasksBuilder); + addJobTask("job_id_4", null, JobState.CLOSING, tasksBuilder); ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())) .build(); - assertEquals(Arrays.asList("job_id_1", "job_id_3"), + assertEquals(Arrays.asList("job_id_1", "job_id_2", "job_id_3"), CloseJobAction.resolveAndValidateJobId("_all", cs1)); } + public void testResolve_givenJobId() { + MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); + mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_1").build(new Date()), false); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id_1", null, JobState.OPENED, tasksBuilder); + + ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())) + .build(); + + assertEquals(Arrays.asList("job_id_1"), + CloseJobAction.resolveAndValidateJobId("job_id_1", cs1)); + + // Job without task is closed + cs1 = ClusterState.builder(new ClusterName("_name")) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())) + .build(); + + assertEquals(Collections.emptyList(), + CloseJobAction.resolveAndValidateJobId("job_id_1", cs1)); + } + public static void addTask(String datafeedId, long startTime, String nodeId, DatafeedState state, PersistentTasksCustomMetaData.Builder tasks) { tasks.addTask(MlMetadata.datafeedTaskId(datafeedId), StartDatafeedAction.TASK_NAME, diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml index ff06bd74b8c..de8658f732e 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml @@ -381,6 +381,84 @@ - match: metadata.persistent_tasks.tasks: [] +--- +"Test closing a closed job isn't an error": + - do: + xpack.ml.put_job: + job_id: jobs-crud-close-a-closed-job + body: > + { + "description":"Analysis of response time by airline", + "analysis_config" : { + "detectors" :[{"function":"count"}] + }, + "data_description" : { + "format":"xcontent" + } + } + - match: { job_id: "jobs-crud-close-a-closed-job" } + + - do: + xpack.ml.open_job: + job_id: jobs-crud-close-a-closed-job + + - do: + xpack.ml.close_job: + job_id: jobs-crud-close-a-closed-job + - match: { closed: true } + + - do: + xpack.ml.close_job: + job_id: jobs-crud-close-a-closed-job + - match: { closed: true } + +--- +"Test close all jobs": + - do: + xpack.ml.put_job: + job_id: jobs-crud-close-all-1 + body: > + { + "description":"Analysis of response time by airline", + "analysis_config" : { + "detectors" :[{"function":"count"}] + }, + "data_description" : { + "format":"xcontent" + } + } + - match: { job_id: "jobs-crud-close-all-1" } + + - do: + xpack.ml.put_job: + job_id: jobs-crud-close-all-2 + body: > + { + "description":"Analysis of response time by airline", + "analysis_config" : { + "detectors" :[{"function":"count"}] + }, + "data_description" : { + "format":"xcontent" + } + } + - match: { job_id: "jobs-crud-close-all-2" } + + - do: + xpack.ml.open_job: + job_id: jobs-crud-close-all-2 + + - do: + xpack.ml.close_job: + job_id: _all + - match: { closed: true } + + - do: + xpack.ml.get_job_stats: + job_id: _all + - match: { jobs.0.state: closed } + - match: { jobs.1.state: closed } + --- "Test force close job": @@ -438,3 +516,36 @@ filter_path: metadata.persistent_tasks - match: metadata.persistent_tasks.tasks: [] + +--- +"Test force closing a closed job isn't an error": + - do: + xpack.ml.put_job: + job_id: jobs-crud-close-a-closed-job + body: > + { + "description":"Analysis of response time by airline", + "analysis_config" : { + "detectors" :[{"function":"count"}] + }, + "data_description" : { + "format":"xcontent" + } + } + - match: { job_id: "jobs-crud-close-a-closed-job" } + + - do: + xpack.ml.open_job: + job_id: jobs-crud-close-a-closed-job + + - do: + xpack.ml.close_job: + job_id: jobs-crud-close-a-closed-job + force: true + - match: { closed: true } + + - do: + xpack.ml.close_job: + job_id: jobs-crud-close-a-closed-job + force: true + - match: { closed: true } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/post_data.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/post_data.yaml index b8095377755..907bb289dff 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/post_data.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/post_data.yaml @@ -196,18 +196,13 @@ setup: job_id: not_a_job --- -"Test flushing, posting and closing a closed job": +"Test flushing and posting a closed job": - do: catch: /status_exception/ xpack.ml.flush_job: job_id: post-data-closed-job - - do: - catch: /status_exception/ - xpack.ml.close_job: - job_id: post-data-closed-job - - do: catch: /status_exception/ xpack.ml.post_data: diff --git a/qa/smoke-test-ml-with-security/build.gradle b/qa/smoke-test-ml-with-security/build.gradle index a945c098cb1..e4cc2ae39fd 100644 --- a/qa/smoke-test-ml-with-security/build.gradle +++ b/qa/smoke-test-ml-with-security/build.gradle @@ -39,7 +39,7 @@ integTestRunner { 'ml/jobs_get_result_categories/Test with invalid param combinations via body', 'ml/jobs_get_stats/Test get job stats given missing job', 'ml/post_data/Test Flush data with invalid parameters', - 'ml/post_data/Test flushing, posting and closing a closed job', + 'ml/post_data/Test flushing and posting a closed job', 'ml/post_data/Test open and close with non-existent job id', 'ml/post_data/Test POST data with invalid parameters', 'ml/preview_datafeed/Test preview missing datafeed',