From 965af3a68bccd53ed3c7485828f618a695063b58 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Mon, 23 Mar 2020 19:55:36 +0200 Subject: [PATCH] [7.x][ML] Delete DF analytics stats upon job deletion (#53933) (#53997) Since a data frame analytics job may have associated docs in the .ml-stats-* indices, when the job is deleted we should delete those docs too. Backport of #53933 --- .../authz/store/ReservedRolesStore.java | 2 +- ...ansportDeleteDataFrameAnalyticsAction.java | 54 ++++++++++++++++-- .../test/ml/data_frame_analytics_crud.yml | 56 ++++++++++++++++++- 3 files changed, 103 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStore.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStore.java index 8040d6ba52f..f2413ea93d5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStore.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStore.java @@ -177,7 +177,7 @@ public class ReservedRolesStore implements BiConsumer, ActionListene .put("machine_learning_admin", new RoleDescriptor("machine_learning_admin", new String[] { "manage_ml" }, new RoleDescriptor.IndicesPrivileges[] { RoleDescriptor.IndicesPrivileges.builder() - .indices(".ml-anomalies*", ".ml-notifications*", ".ml-state*", ".ml-meta*") + .indices(".ml-anomalies*", ".ml-notifications*", ".ml-state*", ".ml-meta*", ".ml-stats-*") .privileges("view_index_metadata", "read").build(), RoleDescriptor.IndicesPrivileges.builder().indices(".ml-annotations*") .privileges("view_index_metadata", "read", "write").build() diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java index dfd0c3cb019..fc5447d1474 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; import org.elasticsearch.index.reindex.BulkByScrollResponse; @@ -36,11 +37,13 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.MlStatsIndex; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.elasticsearch.xpack.core.ml.dataframe.stats.Fields; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -144,14 +147,14 @@ public class TransportDeleteDataFrameAnalyticsAction // We clean up the memory tracker on delete because there is no stop; the task stops by itself memoryTracker.removeDataFrameAnalyticsJob(id); - // Step 3. Delete the config - ActionListener deleteStateHandler = ActionListener.wrap( + // Step 4. Delete the config + ActionListener deleteStatsHandler = ActionListener.wrap( bulkByScrollResponse -> { if (bulkByScrollResponse.isTimedOut()) { - logger.warn("[{}] DeleteByQuery for state timed out", id); + logger.warn("[{}] DeleteByQuery for stats timed out", id); } if (bulkByScrollResponse.getBulkFailures().isEmpty() == false) { - logger.warn("[{}] {} failures and {} conflicts encountered while runnint DeleteByQuery for state", id, + logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery for stats", id, bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts()); for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) { logger.warn("[{}] DBQ failure: {}", id, failure); @@ -162,6 +165,24 @@ public class TransportDeleteDataFrameAnalyticsAction listener::onFailure ); + // Step 3. Delete job docs from stats index + ActionListener deleteStateHandler = ActionListener.wrap( + bulkByScrollResponse -> { + if (bulkByScrollResponse.isTimedOut()) { + logger.warn("[{}] DeleteByQuery for state timed out", id); + } + if (bulkByScrollResponse.getBulkFailures().isEmpty() == false) { + logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery for state", id, + bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts()); + for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) { + logger.warn("[{}] DBQ failure: {}", id, failure); + } + } + deleteStats(parentTaskClient, id, deleteStatsHandler); + }, + listener::onFailure + ); + // Step 2. Delete state ActionListener configListener = ActionListener.wrap( config -> deleteState(parentTaskClient, config, deleteStateHandler), @@ -199,8 +220,29 @@ public class TransportDeleteDataFrameAnalyticsAction if (config.getAnalysis().persistsState()) { ids.add(config.getAnalysis().getStateDocId(config.getId())); } - DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()); - request.setQuery(QueryBuilders.idsQuery().addIds(ids.toArray(new String[0]))); + executeDeleteByQuery( + parentTaskClient, + AnomalyDetectorsIndex.jobStateIndexPattern(), + QueryBuilders.idsQuery().addIds(ids.toArray(new String[0])), + listener + ); + } + + private void deleteStats(ParentTaskAssigningClient parentTaskClient, + String jobId, + ActionListener listener) { + executeDeleteByQuery( + parentTaskClient, + MlStatsIndex.indexPattern(), + QueryBuilders.termQuery(Fields.JOB_ID.getPreferredName(), jobId), + listener + ); + } + + private void executeDeleteByQuery(ParentTaskAssigningClient parentTaskClient, String index, QueryBuilder query, + ActionListener listener) { + DeleteByQueryRequest request = new DeleteByQueryRequest(index); + request.setQuery(query); request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); request.setAbortOnVersionConflict(false); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml index a5a99b30391..5ec3c978b73 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml @@ -938,7 +938,7 @@ setup: id: "missing_config" --- -"Test delete given config and state documents": +"Test delete given config, state and stats documents": - skip: features: headers - do: @@ -976,11 +976,33 @@ setup: { } + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + index: + index: .ml-stats-000001 + id: "delete_foo_stats_1" + body: > + { + "job_id": "delete_foo" + } + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + index: + index: .ml-stats-000001 + id: "delete_foo_stats_2" + body: > + { + "job_id": "delete_foo" + } + - do: headers: Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser indices.refresh: - index: .ml-state* + index: .ml-stat* - do: search: @@ -997,6 +1019,21 @@ setup: - match: { hits.total.value: 2 } + - do: + search: + index: .ml-stats-* + body: + size: 0 + query: + bool: + filter: + ids: + values: + - "delete_foo_stats_1" + - "delete_foo_stats_2" + + - match: { hits.total.value: 2 } + - do: ml.delete_data_frame_analytics: id: "delete_foo" @@ -1016,6 +1053,21 @@ setup: - "delete_foo_regression_state#1" - match: { hits.total.value: 0 } + - do: + search: + index: .ml-stats-* + body: + size: 0 + query: + bool: + filter: + ids: + values: + - "delete_foo_stats_1" + - "delete_foo_stats_2" + + - match: { hits.total.value: 0 } + --- "Test max model memory limit": - skip: