[ML] Fix bug deleting job with missing alias (elastic/x-pack-elasticsearch#850)

Original commit: elastic/x-pack-elasticsearch@44fd88a834
This commit is contained in:
David Kyle 2017-03-27 17:47:29 +01:00 committed by GitHub
parent f3c4ec8a81
commit 9b4d399fc3
2 changed files with 55 additions and 45 deletions

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.job.persistence;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest;
@ -26,6 +25,7 @@ import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.WildcardQueryBuilder;
import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
@ -54,63 +54,33 @@ public class JobStorageDeletionTask extends Task {
final String indexPattern = indexName + "-*";
final String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
CheckedConsumer<IndicesAliasesResponse, Exception> deleteAliasHandler = indicesAliasesResponse -> {
if (!indicesAliasesResponse.isAcknowledged()) {
logger.warn("Delete Alias request not acknowledged for alias [" + aliasName + "].");
} else {
logger.info("Done deleting alias [" + aliasName + "]");
}
finishedHandler.accept(true);
};
ActionListener<Boolean> deleteAliasHandler = ActionListener.wrap(finishedHandler, failureHandler);
// Step 5. Delete categorizer state done, delete the alias
ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
bulkItemResponses -> {
IndicesAliasesRequest request = new IndicesAliasesRequest()
.addAliasAction(IndicesAliasesRequest.AliasActions.remove().alias(aliasName).index(indexName));
client.admin().indices().aliases(request, ActionListener.wrap(deleteAliasHandler,
e -> {
if (e instanceof IndexNotFoundException) {
logger.warn("Alias [" + aliasName + "] not found. Continuing to delete job.");
try {
finishedHandler.accept(false);
} catch (Exception e1) {
failureHandler.accept(e1);
}
} else {
// all other exceptions should die
failureHandler.accept(e);
}
}));
},
bulkItemResponses -> deleteAlias(jobId, aliasName, indexName, client, deleteAliasHandler),
failureHandler);
// Step 4. Delete model state done, delete the categorizer state
ActionListener<BulkResponse> deleteStateHandler = ActionListener.wrap(response -> {
deleteCategorizerState(jobId, client, deleteCategorizerStateHandler);
},
failureHandler
);
ActionListener<BulkResponse> deleteStateHandler = ActionListener.wrap(
response -> deleteCategorizerState(jobId, client, deleteCategorizerStateHandler),
failureHandler);
// Step 3. Delete quantiles done, delete the model state
ActionListener<DeleteResponse> deleteQuantilesHandler = ActionListener.wrap(deleteResponse -> {
deleteModelState(jobId, client, deleteStateHandler);
},
failureHandler
);
ActionListener<DeleteResponse> deleteQuantilesHandler = ActionListener.wrap(
deleteResponse -> deleteModelState(jobId, client, deleteStateHandler),
failureHandler);
// Step 2. DBQ done, delete the state
// -------
ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap(bulkByScrollResponse -> {
if (bulkByScrollResponse.isTimedOut()) {
logger.warn("DeleteByQuery for indices [" + indexName + ", " + indexPattern + "] timed out.");
logger.warn("[{}] DeleteByQuery for indices [{}, {}] timed out.", jobId, indexName, indexPattern);
}
if (!bulkByScrollResponse.getBulkFailures().isEmpty()) {
logger.warn("[" + bulkByScrollResponse.getBulkFailures().size()
+ "] failures encountered while running DeleteByQuery on indices [" + indexName + ", "
+ indexPattern + "]. ");
logger.warn("[{}] {} failures encountered while running DeleteByQuery on indices [{}, {}].",
jobId, bulkByScrollResponse.getBulkFailures().size(), indexName, indexPattern);
}
deleteQuantiles(jobId, client, deleteQuantilesHandler);
},
@ -131,7 +101,7 @@ public class JobStorageDeletionTask extends Task {
client.execute(MlDeleteByQueryAction.INSTANCE, request, dbqHandler);
}
public void deleteQuantiles(String jobId, Client client, ActionListener<DeleteResponse> finishedHandler) {
private void deleteQuantiles(String jobId, Client client, ActionListener<DeleteResponse> finishedHandler) {
client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), Quantiles.TYPE.getPreferredName(), Quantiles.documentId(jobId))
.execute(finishedHandler);
}
@ -170,9 +140,26 @@ public class JobStorageDeletionTask extends Task {
@Override
public void onFailure(Exception e) {
logger.error("Failed to delete categorizer state for job [" + jobId + "]", e);
finishedHandler.onResponse(false);
logger.error("[" + jobId + "] Failed to delete categorizer state for job.", e);
finishedHandler.onFailure(e);
}
});
}
private void deleteAlias(String jobId, String aliasName, String indexName, Client client, ActionListener<Boolean> finishedHandler ) {
IndicesAliasesRequest request = new IndicesAliasesRequest()
.addAliasAction(IndicesAliasesRequest.AliasActions.remove().alias(aliasName).index(indexName));
client.admin().indices().aliases(request, ActionListener.wrap(
response -> finishedHandler.onResponse(true),
e -> {
if (e instanceof AliasesNotFoundException || e instanceof IndexNotFoundException) {
logger.warn("[{}] Alias [{}] not found. Continuing to delete job.", jobId, aliasName);
finishedHandler.onResponse(true);
} else {
// all other exceptions should die
logger.error("[" + jobId + "] Failed to delete alias [" + aliasName + "].", e);
finishedHandler.onFailure(e);
}
}));
}
}

View File

@ -470,6 +470,29 @@ public class MlJobIT extends ESRestTestCase {
client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
}
public void testDeleteJobAfterMissingAlias() throws Exception {
String jobId = "foo";
String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT;
createFarequoteJob(jobId);
Response response = client().performRequest("get", "_cat/aliases");
assertEquals(200, response.getStatusLine().getStatusCode());
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(aliasName));
// Manually delete the alias so that we can test that deletion proceeds
// normally anyway
response = client().performRequest("delete", indexName + "/_alias/" + aliasName);
assertEquals(200, response.getStatusLine().getStatusCode());
// check alias was deleted
expectThrows(ResponseException.class, () -> client().performRequest("get", "_cat/aliases"));
response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
}
public void testMultiIndexDelete() throws Exception {
String jobId = "foo";
String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT;