[ml] Also allow closing failed jobs

Relates to elastic/x-pack-elasticsearch#545

Original commit: elastic/x-pack-elasticsearch@1c814afbf0
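In short: a job whose persistent task ends up in the FAILED state (here, because it could not be opened on a node) previously could not be closed, since CloseJobAction only accepted jobs in the OPENED state. The close action now accepts FAILED as well; closing such a job removes its persistent task, which also stops the master from endlessly retrying the task allocation. The new TooManyJobsIT#testCloseFailedJob test below exercises this end to end.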
Martijn van Groningen 2017-02-13 15:21:25 +01:00
parent 4976feb53b
commit 67fe584e0f
3 changed files with 60 additions and 12 deletions

CloseJobAction.java

@@ -317,7 +317,7 @@ public class CloseJobAction extends Action&lt;CloseJobAction.Request, CloseJobActio
         OpenJobAction.Request storedRequest = (OpenJobAction.Request) task.getRequest();
         if (storedRequest.getJobId().equals(jobId)) {
             JobState jobState = (JobState) task.getStatus();
-            if (jobState != JobState.OPENED) {
+            if (jobState.isAnyOf(JobState.OPENED, JobState.FAILED) == false) {
                 throw new ElasticsearchStatusException("cannot close job, expected job state [{}], but got [{}]",
                         RestStatus.CONFLICT, JobState.OPENED, jobState);
             }
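JobState.isAnyOf is used but not defined in this diff. For reference, a minimal sketch of such a varargs helper on the enum, assuming a conventional implementation (the constant list is illustrative, not the actual JobState definition):

    public enum JobState {
        OPENING, OPENED, CLOSING, CLOSED, FAILED; // illustrative constants

        // True if this state equals any of the given candidates.
        public boolean isAnyOf(JobState... candidates) {
            for (JobState candidate : candidates) {
                if (this == candidate) {
                    return true;
                }
            }
            return false;
        }
    }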

CloseJobActionTests.java

@@ -27,7 +27,7 @@ public class CloseJobActionTests extends ESTestCase {
         mlBuilder.putJob(buildJobBuilder("job_id").build(), false);
         PersistentTaskInProgress&lt;OpenJobAction.Request&gt; task =
                 new PersistentTaskInProgress&lt;&gt;(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), null);
-        task = new PersistentTaskInProgress&lt;&gt;(task, JobState.OPENED);
+        task = new PersistentTaskInProgress&lt;&gt;(task, randomFrom(JobState.OPENED, JobState.FAILED));
         ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"))
                 .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()))
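randomFrom comes from ESTestCase and picks one of the supplied values per run, so the OPENED and FAILED branches are each covered across repeated runs rather than within a single run. A sketch of an exhaustive alternative, if deterministic coverage were preferred (the loop body stands in for the cluster-state setup and validation call above):

    // Sketch: exercise both accepted states in a single test run.
    for (JobState acceptedState : new JobState[] { JobState.OPENED, JobState.FAILED }) {
        PersistentTaskInProgress<OpenJobAction.Request> task =
                new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), null);
        task = new PersistentTaskInProgress<>(task, acceptedState);
        // ... build the ClusterState as above and assert that close-time validation passes ...
    }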

TooManyJobsIT.java

@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.ml.integration;

 import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.xpack.ml.action.CloseJobAction;
 import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
@@ -16,11 +17,55 @@ import org.elasticsearch.xpack.ml.job.config.JobState;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
 import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
 import org.elasticsearch.xpack.persistent.PersistentActionResponse;
+import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;

 import java.util.concurrent.ExecutionException;

 public class TooManyJobsIT extends BaseMlIntegTestCase {

+    public void testCloseFailedJob() throws Exception {
+        startMlCluster(1, 1);
+
+        // create and open first job, which succeeds:
+        Job.Builder job = createJob("1");
+        PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true, job.getId()));
+        PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
+        assertTrue(putJobResponse.isAcknowledged());
+        client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get();
+        assertBusy(() -> {
+            GetJobsStatsAction.Response statsResponse =
+                    client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request("1")).actionGet();
+            assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED);
+        });
+
+        // create and try to open second job, which fails:
+        job = createJob("2");
+        putJobRequest = new PutJobAction.Request(job.build(true, job.getId()));
+        putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
+        assertTrue(putJobResponse.isAcknowledged());
+        expectThrows(ElasticsearchStatusException.class,
+                () -> client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request("2")).actionGet());
+        assertBusy(() -> {
+            GetJobsStatsAction.Response statsResponse =
+                    client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request("2")).actionGet();
+            assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.FAILED);
+        });
+
+        // close second job:
+        client().execute(CloseJobAction.INSTANCE, new CloseJobAction.Request("2")).actionGet();
+        // ensure that we remove persistent task for job 2, so that we stop the persistent task allocation loop:
+        assertBusy(() -> {
+            ClusterState state = client().admin().cluster().prepareState().get().getState();
+            PersistentTasksInProgress tasks = state.custom(PersistentTasksInProgress.TYPE);
+            assertEquals(1, tasks.taskMap().size());
+            // now just double check that the first job is still opened:
+            PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
+            assertEquals(JobState.OPENED, task.getStatus());
+            OpenJobAction.Request openJobRequest = (OpenJobAction.Request) task.getRequest();
+            assertEquals("1", openJobRequest.getJobId());
+        });
+    }
+
     public void testSingleNode() throws Exception {
         verifyMaxNumberOfJobsLimit(1, randomIntBetween(1, 32));
     }
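Why job "2" fails to open: startMlCluster(1, 1) starts a single node capped at one running job, and job "1" already occupies that slot, so the open attempt throws and the persistent task's status transitions to FAILED. The cap is the same per-node setting that startMlCluster (last hunk below) passes to each node; shown here in isolation (a sketch, not part of the diff):

    // Per-node cap relied on by this test: at most one autodetect process.
    Settings nodeSettings = Settings.builder()
            .put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 1)
            .build();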
@@ -30,15 +75,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
     }

     private void verifyMaxNumberOfJobsLimit(int numNodes, int maxNumberOfJobsPerNode) throws Exception {
-        // clear all nodes, so that we can set max_running_jobs setting:
-        internalCluster().ensureAtMostNumDataNodes(0);
-        logger.info("[{}] is [{}]", AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode);
-        for (int i = 0; i < numNodes; i++) {
-            internalCluster().startNode(Settings.builder()
-                    .put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode));
-        }
-        logger.info("Started [{}] nodes", numNodes);
+        startMlCluster(numNodes, maxNumberOfJobsPerNode);
         int clusterWideMaxNumberOfJobs = numNodes * maxNumberOfJobsPerNode;
         for (int i = 1; i <= (clusterWideMaxNumberOfJobs + 1); i++) {
             Job.Builder job = createJob(Integer.toString(i));
@@ -78,4 +115,15 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
         fail("shouldn't be able to add more than [" + clusterWideMaxNumberOfJobs + "] jobs");
     }

+    private void startMlCluster(int numNodes, int maxNumberOfJobsPerNode) throws Exception {
+        // clear all nodes, so that we can set max_running_jobs setting:
+        internalCluster().ensureAtMostNumDataNodes(0);
+        logger.info("[{}] is [{}]", AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode);
+        for (int i = 0; i < numNodes; i++) {
+            internalCluster().startNode(Settings.builder()
+                    .put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode));
+        }
+        logger.info("Started [{}] nodes", numNodes);
+    }
 }
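The node bootstrap code that verifyMaxNumberOfJobsLimit previously inlined is now the shared startMlCluster helper, so both tests configure nodes identically. A hypothetical call with 2 nodes and 3 jobs per node yields a cluster-wide capacity of 6 open jobs, matching the clusterWideMaxNumberOfJobs computation:

    // Hypothetical usage of the extracted helper (values are illustrative):
    startMlCluster(2, 3);                   // 2 nodes, max 3 jobs per node
    int clusterWideMaxNumberOfJobs = 2 * 3; // 6 jobs cluster-wide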