Delete state in the JobStorageDeletionTask (elastic/x-pack-elasticsearch#702)

Original commit: elastic/x-pack-elasticsearch@272b0894c5
Author: David Kyle, 2017-03-17 09:37:28 +00:00 (committed by GitHub)
Parent: 60c79ccdbb
Commit: a7d42f3732
3 changed files with 127 additions and 44 deletions

File: JobStorageDeletionTask.java (package org.elasticsearch.xpack.ml.job.persistence)

@@ -7,14 +7,19 @@ package org.elasticsearch.xpack.ml.job.persistence;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
@@ -25,8 +30,12 @@ import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.ml.action.MlDeleteByQueryAction;
import org.elasticsearch.xpack.ml.job.config.Job;
+import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
+import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
+import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
+import java.util.List;
import java.util.function.Consumer;
public class JobStorageDeletionTask extends Task {
@@ -55,35 +64,60 @@ public class JobStorageDeletionTask extends Task {
finishedHandler.accept(true);
};
-        // Step 2. DBQ done, delete the alias
+        // Step 5. Delete categorizer state done, delete the alias
+        ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
+                bulkItemResponses -> {
+                    IndicesAliasesRequest request = new IndicesAliasesRequest()
+                            .addAliasAction(IndicesAliasesRequest.AliasActions.remove().alias(aliasName).index(indexName));
+                    client.admin().indices().aliases(request, ActionListener.wrap(deleteAliasHandler,
+                            e -> {
+                                if (e instanceof IndexNotFoundException) {
+                                    logger.warn("Alias [" + aliasName + "] not found. Continuing to delete job.");
+                                    try {
+                                        finishedHandler.accept(false);
+                                    } catch (Exception e1) {
+                                        failureHandler.accept(e1);
+                                    }
+                                } else {
+                                    // all other exceptions should die
+                                    failureHandler.accept(e);
+                                }
+                            }));
+                },
+                failureHandler::accept);
+
+        // Step 4. Delete model state done, delete the categorizer state
+        ActionListener<BulkResponse> deleteStateHandler = ActionListener.wrap(response -> {
+                    deleteCategorizerState(jobId, client, deleteCategorizerStateHandler);
+                },
+                failureHandler::accept
+        );
+
+        // Step 3. Delete quantiles done, delete the model state
+        ActionListener<DeleteResponse> deleteQuantilesHandler = ActionListener.wrap(deleteResponse -> {
+                    deleteModelState(jobId, client, deleteStateHandler);
+                },
+                failureHandler::accept
+        );
+
+        // Step 2. DBQ done, delete the state
// -------
// TODO norelease more robust handling of failures?
-        CheckedConsumer<BulkByScrollResponse, Exception> dbqHandler = bulkByScrollResponse -> {
-            if (bulkByScrollResponse.isTimedOut()) {
-                logger.warn("DeleteByQuery for indices [" + indexName + ", " + indexPattern + "] timed out.");
-            }
-            if (!bulkByScrollResponse.getBulkFailures().isEmpty()) {
-                logger.warn("[" + bulkByScrollResponse.getBulkFailures().size()
-                        + "] failures encountered while running DeleteByQuery on indices [" + indexName + ", "
-                        + indexPattern + "]. ");
-            }
-            IndicesAliasesRequest request = new IndicesAliasesRequest()
-                    .addAliasAction(IndicesAliasesRequest.AliasActions.remove().alias(aliasName).index(indexName));
-            client.admin().indices().aliases(request, ActionListener.wrap(deleteAliasHandler,
-                    e -> {
-                        if (e instanceof IndexNotFoundException) {
-                            logger.warn("Alias [" + aliasName + "] not found. Continuing to delete job.");
-                            try {
-                                finishedHandler.accept(false);
-                            } catch (Exception e1) {
-                                failureHandler.accept(e1);
-                            }
-                        } else {
-                            // all other exceptions should die
-                            failureHandler.accept(e);
-                        }
-                    }));
-        };
+        ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap(bulkByScrollResponse -> {
+                    if (bulkByScrollResponse.isTimedOut()) {
+                        logger.warn("DeleteByQuery for indices [" + indexName + ", " + indexPattern + "] timed out.");
+                    }
+                    if (!bulkByScrollResponse.getBulkFailures().isEmpty()) {
+                        logger.warn("[" + bulkByScrollResponse.getBulkFailures().size()
+                                + "] failures encountered while running DeleteByQuery on indices [" + indexName + ", "
+                                + indexPattern + "]. ");
+                    }
+                    deleteQuantiles(jobId, client, deleteQuantilesHandler);
+                },
+                failureHandler::accept
+        );
// Step 1. DeleteByQuery on the index, matching all docs with the right job_id
// -------
@@ -95,6 +129,44 @@ public class JobStorageDeletionTask extends Task {
searchRequest.indicesOptions(JobProvider.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
request.setSlices(5);
-        client.execute(MlDeleteByQueryAction.INSTANCE, request, ActionListener.wrap(dbqHandler, failureHandler));
+        client.execute(MlDeleteByQueryAction.INSTANCE, request, dbqHandler);
}
+    public void deleteQuantiles(String jobId, Client client, ActionListener<DeleteResponse> finishedHandler) {
+        client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), Quantiles.TYPE.getPreferredName(), Quantiles.documentId(jobId))
+                .execute(finishedHandler);
+    }
+
+    private void deleteModelState(String jobId, Client client, ActionListener<BulkResponse> listener) {
+        JobProvider jobProvider = new JobProvider(client, Settings.EMPTY);
+        jobProvider.modelSnapshots(jobId, 0, 10000,
+                page -> {
+                    List<ModelSnapshot> deleteCandidates = page.results();
+
+                    // Delete the snapshot and any associated state files
+                    JobDataDeleter deleter = new JobDataDeleter(client, jobId);
+                    for (ModelSnapshot deleteCandidate : deleteCandidates) {
+                        deleter.deleteModelSnapshot(deleteCandidate);
+                    }
+                    deleter.commit(listener);
+                },
+                listener::onFailure);
+    }
+
+    private void deleteCategorizerState(String jobId, Client client, ActionListener<Boolean> finishedHandler) {
+        int docNum = 0;
+        while (true) {
+            String docId = CategorizerState.categorizerStateDocId(jobId, ++docNum);
+            DeleteRequest deleteRequest =
+                    client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), CategorizerState.TYPE, docId).request();
+            DeleteResponse response = client.delete(deleteRequest).actionGet();
+            if (response.getResult() == DocWriteResponse.Result.NOT_FOUND) {
+                break;
+            }
+        }
+        finishedHandler.onResponse(true);
+    }
}
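The new flow is easiest to read bottom-up: each step's ActionListener must exist before the step that feeds it can be wired, so the Step 5 handler is declared first and the Step 1 DeleteByQuery is issued last in the source even though it runs first. Below is a minimal, self-contained sketch of that reverse-declaration pattern; ChainedDeleteSketch and its runFirstStep/runLastStep methods are hypothetical stand-ins for the task's real DBQ, quantiles, model-state, categorizer-state, and alias operations.

import org.elasticsearch.action.ActionListener;

import java.util.function.Consumer;

class ChainedDeleteSketch {

    void deleteAll(Consumer<Boolean> finishedHandler, Consumer<Exception> failureHandler) {
        // Final step: declared first, runs last; completes the whole task.
        ActionListener<Boolean> lastStepListener = ActionListener.wrap(
                ok -> finishedHandler.accept(true),
                failureHandler::accept);

        // First step: declared last, runs first; on success it hands off
        // to the next listener in the chain.
        ActionListener<Boolean> firstStepListener = ActionListener.wrap(
                ok -> runLastStep(lastStepListener),
                failureHandler::accept);

        runFirstStep(firstStepListener);
    }

    // Hypothetical async operations; each reports completion via its listener.
    void runFirstStep(ActionListener<Boolean> listener) { listener.onResponse(true); }
    void runLastStep(ActionListener<Boolean> listener) { listener.onResponse(true); }
}

Every listener routes failures to the same failureHandler, so any step can short-circuit the chain without nested try/catch blocks; only the alias-removal step tolerates a specific failure (IndexNotFoundException) and still finishes the job deletion.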

File: ModelSnapshot.java (package org.elasticsearch.xpack.ml.job.process.autodetect.state)

@@ -19,8 +19,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
-import org.elasticsearch.common.xcontent.json.JsonXContent;
-import org.elasticsearch.common.xcontent.json.JsonXContentParser;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
@@ -299,7 +297,7 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
}
public static ModelSnapshot fromJson(BytesReference bytesReference) {
-        try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, bytesReference)) {
+        try (XContentParser parser = XContentFactory.xContent(bytesReference).createParser(NamedXContentRegistry.EMPTY, bytesReference)) {
return PARSER.apply(parser, null);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse modelSnapshot", e);
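fromJson now obtains its parser from XContentFactory.xContent(bytesReference), which picks the parser by sniffing the leading bytes of the reference rather than assuming JSON, so a snapshot persisted as SMILE, YAML, or CBOR parses as well. Here is a minimal sketch of that detection under the same 5.x XContent API the diff uses; XContentSniffSketch and its firstFieldName helper are illustrative names, not part of the codebase.

import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;

class XContentSniffSketch {

    // Parses a document without assuming its encoding: XContentFactory
    // inspects the first bytes to choose the matching XContent type.
    static String firstFieldName(BytesReference bytes) throws IOException {
        try (XContentParser parser = XContentFactory.xContent(bytes)
                .createParser(NamedXContentRegistry.EMPTY, bytes)) {
            parser.nextToken(); // START_OBJECT
            parser.nextToken(); // FIELD_NAME
            return parser.currentName();
        }
    }

    public static void main(String[] args) throws IOException {
        // Prints "snapshot_id"; a SMILE or YAML encoding of the same
        // document would be detected and parsed the same way.
        System.out.println(firstFieldName(new BytesArray("{\"snapshot_id\":\"123\"}")));
    }
}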

File: ML jobs YAML REST test (path not shown in this view)

@@ -95,7 +95,7 @@ setup:
- do:
indices.exists:
index: ".ml-anomalies-farequote"
index: ".ml-state"
- is_true: ''
- do:
@@ -106,6 +106,11 @@ setup:
- do:
indices.refresh: {}
+  - do:
+      count:
+        index: .ml-state
+  - gt: {count: 0}
- do:
count:
index: .ml-anomalies-shared
@@ -163,10 +168,6 @@ setup:
job_id: "farequote"
- match: { acknowledged: true }
-  # TODO: delete_job is asynchronous and returns before all job results are deleted - ideally
-  # we'd have a way to wait for delete_job to finish its work, but for now just assert some
-  # other things before refreshing the indexes to give time for delete_job to do its work
- do:
indices.exists:
index: ".ml-anomalies-shared"
@@ -175,6 +176,11 @@ setup:
- do:
indices.refresh: {}
+  - do:
+      count:
+        index: .ml-state
+  - match: {count: 2}
- do:
count:
index: .ml-anomalies-shared
@@ -221,10 +227,6 @@ setup:
job_id: "farequote2"
- match: { acknowledged: true }
-  # TODO: delete_job is asynchronous and returns before all job results are deleted - ideally
-  # we'd have a way to wait for delete_job to finish its work, but for now just assert some
-  # other things before refreshing the indexes to give time for delete_job to do its work
- do:
indices.exists:
index: ".ml-anomalies-shared"
@@ -243,6 +245,11 @@ setup:
- do:
indices.refresh: {}
+  - do:
+      count:
+        index: .ml-state
+  - match: {count: 0}
- do:
count:
index: .ml-anomalies-shared
@@ -330,10 +337,6 @@ setup:
job_id: "farequote"
- match: { acknowledged: true }
-  # TODO: delete_job is asynchronous and returns before all job results are deleted - ideally
-  # we'd have a way to wait for delete_job to finish its work, but for now just assert some
-  # other things before refreshing the indexes to give time for delete_job to do its work
- do:
indices.exists:
index: ".ml-anomalies-shared"
@@ -344,6 +347,11 @@ setup:
index: ".ml-anomalies-foo"
- is_true: ''
+  - do:
+      indices.exists:
+        index: ".ml-state"
+  - is_true: ''
- do:
indices.exists:
index: "foo"
@@ -352,6 +360,11 @@ setup:
- do:
indices.refresh: {}
+  - do:
+      count:
+        index: .ml-state
+  - match: {count: 0}
- do:
count:
index: .ml-anomalies-shared
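The count assertions track .ml-state across the lifecycle: greater than zero while jobs still hold state, 2 once the first of the two jobs is deleted, and 0 after the second. As a rough client-side equivalent of that final check, here is a hedged sketch; the Client handle and StateCountCheckSketch are hypothetical, and a size-0 search stands in for the YAML count API.

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;

class StateCountCheckSketch {

    // After both jobs are deleted, no state documents should remain.
    static long stateDocCount(Client client) {
        SearchResponse response = client.prepareSearch(".ml-state")
                .setSize(0) // only the hit total is needed, not the hits
                .get();
        return response.getHits().getTotalHits();
    }
}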