[ML] Calculate results and snapshot retention using latest bucket timestamps (#51061) (#51301)

The retention period is calculated relative to the last bucket result or snapshot
time rather than wall clock
This commit is contained in:
David Kyle 2020-01-22 14:52:33 +00:00 committed by GitHub
parent 59687a9384
commit ca4b90a001
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 377 additions and 181 deletions

View File

@ -861,9 +861,10 @@ Only the specified `terms` can be viewed when using the Single Metric Viewer.
end::model-plot-config-terms[] end::model-plot-config-terms[]
tag::model-snapshot-retention-days[] tag::model-snapshot-retention-days[]
The time in days that model snapshots are retained for the job. Older snapshots Advanced configuration option. The period of time (in days) that model snapshots are retained.
are deleted. The default value is `1`, which means snapshots are retained for Age is calculated relative to the timestamp of the newest model snapshot.
one day (twenty-four hours). The default value is `1`, which means snapshots that are one day (twenty-four hours)
older than the newest snapshot are deleted.
end::model-snapshot-retention-days[] end::model-snapshot-retention-days[]
tag::multivariate-by-fields[] tag::multivariate-by-fields[]
@ -961,10 +962,12 @@ is `shared`, which generates an index named `.ml-anomalies-shared`.
end::results-index-name[] end::results-index-name[]
tag::results-retention-days[] tag::results-retention-days[]
Advanced configuration option. The number of days for which job results are Advanced configuration option. The period of time (in days) that results are retained.
retained. Once per day at 00:30 (server time), results older than this period Age is calculated relative to the timestamp of the latest bucket result.
are deleted from {es}. The default value is null, which means results are If this property has a non-null value, once per day at 00:30 (server time),
retained. results that are the specified number of days older than the latest
bucket result are deleted from {es}. The default value is null, which means all
results are retained.
end::results-retention-days[] end::results-retention-days[]
tag::retain[] tag::retain[]

View File

@ -33,10 +33,8 @@ import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -53,20 +51,20 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
private static final String DATA_INDEX = "delete-expired-data-test-data"; private static final String DATA_INDEX = "delete-expired-data-test-data";
@Before @Before
public void setUpData() throws IOException { public void setUpData() {
client().admin().indices().prepareCreate(DATA_INDEX) client().admin().indices().prepareCreate(DATA_INDEX)
.addMapping(SINGLE_MAPPING_NAME, "time", "type=date,format=epoch_millis") .addMapping(SINGLE_MAPPING_NAME, "time", "type=date,format=epoch_millis")
.get(); .get();
// We are going to create data for last 2 days // We are going to create 3 days of data ending 1 hr ago
long nowMillis = System.currentTimeMillis(); long latestBucketTime = System.currentTimeMillis() - TimeValue.timeValueHours(1).millis();
int totalBuckets = 3 * 24; int totalBuckets = 3 * 24;
int normalRate = 10; int normalRate = 10;
int anomalousRate = 100; int anomalousRate = 100;
int anomalousBucket = 30; int anomalousBucket = 30;
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int bucket = 0; bucket < totalBuckets; bucket++) { for (int bucket = 0; bucket < totalBuckets; bucket++) {
long timestamp = nowMillis - TimeValue.timeValueHours(totalBuckets - bucket).getMillis(); long timestamp = latestBucketTime - TimeValue.timeValueHours(totalBuckets - bucket).getMillis();
int bucketRate = bucket == anomalousBucket ? anomalousRate : normalRate; int bucketRate = bucket == anomalousBucket ? anomalousRate : normalRate;
for (int point = 0; point < bucketRate; point++) { for (int point = 0; point < bucketRate; point++) {
IndexRequest indexRequest = new IndexRequest(DATA_INDEX); IndexRequest indexRequest = new IndexRequest(DATA_INDEX);
@ -122,7 +120,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
String datafeedId = job.getId() + "-feed"; String datafeedId = job.getId() + "-feed";
DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, job.getId()); DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, job.getId());
datafeedConfig.setIndices(Arrays.asList(DATA_INDEX)); datafeedConfig.setIndices(Collections.singletonList(DATA_INDEX));
DatafeedConfig datafeed = datafeedConfig.build(); DatafeedConfig datafeed = datafeedConfig.build();
registerDatafeed(datafeed); registerDatafeed(datafeed);
putDatafeed(datafeed); putDatafeed(datafeed);
@ -210,7 +208,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
assertThat(getModelSnapshots("no-retention").size(), equalTo(2)); assertThat(getModelSnapshots("no-retention").size(), equalTo(2));
List<Bucket> buckets = getBuckets("results-retention"); List<Bucket> buckets = getBuckets("results-retention");
assertThat(buckets.size(), is(lessThanOrEqualTo(24))); assertThat(buckets.size(), is(lessThanOrEqualTo(25)));
assertThat(buckets.size(), is(greaterThanOrEqualTo(22))); assertThat(buckets.size(), is(greaterThanOrEqualTo(22)));
assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo)); assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo));
assertThat(getRecords("results-retention").size(), equalTo(0)); assertThat(getRecords("results-retention").size(), equalTo(0));
@ -225,7 +223,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
assertThat(getModelSnapshots("snapshots-retention-with-retain").size(), equalTo(2)); assertThat(getModelSnapshots("snapshots-retention-with-retain").size(), equalTo(2));
buckets = getBuckets("results-and-snapshots-retention"); buckets = getBuckets("results-and-snapshots-retention");
assertThat(buckets.size(), is(lessThanOrEqualTo(24))); assertThat(buckets.size(), is(lessThanOrEqualTo(25)));
assertThat(buckets.size(), is(greaterThanOrEqualTo(22))); assertThat(buckets.size(), is(greaterThanOrEqualTo(22)));
assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo)); assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo));
assertThat(getRecords("results-and-snapshots-retention").size(), equalTo(0)); assertThat(getRecords("results-and-snapshots-retention").size(), equalTo(0));
@ -278,7 +276,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
private static Job.Builder newJobBuilder(String id) { private static Job.Builder newJobBuilder(String id) {
Detector.Builder detector = new Detector.Builder(); Detector.Builder detector = new Detector.Builder();
detector.setFunction("count"); detector.setFunction("count");
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build())); AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
analysisConfig.setBucketSpan(TimeValue.timeValueHours(1)); analysisConfig.setBucketSpan(TimeValue.timeValueHours(1));
DataDescription.Builder dataDescription = new DataDescription.Builder(); DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time"); dataDescription.setTimeField("time");

View File

@ -81,7 +81,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
Supplier<Boolean> isTimedOutSupplier) { Supplier<Boolean> isTimedOutSupplier) {
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName()); AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
List<MlDataRemover> dataRemovers = Arrays.asList( List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, auditor), new ExpiredResultsRemover(client, auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool), new ExpiredForecastsRemover(client, threadPool),
new ExpiredModelSnapshotsRemover(client, threadPool), new ExpiredModelSnapshotsRemover(client, threadPool),
new UnusedStateRemover(client, clusterService) new UnusedStateRemover(client, clusterService)

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.job.retention;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.Job;
@ -16,12 +15,9 @@ import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator; import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
import java.time.Clock;
import java.time.Instant;
import java.util.Deque; import java.util.Deque;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -68,9 +64,19 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
removeData(jobIterator, listener, isTimedOutSupplier); removeData(jobIterator, listener, isTimedOutSupplier);
return; return;
} }
long cutoffEpochMs = calcCutoffEpochMs(retentionDays);
removeDataBefore(job, cutoffEpochMs, calcCutoffEpochMs(job.getId(), retentionDays, ActionListener.wrap(
ActionListener.wrap(response -> removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure)); cutoffEpochMs -> {
if (cutoffEpochMs == null) {
removeData(jobIterator, listener, isTimedOutSupplier);
} else {
removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(
response -> removeData(jobIterator, listener, isTimedOutSupplier),
listener::onFailure));
}
},
listener::onFailure
));
} }
private WrappedBatchedJobsIterator newJobIterator() { private WrappedBatchedJobsIterator newJobIterator() {
@ -78,20 +84,17 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
return new WrappedBatchedJobsIterator(jobsIterator); return new WrappedBatchedJobsIterator(jobsIterator);
} }
private long calcCutoffEpochMs(long retentionDays) { abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener);
long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
return nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
}
protected abstract Long getRetentionDays(Job job); abstract Long getRetentionDays(Job job);
/** /**
* Template method to allow implementation details of various types of data (e.g. results, model snapshots). * Template method to allow implementation details of various types of data (e.g. results, model snapshots).
* Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job. * Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job.
*/ */
protected abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener); abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener);
protected static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) { static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) {
return QueryBuilders.boolQuery() return QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis")); .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis"));

View File

@ -15,10 +15,14 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.Job;
@ -27,12 +31,14 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.TimeUnit;
/** /**
* Deletes all model snapshots that have expired the configured retention time * Deletes all model snapshots that have expired the configured retention time
@ -65,10 +71,59 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
} }
@Override @Override
protected Long getRetentionDays(Job job) { Long getRetentionDays(Job job) {
return job.getModelSnapshotRetentionDays(); return job.getModelSnapshotRetentionDays();
} }
@Override
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
ThreadedActionListener<Long> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false);
latestSnapshotTimeStamp(jobId, ActionListener.wrap(
latestTime -> {
if (latestTime == null) {
threadedActionListener.onResponse(null);
} else {
long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
threadedActionListener.onResponse(cutoff);
}
},
listener::onFailure
));
}
private void latestSnapshotTimeStamp(String jobId, ActionListener<Long> listener) {
SortBuilder<?> sortBuilder = new FieldSortBuilder(ModelSnapshot.TIMESTAMP.getPreferredName()).order(SortOrder.DESC);
QueryBuilder snapshotQuery = QueryBuilders.boolQuery()
.filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName()));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.sort(sortBuilder);
searchSourceBuilder.query(snapshotQuery);
searchSourceBuilder.size(1);
searchSourceBuilder.trackTotalHits(false);
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(searchSourceBuilder);
searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));
client.search(searchRequest, ActionListener.wrap(
response -> {
SearchHit[] hits = response.getHits().getHits();
if (hits.length == 0) {
// no snapshots found
listener.onResponse(null);
} else {
ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef());
listener.onResponse(snapshot.getTimestamp().getTime());
}
},
listener::onFailure)
);
}
@Override @Override
protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) { protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) {
if (job.getModelSnapshotId() == null) { if (job.getModelSnapshotId() == null) {

View File

@ -8,29 +8,50 @@ package org.elasticsearch.xpack.ml.job.retention;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.Forecast; import org.elasticsearch.xpack.core.ml.job.results.Forecast;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant; import java.time.Instant;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.TimeUnit;
/** /**
* Removes all results that have expired the configured retention time * Removes all results that have expired the configured retention time
@ -48,15 +69,17 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
private final OriginSettingClient client; private final OriginSettingClient client;
private final AnomalyDetectionAuditor auditor; private final AnomalyDetectionAuditor auditor;
private final ThreadPool threadPool;
public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor) { public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor, ThreadPool threadPool) {
super(client); super(client);
this.client = Objects.requireNonNull(client); this.client = Objects.requireNonNull(client);
this.auditor = Objects.requireNonNull(auditor); this.auditor = Objects.requireNonNull(auditor);
this.threadPool = Objects.requireNonNull(threadPool);
} }
@Override @Override
protected Long getRetentionDays(Job job) { Long getRetentionDays(Job job) {
return job.getResultsRetentionDays(); return job.getResultsRetentionDays();
} }
@ -107,6 +130,59 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
return request; return request;
} }
@Override
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
ThreadedActionListener<Long> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false);
latestBucketTime(jobId, ActionListener.wrap(
latestTime -> {
if (latestTime == null) {
threadedActionListener.onResponse(null);
} else {
long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
threadedActionListener.onResponse(cutoff);
}
},
listener::onFailure
));
}
private void latestBucketTime(String jobId, ActionListener<Long> listener) {
SortBuilder<?> sortBuilder = new FieldSortBuilder(Result.TIMESTAMP.getPreferredName()).order(SortOrder.DESC);
QueryBuilder bucketType = QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.sort(sortBuilder);
searchSourceBuilder.query(bucketType);
searchSourceBuilder.size(1);
searchSourceBuilder.trackTotalHits(false);
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(searchSourceBuilder);
searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));
client.search(searchRequest, ActionListener.wrap(
response -> {
SearchHit[] hits = response.getHits().getHits();
if (hits.length == 0) {
// no buckets found
listener.onResponse(null);
} else {
try (InputStream stream = hits[0].getSourceRef().streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
Bucket bucket = Bucket.LENIENT_PARSER.apply(parser, null);
listener.onResponse(bucket.getTimestamp().getTime());
} catch (IOException e) {
listener.onFailure(new ElasticsearchParseException("failed to parse bucket", e));
}
}
}, listener::onFailure
));
}
private void auditResultsWereDeleted(String jobId, long cutoffEpochMs) { private void auditResultsWereDeleted(String jobId, long cutoffEpochMs) {
Instant instant = Instant.ofEpochMilli(cutoffEpochMs); Instant instant = Instant.ofEpochMilli(cutoffEpochMs);
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(instant, ZoneOffset.systemDefault()); ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(instant, ZoneOffset.systemDefault());

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.json.JsonXContent;
@ -25,6 +26,8 @@ import org.elasticsearch.xpack.ml.test.MockOriginSettingClient;
import org.junit.Before; import org.junit.Before;
import java.io.IOException; import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -59,6 +62,11 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
return randomBoolean() ? null : 0L; return randomBoolean() ? null : 0L;
} }
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
listener.onResponse(nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis());
}
@Override @Override
protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) { protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) {
listener.onResponse(Boolean.TRUE); listener.onResponse(Boolean.TRUE);

View File

@ -12,19 +12,16 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.xpack.core.ml.job.config.JobTests;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.test.MockOriginSettingClient; import org.elasticsearch.xpack.ml.test.MockOriginSettingClient;
import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -33,8 +30,9 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.xpack.ml.job.retention.AbstractExpiredJobDataRemoverTests.TestListener; import static org.elasticsearch.xpack.ml.job.retention.AbstractExpiredJobDataRemoverTests.TestListener;
@ -45,114 +43,97 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same; import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
private Client client; private Client client;
private OriginSettingClient originSettingClient; private OriginSettingClient originSettingClient;
private ThreadPool threadPool;
private List<SearchRequest> capturedSearchRequests; private List<SearchRequest> capturedSearchRequests;
private List<DeleteModelSnapshotAction.Request> capturedDeleteModelSnapshotRequests; private List<DeleteModelSnapshotAction.Request> capturedDeleteModelSnapshotRequests;
private List<SearchResponse> searchResponsesPerCall;
private TestListener listener; private TestListener listener;
@Before @Before
public void setUpTests() { public void setUpTests() {
capturedSearchRequests = new ArrayList<>(); capturedSearchRequests = new ArrayList<>();
capturedDeleteModelSnapshotRequests = new ArrayList<>(); capturedDeleteModelSnapshotRequests = new ArrayList<>();
searchResponsesPerCall = new ArrayList<>();
client = mock(Client.class); client = mock(Client.class);
originSettingClient = MockOriginSettingClient.mockOriginSettingClient(client, ClientHelper.ML_ORIGIN); originSettingClient = MockOriginSettingClient.mockOriginSettingClient(client, ClientHelper.ML_ORIGIN);
listener = new TestListener(); listener = new TestListener();
// Init thread pool
Settings settings = Settings.builder()
.put("node.name", "expired_model_snapshots_remover_test")
.build();
threadPool = new ThreadPool(settings,
new FixedExecutorBuilder(settings, MachineLearning.UTILITY_THREAD_POOL_NAME, 1, 1000, ""));
}
@After
public void shutdownThreadPool() {
terminate(threadPool);
}
public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException {
givenClientRequestsSucceed(Arrays.asList(
JobTests.buildJobBuilder("foo").build(),
JobTests.buildJobBuilder("bar").build()
));
createExpiredModelSnapshotsRemover().remove(listener, () -> false);
listener.waitToCompletion();
assertThat(listener.success, is(true));
verify(client).execute(eq(SearchAction.INSTANCE), any(), any());
} }
public void testRemove_GivenJobWithoutActiveSnapshot() throws IOException { public void testRemove_GivenJobWithoutActiveSnapshot() throws IOException {
givenClientRequestsSucceed(Collections.singletonList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build())); List<SearchResponse> responses = Arrays.asList(
AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(JobTests.buildJobBuilder("foo")
.setModelSnapshotRetentionDays(7L).build())),
AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList()));
givenClientRequestsSucceed(responses);
createExpiredModelSnapshotsRemover().remove(listener, () -> false); createExpiredModelSnapshotsRemover().remove(listener, () -> false);
listener.waitToCompletion(); listener.waitToCompletion();
assertThat(listener.success, is(true)); assertThat(listener.success, is(true));
verify(client).execute(eq(SearchAction.INSTANCE), any(), any()); verify(client, times(2)).execute(eq(SearchAction.INSTANCE), any(), any());
} }
public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException { public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException {
givenClientRequestsSucceed( List<SearchResponse> searchResponses = new ArrayList<>();
Arrays.asList( searchResponses.add(
JobTests.buildJobBuilder("none").build(), AbstractExpiredJobDataRemoverTests.createSearchResponse(Arrays.asList(
JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), JobTests.buildJobBuilder("job-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(),
JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() JobTests.buildJobBuilder("job-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
)); )));
List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis());
createModelSnapshot("snapshots-1", "snapshots-1_2")); ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "fresh-snapshot", oneDayAgo);
List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1)));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
Date eightDaysAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(8).getMillis());
ModelSnapshot snapshotToBeDeleted = createModelSnapshot("job-1", "old-snapshot", eightDaysAgo);
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshotToBeDeleted)));
ModelSnapshot snapshot2_1 = createModelSnapshot("job-1", "snapshots-1_1", eightDaysAgo);
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_1)));
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList()));
givenClientRequestsSucceed(searchResponses);
createExpiredModelSnapshotsRemover().remove(listener, () -> false); createExpiredModelSnapshotsRemover().remove(listener, () -> false);
listener.waitToCompletion(); listener.waitToCompletion();
assertThat(listener.success, is(true)); assertThat(listener.success, is(true));
assertThat(capturedSearchRequests.size(), equalTo(2)); assertThat(capturedSearchRequests.size(), equalTo(5));
SearchRequest searchRequest = capturedSearchRequests.get(0); SearchRequest searchRequest = capturedSearchRequests.get(1);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("job-1")}));
searchRequest = capturedSearchRequests.get(1); searchRequest = capturedSearchRequests.get(3);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")})); assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("job-2")}));
assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(3)); assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1));
DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0);
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1")); assertThat(deleteSnapshotRequest.getJobId(), equalTo("job-1"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1")); assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("old-snapshot"));
deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(1);
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_2"));
deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(2);
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1"));
} }
public void testRemove_GivenTimeout() throws IOException { public void testRemove_GivenTimeout() throws IOException {
givenClientRequestsSucceed( List<SearchResponse> searchResponses = new ArrayList<>();
Arrays.asList( searchResponses.add(
AbstractExpiredJobDataRemoverTests.createSearchResponse(Arrays.asList(
JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(),
JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
)); )));
List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
createModelSnapshot("snapshots-1", "snapshots-1_2")); createModelSnapshot("snapshots-1", "snapshots-1_2"));
List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
givenClientRequestsSucceed(searchResponses);
final int timeoutAfter = randomIntBetween(0, 1); final int timeoutAfter = randomIntBetween(0, 1);
AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter);
@ -164,52 +145,53 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
} }
public void testRemove_GivenClientSearchRequestsFail() throws IOException { public void testRemove_GivenClientSearchRequestsFail() throws IOException {
givenClientSearchRequestsFail( List<SearchResponse> searchResponses = new ArrayList<>();
Arrays.asList( searchResponses.add(
JobTests.buildJobBuilder("none").build(), AbstractExpiredJobDataRemoverTests.createSearchResponse(Arrays.asList(
JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(),
JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
)); )));
List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
createModelSnapshot("snapshots-1", "snapshots-1_2"));
List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
givenClientSearchRequestsFail(searchResponses);
createExpiredModelSnapshotsRemover().remove(listener, () -> false); createExpiredModelSnapshotsRemover().remove(listener, () -> false);
listener.waitToCompletion(); listener.waitToCompletion();
assertThat(listener.success, is(false)); assertThat(listener.success, is(false));
assertThat(capturedSearchRequests.size(), equalTo(1)); assertThat(capturedSearchRequests.size(), equalTo(2));
SearchRequest searchRequest = capturedSearchRequests.get(0); SearchRequest searchRequest = capturedSearchRequests.get(1);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")}));
assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(0)); assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(0));
} }
public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOException { public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOException {
givenClientDeleteModelSnapshotRequestsFail( List<SearchResponse> searchResponses = new ArrayList<>();
Arrays.asList( searchResponses.add(
JobTests.buildJobBuilder("none").build(), AbstractExpiredJobDataRemoverTests.createSearchResponse(Arrays.asList(
JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(),
JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
)); )));
List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), ModelSnapshot snapshot1_1 = createModelSnapshot("snapshots-1", "snapshots-1_1");
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1)));
List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(
snapshot1_1,
createModelSnapshot("snapshots-1", "snapshots-1_2")); createModelSnapshot("snapshots-1", "snapshots-1_2"));
List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
ModelSnapshot snapshot2_2 = createModelSnapshot("snapshots-2", "snapshots-2_1");
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_2)));
givenClientDeleteModelSnapshotRequestsFail(searchResponses);
createExpiredModelSnapshotsRemover().remove(listener, () -> false); createExpiredModelSnapshotsRemover().remove(listener, () -> false);
listener.waitToCompletion(); listener.waitToCompletion();
assertThat(listener.success, is(false)); assertThat(listener.success, is(false));
assertThat(capturedSearchRequests.size(), equalTo(1)); assertThat(capturedSearchRequests.size(), equalTo(3));
SearchRequest searchRequest = capturedSearchRequests.get(0); SearchRequest searchRequest = capturedSearchRequests.get(1);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")}));
assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1)); assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1));
@ -218,49 +200,76 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1")); assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1"));
} }
@SuppressWarnings("unchecked")
public void testCalcCutoffEpochMs() throws IOException {
List<SearchResponse> searchResponses = new ArrayList<>();
Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis());
ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "newest-snapshot", oneDayAgo);
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1)));
givenClientRequests(searchResponses, true, true);
long retentionDays = 3L;
ActionListener<Long> cutoffListener = mock(ActionListener.class);
createExpiredModelSnapshotsRemover().calcCutoffEpochMs("job-1", retentionDays, cutoffListener);
long dayInMills = 60 * 60 * 24 * 1000;
long expectedCutoffTime = oneDayAgo.getTime() - (dayInMills * retentionDays);
verify(cutoffListener).onResponse(eq(expectedCutoffTime));
}
private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() { private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() {
ThreadPool threadPool = mock(ThreadPool.class);
ExecutorService executor = mock(ExecutorService.class);
when(threadPool.executor(eq(MachineLearning.UTILITY_THREAD_POOL_NAME))).thenReturn(executor);
doAnswer(invocationOnMock -> {
Runnable run = (Runnable) invocationOnMock.getArguments()[0];
run.run();
return null;
}
).when(executor).execute(any());
return new ExpiredModelSnapshotsRemover(originSettingClient, threadPool); return new ExpiredModelSnapshotsRemover(originSettingClient, threadPool);
} }
private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId) { private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId) {
return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).build(); return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(new Date()).build();
} }
private void givenClientRequestsSucceed(List<Job> jobs) throws IOException { private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId, Date date) {
givenClientRequests(jobs, true, true); return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(date).build();
} }
private void givenClientSearchRequestsFail(List<Job> jobs) throws IOException { private void givenClientRequestsSucceed(List<SearchResponse> searchResponses) {
givenClientRequests(jobs, false, true); givenClientRequests(searchResponses, true, true);
} }
private void givenClientDeleteModelSnapshotRequestsFail(List<Job> jobs) throws IOException { private void givenClientSearchRequestsFail(List<SearchResponse> searchResponses) {
givenClientRequests(jobs, true, false); givenClientRequests(searchResponses, false, true);
}
private void givenClientDeleteModelSnapshotRequestsFail(List<SearchResponse> searchResponses) {
givenClientRequests(searchResponses, true, false);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void givenClientRequests(List<Job> jobs, private void givenClientRequests(List<SearchResponse> searchResponses,
boolean shouldSearchRequestsSucceed, boolean shouldDeleteSnapshotRequestsSucceed) throws IOException { boolean shouldSearchRequestsSucceed, boolean shouldDeleteSnapshotRequestsSucceed) {
SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs);
doAnswer(new Answer<Void>() { doAnswer(new Answer<Void>() {
int callCount = 0; AtomicInteger callCount = new AtomicInteger();
AtomicBoolean isJobQuery = new AtomicBoolean(true);
@Override @Override
public Void answer(InvocationOnMock invocationOnMock) { public Void answer(InvocationOnMock invocationOnMock) {
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocationOnMock.getArguments()[2]; ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocationOnMock.getArguments()[2];
if (isJobQuery.get()) {
listener.onResponse(response);
isJobQuery.set(false);
return null;
}
SearchRequest searchRequest = (SearchRequest) invocationOnMock.getArguments()[1]; SearchRequest searchRequest = (SearchRequest) invocationOnMock.getArguments()[1];
capturedSearchRequests.add(searchRequest); capturedSearchRequests.add(searchRequest);
if (shouldSearchRequestsSucceed) { // Only the last search request should fail
listener.onResponse(searchResponsesPerCall.get(callCount++)); if (shouldSearchRequestsSucceed || callCount.get() < searchResponses.size()) {
listener.onResponse(searchResponses.get(callCount.getAndIncrement()));
} else { } else {
listener.onFailure(new RuntimeException("search failed")); listener.onFailure(new RuntimeException("search failed"));
} }
@ -268,9 +277,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
} }
}).when(client).execute(same(SearchAction.INSTANCE), any(), any()); }).when(client).execute(same(SearchAction.INSTANCE), any(), any());
doAnswer(new Answer<Void>() { doAnswer(invocationOnMock -> {
@Override
public Void answer(InvocationOnMock invocationOnMock) {
capturedDeleteModelSnapshotRequests.add((DeleteModelSnapshotAction.Request) invocationOnMock.getArguments()[1]); capturedDeleteModelSnapshotRequests.add((DeleteModelSnapshotAction.Request) invocationOnMock.getArguments()[1]);
ActionListener<AcknowledgedResponse> listener = ActionListener<AcknowledgedResponse> listener =
(ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[2]; (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[2];
@ -281,7 +288,6 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
} }
return null; return null;
} }
}).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any()); ).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any());
} }
} }

View File

@ -7,26 +7,32 @@ package org.elasticsearch.xpack.ml.job.retention;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.xpack.core.ml.job.config.JobTests;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.test.MockOriginSettingClient; import org.elasticsearch.xpack.ml.test.MockOriginSettingClient;
import org.junit.Before; import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ -56,7 +62,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
} }
public void testRemove_GivenNoJobs() throws IOException { public void testRemove_GivenNoJobs() throws IOException {
givenClientRequestsSucceed(); givenDBQRequestsSucceed();
AbstractExpiredJobDataRemoverTests.givenJobs(client, Collections.emptyList()); AbstractExpiredJobDataRemoverTests.givenJobs(client, Collections.emptyList());
createExpiredResultsRemover().remove(listener, () -> false); createExpiredResultsRemover().remove(listener, () -> false);
@ -66,7 +72,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
} }
public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException { public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException {
givenClientRequestsSucceed(); givenDBQRequestsSucceed();
AbstractExpiredJobDataRemoverTests.givenJobs(client, AbstractExpiredJobDataRemoverTests.givenJobs(client,
Arrays.asList( Arrays.asList(
JobTests.buildJobBuilder("foo").build(), JobTests.buildJobBuilder("foo").build(),
@ -79,14 +85,14 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
verify(client).execute(eq(SearchAction.INSTANCE), any(), any()); verify(client).execute(eq(SearchAction.INSTANCE), any(), any());
} }
public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws Exception { public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() {
givenClientRequestsSucceed(); givenDBQRequestsSucceed();
AbstractExpiredJobDataRemoverTests.givenJobs(client,
Arrays.asList( givenSearchResponses(Arrays.asList(
JobTests.buildJobBuilder("none").build(), JobTests.buildJobBuilder("none").build(),
JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(), JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(),
JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()),
)); new Bucket("id_not_important", new Date(), 60));
createExpiredResultsRemover().remove(listener, () -> false); createExpiredResultsRemover().remove(listener, () -> false);
@ -98,13 +104,12 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
verify(listener).onResponse(true); verify(listener).onResponse(true);
} }
public void testRemove_GivenTimeout() throws Exception { public void testRemove_GivenTimeout() {
givenClientRequestsSucceed(); givenDBQRequestsSucceed();
AbstractExpiredJobDataRemoverTests.givenJobs(client, givenSearchResponses(Arrays.asList(
Arrays.asList( JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(),
JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(), JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()
JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() ), new Bucket("id_not_important", new Date(), 60));
));
final int timeoutAfter = randomIntBetween(0, 1); final int timeoutAfter = randomIntBetween(0, 1);
AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter);
@ -115,14 +120,14 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
verify(listener).onResponse(false); verify(listener).onResponse(false);
} }
public void testRemove_GivenClientRequestsFailed() throws IOException { public void testRemove_GivenClientRequestsFailed() {
givenClientRequestsFailed(); givenDBQRequestsFailed();
AbstractExpiredJobDataRemoverTests.givenJobs(client, givenSearchResponses(
Arrays.asList( Arrays.asList(
JobTests.buildJobBuilder("none").build(), JobTests.buildJobBuilder("none").build(),
JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(), JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(),
JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()),
)); new Bucket("id_not_important", new Date(), 60));
createExpiredResultsRemover().remove(listener, () -> false); createExpiredResultsRemover().remove(listener, () -> false);
@ -132,19 +137,33 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
verify(listener).onFailure(any()); verify(listener).onFailure(any());
} }
private void givenClientRequestsSucceed() { @SuppressWarnings("unchecked")
givenClientRequests(true); public void testCalcCutoffEpochMs() {
String jobId = "calc-cutoff";
Date latest = new Date();
givenSearchResponses(Collections.singletonList(JobTests.buildJobBuilder(jobId).setResultsRetentionDays(1L).build()),
new Bucket(jobId, latest, 60));
ActionListener<Long> cutoffListener = mock(ActionListener.class);
createExpiredResultsRemover().calcCutoffEpochMs(jobId, 1L, cutoffListener);
long dayInMills = 60 * 60 * 24 * 1000;
long expectedCutoffTime = latest.getTime() - dayInMills;
verify(cutoffListener).onResponse(eq(expectedCutoffTime));
} }
private void givenClientRequestsFailed() { private void givenDBQRequestsSucceed() {
givenClientRequests(false); givenDBQRequest(true);
}
private void givenDBQRequestsFailed() {
givenDBQRequest(false);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void givenClientRequests(boolean shouldSucceed) { private void givenDBQRequest(boolean shouldSucceed) {
doAnswer(new Answer<Void>() { doAnswer(invocationOnMock -> {
@Override
public Void answer(InvocationOnMock invocationOnMock) {
capturedDeleteByQueryRequests.add((DeleteByQueryRequest) invocationOnMock.getArguments()[1]); capturedDeleteByQueryRequests.add((DeleteByQueryRequest) invocationOnMock.getArguments()[1]);
ActionListener<BulkByScrollResponse> listener = ActionListener<BulkByScrollResponse> listener =
(ActionListener<BulkByScrollResponse>) invocationOnMock.getArguments()[2]; (ActionListener<BulkByScrollResponse>) invocationOnMock.getArguments()[2];
@ -157,10 +176,38 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
} }
return null; return null;
} }
}).when(client).execute(same(DeleteByQueryAction.INSTANCE), any(), any()); ).when(client).execute(same(DeleteByQueryAction.INSTANCE), any(), any());
}
@SuppressWarnings("unchecked")
private void givenSearchResponses(List<Job> jobs, Bucket bucket) {
doAnswer(invocationOnMock -> {
SearchRequest request = (SearchRequest) invocationOnMock.getArguments()[1];
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocationOnMock.getArguments()[2];
if (request.indices()[0].startsWith(AnomalyDetectorsIndex.jobResultsIndexPrefix())) {
// asking for the bucket result
listener.onResponse(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(bucket)));
} else {
listener.onResponse(AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs));
}
return null;
}).when(client).execute(eq(SearchAction.INSTANCE), any(), any());
} }
private ExpiredResultsRemover createExpiredResultsRemover() { private ExpiredResultsRemover createExpiredResultsRemover() {
return new ExpiredResultsRemover(originSettingClient, mock(AnomalyDetectionAuditor.class)); ThreadPool threadPool = mock(ThreadPool.class);
ExecutorService executor = mock(ExecutorService.class);
when(threadPool.executor(eq(MachineLearning.UTILITY_THREAD_POOL_NAME))).thenReturn(executor);
doAnswer(invocationOnMock -> {
Runnable run = (Runnable) invocationOnMock.getArguments()[0];
run.run();
return null;
}
).when(executor).execute(any());
return new ExpiredResultsRemover(originSettingClient, mock(AnomalyDetectionAuditor.class), threadPool);
} }
} }