diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java
index f6f76cf5ec5..d7530107d46 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java
@@ -8,19 +8,14 @@ package org.elasticsearch.xpack.ml.job;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.Version;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
-import org.elasticsearch.xpack.core.ml.MlTasks;
-import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
-import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
-import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
-import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
 
-import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -58,6 +53,7 @@ public class JobNodeSelector {
     private final ClusterState clusterState;
     private final MlMemoryTracker memoryTracker;
     private final Function<DiscoveryNode, String> nodeFilter;
+    private final NodeLoadDetector nodeLoadDetector;
     private final int maxLazyNodes;
 
     /**
@@ -71,6 +67,7 @@ public class JobNodeSelector {
         this.taskName = Objects.requireNonNull(taskName);
         this.clusterState = Objects.requireNonNull(clusterState);
         this.memoryTracker = Objects.requireNonNull(memoryTracker);
+        this.nodeLoadDetector = new NodeLoadDetector(Objects.requireNonNull(memoryTracker));
         this.maxLazyNodes = maxLazyNodes;
         this.nodeFilter = node -> {
             if (MachineLearning.isMlNode(node)) {
@@ -109,38 +106,42 @@ public class JobNodeSelector {
                 continue;
             }
 
-            // Assuming the node is eligible at all, check loading
-            CurrentLoad currentLoad = calculateCurrentLoadForNode(node, persistentTasks, allocateByMemory);
-            allocateByMemory = currentLoad.allocateByMemory;
+            NodeLoadDetector.NodeLoad currentLoad = nodeLoadDetector.detectNodeLoad(
+                clusterState,
+                allNodesHaveDynamicMaxWorkers, // Remove in 8.0.0
+                node,
+                dynamicMaxOpenJobs,
+                maxMachineMemoryPercent,
+                allocateByMemory
+            );
+            if (currentLoad.getError() != null) {
+                reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
+                    + "], because [" + currentLoad.getError() + "]";
+                logger.trace(reason);
+                reasons.add(reason);
+                continue;
+            }
 
-            if (currentLoad.numberOfAllocatingJobs >= maxConcurrentJobAllocations) {
-                reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because node exceeds ["
-                    + currentLoad.numberOfAllocatingJobs + "] the maximum number of jobs [" + maxConcurrentJobAllocations
+            // Assuming the node is eligible at all, check loading
+            allocateByMemory = currentLoad.isUseMemory();
+            int maxNumberOfOpenJobs = currentLoad.getMaxJobs();
+
+            if (currentLoad.getNumAllocatingJobs() >= maxConcurrentJobAllocations) {
+                reason = "Not opening job ["
+                    + jobId
+                    + "] on node [" + nodeNameAndMlAttributes(node) + "], because node exceeds ["
+                    + currentLoad.getNumAllocatingJobs()
+                    + "] the maximum number of jobs [" + maxConcurrentJobAllocations
                     + "] in opening state";
                 logger.trace(reason);
                 reasons.add(reason);
                 continue;
             }
 
-            Map<String, String> nodeAttributes = node.getAttributes();
-            int maxNumberOfOpenJobs = dynamicMaxOpenJobs;
-            // TODO: remove this in 8.0.0
-            if (allNodesHaveDynamicMaxWorkers == false) {
-                String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR);
-                try {
-                    maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr);
-                } catch (NumberFormatException e) {
-                    reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " +
-                        MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer";
-                    logger.trace(reason);
-                    reasons.add(reason);
-                    continue;
-                }
-            }
-            long availableCount = maxNumberOfOpenJobs - currentLoad.numberOfAssignedJobs;
+            long availableCount = maxNumberOfOpenJobs - currentLoad.getNumAssignedJobs();
             if (availableCount == 0) {
                 reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
-                    + "], because this node is full. Number of opened jobs [" + currentLoad.numberOfAssignedJobs
+                    + "], because this node is full. Number of opened jobs [" + currentLoad.getNumAssignedJobs()
                     + "], " + MAX_OPEN_JOBS_PER_NODE.getKey() + " [" + maxNumberOfOpenJobs + "]";
                 logger.trace(reason);
                 reasons.add(reason);
@@ -152,33 +153,21 @@ public class JobNodeSelector {
                 minLoadedNodeByCount = node;
             }
 
-            String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR);
-            long machineMemory;
-            try {
-                machineMemory = Long.parseLong(machineMemoryStr);
-            } catch (NumberFormatException e) {
-                reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " +
-                    MachineLearning.MACHINE_MEMORY_NODE_ATTR + " attribute [" + machineMemoryStr + "] is not a long";
-                logger.trace(reason);
-                reasons.add(reason);
-                continue;
-            }
-
             if (allocateByMemory) {
-                if (machineMemory > 0) {
-                    long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100;
+                if (currentLoad.getMaxMlMemory() > 0) {
                     Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(taskName, jobId);
                     if (estimatedMemoryFootprint != null) {
                         // If this will be the first job assigned to the node then it will need to
                         // load the native code shared libraries, so add the overhead for this
-                        if (currentLoad.numberOfAssignedJobs == 0) {
+                        if (currentLoad.getNumAssignedJobs() == 0) {
                             estimatedMemoryFootprint += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
                         }
-                        long availableMemory = maxMlMemory - currentLoad.assignedJobMemory;
+                        long availableMemory = currentLoad.getMaxMlMemory() - currentLoad.getAssignedJobMemory();
                         if (estimatedMemoryFootprint > availableMemory) {
                             reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
-                                + "], because this node has insufficient available memory. Available memory for ML [" + maxMlMemory
-                                + "], memory required by existing jobs [" + currentLoad.assignedJobMemory
+                                + "], because this node has insufficient available memory. Available memory for ML ["
+                                + currentLoad.getMaxMlMemory()
+                                + "], memory required by existing jobs [" + currentLoad.getAssignedJobMemory()
                                 + "], estimated memory required for this job [" + estimatedMemoryFootprint + "]";
                             logger.trace(reason);
                             reasons.add(reason);
@@ -193,15 +182,20 @@ public class JobNodeSelector {
                         // If we cannot get the job memory requirement,
                         // fall back to simply allocating by job count
                         allocateByMemory = false;
-                        logger.debug("Falling back to allocating job [{}] by job counts because its memory requirement was not available",
-                            jobId);
+                        logger.debug(
+                            () -> new ParameterizedMessage(
+                                "Falling back to allocating job [{}] by job counts because its memory requirement was not available",
+                                jobId));
                     }
                 } else {
                     // If we cannot get the available memory on any machine in
                     // the cluster, fall back to simply allocating by job count
                     allocateByMemory = false;
-                    logger.debug("Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]",
-                        jobId, nodeNameAndMlAttributes(node));
+                    logger.debug(
+                        () -> new ParameterizedMessage(
+                            "Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]",
+                            jobId,
+                            nodeNameAndMlAttributes(node)));
                 }
             }
         }
@@ -236,67 +230,6 @@ public class JobNodeSelector {
         return currentAssignment;
     }
 
-    private CurrentLoad calculateCurrentLoadForNode(DiscoveryNode node, PersistentTasksCustomMetadata persistentTasks,
-                                                    final boolean allocateByMemory) {
-        CurrentLoad result = new CurrentLoad(allocateByMemory);
-
-        if (persistentTasks != null) {
-            // find all the anomaly detector job tasks assigned to this node
-            Collection<PersistentTasksCustomMetadata.PersistentTask<?>> assignedAnomalyDetectorTasks = persistentTasks.findTasks(
-                MlTasks.JOB_TASK_NAME, task -> node.getId().equals(task.getExecutorNode()));
-            for (PersistentTasksCustomMetadata.PersistentTask<?> assignedTask : assignedAnomalyDetectorTasks) {
-                JobState jobState = MlTasks.getJobStateModifiedForReassignments(assignedTask);
-                if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
-                    // Don't count CLOSED or FAILED jobs, as they don't consume native memory
-                    ++result.numberOfAssignedJobs;
-                    if (jobState == JobState.OPENING) {
-                        ++result.numberOfAllocatingJobs;
-                    }
-                    OpenJobAction.JobParams params = (OpenJobAction.JobParams) assignedTask.getParams();
-                    Long jobMemoryRequirement = memoryTracker.getAnomalyDetectorJobMemoryRequirement(params.getJobId());
-                    if (jobMemoryRequirement == null) {
-                        result.allocateByMemory = false;
-                        logger.debug("Falling back to allocating job [{}] by job counts because " +
-                            "the memory requirement for job [{}] was not available", jobId, params.getJobId());
-                    } else {
-                        logger.debug("adding " + jobMemoryRequirement);
-                        result.assignedJobMemory += jobMemoryRequirement;
-                    }
-                }
-            }
-            // find all the data frame analytics job tasks assigned to this node
-            Collection<PersistentTasksCustomMetadata.PersistentTask<?>> assignedAnalyticsTasks = persistentTasks.findTasks(
-                MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, task -> node.getId().equals(task.getExecutorNode()));
-            for (PersistentTasksCustomMetadata.PersistentTask<?> assignedTask : assignedAnalyticsTasks) {
-                DataFrameAnalyticsState dataFrameAnalyticsState = MlTasks.getDataFrameAnalyticsState(assignedTask);
-
-                // Don't count stopped and failed df-analytics tasks as they don't consume native memory
-                if (dataFrameAnalyticsState.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false) {
-                    // The native process is only running in the ANALYZING and STOPPING states, but in the STARTED
-                    // and REINDEXING states we're committed to using the memory soon, so account for it here
-                    ++result.numberOfAssignedJobs;
-                    StartDataFrameAnalyticsAction.TaskParams params =
-                        (StartDataFrameAnalyticsAction.TaskParams) assignedTask.getParams();
-                    Long jobMemoryRequirement = memoryTracker.getDataFrameAnalyticsJobMemoryRequirement(params.getId());
-                    if (jobMemoryRequirement == null) {
-                        result.allocateByMemory = false;
-                        logger.debug("Falling back to allocating job [{}] by job counts because " +
-                            "the memory requirement for job [{}] was not available", jobId, params.getId());
-                    } else {
-                        result.assignedJobMemory += jobMemoryRequirement;
-                    }
-                }
-            }
-            // if any jobs are running then the native code will be loaded, but shared between all jobs,
-            // so increase the total memory usage of the assigned jobs to account for this
-            if (result.numberOfAssignedJobs > 0) {
-                result.assignedJobMemory += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
-            }
-        }
-
-        return result;
-    }
-
     static String nodeNameOrId(DiscoveryNode node) {
         String nodeNameOrID = node.getName();
         if (Strings.isNullOrEmpty(nodeNameOrID)) {
@@ -324,15 +257,4 @@ public class JobNodeSelector {
         return builder.toString();
     }
 
-    private static class CurrentLoad {
-
-        long numberOfAssignedJobs = 0;
-        long numberOfAllocatingJobs = 0;
-        long assignedJobMemory = 0;
-        boolean allocateByMemory;
-
-        CurrentLoad(boolean allocateByMemory) {
-            this.allocateByMemory = allocateByMemory;
-        }
-    }
 }
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java
new file mode 100644
index 00000000000..0395b3df231
--- /dev/null
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.job;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
+import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
+import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
+import org.elasticsearch.xpack.core.ml.job.config.JobState;
+import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+
+public class NodeLoadDetector {
+    private static final Logger logger = LogManager.getLogger(NodeLoadDetector.class);
+
+    private final MlMemoryTracker mlMemoryTracker;
+
+    public NodeLoadDetector(MlMemoryTracker memoryTracker) {
+        this.mlMemoryTracker = memoryTracker;
+    }
+
+    public MlMemoryTracker getMlMemoryTracker() {
+        return mlMemoryTracker;
+    }
+
+    public NodeLoad detectNodeLoad(ClusterState clusterState,
+                                   boolean allNodesHaveDynamicMaxWorkers,
+                                   DiscoveryNode node,
+                                   int dynamicMaxOpenJobs,
+                                   int maxMachineMemoryPercent,
+                                   boolean isMemoryTrackerRecentlyRefreshed) {
+        PersistentTasksCustomMetadata persistentTasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
+        Map<String, String> nodeAttributes = node.getAttributes();
+        List<String> errors = new ArrayList<>();
+        int maxNumberOfOpenJobs = dynamicMaxOpenJobs;
+        // TODO: remove this in 8.0.0
+        if (allNodesHaveDynamicMaxWorkers == false) {
+            String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR);
+            try {
+                maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr);
+            } catch (NumberFormatException e) {
+                errors.add(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer");
+                maxNumberOfOpenJobs = -1;
+            }
+        }
+        String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR);
+        long machineMemory = -1;
+        try {
+            machineMemory = Long.parseLong(machineMemoryStr);
+        } catch (NumberFormatException e) {
+            errors.add(MachineLearning.MACHINE_MEMORY_NODE_ATTR + " attribute [" + machineMemoryStr + "] is not a long");
+        }
+        long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100;
+
+        NodeLoad nodeLoad = new NodeLoad(node.getId(), maxMlMemory, maxNumberOfOpenJobs, isMemoryTrackerRecentlyRefreshed);
+        if (errors.isEmpty() == false) {
+            nodeLoad.error = Strings.collectionToCommaDelimitedString(errors);
+            return nodeLoad;
+        }
+        updateLoadGivenTasks(nodeLoad, persistentTasks);
+        return nodeLoad;
+    }
+
+    private void updateLoadGivenTasks(NodeLoad nodeLoad, PersistentTasksCustomMetadata persistentTasks) {
+        if (persistentTasks != null) {
+            // find all the anomaly detector job tasks assigned to this node
+            Collection<PersistentTasksCustomMetadata.PersistentTask<?>> assignedAnomalyDetectorTasks = persistentTasks.findTasks(
+                MlTasks.JOB_TASK_NAME, task -> nodeLoad.getNodeId().equals(task.getExecutorNode()));
+            for (PersistentTasksCustomMetadata.PersistentTask<?> assignedTask : assignedAnomalyDetectorTasks) {
+                JobState jobState = MlTasks.getJobStateModifiedForReassignments(assignedTask);
+                if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
+                    // Don't count CLOSED or FAILED jobs, as they don't consume native memory
+                    ++nodeLoad.numAssignedJobs;
+                    if (jobState == JobState.OPENING) {
+                        ++nodeLoad.numAllocatingJobs;
+                    }
+                    OpenJobAction.JobParams params = (OpenJobAction.JobParams) assignedTask.getParams();
+                    Long jobMemoryRequirement = mlMemoryTracker.getAnomalyDetectorJobMemoryRequirement(params.getJobId());
+                    if (jobMemoryRequirement == null) {
+                        nodeLoad.useMemory = false;
+                        logger.debug(() -> new ParameterizedMessage(
+                            "[{}] memory requirement was not available. Calculating load by number of assigned jobs.",
+                            params.getJobId()
+                        ));
+                    } else {
+                        nodeLoad.assignedJobMemory += jobMemoryRequirement;
+                    }
+                }
+            }
+            // find all the data frame analytics job tasks assigned to this node
+            Collection<PersistentTasksCustomMetadata.PersistentTask<?>> assignedAnalyticsTasks = persistentTasks.findTasks(
+                MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, task -> nodeLoad.getNodeId().equals(task.getExecutorNode()));
+            for (PersistentTasksCustomMetadata.PersistentTask<?> assignedTask : assignedAnalyticsTasks) {
+                DataFrameAnalyticsState dataFrameAnalyticsState = MlTasks.getDataFrameAnalyticsState(assignedTask);
+
+                // Don't count stopped and failed df-analytics tasks as they don't consume native memory
+                if (dataFrameAnalyticsState.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false) {
+                    // The native process is only running in the ANALYZING and STOPPING states, but in the STARTED
+                    // and REINDEXING states we're committed to using the memory soon, so account for it here
+                    ++nodeLoad.numAssignedJobs;
+                    StartDataFrameAnalyticsAction.TaskParams params =
+                        (StartDataFrameAnalyticsAction.TaskParams) assignedTask.getParams();
+                    Long jobMemoryRequirement = mlMemoryTracker.getDataFrameAnalyticsJobMemoryRequirement(params.getId());
+                    if (jobMemoryRequirement == null) {
+                        nodeLoad.useMemory = false;
+                        logger.debug(() -> new ParameterizedMessage(
+                            "[{}] memory requirement was not available. Calculating load by number of assigned jobs.",
+                            params.getId()
+                        ));
+                    } else {
+                        nodeLoad.assignedJobMemory += jobMemoryRequirement;
+                    }
+                }
+            }
+            // if any jobs are running then the native code will be loaded, but shared between all jobs,
+            // so increase the total memory usage of the assigned jobs to account for this
+            if (nodeLoad.numAssignedJobs > 0) {
+                nodeLoad.assignedJobMemory += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
+            }
+        }
+    }
+
+    public static class NodeLoad {
+        private final long maxMemory;
+        private final int maxJobs;
+        private final String nodeId;
+        private boolean useMemory;
+        private String error;
+        private long numAssignedJobs;
+        private long assignedJobMemory;
+        private long numAllocatingJobs;
+
+        private NodeLoad(String nodeId, long maxMemory, int maxJobs, boolean useMemory) {
+            this.maxJobs = maxJobs;
+            this.maxMemory = maxMemory;
+            this.nodeId = nodeId;
+            this.useMemory = useMemory;
+        }
+
+        /**
+         * @return The total number of assigned jobs
+         */
+        public long getNumAssignedJobs() {
+            return numAssignedJobs;
+        }
+
+        /**
+         * @return The total memory in bytes used by the assigned jobs.
+         */
+        public long getAssignedJobMemory() {
+            return assignedJobMemory;
+        }
+
+        /**
+         * @return The maximum memory on this node for jobs
+         */
+        public long getMaxMlMemory() {
+            return maxMemory;
+        }
+
+        /**
+         * @return The maximum number of jobs allowed on the node
+         */
+        public int getMaxJobs() {
+            return maxJobs;
+        }
+
+        /**
+         * @return returns `true` if the assignedJobMemory number is accurate
+         */
+        public boolean isUseMemory() {
+            return useMemory;
+        }
+
+        /**
+         * @return The node ID
+         */
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        /**
+         * @return Returns a comma delimited string of errors if any were encountered.
+         */
+        @Nullable
+        public String getError() {
+            return error;
+        }
+
+        /**
+         * @return The current number of jobs allocating to the node
+         */
+        public long getNumAllocatingJobs() {
+            return numAllocatingJobs;
+        }
+    }
+
+}
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeLoadDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeLoadDetectorTests.java
new file mode 100644
index 00000000000..e4b386c6150
--- /dev/null
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeLoadDetectorTests.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.job;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.ml.job.config.JobState;
+import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests;
+import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
+import org.junit.Before;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+// TODO: in 8.0.0 remove all instances of MAX_OPEN_JOBS_NODE_ATTR from this file
+public class JobNodeLoadDetectorTests extends ESTestCase {
+
+    // To simplify the logic in this class all jobs have the same memory requirement
+    private static final ByteSizeValue JOB_MEMORY_REQUIREMENT = new ByteSizeValue(10, ByteSizeUnit.MB);
+
+    private NodeLoadDetector nodeLoadDetector;
+
+    @Before
+    public void setup() {
+        MlMemoryTracker memoryTracker = mock(MlMemoryTracker.class);
+        when(memoryTracker.isRecentlyRefreshed()).thenReturn(true);
+        when(memoryTracker.getAnomalyDetectorJobMemoryRequirement(anyString())).thenReturn(JOB_MEMORY_REQUIREMENT.getBytes());
+        when(memoryTracker.getDataFrameAnalyticsJobMemoryRequirement(anyString())).thenReturn(JOB_MEMORY_REQUIREMENT.getBytes());
+        when(memoryTracker.getJobMemoryRequirement(anyString(), anyString())).thenReturn(JOB_MEMORY_REQUIREMENT.getBytes());
+        nodeLoadDetector = new NodeLoadDetector(memoryTracker);
+    }
+
+    public void testNodeLoadDetection() {
+        Map<String, String> nodeAttr = new HashMap<>();
+        nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
+        nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "-1");
+        // MachineLearning.MACHINE_MEMORY_NODE_ATTR negative, so this will fall back to allocating by count
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+            .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
+                nodeAttr, Collections.emptySet(), Version.CURRENT))
+            .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
+                nodeAttr, Collections.emptySet(), Version.CURRENT))
+            .add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302),
+                nodeAttr, Collections.emptySet(), Version.CURRENT))
+            .add(new DiscoveryNode("_node_name4", "_node_id4", new TransportAddress(InetAddress.getLoopbackAddress(), 9303),
+                nodeAttr, Collections.emptySet(), Version.CURRENT))
+            .build();
+
+        PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
+        TransportOpenJobActionTests.addJobTask("job_id1", "_node_id1", null, tasksBuilder);
+        TransportOpenJobActionTests.addJobTask("job_id2", "_node_id1", null, tasksBuilder);
+        TransportOpenJobActionTests.addJobTask("job_id3", "_node_id2", null, tasksBuilder);
+        TransportOpenJobActionTests.addJobTask("job_id4", "_node_id4", JobState.OPENED, tasksBuilder);
+        PersistentTasksCustomMetadata tasks = tasksBuilder.build();
+
+        ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
+        cs.nodes(nodes);
+        Metadata.Builder metadata = Metadata.builder();
+        metadata.putCustom(PersistentTasksCustomMetadata.TYPE, tasks);
+        cs.metadata(metadata);
+
+        NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id1"), 10, -1, true);
+        assertThat(load.getAssignedJobMemory(), equalTo(52428800L));
+        assertThat(load.getNumAllocatingJobs(), equalTo(2L));
+        assertThat(load.getNumAssignedJobs(), equalTo(2L));
+        assertThat(load.getMaxJobs(), equalTo(10));
+        assertThat(load.getMaxMlMemory(), equalTo(0L));
+
+        load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id2"), 5, -1, true);
+        assertThat(load.getAssignedJobMemory(), equalTo(41943040L));
+        assertThat(load.getNumAllocatingJobs(), equalTo(1L));
+        assertThat(load.getNumAssignedJobs(), equalTo(1L));
+        assertThat(load.getMaxJobs(), equalTo(5));
+        assertThat(load.getMaxMlMemory(), equalTo(0L));
+
+        load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id3"), 5, -1, true);
+        assertThat(load.getAssignedJobMemory(), equalTo(0L));
+        assertThat(load.getNumAllocatingJobs(), equalTo(0L));
+        assertThat(load.getNumAssignedJobs(), equalTo(0L));
+        assertThat(load.getMaxJobs(), equalTo(5));
+        assertThat(load.getMaxMlMemory(), equalTo(0L));
+
+        load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id4"), 5, -1, true);
+        assertThat(load.getAssignedJobMemory(), equalTo(41943040L));
+        assertThat(load.getNumAllocatingJobs(), equalTo(0L));
+        assertThat(load.getNumAssignedJobs(), equalTo(1L));
+        assertThat(load.getMaxJobs(), equalTo(5));
+        assertThat(load.getMaxMlMemory(), equalTo(0L));
+    }
+
+    public void testNodeLoadDetection_withBadMaxOpenJobsAttribute() {
+        Map<String, String> nodeAttr = new HashMap<>();
+        nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "foo");
+        nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "-1");
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+            .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
+                nodeAttr, Collections.emptySet(), Version.CURRENT))
+            .build();
+
+        ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
+        cs.nodes(nodes);
+        Metadata.Builder metadata = Metadata.builder();
+        cs.metadata(metadata);
+
+        NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), false, nodes.get("_node_id1"), 10, -1, true);
+        assertThat(load.getError(), containsString("ml.max_open_jobs attribute [foo] is not an integer"));
+    }
+
+    public void testNodeLoadDetection_withBadMachineMemoryAttribute() {
+        Map<String, String> nodeAttr = new HashMap<>();
+        nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
+        nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "bar");
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+            .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
+                nodeAttr, Collections.emptySet(), Version.CURRENT))
+            .build();
+
+        ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
+        cs.nodes(nodes);
+        Metadata.Builder metadata = Metadata.builder();
+        cs.metadata(metadata);
+
+        NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), false, nodes.get("_node_id1"), 10, -1, true);
+        assertThat(load.getError(), containsString("ml.machine_memory attribute [bar] is not a long"));
+    }
+
+}
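
Reviewer note: the sketch below is illustrative only and is not part of the patch. It shows the call pattern a consumer of the new NodeLoadDetector API is expected to follow, mirroring the call site this change introduces in JobNodeSelector. The detectNodeLoad signature and the NodeLoad getters are taken from the diff above; the surrounding variables (memoryTracker, clusterState, node, allNodesHaveDynamicMaxWorkers, dynamicMaxOpenJobs, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed) are assumed to already be in scope at a hypothetical call site.

    // Illustrative usage only -- assumes an MlMemoryTracker and the current ClusterState are available.
    NodeLoadDetector detector = new NodeLoadDetector(memoryTracker);
    NodeLoadDetector.NodeLoad load = detector.detectNodeLoad(
        clusterState,
        allNodesHaveDynamicMaxWorkers,      // pre-8.0.0 compatibility flag, per the TODO in the diff
        node,                               // the DiscoveryNode being considered for assignment
        dynamicMaxOpenJobs,                 // dynamic per-node open-job limit
        maxMachineMemoryPercent,            // percentage of machine memory ML may use
        isMemoryTrackerRecentlyRefreshed);  // whether memory-based allocation can be trusted
    if (load.getError() != null) {
        // a node attribute could not be parsed; the caller skips this node and records the reason
    } else if (load.isUseMemory()) {
        long availableMemory = load.getMaxMlMemory() - load.getAssignedJobMemory();
        // compare availableMemory against the new job's estimated memory footprint
    } else {
        // fall back to counting: compare load.getNumAssignedJobs() against load.getMaxJobs()
    }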