From dc07b593b731085b8c46e311e560521ad4241d28 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Thu, 9 Feb 2017 15:37:37 -0500 Subject: [PATCH] [ML] Support job deletion from multiple indices (elastic/elasticsearch#4918) This extends the DBQ to delete from a pattern, rather than a specific index. Once shared/rollover indices are implemented, this will be capable of purging results from the matching set. Original commit: elastic/x-pack-elasticsearch@4ec094417397e650750aeafc62360e83ac988b1e --- .../xpack/ml/job/JobManager.java | 3 +- .../persistence/AnomalyDetectorsIndex.java | 13 ++++ .../persistence/JobStorageDeletionTask.java | 13 +++- .../xpack/ml/integration/MlJobIT.java | 76 +++++++++++++++++++ 4 files changed, 100 insertions(+), 5 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 053ae7a8674..bef433f2abd 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -278,7 +278,8 @@ public class JobManager extends AbstractComponent { } // This task manages the physical deletion of the job (removing the results, then the index) - task.delete(jobId, indexName, client, deleteJobStateHandler::accept, actionListener::onFailure); + task.delete(jobId, client, clusterService.state(), + deleteJobStateHandler::accept, actionListener::onFailure); }; // Step 0. Kick off the chain of callbacks with the initial UpdateStatus call diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/AnomalyDetectorsIndex.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/AnomalyDetectorsIndex.java index 37fc8cf552a..4787f652e7d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/AnomalyDetectorsIndex.java @@ -5,6 +5,9 @@ */ package org.elasticsearch.xpack.ml.job.persistence; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; + /** * Methods for handling index naming related functions */ @@ -24,6 +27,16 @@ public final class AnomalyDetectorsIndex { return RESULTS_INDEX_PREFIX + jobId; } + /** + * The default index pattern for rollover index results + * @param jobId Job Id + * @return The index name + */ + public static String getCurrentResultsIndex(ClusterState state, String jobId) { + MlMetadata meta = state.getMetaData().custom(MlMetadata.TYPE); + return RESULTS_INDEX_PREFIX + meta.getJobs().get(jobId).getIndexName(); + } + /** * The name of the default index where a job's state is stored * @return The index name diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java index f0307348286..f5a2811d179 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.index.IndexNotFoundException; @@ -32,19 +33,23 @@ public class JobStorageDeletionTask extends Task { this.logger = Loggers.getLogger(getClass()); } - public void delete(String jobId, String indexName, Client client, + public void delete(String jobId, Client client, ClusterState state, CheckedConsumer finishedHandler, Consumer failureHandler) { + String indexName = AnomalyDetectorsIndex.getCurrentResultsIndex(state, jobId); + String indexPattern = indexName + "-*"; + // Step 2. Regardless of if the DBQ succeeds, we delete the physical index // ------- + // TODO this will be removed once shared indices are used CheckedConsumer dbqHandler = bulkByScrollResponse -> { if (bulkByScrollResponse.isTimedOut()) { - logger.warn("DeleteByQuery for index [" + indexName + "] timed out. Continuing to delete index."); + logger.warn("DeleteByQuery for index [" + indexPattern + "] timed out. Continuing to delete index."); } if (!bulkByScrollResponse.getBulkFailures().isEmpty()) { logger.warn("[" + bulkByScrollResponse.getBulkFailures().size() - + "] failures encountered while running DeleteByQuery on index [" + indexName + "]. " + + "] failures encountered while running DeleteByQuery on index [" + indexPattern + "]. " + "Continuing to delete index"); } @@ -63,7 +68,7 @@ public class JobStorageDeletionTask extends Task { // Step 1. DeleteByQuery on the index, matching all docs with the right job_id // ------- - SearchRequest searchRequest = new SearchRequest(indexName); + SearchRequest searchRequest = new SearchRequest(indexPattern); searchRequest.source(new SearchSourceBuilder().query(new TermQueryBuilder("job_id", jobId))); DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); request.setSlices(5); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java index 5140d710eca..dfb1a92a176 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java @@ -316,6 +316,82 @@ public class MlJobIT extends ESRestTestCase { client().performRequest("get", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")); } + public void testMultiIndexDelete() throws Exception { + String jobId = "foo"; + String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + createFarequoteJob(jobId); + + Response response = client().performRequest("put", indexName + "-001"); + assertEquals(200, response.getStatusLine().getStatusCode()); + + response = client().performRequest("put", indexName + "-002"); + assertEquals(200, response.getStatusLine().getStatusCode()); + + response = client().performRequest("get", "_cat/indices"); + assertEquals(200, response.getStatusLine().getStatusCode()); + String responseAsString = responseEntityToString(response); + assertThat(responseAsString, containsString(indexName)); + assertThat(responseAsString, containsString(indexName + "-001")); + assertThat(responseAsString, containsString(indexName + "-002")); + + // Add some documents to each index to make sure the DBQ clears them out + String recordResult = + String.format(Locale.ROOT, + "{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"bucket_span\":%d, \"sequence_num\": %d, \"result_type\":\"record\"}", + jobId, 123, 1, 1); + client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "/result/" + 123, + Collections.singletonMap("refresh", "true"), new StringEntity(recordResult)); + client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-001/result/" + 123, + Collections.singletonMap("refresh", "true"), new StringEntity(recordResult)); + client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-002/result/" + 123, + Collections.singletonMap("refresh", "true"), new StringEntity(recordResult)); + + + client().performRequest("post", "_refresh"); + + // check for the documents + response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "/_count"); + assertEquals(200, response.getStatusLine().getStatusCode()); + responseAsString = responseEntityToString(response); + assertThat(responseAsString, containsString("\"count\":1")); + + response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-001/_count"); + assertEquals(200, response.getStatusLine().getStatusCode()); + responseAsString = responseEntityToString(response); + assertThat(responseAsString, containsString("\"count\":1")); + + response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-002/_count"); + assertEquals(200, response.getStatusLine().getStatusCode()); + responseAsString = responseEntityToString(response); + assertThat(responseAsString, containsString("\"count\":1")); + + // Delete + response = client().performRequest("delete", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId); + assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); + + client().performRequest("post", "_refresh"); + + // check index was deleted + response = client().performRequest("get", "_cat/indices"); + assertEquals(200, response.getStatusLine().getStatusCode()); + responseAsString = responseEntityToString(response); + assertThat(responseAsString, not(containsString("\t" + indexName + "\t"))); + + // The other two indices won't be deleted, but the DBQ should have cleared them out + response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-001/_count"); + assertEquals(200, response.getStatusLine().getStatusCode()); + responseAsString = responseEntityToString(response); + assertThat(responseAsString, containsString("\"count\":0")); + + response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-002/_count"); + assertEquals(200, response.getStatusLine().getStatusCode()); + responseAsString = responseEntityToString(response); + assertThat(responseAsString, containsString("\"count\":0")); + + expectThrows(ResponseException.class, () -> + client().performRequest("get", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")); + } + private Response addBucketResult(String jobId, String timestamp, long bucketSpan) throws Exception { try { client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId),