From 67fe584e0fafd90db51130c319359d8a645ffe00 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 13 Feb 2017 15:21:25 +0100 Subject: [PATCH] [ml] Also allow closing failed jobs Relates to elastic/x-pack-elasticsearch#545 Original commit: elastic/x-pack-elasticsearch@1c814afbf09bfc72e88e51e5332daff2376010ea --- .../xpack/ml/action/CloseJobAction.java | 2 +- .../xpack/ml/action/CloseJobActionTests.java | 2 +- .../xpack/ml/integration/TooManyJobsIT.java | 68 ++++++++++++++++--- 3 files changed, 60 insertions(+), 12 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index 9a24cc5f37d..e44fbe6654a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -317,7 +317,7 @@ public class CloseJobAction extends Action task = new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), null); - task = new PersistentTaskInProgress<>(task, JobState.OPENED); + task = new PersistentTaskInProgress<>(task, randomFrom(JobState.OPENED, JobState.FAILED)); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())) diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java index 837a7aba5f8..ac2aee473ca 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.settings.Settings; 
import org.elasticsearch.xpack.ml.action.CloseJobAction; import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; @@ -16,11 +17,55 @@ import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentActionResponse; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import java.util.concurrent.ExecutionException; public class TooManyJobsIT extends BaseMlIntegTestCase { - + + public void testCloseFailedJob() throws Exception { + startMlCluster(1, 1); + + // create and open first job, which succeeds: + Job.Builder job = createJob("1"); + PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true, job.getId())); + PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get(); + assertTrue(putJobResponse.isAcknowledged()); + client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get(); + assertBusy(() -> { + GetJobsStatsAction.Response statsResponse = + client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request("1")).actionGet(); + assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); + }); + + // create and try to open second job, which fails: + job = createJob("2"); + putJobRequest = new PutJobAction.Request(job.build(true, job.getId())); + putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get(); + assertTrue(putJobResponse.isAcknowledged()); + expectThrows(ElasticsearchStatusException.class, + () -> client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request("2")).actionGet()); + assertBusy(() -> { + GetJobsStatsAction.Response statsResponse = + client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request("2")).actionGet(); + 
assertEquals(JobState.FAILED, statsResponse.getResponse().results().get(0).getState()); + }); + + // close second job: + client().execute(CloseJobAction.INSTANCE, new CloseJobAction.Request("2")).actionGet(); + // ensure that we remove persistent task for job 2, so that we stop the persistent task allocation loop: + assertBusy(() -> { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + PersistentTasksInProgress tasks = state.custom(PersistentTasksInProgress.TYPE); + assertEquals(1, tasks.taskMap().size()); + // now just double check that the first job is still opened: + PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); + assertEquals(JobState.OPENED, task.getStatus()); + OpenJobAction.Request openJobRequest = (OpenJobAction.Request) task.getRequest(); + assertEquals("1", openJobRequest.getJobId()); + }); + } + public void testSingleNode() throws Exception { verifyMaxNumberOfJobsLimit(1, randomIntBetween(1, 32)); } @@ -30,15 +75,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase { } private void verifyMaxNumberOfJobsLimit(int numNodes, int maxNumberOfJobsPerNode) throws Exception { - // clear all nodes, so that we can set max_running_jobs setting: - internalCluster().ensureAtMostNumDataNodes(0); - logger.info("[{}] is [{}]", AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode); - for (int i = 0; i < numNodes; i++) { - internalCluster().startNode(Settings.builder() - .put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode)); - } - logger.info("Started [{}] nodes", numNodes); - + startMlCluster(numNodes, maxNumberOfJobsPerNode); int clusterWideMaxNumberOfJobs = numNodes * maxNumberOfJobsPerNode; for (int i = 1; i <= (clusterWideMaxNumberOfJobs + 1); i++) { Job.Builder job = createJob(Integer.toString(i)); @@ -78,4 +115,15 @@ public class TooManyJobsIT extends BaseMlIntegTestCase { fail("shouldn't be able to add 
more than [" + clusterWideMaxNumberOfJobs + "] jobs"); } + private void startMlCluster(int numNodes, int maxNumberOfJobsPerNode) throws Exception { + // clear all nodes, so that we can set max_running_jobs setting: + internalCluster().ensureAtMostNumDataNodes(0); + logger.info("[{}] is [{}]", AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode); + for (int i = 0; i < numNodes; i++) { + internalCluster().startNode(Settings.builder() + .put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode)); + } + logger.info("Started [{}] nodes", numNodes); + } + }