[ML] Avoid stack traces in the log when deleting jobs (elastic/x-pack-elasticsearch#1615)
Now we've set the option for one type per index it causes a stack trace in to be logged if we issue a request to delete two documents with different types. We only do this to cover the case of documents left over from v5.4. We can avoid it by deleting by query using just the document IDs. Original commit: elastic/x-pack-elasticsearch@2abffc7d95
This commit is contained in:
parent
68ad44c5c2
commit
bf068e9ec3
|
@ -9,8 +9,8 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.index.query.IdsQueryBuilder;
|
||||
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryAction;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
||||
|
@ -24,7 +24,6 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
|
||||
import org.elasticsearch.index.query.TermQueryBuilder;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
@ -73,7 +72,7 @@ public class JobStorageDeletionTask extends Task {
|
|||
},
|
||||
failureHandler);
|
||||
|
||||
// Step 4. Delete categoriser state done, DeleteByQuery on the index, matching all docs with the right job_id
|
||||
// Step 4. Delete categorizer state done, DeleteByQuery on the index, matching all docs with the right job_id
|
||||
ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
|
||||
response -> {
|
||||
logger.info("Running DBQ on [" + indexName + "," + indexPattern + "] for job [" + jobId + "]");
|
||||
|
@ -106,22 +105,27 @@ public class JobStorageDeletionTask extends Task {
|
|||
}
|
||||
|
||||
private void deleteQuantiles(String jobId, Client client, ActionListener<Boolean> finishedHandler) {
|
||||
// The quantiles type and doc Id changed in v5.5 so delete both the old and new format
|
||||
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
|
||||
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE,
|
||||
Quantiles.documentId(jobId)));
|
||||
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), Quantiles.TYPE.getPreferredName(),
|
||||
Quantiles.v54DocumentId(jobId)));
|
||||
bulkRequestBuilder.execute(ActionListener.wrap(
|
||||
response -> finishedHandler.onResponse(true),
|
||||
e -> {
|
||||
// It's not a problem for us if the index wasn't found - it's equivalent to document not found
|
||||
if (e instanceof IndexNotFoundException) {
|
||||
finishedHandler.onResponse(true);
|
||||
} else {
|
||||
finishedHandler.onFailure(e);
|
||||
}
|
||||
}));
|
||||
// The quantiles type and doc ID changed in v5.5 so delete both the old and new format
|
||||
SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexName());
|
||||
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
|
||||
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
|
||||
IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId),
|
||||
// TODO: remove in 7.0
|
||||
Quantiles.v54DocumentId(jobId));
|
||||
searchRequest.source(new SearchSourceBuilder().query(query));
|
||||
searchRequest.indicesOptions(JobProvider.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
|
||||
request.setAbortOnVersionConflict(false);
|
||||
|
||||
client.execute(DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
|
||||
response -> finishedHandler.onResponse(true),
|
||||
e -> {
|
||||
// It's not a problem for us if the index wasn't found - it's equivalent to document not found
|
||||
if (e instanceof IndexNotFoundException) {
|
||||
finishedHandler.onResponse(true);
|
||||
} else {
|
||||
finishedHandler.onFailure(e);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
private void deleteModelState(String jobId, Client client, ActionListener<BulkResponse> listener) {
|
||||
|
@ -136,23 +140,25 @@ public class JobStorageDeletionTask extends Task {
|
|||
}
|
||||
|
||||
private void deleteCategorizerState(String jobId, Client client, int docNum, ActionListener<Boolean> finishedHandler) {
|
||||
// The categorizer state type and doc Id changed in v5.5 so delete both the old and new format
|
||||
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
|
||||
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE,
|
||||
CategorizerState.documentId(jobId, docNum)));
|
||||
// TODO: remove in 7.0
|
||||
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), CategorizerState.TYPE,
|
||||
CategorizerState.v54DocumentId(jobId, docNum)));
|
||||
bulkRequestBuilder.execute(ActionListener.wrap(
|
||||
// The categorizer state type and doc ID changed in v5.5 so delete both the old and new format
|
||||
SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexName());
|
||||
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
|
||||
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
|
||||
IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum),
|
||||
// TODO: remove in 7.0
|
||||
CategorizerState.v54DocumentId(jobId, docNum));
|
||||
searchRequest.source(new SearchSourceBuilder().query(query));
|
||||
searchRequest.indicesOptions(JobProvider.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
|
||||
request.setAbortOnVersionConflict(false);
|
||||
|
||||
client.execute(DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
|
||||
response -> {
|
||||
// If we successfully deleted either document try the next one; if not we're done
|
||||
for (BulkItemResponse item : response.getItems()) {
|
||||
if (item.status() == RestStatus.OK) {
|
||||
// There's an assumption here that there won't be very many categorizer
|
||||
// state documents, so the recursion won't go more than, say, 5 levels deep
|
||||
deleteCategorizerState(jobId, client, docNum + 1, finishedHandler);
|
||||
return;
|
||||
}
|
||||
// If we successfully deleted a document try the next one; if not we're done
|
||||
if (response.getDeleted() > 0) {
|
||||
// There's an assumption here that there won't be very many categorizer
|
||||
// state documents, so the recursion won't go more than, say, 5 levels deep
|
||||
deleteCategorizerState(jobId, client, docNum + 1, finishedHandler);
|
||||
return;
|
||||
}
|
||||
finishedHandler.onResponse(true);
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue