diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index e7fb0fe5fb3..290e407ab66 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -210,16 +210,27 @@ public class TransportOpenJobAction extends TransportMasterNodeAction assignedTask : assignedTasks) { JobTaskState jobTaskState = (JobTaskState) assignedTask.getState(); JobState jobState; - if (jobTaskState == null || // executor node didn't have the chance to set job status to OPENING - // previous executor node failed and current executor node didn't have the chance to set job status to OPENING - jobTaskState.isStatusStale(assignedTask)) { + if (jobTaskState == null) { + // executor node didn't have the chance to set job status to OPENING ++numberOfAllocatingJobs; jobState = JobState.OPENING; } else { jobState = jobTaskState.getState(); + if (jobTaskState.isStatusStale(assignedTask)) { + if (jobState == JobState.CLOSING) { + // previous executor node failed while the job was closing - it won't + // be reopened, so consider it CLOSED for resource usage purposes + jobState = JobState.CLOSED; + } else if (jobState != JobState.FAILED) { + // previous executor node failed and current executor node didn't + // have the chance to set job status to OPENING + ++numberOfAllocatingJobs; + jobState = JobState.OPENING; + } + } } - // Don't count FAILED jobs, as they don't consume native memory - if (jobState != JobState.FAILED) { + // Don't count CLOSED or FAILED jobs, as they don't consume native memory + if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) { ++numberOfAssignedJobs; String assignedJobId = ((OpenJobAction.JobParams) assignedTask.getParams()).getJobId(); Job assignedJob = mlMetadata.getJobs().get(assignedJobId); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index b5a315d9687..dd8ddf3aa62 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -55,7 +55,6 @@ import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -285,7 +284,7 @@ public class TransportOpenJobActionTests extends ESTestCase { nodeAttr, Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id1", "_node_id1", null, tasksBuilder); addJobTask("job_id2", "_node_id1", null, tasksBuilder); addJobTask("job_id3", "_node_id2", null, tasksBuilder); @@ -340,6 +339,55 @@ public class TransportOpenJobActionTests extends ESTestCase { assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } + public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() { + Map nodeAttr = new HashMap<>(); + nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .build(); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id1", "_node_id1", JobState.fromString("failed"), tasksBuilder); + // This will make the allocation stale for job_id1 + tasksBuilder.reassignTask(MlMetadata.jobTaskId("job_id1"), new Assignment("_node_id1", "test assignment")); + addJobTask("job_id2", "_node_id1", null, tasksBuilder); + addJobTask("job_id3", "_node_id2", null, tasksBuilder); + addJobTask("job_id4", "_node_id2", null, tasksBuilder); + addJobTask("job_id5", "_node_id3", null, tasksBuilder); + addJobTask("job_id6", "_node_id3", null, tasksBuilder); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); + + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + csBuilder.nodes(nodes); + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addJobAndIndices(metaData, routingTable, "job_id1", "job_id2", "job_id3", "job_id4", "job_id5", "job_id6", "job_id7", "job_id8"); + csBuilder.routingTable(routingTable.build()); + metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); + csBuilder.metaData(metaData); + + ClusterState cs = csBuilder.build(); + // Allocation won't be possible if the stale failed job is treated as opening + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, 30, logger); + assertEquals("_node_id1", result.getExecutorNode()); + + tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); + addJobTask("job_id7", "_node_id1", null, tasksBuilder); + tasks = tasksBuilder.build(); + + csBuilder = ClusterState.builder(cs); + csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); + cs = csBuilder.build(); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", cs, 2, 10, 30, logger); + assertNull("no node selected, because OPENING state", result.getExecutorNode()); + assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); + } + public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() { Map nodeAttr = new HashMap<>(); nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); @@ -710,13 +758,13 @@ public class TransportOpenJobActionTests extends ESTestCase { private static Function jobWithRulesCreator() { return jobId -> { - DetectionRule rule = new DetectionRule.Builder(Arrays.asList( + DetectionRule rule = new DetectionRule.Builder(Collections.singletonList( new RuleCondition(RuleCondition.AppliesTo.TYPICAL, Operator.LT, 100.0) )).build(); Detector.Builder detector = new Detector.Builder("count", null); - detector.setRules(Arrays.asList(rule)); - AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build())); + detector.setRules(Collections.singletonList(rule)); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); DataDescription.Builder dataDescription = new DataDescription.Builder(); Job.Builder job = new Job.Builder(jobId); job.setAnalysisConfig(analysisConfig);