From ff7df40b20f25ffb354cae540632062f4a6957e0 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 4 Jan 2019 12:21:28 +0000 Subject: [PATCH] [ML] Uplift model memory limit on job migration (#37126) When a 6.1-6.3 job is opened in a later version we increase the model memory limit by 30% if it's below 0.5GB. The migration of jobs from cluster state to the config index changes the job version, so we need to also do this uplift as part of that config migration. Relates #36961 --- .../xpack/ml/MlConfigMigrator.java | 15 ++++++- .../ml/action/TransportOpenJobAction.java | 41 +------------------ 2 files changed, 16 insertions(+), 40 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index f2fe80f3776..184ee44cf37 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -37,6 +37,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; @@ -403,11 +404,23 @@ public class MlConfigMigrator { Map custom = job.getCustomSettings() == null ? new HashMap<>() : new HashMap<>(job.getCustomSettings()); custom.put(MIGRATED_FROM_VERSION, job.getJobVersion()); builder.setCustomSettings(custom); + // Increase the model memory limit for 6.1 - 6.3 jobs + Version jobVersion = job.getJobVersion(); + if (jobVersion != null && jobVersion.onOrAfter(Version.V_6_1_0) && jobVersion.before(Version.V_6_3_0)) { + // Increase model memory limit if < 512MB + if (job.getAnalysisLimits() != null && job.getAnalysisLimits().getModelMemoryLimit() != null && + job.getAnalysisLimits().getModelMemoryLimit() < 512L) { + long updatedModelMemoryLimit = (long) (job.getAnalysisLimits().getModelMemoryLimit() * 1.3); + AnalysisLimits limits = new AnalysisLimits(updatedModelMemoryLimit, + job.getAnalysisLimits().getCategorizationExamplesLimit()); + builder.setAnalysisLimits(limits); + } + } // Pre v5.5 (ml beta) jobs do not have a version. // These jobs cannot be opened, we rely on the missing version // to indicate this. // See TransportOpenJobAction.validate() - if (job.getJobVersion() != null) { + if (jobVersion != null) { builder.setJobVersion(Version.CURRENT); } return builder.build(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 21f97cbb5dc..fad24247834 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -55,9 +55,6 @@ import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; -import org.elasticsearch.xpack.core.ml.action.PutJobAction; -import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; -import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; @@ -540,50 +537,16 @@ public class TransportOpenJobAction extends TransportMasterNodeAction jobUpdateListener = ActionListener.wrap( + ActionListener jobUpdateListener = ActionListener.wrap( response -> memoryTracker.refreshJobMemoryAndAllOthers(jobParams.getJobId(), memoryRequirementRefreshListener), listener::onFailure ); - // Increase the model memory limit for 6.1 - 6.3 jobs - ActionListener missingMappingsListener = ActionListener.wrap( - response -> { - Job job = jobParams.getJob(); - if (job != null) { - Version jobVersion = job.getJobVersion(); - if (jobVersion != null && - (jobVersion.onOrAfter(Version.V_6_1_0) && jobVersion.before(Version.V_6_3_0))) { - // Increase model memory limit if < 512MB - if (job.getAnalysisLimits() != null && job.getAnalysisLimits().getModelMemoryLimit() != null && - job.getAnalysisLimits().getModelMemoryLimit() < 512L) { - - long updatedModelMemoryLimit = (long) (job.getAnalysisLimits().getModelMemoryLimit() * 1.3); - AnalysisLimits limits = new AnalysisLimits(updatedModelMemoryLimit, - job.getAnalysisLimits().getCategorizationExamplesLimit()); - - JobUpdate update = new JobUpdate.Builder(job.getId()).setJobVersion(Version.CURRENT) - .setAnalysisLimits(limits).build(); - UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(job.getId(), update); - executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, - jobUpdateListener); - } else { - jobUpdateListener.onResponse(null); - } - } - else { - jobUpdateListener.onResponse(null); - } - } else { - jobUpdateListener.onResponse(null); - } - }, listener::onFailure - ); - // Try adding state doc mapping ActionListener resultsPutMappingHandler = ActionListener.wrap( response -> { addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings::stateMapping, - state, missingMappingsListener); + state, jobUpdateListener); }, listener::onFailure );