diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java index 1796c624696..a7174cf0f6e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.job.persistence; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; -import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse; import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest; @@ -26,6 +25,7 @@ import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.query.ConstantScoreQueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.index.query.WildcardQueryBuilder; +import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; @@ -54,63 +54,33 @@ public class JobStorageDeletionTask extends Task { final String indexPattern = indexName + "-*"; final String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); - CheckedConsumer deleteAliasHandler = indicesAliasesResponse -> { - if (!indicesAliasesResponse.isAcknowledged()) { - logger.warn("Delete Alias request not acknowledged for alias [" + aliasName + "]."); - } else { - logger.info("Done deleting alias [" + aliasName + "]"); - } - - finishedHandler.accept(true); - }; + ActionListener deleteAliasHandler = ActionListener.wrap(finishedHandler, failureHandler); // Step 5. Delete categorizer state done, delete the alias ActionListener deleteCategorizerStateHandler = ActionListener.wrap( - bulkItemResponses -> { - IndicesAliasesRequest request = new IndicesAliasesRequest() - .addAliasAction(IndicesAliasesRequest.AliasActions.remove().alias(aliasName).index(indexName)); - client.admin().indices().aliases(request, ActionListener.wrap(deleteAliasHandler, - e -> { - if (e instanceof IndexNotFoundException) { - logger.warn("Alias [" + aliasName + "] not found. Continuing to delete job."); - try { - finishedHandler.accept(false); - } catch (Exception e1) { - failureHandler.accept(e1); - } - } else { - // all other exceptions should die - failureHandler.accept(e); - } - })); - }, + bulkItemResponses -> deleteAlias(jobId, aliasName, indexName, client, deleteAliasHandler), failureHandler); // Step 4. Delete model state done, delete the categorizer state - ActionListener deleteStateHandler = ActionListener.wrap(response -> { - deleteCategorizerState(jobId, client, deleteCategorizerStateHandler); - }, - failureHandler - ); + ActionListener deleteStateHandler = ActionListener.wrap( + response -> deleteCategorizerState(jobId, client, deleteCategorizerStateHandler), + failureHandler); // Step 3. Delete quantiles done, delete the model state - ActionListener deleteQuantilesHandler = ActionListener.wrap(deleteResponse -> { - deleteModelState(jobId, client, deleteStateHandler); - }, - failureHandler - ); + ActionListener deleteQuantilesHandler = ActionListener.wrap( + deleteResponse -> deleteModelState(jobId, client, deleteStateHandler), + failureHandler); // Step 2. DBQ done, delete the state // ------- ActionListener dbqHandler = ActionListener.wrap(bulkByScrollResponse -> { if (bulkByScrollResponse.isTimedOut()) { - logger.warn("DeleteByQuery for indices [" + indexName + ", " + indexPattern + "] timed out."); + logger.warn("[{}] DeleteByQuery for indices [{}, {}] timed out.", jobId, indexName, indexPattern); } if (!bulkByScrollResponse.getBulkFailures().isEmpty()) { - logger.warn("[" + bulkByScrollResponse.getBulkFailures().size() - + "] failures encountered while running DeleteByQuery on indices [" + indexName + ", " - + indexPattern + "]. "); + logger.warn("[{}] {} failures encountered while running DeleteByQuery on indices [{}, {}].", + jobId, bulkByScrollResponse.getBulkFailures().size(), indexName, indexPattern); } deleteQuantiles(jobId, client, deleteQuantilesHandler); }, @@ -131,7 +101,7 @@ public class JobStorageDeletionTask extends Task { client.execute(MlDeleteByQueryAction.INSTANCE, request, dbqHandler); } - public void deleteQuantiles(String jobId, Client client, ActionListener finishedHandler) { + private void deleteQuantiles(String jobId, Client client, ActionListener finishedHandler) { client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), Quantiles.TYPE.getPreferredName(), Quantiles.documentId(jobId)) .execute(finishedHandler); } @@ -170,9 +140,26 @@ public class JobStorageDeletionTask extends Task { @Override public void onFailure(Exception e) { - logger.error("Failed to delete categorizer state for job [" + jobId + "]", e); - finishedHandler.onResponse(false); + logger.error("[" + jobId + "] Failed to delete categorizer state for job.", e); + finishedHandler.onFailure(e); } }); } + + private void deleteAlias(String jobId, String aliasName, String indexName, Client client, ActionListener finishedHandler ) { + IndicesAliasesRequest request = new IndicesAliasesRequest() + .addAliasAction(IndicesAliasesRequest.AliasActions.remove().alias(aliasName).index(indexName)); + client.admin().indices().aliases(request, ActionListener.wrap( + response -> finishedHandler.onResponse(true), + e -> { + if (e instanceof AliasesNotFoundException || e instanceof IndexNotFoundException) { + logger.warn("[{}] Alias [{}] not found. Continuing to delete job.", jobId, aliasName); + finishedHandler.onResponse(true); + } else { + // all other exceptions should die + logger.error("[" + jobId + "] Failed to delete alias [" + aliasName + "].", e); + finishedHandler.onFailure(e); + } + })); + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java index 1c3b83f224d..d8c5466a1cb 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java @@ -470,6 +470,29 @@ public class MlJobIT extends ESRestTestCase { client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")); } + public void testDeleteJobAfterMissingAlias() throws Exception { + String jobId = "foo"; + String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT; + createFarequoteJob(jobId); + + Response response = client().performRequest("get", "_cat/aliases"); + assertEquals(200, response.getStatusLine().getStatusCode()); + String responseAsString = responseEntityToString(response); + assertThat(responseAsString, containsString(aliasName)); + + // Manually delete the alias so that we can test that deletion proceeds + // normally anyway + response = client().performRequest("delete", indexName + "/_alias/" + aliasName); + assertEquals(200, response.getStatusLine().getStatusCode()); + + // check alias was deleted + expectThrows(ResponseException.class, () -> client().performRequest("get", "_cat/aliases")); + + response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId); + assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); + } + public void testMultiIndexDelete() throws Exception { String jobId = "foo"; String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT;