Delete custom index if the only contained job is deleted (#33788)
* Delete custom index if the only contained job is deleted
This commit is contained in:
parent
c118581617
commit
4190a9f1e9
|
@ -11,8 +11,12 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
|
||||
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
|
@ -23,10 +27,12 @@ import org.elasticsearch.common.logging.Loggers;
|
|||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
|
||||
import org.elasticsearch.index.query.IdsQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.index.query.TermQueryBuilder;
|
||||
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryAction;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
|
||||
|
@ -69,11 +75,16 @@ public class JobStorageDeletionTask extends Task {
|
|||
final String indexName = AnomalyDetectorsIndex.getPhysicalIndexFromState(state, jobId);
|
||||
final String indexPattern = indexName + "-*";
|
||||
|
||||
ActionListener<Boolean> deleteAliasHandler = ActionListener.wrap(finishedHandler, failureHandler);
|
||||
final ActionListener<AcknowledgedResponse> completionHandler = ActionListener.wrap(
|
||||
response -> finishedHandler.accept(response.isAcknowledged()),
|
||||
failureHandler);
|
||||
|
||||
// Step 5. DBQ state done, delete the aliases
|
||||
// Step 7. If we did not drop the index and after DBQ state done, we delete the aliases
|
||||
ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap(
|
||||
bulkByScrollResponse -> {
|
||||
if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume Index was deleted
|
||||
completionHandler.onResponse(new AcknowledgedResponse(true));
|
||||
} else {
|
||||
if (bulkByScrollResponse.isTimedOut()) {
|
||||
logger.warn("[{}] DeleteByQuery for indices [{}, {}] timed out.", jobId, indexName, indexPattern);
|
||||
}
|
||||
|
@ -85,13 +96,15 @@ public class JobStorageDeletionTask extends Task {
|
|||
logger.warn("DBQ failure: " + failure);
|
||||
}
|
||||
}
|
||||
deleteAliases(jobId, client, deleteAliasHandler);
|
||||
deleteAliases(jobId, client, completionHandler);
|
||||
}
|
||||
},
|
||||
failureHandler);
|
||||
|
||||
// Step 4. Delete categorizer state done, DeleteByQuery on the index, matching all docs with the right job_id
|
||||
ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
|
||||
// Step 6. If we did not delete the index, we run a delete by query
|
||||
ActionListener<Boolean> deleteByQueryExecutor = ActionListener.wrap(
|
||||
response -> {
|
||||
if (response) {
|
||||
logger.info("Running DBQ on [" + indexName + "," + indexPattern + "] for job [" + jobId + "]");
|
||||
DeleteByQueryRequest request = new DeleteByQueryRequest(indexName, indexPattern);
|
||||
ConstantScoreQueryBuilder query =
|
||||
|
@ -103,9 +116,62 @@ public class JobStorageDeletionTask extends Task {
|
|||
request.setRefresh(true);
|
||||
|
||||
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, dbqHandler);
|
||||
} else { // We did not execute DBQ, no need to delete aliases or check the response
|
||||
dbqHandler.onResponse(null);
|
||||
}
|
||||
},
|
||||
failureHandler);
|
||||
|
||||
// Step 5. If we have any hits, that means we are NOT the only job on this index, and should not delete it
|
||||
// if we do not have any hits, we can drop the index and then skip the DBQ and alias deletion
|
||||
ActionListener<SearchResponse> customIndexSearchHandler = ActionListener.wrap(
|
||||
searchResponse -> {
|
||||
if (searchResponse == null || searchResponse.getHits().totalHits > 0) {
|
||||
deleteByQueryExecutor.onResponse(true); // We need to run DBQ and alias deletion
|
||||
} else {
|
||||
logger.info("Running DELETE Index on [" + indexName + "] for job [" + jobId + "]");
|
||||
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
|
||||
request.indicesOptions(IndicesOptions.lenientExpandOpen());
|
||||
// If we have deleted the index, then we don't need to delete the aliases or run the DBQ
|
||||
executeAsyncWithOrigin(
|
||||
client.threadPool().getThreadContext(),
|
||||
ML_ORIGIN,
|
||||
request,
|
||||
ActionListener.<AcknowledgedResponse>wrap(
|
||||
response -> deleteByQueryExecutor.onResponse(false), // skip DBQ && Alias
|
||||
failureHandler),
|
||||
client.admin().indices()::delete);
|
||||
}
|
||||
},
|
||||
failure -> {
|
||||
if (failure.getClass() == IndexNotFoundException.class) { // assume the index is already deleted
|
||||
deleteByQueryExecutor.onResponse(false); // skip DBQ && Alias
|
||||
} else {
|
||||
failureHandler.accept(failure);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
// Step 4. Determine if we are on a shared index by looking at `.ml-anomalies-shared` or the custom index's aliases
|
||||
ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
|
||||
response -> {
|
||||
if (indexName.equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX +
|
||||
AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) {
|
||||
customIndexSearchHandler.onResponse(null); //don't bother searching the index any further, we are on the default shared
|
||||
} else {
|
||||
SearchSourceBuilder source = new SearchSourceBuilder()
|
||||
.size(1)
|
||||
.query(QueryBuilders.boolQuery().filter(
|
||||
QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))));
|
||||
|
||||
SearchRequest searchRequest = new SearchRequest(indexName);
|
||||
searchRequest.source(source);
|
||||
executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, customIndexSearchHandler);
|
||||
}
|
||||
},
|
||||
failureHandler
|
||||
);
|
||||
|
||||
// Step 3. Delete quantiles done, delete the categorizer state
|
||||
ActionListener<Boolean> deleteQuantilesHandler = ActionListener.wrap(
|
||||
response -> deleteCategorizerState(jobId, client, 1, deleteCategorizerStateHandler),
|
||||
|
@ -189,7 +255,7 @@ public class JobStorageDeletionTask extends Task {
|
|||
}));
|
||||
}
|
||||
|
||||
private void deleteAliases(String jobId, Client client, ActionListener<Boolean> finishedHandler) {
|
||||
private void deleteAliases(String jobId, Client client, ActionListener<AcknowledgedResponse> finishedHandler) {
|
||||
final String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
|
||||
final String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
|
||||
|
||||
|
@ -204,11 +270,12 @@ public class JobStorageDeletionTask extends Task {
|
|||
if (removeRequest == null) {
|
||||
// don't error if the job's aliases have already been deleted - carry on and delete the
|
||||
// rest of the job's data
|
||||
finishedHandler.onResponse(true);
|
||||
finishedHandler.onResponse(new AcknowledgedResponse(true));
|
||||
return;
|
||||
}
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, removeRequest,
|
||||
ActionListener.<AcknowledgedResponse>wrap(removeResponse -> finishedHandler.onResponse(true),
|
||||
ActionListener.<AcknowledgedResponse>wrap(
|
||||
finishedHandler::onResponse,
|
||||
finishedHandler::onFailure),
|
||||
client.admin().indices()::aliases);
|
||||
},
|
||||
|
|
|
@ -192,6 +192,7 @@ public class MlJobIT extends ESRestTestCase {
|
|||
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsAliasedName(jobId1))));
|
||||
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsAliasedName(jobId2))));
|
||||
|
||||
{ //create jobId1 docs
|
||||
String id = String.format(Locale.ROOT, "%s_bucket_%s_%s", jobId1, "1234", 300);
|
||||
Request createResultRequest = new Request("PUT", AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "/doc/" + id);
|
||||
createResultRequest.setJsonEntity(String.format(Locale.ROOT,
|
||||
|
@ -215,10 +216,36 @@ public class MlJobIT extends ESRestTestCase {
|
|||
responseAsString = EntityUtils.toString(client().performRequest(
|
||||
new Request("GET", AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "/_search")).getEntity());
|
||||
assertThat(responseAsString, containsString("\"total\":2"));
|
||||
}
|
||||
{ //create jobId2 docs
|
||||
String id = String.format(Locale.ROOT, "%s_bucket_%s_%s", jobId2, "1234", 300);
|
||||
Request createResultRequest = new Request("PUT", AnomalyDetectorsIndex.jobResultsAliasedName(jobId2) + "/doc/" + id);
|
||||
createResultRequest.setJsonEntity(String.format(Locale.ROOT,
|
||||
"{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"result_type\":\"bucket\", \"bucket_span\": \"%s\"}",
|
||||
jobId2, "1234", 1));
|
||||
client().performRequest(createResultRequest);
|
||||
|
||||
id = String.format(Locale.ROOT, "%s_bucket_%s_%s", jobId2, "1236", 300);
|
||||
createResultRequest = new Request("PUT", AnomalyDetectorsIndex.jobResultsAliasedName(jobId2) + "/doc/" + id);
|
||||
createResultRequest.setJsonEntity(String.format(Locale.ROOT,
|
||||
"{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"result_type\":\"bucket\", \"bucket_span\": \"%s\"}",
|
||||
jobId2, "1236", 1));
|
||||
client().performRequest(createResultRequest);
|
||||
|
||||
client().performRequest(new Request("POST", "/_refresh"));
|
||||
|
||||
responseAsString = EntityUtils.toString(client().performRequest(
|
||||
new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId2 + "/results/buckets")).getEntity());
|
||||
assertThat(responseAsString, containsString("\"count\":2"));
|
||||
|
||||
responseAsString = EntityUtils.toString(client().performRequest(
|
||||
new Request("GET", AnomalyDetectorsIndex.jobResultsAliasedName(jobId2) + "/_search")).getEntity());
|
||||
assertThat(responseAsString, containsString("\"total\":2"));
|
||||
}
|
||||
|
||||
client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1));
|
||||
|
||||
// check that indices still exist, but are empty and aliases are gone
|
||||
// check that indices still exist, but no longer have job1 entries and aliases are gone
|
||||
responseAsString = EntityUtils.toString(client().performRequest(new Request("GET", "/_aliases")).getEntity());
|
||||
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsAliasedName(jobId1))));
|
||||
assertThat(responseAsString, containsString(AnomalyDetectorsIndex.jobResultsAliasedName(jobId2))); //job2 still exists
|
||||
|
@ -230,7 +257,16 @@ public class MlJobIT extends ESRestTestCase {
|
|||
|
||||
responseAsString = EntityUtils.toString(client().performRequest(
|
||||
new Request("GET", AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-" + indexName + "/_count")).getEntity());
|
||||
assertThat(responseAsString, containsString("\"count\":0"));
|
||||
assertThat(responseAsString, containsString("\"count\":2"));
|
||||
|
||||
// Delete the second job and verify aliases are gone, and original concrete/custom index is gone
|
||||
client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId2));
|
||||
responseAsString = EntityUtils.toString(client().performRequest(new Request("GET", "/_aliases")).getEntity());
|
||||
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsAliasedName(jobId2))));
|
||||
|
||||
client().performRequest(new Request("POST", "/_refresh"));
|
||||
responseAsString = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity());
|
||||
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-" + indexName)));
|
||||
}
|
||||
|
||||
public void testCreateJobInSharedIndexUpdatesMapping() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue