[ML] Clean left behind model state docs (#30659)
It is possible for state documents to be left behind in the state index. This may be because of bugs or uncontrollable scenarios. In any case, those documents may take up quite some disk space when they add up. This commit adds a step in the expired data deletion that is part of the daily maintenance service. The new step searches for state documents that do not belong to any of the current jobs and deletes them. Closes #30551
This commit is contained in:
parent
7c2fc26011
commit
75665a2d3e
|
@ -37,6 +37,16 @@ public class CategorizerState {
|
|||
return jobId + "#";
|
||||
}
|
||||
|
||||
/**
|
||||
* Given the id of a categorizer state document it extracts the job id
|
||||
* @param docId the categorizer state document id
|
||||
* @return the job id or {@code null} if the id is not valid
|
||||
*/
|
||||
public static final String extractJobId(String docId) {
|
||||
int suffixIndex = docId.lastIndexOf("_" + TYPE);
|
||||
return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex);
|
||||
}
|
||||
|
||||
private CategorizerState() {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,16 @@ public class ModelState {
|
|||
return jobId + "-" + snapshotId + "#" + docNum;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given the id of a state document it extracts the job id
|
||||
* @param docId the state document id
|
||||
* @return the job id or {@code null} if the id is not valid
|
||||
*/
|
||||
public static final String extractJobId(String docId) {
|
||||
int suffixIndex = docId.lastIndexOf("_" + TYPE + "_");
|
||||
return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex);
|
||||
}
|
||||
|
||||
private ModelState() {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,6 +60,16 @@ public class Quantiles implements ToXContentObject, Writeable {
|
|||
return jobId + "-" + TYPE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given the id of a quantiles document it extracts the job id
|
||||
* @param docId the quantiles document id
|
||||
* @return the job id or {@code null} if the id is not valid
|
||||
*/
|
||||
public static final String extractJobId(String docId) {
|
||||
int suffixIndex = docId.lastIndexOf("_" + TYPE);
|
||||
return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex);
|
||||
}
|
||||
|
||||
private final String jobId;
|
||||
private final Date timestamp;
|
||||
private final String quantileState;
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ml.job.process.autodetect.state;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
|
||||
public class CategorizerStateTests extends ESTestCase {
|
||||
|
||||
public void testExtractJobId_GivenValidDocId() {
|
||||
assertThat(CategorizerState.extractJobId("foo_categorizer_state#1"), equalTo("foo"));
|
||||
assertThat(CategorizerState.extractJobId("bar_categorizer_state#2"), equalTo("bar"));
|
||||
assertThat(CategorizerState.extractJobId("foo_bar_categorizer_state#3"), equalTo("foo_bar"));
|
||||
assertThat(CategorizerState.extractJobId("_categorizer_state_categorizer_state#3"), equalTo("_categorizer_state"));
|
||||
}
|
||||
|
||||
public void testExtractJobId_GivenInvalidDocId() {
|
||||
assertThat(CategorizerState.extractJobId(""), is(nullValue()));
|
||||
assertThat(CategorizerState.extractJobId("foo"), is(nullValue()));
|
||||
assertThat(CategorizerState.extractJobId("_categorizer_state"), is(nullValue()));
|
||||
assertThat(CategorizerState.extractJobId("foo_model_state_3141341341"), is(nullValue()));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ml.job.process.autodetect.state;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
|
||||
public class ModelStateTests extends ESTestCase {
|
||||
|
||||
public void testExtractJobId_GivenValidDocId() {
|
||||
assertThat(ModelState.extractJobId("foo_model_state_3151373783#1"), equalTo("foo"));
|
||||
assertThat(ModelState.extractJobId("bar_model_state_451515#3"), equalTo("bar"));
|
||||
assertThat(ModelState.extractJobId("foo_bar_model_state_blah_blah"), equalTo("foo_bar"));
|
||||
assertThat(ModelState.extractJobId("_model_state_model_state_11111"), equalTo("_model_state"));
|
||||
}
|
||||
|
||||
public void testExtractJobId_GivenInvalidDocId() {
|
||||
assertThat(ModelState.extractJobId(""), is(nullValue()));
|
||||
assertThat(ModelState.extractJobId("foo"), is(nullValue()));
|
||||
assertThat(ModelState.extractJobId("_model_3141341341"), is(nullValue()));
|
||||
assertThat(ModelState.extractJobId("_state_3141341341"), is(nullValue()));
|
||||
assertThat(ModelState.extractJobId("_model_state_3141341341"), is(nullValue()));
|
||||
assertThat(ModelState.extractJobId("foo_quantiles"), is(nullValue()));
|
||||
}
|
||||
}
|
|
@ -15,9 +15,26 @@ import java.io.IOException;
|
|||
import java.util.Date;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
|
||||
public class QuantilesTests extends AbstractSerializingTestCase<Quantiles> {
|
||||
|
||||
public void testExtractJobId_GivenValidDocId() {
|
||||
assertThat(Quantiles.extractJobId("foo_quantiles"), equalTo("foo"));
|
||||
assertThat(Quantiles.extractJobId("bar_quantiles"), equalTo("bar"));
|
||||
assertThat(Quantiles.extractJobId("foo_bar_quantiles"), equalTo("foo_bar"));
|
||||
assertThat(Quantiles.extractJobId("_quantiles_quantiles"), equalTo("_quantiles"));
|
||||
}
|
||||
|
||||
public void testExtractJobId_GivenInvalidDocId() {
|
||||
assertThat(Quantiles.extractJobId(""), is(nullValue()));
|
||||
assertThat(Quantiles.extractJobId("foo"), is(nullValue()));
|
||||
assertThat(Quantiles.extractJobId("_quantiles"), is(nullValue()));
|
||||
assertThat(Quantiles.extractJobId("foo_model_state_3141341341"), is(nullValue()));
|
||||
}
|
||||
|
||||
public void testEquals_GivenSameObject() {
|
||||
Quantiles quantiles = new Quantiles("foo", new Date(0L), "foo");
|
||||
assertTrue(quantiles.equals(quantiles));
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.elasticsearch.xpack.ml.job.retention.ExpiredForecastsRemover;
|
|||
import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover;
|
||||
import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover;
|
||||
import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;
|
||||
import org.elasticsearch.xpack.ml.job.retention.UnusedStateRemover;
|
||||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
|
||||
|
||||
|
@ -56,7 +57,8 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
|
|||
List<MlDataRemover> dataRemovers = Arrays.asList(
|
||||
new ExpiredResultsRemover(client, clusterService, auditor),
|
||||
new ExpiredForecastsRemover(client),
|
||||
new ExpiredModelSnapshotsRemover(client, clusterService)
|
||||
new ExpiredModelSnapshotsRemover(client, clusterService),
|
||||
new UnusedStateRemover(client, clusterService)
|
||||
);
|
||||
Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
|
||||
deleteExpiredData(dataRemoversIterator, listener);
|
||||
|
|
|
@ -97,6 +97,7 @@ public abstract class BatchedDocumentsIterator<T> {
|
|||
searchRequest.source(new SearchSourceBuilder()
|
||||
.size(BATCH_SIZE)
|
||||
.query(getQuery())
|
||||
.fetchSource(shouldFetchSource())
|
||||
.sort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)));
|
||||
|
||||
SearchResponse searchResponse = client.search(searchRequest).actionGet();
|
||||
|
@ -123,6 +124,14 @@ public abstract class BatchedDocumentsIterator<T> {
|
|||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Should fetch source? Defaults to {@code true}
|
||||
* @return whether the source should be fetched
|
||||
*/
|
||||
protected boolean shouldFetchSource() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the query to use for the search
|
||||
* @return the search query
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.job.persistence;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
|
||||
/**
|
||||
* Iterates through the state doc ids
|
||||
*/
|
||||
public class BatchedStateDocIdsIterator extends BatchedDocumentsIterator<String> {
|
||||
|
||||
public BatchedStateDocIdsIterator(Client client, String index) {
|
||||
super(client, index);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean shouldFetchSource() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected QueryBuilder getQuery() {
|
||||
return QueryBuilders.matchAllQuery();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String map(SearchHit hit) {
|
||||
return hit.getId();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,134 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.job.retention;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.xpack.core.ml.MLMetadataField;
|
||||
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Deque;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* If for any reason a job is deleted by some of its state documents
|
||||
* are left behind, this class deletes any unused documents stored
|
||||
* in the .ml-state index.
|
||||
*/
|
||||
public class UnusedStateRemover implements MlDataRemover {
|
||||
|
||||
private static final Logger LOGGER = Loggers.getLogger(UnusedStateRemover.class);
|
||||
|
||||
private final Client client;
|
||||
private final ClusterService clusterService;
|
||||
|
||||
public UnusedStateRemover(Client client, ClusterService clusterService) {
|
||||
this.client = Objects.requireNonNull(client);
|
||||
this.clusterService = Objects.requireNonNull(clusterService);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(ActionListener<Boolean> listener) {
|
||||
try {
|
||||
BulkRequestBuilder deleteUnusedStateRequestBuilder = findUnusedStateDocs();
|
||||
if (deleteUnusedStateRequestBuilder.numberOfActions() > 0) {
|
||||
executeDeleteUnusedStateDocs(deleteUnusedStateRequestBuilder, listener);
|
||||
} else {
|
||||
listener.onResponse(true);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
private BulkRequestBuilder findUnusedStateDocs() {
|
||||
Set<String> jobIds = getJobIds();
|
||||
BulkRequestBuilder deleteUnusedStateRequestBuilder = client.prepareBulk();
|
||||
BatchedStateDocIdsIterator stateDocIdsIterator = new BatchedStateDocIdsIterator(client, AnomalyDetectorsIndex.jobStateIndexName());
|
||||
while (stateDocIdsIterator.hasNext()) {
|
||||
Deque<String> stateDocIds = stateDocIdsIterator.next();
|
||||
for (String stateDocId : stateDocIds) {
|
||||
String jobId = JobIdExtractor.extractJobId(stateDocId);
|
||||
if (jobId == null) {
|
||||
// not a managed state document id
|
||||
continue;
|
||||
}
|
||||
if (jobIds.contains(jobId) == false) {
|
||||
deleteUnusedStateRequestBuilder.add(new DeleteRequest(
|
||||
AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, stateDocId));
|
||||
}
|
||||
}
|
||||
}
|
||||
return deleteUnusedStateRequestBuilder;
|
||||
}
|
||||
|
||||
private Set<String> getJobIds() {
|
||||
ClusterState clusterState = clusterService.state();
|
||||
MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE);
|
||||
if (mlMetadata != null) {
|
||||
return mlMetadata.getJobs().keySet();
|
||||
}
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
private void executeDeleteUnusedStateDocs(BulkRequestBuilder deleteUnusedStateRequestBuilder, ActionListener<Boolean> listener) {
|
||||
LOGGER.info("Found [{}] unused state documents; attempting to delete",
|
||||
deleteUnusedStateRequestBuilder.numberOfActions());
|
||||
deleteUnusedStateRequestBuilder.execute(new ActionListener<BulkResponse>() {
|
||||
@Override
|
||||
public void onResponse(BulkResponse bulkItemResponses) {
|
||||
if (bulkItemResponses.hasFailures()) {
|
||||
LOGGER.error("Some unused state documents could not be deleted due to failures: {}",
|
||||
bulkItemResponses.buildFailureMessage());
|
||||
} else {
|
||||
LOGGER.info("Successfully deleted all unused state documents");
|
||||
}
|
||||
listener.onResponse(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
LOGGER.error("Error deleting unused model state documents: ", e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static class JobIdExtractor {
|
||||
|
||||
private static List<Function<String, String>> extractors = Arrays.asList(
|
||||
ModelState::extractJobId, Quantiles::extractJobId, CategorizerState::extractJobId);
|
||||
|
||||
private static String extractJobId(String docId) {
|
||||
String jobId;
|
||||
for (Function<String, String> extractor : extractors) {
|
||||
jobId = extractor.apply(docId);
|
||||
if (jobId != null) {
|
||||
return jobId;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -8,12 +8,15 @@ package org.elasticsearch.xpack.ml.integration;
|
|||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.action.update.UpdateAction;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.UpdateModelSnapshotAction;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
||||
|
@ -21,6 +24,7 @@ import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
|
|||
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Detector;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
|
||||
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
|
||||
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
|
||||
|
@ -31,13 +35,16 @@ import java.io.IOException;
|
|||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
|
||||
public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
|
||||
|
@ -78,11 +85,16 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
|
|||
}
|
||||
|
||||
@After
|
||||
public void tearDownData() throws Exception {
|
||||
public void tearDownData() {
|
||||
client().admin().indices().prepareDelete(DATA_INDEX).get();
|
||||
cleanUp();
|
||||
}
|
||||
|
||||
public void testDeleteExpiredDataGivenNothingToDelete() throws Exception {
|
||||
// Tests that nothing goes wrong when there's nothing to delete
|
||||
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
|
||||
}
|
||||
|
||||
public void testDeleteExpiredData() throws Exception {
|
||||
registerJob(newJobBuilder("no-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(null));
|
||||
registerJob(newJobBuilder("results-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(null));
|
||||
|
@ -166,6 +178,18 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
|
|||
assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()), equalTo(forecastStat.getRecordCount()));
|
||||
}
|
||||
|
||||
// Index some unused state documents (more than 10K to test scrolling works)
|
||||
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
|
||||
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
||||
for (int i = 0; i < 10010; i++) {
|
||||
String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i);
|
||||
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexName(), "doc", docId);
|
||||
indexRequest.source(Collections.emptyMap());
|
||||
bulkRequestBuilder.add(indexRequest);
|
||||
}
|
||||
assertThat(bulkRequestBuilder.get().status(), equalTo(RestStatus.OK));
|
||||
|
||||
// Now call the action under test
|
||||
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
|
||||
|
||||
// We need to refresh to ensure the deletion is visible
|
||||
|
@ -216,6 +240,16 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
|
|||
assertThat(countForecastDocs(job.getId(), forecastId), equalTo(0L));
|
||||
}
|
||||
}
|
||||
|
||||
// Verify .ml-state doesn't contain unused state documents
|
||||
SearchResponse stateDocsResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexName())
|
||||
.setFetchSource(false)
|
||||
.setSize(10000)
|
||||
.get();
|
||||
assertThat(stateDocsResponse.getHits().getTotalHits(), lessThan(10000L));
|
||||
for (SearchHit hit : stateDocsResponse.getHits().getHits()) {
|
||||
assertThat(hit.getId().startsWith("non_existing_job"), is(false));
|
||||
}
|
||||
}
|
||||
|
||||
private static Job.Builder newJobBuilder(String id) {
|
||||
|
|
Loading…
Reference in New Issue