diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java new file mode 100644 index 00000000000..b6d1805cef0 --- /dev/null +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java @@ -0,0 +1,144 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.core.ml.annotations.Annotation; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.security.user.XPackUser; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent; +import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; +import static org.elasticsearch.xpack.core.ml.annotations.AnnotationTests.randomAnnotation; +import static org.hamcrest.Matchers.hasSize; + +public class DeleteJobIT extends MlNativeAutodetectIntegTestCase { + + private static final String DATA_INDEX = "delete-job-annotations-test-data"; + private static final String TIME_FIELD = "time"; + + @Before + public void setUpData() { + client().admin().indices().prepareCreate(DATA_INDEX) + .addMapping(SINGLE_MAPPING_NAME, TIME_FIELD, "type=date,format=epoch_millis") + .get(); + } + + @After + public void tearDownData() { + client().admin().indices().prepareDelete(DATA_INDEX).get(); + cleanUp(); + } + + public void testDeleteJobDeletesAnnotations() throws Exception { + String jobIdA = "delete-annotations-a"; + String datafeedIdA = jobIdA + "-feed"; + String jobIdB = "delete-annotations-b"; + String datafeedIdB = jobIdB + "-feed"; + + // No annotations so far + assertThatNumberOfAnnotationsIsEqualTo(0); + + runJob(jobIdA, datafeedIdA); + client().index(randomAnnotationIndexRequest(jobIdA, XPackUser.NAME)).actionGet(); + client().index(randomAnnotationIndexRequest(jobIdA, XPackUser.NAME)).actionGet(); + client().index(randomAnnotationIndexRequest(jobIdA, "real_user")).actionGet(); + // 3 jobA annotations (2 _xpack, 1 real_user) + assertThatNumberOfAnnotationsIsEqualTo(3); + + runJob(jobIdB, datafeedIdB); + client().index(randomAnnotationIndexRequest(jobIdB, XPackUser.NAME)).actionGet(); + client().index(randomAnnotationIndexRequest(jobIdB, "other_real_user")).actionGet(); + // 3 jobA annotations (2 _xpack, 1 real_user) and 2 jobB annotations (1 _xpack, 1 real_user) + assertThatNumberOfAnnotationsIsEqualTo(5); + + deleteDatafeed(datafeedIdA); + deleteJob(jobIdA); + // 1 jobA annotation (real_user) and 2 jobB annotations (1 _xpack, 1 real_user) + assertThatNumberOfAnnotationsIsEqualTo(3); + + deleteDatafeed(datafeedIdB); + deleteJob(jobIdB); + // 1 jobA annotation (real_user) and 1 jobB annotation (real_user) + assertThatNumberOfAnnotationsIsEqualTo(2); + } + + private void runJob(String jobId, String datafeedId) throws Exception { + Detector.Builder detector = new Detector.Builder().setFunction("count"); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())) + .setBucketSpan(TimeValue.timeValueHours(1)); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeField(TIME_FIELD); + Job.Builder job = new Job.Builder(jobId) + .setAnalysisConfig(analysisConfig) + .setDataDescription(dataDescription); + + registerJob(job); + putJob(job); + + DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, jobId); + datafeedConfig.setIndices(Collections.singletonList(DATA_INDEX)); + DatafeedConfig datafeedA = datafeedConfig.build(); + registerDatafeed(datafeedA); + putDatafeed(datafeedA); + + openJob(jobId); + // Run up to a day ago + long now = System.currentTimeMillis(); + startDatafeed(datafeedId, 0, now - TimeValue.timeValueHours(24).getMillis()); + waitUntilJobIsClosed(jobId); + } + + private static IndexRequest randomAnnotationIndexRequest(String jobId, String createUsername) throws IOException { + Annotation annotation = new Annotation.Builder(randomAnnotation(jobId)).setCreateUsername(createUsername).build(); + try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) { + return new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME) + .source(xContentBuilder) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + } + } + + private void assertThatNumberOfAnnotationsIsEqualTo(int expectedNumberOfAnnotations) throws Exception { + // Refresh the annotations index so that recently indexed annotation docs are visible. + client().admin().indices().prepareRefresh(AnnotationIndex.INDEX_NAME) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) + .execute() + .actionGet(); + + SearchRequest searchRequest = + new SearchRequest(AnnotationIndex.READ_ALIAS_NAME).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + List annotations = new ArrayList<>(); + for (SearchHit hit : searchResponse.getHits().getHits()) { + try (XContentParser parser = createParser(jsonXContent, hit.getSourceRef())) { + annotations.add(Annotation.fromXContent(parser, null)); + } + } + assertThat("Hits were: " + annotations, annotations, hasSize(expectedNumberOfAnnotations)); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index b6af373c834..ada2a8c1df2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -59,6 +59,8 @@ import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction; import org.elasticsearch.xpack.core.ml.action.KillProcessAction; +import org.elasticsearch.xpack.core.ml.annotations.Annotation; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; @@ -69,6 +71,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerS import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.security.user.XPackUser; import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; @@ -292,7 +295,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction finishedHandler.accept(response.isAcknowledged()), failureHandler); - // Step 8. If we did not drop the indices and after DBQ state done, we delete the aliases + // Step 9. If we did not drop the indices and after DBQ state done, we delete the aliases ActionListener dbqHandler = ActionListener.wrap( bulkByScrollResponse -> { if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume indices were deleted @@ -314,7 +317,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction deleteByQueryExecutor = ActionListener.wrap( response -> { if (response && indexNames.get().length > 0) { @@ -335,7 +338,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction customIndexSearchHandler = ActionListener.wrap( @@ -395,7 +398,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction getJobHandler = ActionListener.wrap( @@ -428,11 +431,15 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction deleteCategorizerStateHandler = ActionListener.wrap( + // Step 5. Get the job as the initial result index name is required + ActionListener deleteAnnotationsHandler = ActionListener.wrap( response -> jobConfigProvider.getJob(jobId, getJobHandler), - failureHandler - ); + failureHandler); + + // Step 4. Delete annotations associated with the job + ActionListener deleteCategorizerStateHandler = ActionListener.wrap( + response -> deleteAnnotations(parentTaskClient, jobId, deleteAnnotationsHandler), + failureHandler); // Step 3. Delete quantiles done, delete the categorizer state ActionListener deleteQuantilesHandler = ActionListener.wrap( @@ -449,25 +456,17 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction finishedHandler) { - // The quantiles type and doc ID changed in v5.5 so delete both the old and new format - DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()); // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId)); - request.setQuery(query); - request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); - request.setAbortOnVersionConflict(false); - request.setRefresh(true); + DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()) + .setQuery(query) + .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())) + .setAbortOnVersionConflict(false) + .setRefresh(true); executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap( response -> finishedHandler.onResponse(true), - e -> { - // It's not a problem for us if the index wasn't found - it's equivalent to document not found - if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) { - finishedHandler.onResponse(true); - } else { - finishedHandler.onFailure(e); - } - })); + ignoreIndexNotFoundException(finishedHandler))); } private void deleteModelState(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener listener) { @@ -484,14 +483,13 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction finishedHandler) { - // The categorizer state type and doc ID changed in v5.5 so delete both the old and new format - DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()); // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum)); - request.setQuery(query); - request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); - request.setAbortOnVersionConflict(false); - request.setRefresh(true); + DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()) + .setQuery(query) + .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())) + .setAbortOnVersionConflict(false) + .setRefresh(true); executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap( response -> { @@ -504,14 +502,35 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction { - // It's not a problem for us if the index wasn't found - it's equivalent to document not found - if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) { - finishedHandler.onResponse(true); - } else { - finishedHandler.onFailure(e); - } - })); + ignoreIndexNotFoundException(finishedHandler))); + } + + private void deleteAnnotations(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener finishedHandler) { + ConstantScoreQueryBuilder query = + QueryBuilders.constantScoreQuery( + QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) + .filter(QueryBuilders.termQuery(Annotation.CREATE_USERNAME.getPreferredName(), XPackUser.NAME))); + DeleteByQueryRequest request = new DeleteByQueryRequest(AnnotationIndex.READ_ALIAS_NAME) + .setQuery(query) + .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())) + .setAbortOnVersionConflict(false) + .setRefresh(true); + + executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap( + response -> finishedHandler.onResponse(true), + ignoreIndexNotFoundException(finishedHandler))); + } + + private static Consumer ignoreIndexNotFoundException(ActionListener finishedHandler) { + return e -> { + // It's not a problem for us if the index wasn't found - it's equivalent to document not found + if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) { + finishedHandler.onResponse(true); + } else { + finishedHandler.onFailure(e); + } + }; } private void deleteAliases(ParentTaskAssigningClient parentTaskClient, String jobId,