diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisLimits.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisLimits.java index 797df5892f8..1e114ee0f7a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisLimits.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisLimits.java @@ -36,7 +36,7 @@ public class AnalysisLimits implements ToXContentObject, Writeable { * the old default value should be used. From 6.3 onwards, the value will always be explicit. */ public static final long DEFAULT_MODEL_MEMORY_LIMIT_MB = 1024L; - static final long PRE_6_1_DEFAULT_MODEL_MEMORY_LIMIT_MB = 4096L; + public static final long PRE_6_1_DEFAULT_MODEL_MEMORY_LIMIT_MB = 4096L; public static final long DEFAULT_CATEGORIZATION_EXAMPLES_LIMIT = 4; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java index 54a7400375f..441317bcbe2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java @@ -20,6 +20,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.JobManager; @@ -269,15 +270,16 @@ public class MlMemoryTracker implements LocalNodeMasterListener { private void setJobMemoryToLimit(String jobId, ActionListener listener) { jobManager.getJob(jobId, ActionListener.wrap(job -> { - Long memoryLimitMb = job.getAnalysisLimits().getModelMemoryLimit(); - if (memoryLimitMb != null) { - Long memoryRequirementBytes = ByteSizeUnit.MB.toBytes(memoryLimitMb) + Job.PROCESS_MEMORY_OVERHEAD.getBytes(); - memoryRequirementByJob.put(jobId, memoryRequirementBytes); - listener.onResponse(memoryRequirementBytes); - } else { - memoryRequirementByJob.remove(jobId); - listener.onResponse(null); + Long memoryLimitMb = (job.getAnalysisLimits() != null) ? job.getAnalysisLimits().getModelMemoryLimit() : null; + // Although recent versions of the code enforce a non-null model_memory_limit + // when parsing, the job could have been streamed from an older version node in + // a mixed version cluster + if (memoryLimitMb == null) { + memoryLimitMb = AnalysisLimits.PRE_6_1_DEFAULT_MODEL_MEMORY_LIMIT_MB; } + Long memoryRequirementBytes = ByteSizeUnit.MB.toBytes(memoryLimitMb) + Job.PROCESS_MEMORY_OVERHEAD.getBytes(); + memoryRequirementByJob.put(jobId, memoryRequirementBytes); + listener.onResponse(memoryRequirementBytes); }, e -> { if (e instanceof ResourceNotFoundException) { // TODO: does this also happen if the .ml-config index exists but is unavailable? diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java index 197fa469bed..3e54994ac04 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java @@ -123,9 +123,10 @@ public class MlMemoryTrackerTests extends ESTestCase { return null; }).when(jobResultsProvider).getEstablishedMemoryUsage(eq(jobId), any(), any(), any(Consumer.class), any()); - long modelMemoryLimitMb = 2; + boolean simulateVeryOldJob = randomBoolean(); + long recentJobModelMemoryLimitMb = 2; Job job = mock(Job.class); - when(job.getAnalysisLimits()).thenReturn(new AnalysisLimits(modelMemoryLimitMb, 4L)); + when(job.getAnalysisLimits()).thenReturn(simulateVeryOldJob ? null : new AnalysisLimits(recentJobModelMemoryLimitMb, 4L)); doAnswer(invocation -> { @SuppressWarnings("unchecked") ActionListener listener = (ActionListener) invocation.getArguments()[1]; @@ -141,7 +142,9 @@ public class MlMemoryTrackerTests extends ESTestCase { assertEquals(Long.valueOf(modelBytes + Job.PROCESS_MEMORY_OVERHEAD.getBytes()), memoryTracker.getJobMemoryRequirement(jobId)); } else { - assertEquals(Long.valueOf(ByteSizeUnit.MB.toBytes(modelMemoryLimitMb) + Job.PROCESS_MEMORY_OVERHEAD.getBytes()), + long expectedModelMemoryLimit = + simulateVeryOldJob ? AnalysisLimits.PRE_6_1_DEFAULT_MODEL_MEMORY_LIMIT_MB : recentJobModelMemoryLimitMb; + assertEquals(Long.valueOf(ByteSizeUnit.MB.toBytes(expectedModelMemoryLimit) + Job.PROCESS_MEMORY_OVERHEAD.getBytes()), memoryTracker.getJobMemoryRequirement(jobId)); } } else {