diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 7e460a0a5d4..369161e41f9 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -61,8 +61,10 @@ import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings; +import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.AllocatedPersistentTask; @@ -390,15 +392,17 @@ public class OpenJobAction extends Action listener) throws Exception { + protected void masterOperation(Request request, ClusterState state, ActionListener listener) { JobParams jobParams = request.getJobParams(); if (licenseState.isMachineLearningAllowed()) { - // Step 4. Wait for job to be started and respond + // Step 5. Wait for job to be started and respond ActionListener> finalListener = new ActionListener>() { @Override public void onResponse(PersistentTask task) { @@ -442,11 +446,42 @@ public class OpenJobAction extends Action missingMappingsListener = ActionListener.wrap( + // Step 4. Start job task + ActionListener establishedMemoryUpdateListener = ActionListener.wrap( response -> persistentTasksService.startPersistentTask(MlMetadata.jobTaskId(jobParams.jobId), - TASK_NAME, jobParams, finalListener) - , listener::onFailure + TASK_NAME, jobParams, finalListener), + listener::onFailure + ); + + // Step 3. Update established model memory for pre-6.1 jobs that haven't had it set + ActionListener missingMappingsListener = ActionListener.wrap( + response -> { + MlMetadata mlMetadata = clusterService.state().getMetaData().custom(MlMetadata.TYPE); + Job job = mlMetadata.getJobs().get(jobParams.getJobId()); + if (job != null) { + Version jobVersion = job.getJobVersion(); + Long jobEstablishedModelMemory = job.getEstablishedModelMemory(); + if ((jobVersion == null || jobVersion.before(Version.V_6_1_0)) + && (jobEstablishedModelMemory == null || jobEstablishedModelMemory == 0)) { + jobProvider.getEstablishedMemoryUsage(job.getId(), null, null, establishedModelMemory -> { + if (establishedModelMemory != null && establishedModelMemory > 0) { + JobUpdate update = new JobUpdate.Builder(job.getId()) + .setEstablishedModelMemory(establishedModelMemory).build(); + UpdateJobAction.Request updateRequest = new UpdateJobAction.Request(job.getId(), update); + + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, + establishedMemoryUpdateListener); + } else { + establishedMemoryUpdateListener.onResponse(null); + } + }, listener::onFailure); + } else { + establishedMemoryUpdateListener.onResponse(null); + } + } else { + establishedMemoryUpdateListener.onResponse(null); + } + }, listener::onFailure ); // Step 2. Try adding state doc mapping diff --git a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/30_ml_jobs_crud.yml b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/30_ml_jobs_crud.yml index df3af9e789a..a8acecfead9 100644 --- a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/30_ml_jobs_crud.yml +++ b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/30_ml_jobs_crud.yml @@ -52,6 +52,53 @@ job_id: old-cluster-job - match: { count: 1 } +--- +"Put job on the old cluster with the default model memory limit and post some data": + - do: + xpack.ml.put_job: + job_id: no-model-memory-limit-job + body: > + { + "analysis_config" : { + "bucket_span": "60s", + "detectors" :[{"function":"count"}] + }, + "data_description" : { + "time_field":"time", + "time_format":"epoch" + } + } + - match: { job_id: no-model-memory-limit-job } + + - do: + xpack.ml.open_job: + job_id: no-model-memory-limit-job + + - do: + xpack.ml.post_data: + job_id: no-model-memory-limit-job + body: + - sourcetype: post-data-job + time: 1403481600 + - sourcetype: post-data-job + time: 1403484700 + - sourcetype: post-data-job + time: 1403487700 + - sourcetype: post-data-job + time: 1403490700 + - sourcetype: post-data-job + time: 1403493700 + - match: { processed_record_count: 5 } + + - do: + xpack.ml.close_job: + job_id: no-model-memory-limit-job + + - do: + xpack.ml.get_buckets: + job_id: no-model-memory-limit-job + - match: { count: 201 } + --- "Put job with empty strings in the configuration": - do: diff --git a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/30_ml_jobs_crud.yml b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/30_ml_jobs_crud.yml index e158781d85f..1acfad77b26 100644 --- a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/30_ml_jobs_crud.yml +++ b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/30_ml_jobs_crud.yml @@ -75,3 +75,25 @@ setup: catch: missing xpack.ml.get_jobs: job_id: mixed-cluster-job + +--- +"Test job with no model memory limit has established model memory after reopening": + - do: + xpack.ml.open_job: + job_id: no-model-memory-limit-job + + - do: + xpack.ml.get_jobs: + job_id: no-model-memory-limit-job + - is_true: jobs.0.established_model_memory + - lt: { jobs.0.established_model_memory: 100000 } + + - do: + xpack.ml.close_job: + job_id: no-model-memory-limit-job + + - do: + xpack.ml.delete_job: + job_id: no-model-memory-limit-job + - match: { acknowledged: true } +