From 9d4e3efad4f9f07f3b5c0240a1ffc071eb4e8c64 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 17 Mar 2017 11:54:31 +0100 Subject: [PATCH] [ML] Fix deleteCategorizerState() by using delete by query instead of individual blocking delete requests. Original commit: elastic/x-pack-elasticsearch@f28cc9fc5b7f30d589c596b2776c0e10cd89536b --- .../persistence/JobStorageDeletionTask.java | 47 +++++++++++-------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java index bc776a69728..15c9777228d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java @@ -7,11 +7,11 @@ package org.elasticsearch.xpack.ml.job.persistence; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse; +import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; @@ -21,12 +21,13 @@ import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.query.ConstantScoreQueryBuilder; +import org.elasticsearch.index.query.PrefixQueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.Task; -import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest; -import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.ml.action.MlDeleteByQueryAction; import org.elasticsearch.xpack.ml.job.config.Job; @@ -34,7 +35,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; - import java.util.List; import java.util.function.Consumer; @@ -84,20 +84,20 @@ public class JobStorageDeletionTask extends Task { } })); }, - failureHandler::accept); + failureHandler); // Step 4. Delete model state done, delete the categorizer state ActionListener deleteStateHandler = ActionListener.wrap(response -> { deleteCategorizerState(jobId, client, deleteCategorizerStateHandler); }, - failureHandler::accept + failureHandler ); // Step 3. Delete quantiles done, delete the model state ActionListener deleteQuantilesHandler = ActionListener.wrap(deleteResponse -> { deleteModelState(jobId, client, deleteStateHandler); }, - failureHandler::accept + failureHandler ); @@ -115,7 +115,7 @@ public class JobStorageDeletionTask extends Task { } deleteQuantiles(jobId, client, deleteQuantilesHandler); }, - failureHandler::accept + failureHandler ); @@ -156,17 +156,24 @@ public class JobStorageDeletionTask extends Task { } private void deleteCategorizerState(String jobId, Client client, ActionListener finishedHandler) { - int docNum = 0; - while (true) { - String docId = CategorizerState.categorizerStateDocId(jobId, ++docNum); - DeleteRequest deleteRequest = - client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), CategorizerState.TYPE, docId).request(); - DeleteResponse response = client.delete(deleteRequest).actionGet(); - if (response.getResult() == DocWriteResponse.Result.NOT_FOUND) { - break; - } - } + SearchRequest searchRequest = new SearchRequest(); + DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); + PrefixQueryBuilder query = new PrefixQueryBuilder(UidFieldMapper.NAME, Uid.createUid(CategorizerState.TYPE, jobId)); + searchRequest.source(new SearchSourceBuilder().query(query)); + searchRequest.indicesOptions(JobProvider.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); + request.setSlices(5); - finishedHandler.onResponse(true); + client.execute(MlDeleteByQueryAction.INSTANCE, request, new ActionListener() { + @Override + public void onResponse(BulkByScrollResponse bulkByScrollResponse) { + finishedHandler.onResponse(true); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to delete categorizer state for job [" + jobId + "]", e); + finishedHandler.onResponse(false); + } + }); } }