From fd3f079276a7e6f8a3f226343fbbb42e7bb0925c Mon Sep 17 00:00:00 2001
From: David Roberts
Date: Wed, 18 Apr 2018 10:10:55 +0100
Subject: [PATCH] [ML] Exclude failed jobs from node allocation decision
 (elastic/x-pack-elasticsearch#4395)

When calculating the current load on each ML node during the node
allocation process we should be ignoring failed jobs. This is because
failed jobs do not have a corresponding native process, so do not
consume memory or CPU resources.

relates elastic/x-pack-elasticsearch#4381

Original commit: elastic/x-pack-elasticsearch@1cb0ca973e829670f581e206fe1e64fe569c1e1f
---
 .../ml/action/TransportOpenJobAction.java       | 17 ++++---
 .../action/TransportOpenJobActionTests.java     | 38 +++++++++++++++++++
 2 files changed, 50 insertions(+), 5 deletions(-)

diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java
index c90ee9e7fea..5ae84129254 100644
--- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java
+++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java
@@ -199,18 +199,25 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAction.Request, OpenJobAction.Response>
                 Collection<PersistentTasksCustomMetaData.PersistentTask<?>> assignedTasks = persistentTasks.findTasks(OpenJobAction.TASK_NAME,
                         task -> node.getId().equals(task.getExecutorNode()));
-                numberOfAssignedJobs = assignedTasks.size();
                 for (PersistentTasksCustomMetaData.PersistentTask<?> assignedTask : assignedTasks) {
                     JobTaskStatus jobTaskState = (JobTaskStatus) assignedTask.getStatus();
+                    JobState jobState;
                     if (jobTaskState == null || // executor node didn't have the chance to set job status to OPENING
                             // previous executor node failed and current executor node didn't have the chance to set job status to OPENING
                             jobTaskState.isStatusStale(assignedTask)) {
                         ++numberOfAllocatingJobs;
+                        jobState = JobState.OPENING;
+                    } else {
+                        jobState = jobTaskState.getState();
+                    }
+                    // Don't count FAILED jobs, as they don't consume native memory
+                    if (jobState != JobState.FAILED) {
+                        ++numberOfAssignedJobs;
+                        String assignedJobId = ((OpenJobAction.JobParams) assignedTask.getParams()).getJobId();
+                        Job assignedJob = mlMetadata.getJobs().get(assignedJobId);
+                        assert assignedJob != null;
+                        assignedJobMemory += assignedJob.estimateMemoryFootprint();
                     }
-                    String assignedJobId = ((OpenJobAction.JobParams) assignedTask.getParams()).getJobId();
-                    Job assignedJob = mlMetadata.getJobs().get(assignedJobId);
-                    assert assignedJob != null;
-                    assignedJobMemory += assignedJob.estimateMemoryFootprint();
                 }
             }
             if (numberOfAllocatingJobs >= maxConcurrentJobAllocations) {
diff --git a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java
index a0e0570a3f1..5ee07d63a96 100644
--- a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java
+++ b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java
@@ -168,6 +168,44 @@ public class TransportOpenJobActionTests extends ESTestCase {
         assertEquals("_node_id2", result.getExecutorNode());
     }
 
+    public void testSelectLeastLoadedMlNode_byMemoryWithFailedJobs() {
+        Map<String, String> nodeAttr = new HashMap<>();
+        nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
+        // this leaves just under 300MB per node available for ML jobs
+        nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "1000000000");
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+                .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
+                        nodeAttr, Collections.emptySet(), Version.CURRENT))
+                .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
+                        nodeAttr, Collections.emptySet(), Version.CURRENT))
+                .add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302),
+                        nodeAttr, Collections.emptySet(), Version.CURRENT))
+                .build();
+
+        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
+        addJobTask("job_id1", "_node_id1", JobState.fromString("failed"), tasksBuilder);
+        addJobTask("job_id2", "_node_id2", JobState.fromString("failed"), tasksBuilder);
+        addJobTask("job_id3", "_node_id3", JobState.fromString("failed"), tasksBuilder);
+        PersistentTasksCustomMetaData tasks = tasksBuilder.build();
+
+        ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
+        MetaData.Builder metaData = MetaData.builder();
+        RoutingTable.Builder routingTable = RoutingTable.builder();
+        addJobAndIndices(metaData, routingTable, jobId -> {
+            // remember we add 100MB for the process overhead, so this model
+            // memory limit corresponds to a job size of 250MB
+            return BaseMlIntegTestCase.createFareQuoteJob(jobId, new ByteSizeValue(150, ByteSizeUnit.MB)).build(new Date());
+        }, "job_id1", "job_id2", "job_id3", "job_id4");
+        cs.nodes(nodes);
+        metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
+        cs.metaData(metaData);
+        cs.routingTable(routingTable.build());
+        // if the memory of the failed jobs is wrongly included in the calculation then this job will not be allocated
+        Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, 10, 30, logger);
+        assertEquals("", result.getExplanation());
+        assertNotNull(result.getExecutorNode());
+    }
+
     public void testSelectLeastLoadedMlNode_maxCapacity() {
         int numNodes = randomIntBetween(1, 10);
         int maxRunningJobsPerNode = randomIntBetween(1, 100);
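
Note: the following is a standalone sketch of the memory arithmetic behind the new
testSelectLeastLoadedMlNode_byMemoryWithFailedJobs test, not part of the patch. It assumes
the 30 passed to selectLeastLoadedMlNode is the maximum percentage of machine memory
available to ML, and that each job's footprint is its 150MB model memory limit plus the
100MB process overhead quoted in the test comments; the class and variable names below are
illustrative only.

public class FailedJobMemoryMathSketch {
    public static void main(String[] args) {
        long machineMemoryBytes = 1_000_000_000L;             // MACHINE_MEMORY_NODE_ATTR set on each node
        long mlMemoryBytes = machineMemoryBytes * 30 / 100;   // assumed 30% share: ~286MiB ("just under 300MB")
        long jobFootprintBytes = (150L + 100L) * 1024 * 1024; // 150MB model memory limit + 100MB overhead = 250MB

        // Each node carries one FAILED job. If its 250MB were wrongly counted,
        // no node would have room left for job_id4:
        System.out.println("fits if FAILED job counted: "
                + (mlMemoryBytes - jobFootprintBytes >= jobFootprintBytes)); // false
        // With FAILED jobs excluded, the full ML allowance is free and job_id4 is assigned:
        System.out.println("fits if FAILED job ignored: "
                + (mlMemoryBytes >= jobFootprintBytes));                     // true
    }
}

Putting one failed job on every node means the assertion can only pass if the failed jobs'
memory is ignored on all three nodes, which is exactly the behaviour the production change
introduces.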