From bc17afc53596a505d5d37ae77be2151e36c65bca Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Thu, 6 Aug 2020 08:55:35 -0400 Subject: [PATCH] [7.x] [ML] have DELETE analytics ignore stats failures and clean up unused stats (#60776) (#60784) * [ML] have DELETE analytics ignore stats failures and clean up unused stats (#60776) When deleting an analytics configuration, the request MIGHT fail if the .ml-stats index does not exist or is in strange state (shards unallocated). Instead of making the request fail, we should log that we were unable to delete the stats docs and then have them cleaned up in the 'delete_expire_data' janitorial process --- .../ml/integration/UnusedStatsRemoverIT.java | 158 ++++++++++++++++++ ...ansportDeleteDataFrameAnalyticsAction.java | 5 +- .../TransportDeleteExpiredDataAction.java | 7 +- .../ml/job/retention/UnusedStatsRemover.java | 118 +++++++++++++ 4 files changed, 285 insertions(+), 3 deletions(-) create mode 100644 x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStatsRemover.java diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java new file mode 100644 index 00000000000..166978ef5d4 --- /dev/null +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java @@ -0,0 +1,158 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ml.MlStatsIndex; +import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.action.PutTrainedModelAction; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; +import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression; +import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts; +import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig; +import org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinition; +import org.elasticsearch.xpack.core.ml.inference.TrainedModelInput; +import org.elasticsearch.xpack.core.ml.inference.trainedmodel.InferenceStats; +import org.elasticsearch.xpack.core.ml.inference.trainedmodel.RegressionConfig; +import org.elasticsearch.xpack.core.ml.inference.trainedmodel.tree.Tree; +import org.elasticsearch.xpack.core.ml.inference.trainedmodel.tree.TreeNode; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; +import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider; +import org.elasticsearch.xpack.ml.job.retention.UnusedStatsRemover; +import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; +import org.junit.Before; + +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; + +import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; + +public class UnusedStatsRemoverIT extends BaseMlIntegTestCase { + + private OriginSettingClient client; + + @Before + public void createComponents() { + client = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN); + PlainActionFuture future = new PlainActionFuture<>(); + MlStatsIndex.createStatsIndexAndAliasIfNecessary(client(), clusterService().state(), new IndexNameExpressionResolver(), future); + future.actionGet(); + } + + public void testRemoveUnusedStats() throws Exception { + + client().prepareIndex("foo", SINGLE_MAPPING_NAME).setId("some-empty-doc").setSource("{}", XContentType.JSON).get(); + + PutDataFrameAnalyticsAction.Request request = new PutDataFrameAnalyticsAction.Request(new DataFrameAnalyticsConfig.Builder() + .setId("analytics-with-stats") + .setModelMemoryLimit(new ByteSizeValue(1, ByteSizeUnit.GB)) + .setSource(new DataFrameAnalyticsSource(new String[]{"foo"}, null, null)) + .setDest(new DataFrameAnalyticsDest("bar", null)) + .setAnalysis(new Regression("prediction")) + .build()); + client.execute(PutDataFrameAnalyticsAction.INSTANCE, request).actionGet(); + + client.execute(PutTrainedModelAction.INSTANCE, + new PutTrainedModelAction.Request(TrainedModelConfig.builder() + .setModelId("model-with-stats") + .setInferenceConfig(RegressionConfig.EMPTY_PARAMS) + .setInput(new TrainedModelInput(Arrays.asList("foo", "bar"))) + .setParsedDefinition(new TrainedModelDefinition.Builder() + .setPreProcessors(Collections.emptyList()) + .setTrainedModel(Tree.builder() + .setFeatureNames(Arrays.asList("foo", "bar")) + .setRoot(TreeNode.builder(0).setLeafValue(42)) + .build()) + ) + .validate(true) + .build())).actionGet(); + + indexStatDocument(new DataCounts("analytics-with-stats", 1, 1, 1), + DataCounts.documentId("analytics-with-stats")); + indexStatDocument(new DataCounts("missing-analytics-with-stats", 1, 1, 1), + DataCounts.documentId("missing-analytics-with-stats")); + indexStatDocument(new InferenceStats(1, + 1, + 1, + 1, + TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), + "test", + Instant.now()), + InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test")); + indexStatDocument(new InferenceStats(1, + 1, + 1, + 1, + "missing-model", + "test", + Instant.now()), + InferenceStats.docId("missing-model", "test")); + indexStatDocument(new InferenceStats(1, + 1, + 1, + 1, + "model-with-stats", + "test", + Instant.now()), + InferenceStats.docId("model-with-stats", "test")); + client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get(); + + PlainActionFuture deletionListener = new PlainActionFuture<>(); + UnusedStatsRemover statsRemover = new UnusedStatsRemover(client); + statsRemover.remove(10000.0f, deletionListener, () -> false); + deletionListener.actionGet(); + + client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get(); + + final String initialStateIndex = MlStatsIndex.TEMPLATE_NAME + "-000001"; + + // Make sure that stats that should exist still exist + assertTrue(client().prepareGet(initialStateIndex, + SINGLE_MAPPING_NAME, + InferenceStats.docId("model-with-stats", "test")).get().isExists()); + assertTrue(client().prepareGet(initialStateIndex, + SINGLE_MAPPING_NAME, + InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test")).get().isExists()); + assertTrue(client().prepareGet(initialStateIndex, + SINGLE_MAPPING_NAME, + DataCounts.documentId("analytics-with-stats")).get().isExists()); + + // make sure that unused stats were deleted + assertFalse(client().prepareGet(initialStateIndex, + SINGLE_MAPPING_NAME, + DataCounts.documentId("missing-analytics-with-stats")).get().isExists()); + assertFalse(client().prepareGet(initialStateIndex, + SINGLE_MAPPING_NAME, + InferenceStats.docId("missing-model", "test")).get().isExists()); + } + + private void indexStatDocument(ToXContentObject object, String docId) throws Exception { + ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, + Boolean.toString(true))); + IndexRequest doc = new IndexRequest(MlStatsIndex.writeAlias()); + doc.id(docId); + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + object.toXContent(builder, params); + doc.source(builder); + client.index(doc).actionGet(); + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java index 552f9c05a3a..48ec3d8e14e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java @@ -191,7 +191,10 @@ public class TransportDeleteDataFrameAnalyticsAction } deleteConfig(parentTaskClient, id, listener); }, - listener::onFailure + failure -> { + logger.warn(new ParameterizedMessage("[{}] failed to remove stats", id), ExceptionsHelper.unwrapCause(failure)); + deleteConfig(parentTaskClient, id, listener); + } ); // Step 3. Delete job docs from stats index diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index 970c09f2d41..2d9f00d1bdf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover; import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover; import org.elasticsearch.xpack.ml.job.retention.MlDataRemover; import org.elasticsearch.xpack.ml.job.retention.UnusedStateRemover; +import org.elasticsearch.xpack.ml.job.retention.UnusedStatsRemover; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; import org.elasticsearch.xpack.ml.utils.persistence.WrappedBatchedJobsIterator; @@ -169,7 +170,8 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction createDataRemovers(List jobs, AnomalyDetectionAuditor auditor) { @@ -178,7 +180,8 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction(jobs), threadPool), new UnusedStateRemover(client, clusterService), - new EmptyStateIndexRemover(client)); + new EmptyStateIndexRemover(client), + new UnusedStatsRemover(client)); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStatsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStatsRemover.java new file mode 100644 index 00000000000..fa7cb6ae6b2 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStatsRemover.java @@ -0,0 +1,118 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.retention; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.DeleteByQueryAction; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.xpack.core.ml.MlConfigIndex; +import org.elasticsearch.xpack.core.ml.MlStatsIndex; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.stats.Fields; +import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig; +import org.elasticsearch.xpack.core.ml.inference.persistence.InferenceIndexConstants; +import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider; +import org.elasticsearch.xpack.ml.utils.persistence.DocIdBatchedDocumentIterator; + +import java.util.Deque; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.function.Supplier; + +/** + * If for any reason a job or trained model is deleted but some of its stats documents + * are left behind, this class deletes any unused documents stored + * in the .ml-stats* indices. + */ +public class UnusedStatsRemover implements MlDataRemover { + + private static final Logger LOGGER = LogManager.getLogger(UnusedStatsRemover.class); + + private final OriginSettingClient client; + + public UnusedStatsRemover(OriginSettingClient client) { + this.client = Objects.requireNonNull(client); + } + + @Override + public void remove(float requestsPerSec, ActionListener listener, Supplier isTimedOutSupplier) { + try { + if (isTimedOutSupplier.get()) { + listener.onResponse(false); + return; + } + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() + .mustNot(QueryBuilders.termsQuery(Fields.JOB_ID.getPreferredName(), getDataFrameAnalyticsJobIds())) + .mustNot(QueryBuilders.termsQuery(TrainedModelConfig.MODEL_ID.getPreferredName(), getTrainedModelIds())); + + if (isTimedOutSupplier.get()) { + listener.onResponse(false); + return; + } + executeDeleteUnusedStatsDocs(queryBuilder, requestsPerSec, listener); + } catch (Exception e) { + listener.onFailure(e); + } + } + + private Set getDataFrameAnalyticsJobIds() { + Set jobIds = new HashSet<>(); + + DocIdBatchedDocumentIterator iterator = new DocIdBatchedDocumentIterator(client, MlConfigIndex.indexName(), + QueryBuilders.termQuery(DataFrameAnalyticsConfig.CONFIG_TYPE.getPreferredName(), DataFrameAnalyticsConfig.TYPE)); + while (iterator.hasNext()) { + Deque docIds = iterator.next(); + docIds.stream().map(DataFrameAnalyticsConfig::extractJobIdFromDocId).filter(Objects::nonNull).forEach(jobIds::add); + } + return jobIds; + } + + private Set getTrainedModelIds() { + Set modelIds = new HashSet<>(TrainedModelProvider.MODELS_STORED_AS_RESOURCE); + + DocIdBatchedDocumentIterator iterator = new DocIdBatchedDocumentIterator(client, InferenceIndexConstants.INDEX_PATTERN, + QueryBuilders.termQuery(InferenceIndexConstants.DOC_TYPE.getPreferredName(), TrainedModelConfig.NAME)); + while (iterator.hasNext()) { + Deque docIds = iterator.next(); + docIds.stream().filter(Objects::nonNull).forEach(modelIds::add); + } + return modelIds; + } + + private void executeDeleteUnusedStatsDocs(QueryBuilder dbq, float requestsPerSec, ActionListener listener) { + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(MlStatsIndex.indexPattern()) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setAbortOnVersionConflict(false) + .setRequestsPerSecond(requestsPerSec) + .setQuery(dbq); + + client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap( + response -> { + if (response.getBulkFailures().size() > 0 || response.getSearchFailures().size() > 0) { + LOGGER.error("Some unused stats documents could not be deleted due to failures: {}", + Strings.collectionToCommaDelimitedString(response.getBulkFailures()) + + "," + Strings.collectionToCommaDelimitedString(response.getSearchFailures())); + } else { + LOGGER.info("Successfully deleted [{}] unused stats documents", response.getDeleted()); + } + listener.onResponse(true); + }, + e -> { + LOGGER.error("Error deleting unused model stats documents: ", e); + listener.onFailure(e); + } + )); + } +}