[ML] Uplift model memory limit on job migration (#37126)
When a 6.1-6.3 job is opened in a later version we increase the model memory limit by 30% if it's below 0.5GB. The migration of jobs from cluster state to the config index changes the job version, so we need to also do this uplift as part of that config migration. Relates #36961
This commit is contained in:
parent
21d52f0dab
commit
ff7df40b20
|
@ -37,6 +37,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
|||
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.core.ml.MlTasks;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
|
||||
|
@ -403,11 +404,23 @@ public class MlConfigMigrator {
|
|||
Map<String, Object> custom = job.getCustomSettings() == null ? new HashMap<>() : new HashMap<>(job.getCustomSettings());
|
||||
custom.put(MIGRATED_FROM_VERSION, job.getJobVersion());
|
||||
builder.setCustomSettings(custom);
|
||||
// Increase the model memory limit for 6.1 - 6.3 jobs
|
||||
Version jobVersion = job.getJobVersion();
|
||||
if (jobVersion != null && jobVersion.onOrAfter(Version.V_6_1_0) && jobVersion.before(Version.V_6_3_0)) {
|
||||
// Increase model memory limit if < 512MB
|
||||
if (job.getAnalysisLimits() != null && job.getAnalysisLimits().getModelMemoryLimit() != null &&
|
||||
job.getAnalysisLimits().getModelMemoryLimit() < 512L) {
|
||||
long updatedModelMemoryLimit = (long) (job.getAnalysisLimits().getModelMemoryLimit() * 1.3);
|
||||
AnalysisLimits limits = new AnalysisLimits(updatedModelMemoryLimit,
|
||||
job.getAnalysisLimits().getCategorizationExamplesLimit());
|
||||
builder.setAnalysisLimits(limits);
|
||||
}
|
||||
}
|
||||
// Pre v5.5 (ml beta) jobs do not have a version.
|
||||
// These jobs cannot be opened, we rely on the missing version
|
||||
// to indicate this.
|
||||
// See TransportOpenJobAction.validate()
|
||||
if (job.getJobVersion() != null) {
|
||||
if (jobVersion != null) {
|
||||
builder.setJobVersion(Version.CURRENT);
|
||||
}
|
||||
return builder.build();
|
||||
|
|
|
@ -55,9 +55,6 @@ import org.elasticsearch.xpack.core.ml.MlMetaIndex;
|
|||
import org.elasticsearch.xpack.core.ml.MlTasks;
|
||||
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
|
@ -540,50 +537,16 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
|
|||
);
|
||||
|
||||
// Tell the job tracker to refresh the memory requirement for this job and all other jobs that have persistent tasks
|
||||
ActionListener<PutJobAction.Response> jobUpdateListener = ActionListener.wrap(
|
||||
ActionListener<Boolean> jobUpdateListener = ActionListener.wrap(
|
||||
response -> memoryTracker.refreshJobMemoryAndAllOthers(jobParams.getJobId(), memoryRequirementRefreshListener),
|
||||
listener::onFailure
|
||||
);
|
||||
|
||||
// Increase the model memory limit for 6.1 - 6.3 jobs
|
||||
ActionListener<Boolean> missingMappingsListener = ActionListener.wrap(
|
||||
response -> {
|
||||
Job job = jobParams.getJob();
|
||||
if (job != null) {
|
||||
Version jobVersion = job.getJobVersion();
|
||||
if (jobVersion != null &&
|
||||
(jobVersion.onOrAfter(Version.V_6_1_0) && jobVersion.before(Version.V_6_3_0))) {
|
||||
// Increase model memory limit if < 512MB
|
||||
if (job.getAnalysisLimits() != null && job.getAnalysisLimits().getModelMemoryLimit() != null &&
|
||||
job.getAnalysisLimits().getModelMemoryLimit() < 512L) {
|
||||
|
||||
long updatedModelMemoryLimit = (long) (job.getAnalysisLimits().getModelMemoryLimit() * 1.3);
|
||||
AnalysisLimits limits = new AnalysisLimits(updatedModelMemoryLimit,
|
||||
job.getAnalysisLimits().getCategorizationExamplesLimit());
|
||||
|
||||
JobUpdate update = new JobUpdate.Builder(job.getId()).setJobVersion(Version.CURRENT)
|
||||
.setAnalysisLimits(limits).build();
|
||||
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(job.getId(), update);
|
||||
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest,
|
||||
jobUpdateListener);
|
||||
} else {
|
||||
jobUpdateListener.onResponse(null);
|
||||
}
|
||||
}
|
||||
else {
|
||||
jobUpdateListener.onResponse(null);
|
||||
}
|
||||
} else {
|
||||
jobUpdateListener.onResponse(null);
|
||||
}
|
||||
}, listener::onFailure
|
||||
);
|
||||
|
||||
// Try adding state doc mapping
|
||||
ActionListener<Boolean> resultsPutMappingHandler = ActionListener.wrap(
|
||||
response -> {
|
||||
addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings::stateMapping,
|
||||
state, missingMappingsListener);
|
||||
state, jobUpdateListener);
|
||||
}, listener::onFailure
|
||||
);
|
||||
|
||||
|
|
Loading…
Reference in New Issue