[ML] Remove expired forecasts (elastic/x-pack-elasticsearch#3077)

Closes elastic/machine-learning-cpp#322


Original commit: elastic/x-pack-elasticsearch@5249452a86
This commit is contained in:
Dimitris Athanasiou 2017-11-21 17:18:04 +00:00 committed by GitHub
parent 4ae1ca5fa5
commit 74beb9ca64
6 changed files with 277 additions and 2 deletions

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.retention.ExpiredForecastsRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover;
import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;
@ -146,6 +147,7 @@ public class DeleteExpiredDataAction extends Action<DeleteExpiredDataAction.Requ
Auditor auditor = new Auditor(client, clusterService);
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, clusterService, auditor),
new ExpiredForecastsRemover(client),
new ExpiredModelSnapshotsRemover(client, clusterService)
);
Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);

View File

@ -175,6 +175,10 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
return jobId;
}
/**
 * Returns the id of the forecast held by this object.
 *
 * @return the forecast id
 */
public long getForecastId() {
    return this.forecastId;
}
/**
* Return the document ID used for indexing. As there is 1 and only 1 document
* per forecast request, the id has no dynamic parts.

View File

@ -0,0 +1,149 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.retention;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.results.Forecast;
import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
 * Removes up to {@link #MAX_FORECASTS} forecasts (stats + forecasts docs) that have expired.
 * A forecast is deleted if its expiration timestamp is earlier than the cutoff, which is
 * the instant at which this remover was constructed.
 */
public class ExpiredForecastsRemover implements MlDataRemover {

    private static final Logger LOGGER = Loggers.getLogger(ExpiredForecastsRemover.class);

    /** Upper bound on the number of forecast request stats documents examined in a single run. */
    private static final int MAX_FORECASTS = 10000;

    /** Index pattern built from the job results index prefix; used for both the search and the delete. */
    private static final String RESULTS_INDEX_PATTERN = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*";

    private final Client client;

    /** Forecasts whose expiry timestamp (epoch millis) is strictly before this value get deleted. */
    private final long cutoffEpochMs;

    /**
     * Creates a remover whose cutoff is fixed to the current time at construction.
     *
     * @param client the client used to run the search and the delete-by-query; must not be {@code null}
     */
    public ExpiredForecastsRemover(Client client) {
        this.client = Objects.requireNonNull(client);
        this.cutoffEpochMs = DateTime.now(ISOChronology.getInstance()).getMillis();
    }

    /**
     * Searches for forecast request stats documents that carry an expiry time and deletes
     * those that have expired, together with their forecast documents.
     *
     * @param listener notified with {@code true} on success, or with the failure otherwise
     */
    @Override
    public void remove(ActionListener<Boolean> listener) {
        LOGGER.debug("Removing forecasts that expire before [{}]", cutoffEpochMs);
        ActionListener<SearchResponse> forecastStatsHandler = ActionListener.wrap(
                searchResponse -> deleteForecasts(searchResponse, listener),
                e -> listener.onFailure(new ElasticsearchException("An error occurred while searching forecasts to delete", e)));

        // Only forecast request stats documents that have an expiry time are candidates.
        SearchSourceBuilder source = new SearchSourceBuilder();
        source.query(QueryBuilders.boolQuery()
                .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE))
                .filter(QueryBuilders.existsQuery(ForecastRequestStats.EXPIRY_TIME.getPreferredName())));
        source.size(MAX_FORECASTS);

        SearchRequest searchRequest = new SearchRequest(RESULTS_INDEX_PATTERN);
        searchRequest.source(source);
        client.execute(SearchAction.INSTANCE, searchRequest, forecastStatsHandler);
    }

    /**
     * Deletes the documents of every expired forecast found in the given search response.
     * When no forecast has expired, the listener is completed immediately and no delete
     * request is issued.
     */
    private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boolean> listener) {
        List<ForecastRequestStats> forecastsToDelete;
        try {
            forecastsToDelete = findForecastsToDelete(searchResponse);
        } catch (IOException e) {
            listener.onFailure(e);
            return;
        }

        // With nothing to delete, the query built below would contain no should clauses
        // next to minimumShouldMatch(1), leaving only the result-type must clause.
        // NOTE(review): the behavior of that combination should not be relied upon;
        // skipping the request also avoids a pointless delete-by-query.
        if (forecastsToDelete.isEmpty()) {
            LOGGER.debug("No expired forecasts found");
            listener.onResponse(true);
            return;
        }

        DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete);
        client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() {
            @Override
            public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
                try {
                    if (bulkByScrollResponse.getDeleted() > 0) {
                        LOGGER.info("Deleted [{}] documents corresponding to [{}] expired forecasts",
                                bulkByScrollResponse.getDeleted(), forecastsToDelete.size());
                    }
                    listener.onResponse(true);
                } catch (Exception e) {
                    onFailure(e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                listener.onFailure(new ElasticsearchException("Failed to remove expired forecasts", e));
            }
        });
    }

    /**
     * Parses every hit into a {@link ForecastRequestStats} and keeps those whose expiry
     * timestamp is strictly before the cutoff.
     *
     * @throws IOException if a hit's source cannot be parsed
     */
    private List<ForecastRequestStats> findForecastsToDelete(SearchResponse searchResponse) throws IOException {
        List<ForecastRequestStats> forecastsToDelete = new ArrayList<>();

        SearchHits hits = searchResponse.getHits();
        if (hits.getTotalHits() > MAX_FORECASTS) {
            LOGGER.info("More than [{}] forecasts were found. This run will only delete [{}] of them", MAX_FORECASTS, MAX_FORECASTS);
        }

        for (SearchHit hit : hits.getHits()) {
            // Close the parser once the document has been read.
            try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
                    NamedXContentRegistry.EMPTY, hit.getSourceRef())) {
                ForecastRequestStats forecastRequestStats = ForecastRequestStats.PARSER.apply(parser, null);
                if (forecastRequestStats.getDateExpired().toEpochMilli() < cutoffEpochMs) {
                    forecastsToDelete.add(forecastRequestStats);
                }
            }
        }
        return forecastsToDelete;
    }

    /**
     * Builds a delete-by-query over the results indices that targets the forecast request
     * stats and forecast documents of the given forecasts, matched by job id and forecast id.
     */
    private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forecastsToDelete) {
        SearchRequest searchRequest = new SearchRequest();
        // We need to create the DeleteByQueryRequest before we modify the SearchRequest
        // because the constructor of the former wipes the latter
        DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
        request.setSlices(5);

        searchRequest.indices(RESULTS_INDEX_PATTERN);

        // A document must be a forecast or forecast stats result AND match at least one
        // of the (job id, forecast id) pairs added below.
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1);
        boolQuery.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
                ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE));
        for (ForecastRequestStats forecastToDelete : forecastsToDelete) {
            boolQuery.should(QueryBuilders.boolQuery()
                    .must(QueryBuilders.termQuery(Job.ID.getPreferredName(), forecastToDelete.getJobId()))
                    .must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastToDelete.getForecastId())));
        }
        QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery);
        searchRequest.source(new SearchSourceBuilder().query(query));
        return request;
    }
}

View File

@ -22,6 +22,8 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.results.Forecast;
import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.Auditor;
@ -88,7 +90,8 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
request.setSlices(5);
searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
QueryBuilder excludeFilter = QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE);
QueryBuilder excludeFilter = QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
ModelSizeStats.RESULT_TYPE_VALUE, ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE);
QueryBuilder query = createQuery(job.getId(), cutoffEpochMs)
.filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName()))
.mustNot(excludeFilter);

View File

@ -23,11 +23,13 @@ import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -88,6 +90,8 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
registerJob(newJobBuilder("snapshots-retention-with-retain").setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L));
registerJob(newJobBuilder("results-and-snapshots-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(2L));
List<Long> shortExpiryForecastIds = new ArrayList<>();
long now = System.currentTimeMillis();
long oneDayAgo = now - TimeValue.timeValueHours(48).getMillis() - 1;
for (Job.Builder job : getJobs()) {
@ -116,6 +120,16 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
UpdateRequest updateSnapshotRequest = new UpdateRequest(".ml-anomalies-" + job.getId(), "doc", snapshotDocId);
updateSnapshotRequest.doc(snapshotUpdate.getBytes(StandardCharsets.UTF_8), XContentType.JSON);
client().execute(UpdateAction.INSTANCE, updateSnapshotRequest).get();
// Now let's create some forecasts
openJob(job.getId());
long forecastShortExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), TimeValue.timeValueSeconds(1));
shortExpiryForecastIds.add(forecastShortExpiryId);
long forecastDefaultExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), null);
long forecastNoExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), TimeValue.ZERO);
waitForecastToFinish(job.getId(), forecastShortExpiryId);
waitForecastToFinish(job.getId(), forecastDefaultExpiryId);
waitForecastToFinish(job.getId(), forecastNoExpiryId);
}
// Refresh to ensure the snapshot timestamp updates are visible
client().admin().indices().prepareRefresh("*").get();
@ -125,7 +139,6 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
for (Job.Builder job : getJobs()) {
// Run up to now
openJob(job.getId());
startDatafeed(job.getId() + "-feed", 0, now);
waitUntilJobIsClosed(job.getId());
assertThat(getBuckets(job.getId()).size(), is(greaterThanOrEqualTo(70)));
@ -143,6 +156,14 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
assertThat(totalModelSizeStatsBeforeDelete, greaterThan(0L));
assertThat(totalNotificationsCountBeforeDelete, greaterThan(0L));
// Verify forecasts were created
List<ForecastRequestStats> forecastStats = getForecastStats();
assertThat(forecastStats.size(), equalTo(getJobs().size() * 3));
for (ForecastRequestStats forecastStat : forecastStats) {
assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()),
equalTo((long) forecastStat.getRecordCount()));
}
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
// We need to refresh to ensure the deletion is visible
@ -181,6 +202,19 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
long totalNotificationsCountAfterDelete = client().prepareSearch(".ml-notifications").get().getHits().totalHits;
assertThat(totalModelSizeStatsAfterDelete, equalTo(totalModelSizeStatsBeforeDelete));
assertThat(totalNotificationsCountAfterDelete, greaterThanOrEqualTo(totalNotificationsCountBeforeDelete));
// Verify that only the short expiry forecasts were deleted
forecastStats = getForecastStats();
assertThat(forecastStats.size(), equalTo(getJobs().size() * 2));
for (ForecastRequestStats forecastStat : forecastStats) {
assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()),
equalTo((long) forecastStat.getRecordCount()));
}
for (Job.Builder job : getJobs()) {
for (long forecastId : shortExpiryForecastIds) {
assertThat(countForecastDocs(job.getId(), forecastId), equalTo(0L));
}
}
}
private static Job.Builder newJobBuilder(String id) {

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
@ -16,8 +17,14 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.SecurityIntegTestCase;
@ -28,6 +35,7 @@ import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.FlushJobAction;
import org.elasticsearch.xpack.ml.action.ForecastJobAction;
import org.elasticsearch.xpack.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.ml.action.GetCategoriesAction;
import org.elasticsearch.xpack.ml.action.GetJobsAction;
@ -49,11 +57,15 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Forecast;
import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.persistent.PersistentTaskParams;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksNodeService;
@ -72,6 +84,8 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.XContentTestUtils.convertToMap;
import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
/**
* Base class of ML integration tests that use a native autodetect process
@ -277,6 +291,75 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase {
return client().execute(PostDataAction.INSTANCE, request).actionGet().getDataCounts();
}
/**
 * Executes a forecast request for the given job and returns the id of the forecast.
 *
 * @param jobId     the job to forecast
 * @param duration  the forecast duration
 * @param expiresIn the forecast expiry; when {@code null} no expiry is set on the request
 * @return the forecast id taken from the response
 */
protected long forecast(String jobId, TimeValue duration, TimeValue expiresIn) {
    ForecastJobAction.Request forecastRequest = new ForecastJobAction.Request(jobId);
    String durationAsString = duration.getStringRep();
    forecastRequest.setDuration(durationAsString);

    boolean hasExplicitExpiry = expiresIn != null;
    if (hasExplicitExpiry) {
        forecastRequest.setExpiresIn(expiresIn.getStringRep());
    }

    return client()
            .execute(ForecastJobAction.INSTANCE, forecastRequest)
            .actionGet()
            .getForecastId();
}
/**
 * Waits for up to 30 seconds until the stats document of the given forecast exists
 * and reports the FINISHED status.
 *
 * @param jobId      the job the forecast belongs to
 * @param forecastId the forecast to wait for
 * @throws Exception propagated from {@code assertBusy}
 */
protected void waitForecastToFinish(String jobId, long forecastId) throws Exception {
    final long timeoutSeconds = 30;
    assertBusy(() -> {
        ForecastRequestStats stats = getForecastStats(jobId, forecastId);
        // The stats document may not have been indexed yet.
        assertThat(stats, is(notNullValue()));
        ForecastRequestStats.ForecastRequestStatus currentStatus = stats.getStatus();
        assertThat(currentStatus, equalTo(ForecastRequestStats.ForecastRequestStatus.FINISHED));
    }, timeoutSeconds, TimeUnit.SECONDS);
}
/**
 * Fetches the forecast request stats document of a single forecast.
 *
 * @param jobId      the job the forecast belongs to
 * @param forecastId the forecast whose stats are requested
 * @return the parsed stats, or {@code null} if no stats document was found
 * @throws IllegalStateException if the document source cannot be parsed
 */
protected ForecastRequestStats getForecastStats(String jobId, long forecastId) {
    SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobResultsAliasedName(jobId))
            .setQuery(QueryBuilders.boolQuery()
                    .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE))
                    .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
                    .filter(QueryBuilders.termQuery(ForecastRequestStats.FORECAST_ID.getPreferredName(), forecastId)))
            .execute().actionGet();
    SearchHits hits = searchResponse.getHits();
    if (hits.getTotalHits() == 0) {
        return null;
    }
    // Exactly one stats document is expected per forecast.
    assertThat(hits.getTotalHits(), equalTo(1L));
    // try-with-resources closes the parser once the document has been read
    try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
            NamedXContentRegistry.EMPTY, hits.getHits()[0].getSourceRef())) {
        return ForecastRequestStats.PARSER.apply(parser, null);
    } catch (IOException e) {
        throw new IllegalStateException(e);
    }
}
/**
 * Fetches the forecast request stats documents across all results indices.
 * At most 1000 documents are returned because of the search size used.
 *
 * @return the parsed stats documents, in the order the search returned them
 * @throws IllegalStateException if a document source cannot be parsed
 */
protected List<ForecastRequestStats> getForecastStats() {
    List<ForecastRequestStats> forecastStats = new ArrayList<>();

    SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*")
            .setSize(1000)
            .setQuery(QueryBuilders.boolQuery()
                    .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE)))
            .execute().actionGet();
    SearchHits hits = searchResponse.getHits();
    for (SearchHit hit : hits) {
        // try-with-resources closes each parser once its document has been read
        try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
                NamedXContentRegistry.EMPTY, hit.getSourceRef())) {
            forecastStats.add(ForecastRequestStats.PARSER.apply(parser, null));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
    return forecastStats;
}
/**
 * Counts the forecast result documents stored for the given job and forecast.
 *
 * @param jobId      the job the forecast belongs to
 * @param forecastId the forecast whose documents are counted
 * @return the total number of hits reported by the search
 */
protected long countForecastDocs(String jobId, long forecastId) {
    String allResultsIndices = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*";
    SearchResponse response = client().prepareSearch(allResultsIndices)
            .setQuery(QueryBuilders.boolQuery()
                    .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Forecast.RESULT_TYPE_VALUE))
                    .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
                    .filter(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastId)))
            .execute()
            .actionGet();
    SearchHits matchingDocs = response.getHits();
    return matchingDocs.getTotalHits();
}
@Override
protected void ensureClusterStateConsistency() throws IOException {
if (cluster() != null && cluster().size() > 0) {