From 92de94c237bf03f1a0f76560134dab90117696ca Mon Sep 17 00:00:00 2001 From: David Roberts Date: Thu, 5 Jul 2018 13:26:17 +0100 Subject: [PATCH] [ML] Don't treat stale FAILED jobs as OPENING in job allocation (#31800) Job persistent tasks with stale allocation IDs used to always be considered as OPENING jobs in the ML job node allocation decision. However, FAILED jobs are not relocated to other nodes, which leads to them blocking up the nodes they failed on after node restarts. FAILED jobs should not restrict how many other jobs can open on a node, regardless of whether they are stale or not. Closes #31794 --- .../ml/action/TransportOpenJobAction.java | 21 +++++-- .../action/TransportOpenJobActionTests.java | 58 +++++++++++++++++-- 2 files changed, 69 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index e7fb0fe5fb3..290e407ab66 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -210,16 +210,27 @@ public class TransportOpenJobAction extends TransportMasterNodeAction assignedTask : assignedTasks) { JobTaskState jobTaskState = (JobTaskState) assignedTask.getState(); JobState jobState; - if (jobTaskState == null || // executor node didn't have the chance to set job status to OPENING - // previous executor node failed and current executor node didn't have the chance to set job status to OPENING - jobTaskState.isStatusStale(assignedTask)) { + if (jobTaskState == null) { + // executor node didn't have the chance to set job status to OPENING ++numberOfAllocatingJobs; jobState = JobState.OPENING; } else { jobState = jobTaskState.getState(); + if (jobTaskState.isStatusStale(assignedTask)) { + if (jobState == JobState.CLOSING) { + // previous executor node failed while the job was closing - it won't + // be reopened, so consider it CLOSED for resource usage purposes + jobState = JobState.CLOSED; + } else if (jobState != JobState.FAILED) { + // previous executor node failed and current executor node didn't + // have the chance to set job status to OPENING + ++numberOfAllocatingJobs; + jobState = JobState.OPENING; + } + } } - // Don't count FAILED jobs, as they don't consume native memory - if (jobState != JobState.FAILED) { + // Don't count CLOSED or FAILED jobs, as they don't consume native memory + if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) { ++numberOfAssignedJobs; String assignedJobId = ((OpenJobAction.JobParams) assignedTask.getParams()).getJobId(); Job assignedJob = mlMetadata.getJobs().get(assignedJobId); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index b5a315d9687..dd8ddf3aa62 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -55,7 +55,6 @@ import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -285,7 +284,7 @@ public class TransportOpenJobActionTests extends ESTestCase { nodeAttr, Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id1", "_node_id1", null, tasksBuilder); addJobTask("job_id2", "_node_id1", null, tasksBuilder); addJobTask("job_id3", "_node_id2", null, tasksBuilder); @@ -340,6 +339,55 @@ public class TransportOpenJobActionTests extends ESTestCase { assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } + public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() { + Map nodeAttr = new HashMap<>(); + nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .build(); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id1", "_node_id1", JobState.fromString("failed"), tasksBuilder); + // This will make the allocation stale for job_id1 + tasksBuilder.reassignTask(MlMetadata.jobTaskId("job_id1"), new Assignment("_node_id1", "test assignment")); + addJobTask("job_id2", "_node_id1", null, tasksBuilder); + addJobTask("job_id3", "_node_id2", null, tasksBuilder); + addJobTask("job_id4", "_node_id2", null, tasksBuilder); + addJobTask("job_id5", "_node_id3", null, tasksBuilder); + addJobTask("job_id6", "_node_id3", null, tasksBuilder); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); + + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + csBuilder.nodes(nodes); + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addJobAndIndices(metaData, routingTable, "job_id1", "job_id2", "job_id3", "job_id4", "job_id5", "job_id6", "job_id7", "job_id8"); + csBuilder.routingTable(routingTable.build()); + metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); + csBuilder.metaData(metaData); + + ClusterState cs = csBuilder.build(); + // Allocation won't be possible if the stale failed job is treated as opening + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, 30, logger); + assertEquals("_node_id1", result.getExecutorNode()); + + tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); + addJobTask("job_id7", "_node_id1", null, tasksBuilder); + tasks = tasksBuilder.build(); + + csBuilder = ClusterState.builder(cs); + csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); + cs = csBuilder.build(); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", cs, 2, 10, 30, logger); + assertNull("no node selected, because OPENING state", result.getExecutorNode()); + assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); + } + public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() { Map nodeAttr = new HashMap<>(); nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); @@ -710,13 +758,13 @@ public class TransportOpenJobActionTests extends ESTestCase { private static Function jobWithRulesCreator() { return jobId -> { - DetectionRule rule = new DetectionRule.Builder(Arrays.asList( + DetectionRule rule = new DetectionRule.Builder(Collections.singletonList( new RuleCondition(RuleCondition.AppliesTo.TYPICAL, Operator.LT, 100.0) )).build(); Detector.Builder detector = new Detector.Builder("count", null); - detector.setRules(Arrays.asList(rule)); - AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build())); + detector.setRules(Collections.singletonList(rule)); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); DataDescription.Builder dataDescription = new DataDescription.Builder(); Job.Builder job = new Job.Builder(jobId); job.setAnalysisConfig(analysisConfig);