From 31a5e1c7eefaae8fb2bc48ce8564ba15f3aa9c0d Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 4 Oct 2019 09:16:56 +0100 Subject: [PATCH] [ML] More accurate job memory overhead (#47516) When an ML job runs the memory required can be broken down into: 1. Memory required to load the executable code 2. Instrumented model memory 3. Other memory used by the job's main process or ancilliary processes that is not instrumented Previously we added a simple fixed overhead to account for 1 and 3. This was 100MB for anomaly detection jobs (large because of the completely uninstrumented categorization function and normalize process), and 20MB for data frame analytics jobs. However, this was an oversimplification because the executable code only needs to be loaded once per machine. Also the 100MB overhead for anomaly detection jobs was probably too high in most cases because categorization and normalization don't use _that_ much memory. This PR therefore changes the calculation of memory requirements as follows: 1. A per-node overhead of 30MB for _only_ the first job of any type to be run on a given node - this is to account for loading the executable code 2. The established model memory (if applicable) or model memory limit of the job 3. A per-job overhead of 10MB for anomaly detection jobs and 5MB for data frame analytics jobs, to account for the uninstrumented memory usage This change will enable more jobs to be run on the same node. It will be particularly beneficial when there are a large number of small jobs. It will have less of an effect when there are a small number of large jobs. --- .../dataframe/DataFrameAnalyticsConfig.java | 6 +- .../xpack/core/ml/job/config/Job.java | 9 ++- .../xpack/ml/MachineLearning.java | 6 ++ .../xpack/ml/job/JobNodeSelector.java | 12 ++- .../xpack/ml/job/JobNodeSelectorTests.java | 79 ++++++++++++++++--- 5 files changed, 97 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java index 2d65013e31e..b327f4ec888 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java @@ -41,7 +41,11 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { public static final ByteSizeValue DEFAULT_MODEL_MEMORY_LIMIT = new ByteSizeValue(1, ByteSizeUnit.GB); public static final ByteSizeValue MIN_MODEL_MEMORY_LIMIT = new ByteSizeValue(1, ByteSizeUnit.MB); - public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(20, ByteSizeUnit.MB); + /** + * This includes the overhead of thread stacks and data structures that the program might use that + * are not instrumented. But it does NOT include the memory used by loading the executable code. + */ + public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(5, ByteSizeUnit.MB); public static final ParseField ID = new ParseField("id"); public static final ParseField DESCRIPTION = new ParseField("description"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index a5c8c2ae421..bb4c94755ac 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -86,7 +86,14 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO public static final ObjectParser STRICT_PARSER = createParser(false); public static final TimeValue MIN_BACKGROUND_PERSIST_INTERVAL = TimeValue.timeValueHours(1); - public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(100, ByteSizeUnit.MB); + + /** + * This includes the overhead of thread stacks and data structures that the program might use that + * are not instrumented. (For the autodetect process categorization is not instrumented, + * and the normalize process is not instrumented at all.) But this overhead does NOT + * include the memory used by loading the executable code. + */ + public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(10, ByteSizeUnit.MB); public static final long DEFAULT_MODEL_SNAPSHOT_RETENTION_DAYS = 1; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 635fa1361f9..d11e2ebb65d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -331,6 +331,12 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu public static final String MACHINE_MEMORY_NODE_ATTR = "ml.machine_memory"; public static final Setting CONCURRENT_JOB_ALLOCATIONS = Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope); + /** + * The amount of memory needed to load the ML native code shared libraries. The assumption is that the first + * ML job to run on a given node will do this, and then subsequent ML jobs on the same node will reuse the + * same already-loaded code. + */ + public static final ByteSizeValue NATIVE_EXECUTABLE_CODE_OVERHEAD = new ByteSizeValue(30, ByteSizeUnit.MB); // Values higher than 100% are allowed to accommodate use cases where swapping has been determined to be acceptable. // Anomaly detector jobs only use their full model memory during background persistence, and this is deliberately // staggered, so with large numbers of jobs few will generally be persisting state at the same time. diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java index aa97c13b21d..79ddb58c294 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java @@ -110,7 +110,7 @@ public class JobNodeSelector { continue; } - // Assuming the node is elligible at all, check loading + // Assuming the node is eligible at all, check loading CurrentLoad currentLoad = calculateCurrentLoadForNode(node, persistentTasks, allocateByMemory); allocateByMemory = currentLoad.allocateByMemory; @@ -170,6 +170,11 @@ public class JobNodeSelector { long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100; Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(taskName, jobId); if (estimatedMemoryFootprint != null) { + // If this will be the first job assigned to the node then it will need to + // load the native code shared libraries, so add the overhead for this + if (currentLoad.numberOfAssignedJobs == 0) { + estimatedMemoryFootprint += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(); + } long availableMemory = maxMlMemory - currentLoad.assignedJobMemory; if (estimatedMemoryFootprint > availableMemory) { reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) @@ -283,6 +288,11 @@ public class JobNodeSelector { } } } + // if any jobs are running then the native code will be loaded, but shared between all jobs, + // so increase the total memory usage of the assigned jobs to account for this + if (result.numberOfAssignedJobs > 0) { + result.assignedJobMemory += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(); + } } return result; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java index ae1a738a4b3..ab65f8d8c79 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java @@ -175,9 +175,12 @@ public class JobNodeSelectorTests extends ESTestCase { int currentlyRunningJobsPerNode = randomIntBetween(1, 100); int maxRunningJobsPerNode = currentlyRunningJobsPerNode + 1; // Be careful if changing this - in order for the error message to be exactly as expected - // the value here must divide exactly into (JOB_MEMORY_REQUIREMENT.getBytes() * 100) - int maxMachineMemoryPercent = 40; - long machineMemory = currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes() * 100 / maxMachineMemoryPercent; + // the value here must divide exactly into both (JOB_MEMORY_REQUIREMENT.getBytes() * 100) and + // MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + int maxMachineMemoryPercent = 20; + long currentlyRunningJobMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + + currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes(); + long machineMemory = currentlyRunningJobMemory * 100 / maxMachineMemoryPercent; Map nodeAttr = new HashMap<>(); nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode)); @@ -193,9 +196,33 @@ public class JobNodeSelectorTests extends ESTestCase { jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. " - + "Available memory for ML [" + (machineMemory * maxMachineMemoryPercent / 100) + "], memory required by existing jobs [" - + (JOB_MEMORY_REQUIREMENT.getBytes() * currentlyRunningJobsPerNode) + "], estimated memory required for this job [" - + JOB_MEMORY_REQUIREMENT.getBytes() + "]")); + + "Available memory for ML [" + currentlyRunningJobMemory + "], memory required by existing jobs [" + + currentlyRunningJobMemory + "], estimated memory required for this job [" + JOB_MEMORY_REQUIREMENT.getBytes() + "]")); + } + + public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_firstJobTooBigMemoryLimiting() { + int numNodes = randomIntBetween(1, 10); + int maxRunningJobsPerNode = randomIntBetween(1, 100); + int maxMachineMemoryPercent = 20; + long firstJobTotalMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + JOB_MEMORY_REQUIREMENT.getBytes(); + long machineMemory = (firstJobTotalMemory - 1) * 100 / maxMachineMemoryPercent; + + Map nodeAttr = new HashMap<>(); + nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode)); + nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(machineMemory)); + + ClusterState.Builder cs = fillNodesWithRunningJobs(nodeAttr, numNodes, 0); + + Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date()); + + JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, + 0, node -> TransportOpenJobAction.nodeFilter(node, job)); + PersistentTasksCustomMetaData.Assignment result = + jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed); + assertNull(result.getExecutorNode()); + assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. " + + "Available memory for ML [" + (firstJobTotalMemory - 1) + + "], memory required by existing jobs [0], estimated memory required for this job [" + firstJobTotalMemory + "]")); } public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityMemoryLimiting() { @@ -203,9 +230,12 @@ public class JobNodeSelectorTests extends ESTestCase { int currentlyRunningJobsPerNode = randomIntBetween(1, 100); int maxRunningJobsPerNode = currentlyRunningJobsPerNode + 1; // Be careful if changing this - in order for the error message to be exactly as expected - // the value here must divide exactly into (JOB_MEMORY_REQUIREMENT.getBytes() * 100) - int maxMachineMemoryPercent = 40; - long machineMemory = currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes() * 100 / maxMachineMemoryPercent; + // the value here must divide exactly into both (JOB_MEMORY_REQUIREMENT.getBytes() * 100) and + // MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + int maxMachineMemoryPercent = 20; + long currentlyRunningJobMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + + currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes(); + long machineMemory = currentlyRunningJobMemory * 100 / maxMachineMemoryPercent; Map nodeAttr = new HashMap<>(); nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode)); @@ -222,9 +252,34 @@ public class JobNodeSelectorTests extends ESTestCase { jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. " - + "Available memory for ML [" + (machineMemory * maxMachineMemoryPercent / 100) + "], memory required by existing jobs [" - + (JOB_MEMORY_REQUIREMENT.getBytes() * currentlyRunningJobsPerNode) + "], estimated memory required for this job [" - + JOB_MEMORY_REQUIREMENT.getBytes() + "]")); + + "Available memory for ML [" + currentlyRunningJobMemory + "], memory required by existing jobs [" + + currentlyRunningJobMemory + "], estimated memory required for this job [" + JOB_MEMORY_REQUIREMENT.getBytes() + "]")); + } + + public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_firstJobTooBigMemoryLimiting() { + int numNodes = randomIntBetween(1, 10); + int maxRunningJobsPerNode = randomIntBetween(1, 100); + int maxMachineMemoryPercent = 20; + long firstJobTotalMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + JOB_MEMORY_REQUIREMENT.getBytes(); + long machineMemory = (firstJobTotalMemory - 1) * 100 / maxMachineMemoryPercent; + + Map nodeAttr = new HashMap<>(); + nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode)); + nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(machineMemory)); + + ClusterState.Builder cs = fillNodesWithRunningJobs(nodeAttr, numNodes, 0); + + String dataFrameAnalyticsId = "data_frame_analytics_id1000"; + + JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), dataFrameAnalyticsId, + MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0, + node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, dataFrameAnalyticsId)); + PersistentTasksCustomMetaData.Assignment result = + jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed); + assertNull(result.getExecutorNode()); + assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. " + + "Available memory for ML [" + (firstJobTotalMemory - 1) + + "], memory required by existing jobs [0], estimated memory required for this job [" + firstJobTotalMemory + "]")); } public void testSelectLeastLoadedMlNode_noMlNodes() {