[7.x] Delete auto-generated annotations when job is deleted. (#58169) (#58219)

This commit is contained in:
Przemysław Witek 2020-06-17 09:17:20 +02:00 committed by GitHub
parent 46d797b1d9
commit b22e91cefc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 199 additions and 36 deletions

View File

@ -0,0 +1,144 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.security.user.XPackUser;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.xpack.core.ml.annotations.AnnotationTests.randomAnnotation;
import static org.hamcrest.Matchers.hasSize;
public class DeleteJobIT extends MlNativeAutodetectIntegTestCase {
private static final String DATA_INDEX = "delete-job-annotations-test-data";
private static final String TIME_FIELD = "time";
@Before
public void setUpData() {
client().admin().indices().prepareCreate(DATA_INDEX)
.addMapping(SINGLE_MAPPING_NAME, TIME_FIELD, "type=date,format=epoch_millis")
.get();
}
@After
public void tearDownData() {
client().admin().indices().prepareDelete(DATA_INDEX).get();
cleanUp();
}
public void testDeleteJobDeletesAnnotations() throws Exception {
String jobIdA = "delete-annotations-a";
String datafeedIdA = jobIdA + "-feed";
String jobIdB = "delete-annotations-b";
String datafeedIdB = jobIdB + "-feed";
// No annotations so far
assertThatNumberOfAnnotationsIsEqualTo(0);
runJob(jobIdA, datafeedIdA);
client().index(randomAnnotationIndexRequest(jobIdA, XPackUser.NAME)).actionGet();
client().index(randomAnnotationIndexRequest(jobIdA, XPackUser.NAME)).actionGet();
client().index(randomAnnotationIndexRequest(jobIdA, "real_user")).actionGet();
// 3 jobA annotations (2 _xpack, 1 real_user)
assertThatNumberOfAnnotationsIsEqualTo(3);
runJob(jobIdB, datafeedIdB);
client().index(randomAnnotationIndexRequest(jobIdB, XPackUser.NAME)).actionGet();
client().index(randomAnnotationIndexRequest(jobIdB, "other_real_user")).actionGet();
// 3 jobA annotations (2 _xpack, 1 real_user) and 2 jobB annotations (1 _xpack, 1 real_user)
assertThatNumberOfAnnotationsIsEqualTo(5);
deleteDatafeed(datafeedIdA);
deleteJob(jobIdA);
// 1 jobA annotation (real_user) and 2 jobB annotations (1 _xpack, 1 real_user)
assertThatNumberOfAnnotationsIsEqualTo(3);
deleteDatafeed(datafeedIdB);
deleteJob(jobIdB);
// 1 jobA annotation (real_user) and 1 jobB annotation (real_user)
assertThatNumberOfAnnotationsIsEqualTo(2);
}
private void runJob(String jobId, String datafeedId) throws Exception {
Detector.Builder detector = new Detector.Builder().setFunction("count");
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()))
.setBucketSpan(TimeValue.timeValueHours(1));
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField(TIME_FIELD);
Job.Builder job = new Job.Builder(jobId)
.setAnalysisConfig(analysisConfig)
.setDataDescription(dataDescription);
registerJob(job);
putJob(job);
DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, jobId);
datafeedConfig.setIndices(Collections.singletonList(DATA_INDEX));
DatafeedConfig datafeedA = datafeedConfig.build();
registerDatafeed(datafeedA);
putDatafeed(datafeedA);
openJob(jobId);
// Run up to a day ago
long now = System.currentTimeMillis();
startDatafeed(datafeedId, 0, now - TimeValue.timeValueHours(24).getMillis());
waitUntilJobIsClosed(jobId);
}
private static IndexRequest randomAnnotationIndexRequest(String jobId, String createUsername) throws IOException {
Annotation annotation = new Annotation.Builder(randomAnnotation(jobId)).setCreateUsername(createUsername).build();
try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
return new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME)
.source(xContentBuilder)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
}
private void assertThatNumberOfAnnotationsIsEqualTo(int expectedNumberOfAnnotations) throws Exception {
// Refresh the annotations index so that recently indexed annotation docs are visible.
client().admin().indices().prepareRefresh(AnnotationIndex.INDEX_NAME)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
.execute()
.actionGet();
SearchRequest searchRequest =
new SearchRequest(AnnotationIndex.READ_ALIAS_NAME).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
List<Annotation> annotations = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits().getHits()) {
try (XContentParser parser = createParser(jsonXContent, hit.getSourceRef())) {
annotations.add(Annotation.fromXContent(parser, null));
}
}
assertThat("Hits were: " + annotations, annotations, hasSize(expectedNumberOfAnnotations));
}
}

View File

@ -59,6 +59,8 @@ import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction; import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.core.ml.action.KillProcessAction; import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
@ -69,6 +71,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerS
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.security.user.XPackUser;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
@ -292,7 +295,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
response -> finishedHandler.accept(response.isAcknowledged()), response -> finishedHandler.accept(response.isAcknowledged()),
failureHandler); failureHandler);
// Step 8. If we did not drop the indices and after DBQ state done, we delete the aliases // Step 9. If we did not drop the indices and after DBQ state done, we delete the aliases
ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap( ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap(
bulkByScrollResponse -> { bulkByScrollResponse -> {
if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume indices were deleted if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume indices were deleted
@ -314,7 +317,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
}, },
failureHandler); failureHandler);
// Step 7. If we did not delete the indices, we run a delete by query // Step 8. If we did not delete the indices, we run a delete by query
ActionListener<Boolean> deleteByQueryExecutor = ActionListener.wrap( ActionListener<Boolean> deleteByQueryExecutor = ActionListener.wrap(
response -> { response -> {
if (response && indexNames.get().length > 0) { if (response && indexNames.get().length > 0) {
@ -335,7 +338,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
}, },
failureHandler); failureHandler);
// Step 6. Handle each multi-search response. There should be one response for each underlying index. // Step 7. Handle each multi-search response. There should be one response for each underlying index.
// For each underlying index that contains results ONLY for the current job, we will delete that index. // For each underlying index that contains results ONLY for the current job, we will delete that index.
// If there exists at least 1 index that has another job's results, we will run DBQ. // If there exists at least 1 index that has another job's results, we will run DBQ.
ActionListener<MultiSearchResponse> customIndexSearchHandler = ActionListener.wrap( ActionListener<MultiSearchResponse> customIndexSearchHandler = ActionListener.wrap(
@ -395,7 +398,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
} }
); );
// Step 5. If we successfully find a job, gather information about its result indices. // Step 6. If we successfully find a job, gather information about its result indices.
// This will execute a multi-search action for every concrete index behind the job results alias. // This will execute a multi-search action for every concrete index behind the job results alias.
// If there are no concrete indices, take no action and go to the next step. // If there are no concrete indices, take no action and go to the next step.
ActionListener<Job.Builder> getJobHandler = ActionListener.wrap( ActionListener<Job.Builder> getJobHandler = ActionListener.wrap(
@ -428,11 +431,15 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
failureHandler failureHandler
); );
// Step 4. Get the job as the initial result index name is required // Step 5. Get the job as the initial result index name is required
ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap( ActionListener<Boolean> deleteAnnotationsHandler = ActionListener.wrap(
response -> jobConfigProvider.getJob(jobId, getJobHandler), response -> jobConfigProvider.getJob(jobId, getJobHandler),
failureHandler failureHandler);
);
// Step 4. Delete annotations associated with the job
ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
response -> deleteAnnotations(parentTaskClient, jobId, deleteAnnotationsHandler),
failureHandler);
// Step 3. Delete quantiles done, delete the categorizer state // Step 3. Delete quantiles done, delete the categorizer state
ActionListener<Boolean> deleteQuantilesHandler = ActionListener.wrap( ActionListener<Boolean> deleteQuantilesHandler = ActionListener.wrap(
@ -449,25 +456,17 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
} }
private void deleteQuantiles(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener<Boolean> finishedHandler) { private void deleteQuantiles(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener<Boolean> finishedHandler) {
// The quantiles type and doc ID changed in v5.5 so delete both the old and new format
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern());
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId)); IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId));
request.setQuery(query); DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); .setQuery(query)
request.setAbortOnVersionConflict(false); .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
request.setRefresh(true); .setAbortOnVersionConflict(false)
.setRefresh(true);
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap( executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
response -> finishedHandler.onResponse(true), response -> finishedHandler.onResponse(true),
e -> { ignoreIndexNotFoundException(finishedHandler)));
// It's not a problem for us if the index wasn't found - it's equivalent to document not found
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
finishedHandler.onResponse(true);
} else {
finishedHandler.onFailure(e);
}
}));
} }
private void deleteModelState(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener<BulkByScrollResponse> listener) { private void deleteModelState(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener<BulkByScrollResponse> listener) {
@ -484,14 +483,13 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
private void deleteCategorizerState(ParentTaskAssigningClient parentTaskClient, String jobId, int docNum, private void deleteCategorizerState(ParentTaskAssigningClient parentTaskClient, String jobId, int docNum,
ActionListener<Boolean> finishedHandler) { ActionListener<Boolean> finishedHandler) {
// The categorizer state type and doc ID changed in v5.5 so delete both the old and new format
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern());
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum)); IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum));
request.setQuery(query); DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); .setQuery(query)
request.setAbortOnVersionConflict(false); .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
request.setRefresh(true); .setAbortOnVersionConflict(false)
.setRefresh(true);
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap( executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
response -> { response -> {
@ -504,14 +502,35 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
} }
finishedHandler.onResponse(true); finishedHandler.onResponse(true);
}, },
e -> { ignoreIndexNotFoundException(finishedHandler)));
}
private void deleteAnnotations(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener<Boolean> finishedHandler) {
ConstantScoreQueryBuilder query =
QueryBuilders.constantScoreQuery(
QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
.filter(QueryBuilders.termQuery(Annotation.CREATE_USERNAME.getPreferredName(), XPackUser.NAME)));
DeleteByQueryRequest request = new DeleteByQueryRequest(AnnotationIndex.READ_ALIAS_NAME)
.setQuery(query)
.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
.setAbortOnVersionConflict(false)
.setRefresh(true);
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
response -> finishedHandler.onResponse(true),
ignoreIndexNotFoundException(finishedHandler)));
}
private static Consumer<Exception> ignoreIndexNotFoundException(ActionListener<Boolean> finishedHandler) {
return e -> {
// It's not a problem for us if the index wasn't found - it's equivalent to document not found // It's not a problem for us if the index wasn't found - it's equivalent to document not found
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) { if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
finishedHandler.onResponse(true); finishedHandler.onResponse(true);
} else { } else {
finishedHandler.onFailure(e); finishedHandler.onFailure(e);
} }
})); };
} }
private void deleteAliases(ParentTaskAssigningClient parentTaskClient, String jobId, private void deleteAliases(ParentTaskAssigningClient parentTaskClient, String jobId,