[7.x][ML] More advanced model snapshot retention options (#56194)

This PR implements the following changes to make ML model snapshot
retention more flexible in advance of adding a UI for the feature in
an upcoming release.

- The default for `model_snapshot_retention_days` for new jobs is now
  10 instead of 1
- There is a new job setting, `daily_model_snapshot_retention_after_days`,
  that defaults to 1 for new jobs and `model_snapshot_retention_days`
  for pre-7.8 jobs
- For days that are older than `model_snapshot_retention_days`, all
  model snapshots are deleted, as before
- For days that fall between `daily_model_snapshot_retention_after_days`
  and `model_snapshot_retention_days`, all but the first model snapshot
  of each day are deleted (see the sketch after this list)
- The `retain` setting of model snapshots is still respected to allow
  selected model snapshots to be retained indefinitely
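
Taken together, the rules look like this. Below is a minimal, self-contained
sketch (the class and method names are hypothetical, not the shipped code; the
real logic is the `ExpiredModelSnapshotsRemover` change further down, which
walks snapshots sorted by ascending timestamp):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Illustration only: a simplified restatement of the retention rules above.
class SnapshotRetentionSketch {

    private static final long MS_IN_DAY = TimeUnit.DAYS.toMillis(1);

    /** A snapshot reduced to its timestamp and per-snapshot "retain" flag. */
    static final class Snapshot {
        final long timestampMs;
        final boolean retain;
        Snapshot(long timestampMs, boolean retain) {
            this.timestampMs = timestampMs;
            this.retain = retain;
        }
    }

    /**
     * Decides which snapshots to delete. Ages are measured from the newest
     * snapshot's timestamp, not from wall clock time.
     *
     * @param snapshots snapshots sorted by ascending timestamp
     */
    static List<Snapshot> snapshotsToDelete(List<Snapshot> snapshots,
                                            long modelSnapshotRetentionDays,
                                            long dailyModelSnapshotRetentionAfterDays) {
        long latestMs = snapshots.get(snapshots.size() - 1).timestampMs;
        long deleteAllBeforeMs = latestMs - modelSnapshotRetentionDays * MS_IN_DAY;
        long keepAllAfterMs = latestMs - dailyModelSnapshotRetentionAfterDays * MS_IN_DAY;

        List<Snapshot> toDelete = new ArrayList<>();
        // Boundary of the next day-long bucket in which one snapshot is kept.
        long nextToKeepMs = deleteAllBeforeMs;
        for (Snapshot snapshot : snapshots) {
            if (snapshot.retain || snapshot.timestampMs >= keepAllAfterMs) {
                continue; // explicitly retained, or newer than the daily threshold
            }
            if (snapshot.timestampMs >= nextToKeepMs) {
                // First snapshot of this day-long bucket: keep it and advance
                // the bucket boundary past it.
                do {
                    nextToKeepMs += MS_IN_DAY;
                } while (snapshot.timestampMs >= nextToKeepMs);
            } else {
                // Older than model_snapshot_retention_days, or not the first
                // snapshot of its day within the thinning window: delete.
                toDelete.add(snapshot);
            }
        }
        return toDelete;
    }
}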

Backport of #56125
Author: David Roberts, 2020-05-05 14:31:58 +01:00 (committed via GitHub)
Commit: 7aa0daaabd (parent: 4dfdd46dc3)
28 changed files with 593 additions and 99 deletions

View File

@ -2549,10 +2549,12 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
.setFunction("count")
.setDetectorDescription(randomAlphaOfLength(10))
.build();
AnalysisConfig.Builder configBuilder = new AnalysisConfig.Builder(Arrays.asList(detector));
AnalysisConfig.Builder configBuilder = new AnalysisConfig.Builder(Collections.singletonList(detector));
// should not be random, see: https://github.com/elastic/ml-cpp/issues/208
configBuilder.setBucketSpan(new TimeValue(1, TimeUnit.HOURS));
builder.setAnalysisConfig(configBuilder);
builder.setModelSnapshotRetentionDays(1L);
builder.setDailyModelSnapshotRetentionAfterDays(1L);
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);

View File

@ -141,12 +141,18 @@ public class JobTests extends AbstractXContentTestCase<Job> {
if (randomBoolean()) {
builder.setBackgroundPersistInterval(TimeValue.timeValueHours(randomIntBetween(1, 24)));
}
Long modelSnapshotRetentionDays = null;
if (randomBoolean()) {
builder.setModelSnapshotRetentionDays(randomNonNegativeLong());
modelSnapshotRetentionDays = randomNonNegativeLong();
builder.setModelSnapshotRetentionDays(modelSnapshotRetentionDays);
}
if (randomBoolean()) {
if (modelSnapshotRetentionDays != null) {
builder.setDailyModelSnapshotRetentionAfterDays(randomLongBetween(0, modelSnapshotRetentionDays));
} else {
builder.setDailyModelSnapshotRetentionAfterDays(randomNonNegativeLong());
}
}
if (randomBoolean()) {
builder.setResultsRetentionDays(randomNonNegativeLong());
}

View File

@ -135,7 +135,8 @@ The API returns the following results:
"model_plot_config" : {
"enabled" : true
},
"model_snapshot_retention_days" : 1,
"model_snapshot_retention_days" : 10,
"daily_model_snapshot_retention_after_days" : 1,
"custom_settings" : {
"created_by" : "ml-module-sample",
...

View File

@ -102,7 +102,8 @@ This is a possible response:
},
"model_memory_limit" : "1gb",
"categorization_examples_limit" : 4,
"model_snapshot_retention_days" : 1
"model_snapshot_retention_days" : 10,
"daily_model_snapshot_retention_after_days" : 1
},
"datafeeds" : {
"scroll_size" : 1000

View File

@ -224,6 +224,10 @@ include::{docdir}/ml/ml-shared.asciidoc[tag=custom-settings]
include::{docdir}/ml/ml-shared.asciidoc[tag=data-description]
//End data_description
`daily_model_snapshot_retention_after_days`::
(Optional, long)
include::{docdir}/ml/ml-shared.asciidoc[tag=daily-model-snapshot-retention-after-days]
`description`::
(Optional, string) A description of the job.
@ -320,7 +324,8 @@ When the job is created, you receive the following results:
"time_field" : "timestamp",
"time_format" : "epoch_ms"
},
"model_snapshot_retention_days" : 1,
"model_snapshot_retention_days" : 10,
"daily_model_snapshot_retention_after_days" : 1,
"results_index_name" : "shared",
"allow_lazy_open" : false
}
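
For reference, a hedged sketch of creating a job with both settings
programmatically, mirroring the server-side builders used by the new
`ModelSnapshotRetentionIT` later in this commit (the job ID and retention
values here are placeholders):

import java.util.Collections;

import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;

class RetentionConfigSketch {
    static PutJobAction.Request newJobRequest() {
        // Keep every snapshot for 2 days, then the first snapshot of each day,
        // and delete everything older than 30 days (relative to the newest snapshot).
        Detector detector = new Detector.Builder("count", null).build();
        Job.Builder builder = new Job.Builder();
        builder.setId("retention-example");
        builder.setAnalysisConfig(new AnalysisConfig.Builder(Collections.singletonList(detector)));
        builder.setDataDescription(new DataDescription.Builder());
        builder.setModelSnapshotRetentionDays(30L);
        builder.setDailyModelSnapshotRetentionAfterDays(2L);
        return new PutJobAction.Request(builder);
    }
}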

View File

@ -82,6 +82,10 @@ close the job, then reopen the job and restart the {dfeed} for the changes to ta
(object)
include::{docdir}/ml/ml-shared.asciidoc[tag=custom-settings]
`daily_model_snapshot_retention_after_days`::
(long)
include::{docdir}/ml/ml-shared.asciidoc[tag=daily-model-snapshot-retention-after-days]
`description`::
(string) A description of the job.

View File

@ -361,6 +361,18 @@ example, it can contain custom URL information as shown in
{ml-docs}/ml-configuring-url.html[Adding custom URLs to {ml} results].
end::custom-settings[]
tag::daily-model-snapshot-retention-after-days[]
Advanced configuration option. Specifies a number of days between 0 and the
value of `model_snapshot_retention_days`. After this period of time, only the first
model snapshot per day is retained for this job. Age is calculated relative to
the timestamp of the newest model snapshot. For new jobs, the default value is
`1`, which means that all snapshots are retained for one day; older snapshots
are thinned out such that only one per day is retained. For jobs created
before this setting was available, the default value matches the
`model_snapshot_retention_days` value, which preserves the original behavior:
no thinning out of model snapshots occurs.
end::daily-model-snapshot-retention-after-days[]
tag::data-description[]
The data description defines the format of the input data when you send data to
the job by using the <<ml-post-data,post data>> API. Note that when you configure
@ -992,8 +1004,8 @@ end::model-plot-config-terms[]
tag::model-snapshot-retention-days[]
Advanced configuration option. The period of time (in days) that model snapshots
are retained. Age is calculated relative to the timestamp of the newest model
snapshot. The default value is `1`, which means snapshots that are one day
(twenty-four hours) older than the newest snapshot are deleted.
snapshot. The default value is `10`, which means snapshots that are ten days
older than the newest snapshot are deleted.
end::model-snapshot-retention-days[]
tag::model-timestamp[]
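
To make the two defaults concrete, a small worked example (hypothetical epoch
values; both cutoffs are measured from the newest snapshot's timestamp, not
wall clock):

long msInDay = 24L * 60 * 60 * 1000;
long latestSnapshotMs = 1_588_680_000_000L;               // newest snapshot (hypothetical)
long keepAllAfterMs = latestSnapshotMs - 1 * msInDay;     // daily_model_snapshot_retention_after_days = 1
long deleteAllBeforeMs = latestSnapshotMs - 10 * msInDay; // model_snapshot_retention_days = 10
// timestamp >= keepAllAfterMs                     -> every snapshot kept
// deleteAllBeforeMs <= timestamp < keepAllAfterMs -> only the first snapshot of each day kept
// timestamp < deleteAllBeforeMs                   -> deleted, unless the snapshot's `retain` flag is set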

View File

@ -98,7 +98,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
*/
public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(10, ByteSizeUnit.MB);
public static final long DEFAULT_MODEL_SNAPSHOT_RETENTION_DAYS = 1;
public static final long DEFAULT_MODEL_SNAPSHOT_RETENTION_DAYS = 10;
public static final long DEFAULT_DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS = 1;
private static ObjectParser<Builder, Void> createParser(boolean ignoreUnknownFields) {
ObjectParser<Builder, Void> parser = new ObjectParser<>("job_details", ignoreUnknownFields, Builder::new);
@ -858,6 +859,10 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return this;
}
public Long getModelSnapshotRetentionDays() {
return modelSnapshotRetentionDays;
}
public Builder setDailyModelSnapshotRetentionAfterDays(Long dailyModelSnapshotRetentionAfterDays) {
this.dailyModelSnapshotRetentionAfterDays = dailyModelSnapshotRetentionAfterDays;
return this;
@ -1105,9 +1110,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
checkValidBackgroundPersistInterval();
checkValueNotLessThan(0, RENORMALIZATION_WINDOW_DAYS.getPreferredName(), renormalizationWindowDays);
checkValueNotLessThan(0, MODEL_SNAPSHOT_RETENTION_DAYS.getPreferredName(), modelSnapshotRetentionDays);
checkValueNotLessThan(0, DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS.getPreferredName(),
dailyModelSnapshotRetentionAfterDays);
checkValueNotLessThan(0, RESULTS_RETENTION_DAYS.getPreferredName(), resultsRetentionDays);
if (!MlStrings.isValidId(id)) {
@ -1117,6 +1119,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_ID_TOO_LONG, MlStrings.ID_LENGTH_LIMIT));
}
validateModelSnapshotRetentionSettings();
validateGroups();
// Results index name not specified in user input means use the default, so is acceptable in this validation
@ -1138,6 +1142,37 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
AnalysisLimits.DEFAULT_MODEL_MEMORY_LIMIT_MB);
}
/**
* This is meant to be called when a new job is created.
* It sets {@link #dailyModelSnapshotRetentionAfterDays} to the default value if it is not set and the default makes sense.
*/
public void validateModelSnapshotRetentionSettingsAndSetDefaults() {
validateModelSnapshotRetentionSettings();
if (dailyModelSnapshotRetentionAfterDays == null &&
modelSnapshotRetentionDays != null &&
modelSnapshotRetentionDays > DEFAULT_DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS) {
dailyModelSnapshotRetentionAfterDays = DEFAULT_DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS;
}
}
/**
* Validates that {@link #modelSnapshotRetentionDays} and {@link #dailyModelSnapshotRetentionAfterDays} make sense,
* both individually and in combination.
*/
public void validateModelSnapshotRetentionSettings() {
checkValueNotLessThan(0, MODEL_SNAPSHOT_RETENTION_DAYS.getPreferredName(), modelSnapshotRetentionDays);
checkValueNotLessThan(0, DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS.getPreferredName(),
dailyModelSnapshotRetentionAfterDays);
if (modelSnapshotRetentionDays != null &&
dailyModelSnapshotRetentionAfterDays != null &&
dailyModelSnapshotRetentionAfterDays > modelSnapshotRetentionDays) {
throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_MODEL_SNAPSHOT_RETENTION_SETTINGS_INCONSISTENT,
dailyModelSnapshotRetentionAfterDays, modelSnapshotRetentionDays));
}
}
private void validateGroups() {
for (String group : this.groups) {
if (MlStrings.isValidId(group) == false) {
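
A hedged sketch of how the new cross-field validation surfaces to callers (the
job ID and values are placeholders; `build(new Date())` follows the
`JobManager.putJob` usage later in this commit):

import java.util.Collections;
import java.util.Date;

import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;

class RetentionValidationSketch {
    static void demonstrate() {
        Detector detector = new Detector.Builder("count", null).build();
        Job.Builder builder = new Job.Builder("retention-validation-example");
        builder.setAnalysisConfig(new AnalysisConfig.Builder(Collections.singletonList(detector)));
        builder.setDataDescription(new DataDescription.Builder());
        builder.setModelSnapshotRetentionDays(3L);
        builder.setDailyModelSnapshotRetentionAfterDays(4L); // inconsistent: 4 > 3
        try {
            builder.build(new Date()); // runs validateModelSnapshotRetentionSettings()
        } catch (IllegalArgumentException e) {
            // Message states that daily_model_snapshot_retention_after_days [4]
            // cannot be greater than model_snapshot_retention_days [3].
        }
    }
}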

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.ml.job.messages;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import java.text.MessageFormat;
import java.util.Locale;
@ -212,6 +213,9 @@ public final class Messages {
"This job would cause a mapping clash with existing field [{0}] - avoid the clash by assigning a dedicated results index";
public static final String JOB_CONFIG_TIME_FIELD_NOT_ALLOWED_IN_ANALYSIS_CONFIG =
"data_description.time_field may not be used in the analysis_config";
public static final String JOB_CONFIG_MODEL_SNAPSHOT_RETENTION_SETTINGS_INCONSISTENT =
"The value of '" + Job.DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS + "' [{0}] cannot be greater than '" +
Job.MODEL_SNAPSHOT_RETENTION_DAYS + "' [{1}]";
public static final String JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE =
"job and group names must be unique but job [{0}] and group [{0}] have the same name";

View File

@ -232,6 +232,7 @@ public final class ReservedFieldNames {
Job.RENORMALIZATION_WINDOW_DAYS.getPreferredName(),
Job.BACKGROUND_PERSIST_INTERVAL.getPreferredName(),
Job.MODEL_SNAPSHOT_RETENTION_DAYS.getPreferredName(),
Job.DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS.getPreferredName(),
Job.RESULTS_RETENTION_DAYS.getPreferredName(),
Job.MODEL_SNAPSHOT_ID.getPreferredName(),
Job.MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(),

View File

@ -235,6 +235,9 @@
"type" : "object",
"enabled" : false
},
"daily_model_snapshot_retention_after_days" : {
"type" : "long"
},
"data_description" : {
"properties" : {
"field_delimiter" : {

View File

@ -105,7 +105,7 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
assertNull(job.getModelPlotConfig());
assertNull(job.getRenormalizationWindowDays());
assertNull(job.getBackgroundPersistInterval());
assertThat(job.getModelSnapshotRetentionDays(), equalTo(1L));
assertThat(job.getModelSnapshotRetentionDays(), equalTo(10L));
assertNull(job.getDailyModelSnapshotRetentionAfterDays());
assertNull(job.getResultsRetentionDays());
assertNotNull(job.allInputFields());
@ -168,7 +168,7 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
Job job1 = builder.build();
builder.setId("bar");
Job job2 = builder.build();
assertFalse(job1.equals(job2));
assertNotEquals(job1, job2);
}
public void testEquals_GivenDifferentRenormalizationWindowDays() {
@ -183,7 +183,7 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
jobDetails2.setRenormalizationWindowDays(4L);
jobDetails2.setAnalysisConfig(createAnalysisConfig());
jobDetails2.setCreateTime(date);
assertFalse(jobDetails1.build().equals(jobDetails2.build()));
assertNotEquals(jobDetails1.build(), jobDetails2.build());
}
public void testEquals_GivenDifferentBackgroundPersistInterval() {
@ -198,7 +198,7 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
jobDetails2.setBackgroundPersistInterval(TimeValue.timeValueSeconds(8000L));
jobDetails2.setAnalysisConfig(createAnalysisConfig());
jobDetails2.setCreateTime(date);
assertFalse(jobDetails1.build().equals(jobDetails2.build()));
assertNotEquals(jobDetails1.build(), jobDetails2.build());
}
public void testEquals_GivenDifferentModelSnapshotRetentionDays() {
@ -213,7 +213,7 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
jobDetails2.setModelSnapshotRetentionDays(8L);
jobDetails2.setAnalysisConfig(createAnalysisConfig());
jobDetails2.setCreateTime(date);
assertFalse(jobDetails1.build().equals(jobDetails2.build()));
assertNotEquals(jobDetails1.build(), jobDetails2.build());
}
public void testEquals_GivenDifferentResultsRetentionDays() {
@ -228,7 +228,7 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
jobDetails2.setResultsRetentionDays(4L);
jobDetails2.setAnalysisConfig(createAnalysisConfig());
jobDetails2.setCreateTime(date);
assertFalse(jobDetails1.build().equals(jobDetails2.build()));
assertNotEquals(jobDetails1.build(), jobDetails2.build());
}
public void testEquals_GivenDifferentCustomSettings() {
@ -240,7 +240,7 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
Map<String, Object> customSettings2 = new HashMap<>();
customSettings2.put("key2", "value2");
jobDetails2.setCustomSettings(customSettings2);
assertFalse(jobDetails1.build().equals(jobDetails2.build()));
assertNotEquals(jobDetails1.build(), jobDetails2.build());
}
// JobConfigurationTests:
@ -397,6 +397,30 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
assertEquals(errorMessage, e.getMessage());
}
public void testVerify_GivenNegativeDailyModelSnapshotRetentionAfterDays() {
String errorMessage =
Messages.getMessage(Messages.JOB_CONFIG_FIELD_VALUE_TOO_LOW, "daily_model_snapshot_retention_after_days", 0, -1);
Job.Builder builder = buildJobBuilder("foo");
builder.setDailyModelSnapshotRetentionAfterDays(-1L);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, builder::build);
assertEquals(errorMessage, e.getMessage());
}
public void testVerify_GivenInconsistentModelSnapshotRetentionSettings() {
long dailyModelSnapshotRetentionAfterDays = randomLongBetween(1, Long.MAX_VALUE);
long modelSnapshotRetentionDays = randomLongBetween(0, dailyModelSnapshotRetentionAfterDays - 1);
String errorMessage =
Messages.getMessage(Messages.JOB_CONFIG_MODEL_SNAPSHOT_RETENTION_SETTINGS_INCONSISTENT,
dailyModelSnapshotRetentionAfterDays, modelSnapshotRetentionDays);
Job.Builder builder = buildJobBuilder("foo");
builder.setDailyModelSnapshotRetentionAfterDays(dailyModelSnapshotRetentionAfterDays);
builder.setModelSnapshotRetentionDays(modelSnapshotRetentionDays);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, builder::build);
assertEquals(errorMessage, e.getMessage());
}
public void testVerify_GivenLowBackgroundPersistInterval() {
String errorMessage = Messages.getMessage(Messages.JOB_CONFIG_FIELD_VALUE_TOO_LOW, "background_persist_interval", 3600, 3599);
Job.Builder builder = buildJobBuilder("foo");
@ -628,8 +652,12 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
builder.setModelSnapshotRetentionDays(randomNonNegativeLong());
}
if (randomBoolean()) {
if (builder.getModelSnapshotRetentionDays() != null) {
builder.setDailyModelSnapshotRetentionAfterDays(randomLongBetween(0, builder.getModelSnapshotRetentionDays()));
} else {
builder.setDailyModelSnapshotRetentionAfterDays(randomNonNegativeLong());
}
}
if (randomBoolean()) {
builder.setResultsRetentionDays(randomNonNegativeLong());
}

View File

@ -72,12 +72,32 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
if (randomBoolean()) {
update.setBackgroundPersistInterval(TimeValue.timeValueHours(randomIntBetween(1, 24)));
}
if (randomBoolean()) {
// It's quite complicated to ensure updates of the two model snapshot retention settings are valid:
// - We might be updating both, one or neither.
// - If we update both, the values in the update must be consistent.
// - If we update just one then that one must be consistent with the value of the other one in the job that's being updated.
Long maxValidDailyModelSnapshotRetentionAfterDays = (job == null) ? null : job.getModelSnapshotRetentionDays();
boolean willSetModelSnapshotRetentionDays = randomBoolean();
boolean willSetDailyModelSnapshotRetentionAfterDays = randomBoolean();
if (willSetModelSnapshotRetentionDays) {
if (willSetDailyModelSnapshotRetentionAfterDays) {
maxValidDailyModelSnapshotRetentionAfterDays = randomNonNegativeLong();
update.setModelSnapshotRetentionDays(maxValidDailyModelSnapshotRetentionAfterDays);
} else {
if (job == null || job.getDailyModelSnapshotRetentionAfterDays() == null) {
update.setModelSnapshotRetentionDays(randomNonNegativeLong());
} else {
update.setModelSnapshotRetentionDays(randomLongBetween(job.getDailyModelSnapshotRetentionAfterDays(), Long.MAX_VALUE));
}
}
}
if (willSetDailyModelSnapshotRetentionAfterDays) {
if (maxValidDailyModelSnapshotRetentionAfterDays != null) {
update.setDailyModelSnapshotRetentionAfterDays(randomLongBetween(0, maxValidDailyModelSnapshotRetentionAfterDays));
} else {
update.setDailyModelSnapshotRetentionAfterDays(randomNonNegativeLong());
}
}
if (randomBoolean()) {
update.setResultsRetentionDays(randomNonNegativeLong());
}
@ -214,7 +234,10 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
updateBuilder.setAnalysisLimits(analysisLimits);
updateBuilder.setBackgroundPersistInterval(TimeValue.timeValueHours(randomIntBetween(1, 24)));
updateBuilder.setResultsRetentionDays(randomNonNegativeLong());
updateBuilder.setModelSnapshotRetentionDays(randomNonNegativeLong());
// The createRandom() method tests the complex interactions between these next two, so this test can always update both
long newModelSnapshotRetentionDays = randomNonNegativeLong();
updateBuilder.setModelSnapshotRetentionDays(newModelSnapshotRetentionDays);
updateBuilder.setDailyModelSnapshotRetentionAfterDays(randomLongBetween(0, newModelSnapshotRetentionDays));
updateBuilder.setRenormalizationWindowDays(randomNonNegativeLong());
updateBuilder.setCategorizationFilters(categorizationFilters);
updateBuilder.setCustomSettings(customSettings);
@ -223,7 +246,7 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
JobUpdate update = updateBuilder.build();
Job.Builder jobBuilder = new Job.Builder("foo");
jobBuilder.setGroups(Arrays.asList("group-1"));
jobBuilder.setGroups(Collections.singletonList("group-1"));
Detector.Builder d1 = new Detector.Builder("info_content", "domain");
d1.setOverFieldName("mlcategory");
Detector.Builder d2 = new Detector.Builder("min", "field");
@ -280,7 +303,7 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
assertTrue(update.isAutodetectProcessUpdate());
update = new JobUpdate.Builder("foo").setDetectorUpdates(Collections.singletonList(mock(JobUpdate.DetectorUpdate.class))).build();
assertTrue(update.isAutodetectProcessUpdate());
update = new JobUpdate.Builder("foo").setGroups(Arrays.asList("bar")).build();
update = new JobUpdate.Builder("foo").setGroups(Collections.singletonList("bar")).build();
assertTrue(update.isAutodetectProcessUpdate());
}

View File

@ -159,6 +159,7 @@ integTest.runner {
'ml/jobs_crud/Test put job after closing results index',
'ml/jobs_crud/Test put job after closing state index',
'ml/jobs_crud/Test put job with inconsistent body/param ids',
'ml/jobs_crud/Test put job with inconsistent model snapshot settings',
'ml/jobs_crud/Test put job with time field in analysis_config',
'ml/jobs_crud/Test put job with duplicate detector configurations',
'ml/jobs_crud/Test job with categorization_analyzer and categorization_filters',

View File

@ -111,11 +111,17 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
}
ActionFuture<BulkResponse> indexUnusedStateDocsResponse = bulkRequestBuilder.execute();
registerJob(newJobBuilder("no-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(1000L));
registerJob(newJobBuilder("results-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(1000L));
registerJob(newJobBuilder("snapshots-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L));
registerJob(newJobBuilder("snapshots-retention-with-retain").setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L));
registerJob(newJobBuilder("results-and-snapshots-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(2L));
// These jobs don't thin out model state; ModelSnapshotRetentionIT tests that
registerJob(newJobBuilder("no-retention")
.setResultsRetentionDays(null).setModelSnapshotRetentionDays(1000L).setDailyModelSnapshotRetentionAfterDays(1000L));
registerJob(newJobBuilder("results-retention")
.setResultsRetentionDays(1L).setModelSnapshotRetentionDays(1000L).setDailyModelSnapshotRetentionAfterDays(1000L));
registerJob(newJobBuilder("snapshots-retention")
.setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L).setDailyModelSnapshotRetentionAfterDays(2L));
registerJob(newJobBuilder("snapshots-retention-with-retain")
.setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L).setDailyModelSnapshotRetentionAfterDays(2L));
registerJob(newJobBuilder("results-and-snapshots-retention")
.setResultsRetentionDays(1L).setModelSnapshotRetentionDays(2L).setDailyModelSnapshotRetentionAfterDays(2L));
List<String> shortExpiryForecastIds = new ArrayList<>();
@ -173,9 +179,10 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
// Refresh to ensure the snapshot timestamp updates are visible
refresh("*");
// We need to wait a second to ensure the second time around model snapshots will have a different ID (it depends on epoch seconds)
// FIXME it would be better to wait for something concrete instead of wait for time to elapse
assertBusy(() -> {}, 1, TimeUnit.SECONDS);
// We need to wait for the clock to tick to a new second to ensure the second time
// around model snapshots will have a different ID (it depends on epoch seconds)
long before = System.currentTimeMillis() / 1000;
assertBusy(() -> assertNotEquals(before, System.currentTimeMillis() / 1000), 1, TimeUnit.SECONDS);
for (Job.Builder job : getJobs()) {
// Run up to now

View File

@ -0,0 +1,234 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
import org.junit.After;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
public class ModelSnapshotRetentionIT extends MlNativeAutodetectIntegTestCase {
private static final long MS_IN_DAY = TimeValue.timeValueDays(1).millis();
@After
public void cleanUpTest() {
cleanUp();
}
public void testModelSnapshotRetentionNoDailyThinning() throws Exception {
String jobId = "no-daily-thinning";
int numDocsPerSnapshot = randomIntBetween(1, 4);
int numSnapshotsPerDay = randomIntBetween(1, 4);
int modelSnapshotRetentionDays = randomIntBetween(1, 10);
int numPriorDays = randomIntBetween(1, 5);
createJob(jobId, modelSnapshotRetentionDays, modelSnapshotRetentionDays);
List<String> expectedModelSnapshotDocIds = new ArrayList<>();
List<String> expectedModelStateDocIds = new ArrayList<>();
long now = System.currentTimeMillis();
long timeMs = now;
// We add 1 to make the maths easier, because the retention period includes
// the cutoff time, yet is measured from the timestamp of the latest snapshot
int numSnapshotsTotal = numSnapshotsPerDay * (modelSnapshotRetentionDays + numPriorDays) + 1;
for (int i = numSnapshotsTotal; i > 0; --i) {
String snapshotId = String.valueOf(i);
createModelSnapshot(jobId, snapshotId, new Date(timeMs), numDocsPerSnapshot, i == numSnapshotsTotal);
if (timeMs >= now - MS_IN_DAY * modelSnapshotRetentionDays) {
expectedModelSnapshotDocIds.add(ModelSnapshot.documentId(jobId, snapshotId));
for (int j = 1; j <= numDocsPerSnapshot; ++j) {
expectedModelStateDocIds.add(ModelState.documentId(jobId, snapshotId, j));
}
}
timeMs -= (MS_IN_DAY / numSnapshotsPerDay);
}
refresh(".ml*");
deleteExpiredData();
Collections.sort(expectedModelSnapshotDocIds);
Collections.sort(expectedModelStateDocIds);
assertThat(getAvailableModelSnapshotDocIds(jobId), is(expectedModelSnapshotDocIds));
assertThat(getAvailableModelStateDocIds(), is(expectedModelStateDocIds));
}
public void testModelSnapshotRetentionWithDailyThinning() throws Exception {
String jobId = "with-daily-thinning";
int numDocsPerSnapshot = randomIntBetween(1, 4);
int numSnapshotsPerDay = randomIntBetween(1, 4);
int modelSnapshotRetentionDays = randomIntBetween(2, 10);
int numPriorDays = randomIntBetween(1, 5);
int dailyModelSnapshotRetentionAfterDays = randomIntBetween(0, modelSnapshotRetentionDays - 1);
createJob(jobId, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays);
List<String> expectedModelSnapshotDocIds = new ArrayList<>();
List<String> expectedModelStateDocIds = new ArrayList<>();
long now = System.currentTimeMillis();
long timeMs = now;
// We add 1 to make the maths easier, because the retention period includes
// the cutoff time, yet is measured from the timestamp of the latest snapshot
int numSnapshotsTotal = numSnapshotsPerDay * (modelSnapshotRetentionDays + numPriorDays) + 1;
for (int i = numSnapshotsTotal; i > 0; --i) {
String snapshotId = String.valueOf(i);
createModelSnapshot(jobId, snapshotId, new Date(timeMs), numDocsPerSnapshot, i == numSnapshotsTotal);
// We should retain:
// - Nothing older than modelSnapshotRetentionDays
// - Everything newer than dailyModelSnapshotRetentionAfterDays
// - The first snapshot of each day in between
if (timeMs >= now - MS_IN_DAY * modelSnapshotRetentionDays &&
(timeMs >= now - MS_IN_DAY * dailyModelSnapshotRetentionAfterDays ||
(now - timeMs) % MS_IN_DAY < MS_IN_DAY / numSnapshotsPerDay)) {
expectedModelSnapshotDocIds.add(ModelSnapshot.documentId(jobId, snapshotId));
for (int j = 1; j <= numDocsPerSnapshot; ++j) {
expectedModelStateDocIds.add(ModelState.documentId(jobId, snapshotId, j));
}
}
timeMs -= (MS_IN_DAY / numSnapshotsPerDay);
}
refresh(".ml*");
deleteExpiredData();
Collections.sort(expectedModelSnapshotDocIds);
Collections.sort(expectedModelStateDocIds);
assertThat(getAvailableModelSnapshotDocIds(jobId), is(expectedModelSnapshotDocIds));
assertThat(getAvailableModelStateDocIds(), is(expectedModelStateDocIds));
}
private List<String> getAvailableModelSnapshotDocIds(String jobId) {
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
QueryBuilder query = QueryBuilders.boolQuery()
.filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName()));
searchRequest.source(new SearchSourceBuilder().query(query).size(10000));
return getDocIdsFromSearch(searchRequest);
}
private List<String> getAvailableModelStateDocIds() {
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(AnomalyDetectorsIndex.jobStateIndexPattern());
searchRequest.source(new SearchSourceBuilder().size(10000));
return getDocIdsFromSearch(searchRequest);
}
private List<String> getDocIdsFromSearch(SearchRequest searchRequest) {
SearchResponse searchResponse = client().execute(SearchAction.INSTANCE, searchRequest).actionGet();
List<String> docIds = new ArrayList<>();
assertThat(searchResponse.getHits(), notNullValue());
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
docIds.add(searchHit.getId());
}
Collections.sort(docIds);
return docIds;
}
private void createJob(String jobId, long modelSnapshotRetentionDays, long dailyModelSnapshotRetentionAfterDays) {
Detector detector = new Detector.Builder("count", null).build();
Job.Builder builder = new Job.Builder();
builder.setId(jobId);
builder.setAnalysisConfig(new AnalysisConfig.Builder(Collections.singletonList(detector)));
builder.setDataDescription(new DataDescription.Builder());
builder.setModelSnapshotRetentionDays(modelSnapshotRetentionDays);
builder.setDailyModelSnapshotRetentionAfterDays(dailyModelSnapshotRetentionAfterDays);
PutJobAction.Request putJobRequest = new PutJobAction.Request(builder);
client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
}
private void createModelSnapshot(String jobId, String snapshotId, Date timestamp, int numDocs, boolean isActive) throws IOException {
persistModelSnapshotDoc(jobId, snapshotId, timestamp, numDocs, isActive);
persistModelStateDocs(jobId, snapshotId, numDocs);
if (isActive) {
JobUpdate jobUpdate = new JobUpdate.Builder(jobId).setModelSnapshotId(snapshotId).build();
UpdateJobAction.Request updateJobRequest = UpdateJobAction.Request.internal(jobId, jobUpdate);
client().execute(UpdateJobAction.INSTANCE, updateJobRequest).actionGet();
}
}
private void persistModelSnapshotDoc(String jobId, String snapshotId, Date timestamp, int numDocs,
boolean immediateRefresh) throws IOException {
ModelSnapshot.Builder modelSnapshotBuilder = new ModelSnapshot.Builder();
modelSnapshotBuilder.setJobId(jobId).setSnapshotId(snapshotId).setTimestamp(timestamp).setSnapshotDocCount(numDocs);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId));
indexRequest.id(ModelSnapshot.documentId(jobId, snapshotId));
if (immediateRefresh) {
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
XContentBuilder xContentBuilder = JsonXContent.contentBuilder();
modelSnapshotBuilder.build().toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
indexRequest.source(xContentBuilder);
IndexResponse indexResponse = client().execute(IndexAction.INSTANCE, indexRequest).actionGet();
assertThat(indexResponse.getResult(), is(DocWriteResponse.Result.CREATED));
}
private void persistModelStateDocs(String jobId, String snapshotId, int numDocs) {
assertThat(numDocs, greaterThan(0));
BulkRequest bulkRequest = new BulkRequest();
for (int i = 1; i <= numDocs; ++i) {
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias());
indexRequest.id(ModelState.documentId(jobId, snapshotId, i));
// The exact contents of the model state don't matter - we are not going to try and restore it
indexRequest.source(Collections.singletonMap("compressed", Collections.singletonList("foo")));
bulkRequest.add(indexRequest);
}
BulkResponse bulkResponse = client().execute(BulkAction.INSTANCE, bulkRequest).actionGet();
assertFalse(bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
}
}

View File

@ -35,6 +35,7 @@ import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@ -93,10 +94,12 @@ public class TransportMlInfoAction extends HandledTransportAction<MlInfoAction.R
}
private Map<String, Object> anomalyDetectorsDefaults() {
Map<String, Object> defaults = new HashMap<>();
Map<String, Object> defaults = new LinkedHashMap<>();
defaults.put(AnalysisLimits.MODEL_MEMORY_LIMIT.getPreferredName(), defaultModelMemoryLimit());
defaults.put(AnalysisLimits.CATEGORIZATION_EXAMPLES_LIMIT.getPreferredName(), AnalysisLimits.DEFAULT_CATEGORIZATION_EXAMPLES_LIMIT);
defaults.put(Job.MODEL_SNAPSHOT_RETENTION_DAYS.getPreferredName(), Job.DEFAULT_MODEL_SNAPSHOT_RETENTION_DAYS);
defaults.put(Job.DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS.getPreferredName(),
Job.DEFAULT_DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS);
try {
defaults.put(CategorizationAnalyzerConfig.CATEGORIZATION_ANALYZER.getPreferredName(),
CategorizationAnalyzerConfig.buildDefaultCategorizationAnalyzer(Collections.emptyList())

View File

@ -142,10 +142,7 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
// acknowledged responses
return ActionListener.wrap(response -> {
Date deleteAfter = modelSnapshot.getLatestResultTimeStamp();
logger.debug("Removing intervening records: last record: " + deleteAfter + ", last result: "
+ modelSnapshot.getLatestResultTimeStamp());
logger.info("Deleting results after '" + deleteAfter + "'");
logger.info("[{}] Removing intervening records after reverting model: deleting results after [{}]", jobId, deleteAfter);
JobDataDeleter dataDeleter = new JobDataDeleter(client, jobId);
dataDeleter.deleteResultsFromTime(deleteAfter.getTime() + 1, new ActionListener<Boolean>() {

View File

@ -240,10 +240,12 @@ public class JobManager {
public void putJob(PutJobAction.Request request, AnalysisRegistry analysisRegistry, ClusterState state,
ActionListener<PutJobAction.Response> actionListener) throws IOException {
request.getJobBuilder().validateAnalysisLimitsAndSetDefaults(maxModelMemoryLimit);
validateCategorizationAnalyzer(request.getJobBuilder(), analysisRegistry);
Job.Builder jobBuilder = request.getJobBuilder();
jobBuilder.validateAnalysisLimitsAndSetDefaults(maxModelMemoryLimit);
jobBuilder.validateModelSnapshotRetentionSettingsAndSetDefaults();
validateCategorizationAnalyzer(jobBuilder, analysisRegistry);
Job job = request.getJobBuilder().build(new Date());
Job job = jobBuilder.build(new Date());
if (job.getDataDescription() != null && job.getDataDescription().getFormat() == DataDescription.DataFormat.DELIMITED) {
deprecationLogger.deprecatedAndMaybeLog("ml_create_job_delimited_data",

View File

@ -305,7 +305,6 @@ public class JobConfigProvider {
return;
}
final long version = getResponse.getVersion();
final long seqNo = getResponse.getSeqNo();
final long primaryTerm = getResponse.getPrimaryTerm();
BytesReference source = getResponse.getSourceAsBytesRef();

View File

@ -18,6 +18,7 @@ import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -66,12 +67,12 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
}
calcCutoffEpochMs(job.getId(), retentionDays, ActionListener.wrap(
cutoffEpochMs -> {
if (cutoffEpochMs == null) {
response -> {
if (response == null) {
removeData(jobIterator, listener, isTimedOutSupplier);
} else {
removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(
response -> removeData(jobIterator, listener, isTimedOutSupplier),
removeDataBefore(job, response.latestTimeMs, response.cutoffEpochMs, ActionListener.wrap(
r -> removeData(jobIterator, listener, isTimedOutSupplier),
listener::onFailure));
}
},
@ -84,7 +85,7 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
return new WrappedBatchedJobsIterator(jobsIterator);
}
abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener);
abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<CutoffDetails> listener);
abstract Long getRetentionDays(Job job);
@ -92,7 +93,7 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
* Template method to allow implementation details of various types of data (e.g. results, model snapshots).
* Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job.
*/
abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener);
abstract void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener<Boolean> listener);
static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) {
return QueryBuilders.boolQuery()
@ -106,7 +107,7 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
* This class abstracts away the logic of pulling one job at a time from
* multiple batches.
*/
private class WrappedBatchedJobsIterator implements Iterator<Job> {
private static class WrappedBatchedJobsIterator implements Iterator<Job> {
private final BatchedJobsIterator batchedIterator;
private VolatileCursorIterator<Job> currentBatch;
@ -144,4 +145,39 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
return new VolatileCursorIterator<>(jobs);
}
}
/**
* The latest time that cutoffs are measured from is not wall clock time,
* but some other reference point that makes sense for the type of data
being removed. This class groups the cutoff time with its "latest"
* reference point.
*/
protected static final class CutoffDetails {
public final long latestTimeMs;
public final long cutoffEpochMs;
public CutoffDetails(long latestTimeMs, long cutoffEpochMs) {
this.latestTimeMs = latestTimeMs;
this.cutoffEpochMs = cutoffEpochMs;
}
@Override
public int hashCode() {
return Objects.hash(latestTimeMs, cutoffEpochMs);
}
@Override
public boolean equals(Object other) {
if (other == this) {
return true;
}
if (other instanceof CutoffDetails == false) {
return false;
}
CutoffDetails that = (CutoffDetails) other;
return this.latestTimeMs == that.latestTimeMs &&
this.cutoffEpochMs == that.cutoffEpochMs;
}
}
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.ml.MachineLearning;
@ -54,6 +53,8 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
private static final Logger LOGGER = LogManager.getLogger(ExpiredModelSnapshotsRemover.class);
private static final long MS_IN_ONE_DAY = TimeValue.timeValueDays(1).getMillis();
/**
The max number of snapshots to fetch per job. It is set to 10K, the default for an index, as
* we don't change that in our ML indices. It should be more than enough for most cases. If not,
@ -72,12 +73,19 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
@Override
Long getRetentionDays(Job job) {
return job.getModelSnapshotRetentionDays();
// If a daily retention cutoff is set then we need to tell the base class that this is the cutoff
// point so that we get to consider deleting model snapshots older than this. Later on we will
// not actually delete all of the ones in between the hard cutoff and the daily retention cutoff.
Long retentionDaysForConsideration = job.getDailyModelSnapshotRetentionAfterDays();
if (retentionDaysForConsideration == null) {
retentionDaysForConsideration = job.getModelSnapshotRetentionDays();
}
return retentionDaysForConsideration;
}
@Override
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
ThreadedActionListener<Long> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<CutoffDetails> listener) {
ThreadedActionListener<CutoffDetails> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false);
latestSnapshotTimeStamp(jobId, ActionListener.wrap(
@ -86,7 +94,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
threadedActionListener.onResponse(null);
} else {
long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
threadedActionListener.onResponse(cutoff);
threadedActionListener.onResponse(new CutoffDetails(latestTime, cutoff));
}
},
listener::onFailure
@ -125,13 +133,14 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
}
@Override
protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) {
protected void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener<Boolean> listener) {
// TODO: delete this test if we ever allow users to revert a job to no model snapshot, e.g. to recover from data loss
if (job.getModelSnapshotId() == null) {
// No snapshot to remove
listener.onResponse(true);
return;
}
LOGGER.debug("Removing model snapshots of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
LOGGER.debug("Considering model snapshots of job [{}] that have a timestamp before [{}] for removal", job.getId(), cutoffEpochMs);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
@ -144,20 +153,33 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
.mustNot(activeSnapshotFilter)
.mustNot(retainFilter);
searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE).sort(ElasticsearchMappings.ES_DOC));
searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE)
.sort(ModelSnapshot.TIMESTAMP.getPreferredName()));
long deleteAllBeforeMs = (job.getModelSnapshotRetentionDays() == null)
? 0 : latestTimeMs - TimeValue.timeValueDays(job.getModelSnapshotRetentionDays()).getMillis();
client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false));
MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), deleteAllBeforeMs, listener), false));
}
private ActionListener<SearchResponse> expiredSnapshotsListener(String jobId, ActionListener<Boolean> listener) {
private ActionListener<SearchResponse> expiredSnapshotsListener(String jobId, long deleteAllBeforeMs,
ActionListener<Boolean> listener) {
return new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
long nextToKeepMs = deleteAllBeforeMs;
try {
List<ModelSnapshot> modelSnapshots = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits()) {
modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef()));
ModelSnapshot modelSnapshot = ModelSnapshot.fromJson(hit.getSourceRef());
long timestampMs = modelSnapshot.getTimestamp().getTime();
if (timestampMs >= nextToKeepMs) {
do {
nextToKeepMs += MS_IN_ONE_DAY;
} while (timestampMs >= nextToKeepMs);
continue;
}
modelSnapshots.add(modelSnapshot);
}
deleteModelSnapshots(new VolatileCursorIterator<>(modelSnapshots), listener);
} catch (Exception e) {
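
The `nextToKeepMs` walk above is subtle, so here is a standalone trace of the
same do/while loop under hypothetical timestamps (snapshots arrive sorted
ascending by timestamp, as the modified search request now guarantees):

public class ThinningWalkTrace {
    public static void main(String[] args) {
        long msInDay = 24L * 60 * 60 * 1000;
        long deleteAllBeforeMs = 0;                   // hard cutoff = start of "day 0"
        long[] snapshotTimesMs = {
            -2 * msInDay,                             // before the hard cutoff -> deleted
            (long) (0.2 * msInDay),                   // day 0, first of its day -> kept
            (long) (0.7 * msInDay),                   // day 0, second           -> deleted
            (long) (1.5 * msInDay),                   // day 1, first of its day -> kept
            (long) (2.1 * msInDay),                   // day 2, first of its day -> kept
            (long) (2.9 * msInDay)                    // day 2, second           -> deleted
        };
        long nextToKeepMs = deleteAllBeforeMs;
        for (long timestampMs : snapshotTimesMs) {
            if (timestampMs >= nextToKeepMs) {
                // Keep the first snapshot at or after the bucket boundary and
                // step the boundary forward a day at a time past it.
                do {
                    nextToKeepMs += msInDay;
                } while (timestampMs >= nextToKeepMs);
                System.out.println(timestampMs + " kept");
            } else {
                System.out.println(timestampMs + " deleted");
            }
        }
    }
}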

View File

@ -84,7 +84,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
}
@Override
protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) {
protected void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener<Boolean> listener) {
LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
DeleteByQueryRequest request = createDBQRequest(job, cutoffEpochMs);
@ -131,8 +131,8 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
}
@Override
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
ThreadedActionListener<Long> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<CutoffDetails> listener) {
ThreadedActionListener<CutoffDetails> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false);
latestBucketTime(jobId, ActionListener.wrap(
latestTime -> {
@ -140,7 +140,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
threadedActionListener.onResponse(null);
} else {
long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
threadedActionListener.onResponse(cutoff);
threadedActionListener.onResponse(new CutoffDetails(latestTime, cutoff));
}
},
listener::onFailure

View File

@ -47,7 +47,7 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
// We can't test an abstract class so make a concrete class
// as simple as possible
private class ConcreteExpiredJobDataRemover extends AbstractExpiredJobDataRemover {
private static class ConcreteExpiredJobDataRemover extends AbstractExpiredJobDataRemover {
private int getRetentionDaysCallCount = 0;
@ -62,13 +62,14 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
return randomBoolean() ? null : 0L;
}
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
@Override
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<CutoffDetails> listener) {
long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
listener.onResponse(nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis());
listener.onResponse(new CutoffDetails(nowEpochMs, nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis()));
}
@Override
protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) {
protected void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener<Boolean> listener) {
listener.onResponse(Boolean.TRUE);
}
}

View File

@ -89,15 +89,17 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
JobTests.buildJobBuilder("job-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
)));
Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis());
Date now = new Date();
Date oneDayAgo = new Date(now.getTime() - TimeValue.timeValueDays(1).getMillis());
ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "fresh-snapshot", oneDayAgo);
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1)));
Date eightDaysAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(8).getMillis());
ModelSnapshot snapshotToBeDeleted = createModelSnapshot("job-1", "old-snapshot", eightDaysAgo);
// It needs to be strictly more than 7 days before the most recent snapshot, hence the extra millisecond
Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1);
ModelSnapshot snapshotToBeDeleted = createModelSnapshot("job-1", "old-snapshot", eightDaysAndOneMsAgo);
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshotToBeDeleted)));
ModelSnapshot snapshot2_1 = createModelSnapshot("job-1", "snapshots-1_1", eightDaysAgo);
ModelSnapshot snapshot2_1 = createModelSnapshot("job-1", "snapshots-1_1", eightDaysAndOneMsAgo);
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_1)));
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList()));
@ -127,9 +129,10 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
)));
List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
createModelSnapshot("snapshots-1", "snapshots-1_2"));
List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
Date now = new Date();
List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1", now),
createModelSnapshot("snapshots-1", "snapshots-1_2", now));
List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1", now));
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
@ -173,15 +176,19 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
)));
ModelSnapshot snapshot1_1 = createModelSnapshot("snapshots-1", "snapshots-1_1");
Date now = new Date();
Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis());
ModelSnapshot snapshot1_1 = createModelSnapshot("snapshots-1", "snapshots-1_1", oneDayAgo);
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1)));
// It needs to be strictly more than 7 days before the most recent snapshot, hence the extra millisecond
Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1);
List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(
snapshot1_1,
createModelSnapshot("snapshots-1", "snapshots-1_2"));
createModelSnapshot("snapshots-1", "snapshots-1_2", eightDaysAndOneMsAgo));
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
ModelSnapshot snapshot2_2 = createModelSnapshot("snapshots-2", "snapshots-2_1");
ModelSnapshot snapshot2_2 = createModelSnapshot("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo);
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_2)));
givenClientDeleteModelSnapshotRequestsFail(searchResponses);
@ -197,7 +204,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1));
DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0);
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_2"));
}
@SuppressWarnings("unchecked")
@ -211,12 +218,12 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
givenClientRequests(searchResponses, true, true);
long retentionDays = 3L;
ActionListener<Long> cutoffListener = mock(ActionListener.class);
ActionListener<AbstractExpiredJobDataRemover.CutoffDetails> cutoffListener = mock(ActionListener.class);
createExpiredModelSnapshotsRemover().calcCutoffEpochMs("job-1", retentionDays, cutoffListener);
long dayInMills = 60 * 60 * 24 * 1000;
long expectedCutoffTime = oneDayAgo.getTime() - (dayInMills * retentionDays);
verify(cutoffListener).onResponse(eq(expectedCutoffTime));
verify(cutoffListener).onResponse(eq(new AbstractExpiredJobDataRemover.CutoffDetails(oneDayAgo.getTime(), expectedCutoffTime)));
}
private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() {
@ -234,10 +241,6 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
return new ExpiredModelSnapshotsRemover(originSettingClient, threadPool);
}
private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId) {
return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(new Date()).build();
}
private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId, Date date) {
return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(date).build();
}
@ -269,7 +272,8 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
capturedSearchRequests.add(searchRequest);
// Only the last search request should fail
if (shouldSearchRequestsSucceed || callCount.get() < searchResponses.size()) {
listener.onResponse(searchResponses.get(callCount.getAndIncrement()));
SearchResponse response = searchResponses.get(callCount.getAndIncrement());
listener.onResponse(response);
} else {
listener.onFailure(new RuntimeException("search failed"));
}

View File

@ -145,12 +145,12 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
givenSearchResponses(Collections.singletonList(JobTests.buildJobBuilder(jobId).setResultsRetentionDays(1L).build()),
new Bucket(jobId, latest, 60));
ActionListener<Long> cutoffListener = mock(ActionListener.class);
ActionListener<AbstractExpiredJobDataRemover.CutoffDetails> cutoffListener = mock(ActionListener.class);
createExpiredResultsRemover().calcCutoffEpochMs(jobId, 1L, cutoffListener);
long dayInMills = 60 * 60 * 24 * 1000;
long expectedCutoffTime = latest.getTime() - dayInMills;
verify(cutoffListener).onResponse(eq(expectedCutoffTime));
verify(cutoffListener).onResponse(eq(new AbstractExpiredJobDataRemover.CutoffDetails(latest.getTime(), expectedCutoffTime)));
}
private void givenDBQRequestsSucceed() {

View File

@ -172,6 +172,44 @@
ml.get_jobs:
job_id: "non-existing"
---
"Test put job with inconsistent model snapshot settings":
- do:
catch: /The value of daily_model_snapshot_retention_after_days \[4\] cannot be greater than model_snapshot_retention_days \[3\]/
ml.put_job:
job_id: inconsistent-snapshot-settings-1
body: >
{
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"time_field":"@timestamp"
},
"model_snapshot_retention_days": 3,
"daily_model_snapshot_retention_after_days": 4
}
- do:
catch: /daily_model_snapshot_retention_after_days cannot be less than 0. Value = -1/
ml.put_job:
job_id: inconsistent-snapshot-settings-2
body: >
{
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"time_field":"@timestamp"
},
"model_snapshot_retention_days": 3,
"daily_model_snapshot_retention_after_days": -1
}
---
"Test put job with inconsistent body/param ids":
- do:
@ -297,6 +335,7 @@
"renormalization_window_days": 1,
"background_persist_interval": "2h",
"model_snapshot_retention_days": 3,
"daily_model_snapshot_retention_after_days": 2,
"results_retention_days": 4,
"custom_settings": {
"setting1": "custom1",
@ -363,6 +402,7 @@
- match: { renormalization_window_days: 10 }
- match: { background_persist_interval: "3h" }
- match: { model_snapshot_retention_days: 30 }
- match: { daily_model_snapshot_retention_after_days: 2 }
- match: { results_retention_days: 40 }
- do:
@ -403,6 +443,24 @@
}
- match: { analysis_limits.model_memory_limit: "15mb" }
- do:
catch: /The value of daily_model_snapshot_retention_after_days \[31\] cannot be greater than model_snapshot_retention_days \[30\]/
ml.update_job:
job_id: jobs-crud-update-job
body: >
{
"daily_model_snapshot_retention_after_days": 31
}
- do:
catch: /The value of daily_model_snapshot_retention_after_days \[2\] cannot be greater than model_snapshot_retention_days \[1\]/
ml.update_job:
job_id: jobs-crud-update-job
body: >
{
"model_snapshot_retention_days": 1
}
- do:
catch: bad_request
ml.update_job:

View File

@ -13,7 +13,8 @@ teardown:
- match: { defaults.anomaly_detectors.categorization_analyzer.tokenizer: "ml_classic" }
- match: { defaults.anomaly_detectors.model_memory_limit: "1gb" }
- match: { defaults.anomaly_detectors.categorization_examples_limit: 4 }
- match: { defaults.anomaly_detectors.model_snapshot_retention_days: 1 }
- match: { defaults.anomaly_detectors.model_snapshot_retention_days: 10 }
- match: { defaults.anomaly_detectors.daily_model_snapshot_retention_after_days: 1 }
- match: { defaults.datafeeds.scroll_size: 1000 }
- is_false: limits.max_model_memory_limit
# We cannot assert an exact value for the next one as it will vary depending on the test machine
@ -31,7 +32,8 @@ teardown:
- match: { defaults.anomaly_detectors.categorization_analyzer.tokenizer: "ml_classic" }
- match: { defaults.anomaly_detectors.model_memory_limit: "512mb" }
- match: { defaults.anomaly_detectors.categorization_examples_limit: 4 }
- match: { defaults.anomaly_detectors.model_snapshot_retention_days: 1 }
- match: { defaults.anomaly_detectors.model_snapshot_retention_days: 10 }
- match: { defaults.anomaly_detectors.daily_model_snapshot_retention_after_days: 1 }
- match: { defaults.datafeeds.scroll_size: 1000 }
- match: { limits.max_model_memory_limit: "512mb" }
# We cannot assert an exact value for the next one as it will vary depending on the test machine
@ -49,7 +51,8 @@ teardown:
- match: { defaults.anomaly_detectors.categorization_analyzer.tokenizer: "ml_classic" }
- match: { defaults.anomaly_detectors.model_memory_limit: "1gb" }
- match: { defaults.anomaly_detectors.categorization_examples_limit: 4 }
- match: { defaults.anomaly_detectors.model_snapshot_retention_days: 1 }
- match: { defaults.anomaly_detectors.model_snapshot_retention_days: 10 }
- match: { defaults.anomaly_detectors.daily_model_snapshot_retention_after_days: 1 }
- match: { defaults.datafeeds.scroll_size: 1000 }
- match: { limits.max_model_memory_limit: "6gb" }
# We cannot assert an exact value for the next one as it will vary depending on the test machine
@ -67,7 +70,8 @@ teardown:
- match: { defaults.anomaly_detectors.categorization_analyzer.tokenizer: "ml_classic" }
- match: { defaults.anomaly_detectors.model_memory_limit: "1gb" }
- match: { defaults.anomaly_detectors.categorization_examples_limit: 4 }
- match: { defaults.anomaly_detectors.model_snapshot_retention_days: 1 }
- match: { defaults.anomaly_detectors.model_snapshot_retention_days: 10 }
- match: { defaults.anomaly_detectors.daily_model_snapshot_retention_after_days: 1 }
- match: { defaults.datafeeds.scroll_size: 1000 }
- match: { limits.max_model_memory_limit: "6gb" }
# We cannot assert an exact value for the next one as it will vary depending on the test machine
@ -85,7 +89,8 @@ teardown:
- match: { defaults.anomaly_detectors.categorization_analyzer.tokenizer: "ml_classic" }
- match: { defaults.anomaly_detectors.model_memory_limit: "1mb" }
- match: { defaults.anomaly_detectors.categorization_examples_limit: 4 }
- match: { defaults.anomaly_detectors.model_snapshot_retention_days: 1 }
- match: { defaults.anomaly_detectors.model_snapshot_retention_days: 10 }
- match: { defaults.anomaly_detectors.daily_model_snapshot_retention_after_days: 1 }
- match: { defaults.datafeeds.scroll_size: 1000 }
- match: { limits.max_model_memory_limit: "1mb" }
# This time we can assert an exact value for the next one because the hard limit is so low