[ML] refactor ml job node selection into its own class (#61521) (#61747)

This is a minor refactor that extracts the job node load logic (node availability, assigned job memory, etc.) into its own class.

This will allow future features (e.g. autoscaling decisions) to reuse the same node load detection class.
backport of #61521
Benjamin Trent 2020-08-31 14:00:23 -04:00 committed by GitHub
parent 8b33d8813a
commit 7dabaad7d9
3 changed files with 408 additions and 123 deletions
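
For orientation, a minimal sketch of how a caller (for example, future autoscaling logic) might use the extracted class; memoryTracker, clusterState, node, maxOpenJobs, and maxMachineMemoryPercent are assumed to come from the caller's context:

NodeLoadDetector detector = new NodeLoadDetector(memoryTracker);
NodeLoadDetector.NodeLoad load = detector.detectNodeLoad(
    clusterState,
    true,                                // allNodesHaveDynamicMaxWorkers; the attribute fallback is removed in 8.0.0
    node,
    maxOpenJobs,                         // dynamic maximum number of open jobs per node
    maxMachineMemoryPercent,
    memoryTracker.isRecentlyRefreshed()  // only trust memory figures that are fresh
);
if (load.getError() != null) {
    // malformed node attributes; skip this node
} else if (load.isUseMemory()) {
    long freeMlMemory = load.getMaxMlMemory() - load.getAssignedJobMemory();
    // ... decide based on remaining ML memory ...
} else {
    long freeSlots = load.getMaxJobs() - load.getNumAssignedJobs();
    // ... fall back to allocating by job count ...
}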

File: JobNodeSelector.java

@@ -8,19 +8,14 @@ package org.elasticsearch.xpack.ml.job;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -58,6 +53,7 @@ public class JobNodeSelector {
private final ClusterState clusterState;
private final MlMemoryTracker memoryTracker;
private final Function<DiscoveryNode, String> nodeFilter;
private final NodeLoadDetector nodeLoadDetector;
private final int maxLazyNodes;
/**
@@ -71,6 +67,7 @@ public class JobNodeSelector {
this.taskName = Objects.requireNonNull(taskName);
this.clusterState = Objects.requireNonNull(clusterState);
this.memoryTracker = Objects.requireNonNull(memoryTracker);
this.nodeLoadDetector = new NodeLoadDetector(Objects.requireNonNull(memoryTracker));
this.maxLazyNodes = maxLazyNodes;
this.nodeFilter = node -> {
if (MachineLearning.isMlNode(node)) {
@@ -109,38 +106,42 @@ public class JobNodeSelector {
continue;
}
// Assuming the node is eligible at all, check loading
CurrentLoad currentLoad = calculateCurrentLoadForNode(node, persistentTasks, allocateByMemory);
allocateByMemory = currentLoad.allocateByMemory;
NodeLoadDetector.NodeLoad currentLoad = nodeLoadDetector.detectNodeLoad(
clusterState,
allNodesHaveDynamicMaxWorkers, // Remove in 8.0.0
node,
dynamicMaxOpenJobs,
maxMachineMemoryPercent,
allocateByMemory
);
if (currentLoad.getError() != null) {
reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
+ "], because [" + currentLoad.getError() + "]";
logger.trace(reason);
reasons.add(reason);
continue;
}
if (currentLoad.numberOfAllocatingJobs >= maxConcurrentJobAllocations) {
reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because node exceeds ["
+ currentLoad.numberOfAllocatingJobs + "] the maximum number of jobs [" + maxConcurrentJobAllocations
// Assuming the node is eligible at all, check loading
allocateByMemory = currentLoad.isUseMemory();
int maxNumberOfOpenJobs = currentLoad.getMaxJobs();
if (currentLoad.getNumAllocatingJobs() >= maxConcurrentJobAllocations) {
reason = "Not opening job ["
+ jobId
+ "] on node [" + nodeNameAndMlAttributes(node) + "], because node exceeds ["
+ currentLoad.getNumAllocatingJobs()
+ "] the maximum number of jobs [" + maxConcurrentJobAllocations
+ "] in opening state";
logger.trace(reason);
reasons.add(reason);
continue;
}
Map<String, String> nodeAttributes = node.getAttributes();
int maxNumberOfOpenJobs = dynamicMaxOpenJobs;
// TODO: remove this in 8.0.0
if (allNodesHaveDynamicMaxWorkers == false) {
String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR);
try {
maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr);
} catch (NumberFormatException e) {
reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " +
MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer";
logger.trace(reason);
reasons.add(reason);
continue;
}
}
long availableCount = maxNumberOfOpenJobs - currentLoad.numberOfAssignedJobs;
long availableCount = maxNumberOfOpenJobs - currentLoad.getNumAssignedJobs();
if (availableCount == 0) {
reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
+ "], because this node is full. Number of opened jobs [" + currentLoad.numberOfAssignedJobs
+ "], because this node is full. Number of opened jobs [" + currentLoad.getNumAssignedJobs()
+ "], " + MAX_OPEN_JOBS_PER_NODE.getKey() + " [" + maxNumberOfOpenJobs + "]";
logger.trace(reason);
reasons.add(reason);
@@ -152,33 +153,21 @@ public class JobNodeSelector {
minLoadedNodeByCount = node;
}
String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR);
long machineMemory;
try {
machineMemory = Long.parseLong(machineMemoryStr);
} catch (NumberFormatException e) {
reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " +
MachineLearning.MACHINE_MEMORY_NODE_ATTR + " attribute [" + machineMemoryStr + "] is not a long";
logger.trace(reason);
reasons.add(reason);
continue;
}
if (allocateByMemory) {
if (machineMemory > 0) {
long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100;
if (currentLoad.getMaxMlMemory() > 0) {
Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(taskName, jobId);
if (estimatedMemoryFootprint != null) {
// If this will be the first job assigned to the node then it will need to
// load the native code shared libraries, so add the overhead for this
if (currentLoad.numberOfAssignedJobs == 0) {
if (currentLoad.getNumAssignedJobs() == 0) {
estimatedMemoryFootprint += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
}
long availableMemory = maxMlMemory - currentLoad.assignedJobMemory;
long availableMemory = currentLoad.getMaxMlMemory() - currentLoad.getAssignedJobMemory();
if (estimatedMemoryFootprint > availableMemory) {
reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
+ "], because this node has insufficient available memory. Available memory for ML [" + maxMlMemory
+ "], memory required by existing jobs [" + currentLoad.assignedJobMemory
+ "], because this node has insufficient available memory. Available memory for ML ["
+ currentLoad.getMaxMlMemory()
+ "], memory required by existing jobs [" + currentLoad.getAssignedJobMemory()
+ "], estimated memory required for this job [" + estimatedMemoryFootprint + "]";
logger.trace(reason);
reasons.add(reason);
@@ -193,15 +182,20 @@ public class JobNodeSelector {
// If we cannot get the job memory requirement,
// fall back to simply allocating by job count
allocateByMemory = false;
logger.debug("Falling back to allocating job [{}] by job counts because its memory requirement was not available",
jobId);
logger.debug(
() -> new ParameterizedMessage(
"Falling back to allocating job [{}] by job counts because its memory requirement was not available",
jobId));
}
} else {
// If we cannot get the available memory on any machine in
// the cluster, fall back to simply allocating by job count
allocateByMemory = false;
logger.debug("Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]",
jobId, nodeNameAndMlAttributes(node));
logger.debug(
() -> new ParameterizedMessage(
"Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]",
jobId,
nodeNameAndMlAttributes(node)));
}
}
}
@@ -236,67 +230,6 @@ public class JobNodeSelector {
return currentAssignment;
}
private CurrentLoad calculateCurrentLoadForNode(DiscoveryNode node, PersistentTasksCustomMetadata persistentTasks,
final boolean allocateByMemory) {
CurrentLoad result = new CurrentLoad(allocateByMemory);
if (persistentTasks != null) {
// find all the anomaly detector job tasks assigned to this node
Collection<PersistentTasksCustomMetadata.PersistentTask<?>> assignedAnomalyDetectorTasks = persistentTasks.findTasks(
MlTasks.JOB_TASK_NAME, task -> node.getId().equals(task.getExecutorNode()));
for (PersistentTasksCustomMetadata.PersistentTask<?> assignedTask : assignedAnomalyDetectorTasks) {
JobState jobState = MlTasks.getJobStateModifiedForReassignments(assignedTask);
if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
// Don't count CLOSED or FAILED jobs, as they don't consume native memory
++result.numberOfAssignedJobs;
if (jobState == JobState.OPENING) {
++result.numberOfAllocatingJobs;
}
OpenJobAction.JobParams params = (OpenJobAction.JobParams) assignedTask.getParams();
Long jobMemoryRequirement = memoryTracker.getAnomalyDetectorJobMemoryRequirement(params.getJobId());
if (jobMemoryRequirement == null) {
result.allocateByMemory = false;
logger.debug("Falling back to allocating job [{}] by job counts because " +
"the memory requirement for job [{}] was not available", jobId, params.getJobId());
} else {
logger.debug("adding " + jobMemoryRequirement);
result.assignedJobMemory += jobMemoryRequirement;
}
}
}
// find all the data frame analytics job tasks assigned to this node
Collection<PersistentTasksCustomMetadata.PersistentTask<?>> assignedAnalyticsTasks = persistentTasks.findTasks(
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, task -> node.getId().equals(task.getExecutorNode()));
for (PersistentTasksCustomMetadata.PersistentTask<?> assignedTask : assignedAnalyticsTasks) {
DataFrameAnalyticsState dataFrameAnalyticsState = MlTasks.getDataFrameAnalyticsState(assignedTask);
// Don't count stopped and failed df-analytics tasks as they don't consume native memory
if (dataFrameAnalyticsState.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false) {
// The native process is only running in the ANALYZING and STOPPING states, but in the STARTED
// and REINDEXING states we're committed to using the memory soon, so account for it here
++result.numberOfAssignedJobs;
StartDataFrameAnalyticsAction.TaskParams params =
(StartDataFrameAnalyticsAction.TaskParams) assignedTask.getParams();
Long jobMemoryRequirement = memoryTracker.getDataFrameAnalyticsJobMemoryRequirement(params.getId());
if (jobMemoryRequirement == null) {
result.allocateByMemory = false;
logger.debug("Falling back to allocating job [{}] by job counts because " +
"the memory requirement for job [{}] was not available", jobId, params.getId());
} else {
result.assignedJobMemory += jobMemoryRequirement;
}
}
}
// if any jobs are running then the native code will be loaded, but shared between all jobs,
// so increase the total memory usage of the assigned jobs to account for this
if (result.numberOfAssignedJobs > 0) {
result.assignedJobMemory += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
}
}
return result;
}
static String nodeNameOrId(DiscoveryNode node) {
String nodeNameOrID = node.getName();
if (Strings.isNullOrEmpty(nodeNameOrID)) {
@@ -324,15 +257,4 @@ public class JobNodeSelector {
return builder.toString();
}
private static class CurrentLoad {
long numberOfAssignedJobs = 0;
long numberOfAllocatingJobs = 0;
long assignedJobMemory = 0;
boolean allocateByMemory;
CurrentLoad(boolean allocateByMemory) {
this.allocateByMemory = allocateByMemory;
}
}
}

File: NodeLoadDetector.java (new)

@@ -0,0 +1,215 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
public class NodeLoadDetector {
private static final Logger logger = LogManager.getLogger(NodeLoadDetector.class);
private final MlMemoryTracker mlMemoryTracker;
public NodeLoadDetector(MlMemoryTracker memoryTracker) {
this.mlMemoryTracker = memoryTracker;
}
public MlMemoryTracker getMlMemoryTracker() {
return mlMemoryTracker;
}
public NodeLoad detectNodeLoad(ClusterState clusterState,
boolean allNodesHaveDynamicMaxWorkers,
DiscoveryNode node,
int dynamicMaxOpenJobs,
int maxMachineMemoryPercent,
boolean isMemoryTrackerRecentlyRefreshed) {
PersistentTasksCustomMetadata persistentTasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
Map<String, String> nodeAttributes = node.getAttributes();
List<String> errors = new ArrayList<>();
int maxNumberOfOpenJobs = dynamicMaxOpenJobs;
// TODO: remove this in 8.0.0
if (allNodesHaveDynamicMaxWorkers == false) {
String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR);
try {
maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr);
} catch (NumberFormatException e) {
errors.add(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer");
maxNumberOfOpenJobs = -1;
}
}
String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR);
long machineMemory = -1;
try {
machineMemory = Long.parseLong(machineMemoryStr);
} catch (NumberFormatException e) {
errors.add(MachineLearning.MACHINE_MEMORY_NODE_ATTR + " attribute [" + machineMemoryStr + "] is not a long");
}
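// A machineMemory of -1 (unknown) makes this budget non-positive; callers such as JobNodeSelector only trust getMaxMlMemory() when it is > 0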
long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100;
NodeLoad nodeLoad = new NodeLoad(node.getId(), maxMlMemory, maxNumberOfOpenJobs, isMemoryTrackerRecentlyRefreshed);
if (errors.isEmpty() == false) {
nodeLoad.error = Strings.collectionToCommaDelimitedString(errors);
return nodeLoad;
}
updateLoadGivenTasks(nodeLoad, persistentTasks);
return nodeLoad;
}
private void updateLoadGivenTasks(NodeLoad nodeLoad, PersistentTasksCustomMetadata persistentTasks) {
if (persistentTasks != null) {
// find all the anomaly detector job tasks assigned to this node
Collection<PersistentTasksCustomMetadata.PersistentTask<?>> assignedAnomalyDetectorTasks = persistentTasks.findTasks(
MlTasks.JOB_TASK_NAME, task -> nodeLoad.getNodeId().equals(task.getExecutorNode()));
for (PersistentTasksCustomMetadata.PersistentTask<?> assignedTask : assignedAnomalyDetectorTasks) {
JobState jobState = MlTasks.getJobStateModifiedForReassignments(assignedTask);
if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
// Don't count CLOSED or FAILED jobs, as they don't consume native memory
++nodeLoad.numAssignedJobs;
if (jobState == JobState.OPENING) {
++nodeLoad.numAllocatingJobs;
}
OpenJobAction.JobParams params = (OpenJobAction.JobParams) assignedTask.getParams();
Long jobMemoryRequirement = mlMemoryTracker.getAnomalyDetectorJobMemoryRequirement(params.getJobId());
if (jobMemoryRequirement == null) {
nodeLoad.useMemory = false;
logger.debug(() -> new ParameterizedMessage(
"[{}] memory requirement was not available. Calculating load by number of assigned jobs.",
params.getJobId()
));
} else {
nodeLoad.assignedJobMemory += jobMemoryRequirement;
}
}
}
// find all the data frame analytics job tasks assigned to this node
Collection<PersistentTasksCustomMetadata.PersistentTask<?>> assignedAnalyticsTasks = persistentTasks.findTasks(
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, task -> nodeLoad.getNodeId().equals(task.getExecutorNode()));
for (PersistentTasksCustomMetadata.PersistentTask<?> assignedTask : assignedAnalyticsTasks) {
DataFrameAnalyticsState dataFrameAnalyticsState = MlTasks.getDataFrameAnalyticsState(assignedTask);
// Don't count stopped and failed df-analytics tasks as they don't consume native memory
if (dataFrameAnalyticsState.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false) {
// The native process is only running in the ANALYZING and STOPPING states, but in the STARTED
// and REINDEXING states we're committed to using the memory soon, so account for it here
++nodeLoad.numAssignedJobs;
StartDataFrameAnalyticsAction.TaskParams params =
(StartDataFrameAnalyticsAction.TaskParams) assignedTask.getParams();
Long jobMemoryRequirement = mlMemoryTracker.getDataFrameAnalyticsJobMemoryRequirement(params.getId());
if (jobMemoryRequirement == null) {
nodeLoad.useMemory = false;
logger.debug(() -> new ParameterizedMessage(
"[{}] memory requirement was not available. Calculating load by number of assigned jobs.",
params.getId()
));
} else {
nodeLoad.assignedJobMemory += jobMemoryRequirement;
}
}
}
// if any jobs are running then the native code will be loaded, but shared between all jobs,
// so increase the total memory usage of the assigned jobs to account for this
if (nodeLoad.numAssignedJobs > 0) {
nodeLoad.assignedJobMemory += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
}
}
}
public static class NodeLoad {
private final long maxMemory;
private final int maxJobs;
private final String nodeId;
private boolean useMemory;
private String error;
private long numAssignedJobs;
private long assignedJobMemory;
private long numAllocatingJobs;
private NodeLoad(String nodeId, long maxMemory, int maxJobs, boolean useMemory) {
this.maxJobs = maxJobs;
this.maxMemory = maxMemory;
this.nodeId = nodeId;
this.useMemory = useMemory;
}
/**
* @return The total number of assigned jobs
*/
public long getNumAssignedJobs() {
return numAssignedJobs;
}
/**
* @return The total memory in bytes used by the assigned jobs.
*/
public long getAssignedJobMemory() {
return assignedJobMemory;
}
/**
* @return The maximum memory on this node for jobs
*/
public long getMaxMlMemory() {
return maxMemory;
}
/**
* @return The maximum number of jobs allowed on the node
*/
public int getMaxJobs() {
return maxJobs;
}
/**
* @return `true` if the assignedJobMemory number is accurate
*/
public boolean isUseMemory() {
return useMemory;
}
/**
* @return The node ID
*/
public String getNodeId() {
return nodeId;
}
/**
* @return A comma-delimited string of errors, if any were encountered
*/
@Nullable
public String getError() {
return error;
}
/**
* @return The current number of jobs allocating to the node
*/
public long getNumAllocatingJobs() {
return numAllocatingJobs;
}
}
}

File: JobNodeLoadDetectorTests.java (new)

@@ -0,0 +1,148 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
import org.junit.Before;
import java.net.InetAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
// TODO: in 8.0.0 remove all instances of MAX_OPEN_JOBS_NODE_ATTR from this file
public class JobNodeLoadDetectorTests extends ESTestCase {
// To simplify the logic in this class all jobs have the same memory requirement
private static final ByteSizeValue JOB_MEMORY_REQUIREMENT = new ByteSizeValue(10, ByteSizeUnit.MB);
private NodeLoadDetector nodeLoadDetector;
@Before
public void setup() {
MlMemoryTracker memoryTracker = mock(MlMemoryTracker.class);
when(memoryTracker.isRecentlyRefreshed()).thenReturn(true);
when(memoryTracker.getAnomalyDetectorJobMemoryRequirement(anyString())).thenReturn(JOB_MEMORY_REQUIREMENT.getBytes());
when(memoryTracker.getDataFrameAnalyticsJobMemoryRequirement(anyString())).thenReturn(JOB_MEMORY_REQUIREMENT.getBytes());
when(memoryTracker.getJobMemoryRequirement(anyString(), anyString())).thenReturn(JOB_MEMORY_REQUIREMENT.getBytes());
nodeLoadDetector = new NodeLoadDetector(memoryTracker);
}
public void testNodeLoadDetection() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "-1");
// MachineLearning.MACHINE_MEMORY_NODE_ATTR negative, so this will fall back to allocating by count
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.add(new DiscoveryNode("_node_name4", "_node_id4", new TransportAddress(InetAddress.getLoopbackAddress(), 9303),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.build();
PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
TransportOpenJobActionTests.addJobTask("job_id1", "_node_id1", null, tasksBuilder);
TransportOpenJobActionTests.addJobTask("job_id2", "_node_id1", null, tasksBuilder);
TransportOpenJobActionTests.addJobTask("job_id3", "_node_id2", null, tasksBuilder);
TransportOpenJobActionTests.addJobTask("job_id4", "_node_id4", JobState.OPENED, tasksBuilder);
PersistentTasksCustomMetadata tasks = tasksBuilder.build();
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
Metadata.Builder metadata = Metadata.builder();
metadata.putCustom(PersistentTasksCustomMetadata.TYPE, tasks);
cs.metadata(metadata);
NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id1"), 10, -1, true);
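// 52428800 bytes = 50 MB: two 10 MB jobs plus the once-per-node native code overhead (30 MB) added by updateLoadGivenTasks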
assertThat(load.getAssignedJobMemory(), equalTo(52428800L));
assertThat(load.getNumAllocatingJobs(), equalTo(2L));
assertThat(load.getNumAssignedJobs(), equalTo(2L));
assertThat(load.getMaxJobs(), equalTo(10));
assertThat(load.getMaxMlMemory(), equalTo(0L));
load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id2"), 5, -1, true);
assertThat(load.getAssignedJobMemory(), equalTo(41943040L));
assertThat(load.getNumAllocatingJobs(), equalTo(1L));
assertThat(load.getNumAssignedJobs(), equalTo(1L));
assertThat(load.getMaxJobs(), equalTo(5));
assertThat(load.getMaxMlMemory(), equalTo(0L));
load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id3"), 5, -1, true);
assertThat(load.getAssignedJobMemory(), equalTo(0L));
assertThat(load.getNumAllocatingJobs(), equalTo(0L));
assertThat(load.getNumAssignedJobs(), equalTo(0L));
assertThat(load.getMaxJobs(), equalTo(5));
assertThat(load.getMaxMlMemory(), equalTo(0L));
load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id4"), 5, -1, true);
assertThat(load.getAssignedJobMemory(), equalTo(41943040L));
assertThat(load.getNumAllocatingJobs(), equalTo(0L));
assertThat(load.getNumAssignedJobs(), equalTo(1L));
assertThat(load.getMaxJobs(), equalTo(5));
assertThat(load.getMaxMlMemory(), equalTo(0L));
}
public void testNodeLoadDetection_withBadMaxOpenJobsAttribute() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "foo");
nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "-1");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.build();
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
Metadata.Builder metadata = Metadata.builder();
cs.metadata(metadata);
NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), false, nodes.get("_node_id1"), 10, -1, true);
assertThat(load.getError(), containsString("ml.max_open_jobs attribute [foo] is not an integer"));
}
public void testNodeLoadDetection_withBadMachineMemoryAttribute() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "bar");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.build();
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
Metadata.Builder metadata = Metadata.builder();
cs.metadata(metadata);
NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), false, nodes.get("_node_id1"), 10, -1, true);
assertThat(load.getError(), containsString("ml.machine_memory attribute [bar] is not a long"));
}
}
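
A case the new tests leave uncovered, sketched here as a hypothetical addition to JobNodeLoadDetectorTests (assumes an extra import of org.hamcrest.Matchers.nullValue): when allNodesHaveDynamicMaxWorkers is true, the ml.max_open_jobs attribute is never parsed, so a malformed value should produce no error and the dynamic setting should win.

public void testNodeLoadDetection_ignoresMaxOpenJobsAttributeWhenAllNodesAreDynamic() {
    Map<String, String> nodeAttr = new HashMap<>();
    nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "foo"); // malformed on purpose
    nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "-1");
    DiscoveryNodes nodes = DiscoveryNodes.builder()
        .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
            nodeAttr, Collections.emptySet(), Version.CURRENT))
        .build();
    ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
    cs.nodes(nodes);
    Metadata.Builder metadata = Metadata.builder();
    cs.metadata(metadata);
    // allNodesHaveDynamicMaxWorkers == true skips the attribute parsing (and its error path) entirely
    NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id1"), 10, -1, true);
    assertThat(load.getError(), nullValue());
    assertThat(load.getMaxJobs(), equalTo(10));
}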