[7.x][ML] Delete unused data frame analytics state (#50243) (#50280)

This commit adds removal of unused data frame analytics state
from the _delete_expired_data API (and, by extension, the ML
daily maintenance task). At the moment, the potential state
docs include the progress document and the state for regression
and classification analyses.
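
For orientation, these are the state doc id formats the removal logic has to recognize (as defined in the diffs below; <id> stands for the analytics config/job id):

data_frame_analytics-<id>-progress (progress document)
<id>_classification_state#1 (classification analysis state)
<id>_regression_state#1 (regression analysis state)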

Backport of #50243
Dimitris Athanasiou 2019-12-18 12:30:11 +00:00 committed by GitHub
parent 82086929d7
commit 447bac27d2
25 changed files with 235 additions and 22 deletions

View File

@@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.ml.dataframe;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -310,6 +311,15 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
return TYPE + "-" + id;
}
/**
* Returns the job id from the doc id. Returns {@code null} if the doc id is invalid.
*/
@Nullable
public static String extractJobIdFromDocId(String docId) {
// If stripping the prefix changes nothing, this is not a config doc id.
String jobId = docId.replaceAll("^" + TYPE + "-", "");
return jobId.equals(docId) ? null : jobId;
}
public static class Builder {
private String id;
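
In use, the null return lets callers tell config doc ids apart from any other doc id, for example (a sketch mirroring the new unit test further down):

String jobId = DataFrameAnalyticsConfig.extractJobIdFromDocId("data_frame_analytics_config-foo"); // "foo"
String other = DataFrameAnalyticsConfig.extractJobIdFromDocId("some-other-doc"); // null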

View File

@@ -39,6 +39,8 @@ public class Classification implements DataFrameAnalysis {
public static final ParseField TRAINING_PERCENT = new ParseField("training_percent");
public static final ParseField RANDOMIZE_SEED = new ParseField("randomize_seed");
private static final String STATE_DOC_ID_SUFFIX = "_classification_state#1";
private static final ConstructingObjectParser<Classification, Void> LENIENT_PARSER = createParser(true);
private static final ConstructingObjectParser<Classification, Void> STRICT_PARSER = createParser(false);
@@ -258,7 +260,12 @@ public class Classification implements DataFrameAnalysis {
@Override
public String getStateDocId(String jobId) {
return jobId + "_classification_state#1";
return jobId + STATE_DOC_ID_SUFFIX;
}
public static String extractJobIdFromStateDoc(String stateDocId) {
int suffixIndex = stateDocId.lastIndexOf(STATE_DOC_ID_SUFFIX);
return suffixIndex <= 0 ? null : stateDocId.substring(0, suffixIndex);
}
@Override
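
Note the suffixIndex <= 0 guard: lastIndexOf returns -1 when the suffix is absent and 0 when the doc id is nothing but the suffix, so both cases yield null and an empty job id can never be extracted. A quick sketch (the first case mirrors the new unit test):

Classification.extractJobIdFromStateDoc("foo_bar-1_classification_state#1"); // "foo_bar-1"
Classification.extractJobIdFromStateDoc("_classification_state#1"); // null

The Regression change below applies the identical suffix trick with its own suffix.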

View File

@@ -36,6 +36,8 @@ public class Regression implements DataFrameAnalysis {
public static final ParseField TRAINING_PERCENT = new ParseField("training_percent");
public static final ParseField RANDOMIZE_SEED = new ParseField("randomize_seed");
private static final String STATE_DOC_ID_SUFFIX = "_regression_state#1";
private static final ConstructingObjectParser<Regression, Void> LENIENT_PARSER = createParser(true);
private static final ConstructingObjectParser<Regression, Void> STRICT_PARSER = createParser(false);
@@ -196,7 +198,12 @@ public class Regression implements DataFrameAnalysis {
@Override
public String getStateDocId(String jobId) {
return jobId + "_regression_state#1";
return jobId + STATE_DOC_ID_SUFFIX;
}
public static String extractJobIdFromStateDoc(String stateDocId) {
int suffixIndex = stateDocId.lastIndexOf(STATE_DOC_ID_SUFFIX);
return suffixIndex <= 0 ? null : stateDocId.substring(0, suffixIndex);
}
@Override

View File

@@ -53,6 +53,7 @@ import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
public class DataFrameAnalyticsConfigTests extends AbstractSerializingTestCase<DataFrameAnalyticsConfig> {
@@ -384,6 +385,13 @@ public class DataFrameAnalyticsConfigTests extends AbstractSerializingTestCase<D
}
}
public void testExtractJobIdFromDocId() {
assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("data_frame_analytics_config-foo"), equalTo("foo"));
assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("data_frame_analytics_config-data_frame_analytics_config-foo"),
equalTo("data_frame_analytics_config-foo"));
assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("foo"), is(nullValue()));
}
private static void assertTooSmall(ElasticsearchStatusException e) {
assertThat(e.getMessage(), startsWith("model_memory_limit must be at least 1kb."));
}

View File

@@ -215,4 +215,9 @@ public class ClassificationTests extends AbstractSerializingTestCase<Classificat
String randomId = randomAlphaOfLength(10);
assertThat(classification.getStateDocId(randomId), equalTo(randomId + "_classification_state#1"));
}
public void testExtractJobIdFromStateDoc() {
assertThat(Classification.extractJobIdFromStateDoc("foo_bar-1_classification_state#1"), equalTo("foo_bar-1"));
assertThat(Classification.extractJobIdFromStateDoc("noop"), is(nullValue()));
}
}

View File

@@ -111,6 +111,11 @@ public class RegressionTests extends AbstractSerializingTestCase<Regression> {
assertThat(regression.getStateDocId(randomId), equalTo(randomId + "_regression_state#1"));
}
public void testExtractJobIdFromStateDoc() {
assertThat(Regression.extractJobIdFromStateDoc("foo_bar-1_regression_state#1"), equalTo("foo_bar-1"));
assertThat(Regression.extractJobIdFromStateDoc("noop"), is(nullValue()));
}
public void testToXContent_GivenVersionBeforeRandomizeSeedWasIntroduced() throws IOException {
Regression regression = createRandom();
assertThat(regression.getRandomizeSeed(), is(notNullValue()));

View File

@@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
@@ -17,6 +18,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
@@ -314,6 +316,38 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds));
}
public void testDeleteExpiredData_RemovesUnusedState() throws Exception {
initialize("classification_delete_expired_data");
indexData(sourceIndex, 100, 0, KEYWORD_FIELD);
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD));
registerAnalytics(config);
putAnalytics(config);
startAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);
assertProgress(jobId, 100, 100, 100, 100);
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertModelStatePersisted(stateDocId());
assertInferenceModelPersisted(jobId);
// Call _delete_expired_data API and check nothing was deleted
assertThat(deleteExpiredData().isDeleted(), is(true));
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertModelStatePersisted(stateDocId());
// Delete the config straight from the config index
DeleteResponse deleteResponse = client().prepareDelete().setIndex(".ml-config").setId(DataFrameAnalyticsConfig.documentId(jobId))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute().actionGet();
assertThat(deleteResponse.status(), equalTo(RestStatus.OK));
// Now calling the _delete_expired_data API should remove unused state
assertThat(deleteExpiredData().isDeleted(), is(true));
SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state").execute().actionGet();
assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L));
}
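
Worth noting the test strategy here: the config is deleted straight from the .ml-config index, bypassing the delete-analytics API (which would remove the state too), precisely to leave orphaned state behind; the second _delete_expired_data call must then clear the .ml-state index entirely. The RegressionIT test below follows the same pattern.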
private void initialize(String jobId) {
this.jobId = jobId;
this.sourceIndex = jobId + "_source_index";

View File

@@ -87,7 +87,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
cleanUp();
}
public void testDeleteExpiredDataGivenNothingToDelete() throws Exception {
public void testDeleteExpiredData_GivenNothingToDelete() throws Exception {
// Tests that nothing goes wrong when there's nothing to delete
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
}
@@ -202,10 +202,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
assertThat(indexUnusedStateDocsResponse.get().status(), equalTo(RestStatus.OK));
// Now call the action under test
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
// We need to refresh to ensure the deletion is visible
client().admin().indices().prepareRefresh("*").get();
assertThat(deleteExpiredData().isDeleted(), is(true));
// no-retention job should have kept all data
assertThat(getBuckets("no-retention").size(), is(greaterThanOrEqualTo(70)));

View File

@@ -21,6 +21,7 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
@@ -45,7 +46,6 @@ import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;

View File

@@ -40,7 +40,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
@@ -205,7 +205,7 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
}
protected SearchResponse searchStoredProgress(String jobId) {
String docId = DataFrameAnalyticsTask.progressDocId(jobId);
String docId = StoredProgress.documentId(jobId);
return client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
.setQuery(QueryBuilders.idsQuery().addIds(docId))
.get();

View File

@@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.XPackClientPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
@@ -113,6 +114,16 @@ abstract class MlNativeIntegTestCase extends ESIntegTestCase {
}
}
protected DeleteExpiredDataAction.Response deleteExpiredData() throws Exception {
DeleteExpiredDataAction.Response response = client().execute(DeleteExpiredDataAction.INSTANCE,
new DeleteExpiredDataAction.Request()).get();
// We need to refresh to ensure the deletion is visible
client().admin().indices().prepareRefresh("*").get();
return response;
}
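
(The refresh keeps the helper self-contained: callers can assert on index contents immediately after it returns, which is why the DeleteExpiredDataIT hunk above drops its explicit refresh.)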
@Override
protected void ensureClusterStateConsistency() throws IOException {
if (cluster() != null && cluster().size() > 0) {

View File

@@ -7,11 +7,13 @@ package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
@@ -272,6 +274,38 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds));
}
public void testDeleteExpiredData_RemovesUnusedState() throws Exception {
initialize("regression_delete_expired_data");
indexData(sourceIndex, 100, 0);
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Regression(DEPENDENT_VARIABLE_FIELD));
registerAnalytics(config);
putAnalytics(config);
startAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);
assertProgress(jobId, 100, 100, 100, 100);
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertModelStatePersisted(stateDocId());
assertInferenceModelPersisted(jobId);
// Call _delete_expired_data API and check nothing was deleted
assertThat(deleteExpiredData().isDeleted(), is(true));
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertModelStatePersisted(stateDocId());
// Delete the config straight from the config index
DeleteResponse deleteResponse = client().prepareDelete().setIndex(".ml-config").setId(DataFrameAnalyticsConfig.documentId(jobId))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute().actionGet();
assertThat(deleteResponse.status(), equalTo(RestStatus.OK));
// Now calling the _delete_expired_data API should remove unused state
assertThat(deleteExpiredData().isDeleted(), is(true));
SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state").execute().actionGet();
assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L));
}
private void initialize(String jobId) {
this.jobId = jobId;
this.sourceIndex = jobId + "_source_index";

View File

@@ -43,7 +43,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
@@ -171,7 +171,7 @@ public class TransportDeleteDataFrameAnalyticsAction
DataFrameAnalyticsConfig config,
ActionListener<BulkByScrollResponse> listener) {
List<String> ids = new ArrayList<>();
ids.add(DataFrameAnalyticsTask.progressDocId(config.getId()));
ids.add(StoredProgress.documentId(config.getId()));
if (config.getAnalysis().persistsState()) {
ids.add(config.getAnalysis().getStateDocId(config.getId()));
}

View File

@@ -187,7 +187,7 @@ public class TransportGetDataFrameAnalyticsStatsAction
SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern());
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
searchRequest.source().size(1);
searchRequest.source().query(QueryBuilders.idsQuery().addIds(DataFrameAnalyticsTask.progressDocId(configId)));
searchRequest.source().query(QueryBuilders.idsQuery().addIds(StoredProgress.documentId(configId)));
multiSearchRequest.add(searchRequest);
}

View File

@@ -244,7 +244,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
statsResponse -> {
GetDataFrameAnalyticsStatsAction.Response.Stats stats = statsResponse.getResponse().results().get(0);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias());
indexRequest.id(progressDocId(taskParams.getId()));
indexRequest.id(StoredProgress.documentId(taskParams.getId()));
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) {
new StoredProgress(stats.getProgress()).toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS);
@@ -310,10 +310,6 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
}
}
public static String progressDocId(String id) {
return "data_frame_analytics-" + id + "-progress";
}
public static class ProgressTracker {
public static final String REINDEXING = "reindexing";

View File

@@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.dataframe;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
@@ -14,6 +15,8 @@ import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class StoredProgress implements ToXContentObject {
@@ -57,4 +60,15 @@ public class StoredProgress implements ToXContentObject {
public int hashCode() {
return Objects.hash(progress);
}
public static String documentId(String id) {
return "data_frame_analytics-" + id + "-progress";
}
@Nullable
public static String extractJobIdFromDocId(String docId) {
Pattern pattern = Pattern.compile("^data_frame_analytics-(.*)-progress$");
Matcher matcher = pattern.matcher(docId);
return matcher.find() ? matcher.group(1) : null;
}
}
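
Because the pattern is anchored at both ends and (.*) is greedy, an id whose job id itself contains "-progress" still round-trips, e.g. (mirroring the new StoredProgressTests below):

StoredProgress.extractJobIdFromDocId("data_frame_analytics-data_frame_analytics-bar-progress-progress"); // "data_frame_analytics-bar-progress"

One small observation (not part of this change): the Pattern could be hoisted into a static final constant to avoid recompiling it on every call.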

View File

@@ -16,6 +16,7 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;
import java.io.IOException;
import java.io.InputStream;

View File

@@ -10,6 +10,7 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;
public abstract class BatchedResultsIterator<T> extends BatchedDocumentsIterator<Result<T>> {

View File

@@ -9,6 +9,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;
/**
* Iterates through the state doc ids

View File

@@ -12,7 +12,7 @@ import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.Influencer;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator;
import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;

View File

@@ -16,14 +16,19 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator;
import org.elasticsearch.xpack.ml.utils.persistence.DocIdBatchedDocumentIterator;
import java.util.ArrayList;
import java.util.Arrays;
@@ -93,6 +98,13 @@ public class UnusedStateRemover implements MlDataRemover {
private Set<String> getJobIds() {
Set<String> jobIds = new HashSet<>();
jobIds.addAll(getAnomalyDetectionJobIds());
jobIds.addAll(getDataFrameAnalyticsJobIds());
return jobIds;
}
private Set<String> getAnomalyDetectionJobIds() {
Set<String> jobIds = new HashSet<>();
// TODO Once at 8.0, we can stop searching for jobs in cluster state
// and remove cluster service as a member altogether.
@@ -106,6 +118,18 @@ public class UnusedStateRemover implements MlDataRemover {
return jobIds;
}
private Set<String> getDataFrameAnalyticsJobIds() {
Set<String> jobIds = new HashSet<>();
DocIdBatchedDocumentIterator iterator = new DocIdBatchedDocumentIterator(client, AnomalyDetectorsIndex.configIndexName(),
QueryBuilders.termQuery(DataFrameAnalyticsConfig.CONFIG_TYPE.getPreferredName(), DataFrameAnalyticsConfig.TYPE));
while (iterator.hasNext()) {
Deque<String> docIds = iterator.next();
docIds.stream().map(DataFrameAnalyticsConfig::extractJobIdFromDocId).filter(Objects::nonNull).forEach(jobIds::add);
}
return jobIds;
}
private void executeDeleteUnusedStateDocs(List<String> unusedDocIds, ActionListener<Boolean> listener) {
LOGGER.info("Found [{}] unused state documents; attempting to delete",
unusedDocIds.size());
@@ -137,7 +161,13 @@ public class UnusedStateRemover implements MlDataRemover {
private static class JobIdExtractor {
private static List<Function<String, String>> extractors = Arrays.asList(
ModelState::extractJobId, Quantiles::extractJobId, CategorizerState::extractJobId);
ModelState::extractJobId,
Quantiles::extractJobId,
CategorizerState::extractJobId,
Classification::extractJobIdFromStateDoc,
Regression::extractJobIdFromStateDoc,
StoredProgress::extractJobIdFromDocId
);
private static String extractJobId(String docId) {
String jobId;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.persistence;
package org.elasticsearch.xpack.ml.utils.persistence;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

View File

@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.utils.persistence;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import java.util.Objects;
/**
* This is a document iterator that returns just the id of each matched document.
*/
public class DocIdBatchedDocumentIterator extends BatchedDocumentsIterator<String> {
private final QueryBuilder query;
public DocIdBatchedDocumentIterator(Client client, String index, QueryBuilder query) {
super(client, index);
this.query = Objects.requireNonNull(query);
}
@Override
protected QueryBuilder getQuery() {
return query;
}
@Override
protected String map(SearchHit hit) {
return hit.getId();
}
@Override
protected boolean shouldFetchSource() {
return false;
}
}
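
A note on the design: shouldFetchSource() returning false means each hit comes back with only its id and no _source, so UnusedStateRemover can scroll over large config and state indices cheaply; map(SearchHit) then just forwards hit.getId().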

View File

@@ -13,6 +13,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
public class StoredProgressTests extends AbstractXContentTestCase<StoredProgress> {
@Override
@@ -34,4 +36,14 @@ public class StoredProgressTests extends AbstractXContentTestCase<StoredProgress
}
return new StoredProgress(progress);
}
public void testDocumentId() {
assertThat(StoredProgress.documentId("foo"), equalTo("data_frame_analytics-foo-progress"));
}
public void testExtractJobIdFromDocId() {
assertThat(StoredProgress.extractJobIdFromDocId("data_frame_analytics-foo-progress"), equalTo("foo"));
assertThat(StoredProgress.extractJobIdFromDocId("data_frame_analytics-data_frame_analytics-bar-progress-progress"),
equalTo("data_frame_analytics-bar-progress"));
}
}

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.persistence;
package org.elasticsearch.xpack.ml.utils.persistence;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.ActionFuture;