From 647c3aef5380cc6fe7b57d9b7b41c4f4ee6d9ed4 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 12 Apr 2017 16:07:09 +0100 Subject: [PATCH] [ML] Delete model snapshot and state before results (elastic/x-pack-elasticsearch#1066) Original commit: elastic/x-pack-elasticsearch@65152e9735fc4d901de8375731682339354041ac --- .../persistence/JobStorageDeletionTask.java | 64 +++++++++---------- 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java index ae574300c81..faa621d4d0b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java @@ -56,25 +56,9 @@ public class JobStorageDeletionTask extends Task { ActionListener deleteAliasHandler = ActionListener.wrap(finishedHandler, failureHandler); - // Step 5. Delete categorizer state done, delete the alias - ActionListener deleteCategorizerStateHandler = ActionListener.wrap( - bulkItemResponses -> deleteAlias(jobId, aliasName, indexName, client, deleteAliasHandler), - failureHandler); - - // Step 4. Delete model state done, delete the categorizer state - ActionListener deleteStateHandler = ActionListener.wrap( - response -> deleteCategorizerState(jobId, client, deleteCategorizerStateHandler), - failureHandler); - - // Step 3. Delete quantiles done, delete the model state - ActionListener deleteQuantilesHandler = ActionListener.wrap( - deleteResponse -> deleteModelState(jobId, client, deleteStateHandler), - failureHandler); - - - // Step 2. DBQ done, delete the state - // ------- - ActionListener dbqHandler = ActionListener.wrap(bulkByScrollResponse -> { + // Step 5. DBQ state done, delete the alias + ActionListener dbqHandler = ActionListener.wrap( + bulkByScrollResponse -> { if (bulkByScrollResponse.isTimedOut()) { logger.warn("[{}] DeleteByQuery for indices [{}, {}] timed out.", jobId, indexName, indexPattern); } @@ -82,23 +66,38 @@ public class JobStorageDeletionTask extends Task { logger.warn("[{}] {} failures encountered while running DeleteByQuery on indices [{}, {}].", jobId, bulkByScrollResponse.getBulkFailures().size(), indexName, indexPattern); } - deleteQuantiles(jobId, client, deleteQuantilesHandler); + deleteAlias(jobId, aliasName, indexName, client, deleteAliasHandler); }, - failureHandler - ); + failureHandler); + // Step 4. Delete categoriser state done, DeleteByQuery on the index, matching all docs with the right job_id + ActionListener deleteCategorizerStateHandler = ActionListener.wrap( + response -> { + logger.info("Running DBQ on [" + indexName + "," + indexPattern + "] for job [" + jobId + "]"); + SearchRequest searchRequest = new SearchRequest(indexName, indexPattern); + DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); + ConstantScoreQueryBuilder query = + new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId)); + searchRequest.source(new SearchSourceBuilder().query(query)); + searchRequest.indicesOptions(JobProvider.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); + request.setSlices(5); - // Step 1. DeleteByQuery on the index, matching all docs with the right job_id - // ------- - logger.info("Running DBQ on [" + indexName + "," + indexPattern + "] for job [" + jobId + "]"); - SearchRequest searchRequest = new SearchRequest(indexName, indexPattern); - DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); - ConstantScoreQueryBuilder query = new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId)); - searchRequest.source(new SearchSourceBuilder().query(query)); - searchRequest.indicesOptions(JobProvider.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); - request.setSlices(5); + client.execute(XPackDeleteByQueryAction.INSTANCE, request, dbqHandler); + }, + failureHandler); - client.execute(XPackDeleteByQueryAction.INSTANCE, request, dbqHandler); + // Step 3. Delete quantiles done, delete the categorizer state + ActionListener deleteQuantilesHandler = ActionListener.wrap( + response -> deleteCategorizerState(jobId, client, deleteCategorizerStateHandler), + failureHandler); + + // Step 2. Delete state done, delete the quantiles + ActionListener deleteStateHandler = ActionListener.wrap( + bulkResponse -> deleteQuantiles(jobId, client, deleteQuantilesHandler), + failureHandler); + + // Step 1. Delete the model state + deleteModelState(jobId, client, deleteStateHandler); } private void deleteQuantiles(String jobId, Client client, ActionListener finishedHandler) { @@ -107,7 +106,6 @@ public class JobStorageDeletionTask extends Task { } private void deleteModelState(String jobId, Client client, ActionListener listener) { - JobProvider jobProvider = new JobProvider(client, Settings.EMPTY); jobProvider.modelSnapshots(jobId, 0, 10000, page -> {