From a98d593576ef9ce93a3d81e2fef1eab76d70c1e5 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 24 Apr 2017 14:53:30 +0200 Subject: [PATCH] [ML] Use JobTaskStatus#staleTask(...) instead of PersistentTask#needsReassignement(...) for checking whether a job task is stale when allocation a datafeed to a node. Original commit: elastic/x-pack-elasticsearch@0952c455fe6a79cfcdf911c4599ef3ca0b247682 --- .../xpack/ml/action/OpenJobAction.java | 2 +- .../xpack/ml/action/StartDatafeedAction.java | 13 +++++-------- .../xpack/ml/job/config/JobTaskStatus.java | 2 +- .../xpack/ml/action/StartDatafeedActionTests.java | 12 ++++-------- .../ml/integration/BasicDistributedJobsIT.java | 2 +- 5 files changed, 12 insertions(+), 19 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 2747c2b190f..59dc047c7ee 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -585,7 +585,7 @@ public class OpenJobAction extends Action jobTask = MlMetadata.getJobTask(datafeed.getJobId(), tasks); if (jobTask == null) { @@ -580,12 +578,6 @@ public class StartDatafeedAction logger.debug(reason); return new Assignment(null, reason); } - if (jobTask.needsReassignment(nodes)) { - String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() + - "] is unassigned or unassigned to a non existing node"; - logger.debug(reason); - return new Assignment(null, reason); - } JobTaskStatus taskStatus = (JobTaskStatus) jobTask.getStatus(); if (taskStatus == null || taskStatus.getState() != JobState.OPENED) { // lets try again later when the job has been opened: @@ -595,6 +587,11 @@ public class StartDatafeedAction logger.debug(reason); return new Assignment(null, reason); } + if (taskStatus.isStatusStale(jobTask)) { + String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() + "] status is stale"; + logger.debug(reason); + return new Assignment(null, reason); + } String reason = verifyIndicesActive(logger, datafeed, clusterState, resolver); if (reason != null) { return new Assignment(null, reason); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatus.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatus.java index 17f64a0360e..6d85fe17554 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatus.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatus.java @@ -67,7 +67,7 @@ public class JobTaskStatus implements Task.Status { return state; } - public boolean staleStatus(PersistentTask task) { + public boolean isStatusStale(PersistentTask task) { return allocationId != task.getAllocationId(); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java index e35412df5c4..5a8e7dbaf73 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; @@ -237,24 +238,20 @@ public class StartDatafeedActionTests extends ESTestCase { String nodeId = randomBoolean() ? "node_id2" : null; PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), nodeId, JobState.OPENED, tasksBuilder); + // Set to lower allocationId, so job task is stale: + tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId(job.getId()), new JobTaskStatus(JobState.OPENED, 0)); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); - DiscoveryNodes nodes = DiscoveryNodes.builder() - .add(new DiscoveryNode("node_name", "node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), - Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) - .build(); - ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) .metaData(new MetaData.Builder() .putCustom(MlMetadata.TYPE, mlMetadata.build()) .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) .put(indexMetaData, false)) - .nodes(nodes) .routingTable(generateRoutingTable(indexMetaData)); Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver); assertNull(result.getExecutorNode()); - assertEquals("cannot start datafeed [datafeed_id], job [job_id] is unassigned or unassigned to a non existing node", + assertEquals("cannot start datafeed [datafeed_id], job [job_id] status is stale", result.getExplanation()); tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -265,7 +262,6 @@ public class StartDatafeedActionTests extends ESTestCase { .putCustom(MlMetadata.TYPE, mlMetadata.build()) .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) .put(indexMetaData, false)) - .nodes(nodes) .routingTable(generateRoutingTable(indexMetaData)); result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver); assertEquals("node_id1", result.getExecutorNode()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index 00ae76e121a..38a03cc94e5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -273,7 +273,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { Collection> foundTasks = tasks.findTasks(OpenJobAction.TASK_NAME, task -> { JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus(); return node.getId().equals(task.getExecutorNode()) && - (jobTaskState == null || jobTaskState.staleStatus(task)); + (jobTaskState == null || jobTaskState.isStatusStale(task)); }); int count = foundTasks.size(); if (count > maxConcurrentJobAllocations) {