[ML] Not an error to close a job twice (elastic/x-pack-elasticsearch#1340)

* [ML] Not an error to close a job twice

* Error if job is opening

* Address review comments

* Test closed job isn’t resolved

Original commit: elastic/x-pack-elasticsearch@7da7b24c08
This commit is contained in:
David Kyle 2017-05-08 16:34:46 +01:00 committed by GitHub
parent 9264ad541e
commit e5b11d0222
5 changed files with 269 additions and 74 deletions

View File

@ -418,47 +418,49 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
final AtomicInteger counter = new AtomicInteger();
final AtomicArray<Exception> failures = new AtomicArray<>(numberOfJobs);
for (String jobId : request.resolvedJobIds) {
auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING);
PersistentTask<?> jobTask = validateAndReturnJobTask(jobId, currentState);
persistentTasksService.cancelPersistentTask(jobTask.getId(),
new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> task) {
if (counter.incrementAndGet() == numberOfJobs) {
sendResponseOrFailure(request.getJobId(), listener, failures);
}
}
@Override
public void onFailure(Exception e) {
final int slot = counter.incrementAndGet();
failures.set(slot - 1, e);
if (slot == numberOfJobs) {
sendResponseOrFailure(request.getJobId(), listener, failures);
}
}
private void sendResponseOrFailure(String jobId,
ActionListener<Response> listener,
AtomicArray<Exception> failures) {
List<Exception> catchedExceptions = failures.asList();
if (catchedExceptions.size() == 0) {
listener.onResponse(new Response(true));
return;
Optional<PersistentTask<?>> jobTask = validateAndReturnJobTask(jobId, currentState);
if (jobTask.isPresent()) {
auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING);
persistentTasksService.cancelPersistentTask(jobTask.get().getId(),
new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> task) {
if (counter.incrementAndGet() == numberOfJobs) {
sendResponseOrFailure(request.getJobId(), listener, failures);
}
}
String msg = "Failed to force close job [" + jobId + "] with ["
+ catchedExceptions.size()
+ "] failures, rethrowing last, all Exceptions: ["
+ catchedExceptions.stream().map(Exception::getMessage)
.collect(Collectors.joining(", "))
+ "]";
@Override
public void onFailure(Exception e) {
final int slot = counter.incrementAndGet();
failures.set(slot - 1, e);
if (slot == numberOfJobs) {
sendResponseOrFailure(request.getJobId(), listener, failures);
}
}
ElasticsearchException e = new ElasticsearchException(msg,
catchedExceptions.get(0));
listener.onFailure(e);
}
});
private void sendResponseOrFailure(String jobId,
ActionListener<Response> listener,
AtomicArray<Exception> failures) {
List<Exception> catchedExceptions = failures.asList();
if (catchedExceptions.size() == 0) {
listener.onResponse(new Response(true));
return;
}
String msg = "Failed to force close job [" + jobId + "] with ["
+ catchedExceptions.size()
+ "] failures, rethrowing last, all Exceptions: ["
+ catchedExceptions.stream().map(Exception::getMessage)
.collect(Collectors.joining(", "))
+ "]";
ElasticsearchException e = new ElasticsearchException(msg,
catchedExceptions.get(0));
listener.onFailure(e);
}
});
}
}
}
@ -467,9 +469,16 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
Map<String, String> jobIdToPersistentTaskId = new HashMap<>();
for (String jobId : request.resolvedJobIds) {
auditor.info(jobId, Messages.JOB_AUDIT_CLOSING);
PersistentTask<?> jobTask = validateAndReturnJobTask(jobId, currentState);
jobIdToPersistentTaskId.put(jobId, jobTask.getId());
Optional<PersistentTask<?>> jobTask = validateAndReturnJobTask(jobId, currentState);
if (jobTask.isPresent()) {
auditor.info(jobId, Messages.JOB_AUDIT_CLOSING);
jobIdToPersistentTaskId.put(jobId, jobTask.get().getId());
}
}
// An empty map means all the jobs in the request are currently closed.
if (jobIdToPersistentTaskId.isEmpty()) {
listener.onResponse(new Response(true));
}
ActionListener<Response> finalListener =
@ -527,8 +536,11 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
.custom(PersistentTasksCustomMetaData.TYPE);
if (!Job.ALL.equals(jobId)) {
validateAndReturnJobTask(jobId, state);
return Collections.singletonList(jobId);
if (validateAndReturnJobTask(jobId, state).isPresent()) {
return Collections.singletonList(jobId);
} else {
return Collections.emptyList();
}
}
if (mlMetadata.getJobs().isEmpty()) {
@ -545,11 +557,6 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
continue;
}
PersistentTask<?> jobTask = MlMetadata.getJobTask(resolvedJobId, tasks);
if (jobTask == null) {
continue;
}
if (MlMetadata.getJobState(resolvedJobId, tasks).isAnyOf(JobState.OPENED, JobState.FAILED) == false) {
continue;
}
@ -562,7 +569,22 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
return matchedJobs;
}
static PersistentTask<?> validateAndReturnJobTask(String jobId, ClusterState state) {
/**
* Validate the close request. Throws an exception on any of these conditions:
* <ul>
* <li>If the job does not exist</li>
* <li>If the job has a data feed the feed must be closed first</li>
* <li>If the job is opening i.e. the job has a task but the task has null status</li>
* <li>If the job is not already closed, opened or failed
* i.e. the job is in the {@link JobState#CLOSING} state</li>
* </ul>
*
* If the job is already closed an empty Optional is returned.
* @param jobId Job Id
* @param state Current cluster state
* @return The Job PersistentTask or an empty optional if the job is closed.
*/
static Optional<PersistentTask<?>> validateAndReturnJobTask(String jobId, ClusterState state) {
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
Job job = mlMetadata.getJobs().get(jobId);
if (job == null) {
@ -572,9 +594,12 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
PersistentTasksCustomMetaData tasks = state.getMetaData()
.custom(PersistentTasksCustomMetaData.TYPE);
PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
if (jobTask == null || jobTask.getStatus() == null) {
throw ExceptionsHelper.conflictStatusException("cannot close job, because job [" + jobId + "] is " + JobState.CLOSED);
if (jobTask == null) {
return Optional.empty();
} else if (jobTask.getStatus() == null) {
throw ExceptionsHelper.conflictStatusException("cannot close job, because job [" + jobId + "] is opening");
}
JobTaskStatus jobTaskStatus = (JobTaskStatus) jobTask.getStatus();
if (jobTaskStatus.getState().isAnyOf(JobState.OPENED, JobState.FAILED) == false) {
throw ExceptionsHelper.conflictStatusException("cannot close job, because job [" + jobId + "] is " + jobTaskStatus.getState());
@ -587,7 +612,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
throw ExceptionsHelper.conflictStatusException("cannot close job [{}], datafeed hasn't been stopped", jobId);
}
}
return jobTask;
return Optional.of(jobTask);
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
@ -24,6 +25,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignme
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Optional;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.addJobTask;
@ -80,37 +82,99 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
CloseJobAction.validateAndReturnJobTask("job_id", cs2);
}
public void testResolve() {
public void testValidate_jobIsClosed() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()),
false);
mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id_1", "job_id_1",
Collections.singletonList("*")));
mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("closed-job").build(new Date()), false);
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()),
false);
mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id_2", "job_id_2",
Collections.singletonList("*")));
// A closed job doesn't have a task
ClusterState cs1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()))
.build();
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date()),
false);
mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id_3", "job_id_3",
Collections.singletonList("*")));
Optional<PersistentTasksCustomMetaData.PersistentTask<?>> persistentTask =
CloseJobAction.validateAndReturnJobTask("closed-job", cs1);
assertFalse(persistentTask.isPresent());
}
public void testValidate_jobIsOpening() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("opening-job").build(new Date()), false);
// An opening job has a null status field
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("opening-job", null, null, tasksBuilder);
ClusterState cs1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()))
.build();
ElasticsearchStatusException conflictException =
expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.validateAndReturnJobTask("opening-job", cs1));
assertEquals(RestStatus.CONFLICT, conflictException.status());
}
public void testValidate_jobIsMissing() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("missing-job", null, null, tasksBuilder);
ClusterState cs1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()))
.build();
expectThrows(ResourceNotFoundException.class, () -> CloseJobAction.validateAndReturnJobTask("missing-job", cs1));
}
public void testResolve_givenAll() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()), false);
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()), false);
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date()), false);
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_4").build(new Date()), false);
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_5").build(new Date()), false);
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id_1", null, JobState.OPENED, tasksBuilder);
addJobTask("job_id_2", null, JobState.CLOSED, tasksBuilder);
addJobTask("job_id_2", null, JobState.OPENED, tasksBuilder);
addJobTask("job_id_3", null, JobState.FAILED, tasksBuilder);
addJobTask("job_id_4", null, JobState.CLOSING, tasksBuilder);
ClusterState cs1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()))
.build();
assertEquals(Arrays.asList("job_id_1", "job_id_3"),
assertEquals(Arrays.asList("job_id_1", "job_id_2", "job_id_3"),
CloseJobAction.resolveAndValidateJobId("_all", cs1));
}
public void testResolve_givenJobId() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_1").build(new Date()), false);
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id_1", null, JobState.OPENED, tasksBuilder);
ClusterState cs1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()))
.build();
assertEquals(Arrays.asList("job_id_1"),
CloseJobAction.resolveAndValidateJobId("job_id_1", cs1));
// Job without task is closed
cs1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()))
.build();
assertEquals(Collections.emptyList(),
CloseJobAction.resolveAndValidateJobId("job_id_1", cs1));
}
public static void addTask(String datafeedId, long startTime, String nodeId, DatafeedState state,
PersistentTasksCustomMetaData.Builder tasks) {
tasks.addTask(MlMetadata.datafeedTaskId(datafeedId), StartDatafeedAction.TASK_NAME,

View File

@ -381,6 +381,84 @@
- match:
metadata.persistent_tasks.tasks: []
---
"Test closing a closed job isn't an error":
- do:
xpack.ml.put_job:
job_id: jobs-crud-close-a-closed-job
body: >
{
"description":"Analysis of response time by airline",
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"xcontent"
}
}
- match: { job_id: "jobs-crud-close-a-closed-job" }
- do:
xpack.ml.open_job:
job_id: jobs-crud-close-a-closed-job
- do:
xpack.ml.close_job:
job_id: jobs-crud-close-a-closed-job
- match: { closed: true }
- do:
xpack.ml.close_job:
job_id: jobs-crud-close-a-closed-job
- match: { closed: true }
---
"Test close all jobs":
- do:
xpack.ml.put_job:
job_id: jobs-crud-close-all-1
body: >
{
"description":"Analysis of response time by airline",
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"xcontent"
}
}
- match: { job_id: "jobs-crud-close-all-1" }
- do:
xpack.ml.put_job:
job_id: jobs-crud-close-all-2
body: >
{
"description":"Analysis of response time by airline",
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"xcontent"
}
}
- match: { job_id: "jobs-crud-close-all-2" }
- do:
xpack.ml.open_job:
job_id: jobs-crud-close-all-2
- do:
xpack.ml.close_job:
job_id: _all
- match: { closed: true }
- do:
xpack.ml.get_job_stats:
job_id: _all
- match: { jobs.0.state: closed }
- match: { jobs.1.state: closed }
---
"Test force close job":
@ -438,3 +516,36 @@
filter_path: metadata.persistent_tasks
- match:
metadata.persistent_tasks.tasks: []
---
"Test force closing a closed job isn't an error":
- do:
xpack.ml.put_job:
job_id: jobs-crud-close-a-closed-job
body: >
{
"description":"Analysis of response time by airline",
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"xcontent"
}
}
- match: { job_id: "jobs-crud-close-a-closed-job" }
- do:
xpack.ml.open_job:
job_id: jobs-crud-close-a-closed-job
- do:
xpack.ml.close_job:
job_id: jobs-crud-close-a-closed-job
force: true
- match: { closed: true }
- do:
xpack.ml.close_job:
job_id: jobs-crud-close-a-closed-job
force: true
- match: { closed: true }

View File

@ -196,18 +196,13 @@ setup:
job_id: not_a_job
---
"Test flushing, posting and closing a closed job":
"Test flushing and posting a closed job":
- do:
catch: /status_exception/
xpack.ml.flush_job:
job_id: post-data-closed-job
- do:
catch: /status_exception/
xpack.ml.close_job:
job_id: post-data-closed-job
- do:
catch: /status_exception/
xpack.ml.post_data:

View File

@ -39,7 +39,7 @@ integTestRunner {
'ml/jobs_get_result_categories/Test with invalid param combinations via body',
'ml/jobs_get_stats/Test get job stats given missing job',
'ml/post_data/Test Flush data with invalid parameters',
'ml/post_data/Test flushing, posting and closing a closed job',
'ml/post_data/Test flushing and posting a closed job',
'ml/post_data/Test open and close with non-existent job id',
'ml/post_data/Test POST data with invalid parameters',
'ml/preview_datafeed/Test preview missing datafeed',