[7.x][ML] Delete DF analytics stats upon job deletion (#53933) (#53997)

Since a data frame analytics job may have associated docs
in the .ml-stats-* indices, when the job is deleted we
should delete those docs too.

Backport of #53933
This commit is contained in:
Dimitris Athanasiou 2020-03-23 19:55:36 +02:00 committed by GitHub
parent 08a8345269
commit 965af3a68b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 103 additions and 9 deletions

View File

@@ -177,7 +177,7 @@ public class ReservedRolesStore implements BiConsumer<Set<String>, ActionListene
.put("machine_learning_admin", new RoleDescriptor("machine_learning_admin", new String[] { "manage_ml" },
new RoleDescriptor.IndicesPrivileges[] {
RoleDescriptor.IndicesPrivileges.builder()
.indices(".ml-anomalies*", ".ml-notifications*", ".ml-state*", ".ml-meta*")
.indices(".ml-anomalies*", ".ml-notifications*", ".ml-state*", ".ml-meta*", ".ml-stats-*")
.privileges("view_index_metadata", "read").build(),
RoleDescriptor.IndicesPrivileges.builder().indices(".ml-annotations*")
.privileges("view_index_metadata", "read", "write").build()

View File

@@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
@@ -36,11 +37,13 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.stats.Fields;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -144,14 +147,14 @@ public class TransportDeleteDataFrameAnalyticsAction
// We clean up the memory tracker on delete because there is no stop; the task stops by itself
memoryTracker.removeDataFrameAnalyticsJob(id);
// Step 3. Delete the config
ActionListener<BulkByScrollResponse> deleteStateHandler = ActionListener.wrap(
// Step 4. Delete the config
ActionListener<BulkByScrollResponse> deleteStatsHandler = ActionListener.wrap(
bulkByScrollResponse -> {
if (bulkByScrollResponse.isTimedOut()) {
logger.warn("[{}] DeleteByQuery for state timed out", id);
logger.warn("[{}] DeleteByQuery for stats timed out", id);
}
if (bulkByScrollResponse.getBulkFailures().isEmpty() == false) {
logger.warn("[{}] {} failures and {} conflicts encountered while runnint DeleteByQuery for state", id,
logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery for stats", id,
bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts());
for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) {
logger.warn("[{}] DBQ failure: {}", id, failure);
@@ -162,6 +165,24 @@ public class TransportDeleteDataFrameAnalyticsAction
listener::onFailure
);
// Step 3. Delete job docs from stats index
ActionListener<BulkByScrollResponse> deleteStateHandler = ActionListener.wrap(
bulkByScrollResponse -> {
if (bulkByScrollResponse.isTimedOut()) {
logger.warn("[{}] DeleteByQuery for state timed out", id);
}
if (bulkByScrollResponse.getBulkFailures().isEmpty() == false) {
logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery for state", id,
bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts());
for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) {
logger.warn("[{}] DBQ failure: {}", id, failure);
}
}
deleteStats(parentTaskClient, id, deleteStatsHandler);
},
listener::onFailure
);
// Step 2. Delete state
ActionListener<DataFrameAnalyticsConfig> configListener = ActionListener.wrap(
config -> deleteState(parentTaskClient, config, deleteStateHandler),
@@ -199,8 +220,29 @@ public class TransportDeleteDataFrameAnalyticsAction
if (config.getAnalysis().persistsState()) {
ids.add(config.getAnalysis().getStateDocId(config.getId()));
}
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern());
request.setQuery(QueryBuilders.idsQuery().addIds(ids.toArray(new String[0])));
executeDeleteByQuery(
parentTaskClient,
AnomalyDetectorsIndex.jobStateIndexPattern(),
QueryBuilders.idsQuery().addIds(ids.toArray(new String[0])),
listener
);
}
/**
 * Deletes all documents belonging to the given data frame analytics job
 * from the .ml-stats-* indices via a delete-by-query keyed on the job_id field.
 *
 * @param parentTaskClient client that tags spawned requests as children of the delete task
 * @param jobId            id of the data frame analytics job whose stats docs are removed
 * @param listener         notified with the delete-by-query response, or a failure
 */
private void deleteStats(ParentTaskAssigningClient parentTaskClient,
                         String jobId,
                         ActionListener<BulkByScrollResponse> listener) {
    // Match every stats doc whose job_id equals this job's id across the stats index pattern.
    executeDeleteByQuery(
        parentTaskClient,
        MlStatsIndex.indexPattern(),
        QueryBuilders.termQuery(Fields.JOB_ID.getPreferredName(), jobId),
        listener
    );
}
private void executeDeleteByQuery(ParentTaskAssigningClient parentTaskClient, String index, QueryBuilder query,
ActionListener<BulkByScrollResponse> listener) {
DeleteByQueryRequest request = new DeleteByQueryRequest(index);
request.setQuery(query);
request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
request.setAbortOnVersionConflict(false);

View File

@@ -938,7 +938,7 @@ setup:
id: "missing_config"
---
"Test delete given config and state documents":
"Test delete given config, state and stats documents":
- skip:
features: headers
- do:
@@ -976,11 +976,33 @@ setup:
{
}
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
index:
index: .ml-stats-000001
id: "delete_foo_stats_1"
body: >
{
"job_id": "delete_foo"
}
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
index:
index: .ml-stats-000001
id: "delete_foo_stats_2"
body: >
{
"job_id": "delete_foo"
}
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
indices.refresh:
index: .ml-state*
index: .ml-stat*
- do:
search:
@@ -997,6 +1019,21 @@ setup:
- match: { hits.total.value: 2 }
- do:
search:
index: .ml-stats-*
body:
size: 0
query:
bool:
filter:
ids:
values:
- "delete_foo_stats_1"
- "delete_foo_stats_2"
- match: { hits.total.value: 2 }
- do:
ml.delete_data_frame_analytics:
id: "delete_foo"
@@ -1016,6 +1053,21 @@ setup:
- "delete_foo_regression_state#1"
- match: { hits.total.value: 0 }
- do:
search:
index: .ml-stats-*
body:
size: 0
query:
bool:
filter:
ids:
values:
- "delete_foo_stats_1"
- "delete_foo_stats_2"
- match: { hits.total.value: 0 }
---
"Test max model memory limit":
- skip: