From 7dabaad7d9560508cad423b55de897b4c4652d5e Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Mon, 31 Aug 2020 14:00:23 -0400 Subject: [PATCH] [ML] refactor ml job node selection into its own class (#61521) (#61747) This is a minor refactor where the job node load logic (node availability, etc.) is refactored into its own class. This will allow future things (i.e. autoscaling decisions) to use the same node load detection class. backport of #61521 --- .../xpack/ml/job/JobNodeSelector.java | 168 ++++---------- .../xpack/ml/job/NodeLoadDetector.java | 215 ++++++++++++++++++ .../ml/job/JobNodeLoadDetectorTests.java | 148 ++++++++++++ 3 files changed, 408 insertions(+), 123 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeLoadDetectorTests.java diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java index f6f76cf5ec5..d7530107d46 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java @@ -8,19 +8,14 @@ package org.elasticsearch.xpack.ml.job; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; -import org.elasticsearch.xpack.core.ml.MlTasks; -import org.elasticsearch.xpack.core.ml.action.OpenJobAction; -import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; -import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; -import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.process.MlMemoryTracker; -import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -58,6 +53,7 @@ public class JobNodeSelector { private final ClusterState clusterState; private final MlMemoryTracker memoryTracker; private final Function nodeFilter; + private final NodeLoadDetector nodeLoadDetector; private final int maxLazyNodes; /** @@ -71,6 +67,7 @@ public class JobNodeSelector { this.taskName = Objects.requireNonNull(taskName); this.clusterState = Objects.requireNonNull(clusterState); this.memoryTracker = Objects.requireNonNull(memoryTracker); + this.nodeLoadDetector = new NodeLoadDetector(Objects.requireNonNull(memoryTracker)); this.maxLazyNodes = maxLazyNodes; this.nodeFilter = node -> { if (MachineLearning.isMlNode(node)) { @@ -109,38 +106,42 @@ public class JobNodeSelector { continue; } - // Assuming the node is eligible at all, check loading - CurrentLoad currentLoad = calculateCurrentLoadForNode(node, persistentTasks, allocateByMemory); - allocateByMemory = currentLoad.allocateByMemory; + NodeLoadDetector.NodeLoad currentLoad = nodeLoadDetector.detectNodeLoad( + clusterState, + allNodesHaveDynamicMaxWorkers, // Remove in 8.0.0 + node, + dynamicMaxOpenJobs, + maxMachineMemoryPercent, + allocateByMemory + ); + if (currentLoad.getError() != null) { + reason = "Not opening job [" + jobId + "] on node [" + 
nodeNameAndMlAttributes(node) + + "], because [" + currentLoad.getError() + "]"; + logger.trace(reason); + reasons.add(reason); + continue; + } - if (currentLoad.numberOfAllocatingJobs >= maxConcurrentJobAllocations) { - reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because node exceeds [" - + currentLoad.numberOfAllocatingJobs + "] the maximum number of jobs [" + maxConcurrentJobAllocations + // Assuming the node is eligible at all, check loading + allocateByMemory = currentLoad.isUseMemory(); + int maxNumberOfOpenJobs = currentLoad.getMaxJobs(); + + if (currentLoad.getNumAllocatingJobs() >= maxConcurrentJobAllocations) { + reason = "Not opening job [" + + jobId + + "] on node [" + nodeNameAndMlAttributes(node) + "], because node exceeds [" + + currentLoad.getNumAllocatingJobs() + + "] the maximum number of jobs [" + maxConcurrentJobAllocations + "] in opening state"; logger.trace(reason); reasons.add(reason); continue; } - Map nodeAttributes = node.getAttributes(); - int maxNumberOfOpenJobs = dynamicMaxOpenJobs; - // TODO: remove this in 8.0.0 - if (allNodesHaveDynamicMaxWorkers == false) { - String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR); - try { - maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr); - } catch (NumberFormatException e) { - reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " + - MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer"; - logger.trace(reason); - reasons.add(reason); - continue; - } - } - long availableCount = maxNumberOfOpenJobs - currentLoad.numberOfAssignedJobs; + long availableCount = maxNumberOfOpenJobs - currentLoad.getNumAssignedJobs(); if (availableCount == 0) { reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) - + "], because this node is full. Number of opened jobs [" + currentLoad.numberOfAssignedJobs + + "], because this node is full. 
Number of opened jobs [" + currentLoad.getNumAssignedJobs() + "], " + MAX_OPEN_JOBS_PER_NODE.getKey() + " [" + maxNumberOfOpenJobs + "]"; logger.trace(reason); reasons.add(reason); @@ -152,33 +153,21 @@ public class JobNodeSelector { minLoadedNodeByCount = node; } - String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR); - long machineMemory; - try { - machineMemory = Long.parseLong(machineMemoryStr); - } catch (NumberFormatException e) { - reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " + - MachineLearning.MACHINE_MEMORY_NODE_ATTR + " attribute [" + machineMemoryStr + "] is not a long"; - logger.trace(reason); - reasons.add(reason); - continue; - } - if (allocateByMemory) { - if (machineMemory > 0) { - long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100; + if (currentLoad.getMaxMlMemory() > 0) { Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(taskName, jobId); if (estimatedMemoryFootprint != null) { // If this will be the first job assigned to the node then it will need to // load the native code shared libraries, so add the overhead for this - if (currentLoad.numberOfAssignedJobs == 0) { + if (currentLoad.getNumAssignedJobs() == 0) { estimatedMemoryFootprint += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(); } - long availableMemory = maxMlMemory - currentLoad.assignedJobMemory; + long availableMemory = currentLoad.getMaxMlMemory() - currentLoad.getAssignedJobMemory(); if (estimatedMemoryFootprint > availableMemory) { reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) - + "], because this node has insufficient available memory. Available memory for ML [" + maxMlMemory - + "], memory required by existing jobs [" + currentLoad.assignedJobMemory + + "], because this node has insufficient available memory. 
Available memory for ML [" + + currentLoad.getMaxMlMemory() + + "], memory required by existing jobs [" + currentLoad.getAssignedJobMemory() + "], estimated memory required for this job [" + estimatedMemoryFootprint + "]"; logger.trace(reason); reasons.add(reason); @@ -193,15 +182,20 @@ public class JobNodeSelector { // If we cannot get the job memory requirement, // fall back to simply allocating by job count allocateByMemory = false; - logger.debug("Falling back to allocating job [{}] by job counts because its memory requirement was not available", - jobId); + logger.debug( + () -> new ParameterizedMessage( + "Falling back to allocating job [{}] by job counts because its memory requirement was not available", + jobId)); } } else { // If we cannot get the available memory on any machine in // the cluster, fall back to simply allocating by job count allocateByMemory = false; - logger.debug("Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]", - jobId, nodeNameAndMlAttributes(node)); + logger.debug( + () -> new ParameterizedMessage( + "Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]", + jobId, + nodeNameAndMlAttributes(node))); } } } @@ -236,67 +230,6 @@ public class JobNodeSelector { return currentAssignment; } - private CurrentLoad calculateCurrentLoadForNode(DiscoveryNode node, PersistentTasksCustomMetadata persistentTasks, - final boolean allocateByMemory) { - CurrentLoad result = new CurrentLoad(allocateByMemory); - - if (persistentTasks != null) { - // find all the anomaly detector job tasks assigned to this node - Collection> assignedAnomalyDetectorTasks = persistentTasks.findTasks( - MlTasks.JOB_TASK_NAME, task -> node.getId().equals(task.getExecutorNode())); - for (PersistentTasksCustomMetadata.PersistentTask assignedTask : assignedAnomalyDetectorTasks) { - JobState jobState = MlTasks.getJobStateModifiedForReassignments(assignedTask); - if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) { - // Don't count CLOSED or FAILED jobs, as they don't consume native memory - ++result.numberOfAssignedJobs; - if (jobState == JobState.OPENING) { - ++result.numberOfAllocatingJobs; - } - OpenJobAction.JobParams params = (OpenJobAction.JobParams) assignedTask.getParams(); - Long jobMemoryRequirement = memoryTracker.getAnomalyDetectorJobMemoryRequirement(params.getJobId()); - if (jobMemoryRequirement == null) { - result.allocateByMemory = false; - logger.debug("Falling back to allocating job [{}] by job counts because " + - "the memory requirement for job [{}] was not available", jobId, params.getJobId()); - } else { - logger.debug("adding " + jobMemoryRequirement); - result.assignedJobMemory += jobMemoryRequirement; - } - } - } - // find all the data frame analytics job tasks assigned to this node - Collection> assignedAnalyticsTasks = persistentTasks.findTasks( - MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, task -> node.getId().equals(task.getExecutorNode())); - for (PersistentTasksCustomMetadata.PersistentTask assignedTask : assignedAnalyticsTasks) { - DataFrameAnalyticsState dataFrameAnalyticsState = MlTasks.getDataFrameAnalyticsState(assignedTask); - - // Don't count stopped and failed df-analytics tasks as they don't consume native memory - if (dataFrameAnalyticsState.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false) { - // The native process is only running in the ANALYZING and STOPPING states, but in the STARTED - // and REINDEXING states 
we're committed to using the memory soon, so account for it here - ++result.numberOfAssignedJobs; - StartDataFrameAnalyticsAction.TaskParams params = - (StartDataFrameAnalyticsAction.TaskParams) assignedTask.getParams(); - Long jobMemoryRequirement = memoryTracker.getDataFrameAnalyticsJobMemoryRequirement(params.getId()); - if (jobMemoryRequirement == null) { - result.allocateByMemory = false; - logger.debug("Falling back to allocating job [{}] by job counts because " + - "the memory requirement for job [{}] was not available", jobId, params.getId()); - } else { - result.assignedJobMemory += jobMemoryRequirement; - } - } - } - // if any jobs are running then the native code will be loaded, but shared between all jobs, - // so increase the total memory usage of the assigned jobs to account for this - if (result.numberOfAssignedJobs > 0) { - result.assignedJobMemory += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(); - } - } - - return result; - } - static String nodeNameOrId(DiscoveryNode node) { String nodeNameOrID = node.getName(); if (Strings.isNullOrEmpty(nodeNameOrID)) { @@ -324,15 +257,4 @@ public class JobNodeSelector { return builder.toString(); } - private static class CurrentLoad { - - long numberOfAssignedJobs = 0; - long numberOfAllocatingJobs = 0; - long assignedJobMemory = 0; - boolean allocateByMemory; - - CurrentLoad(boolean allocateByMemory) { - this.allocateByMemory = allocateByMemory; - } - } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java new file mode 100644 index 00000000000..0395b3df231 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java @@ -0,0 +1,215 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.job; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.elasticsearch.xpack.core.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.process.MlMemoryTracker; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + + +public class NodeLoadDetector { + private static final Logger logger = LogManager.getLogger(NodeLoadDetector.class); + + private final MlMemoryTracker mlMemoryTracker; + + public NodeLoadDetector(MlMemoryTracker memoryTracker) { + this.mlMemoryTracker = memoryTracker; + } + + public MlMemoryTracker getMlMemoryTracker() { + return mlMemoryTracker; + } + + public NodeLoad detectNodeLoad(ClusterState clusterState, + boolean allNodesHaveDynamicMaxWorkers, + DiscoveryNode node, + int dynamicMaxOpenJobs, + int maxMachineMemoryPercent, + boolean isMemoryTrackerRecentlyRefreshed) { + PersistentTasksCustomMetadata persistentTasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); + Map nodeAttributes = node.getAttributes(); + List errors = new ArrayList<>(); + int maxNumberOfOpenJobs = dynamicMaxOpenJobs; + // TODO: remove this in 8.0.0 + if (allNodesHaveDynamicMaxWorkers == false) { + String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR); + try { + maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr); + } catch (NumberFormatException e) { + errors.add(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer"); + maxNumberOfOpenJobs = -1; + } + } + String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR); + long machineMemory = -1; + try { + machineMemory = Long.parseLong(machineMemoryStr); + } catch (NumberFormatException e) { + errors.add(MachineLearning.MACHINE_MEMORY_NODE_ATTR + " attribute [" + machineMemoryStr + "] is not a long"); + } + long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100; + + NodeLoad nodeLoad = new NodeLoad(node.getId(), maxMlMemory, maxNumberOfOpenJobs, isMemoryTrackerRecentlyRefreshed); + if (errors.isEmpty() == false) { + nodeLoad.error = Strings.collectionToCommaDelimitedString(errors); + return nodeLoad; + } + updateLoadGivenTasks(nodeLoad, persistentTasks); + return nodeLoad; + } + + private void updateLoadGivenTasks(NodeLoad nodeLoad, PersistentTasksCustomMetadata persistentTasks) { + if (persistentTasks != null) { + // find all the anomaly detector job tasks assigned to this node + Collection> assignedAnomalyDetectorTasks = persistentTasks.findTasks( + MlTasks.JOB_TASK_NAME, task -> nodeLoad.getNodeId().equals(task.getExecutorNode())); + for (PersistentTasksCustomMetadata.PersistentTask assignedTask : assignedAnomalyDetectorTasks) { + JobState jobState = MlTasks.getJobStateModifiedForReassignments(assignedTask); + if 
(jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) { + // Don't count CLOSED or FAILED jobs, as they don't consume native memory + ++nodeLoad.numAssignedJobs; + if (jobState == JobState.OPENING) { + ++nodeLoad.numAllocatingJobs; + } + OpenJobAction.JobParams params = (OpenJobAction.JobParams) assignedTask.getParams(); + Long jobMemoryRequirement = mlMemoryTracker.getAnomalyDetectorJobMemoryRequirement(params.getJobId()); + if (jobMemoryRequirement == null) { + nodeLoad.useMemory = false; + logger.debug(() -> new ParameterizedMessage( + "[{}] memory requirement was not available. Calculating load by number of assigned jobs.", + params.getJobId() + )); + } else { + nodeLoad.assignedJobMemory += jobMemoryRequirement; + } + } + } + // find all the data frame analytics job tasks assigned to this node + Collection> assignedAnalyticsTasks = persistentTasks.findTasks( + MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, task -> nodeLoad.getNodeId().equals(task.getExecutorNode())); + for (PersistentTasksCustomMetadata.PersistentTask assignedTask : assignedAnalyticsTasks) { + DataFrameAnalyticsState dataFrameAnalyticsState = MlTasks.getDataFrameAnalyticsState(assignedTask); + + // Don't count stopped and failed df-analytics tasks as they don't consume native memory + if (dataFrameAnalyticsState.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false) { + // The native process is only running in the ANALYZING and STOPPING states, but in the STARTED + // and REINDEXING states we're committed to using the memory soon, so account for it here + ++nodeLoad.numAssignedJobs; + StartDataFrameAnalyticsAction.TaskParams params = + (StartDataFrameAnalyticsAction.TaskParams) assignedTask.getParams(); + Long jobMemoryRequirement = mlMemoryTracker.getDataFrameAnalyticsJobMemoryRequirement(params.getId()); + if (jobMemoryRequirement == null) { + nodeLoad.useMemory = false; + logger.debug(() -> new ParameterizedMessage( + "[{}] memory requirement was not available. Calculating load by number of assigned jobs.", + params.getId() + )); + } else { + nodeLoad.assignedJobMemory += jobMemoryRequirement; + } + } + } + // if any jobs are running then the native code will be loaded, but shared between all jobs, + // so increase the total memory usage of the assigned jobs to account for this + if (nodeLoad.numAssignedJobs > 0) { + nodeLoad.assignedJobMemory += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(); + } + } + } + + public static class NodeLoad { + private final long maxMemory; + private final int maxJobs; + private final String nodeId; + private boolean useMemory; + private String error; + private long numAssignedJobs; + private long assignedJobMemory; + private long numAllocatingJobs; + + private NodeLoad(String nodeId, long maxMemory, int maxJobs, boolean useMemory) { + this.maxJobs = maxJobs; + this.maxMemory = maxMemory; + this.nodeId = nodeId; + this.useMemory = useMemory; + } + + /** + * @return The total number of assigned jobs + */ + public long getNumAssignedJobs() { + return numAssignedJobs; + } + + /** + * @return The total memory in bytes used by the assigned jobs. 
+ */ + public long getAssignedJobMemory() { + return assignedJobMemory; + } + + /** + * @return The maximum memory on this node for jobs + */ + public long getMaxMlMemory() { + return maxMemory; + } + + /** + * @return The maximum number of jobs allowed on the node + */ + public int getMaxJobs() { + return maxJobs; + } + + /** + * @return returns `true` if the assignedJobMemory number is accurate + */ + public boolean isUseMemory() { + return useMemory; + } + + /** + * @return The node ID + */ + public String getNodeId() { + return nodeId; + } + + /** + * @return Returns a comma delimited string of errors if any were encountered. + */ + @Nullable + public String getError() { + return error; + } + + /** + * @return The current number of jobs allocating to the node + */ + public long getNumAllocatingJobs() { + return numAllocatingJobs; + } + } + +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeLoadDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeLoadDetectorTests.java new file mode 100644 index 00000000000..e4b386c6150 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeLoadDetectorTests.java @@ -0,0 +1,148 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests; +import org.elasticsearch.xpack.ml.process.MlMemoryTracker; +import org.junit.Before; + +import java.net.InetAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +// TODO: in 8.0.0 remove all instances of MAX_OPEN_JOBS_NODE_ATTR from this file +public class JobNodeLoadDetectorTests extends ESTestCase { + + // To simplify the logic in this class all jobs have the same memory requirement + private static final ByteSizeValue JOB_MEMORY_REQUIREMENT = new ByteSizeValue(10, ByteSizeUnit.MB); + + private NodeLoadDetector nodeLoadDetector; + + @Before + public void setup() { + MlMemoryTracker memoryTracker = mock(MlMemoryTracker.class); + when(memoryTracker.isRecentlyRefreshed()).thenReturn(true); + when(memoryTracker.getAnomalyDetectorJobMemoryRequirement(anyString())).thenReturn(JOB_MEMORY_REQUIREMENT.getBytes()); + when(memoryTracker.getDataFrameAnalyticsJobMemoryRequirement(anyString())).thenReturn(JOB_MEMORY_REQUIREMENT.getBytes()); + when(memoryTracker.getJobMemoryRequirement(anyString(), 
anyString())).thenReturn(JOB_MEMORY_REQUIREMENT.getBytes()); + nodeLoadDetector = new NodeLoadDetector(memoryTracker); + } + + public void testNodeLoadDetection() { + Map nodeAttr = new HashMap<>(); + nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10"); + nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "-1"); + // MachineLearning.MACHINE_MEMORY_NODE_ATTR negative, so this will fall back to allocating by count + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .add(new DiscoveryNode("_node_name4", "_node_id4", new TransportAddress(InetAddress.getLoopbackAddress(), 9303), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .build(); + + PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder(); + TransportOpenJobActionTests.addJobTask("job_id1", "_node_id1", null, tasksBuilder); + TransportOpenJobActionTests.addJobTask("job_id2", "_node_id1", null, tasksBuilder); + TransportOpenJobActionTests.addJobTask("job_id3", "_node_id2", null, tasksBuilder); + TransportOpenJobActionTests.addJobTask("job_id4", "_node_id4", JobState.OPENED, tasksBuilder); + PersistentTasksCustomMetadata tasks = tasksBuilder.build(); + + ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); + cs.nodes(nodes); + Metadata.Builder metadata = Metadata.builder(); + metadata.putCustom(PersistentTasksCustomMetadata.TYPE, tasks); + cs.metadata(metadata); + + NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id1"), 10, -1, true); + assertThat(load.getAssignedJobMemory(), equalTo(52428800L)); + assertThat(load.getNumAllocatingJobs(), equalTo(2L)); + assertThat(load.getNumAssignedJobs(), equalTo(2L)); + assertThat(load.getMaxJobs(), equalTo(10)); + assertThat(load.getMaxMlMemory(), equalTo(0L)); + + load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id2"), 5, -1, true); + assertThat(load.getAssignedJobMemory(), equalTo(41943040L)); + assertThat(load.getNumAllocatingJobs(), equalTo(1L)); + assertThat(load.getNumAssignedJobs(), equalTo(1L)); + assertThat(load.getMaxJobs(), equalTo(5)); + assertThat(load.getMaxMlMemory(), equalTo(0L)); + + load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id3"), 5, -1, true); + assertThat(load.getAssignedJobMemory(), equalTo(0L)); + assertThat(load.getNumAllocatingJobs(), equalTo(0L)); + assertThat(load.getNumAssignedJobs(), equalTo(0L)); + assertThat(load.getMaxJobs(), equalTo(5)); + assertThat(load.getMaxMlMemory(), equalTo(0L)); + + load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id4"), 5, -1, true); + assertThat(load.getAssignedJobMemory(), equalTo(41943040L)); + assertThat(load.getNumAllocatingJobs(), equalTo(0L)); + assertThat(load.getNumAssignedJobs(), equalTo(1L)); + assertThat(load.getMaxJobs(), equalTo(5)); + assertThat(load.getMaxMlMemory(), equalTo(0L)); + } + + public void testNodeLoadDetection_withBadMaxOpenJobsAttribute() { + Map nodeAttr = new HashMap<>(); + 
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "foo"); + nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "-1"); + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .build(); + + ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); + cs.nodes(nodes); + Metadata.Builder metadata = Metadata.builder(); + cs.metadata(metadata); + + NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), false, nodes.get("_node_id1"), 10, -1, true); + assertThat(load.getError(), containsString("ml.max_open_jobs attribute [foo] is not an integer")); + } + + public void testNodeLoadDetection_withBadMachineMemoryAttribute() { + Map nodeAttr = new HashMap<>(); + nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10"); + nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "bar"); + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .build(); + + ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); + cs.nodes(nodes); + Metadata.Builder metadata = Metadata.builder(); + cs.metadata(metadata); + + NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), false, nodes.get("_node_id1"), 10, -1, true); + assertThat(load.getError(), containsString("ml.machine_memory attribute [bar] is not a long")); + } + +}