[ML] More accurate job memory overhead (#47516)
When an ML job runs the memory required can be broken down into: 1. Memory required to load the executable code 2. Instrumented model memory 3. Other memory used by the job's main process or ancillary processes that is not instrumented Previously we added a simple fixed overhead to account for 1 and 3. This was 100MB for anomaly detection jobs (large because of the completely uninstrumented categorization function and normalize process), and 20MB for data frame analytics jobs. However, this was an oversimplification because the executable code only needs to be loaded once per machine. Also the 100MB overhead for anomaly detection jobs was probably too high in most cases because categorization and normalization don't use _that_ much memory. This PR therefore changes the calculation of memory requirements as follows: 1. A per-node overhead of 30MB for _only_ the first job of any type to be run on a given node - this is to account for loading the executable code 2. The established model memory (if applicable) or model memory limit of the job 3. A per-job overhead of 10MB for anomaly detection jobs and 5MB for data frame analytics jobs, to account for the uninstrumented memory usage This change will enable more jobs to be run on the same node. It will be particularly beneficial when there are a large number of small jobs. It will have less of an effect when there are a small number of large jobs.
This commit is contained in:
parent
defc97a300
commit
31a5e1c7ee
|
@ -41,7 +41,11 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
|
|||
|
||||
public static final ByteSizeValue DEFAULT_MODEL_MEMORY_LIMIT = new ByteSizeValue(1, ByteSizeUnit.GB);
|
||||
public static final ByteSizeValue MIN_MODEL_MEMORY_LIMIT = new ByteSizeValue(1, ByteSizeUnit.MB);
|
||||
public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(20, ByteSizeUnit.MB);
|
||||
/**
|
||||
* This includes the overhead of thread stacks and data structures that the program might use that
|
||||
* are not instrumented. But it does NOT include the memory used by loading the executable code.
|
||||
*/
|
||||
public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(5, ByteSizeUnit.MB);
|
||||
|
||||
public static final ParseField ID = new ParseField("id");
|
||||
public static final ParseField DESCRIPTION = new ParseField("description");
|
||||
|
|
|
@ -86,7 +86,14 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
|
|||
public static final ObjectParser<Builder, Void> STRICT_PARSER = createParser(false);
|
||||
|
||||
public static final TimeValue MIN_BACKGROUND_PERSIST_INTERVAL = TimeValue.timeValueHours(1);
|
||||
public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(100, ByteSizeUnit.MB);
|
||||
|
||||
/**
|
||||
* This includes the overhead of thread stacks and data structures that the program might use that
|
||||
* are not instrumented. (For the <code>autodetect</code> process categorization is not instrumented,
|
||||
* and the <code>normalize</code> process is not instrumented at all.) But this overhead does NOT
|
||||
* include the memory used by loading the executable code.
|
||||
*/
|
||||
public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(10, ByteSizeUnit.MB);
|
||||
|
||||
public static final long DEFAULT_MODEL_SNAPSHOT_RETENTION_DAYS = 1;
|
||||
|
||||
|
|
|
@ -331,6 +331,12 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
|
|||
public static final String MACHINE_MEMORY_NODE_ATTR = "ml.machine_memory";
|
||||
public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS =
|
||||
Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope);
|
||||
/**
|
||||
* The amount of memory needed to load the ML native code shared libraries. The assumption is that the first
|
||||
* ML job to run on a given node will do this, and then subsequent ML jobs on the same node will reuse the
|
||||
* same already-loaded code.
|
||||
*/
|
||||
public static final ByteSizeValue NATIVE_EXECUTABLE_CODE_OVERHEAD = new ByteSizeValue(30, ByteSizeUnit.MB);
|
||||
// Values higher than 100% are allowed to accommodate use cases where swapping has been determined to be acceptable.
|
||||
// Anomaly detector jobs only use their full model memory during background persistence, and this is deliberately
|
||||
// staggered, so with large numbers of jobs few will generally be persisting state at the same time.
|
||||
|
|
|
@ -110,7 +110,7 @@ public class JobNodeSelector {
|
|||
continue;
|
||||
}
|
||||
|
||||
// Assuming the node is elligible at all, check loading
|
||||
// Assuming the node is eligible at all, check loading
|
||||
CurrentLoad currentLoad = calculateCurrentLoadForNode(node, persistentTasks, allocateByMemory);
|
||||
allocateByMemory = currentLoad.allocateByMemory;
|
||||
|
||||
|
@ -170,6 +170,11 @@ public class JobNodeSelector {
|
|||
long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100;
|
||||
Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(taskName, jobId);
|
||||
if (estimatedMemoryFootprint != null) {
|
||||
// If this will be the first job assigned to the node then it will need to
|
||||
// load the native code shared libraries, so add the overhead for this
|
||||
if (currentLoad.numberOfAssignedJobs == 0) {
|
||||
estimatedMemoryFootprint += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
|
||||
}
|
||||
long availableMemory = maxMlMemory - currentLoad.assignedJobMemory;
|
||||
if (estimatedMemoryFootprint > availableMemory) {
|
||||
reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
|
||||
|
@ -283,6 +288,11 @@ public class JobNodeSelector {
|
|||
}
|
||||
}
|
||||
}
|
||||
// if any jobs are running then the native code will be loaded, but shared between all jobs,
|
||||
// so increase the total memory usage of the assigned jobs to account for this
|
||||
if (result.numberOfAssignedJobs > 0) {
|
||||
result.assignedJobMemory += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
|
|
|
@ -175,9 +175,12 @@ public class JobNodeSelectorTests extends ESTestCase {
|
|||
int currentlyRunningJobsPerNode = randomIntBetween(1, 100);
|
||||
int maxRunningJobsPerNode = currentlyRunningJobsPerNode + 1;
|
||||
// Be careful if changing this - in order for the error message to be exactly as expected
|
||||
// the value here must divide exactly into (JOB_MEMORY_REQUIREMENT.getBytes() * 100)
|
||||
int maxMachineMemoryPercent = 40;
|
||||
long machineMemory = currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes() * 100 / maxMachineMemoryPercent;
|
||||
// the value here must divide exactly into both (JOB_MEMORY_REQUIREMENT.getBytes() * 100) and
|
||||
// MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes()
|
||||
int maxMachineMemoryPercent = 20;
|
||||
long currentlyRunningJobMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() +
|
||||
currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes();
|
||||
long machineMemory = currentlyRunningJobMemory * 100 / maxMachineMemoryPercent;
|
||||
|
||||
Map<String, String> nodeAttr = new HashMap<>();
|
||||
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode));
|
||||
|
@ -193,9 +196,33 @@ public class JobNodeSelectorTests extends ESTestCase {
|
|||
jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
|
||||
assertNull(result.getExecutorNode());
|
||||
assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. "
|
||||
+ "Available memory for ML [" + (machineMemory * maxMachineMemoryPercent / 100) + "], memory required by existing jobs ["
|
||||
+ (JOB_MEMORY_REQUIREMENT.getBytes() * currentlyRunningJobsPerNode) + "], estimated memory required for this job ["
|
||||
+ JOB_MEMORY_REQUIREMENT.getBytes() + "]"));
|
||||
+ "Available memory for ML [" + currentlyRunningJobMemory + "], memory required by existing jobs ["
|
||||
+ currentlyRunningJobMemory + "], estimated memory required for this job [" + JOB_MEMORY_REQUIREMENT.getBytes() + "]"));
|
||||
}
|
||||
|
||||
public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_firstJobTooBigMemoryLimiting() {
|
||||
int numNodes = randomIntBetween(1, 10);
|
||||
int maxRunningJobsPerNode = randomIntBetween(1, 100);
|
||||
int maxMachineMemoryPercent = 20;
|
||||
long firstJobTotalMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + JOB_MEMORY_REQUIREMENT.getBytes();
|
||||
long machineMemory = (firstJobTotalMemory - 1) * 100 / maxMachineMemoryPercent;
|
||||
|
||||
Map<String, String> nodeAttr = new HashMap<>();
|
||||
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode));
|
||||
nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(machineMemory));
|
||||
|
||||
ClusterState.Builder cs = fillNodesWithRunningJobs(nodeAttr, numNodes, 0);
|
||||
|
||||
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date());
|
||||
|
||||
JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker,
|
||||
0, node -> TransportOpenJobAction.nodeFilter(node, job));
|
||||
PersistentTasksCustomMetaData.Assignment result =
|
||||
jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
|
||||
assertNull(result.getExecutorNode());
|
||||
assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. "
|
||||
+ "Available memory for ML [" + (firstJobTotalMemory - 1)
|
||||
+ "], memory required by existing jobs [0], estimated memory required for this job [" + firstJobTotalMemory + "]"));
|
||||
}
|
||||
|
||||
public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityMemoryLimiting() {
|
||||
|
@ -203,9 +230,12 @@ public class JobNodeSelectorTests extends ESTestCase {
|
|||
int currentlyRunningJobsPerNode = randomIntBetween(1, 100);
|
||||
int maxRunningJobsPerNode = currentlyRunningJobsPerNode + 1;
|
||||
// Be careful if changing this - in order for the error message to be exactly as expected
|
||||
// the value here must divide exactly into (JOB_MEMORY_REQUIREMENT.getBytes() * 100)
|
||||
int maxMachineMemoryPercent = 40;
|
||||
long machineMemory = currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes() * 100 / maxMachineMemoryPercent;
|
||||
// the value here must divide exactly into both (JOB_MEMORY_REQUIREMENT.getBytes() * 100) and
|
||||
// MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes()
|
||||
int maxMachineMemoryPercent = 20;
|
||||
long currentlyRunningJobMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() +
|
||||
currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes();
|
||||
long machineMemory = currentlyRunningJobMemory * 100 / maxMachineMemoryPercent;
|
||||
|
||||
Map<String, String> nodeAttr = new HashMap<>();
|
||||
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode));
|
||||
|
@ -222,9 +252,34 @@ public class JobNodeSelectorTests extends ESTestCase {
|
|||
jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
|
||||
assertNull(result.getExecutorNode());
|
||||
assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. "
|
||||
+ "Available memory for ML [" + (machineMemory * maxMachineMemoryPercent / 100) + "], memory required by existing jobs ["
|
||||
+ (JOB_MEMORY_REQUIREMENT.getBytes() * currentlyRunningJobsPerNode) + "], estimated memory required for this job ["
|
||||
+ JOB_MEMORY_REQUIREMENT.getBytes() + "]"));
|
||||
+ "Available memory for ML [" + currentlyRunningJobMemory + "], memory required by existing jobs ["
|
||||
+ currentlyRunningJobMemory + "], estimated memory required for this job [" + JOB_MEMORY_REQUIREMENT.getBytes() + "]"));
|
||||
}
|
||||
|
||||
public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_firstJobTooBigMemoryLimiting() {
|
||||
int numNodes = randomIntBetween(1, 10);
|
||||
int maxRunningJobsPerNode = randomIntBetween(1, 100);
|
||||
int maxMachineMemoryPercent = 20;
|
||||
long firstJobTotalMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + JOB_MEMORY_REQUIREMENT.getBytes();
|
||||
long machineMemory = (firstJobTotalMemory - 1) * 100 / maxMachineMemoryPercent;
|
||||
|
||||
Map<String, String> nodeAttr = new HashMap<>();
|
||||
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode));
|
||||
nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(machineMemory));
|
||||
|
||||
ClusterState.Builder cs = fillNodesWithRunningJobs(nodeAttr, numNodes, 0);
|
||||
|
||||
String dataFrameAnalyticsId = "data_frame_analytics_id1000";
|
||||
|
||||
JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), dataFrameAnalyticsId,
|
||||
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0,
|
||||
node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, dataFrameAnalyticsId));
|
||||
PersistentTasksCustomMetaData.Assignment result =
|
||||
jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
|
||||
assertNull(result.getExecutorNode());
|
||||
assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. "
|
||||
+ "Available memory for ML [" + (firstJobTotalMemory - 1)
|
||||
+ "], memory required by existing jobs [0], estimated memory required for this job [" + firstJobTotalMemory + "]"));
|
||||
}
|
||||
|
||||
public void testSelectLeastLoadedMlNode_noMlNodes() {
|
||||
|
|
Loading…
Reference in New Issue