[ML] Exclude failed jobs from node allocation decision (elastic/x-pack-elasticsearch#4395)

When calculating the current load on each ML node during the
node allocation process we should be ignoring failed jobs.
This is because failed jobs do not have a corresponding native
process, so do not consume memory or CPU resources.

relates elastic/x-pack-elasticsearch#4381

Original commit: elastic/x-pack-elasticsearch@1cb0ca973e
This commit is contained in:
David Roberts 2018-04-18 10:10:55 +01:00 committed by GitHub
parent 70f9bcc0d1
commit fd3f079276
2 changed files with 50 additions and 5 deletions

View File

@ -199,18 +199,25 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> assignedTasks = Collection<PersistentTasksCustomMetaData.PersistentTask<?>> assignedTasks =
persistentTasks.findTasks(OpenJobAction.TASK_NAME, persistentTasks.findTasks(OpenJobAction.TASK_NAME,
task -> node.getId().equals(task.getExecutorNode())); task -> node.getId().equals(task.getExecutorNode()));
numberOfAssignedJobs = assignedTasks.size();
for (PersistentTasksCustomMetaData.PersistentTask<?> assignedTask : assignedTasks) { for (PersistentTasksCustomMetaData.PersistentTask<?> assignedTask : assignedTasks) {
JobTaskStatus jobTaskState = (JobTaskStatus) assignedTask.getStatus(); JobTaskStatus jobTaskState = (JobTaskStatus) assignedTask.getStatus();
JobState jobState;
if (jobTaskState == null || // executor node didn't have the chance to set job status to OPENING if (jobTaskState == null || // executor node didn't have the chance to set job status to OPENING
// previous executor node failed and current executor node didn't have the chance to set job status to OPENING // previous executor node failed and current executor node didn't have the chance to set job status to OPENING
jobTaskState.isStatusStale(assignedTask)) { jobTaskState.isStatusStale(assignedTask)) {
++numberOfAllocatingJobs; ++numberOfAllocatingJobs;
jobState = JobState.OPENING;
} else {
jobState = jobTaskState.getState();
}
// Don't count FAILED jobs, as they don't consume native memory
if (jobState != JobState.FAILED) {
++numberOfAssignedJobs;
String assignedJobId = ((OpenJobAction.JobParams) assignedTask.getParams()).getJobId();
Job assignedJob = mlMetadata.getJobs().get(assignedJobId);
assert assignedJob != null;
assignedJobMemory += assignedJob.estimateMemoryFootprint();
} }
String assignedJobId = ((OpenJobAction.JobParams) assignedTask.getParams()).getJobId();
Job assignedJob = mlMetadata.getJobs().get(assignedJobId);
assert assignedJob != null;
assignedJobMemory += assignedJob.estimateMemoryFootprint();
} }
} }
if (numberOfAllocatingJobs >= maxConcurrentJobAllocations) { if (numberOfAllocatingJobs >= maxConcurrentJobAllocations) {

View File

@ -168,6 +168,44 @@ public class TransportOpenJobActionTests extends ESTestCase {
assertEquals("_node_id2", result.getExecutorNode()); assertEquals("_node_id2", result.getExecutorNode());
} }
public void testSelectLeastLoadedMlNode_byMemoryWithFailedJobs() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
// this leaves just under 300MB per node available for ML jobs
nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "1000000000");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.build();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id1", "_node_id1", JobState.fromString("failed"), tasksBuilder);
addJobTask("job_id2", "_node_id2", JobState.fromString("failed"), tasksBuilder);
addJobTask("job_id3", "_node_id3", JobState.fromString("failed"), tasksBuilder);
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addJobAndIndices(metaData, routingTable, jobId -> {
// remember we add 100MB for the process overhead, so this model
// memory limit corresponds to a job size of 250MB
return BaseMlIntegTestCase.createFareQuoteJob(jobId, new ByteSizeValue(150, ByteSizeUnit.MB)).build(new Date());
}, "job_id1", "job_id2", "job_id3", "job_id4");
cs.nodes(nodes);
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
// if the memory of the failed jobs is wrongly included in the calculation then this job will not be allocated
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, 10, 30, logger);
assertEquals("", result.getExplanation());
assertNotNull(result.getExecutorNode());
}
public void testSelectLeastLoadedMlNode_maxCapacity() { public void testSelectLeastLoadedMlNode_maxCapacity() {
int numNodes = randomIntBetween(1, 10); int numNodes = randomIntBetween(1, 10);
int maxRunningJobsPerNode = randomIntBetween(1, 100); int maxRunningJobsPerNode = randomIntBetween(1, 100);