diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java index d36e4c9ce86..da9586c52e8 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java @@ -81,6 +81,10 @@ public class MlMetadata implements MetaData.Custom { return datafeeds.get(datafeedId); } + public Optional getDatafeedByJobId(String jobId) { + return datafeeds.values().stream().filter(s -> s.getJobId().equals(jobId)).findFirst(); + } + @Override public Version getMinimalSupportedVersion() { return Version.V_5_4_0_UNRELEASED; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index 4714de069be..80e215b4329 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -38,6 +38,8 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MlMetadata; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; @@ -49,7 +51,7 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.function.Predicate; +import java.util.Optional; public class CloseJobAction extends Action { @@ -285,25 +287,26 @@ public class CloseJobAction extends Action> p = t -> { - OpenJobAction.Request storedRequest = (OpenJobAction.Request) t.getRequest(); - return storedRequest.getJobId().equals(jobId); - }; - for (PersistentTask task : tasks.findTasks(OpenJobAction.NAME, p)) { - OpenJobAction.Request storedRequest = (OpenJobAction.Request) task.getRequest(); - if (storedRequest.getJobId().equals(jobId)) { - JobState jobState = (JobState) task.getStatus(); - if (jobState.isAnyOf(JobState.OPENED, JobState.FAILED) == false) { - throw new ElasticsearchStatusException("cannot close job, expected job state [{}], but got [{}]", - RestStatus.CONFLICT, JobState.OPENED, jobState); - } - return task; - } + Optional datafeed = mlMetadata.getDatafeedByJobId(jobId); + if (datafeed.isPresent()) { + DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeed.get().getId(), tasks); + if (datafeedState != DatafeedState.STOPPED) { + throw new ElasticsearchStatusException("cannot close job [{}], datafeed hasn't been stopped", + RestStatus.CONFLICT, jobId); } } - throw new ElasticsearchStatusException("cannot close job, expected job state [{}], but got [{}]", - RestStatus.CONFLICT, JobState.OPENED, JobState.CLOSED); + + PersistentTask jobTask = MlMetadata.getJobTask(jobId, tasks); + if (jobTask != null) { + JobState jobState = (JobState) jobTask.getStatus(); + if (jobState.isAnyOf(JobState.OPENED, JobState.FAILED) == false) { + throw new ElasticsearchStatusException("cannot close job [{}], expected job state [{}], but got [{}]", + RestStatus.CONFLICT, jobId, JobState.OPENED, jobState); + } + return jobTask; + } + throw new ElasticsearchStatusException("cannot close job [{}], expected job state [{}], but got [{}]", + RestStatus.CONFLICT, jobId, JobState.OPENED, JobState.CLOSED); } static ClusterState moveJobToClosingState(String jobId, ClusterState currentState) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java index e60da51ffbd..b9beb4a6c25 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java @@ -33,6 +33,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.Result; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver; import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment; import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; @@ -338,19 +339,26 @@ public class DatafeedJobRunner extends AbstractComponent { } private void closeJob() { - CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(datafeed.getJobId()); - client.execute(CloseJobAction.INSTANCE, closeJobRequest, new ActionListener() { + DatafeedStateObserver observer = new DatafeedStateObserver(threadPool, clusterService); + observer.waitForState(datafeed.getId(), TimeValue.timeValueSeconds(20), DatafeedState.STOPPED, e1 -> { + if (e1 == null) { + CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(datafeed.getJobId()); + client.execute(CloseJobAction.INSTANCE, closeJobRequest, new ActionListener() { - @Override - public void onResponse(CloseJobAction.Response response) { - if (!response.isClosed()) { - logger.error("[{}] job close action was not acknowledged", datafeed.getJobId()); - } - } + @Override + public void onResponse(CloseJobAction.Response response) { + if (!response.isClosed()) { + logger.error("[{}] job close action was not acknowledged", datafeed.getJobId()); + } + } - @Override - public void onFailure(Exception e) { - logger.error("[" + datafeed.getJobId() + "] failed to auto-close job", e); + @Override + public void onFailure(Exception e) { + logger.error("[" + datafeed.getJobId() + "] failed to auto-close job", e); + } + }); + } else { + logger.error("Cannot auto close job [" + datafeed.getJobId() + "]", e1); } }); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java index f1f8a8a5b53..0a1e70563da 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java @@ -10,13 +10,18 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.MlMetadata; +import org.elasticsearch.xpack.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentTasks; import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; @@ -57,13 +62,50 @@ public class CloseJobActionTests extends ESTestCase { .putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, Collections.singletonMap(1L, task)))); ElasticsearchStatusException result = expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder1.build())); - assertEquals("cannot close job, expected job state [opened], but got [opening]", result.getMessage()); + assertEquals("cannot close job [job_id], expected job state [opened], but got [opening]", result.getMessage()); ClusterState.Builder csBuilder2 = ClusterState.builder(new ClusterName("_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) .putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, Collections.emptyMap()))); result = expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder2.build())); - assertEquals("cannot close job, expected job state [opened], but got [closed]", result.getMessage()); + assertEquals("cannot close job [job_id], expected job state [opened], but got [closed]", result.getMessage()); + } + + public void testCloseJob_datafeedNotStopped() { + MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); + mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id").build(), false); + mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id", Collections.singletonList("*"))); + Map> tasks = new HashMap<>(); + PersistentTask jobTask = createJobTask(1L, "job_id", null, JobState.OPENED); + tasks.put(1L, jobTask); + tasks.put(2L, createDatafeedTask(2L, "datafeed_id", 0L, null, DatafeedState.STARTED)); + ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) + .putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, tasks))).build(); + + ElasticsearchStatusException e = + expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.validateAndFindTask("job_id", cs1)); + assertEquals(RestStatus.CONFLICT, e.status()); + assertEquals("cannot close job [job_id], datafeed hasn't been stopped", e.getMessage()); + + tasks = new HashMap<>(); + tasks.put(1L, jobTask); + if (randomBoolean()) { + tasks.put(2L, createDatafeedTask(2L, "datafeed_id", 0L, null, DatafeedState.STOPPED)); + } + ClusterState cs2 = ClusterState.builder(new ClusterName("_name")) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) + .putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, tasks))).build(); + assertEquals(jobTask, CloseJobAction.validateAndFindTask("job_id", cs2)); + } + + public static PersistentTask createDatafeedTask(long id, String datafeedId, long startTime, + String nodeId, DatafeedState datafeedState) { + PersistentTask task = + new PersistentTask<>(id, StartDatafeedAction.NAME, new StartDatafeedAction.Request(datafeedId, startTime), false, true, + new PersistentTasks.Assignment(nodeId, "test assignment")); + task = new PersistentTask<>(task, datafeedState); + return task; } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java index 089f0e0d299..c263bdf66fc 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java @@ -84,8 +84,8 @@ public class MlRestTestStateCleaner { logger.error("Got status code " + statusCode + " when closing job " + jobId); } } catch (Exception e) { - if (e.getMessage().contains("cannot close job, expected job state [opened], but got [closed]") - || e.getMessage().contains("cannot close job, expected job state [opened], but got [closing]")) { + if (e.getMessage().contains("expected job state [opened], but got [closed]") + || e.getMessage().contains("expected job state [opened], but got [closing]")) { logger.debug("job [" + jobId + "] has already been closed", e); } else { logger.warn("failed to close job [" + jobId + "]", e);