* [ML] have DELETE analytics ignore stats failures and clean up unused stats (#60776) When deleting an analytics configuration, the request MIGHT fail if the .ml-stats index does not exist or is in strange state (shards unallocated). Instead of making the request fail, we should log that we were unable to delete the stats docs and then have them cleaned up in the 'delete_expire_data' janitorial process
This commit is contained in:
parent
69645ee4ff
commit
bc17afc535
|
@ -0,0 +1,158 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.xpack.ml.integration;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
|
import org.elasticsearch.action.support.PlainActionFuture;
|
||||||
|
import org.elasticsearch.client.OriginSettingClient;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
|
import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
|
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
|
import org.elasticsearch.xpack.core.ClientHelper;
|
||||||
|
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
|
||||||
|
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
|
||||||
|
import org.elasticsearch.xpack.core.ml.action.PutTrainedModelAction;
|
||||||
|
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
|
||||||
|
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
|
||||||
|
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
|
||||||
|
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression;
|
||||||
|
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts;
|
||||||
|
import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
|
||||||
|
import org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinition;
|
||||||
|
import org.elasticsearch.xpack.core.ml.inference.TrainedModelInput;
|
||||||
|
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.InferenceStats;
|
||||||
|
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.RegressionConfig;
|
||||||
|
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.tree.Tree;
|
||||||
|
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.tree.TreeNode;
|
||||||
|
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
|
||||||
|
import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider;
|
||||||
|
import org.elasticsearch.xpack.ml.job.retention.UnusedStatsRemover;
|
||||||
|
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
|
||||||
|
import org.junit.Before;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
|
||||||
|
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
|
||||||
|
|
||||||
|
public class UnusedStatsRemoverIT extends BaseMlIntegTestCase {
|
||||||
|
|
||||||
|
private OriginSettingClient client;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void createComponents() {
|
||||||
|
client = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN);
|
||||||
|
PlainActionFuture<Boolean> future = new PlainActionFuture<>();
|
||||||
|
MlStatsIndex.createStatsIndexAndAliasIfNecessary(client(), clusterService().state(), new IndexNameExpressionResolver(), future);
|
||||||
|
future.actionGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRemoveUnusedStats() throws Exception {
|
||||||
|
|
||||||
|
client().prepareIndex("foo", SINGLE_MAPPING_NAME).setId("some-empty-doc").setSource("{}", XContentType.JSON).get();
|
||||||
|
|
||||||
|
PutDataFrameAnalyticsAction.Request request = new PutDataFrameAnalyticsAction.Request(new DataFrameAnalyticsConfig.Builder()
|
||||||
|
.setId("analytics-with-stats")
|
||||||
|
.setModelMemoryLimit(new ByteSizeValue(1, ByteSizeUnit.GB))
|
||||||
|
.setSource(new DataFrameAnalyticsSource(new String[]{"foo"}, null, null))
|
||||||
|
.setDest(new DataFrameAnalyticsDest("bar", null))
|
||||||
|
.setAnalysis(new Regression("prediction"))
|
||||||
|
.build());
|
||||||
|
client.execute(PutDataFrameAnalyticsAction.INSTANCE, request).actionGet();
|
||||||
|
|
||||||
|
client.execute(PutTrainedModelAction.INSTANCE,
|
||||||
|
new PutTrainedModelAction.Request(TrainedModelConfig.builder()
|
||||||
|
.setModelId("model-with-stats")
|
||||||
|
.setInferenceConfig(RegressionConfig.EMPTY_PARAMS)
|
||||||
|
.setInput(new TrainedModelInput(Arrays.asList("foo", "bar")))
|
||||||
|
.setParsedDefinition(new TrainedModelDefinition.Builder()
|
||||||
|
.setPreProcessors(Collections.emptyList())
|
||||||
|
.setTrainedModel(Tree.builder()
|
||||||
|
.setFeatureNames(Arrays.asList("foo", "bar"))
|
||||||
|
.setRoot(TreeNode.builder(0).setLeafValue(42))
|
||||||
|
.build())
|
||||||
|
)
|
||||||
|
.validate(true)
|
||||||
|
.build())).actionGet();
|
||||||
|
|
||||||
|
indexStatDocument(new DataCounts("analytics-with-stats", 1, 1, 1),
|
||||||
|
DataCounts.documentId("analytics-with-stats"));
|
||||||
|
indexStatDocument(new DataCounts("missing-analytics-with-stats", 1, 1, 1),
|
||||||
|
DataCounts.documentId("missing-analytics-with-stats"));
|
||||||
|
indexStatDocument(new InferenceStats(1,
|
||||||
|
1,
|
||||||
|
1,
|
||||||
|
1,
|
||||||
|
TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(),
|
||||||
|
"test",
|
||||||
|
Instant.now()),
|
||||||
|
InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test"));
|
||||||
|
indexStatDocument(new InferenceStats(1,
|
||||||
|
1,
|
||||||
|
1,
|
||||||
|
1,
|
||||||
|
"missing-model",
|
||||||
|
"test",
|
||||||
|
Instant.now()),
|
||||||
|
InferenceStats.docId("missing-model", "test"));
|
||||||
|
indexStatDocument(new InferenceStats(1,
|
||||||
|
1,
|
||||||
|
1,
|
||||||
|
1,
|
||||||
|
"model-with-stats",
|
||||||
|
"test",
|
||||||
|
Instant.now()),
|
||||||
|
InferenceStats.docId("model-with-stats", "test"));
|
||||||
|
client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get();
|
||||||
|
|
||||||
|
PlainActionFuture<Boolean> deletionListener = new PlainActionFuture<>();
|
||||||
|
UnusedStatsRemover statsRemover = new UnusedStatsRemover(client);
|
||||||
|
statsRemover.remove(10000.0f, deletionListener, () -> false);
|
||||||
|
deletionListener.actionGet();
|
||||||
|
|
||||||
|
client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get();
|
||||||
|
|
||||||
|
final String initialStateIndex = MlStatsIndex.TEMPLATE_NAME + "-000001";
|
||||||
|
|
||||||
|
// Make sure that stats that should exist still exist
|
||||||
|
assertTrue(client().prepareGet(initialStateIndex,
|
||||||
|
SINGLE_MAPPING_NAME,
|
||||||
|
InferenceStats.docId("model-with-stats", "test")).get().isExists());
|
||||||
|
assertTrue(client().prepareGet(initialStateIndex,
|
||||||
|
SINGLE_MAPPING_NAME,
|
||||||
|
InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test")).get().isExists());
|
||||||
|
assertTrue(client().prepareGet(initialStateIndex,
|
||||||
|
SINGLE_MAPPING_NAME,
|
||||||
|
DataCounts.documentId("analytics-with-stats")).get().isExists());
|
||||||
|
|
||||||
|
// make sure that unused stats were deleted
|
||||||
|
assertFalse(client().prepareGet(initialStateIndex,
|
||||||
|
SINGLE_MAPPING_NAME,
|
||||||
|
DataCounts.documentId("missing-analytics-with-stats")).get().isExists());
|
||||||
|
assertFalse(client().prepareGet(initialStateIndex,
|
||||||
|
SINGLE_MAPPING_NAME,
|
||||||
|
InferenceStats.docId("missing-model", "test")).get().isExists());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void indexStatDocument(ToXContentObject object, String docId) throws Exception {
|
||||||
|
ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE,
|
||||||
|
Boolean.toString(true)));
|
||||||
|
IndexRequest doc = new IndexRequest(MlStatsIndex.writeAlias());
|
||||||
|
doc.id(docId);
|
||||||
|
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
|
||||||
|
object.toXContent(builder, params);
|
||||||
|
doc.source(builder);
|
||||||
|
client.index(doc).actionGet();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -191,7 +191,10 @@ public class TransportDeleteDataFrameAnalyticsAction
|
||||||
}
|
}
|
||||||
deleteConfig(parentTaskClient, id, listener);
|
deleteConfig(parentTaskClient, id, listener);
|
||||||
},
|
},
|
||||||
listener::onFailure
|
failure -> {
|
||||||
|
logger.warn(new ParameterizedMessage("[{}] failed to remove stats", id), ExceptionsHelper.unwrapCause(failure));
|
||||||
|
deleteConfig(parentTaskClient, id, listener);
|
||||||
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
// Step 3. Delete job docs from stats index
|
// Step 3. Delete job docs from stats index
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover;
|
||||||
import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover;
|
import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover;
|
||||||
import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;
|
import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;
|
||||||
import org.elasticsearch.xpack.ml.job.retention.UnusedStateRemover;
|
import org.elasticsearch.xpack.ml.job.retention.UnusedStateRemover;
|
||||||
|
import org.elasticsearch.xpack.ml.job.retention.UnusedStatsRemover;
|
||||||
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
|
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
|
||||||
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
|
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
|
||||||
import org.elasticsearch.xpack.ml.utils.persistence.WrappedBatchedJobsIterator;
|
import org.elasticsearch.xpack.ml.utils.persistence.WrappedBatchedJobsIterator;
|
||||||
|
@ -169,7 +170,8 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
|
||||||
new ExpiredForecastsRemover(client, threadPool),
|
new ExpiredForecastsRemover(client, threadPool),
|
||||||
new ExpiredModelSnapshotsRemover(client, new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), threadPool),
|
new ExpiredModelSnapshotsRemover(client, new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), threadPool),
|
||||||
new UnusedStateRemover(client, clusterService),
|
new UnusedStateRemover(client, clusterService),
|
||||||
new EmptyStateIndexRemover(client));
|
new EmptyStateIndexRemover(client),
|
||||||
|
new UnusedStatsRemover(client));
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<MlDataRemover> createDataRemovers(List<Job> jobs, AnomalyDetectionAuditor auditor) {
|
private List<MlDataRemover> createDataRemovers(List<Job> jobs, AnomalyDetectionAuditor auditor) {
|
||||||
|
@ -178,7 +180,8 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
|
||||||
new ExpiredForecastsRemover(client, threadPool),
|
new ExpiredForecastsRemover(client, threadPool),
|
||||||
new ExpiredModelSnapshotsRemover(client, new VolatileCursorIterator<>(jobs), threadPool),
|
new ExpiredModelSnapshotsRemover(client, new VolatileCursorIterator<>(jobs), threadPool),
|
||||||
new UnusedStateRemover(client, clusterService),
|
new UnusedStateRemover(client, clusterService),
|
||||||
new EmptyStateIndexRemover(client));
|
new EmptyStateIndexRemover(client),
|
||||||
|
new UnusedStatsRemover(client));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,118 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.xpack.ml.job.retention;
|
||||||
|
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.support.IndicesOptions;
|
||||||
|
import org.elasticsearch.client.OriginSettingClient;
|
||||||
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||||
|
import org.elasticsearch.index.query.QueryBuilder;
|
||||||
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
|
import org.elasticsearch.index.reindex.DeleteByQueryAction;
|
||||||
|
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
||||||
|
import org.elasticsearch.xpack.core.ml.MlConfigIndex;
|
||||||
|
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
|
||||||
|
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
|
||||||
|
import org.elasticsearch.xpack.core.ml.dataframe.stats.Fields;
|
||||||
|
import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
|
||||||
|
import org.elasticsearch.xpack.core.ml.inference.persistence.InferenceIndexConstants;
|
||||||
|
import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider;
|
||||||
|
import org.elasticsearch.xpack.ml.utils.persistence.DocIdBatchedDocumentIterator;
|
||||||
|
|
||||||
|
import java.util.Deque;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If for any reason a job or trained model is deleted but some of its stats documents
|
||||||
|
* are left behind, this class deletes any unused documents stored
|
||||||
|
* in the .ml-stats* indices.
|
||||||
|
*/
|
||||||
|
public class UnusedStatsRemover implements MlDataRemover {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LogManager.getLogger(UnusedStatsRemover.class);
|
||||||
|
|
||||||
|
private final OriginSettingClient client;
|
||||||
|
|
||||||
|
public UnusedStatsRemover(OriginSettingClient client) {
|
||||||
|
this.client = Objects.requireNonNull(client);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove(float requestsPerSec, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
|
||||||
|
try {
|
||||||
|
if (isTimedOutSupplier.get()) {
|
||||||
|
listener.onResponse(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
|
||||||
|
.mustNot(QueryBuilders.termsQuery(Fields.JOB_ID.getPreferredName(), getDataFrameAnalyticsJobIds()))
|
||||||
|
.mustNot(QueryBuilders.termsQuery(TrainedModelConfig.MODEL_ID.getPreferredName(), getTrainedModelIds()));
|
||||||
|
|
||||||
|
if (isTimedOutSupplier.get()) {
|
||||||
|
listener.onResponse(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
executeDeleteUnusedStatsDocs(queryBuilder, requestsPerSec, listener);
|
||||||
|
} catch (Exception e) {
|
||||||
|
listener.onFailure(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<String> getDataFrameAnalyticsJobIds() {
|
||||||
|
Set<String> jobIds = new HashSet<>();
|
||||||
|
|
||||||
|
DocIdBatchedDocumentIterator iterator = new DocIdBatchedDocumentIterator(client, MlConfigIndex.indexName(),
|
||||||
|
QueryBuilders.termQuery(DataFrameAnalyticsConfig.CONFIG_TYPE.getPreferredName(), DataFrameAnalyticsConfig.TYPE));
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
Deque<String> docIds = iterator.next();
|
||||||
|
docIds.stream().map(DataFrameAnalyticsConfig::extractJobIdFromDocId).filter(Objects::nonNull).forEach(jobIds::add);
|
||||||
|
}
|
||||||
|
return jobIds;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<String> getTrainedModelIds() {
|
||||||
|
Set<String> modelIds = new HashSet<>(TrainedModelProvider.MODELS_STORED_AS_RESOURCE);
|
||||||
|
|
||||||
|
DocIdBatchedDocumentIterator iterator = new DocIdBatchedDocumentIterator(client, InferenceIndexConstants.INDEX_PATTERN,
|
||||||
|
QueryBuilders.termQuery(InferenceIndexConstants.DOC_TYPE.getPreferredName(), TrainedModelConfig.NAME));
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
Deque<String> docIds = iterator.next();
|
||||||
|
docIds.stream().filter(Objects::nonNull).forEach(modelIds::add);
|
||||||
|
}
|
||||||
|
return modelIds;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void executeDeleteUnusedStatsDocs(QueryBuilder dbq, float requestsPerSec, ActionListener<Boolean> listener) {
|
||||||
|
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(MlStatsIndex.indexPattern())
|
||||||
|
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
|
||||||
|
.setAbortOnVersionConflict(false)
|
||||||
|
.setRequestsPerSecond(requestsPerSec)
|
||||||
|
.setQuery(dbq);
|
||||||
|
|
||||||
|
client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap(
|
||||||
|
response -> {
|
||||||
|
if (response.getBulkFailures().size() > 0 || response.getSearchFailures().size() > 0) {
|
||||||
|
LOGGER.error("Some unused stats documents could not be deleted due to failures: {}",
|
||||||
|
Strings.collectionToCommaDelimitedString(response.getBulkFailures()) +
|
||||||
|
"," + Strings.collectionToCommaDelimitedString(response.getSearchFailures()));
|
||||||
|
} else {
|
||||||
|
LOGGER.info("Successfully deleted [{}] unused stats documents", response.getDeleted());
|
||||||
|
}
|
||||||
|
listener.onResponse(true);
|
||||||
|
},
|
||||||
|
e -> {
|
||||||
|
LOGGER.error("Error deleting unused model stats documents: ", e);
|
||||||
|
listener.onFailure(e);
|
||||||
|
}
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue