[ML] Support job deletion from multiple indices (elastic/elasticsearch#4918)

This extends the delete-by-query (DBQ) to delete from an index pattern rather than a single concrete index. Once shared/rollover
indices are implemented, this will be able to purge results from the whole matching set.
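
Concretely, the pattern is just the job's results index with a trailing wildcard, so one delete-by-query sweeps every rollover generation of that index. A minimal sketch of the idea (the index name below is assumed purely for illustration, not taken from this commit):

// Illustrative only: the concrete results index name is assumed here;
// appending "-*" makes the query match rollover generations such as
// "<index>-001" and "<index>-002".
public class PatternSketch {
    public static void main(String[] args) {
        String indexName = ".ml-anomalies-foo";   // assumed name, for illustration
        String indexPattern = indexName + "-*";
        System.out.println("delete-by-query target: " + indexPattern);
    }
}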

Original commit: elastic/x-pack-elasticsearch@4ec0944173
Author: Zachary Tong, 2017-02-09 15:37:37 -05:00 (committed by GitHub)
Parent: 04a4c816bd
Commit: dc07b593b7
4 changed files with 100 additions and 5 deletions

JobManager.java

@@ -278,7 +278,8 @@ public class JobManager extends AbstractComponent {
         }
         // This task manages the physical deletion of the job (removing the results, then the index)
-        task.delete(jobId, indexName, client, deleteJobStateHandler::accept, actionListener::onFailure);
+        task.delete(jobId, client, clusterService.state(),
+                deleteJobStateHandler::accept, actionListener::onFailure);
     };
     // Step 0. Kick off the chain of callbacks with the initial UpdateStatus call
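
The call site above no longer passes a precomputed index name: it hands the task the live ClusterState so the task can resolve the job's physical index itself. For readers unfamiliar with the chain-of-callbacks style the surrounding code uses, here is a hedged, self-contained sketch (the class and handler names are hypothetical, not JobManager's actual members):

import org.elasticsearch.common.CheckedConsumer;

import java.util.function.Consumer;

// Hypothetical sketch: each async step hands its result to the next step's
// CheckedConsumer, while any failure short-circuits to a single handler.
public class CallbackChainSketch {
    static void deleteStep(boolean previousStepOk,
                           CheckedConsumer<Boolean, Exception> nextStep,
                           Consumer<Exception> failureHandler) {
        try {
            nextStep.accept(previousStepOk);  // pretend the deletion succeeded
        } catch (Exception e) {
            failureHandler.accept(e);         // abort the rest of the chain
        }
    }

    public static void main(String[] args) {
        deleteStep(true,
                ok -> System.out.println("next step runs, got: " + ok),
                e -> System.err.println("chain aborted: " + e));
    }
}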

AnomalyDetectorsIndex.java

@@ -5,6 +5,9 @@
  */
 package org.elasticsearch.xpack.ml.job.persistence;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
 /**
  * Methods for handling index naming related functions
  */
@@ -24,6 +27,16 @@ public final class AnomalyDetectorsIndex {
         return RESULTS_INDEX_PREFIX + jobId;
     }
+    /**
+     * The default index pattern for rollover index results
+     * @param jobId Job Id
+     * @return The index name
+     */
+    public static String getCurrentResultsIndex(ClusterState state, String jobId) {
+        MlMetadata meta = state.getMetaData().custom(MlMetadata.TYPE);
+        return RESULTS_INDEX_PREFIX + meta.getJobs().get(jobId).getIndexName();
+    }
     /**
      * The name of the default index where a job's state is stored
      * @return The index name
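
The new helper looks the job up in the custom MlMetadata carried by the cluster state and returns its concrete results index; appending "-*" to that name yields the deletion pattern used by the task below. A hypothetical usage sketch (the ClusterService wiring is a stand-in, not code from this commit):

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;

public class IndexResolutionSketch {
    // Hypothetical helper: resolve the job's concrete results index, then
    // derive the wildcard pattern that also covers rollover generations.
    static String resultsPattern(ClusterService clusterService, String jobId) {
        ClusterState state = clusterService.state();
        String indexName = AnomalyDetectorsIndex.getCurrentResultsIndex(state, jobId);
        return indexName + "-*";
    }
}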

JobStorageDeletionTask.java

@@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.index.IndexNotFoundException;
@@ -32,19 +33,23 @@ public class JobStorageDeletionTask extends Task {
         this.logger = Loggers.getLogger(getClass());
     }
-    public void delete(String jobId, String indexName, Client client,
+    public void delete(String jobId, Client client, ClusterState state,
                        CheckedConsumer<Boolean, Exception> finishedHandler,
                        Consumer<Exception> failureHandler) {
+        String indexName = AnomalyDetectorsIndex.getCurrentResultsIndex(state, jobId);
+        String indexPattern = indexName + "-*";
         // Step 2. Regardless of if the DBQ succeeds, we delete the physical index
         // -------
         // TODO this will be removed once shared indices are used
         CheckedConsumer<BulkByScrollResponse, Exception> dbqHandler = bulkByScrollResponse -> {
             if (bulkByScrollResponse.isTimedOut()) {
-                logger.warn("DeleteByQuery for index [" + indexName + "] timed out. Continuing to delete index.");
+                logger.warn("DeleteByQuery for index [" + indexPattern + "] timed out. Continuing to delete index.");
             }
             if (!bulkByScrollResponse.getBulkFailures().isEmpty()) {
                 logger.warn("[" + bulkByScrollResponse.getBulkFailures().size()
-                        + "] failures encountered while running DeleteByQuery on index [" + indexName + "]. "
+                        + "] failures encountered while running DeleteByQuery on index [" + indexPattern + "]. "
                         + "Continuing to delete index");
             }
@@ -63,7 +68,7 @@
         // Step 1. DeleteByQuery on the index, matching all docs with the right job_id
         // -------
-        SearchRequest searchRequest = new SearchRequest(indexName);
+        SearchRequest searchRequest = new SearchRequest(indexPattern);
         searchRequest.source(new SearchSourceBuilder().query(new TermQueryBuilder("job_id", jobId)));
         DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
         request.setSlices(5);
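
Taken together, Step 1 is a term query on job_id run as a sliced delete-by-query across the wildcard pattern, and Step 2 then drops the concrete index regardless of the DBQ outcome. A standalone sketch of just the request construction, mirroring the lines above:

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class DeleteByQuerySketch {
    // Build a delete-by-query that removes every document carrying the job's
    // id from all indices matching the rollover pattern (e.g. "<index>-*").
    static DeleteByQueryRequest buildRequest(String jobId, String indexPattern) {
        SearchRequest searchRequest = new SearchRequest(indexPattern);
        searchRequest.source(new SearchSourceBuilder().query(new TermQueryBuilder("job_id", jobId)));
        DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
        request.setSlices(5);  // five parallel slices, as in the diff above
        return request;
    }
}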

MlJobIT.java

@@ -316,6 +316,82 @@ public class MlJobIT extends ESRestTestCase {
                 client().performRequest("get", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
     }
+    public void testMultiIndexDelete() throws Exception {
+        String jobId = "foo";
+        String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
+        createFarequoteJob(jobId);
+        Response response = client().performRequest("put", indexName + "-001");
+        assertEquals(200, response.getStatusLine().getStatusCode());
+        response = client().performRequest("put", indexName + "-002");
+        assertEquals(200, response.getStatusLine().getStatusCode());
+        response = client().performRequest("get", "_cat/indices");
+        assertEquals(200, response.getStatusLine().getStatusCode());
+        String responseAsString = responseEntityToString(response);
+        assertThat(responseAsString, containsString(indexName));
+        assertThat(responseAsString, containsString(indexName + "-001"));
+        assertThat(responseAsString, containsString(indexName + "-002"));
+        // Add some documents to each index to make sure the DBQ clears them out
+        String recordResult =
+                String.format(Locale.ROOT,
+                        "{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"bucket_span\":%d, \"sequence_num\": %d, \"result_type\":\"record\"}",
+                        jobId, 123, 1, 1);
+        client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "/result/" + 123,
+                Collections.singletonMap("refresh", "true"), new StringEntity(recordResult));
+        client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-001/result/" + 123,
+                Collections.singletonMap("refresh", "true"), new StringEntity(recordResult));
+        client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-002/result/" + 123,
+                Collections.singletonMap("refresh", "true"), new StringEntity(recordResult));
+        client().performRequest("post", "_refresh");
+        // check for the documents
+        response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "/_count");
+        assertEquals(200, response.getStatusLine().getStatusCode());
+        responseAsString = responseEntityToString(response);
+        assertThat(responseAsString, containsString("\"count\":1"));
+        response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-001/_count");
+        assertEquals(200, response.getStatusLine().getStatusCode());
+        responseAsString = responseEntityToString(response);
+        assertThat(responseAsString, containsString("\"count\":1"));
+        response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-002/_count");
+        assertEquals(200, response.getStatusLine().getStatusCode());
+        responseAsString = responseEntityToString(response);
+        assertThat(responseAsString, containsString("\"count\":1"));
+        // Delete
+        response = client().performRequest("delete", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId);
+        assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
+        client().performRequest("post", "_refresh");
+        // check index was deleted
+        response = client().performRequest("get", "_cat/indices");
+        assertEquals(200, response.getStatusLine().getStatusCode());
+        responseAsString = responseEntityToString(response);
+        assertThat(responseAsString, not(containsString("\t" + indexName + "\t")));
+        // The other two indices won't be deleted, but the DBQ should have cleared them out
+        response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-001/_count");
+        assertEquals(200, response.getStatusLine().getStatusCode());
+        responseAsString = responseEntityToString(response);
+        assertThat(responseAsString, containsString("\"count\":0"));
+        response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-002/_count");
+        assertEquals(200, response.getStatusLine().getStatusCode());
+        responseAsString = responseEntityToString(response);
+        assertThat(responseAsString, containsString("\"count\":0"));
+        expectThrows(ResponseException.class, () ->
+                client().performRequest("get", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
+    }
     private Response addBucketResult(String jobId, String timestamp, long bucketSpan) throws Exception {
         try {
             client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId),