[ML] Set established model memory on job open for pre-6.1 jobs (elastic/x-pack-elasticsearch#3222)

Before this change it was easy to get into a situation where a
job created in 5.x with a default model memory limit of 4GB could not
be opened on any node in the cluster.  Following this change, this
problem will no longer occur for jobs that ran for a sufficient amount
of time on the old cluster to establish their model memory usage.

relates elastic/x-pack-elasticsearch#3181

Original commit: elastic/x-pack-elasticsearch@cb029debba
This commit is contained in:
David Roberts 2017-12-05 17:05:58 +00:00 committed by GitHub
parent 6c6b72db25
commit 751caaae76
3 changed files with 111 additions and 7 deletions

View File

@ -61,8 +61,10 @@ import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.AllocatedPersistentTask;
@ -390,15 +392,17 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
private final XPackLicenseState licenseState;
private final PersistentTasksService persistentTasksService;
private final Client client;
private final JobProvider jobProvider;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState,
ClusterService clusterService, PersistentTasksService persistentTasksService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Client client) {
IndexNameExpressionResolver indexNameExpressionResolver, Client client, JobProvider jobProvider) {
super(settings, NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new);
this.licenseState = licenseState;
this.persistentTasksService = persistentTasksService;
this.client = client;
this.jobProvider = jobProvider;
}
@Override
@ -422,10 +426,10 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) {
JobParams jobParams = request.getJobParams();
if (licenseState.isMachineLearningAllowed()) {
// Step 4. Wait for job to be started and respond
// Step 5. Wait for job to be started and respond
ActionListener<PersistentTask<JobParams>> finalListener = new ActionListener<PersistentTask<JobParams>>() {
@Override
public void onResponse(PersistentTask<JobParams> task) {
@ -442,11 +446,42 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
};
// Step 3. Start job task
ActionListener<Boolean> missingMappingsListener = ActionListener.wrap(
// Step 4. Start job task
ActionListener<PutJobAction.Response> establishedMemoryUpdateListener = ActionListener.wrap(
response -> persistentTasksService.startPersistentTask(MlMetadata.jobTaskId(jobParams.jobId),
TASK_NAME, jobParams, finalListener)
, listener::onFailure
TASK_NAME, jobParams, finalListener),
listener::onFailure
);
// Step 3. Update established model memory for pre-6.1 jobs that haven't had it set
ActionListener<Boolean> missingMappingsListener = ActionListener.wrap(
response -> {
MlMetadata mlMetadata = clusterService.state().getMetaData().custom(MlMetadata.TYPE);
Job job = mlMetadata.getJobs().get(jobParams.getJobId());
if (job != null) {
Version jobVersion = job.getJobVersion();
Long jobEstablishedModelMemory = job.getEstablishedModelMemory();
if ((jobVersion == null || jobVersion.before(Version.V_6_1_0))
&& (jobEstablishedModelMemory == null || jobEstablishedModelMemory == 0)) {
jobProvider.getEstablishedMemoryUsage(job.getId(), null, null, establishedModelMemory -> {
if (establishedModelMemory != null && establishedModelMemory > 0) {
JobUpdate update = new JobUpdate.Builder(job.getId())
.setEstablishedModelMemory(establishedModelMemory).build();
UpdateJobAction.Request updateRequest = new UpdateJobAction.Request(job.getId(), update);
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest,
establishedMemoryUpdateListener);
} else {
establishedMemoryUpdateListener.onResponse(null);
}
}, listener::onFailure);
} else {
establishedMemoryUpdateListener.onResponse(null);
}
} else {
establishedMemoryUpdateListener.onResponse(null);
}
}, listener::onFailure
);
// Step 2. Try adding state doc mapping

View File

@ -52,6 +52,53 @@
job_id: old-cluster-job
- match: { count: 1 }
---
"Put job on the old cluster with the default model memory limit and post some data":
- do:
xpack.ml.put_job:
job_id: no-model-memory-limit-job
body: >
{
"analysis_config" : {
"bucket_span": "60s",
"detectors" :[{"function":"count"}]
},
"data_description" : {
"time_field":"time",
"time_format":"epoch"
}
}
- match: { job_id: no-model-memory-limit-job }
- do:
xpack.ml.open_job:
job_id: no-model-memory-limit-job
- do:
xpack.ml.post_data:
job_id: no-model-memory-limit-job
body:
- sourcetype: post-data-job
time: 1403481600
- sourcetype: post-data-job
time: 1403484700
- sourcetype: post-data-job
time: 1403487700
- sourcetype: post-data-job
time: 1403490700
- sourcetype: post-data-job
time: 1403493700
- match: { processed_record_count: 5 }
- do:
xpack.ml.close_job:
job_id: no-model-memory-limit-job
- do:
xpack.ml.get_buckets:
job_id: no-model-memory-limit-job
- match: { count: 201 }
---
"Put job with empty strings in the configuration":
- do:

View File

@ -75,3 +75,25 @@ setup:
catch: missing
xpack.ml.get_jobs:
job_id: mixed-cluster-job
---
"Test job with no model memory limit has established model memory after reopening":
- do:
xpack.ml.open_job:
job_id: no-model-memory-limit-job
- do:
xpack.ml.get_jobs:
job_id: no-model-memory-limit-job
- is_true: jobs.0.established_model_memory
- lt: { jobs.0.established_model_memory: 100000 }
- do:
xpack.ml.close_job:
job_id: no-model-memory-limit-job
- do:
xpack.ml.delete_job:
job_id: no-model-memory-limit-job
- match: { acknowledged: true }