[ML] Delete model snapshot and state before results (elastic/x-pack-elasticsearch#1066)

Original commit: elastic/x-pack-elasticsearch@65152e9735
This commit is contained in:
David Kyle 2017-04-12 16:07:09 +01:00 committed by GitHub
parent f72967eb7f
commit 647c3aef53
1 changed files with 31 additions and 33 deletions

View File

@ -56,25 +56,9 @@ public class JobStorageDeletionTask extends Task {
ActionListener<Boolean> deleteAliasHandler = ActionListener.wrap(finishedHandler, failureHandler);
// Step 5. Delete categorizer state done, delete the alias
ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
bulkItemResponses -> deleteAlias(jobId, aliasName, indexName, client, deleteAliasHandler),
failureHandler);
// Step 4. Delete model state done, delete the categorizer state
ActionListener<BulkResponse> deleteStateHandler = ActionListener.wrap(
response -> deleteCategorizerState(jobId, client, deleteCategorizerStateHandler),
failureHandler);
// Step 3. Delete quantiles done, delete the model state
ActionListener<DeleteResponse> deleteQuantilesHandler = ActionListener.wrap(
deleteResponse -> deleteModelState(jobId, client, deleteStateHandler),
failureHandler);
// Step 2. DBQ done, delete the state
// -------
ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap(bulkByScrollResponse -> {
// Step 5. DBQ state done, delete the alias
ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap(
bulkByScrollResponse -> {
if (bulkByScrollResponse.isTimedOut()) {
logger.warn("[{}] DeleteByQuery for indices [{}, {}] timed out.", jobId, indexName, indexPattern);
}
@ -82,23 +66,38 @@ public class JobStorageDeletionTask extends Task {
logger.warn("[{}] {} failures encountered while running DeleteByQuery on indices [{}, {}].",
jobId, bulkByScrollResponse.getBulkFailures().size(), indexName, indexPattern);
}
deleteQuantiles(jobId, client, deleteQuantilesHandler);
deleteAlias(jobId, aliasName, indexName, client, deleteAliasHandler);
},
failureHandler
);
failureHandler);
// Step 4. Delete categoriser state done, DeleteByQuery on the index, matching all docs with the right job_id
ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
response -> {
logger.info("Running DBQ on [" + indexName + "," + indexPattern + "] for job [" + jobId + "]");
SearchRequest searchRequest = new SearchRequest(indexName, indexPattern);
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
ConstantScoreQueryBuilder query =
new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
searchRequest.source(new SearchSourceBuilder().query(query));
searchRequest.indicesOptions(JobProvider.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
request.setSlices(5);
// Step 1. DeleteByQuery on the index, matching all docs with the right job_id
// -------
logger.info("Running DBQ on [" + indexName + "," + indexPattern + "] for job [" + jobId + "]");
SearchRequest searchRequest = new SearchRequest(indexName, indexPattern);
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
ConstantScoreQueryBuilder query = new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
searchRequest.source(new SearchSourceBuilder().query(query));
searchRequest.indicesOptions(JobProvider.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
request.setSlices(5);
client.execute(XPackDeleteByQueryAction.INSTANCE, request, dbqHandler);
},
failureHandler);
client.execute(XPackDeleteByQueryAction.INSTANCE, request, dbqHandler);
// Step 3. Delete quantiles done, delete the categorizer state
ActionListener<DeleteResponse> deleteQuantilesHandler = ActionListener.wrap(
response -> deleteCategorizerState(jobId, client, deleteCategorizerStateHandler),
failureHandler);
// Step 2. Delete state done, delete the quantiles
ActionListener<BulkResponse> deleteStateHandler = ActionListener.wrap(
bulkResponse -> deleteQuantiles(jobId, client, deleteQuantilesHandler),
failureHandler);
// Step 1. Delete the model state
deleteModelState(jobId, client, deleteStateHandler);
}
private void deleteQuantiles(String jobId, Client client, ActionListener<DeleteResponse> finishedHandler) {
@ -107,7 +106,6 @@ public class JobStorageDeletionTask extends Task {
}
private void deleteModelState(String jobId, Client client, ActionListener<BulkResponse> listener) {
JobProvider jobProvider = new JobProvider(client, Settings.EMPTY);
jobProvider.modelSnapshots(jobId, 0, 10000,
page -> {