diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java index b6d1805cef0..a2d09ecaa31 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java @@ -6,16 +6,11 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.core.ml.annotations.Annotation; import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -28,14 +23,10 @@ import org.junit.After; import org.junit.Before; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; -import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent; import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; import static org.elasticsearch.xpack.core.ml.annotations.AnnotationTests.randomAnnotation; -import static org.hamcrest.Matchers.hasSize; public class DeleteJobIT extends MlNativeAutodetectIntegTestCase { @@ -122,23 +113,4 @@ public class DeleteJobIT extends MlNativeAutodetectIntegTestCase { .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); } } - - private void assertThatNumberOfAnnotationsIsEqualTo(int expectedNumberOfAnnotations) throws Exception { - // Refresh the annotations index so that recently indexed annotation docs are visible. - client().admin().indices().prepareRefresh(AnnotationIndex.INDEX_NAME) - .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) - .execute() - .actionGet(); - - SearchRequest searchRequest = - new SearchRequest(AnnotationIndex.READ_ALIAS_NAME).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN); - SearchResponse searchResponse = client().search(searchRequest).actionGet(); - List annotations = new ArrayList<>(); - for (SearchHit hit : searchResponse.getHits().getHits()) { - try (XContentParser parser = createParser(jsonXContent, hit.getSourceRef())) { - annotations.add(Annotation.fromXContent(parser, null)); - } - } - assertThat("Hits were: " + annotations, annotations, hasSize(expectedNumberOfAnnotations)); - } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index c10c1b32137..06070eaa742 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -5,7 +5,9 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; @@ -45,6 +47,8 @@ import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; +import org.elasticsearch.xpack.core.ml.annotations.Annotation; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.calendars.Calendar; import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -70,7 +74,9 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -243,8 +249,9 @@ abstract class MlNativeAutodetectIntegTestCase extends MlNativeIntegTestCase { return response.getPage().results(); } - protected RevertModelSnapshotAction.Response revertModelSnapshot(String jobId, String snapshotId) { + protected RevertModelSnapshotAction.Response revertModelSnapshot(String jobId, String snapshotId, boolean deleteInterveningResults) { RevertModelSnapshotAction.Request request = new RevertModelSnapshotAction.Request(jobId, snapshotId); + request.setDeleteInterveningResults(deleteInterveningResults); return client().execute(RevertModelSnapshotAction.INSTANCE, request).actionGet(); } @@ -295,6 +302,25 @@ abstract class MlNativeAutodetectIntegTestCase extends MlNativeIntegTestCase { }, 30, TimeUnit.SECONDS); } + protected void assertThatNumberOfAnnotationsIsEqualTo(int expectedNumberOfAnnotations) throws IOException { + // Refresh the annotations index so that recently indexed annotation docs are visible. + client().admin().indices().prepareRefresh(AnnotationIndex.INDEX_NAME) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) + .execute() + .actionGet(); + + SearchRequest searchRequest = + new SearchRequest(AnnotationIndex.READ_ALIAS_NAME).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + List annotations = new ArrayList<>(); + for (SearchHit hit : searchResponse.getHits().getHits()) { + try (XContentParser parser = createParser(jsonXContent, hit.getSourceRef())) { + annotations.add(Annotation.fromXContent(parser, null)); + } + } + assertThat("Annotations were: " + annotations, annotations, hasSize(expectedNumberOfAnnotations)); + } + protected ForecastRequestStats getForecastStats(String jobId, String forecastId) { SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)) .setQuery(QueryBuilders.idsQuery().addIds(ForecastRequestStats.documentId(jobId, forecastId))) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java index 4adb21df2a6..8c769e1cacb 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java @@ -5,14 +5,22 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.xpack.core.ml.annotations.Annotation; +import org.elasticsearch.xpack.core.ml.annotations.Annotation.Event; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; @@ -21,9 +29,12 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeSta import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.elasticsearch.xpack.core.security.user.XPackUser; import org.junit.After; import java.io.IOException; +import java.sql.Date; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -33,6 +44,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.core.ml.annotations.AnnotationTests.randomAnnotation; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; @@ -50,11 +62,19 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase { cleanUp(); } - public void test() throws Exception { + public void testRevertModelSnapshot() throws Exception { + test("revert-model-snapshot-it-job", false); + } + + public void testRevertModelSnapshot_DeleteInterveningResults() throws Exception { + test("revert-model-snapshot-it-job-delete-intervening-results", true); + } + + private void test(String jobId, boolean deleteInterveningResults) throws Exception { TimeValue bucketSpan = TimeValue.timeValueHours(1); long startTime = 1491004800000L; - Job.Builder job = buildAndRegisterJob("revert-model-snapshot-it-job", bucketSpan); + Job.Builder job = buildAndRegisterJob(jobId, bucketSpan); openJob(job.getId()); postData(job.getId(), generateData(startTime, bucketSpan, 10, Arrays.asList("foo"), (bucketIndex, series) -> bucketIndex == 5 ? 100.0 : 10.0).stream().collect(Collectors.joining())); @@ -97,7 +117,20 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase { assertThat(getJob(job.getId()).get(0).getModelSnapshotId(), equalTo(modelSnapshots.get(0).getSnapshotId())); ModelSnapshot revertSnapshot = modelSnapshots.get(1); - assertThat(revertModelSnapshot(job.getId(), revertSnapshot.getSnapshotId()).status(), equalTo(RestStatus.OK)); + // Check there are 2 annotations (one per model snapshot) + assertThatNumberOfAnnotationsIsEqualTo(2); + + // Add 3 new annotations... + Instant lastResultTimestamp = revertSnapshot.getLatestResultTimeStamp().toInstant(); + client().index(randomAnnotationIndexRequest(job.getId(), lastResultTimestamp.plusSeconds(10), Event.DELAYED_DATA)).actionGet(); + client().index(randomAnnotationIndexRequest(job.getId(), lastResultTimestamp.plusSeconds(20), Event.MODEL_CHANGE)).actionGet(); + client().index(randomAnnotationIndexRequest(job.getId(), lastResultTimestamp.minusSeconds(10), Event.MODEL_CHANGE)).actionGet(); + // ... and check there are 5 annotations in total now + assertThatNumberOfAnnotationsIsEqualTo(5); + + assertThat( + revertModelSnapshot(job.getId(), revertSnapshot.getSnapshotId(), deleteInterveningResults).status(), + equalTo(RestStatus.OK)); // Check model_size_stats has been reverted assertThat(getJobStats(job.getId()).get(0).getModelSizeStats().getModelBytes(), equalTo(modelSizeStats1.getModelBytes())); @@ -105,6 +138,9 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase { // Check quantiles have been reverted assertThat(getQuantiles(job.getId()).getTimestamp(), equalTo(revertSnapshot.getLatestResultTimeStamp())); + // Check annotations with event type from {delayed_data, model_change} have been removed if deleteInterveningResults flag is set + assertThatNumberOfAnnotationsIsEqualTo(deleteInterveningResults ? 3 : 5); + // Re-run 2nd half of data openJob(job.getId()); postData(job.getId(), generateData(startTime + 10 * bucketSpan.getMillis(), bucketSpan, 10, Arrays.asList("foo", "bar"), @@ -170,4 +206,17 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase { throw new IllegalStateException(e); } } + + private static IndexRequest randomAnnotationIndexRequest(String jobId, Instant timestamp, Event event) throws IOException { + Annotation annotation = new Annotation.Builder(randomAnnotation(jobId)) + .setTimestamp(Date.from(timestamp)) + .setCreateUsername(XPackUser.NAME) + .setEvent(event) + .build(); + try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) { + return new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME) + .source(xContentBuilder) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + } + } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/SetUpgradeModeIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/SetUpgradeModeIT.java index 8aa912b3f5e..6553e51e681 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/SetUpgradeModeIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/SetUpgradeModeIT.java @@ -146,7 +146,7 @@ public class SetUpgradeModeIT extends MlNativeAutodetectIntegTestCase { expectThrowsUpgradeModeException(() -> forecast(jobId, null, null)); String snapshotId = "snapshot_id"; - expectThrowsUpgradeModeException(() -> revertModelSnapshot(jobId, snapshotId)); + expectThrowsUpgradeModeException(() -> revertModelSnapshot(jobId, snapshotId, false)); String datafeedId = "datafeed_id"; expectThrowsUpgradeModeException(() -> putDatafeed(createDatafeed(datafeedId, jobId, Collections.singletonList("index")))); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index ada2a8c1df2..c25a78b81c7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -59,8 +59,6 @@ import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction; import org.elasticsearch.xpack.core.ml.action.KillProcessAction; -import org.elasticsearch.xpack.core.ml.annotations.Annotation; -import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; @@ -71,7 +69,6 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerS import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.core.security.user.XPackUser; import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; @@ -506,20 +503,9 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction finishedHandler) { - ConstantScoreQueryBuilder query = - QueryBuilders.constantScoreQuery( - QueryBuilders.boolQuery() - .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) - .filter(QueryBuilders.termQuery(Annotation.CREATE_USERNAME.getPreferredName(), XPackUser.NAME))); - DeleteByQueryRequest request = new DeleteByQueryRequest(AnnotationIndex.READ_ALIAS_NAME) - .setQuery(query) - .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())) - .setAbortOnVersionConflict(false) - .setRefresh(true); - - executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap( - response -> finishedHandler.onResponse(true), - ignoreIndexNotFoundException(finishedHandler))); + JobDataDeleter deleter = new JobDataDeleter(parentTaskClient, jobId); + deleter.deleteAllAnnotations( + ActionListener.wrap(r -> finishedHandler.onResponse(true), ignoreIndexNotFoundException(finishedHandler))); } private static Consumer ignoreIndexNotFoundException(ActionListener finishedHandler) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java index 4687a784018..1901cf7783f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.annotations.Annotation; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -38,6 +39,8 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import java.io.IOException; import java.util.Date; +import java.util.HashSet; +import java.util.Set; import java.util.function.Consumer; public class TransportRevertModelSnapshotAction extends TransportMasterNodeAction { ActionListener wrappedListener = listener; if (request.getDeleteInterveningResults()) { + wrappedListener = wrapDeleteOldAnnotationsListener(wrappedListener, modelSnapshot, request.getJobId()); wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, request.getJobId()); wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, request.getJobId()); } @@ -133,9 +137,40 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio }, errorHandler); } + private ActionListener wrapDeleteOldAnnotationsListener( + ActionListener listener, + ModelSnapshot modelSnapshot, + String jobId) { + + return ActionListener.wrap(response -> { + Date deleteAfter = modelSnapshot.getLatestResultTimeStamp(); + logger.info("[{}] Removing intervening annotations after reverting model: deleting annotations after [{}]", jobId, deleteAfter); + + JobDataDeleter dataDeleter = new JobDataDeleter(client, jobId); + Set eventsToDelete = new HashSet<>(); + // Because the results based on the delayed data are being deleted, the fact that the data was originally delayed is not + // relevant + eventsToDelete.add(Annotation.Event.DELAYED_DATA.toString()); + // Because the model that changed is no longer in use as it has been rolled back to a time before those changes occurred + eventsToDelete.add(Annotation.Event.MODEL_CHANGE.toString()); + dataDeleter.deleteAnnotationsFromTime(deleteAfter.getTime() + 1, eventsToDelete, new ActionListener() { + @Override + public void onResponse(Boolean success) { + listener.onResponse(response); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + }, listener::onFailure); + } + private ActionListener wrapDeleteOldDataListener( ActionListener listener, - ModelSnapshot modelSnapshot, String jobId) { + ModelSnapshot modelSnapshot, + String jobId) { // If we need to delete buckets that occurred after the snapshot, we // wrap the listener with one that invokes the OldDataRemover on @@ -161,8 +196,8 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio private ActionListener wrapRevertDataCountsListener( ActionListener listener, - ModelSnapshot modelSnapshot, String jobId) { - + ModelSnapshot modelSnapshot, + String jobId) { return ActionListener.wrap(response -> { jobResultsProvider.dataCounts(jobId, counts -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java index 17a6f048340..c36ed88fe6e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java @@ -10,10 +10,10 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.index.query.ConstantScoreQueryBuilder; -import org.elasticsearch.index.query.IdsQueryBuilder; +import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; @@ -21,12 +21,15 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.BulkByScrollTask; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.xpack.core.ml.annotations.Annotation; import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.results.Result; +import org.elasticsearch.xpack.core.security.user.XPackUser; import java.util.ArrayList; import java.util.Collections; @@ -81,7 +84,7 @@ public class JobDataDeleter { DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indices.toArray(new String[0])) .setRefresh(true) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) - .setQuery(new IdsQueryBuilder().addIds(idsToDelete.toArray(new String[0]))); + .setQuery(QueryBuilders.idsQuery().addIds(idsToDelete.toArray(new String[0]))); // _doc is the most efficient sort order and will also disable scoring deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); @@ -89,6 +92,55 @@ public class JobDataDeleter { executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener); } + /** + * Asynchronously delete all the auto-generated (i.e. created by the _xpack user) annotations + * + * @param listener Response listener + */ + public void deleteAllAnnotations(ActionListener listener) { + deleteAnnotationsFromTime(null, null, listener); + } + + /** + * Asynchronously delete all the auto-generated (i.e. created by the _xpack user) annotations starting from {@code cutOffTime} + * + * @param cutoffEpochMs Only annotations at and after this time will be deleted. If {@code null}, no cutoff is applied + * @param eventsToDelete Only annotations with one of the provided event types will be deleted. + * If {@code null} or empty, no event-related filtering is applied + * @param listener Response listener + */ + public void deleteAnnotationsFromTime(@Nullable Long cutoffEpochMs, + @Nullable Set eventsToDelete, + ActionListener listener) { + BoolQueryBuilder boolQuery = + QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) + .filter(QueryBuilders.termQuery(Annotation.CREATE_USERNAME.getPreferredName(), XPackUser.NAME)); + if (cutoffEpochMs != null) { + boolQuery.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs)); + } + if (eventsToDelete != null && eventsToDelete.isEmpty() == false) { + boolQuery.filter(QueryBuilders.termsQuery(Annotation.EVENT.getPreferredName(), eventsToDelete)); + } + QueryBuilder query = QueryBuilders.constantScoreQuery(boolQuery); + DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnnotationIndex.READ_ALIAS_NAME) + .setQuery(query) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setAbortOnVersionConflict(false) + .setRefresh(true) + .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); + + // _doc is the most efficient sort order and will also disable scoring + dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); + + executeAsyncWithOrigin( + client, + ML_ORIGIN, + DeleteByQueryAction.INSTANCE, + dbqRequest, + ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure)); + } + /** * Asynchronously delete all result types (Buckets, Records, Influencers) from {@code cutOffTime} * @@ -96,43 +148,49 @@ public class JobDataDeleter { * @param listener Response listener */ public void deleteResultsFromTime(long cutoffEpochMs, ActionListener listener) { - DeleteByQueryHolder deleteByQueryHolder = new DeleteByQueryHolder(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)); - deleteByQueryHolder.dbqRequest.setRefresh(true); - QueryBuilder query = QueryBuilders.boolQuery() - .filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName())) - .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs)); - deleteByQueryHolder.dbqRequest.setIndicesOptions(IndicesOptions.lenientExpandOpen()); - deleteByQueryHolder.dbqRequest.setQuery(query); + .filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName())) + .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs)); + DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)) + .setQuery(query) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setAbortOnVersionConflict(false) + .setRefresh(true) + .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); // _doc is the most efficient sort order and will also disable scoring - deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); + dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); - executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest, - ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure)); + executeAsyncWithOrigin( + client, + ML_ORIGIN, + DeleteByQueryAction.INSTANCE, + dbqRequest, + ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure)); } /** * Delete all results marked as interim */ public void deleteInterimResults() { - DeleteByQueryHolder deleteByQueryHolder = new DeleteByQueryHolder(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)); - deleteByQueryHolder.dbqRequest.setRefresh(false); - - deleteByQueryHolder.dbqRequest.setIndicesOptions(IndicesOptions.lenientExpandOpen()); - QueryBuilder qb = QueryBuilders.termQuery(Result.IS_INTERIM.getPreferredName(), true); - deleteByQueryHolder.dbqRequest.setQuery(new ConstantScoreQueryBuilder(qb)); + QueryBuilder query = QueryBuilders.constantScoreQuery(QueryBuilders.termQuery(Result.IS_INTERIM.getPreferredName(), true)); + DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)) + .setQuery(query) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setAbortOnVersionConflict(false) + .setRefresh(false) + .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); // _doc is the most efficient sort order and will also disable scoring - deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); + dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { - client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest).get(); + client.execute(DeleteByQueryAction.INSTANCE, dbqRequest).get(); } catch (Exception e) { LOGGER.error("[" + jobId + "] An error occurred while deleting interim results", e); } } - + /** * Delete the datafeed timing stats document from all the job results indices * @@ -142,24 +200,11 @@ public class JobDataDeleter { DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)) .setRefresh(true) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) - .setQuery(new IdsQueryBuilder().addIds(DatafeedTimingStats.documentId(jobId))); + .setQuery(QueryBuilders.idsQuery().addIds(DatafeedTimingStats.documentId(jobId))); // _doc is the most efficient sort order and will also disable scoring deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener); } - - // Wrapper to ensure safety - private static class DeleteByQueryHolder { - - private final DeleteByQueryRequest dbqRequest; - - private DeleteByQueryHolder(String index) { - dbqRequest = new DeleteByQueryRequest(); - dbqRequest.indices(index); - dbqRequest.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); - dbqRequest.setAbortOnVersionConflict(false); - } - } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java index 2cc53aec2bc..15621768af6 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java @@ -7,17 +7,25 @@ package org.elasticsearch.xpack.ml.job.persistence; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.junit.After; import org.junit.Before; import org.mockito.ArgumentCaptor; +import java.util.Collections; + import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -30,13 +38,66 @@ public class JobDataDeleterTests extends ESTestCase { private static final String JOB_ID = "my-job-id"; private Client client; + private ArgumentCaptor deleteRequestCaptor; @Before public void setUpTests() { - client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); - when(client.threadPool()).thenReturn(threadPool); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + client = mock(Client.class); + when(client.threadPool()).thenReturn(threadPool); + deleteRequestCaptor = ArgumentCaptor.forClass(DeleteByQueryRequest.class); + } + + @After + public void verifyNoMoreInteractionsWithClient() { + verify(client).threadPool(); + verifyNoMoreInteractions(client); + } + + public void testDeleteAllAnnotations() { + JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID); + jobDataDeleter.deleteAllAnnotations(ActionListener.wrap( + deleteResponse -> {}, + e -> fail(e.toString()) + )); + + verify(client).execute(eq(DeleteByQueryAction.INSTANCE), deleteRequestCaptor.capture(), any()); + + DeleteByQueryRequest deleteRequest = deleteRequestCaptor.getValue(); + assertThat(deleteRequest.indices(), is(arrayContaining(AnnotationIndex.READ_ALIAS_NAME))); + assertThat(Strings.toString(deleteRequest), not(containsString("timestamp"))); + assertThat(Strings.toString(deleteRequest), not(containsString("event"))); + } + + public void testDeleteAnnotationsFromTime_TimestampFiltering() { + JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID); + jobDataDeleter.deleteAnnotationsFromTime(1_000_000_000L, null, ActionListener.wrap( + deleteResponse -> {}, + e -> fail(e.toString()) + )); + + verify(client).execute(eq(DeleteByQueryAction.INSTANCE), deleteRequestCaptor.capture(), any()); + + DeleteByQueryRequest deleteRequest = deleteRequestCaptor.getValue(); + assertThat(deleteRequest.indices(), is(arrayContaining(AnnotationIndex.READ_ALIAS_NAME))); + assertThat(Strings.toString(deleteRequest), containsString("timestamp")); + assertThat(Strings.toString(deleteRequest), not(containsString("event"))); + } + + public void testDeleteAnnotationsFromTime_EventFiltering() { + JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID); + jobDataDeleter.deleteAnnotationsFromTime(null, Collections.singleton("dummy_event"), ActionListener.wrap( + deleteResponse -> {}, + e -> fail(e.toString()) + )); + + verify(client).execute(eq(DeleteByQueryAction.INSTANCE), deleteRequestCaptor.capture(), any()); + + DeleteByQueryRequest deleteRequest = deleteRequestCaptor.getValue(); + assertThat(deleteRequest.indices(), is(arrayContaining(AnnotationIndex.READ_ALIAS_NAME))); + assertThat(Strings.toString(deleteRequest), not(containsString("timestamp"))); + assertThat(Strings.toString(deleteRequest), containsString("event")); } public void testDeleteDatafeedTimingStats() { @@ -46,12 +107,9 @@ public class JobDataDeleterTests extends ESTestCase { e -> fail(e.toString()) )); - ArgumentCaptor deleteRequestCaptor = ArgumentCaptor.forClass(DeleteByQueryRequest.class); - verify(client).threadPool(); verify(client).execute(eq(DeleteByQueryAction.INSTANCE), deleteRequestCaptor.capture(), any()); - verifyNoMoreInteractions(client); DeleteByQueryRequest deleteRequest = deleteRequestCaptor.getValue(); - assertThat(deleteRequest.indices(), arrayContaining(AnomalyDetectorsIndex.jobResultsAliasedName(JOB_ID))); + assertThat(deleteRequest.indices(), is(arrayContaining(AnomalyDetectorsIndex.jobResultsAliasedName(JOB_ID)))); } }