[7.x] Delete auto-generated annotations when model snapshot is reverted (#58240) (#58335)

This commit is contained in:
Przemysław Witek 2020-06-18 17:59:52 +02:00 committed by GitHub
parent be08268562
commit 9dd3d5aa48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 266 additions and 95 deletions

View File

@ -6,16 +6,11 @@
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
@ -28,14 +23,10 @@ import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.xpack.core.ml.annotations.AnnotationTests.randomAnnotation;
import static org.hamcrest.Matchers.hasSize;
public class DeleteJobIT extends MlNativeAutodetectIntegTestCase {
@ -122,23 +113,4 @@ public class DeleteJobIT extends MlNativeAutodetectIntegTestCase {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
}
private void assertThatNumberOfAnnotationsIsEqualTo(int expectedNumberOfAnnotations) throws Exception {
// Refresh the annotations index so that recently indexed annotation docs are visible.
client().admin().indices().prepareRefresh(AnnotationIndex.INDEX_NAME)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
.execute()
.actionGet();
SearchRequest searchRequest =
new SearchRequest(AnnotationIndex.READ_ALIAS_NAME).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
List<Annotation> annotations = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits().getHits()) {
try (XContentParser parser = createParser(jsonXContent, hit.getSourceRef())) {
annotations.add(Annotation.fromXContent(parser, null));
}
}
assertThat("Hits were: " + annotations, annotations, hasSize(expectedNumberOfAnnotations));
}
}

View File

@ -5,7 +5,9 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
@ -45,6 +47,8 @@ import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
@ -70,7 +74,9 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@ -243,8 +249,9 @@ abstract class MlNativeAutodetectIntegTestCase extends MlNativeIntegTestCase {
return response.getPage().results();
}
protected RevertModelSnapshotAction.Response revertModelSnapshot(String jobId, String snapshotId) {
protected RevertModelSnapshotAction.Response revertModelSnapshot(String jobId, String snapshotId, boolean deleteInterveningResults) {
RevertModelSnapshotAction.Request request = new RevertModelSnapshotAction.Request(jobId, snapshotId);
request.setDeleteInterveningResults(deleteInterveningResults);
return client().execute(RevertModelSnapshotAction.INSTANCE, request).actionGet();
}
@ -295,6 +302,25 @@ abstract class MlNativeAutodetectIntegTestCase extends MlNativeIntegTestCase {
}, 30, TimeUnit.SECONDS);
}
protected void assertThatNumberOfAnnotationsIsEqualTo(int expectedNumberOfAnnotations) throws IOException {
// Refresh the annotations index so that recently indexed annotation docs are visible.
client().admin().indices().prepareRefresh(AnnotationIndex.INDEX_NAME)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
.execute()
.actionGet();
SearchRequest searchRequest =
new SearchRequest(AnnotationIndex.READ_ALIAS_NAME).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
List<Annotation> annotations = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits().getHits()) {
try (XContentParser parser = createParser(jsonXContent, hit.getSourceRef())) {
annotations.add(Annotation.fromXContent(parser, null));
}
}
assertThat("Annotations were: " + annotations, annotations, hasSize(expectedNumberOfAnnotations));
}
protected ForecastRequestStats getForecastStats(String jobId, String forecastId) {
SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobResultsAliasedName(jobId))
.setQuery(QueryBuilders.idsQuery().addIds(ForecastRequestStats.documentId(jobId, forecastId)))

View File

@ -5,14 +5,22 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.annotations.Annotation.Event;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
@ -21,9 +29,12 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeSta
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.security.user.XPackUser;
import org.junit.After;
import java.io.IOException;
import java.sql.Date;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -33,6 +44,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.ml.annotations.AnnotationTests.randomAnnotation;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
@ -50,11 +62,19 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase {
cleanUp();
}
public void test() throws Exception {
public void testRevertModelSnapshot() throws Exception {
test("revert-model-snapshot-it-job", false);
}
public void testRevertModelSnapshot_DeleteInterveningResults() throws Exception {
test("revert-model-snapshot-it-job-delete-intervening-results", true);
}
private void test(String jobId, boolean deleteInterveningResults) throws Exception {
TimeValue bucketSpan = TimeValue.timeValueHours(1);
long startTime = 1491004800000L;
Job.Builder job = buildAndRegisterJob("revert-model-snapshot-it-job", bucketSpan);
Job.Builder job = buildAndRegisterJob(jobId, bucketSpan);
openJob(job.getId());
postData(job.getId(), generateData(startTime, bucketSpan, 10, Arrays.asList("foo"),
(bucketIndex, series) -> bucketIndex == 5 ? 100.0 : 10.0).stream().collect(Collectors.joining()));
@ -97,7 +117,20 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase {
assertThat(getJob(job.getId()).get(0).getModelSnapshotId(), equalTo(modelSnapshots.get(0).getSnapshotId()));
ModelSnapshot revertSnapshot = modelSnapshots.get(1);
assertThat(revertModelSnapshot(job.getId(), revertSnapshot.getSnapshotId()).status(), equalTo(RestStatus.OK));
// Check there are 2 annotations (one per model snapshot)
assertThatNumberOfAnnotationsIsEqualTo(2);
// Add 3 new annotations...
Instant lastResultTimestamp = revertSnapshot.getLatestResultTimeStamp().toInstant();
client().index(randomAnnotationIndexRequest(job.getId(), lastResultTimestamp.plusSeconds(10), Event.DELAYED_DATA)).actionGet();
client().index(randomAnnotationIndexRequest(job.getId(), lastResultTimestamp.plusSeconds(20), Event.MODEL_CHANGE)).actionGet();
client().index(randomAnnotationIndexRequest(job.getId(), lastResultTimestamp.minusSeconds(10), Event.MODEL_CHANGE)).actionGet();
// ... and check there are 5 annotations in total now
assertThatNumberOfAnnotationsIsEqualTo(5);
assertThat(
revertModelSnapshot(job.getId(), revertSnapshot.getSnapshotId(), deleteInterveningResults).status(),
equalTo(RestStatus.OK));
// Check model_size_stats has been reverted
assertThat(getJobStats(job.getId()).get(0).getModelSizeStats().getModelBytes(), equalTo(modelSizeStats1.getModelBytes()));
@ -105,6 +138,9 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase {
// Check quantiles have been reverted
assertThat(getQuantiles(job.getId()).getTimestamp(), equalTo(revertSnapshot.getLatestResultTimeStamp()));
// Check annotations with event type from {delayed_data, model_change} have been removed if deleteInterveningResults flag is set
assertThatNumberOfAnnotationsIsEqualTo(deleteInterveningResults ? 3 : 5);
// Re-run 2nd half of data
openJob(job.getId());
postData(job.getId(), generateData(startTime + 10 * bucketSpan.getMillis(), bucketSpan, 10, Arrays.asList("foo", "bar"),
@ -170,4 +206,17 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase {
throw new IllegalStateException(e);
}
}
private static IndexRequest randomAnnotationIndexRequest(String jobId, Instant timestamp, Event event) throws IOException {
Annotation annotation = new Annotation.Builder(randomAnnotation(jobId))
.setTimestamp(Date.from(timestamp))
.setCreateUsername(XPackUser.NAME)
.setEvent(event)
.build();
try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
return new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME)
.source(xContentBuilder)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
}
}

View File

@ -146,7 +146,7 @@ public class SetUpgradeModeIT extends MlNativeAutodetectIntegTestCase {
expectThrowsUpgradeModeException(() -> forecast(jobId, null, null));
String snapshotId = "snapshot_id";
expectThrowsUpgradeModeException(() -> revertModelSnapshot(jobId, snapshotId));
expectThrowsUpgradeModeException(() -> revertModelSnapshot(jobId, snapshotId, false));
String datafeedId = "datafeed_id";
expectThrowsUpgradeModeException(() -> putDatafeed(createDatafeed(datafeedId, jobId, Collections.singletonList("index"))));

View File

@ -59,8 +59,6 @@ import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
@ -71,7 +69,6 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerS
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.security.user.XPackUser;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
@ -506,20 +503,9 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
}
private void deleteAnnotations(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener<Boolean> finishedHandler) {
ConstantScoreQueryBuilder query =
QueryBuilders.constantScoreQuery(
QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
.filter(QueryBuilders.termQuery(Annotation.CREATE_USERNAME.getPreferredName(), XPackUser.NAME)));
DeleteByQueryRequest request = new DeleteByQueryRequest(AnnotationIndex.READ_ALIAS_NAME)
.setQuery(query)
.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
.setAbortOnVersionConflict(false)
.setRefresh(true);
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
response -> finishedHandler.onResponse(true),
ignoreIndexNotFoundException(finishedHandler)));
JobDataDeleter deleter = new JobDataDeleter(parentTaskClient, jobId);
deleter.deleteAllAnnotations(
ActionListener.wrap(r -> finishedHandler.onResponse(true), ignoreIndexNotFoundException(finishedHandler)));
}
private static Consumer<Exception> ignoreIndexNotFoundException(ActionListener<Boolean> finishedHandler) {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@ -38,6 +39,8 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
public class TransportRevertModelSnapshotAction extends TransportMasterNodeAction<RevertModelSnapshotAction.Request,
@ -100,6 +103,7 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
getModelSnapshot(request, jobResultsProvider, modelSnapshot -> {
ActionListener<RevertModelSnapshotAction.Response> wrappedListener = listener;
if (request.getDeleteInterveningResults()) {
wrappedListener = wrapDeleteOldAnnotationsListener(wrappedListener, modelSnapshot, request.getJobId());
wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, request.getJobId());
wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, request.getJobId());
}
@ -133,9 +137,40 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
}, errorHandler);
}
private ActionListener<RevertModelSnapshotAction.Response> wrapDeleteOldAnnotationsListener(
ActionListener<RevertModelSnapshotAction.Response> listener,
ModelSnapshot modelSnapshot,
String jobId) {
return ActionListener.wrap(response -> {
Date deleteAfter = modelSnapshot.getLatestResultTimeStamp();
logger.info("[{}] Removing intervening annotations after reverting model: deleting annotations after [{}]", jobId, deleteAfter);
JobDataDeleter dataDeleter = new JobDataDeleter(client, jobId);
Set<String> eventsToDelete = new HashSet<>();
// Because the results based on the delayed data are being deleted, the fact that the data was originally delayed is not
// relevant
eventsToDelete.add(Annotation.Event.DELAYED_DATA.toString());
// Because the model that changed is no longer in use as it has been rolled back to a time before those changes occurred
eventsToDelete.add(Annotation.Event.MODEL_CHANGE.toString());
dataDeleter.deleteAnnotationsFromTime(deleteAfter.getTime() + 1, eventsToDelete, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean success) {
listener.onResponse(response);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}, listener::onFailure);
}
private ActionListener<RevertModelSnapshotAction.Response> wrapDeleteOldDataListener(
ActionListener<RevertModelSnapshotAction.Response> listener,
ModelSnapshot modelSnapshot, String jobId) {
ModelSnapshot modelSnapshot,
String jobId) {
// If we need to delete buckets that occurred after the snapshot, we
// wrap the listener with one that invokes the OldDataRemover on
@ -161,8 +196,8 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
private ActionListener<RevertModelSnapshotAction.Response> wrapRevertDataCountsListener(
ActionListener<RevertModelSnapshotAction.Response> listener,
ModelSnapshot modelSnapshot, String jobId) {
ModelSnapshot modelSnapshot,
String jobId) {
return ActionListener.wrap(response -> {
jobResultsProvider.dataCounts(jobId, counts -> {

View File

@ -10,10 +10,10 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
@ -21,12 +21,15 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.core.security.user.XPackUser;
import java.util.ArrayList;
import java.util.Collections;
@ -81,7 +84,7 @@ public class JobDataDeleter {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indices.toArray(new String[0]))
.setRefresh(true)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(new IdsQueryBuilder().addIds(idsToDelete.toArray(new String[0])));
.setQuery(QueryBuilders.idsQuery().addIds(idsToDelete.toArray(new String[0])));
// _doc is the most efficient sort order and will also disable scoring
deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
@ -89,6 +92,55 @@ public class JobDataDeleter {
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
}
/**
* Asynchronously delete all the auto-generated (i.e. created by the _xpack user) annotations
*
* @param listener Response listener
*/
public void deleteAllAnnotations(ActionListener<Boolean> listener) {
deleteAnnotationsFromTime(null, null, listener);
}
/**
* Asynchronously delete all the auto-generated (i.e. created by the _xpack user) annotations starting from {@code cutOffTime}
*
* @param cutoffEpochMs Only annotations at and after this time will be deleted. If {@code null}, no cutoff is applied
* @param eventsToDelete Only annotations with one of the provided event types will be deleted.
* If {@code null} or empty, no event-related filtering is applied
* @param listener Response listener
*/
public void deleteAnnotationsFromTime(@Nullable Long cutoffEpochMs,
@Nullable Set<String> eventsToDelete,
ActionListener<Boolean> listener) {
BoolQueryBuilder boolQuery =
QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
.filter(QueryBuilders.termQuery(Annotation.CREATE_USERNAME.getPreferredName(), XPackUser.NAME));
if (cutoffEpochMs != null) {
boolQuery.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
}
if (eventsToDelete != null && eventsToDelete.isEmpty() == false) {
boolQuery.filter(QueryBuilders.termsQuery(Annotation.EVENT.getPreferredName(), eventsToDelete));
}
QueryBuilder query = QueryBuilders.constantScoreQuery(boolQuery);
DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnnotationIndex.READ_ALIAS_NAME)
.setQuery(query)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setAbortOnVersionConflict(false)
.setRefresh(true)
.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
// _doc is the most efficient sort order and will also disable scoring
dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
executeAsyncWithOrigin(
client,
ML_ORIGIN,
DeleteByQueryAction.INSTANCE,
dbqRequest,
ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure));
}
/**
* Asynchronously delete all result types (Buckets, Records, Influencers) from {@code cutOffTime}
*
@ -96,43 +148,49 @@ public class JobDataDeleter {
* @param listener Response listener
*/
public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> listener) {
DeleteByQueryHolder deleteByQueryHolder = new DeleteByQueryHolder(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
deleteByQueryHolder.dbqRequest.setRefresh(true);
QueryBuilder query = QueryBuilders.boolQuery()
.filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName()))
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
deleteByQueryHolder.dbqRequest.setIndicesOptions(IndicesOptions.lenientExpandOpen());
deleteByQueryHolder.dbqRequest.setQuery(query);
.filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName()))
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId))
.setQuery(query)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setAbortOnVersionConflict(false)
.setRefresh(true)
.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
// _doc is the most efficient sort order and will also disable scoring
deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest,
ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure));
executeAsyncWithOrigin(
client,
ML_ORIGIN,
DeleteByQueryAction.INSTANCE,
dbqRequest,
ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure));
}
/**
* Delete all results marked as interim
*/
public void deleteInterimResults() {
DeleteByQueryHolder deleteByQueryHolder = new DeleteByQueryHolder(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
deleteByQueryHolder.dbqRequest.setRefresh(false);
deleteByQueryHolder.dbqRequest.setIndicesOptions(IndicesOptions.lenientExpandOpen());
QueryBuilder qb = QueryBuilders.termQuery(Result.IS_INTERIM.getPreferredName(), true);
deleteByQueryHolder.dbqRequest.setQuery(new ConstantScoreQueryBuilder(qb));
QueryBuilder query = QueryBuilders.constantScoreQuery(QueryBuilders.termQuery(Result.IS_INTERIM.getPreferredName(), true));
DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId))
.setQuery(query)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setAbortOnVersionConflict(false)
.setRefresh(false)
.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
// _doc is the most efficient sort order and will also disable scoring
deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest).get();
client.execute(DeleteByQueryAction.INSTANCE, dbqRequest).get();
} catch (Exception e) {
LOGGER.error("[" + jobId + "] An error occurred while deleting interim results", e);
}
}
/**
* Delete the datafeed timing stats document from all the job results indices
*
@ -142,24 +200,11 @@ public class JobDataDeleter {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId))
.setRefresh(true)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(new IdsQueryBuilder().addIds(DatafeedTimingStats.documentId(jobId)));
.setQuery(QueryBuilders.idsQuery().addIds(DatafeedTimingStats.documentId(jobId)));
// _doc is the most efficient sort order and will also disable scoring
deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
}
// Wrapper to ensure safety
private static class DeleteByQueryHolder {
private final DeleteByQueryRequest dbqRequest;
private DeleteByQueryHolder(String index) {
dbqRequest = new DeleteByQueryRequest();
dbqRequest.indices(index);
dbqRequest.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
dbqRequest.setAbortOnVersionConflict(false);
}
}
}

View File

@ -7,17 +7,25 @@ package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.junit.After;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import java.util.Collections;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@ -30,13 +38,66 @@ public class JobDataDeleterTests extends ESTestCase {
private static final String JOB_ID = "my-job-id";
private Client client;
private ArgumentCaptor<DeleteByQueryRequest> deleteRequestCaptor;
@Before
public void setUpTests() {
client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
when(client.threadPool()).thenReturn(threadPool);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);
deleteRequestCaptor = ArgumentCaptor.forClass(DeleteByQueryRequest.class);
}
@After
public void verifyNoMoreInteractionsWithClient() {
verify(client).threadPool();
verifyNoMoreInteractions(client);
}
public void testDeleteAllAnnotations() {
JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID);
jobDataDeleter.deleteAllAnnotations(ActionListener.wrap(
deleteResponse -> {},
e -> fail(e.toString())
));
verify(client).execute(eq(DeleteByQueryAction.INSTANCE), deleteRequestCaptor.capture(), any());
DeleteByQueryRequest deleteRequest = deleteRequestCaptor.getValue();
assertThat(deleteRequest.indices(), is(arrayContaining(AnnotationIndex.READ_ALIAS_NAME)));
assertThat(Strings.toString(deleteRequest), not(containsString("timestamp")));
assertThat(Strings.toString(deleteRequest), not(containsString("event")));
}
public void testDeleteAnnotationsFromTime_TimestampFiltering() {
JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID);
jobDataDeleter.deleteAnnotationsFromTime(1_000_000_000L, null, ActionListener.wrap(
deleteResponse -> {},
e -> fail(e.toString())
));
verify(client).execute(eq(DeleteByQueryAction.INSTANCE), deleteRequestCaptor.capture(), any());
DeleteByQueryRequest deleteRequest = deleteRequestCaptor.getValue();
assertThat(deleteRequest.indices(), is(arrayContaining(AnnotationIndex.READ_ALIAS_NAME)));
assertThat(Strings.toString(deleteRequest), containsString("timestamp"));
assertThat(Strings.toString(deleteRequest), not(containsString("event")));
}
public void testDeleteAnnotationsFromTime_EventFiltering() {
JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID);
jobDataDeleter.deleteAnnotationsFromTime(null, Collections.singleton("dummy_event"), ActionListener.wrap(
deleteResponse -> {},
e -> fail(e.toString())
));
verify(client).execute(eq(DeleteByQueryAction.INSTANCE), deleteRequestCaptor.capture(), any());
DeleteByQueryRequest deleteRequest = deleteRequestCaptor.getValue();
assertThat(deleteRequest.indices(), is(arrayContaining(AnnotationIndex.READ_ALIAS_NAME)));
assertThat(Strings.toString(deleteRequest), not(containsString("timestamp")));
assertThat(Strings.toString(deleteRequest), containsString("event"));
}
public void testDeleteDatafeedTimingStats() {
@ -46,12 +107,9 @@ public class JobDataDeleterTests extends ESTestCase {
e -> fail(e.toString())
));
ArgumentCaptor<DeleteByQueryRequest> deleteRequestCaptor = ArgumentCaptor.forClass(DeleteByQueryRequest.class);
verify(client).threadPool();
verify(client).execute(eq(DeleteByQueryAction.INSTANCE), deleteRequestCaptor.capture(), any());
verifyNoMoreInteractions(client);
DeleteByQueryRequest deleteRequest = deleteRequestCaptor.getValue();
assertThat(deleteRequest.indices(), arrayContaining(AnomalyDetectorsIndex.jobResultsAliasedName(JOB_ID)));
assertThat(deleteRequest.indices(), is(arrayContaining(AnomalyDetectorsIndex.jobResultsAliasedName(JOB_ID))));
}
}