[ML] Fix deleteCategorizerState() by using delete by query instead of individual blocking delete requests.

Original commit: elastic/x-pack-elasticsearch@f28cc9fc5b
This commit is contained in:
Martijn van Groningen 2017-03-17 11:54:31 +01:00
parent a7d42f3732
commit 9d4e3efad4
1 changed files with 27 additions and 20 deletions

View File

@ -7,11 +7,11 @@ package org.elasticsearch.xpack.ml.job.persistence;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
@ -21,12 +21,13 @@ import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.PrefixQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest;
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.ml.action.MlDeleteByQueryAction;
import org.elasticsearch.xpack.ml.job.config.Job;
@ -34,7 +35,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import java.util.List;
import java.util.function.Consumer;
@ -84,20 +84,20 @@ public class JobStorageDeletionTask extends Task {
}
}));
},
failureHandler::accept);
failureHandler);
// Step 4. Delete model state done, delete the categorizer state
ActionListener<BulkResponse> deleteStateHandler = ActionListener.wrap(response -> {
deleteCategorizerState(jobId, client, deleteCategorizerStateHandler);
},
failureHandler::accept
failureHandler
);
// Step 3. Delete quantiles done, delete the model state
ActionListener<DeleteResponse> deleteQuantilesHandler = ActionListener.wrap(deleteResponse -> {
deleteModelState(jobId, client, deleteStateHandler);
},
failureHandler::accept
failureHandler
);
@ -115,7 +115,7 @@ public class JobStorageDeletionTask extends Task {
}
deleteQuantiles(jobId, client, deleteQuantilesHandler);
},
failureHandler::accept
failureHandler
);
@ -156,17 +156,24 @@ public class JobStorageDeletionTask extends Task {
}
private void deleteCategorizerState(String jobId, Client client, ActionListener<Boolean> finishedHandler) {
int docNum = 0;
while (true) {
String docId = CategorizerState.categorizerStateDocId(jobId, ++docNum);
DeleteRequest deleteRequest =
client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), CategorizerState.TYPE, docId).request();
DeleteResponse response = client.delete(deleteRequest).actionGet();
if (response.getResult() == DocWriteResponse.Result.NOT_FOUND) {
break;
}
}
SearchRequest searchRequest = new SearchRequest();
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
PrefixQueryBuilder query = new PrefixQueryBuilder(UidFieldMapper.NAME, Uid.createUid(CategorizerState.TYPE, jobId));
searchRequest.source(new SearchSourceBuilder().query(query));
searchRequest.indicesOptions(JobProvider.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
request.setSlices(5);
client.execute(MlDeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
finishedHandler.onResponse(true);
}
@Override
public void onFailure(Exception e) {
logger.error("Failed to delete categorizer state for job [" + jobId + "]", e);
finishedHandler.onResponse(false);
}
});
}
}