From 93d7b8c14baa196cd13e77df7e605085376bdff5 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 23 Mar 2017 14:40:20 +0100 Subject: [PATCH] [ML] State check doesn't need to know if task is stale now that task validation is only done at create time. Original commit: elastic/x-pack-elasticsearch@d19858240b8a09b48d262e636e79208140aee2d3 --- .../xpack/ml/action/OpenJobAction.java | 20 ++----------- .../xpack/ml/action/StartDatafeedAction.java | 17 ++--------- .../xpack/ml/action/OpenJobActionTests.java | 4 --- .../ml/action/StartDatafeedActionTests.java | 30 ------------------- 4 files changed, 5 insertions(+), 66 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 3c1b954c51a..9799b2ba7dd 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -52,13 +52,12 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.JobStateObserver; import org.elasticsearch.xpack.persistent.NodePersistentTask; -import org.elasticsearch.xpack.persistent.PersistentTasksExecutor; import org.elasticsearch.xpack.persistent.PersistentTaskRequest; -import org.elasticsearch.xpack.persistent.PersistentTasksService; -import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; -import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.xpack.persistent.PersistentTasksExecutor; +import org.elasticsearch.xpack.persistent.PersistentTasksService; +import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener; import java.io.IOException; import java.util.ArrayList; @@ -452,20 +451,7 @@ public class OpenJobAction extends Action task = MlMetadata.getJobTask(jobId, tasks); JobState jobState = MlMetadata.getJobState(jobId, tasks); - if (task != null && jobState == JobState.OPENED) { - if (task.isAssigned() == false) { - // We can skip the job state check below, because the task got unassigned after we went into - // opened state on a node that disappeared and we didn't have the opportunity to set the status to failed - return; - } else if (nodes.nodeExists(task.getExecutorNode()) == false) { - // The state is open and the node were running on no longer exists. - // We can skip the job state check below, because when the node - // disappeared we didn't have time to set the status to failed. - return; - } - } if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) { throw new ElasticsearchStatusException("[" + jobId + "] expected state [" + JobState.CLOSED + "] or [" + JobState.FAILED + "], but got [" + jobState + "]", RestStatus.CONFLICT); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 69df38b009a..d877548a64d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -58,12 +58,12 @@ import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.NodePersistentTask; import org.elasticsearch.xpack.persistent.PersistentTaskRequest; -import org.elasticsearch.xpack.persistent.PersistentTasksService; -import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksExecutor; +import org.elasticsearch.xpack.persistent.PersistentTasksService; +import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener; import java.io.IOException; import java.util.Objects; @@ -478,20 +478,7 @@ public class StartDatafeedAction RestStatus.CONFLICT, JobState.OPENED, jobState); } - PersistentTask datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks); DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks); - if (datafeedTask != null && datafeedState == DatafeedState.STARTED) { - if (datafeedTask.isAssigned() == false) { - // We can skip the datafeed state check below, because the task got unassigned after we went into - // started state on a node that disappeared and we didn't have the opportunity to set the status to stopped - return; - } else if (nodes.nodeExists(datafeedTask.getExecutorNode()) == false) { - // The state is started and the node were running on no longer exists. - // We can skip the datafeed state check below, because when the node - // disappeared we didn't have time to set the state to stopped. - return; - } - } if (datafeedState == DatafeedState.STARTED) { throw new ElasticsearchStatusException("datafeed [{}] already started, expected datafeed state [{}], but got [{}]", RestStatus.CONFLICT, datafeedId, DatafeedState.STOPPED, DatafeedState.STARTED); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java index 25700d03dbe..8de72cb4d91 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java @@ -62,10 +62,6 @@ public class OpenJobActionTests extends ESTestCase { OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes); OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksCustomMetaData(1L, Collections.emptyMap()), nodes); OpenJobAction.validate("job_id", mlBuilder.build(), null, nodes); - - task = createJobTask(1L, "job_id", "_other_node_id", JobState.OPENED); - tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)); - OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes); } public void testValidate_jobMissing() { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java index 951c141b7b3..a6a5eceb0c9 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java @@ -166,36 +166,6 @@ public class StartDatafeedActionTests extends ESTestCase { "but got [started]")); } - public void testValidate_staleTask() { - Job job1 = createScheduledJob("job_id").build(); - DatafeedConfig datafeedConfig = createDatafeed("datafeed_id", "job_id", Collections.singletonList("*")); - MlMetadata mlMetadata1 = new MlMetadata.Builder() - .putJob(job1, false) - .putDatafeed(datafeedConfig) - .build(); - DiscoveryNodes nodes = DiscoveryNodes.builder() - .add(new DiscoveryNode("node_name", "node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), - Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) - .build(); - - PersistentTask jobTask = createJobTask(0L, "job_id", "node_id2", JobState.OPENED); - PersistentTask datafeedTask = - new PersistentTask<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), - false, true, new Assignment("node_id1", "test assignment")); - datafeedTask = new PersistentTask<>(datafeedTask, DatafeedState.STARTED); - Map> taskMap = new HashMap<>(); - taskMap.put(0L, jobTask); - taskMap.put(1L, datafeedTask); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(2L, taskMap); - StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes); - - datafeedTask = new PersistentTask<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), - false, true, INITIAL_ASSIGNMENT); - datafeedTask = new PersistentTask<>(datafeedTask, DatafeedState.STARTED); - taskMap.put(1L, datafeedTask); - StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes); - } - public static StartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action, TaskId parentTaskId, StartDatafeedAction.Request request,