[ML] Refactor job deletion logic into the transport action (#33891)
The job deletion logic was scattered around a few places: the transport action, the job manager and the deletion task. Overloading the task with deletion logic also meant extra dependencies in the core package which should be unnecessary. This commit consolidates all this logic into the transport action and replaces the deletion task with a plain one that needs not be aware of deletion logic.
This commit is contained in:
parent
f963c29876
commit
8e3a0fad9d
|
@ -16,7 +16,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.JobStorageDeletionTask;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.JobDeletionTask;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -71,7 +71,7 @@ public class DeleteJobAction extends Action<AcknowledgedResponse> {
|
|||
|
||||
@Override
|
||||
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
|
||||
return new JobStorageDeletionTask(id, type, action, "delete-job-" + jobId, parentTaskId, headers);
|
||||
return new JobDeletionTask(id, type, action, "delete-job-" + jobId, parentTaskId, headers);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ml.job.persistence;
|
||||
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class JobDeletionTask extends Task {
|
||||
|
||||
public JobDeletionTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
|
||||
super(id, type, action, description, parentTask, headers);
|
||||
}
|
||||
}
|
|
@ -1,301 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ml.job.persistence;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
|
||||
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.common.CheckedConsumer;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
|
||||
import org.elasticsearch.index.query.IdsQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.index.query.TermQueryBuilder;
|
||||
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryAction;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.util.PageParams;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
|
||||
import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
||||
|
||||
/*
|
||||
Moving this class to plugin-core caused a *lot* of server side logic to be pulled in to plugin-core. This should be considered as needing
|
||||
refactoring to move it back to core. See DeleteJobAction for its use.
|
||||
*/
|
||||
public class JobStorageDeletionTask extends Task {
|
||||
|
||||
private static final int MAX_SNAPSHOTS_TO_DELETE = 10000;
|
||||
|
||||
private final Logger logger;
|
||||
|
||||
public JobStorageDeletionTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
|
||||
super(id, type, action, description, parentTask, headers);
|
||||
this.logger = Loggers.getLogger(getClass());
|
||||
}
|
||||
|
||||
public void delete(String jobId, Client client, ClusterState state,
|
||||
CheckedConsumer<Boolean, Exception> finishedHandler,
|
||||
Consumer<Exception> failureHandler) {
|
||||
|
||||
final String indexName = AnomalyDetectorsIndex.getPhysicalIndexFromState(state, jobId);
|
||||
final String indexPattern = indexName + "-*";
|
||||
|
||||
final ActionListener<AcknowledgedResponse> completionHandler = ActionListener.wrap(
|
||||
response -> finishedHandler.accept(response.isAcknowledged()),
|
||||
failureHandler);
|
||||
|
||||
// Step 7. If we did not drop the index and after DBQ state done, we delete the aliases
|
||||
ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap(
|
||||
bulkByScrollResponse -> {
|
||||
if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume Index was deleted
|
||||
completionHandler.onResponse(new AcknowledgedResponse(true));
|
||||
} else {
|
||||
if (bulkByScrollResponse.isTimedOut()) {
|
||||
logger.warn("[{}] DeleteByQuery for indices [{}, {}] timed out.", jobId, indexName, indexPattern);
|
||||
}
|
||||
if (!bulkByScrollResponse.getBulkFailures().isEmpty()) {
|
||||
logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery on indices [{}, {}].",
|
||||
jobId, bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts(),
|
||||
indexName, indexPattern);
|
||||
for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) {
|
||||
logger.warn("DBQ failure: " + failure);
|
||||
}
|
||||
}
|
||||
deleteAliases(jobId, client, completionHandler);
|
||||
}
|
||||
},
|
||||
failureHandler);
|
||||
|
||||
// Step 6. If we did not delete the index, we run a delete by query
|
||||
ActionListener<Boolean> deleteByQueryExecutor = ActionListener.wrap(
|
||||
response -> {
|
||||
if (response) {
|
||||
logger.info("Running DBQ on [" + indexName + "," + indexPattern + "] for job [" + jobId + "]");
|
||||
DeleteByQueryRequest request = new DeleteByQueryRequest(indexName, indexPattern);
|
||||
ConstantScoreQueryBuilder query =
|
||||
new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
|
||||
request.setQuery(query);
|
||||
request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
|
||||
request.setSlices(5);
|
||||
request.setAbortOnVersionConflict(false);
|
||||
request.setRefresh(true);
|
||||
|
||||
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, dbqHandler);
|
||||
} else { // We did not execute DBQ, no need to delete aliases or check the response
|
||||
dbqHandler.onResponse(null);
|
||||
}
|
||||
},
|
||||
failureHandler);
|
||||
|
||||
// Step 5. If we have any hits, that means we are NOT the only job on this index, and should not delete it
|
||||
// if we do not have any hits, we can drop the index and then skip the DBQ and alias deletion
|
||||
ActionListener<SearchResponse> customIndexSearchHandler = ActionListener.wrap(
|
||||
searchResponse -> {
|
||||
if (searchResponse == null || searchResponse.getHits().totalHits > 0) {
|
||||
deleteByQueryExecutor.onResponse(true); // We need to run DBQ and alias deletion
|
||||
} else {
|
||||
logger.info("Running DELETE Index on [" + indexName + "] for job [" + jobId + "]");
|
||||
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
|
||||
request.indicesOptions(IndicesOptions.lenientExpandOpen());
|
||||
// If we have deleted the index, then we don't need to delete the aliases or run the DBQ
|
||||
executeAsyncWithOrigin(
|
||||
client.threadPool().getThreadContext(),
|
||||
ML_ORIGIN,
|
||||
request,
|
||||
ActionListener.<AcknowledgedResponse>wrap(
|
||||
response -> deleteByQueryExecutor.onResponse(false), // skip DBQ && Alias
|
||||
failureHandler),
|
||||
client.admin().indices()::delete);
|
||||
}
|
||||
},
|
||||
failure -> {
|
||||
if (failure.getClass() == IndexNotFoundException.class) { // assume the index is already deleted
|
||||
deleteByQueryExecutor.onResponse(false); // skip DBQ && Alias
|
||||
} else {
|
||||
failureHandler.accept(failure);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
// Step 4. Determine if we are on a shared index by looking at `.ml-anomalies-shared` or the custom index's aliases
|
||||
ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
|
||||
response -> {
|
||||
if (indexName.equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX +
|
||||
AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) {
|
||||
customIndexSearchHandler.onResponse(null); //don't bother searching the index any further, we are on the default shared
|
||||
} else {
|
||||
SearchSourceBuilder source = new SearchSourceBuilder()
|
||||
.size(1)
|
||||
.query(QueryBuilders.boolQuery().filter(
|
||||
QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))));
|
||||
|
||||
SearchRequest searchRequest = new SearchRequest(indexName);
|
||||
searchRequest.source(source);
|
||||
executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, customIndexSearchHandler);
|
||||
}
|
||||
},
|
||||
failureHandler
|
||||
);
|
||||
|
||||
// Step 3. Delete quantiles done, delete the categorizer state
|
||||
ActionListener<Boolean> deleteQuantilesHandler = ActionListener.wrap(
|
||||
response -> deleteCategorizerState(jobId, client, 1, deleteCategorizerStateHandler),
|
||||
failureHandler);
|
||||
|
||||
// Step 2. Delete state done, delete the quantiles
|
||||
ActionListener<BulkResponse> deleteStateHandler = ActionListener.wrap(
|
||||
bulkResponse -> deleteQuantiles(jobId, client, deleteQuantilesHandler),
|
||||
failureHandler);
|
||||
|
||||
// Step 1. Delete the model state
|
||||
deleteModelState(jobId, client, deleteStateHandler);
|
||||
}
|
||||
|
||||
private void deleteQuantiles(String jobId, Client client, ActionListener<Boolean> finishedHandler) {
|
||||
// The quantiles type and doc ID changed in v5.5 so delete both the old and new format
|
||||
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName());
|
||||
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
|
||||
IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId),
|
||||
// TODO: remove in 7.0
|
||||
Quantiles.v54DocumentId(jobId));
|
||||
request.setQuery(query);
|
||||
request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
|
||||
request.setAbortOnVersionConflict(false);
|
||||
request.setRefresh(true);
|
||||
|
||||
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
|
||||
response -> finishedHandler.onResponse(true),
|
||||
e -> {
|
||||
// It's not a problem for us if the index wasn't found - it's equivalent to document not found
|
||||
if (e instanceof IndexNotFoundException) {
|
||||
finishedHandler.onResponse(true);
|
||||
} else {
|
||||
finishedHandler.onFailure(e);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
private void deleteModelState(String jobId, Client client, ActionListener<BulkResponse> listener) {
|
||||
GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request(jobId, null);
|
||||
request.setPageParams(new PageParams(0, MAX_SNAPSHOTS_TO_DELETE));
|
||||
executeAsyncWithOrigin(client, ML_ORIGIN, GetModelSnapshotsAction.INSTANCE, request, ActionListener.wrap(
|
||||
response -> {
|
||||
List<ModelSnapshot> deleteCandidates = response.getPage().results();
|
||||
JobDataDeleter deleter = new JobDataDeleter(client, jobId);
|
||||
deleter.deleteModelSnapshots(deleteCandidates, listener);
|
||||
},
|
||||
listener::onFailure));
|
||||
}
|
||||
|
||||
private void deleteCategorizerState(String jobId, Client client, int docNum, ActionListener<Boolean> finishedHandler) {
|
||||
// The categorizer state type and doc ID changed in v5.5 so delete both the old and new format
|
||||
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName());
|
||||
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
|
||||
IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum),
|
||||
// TODO: remove in 7.0
|
||||
CategorizerState.v54DocumentId(jobId, docNum));
|
||||
request.setQuery(query);
|
||||
request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
|
||||
request.setAbortOnVersionConflict(false);
|
||||
request.setRefresh(true);
|
||||
|
||||
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
|
||||
response -> {
|
||||
// If we successfully deleted a document try the next one; if not we're done
|
||||
if (response.getDeleted() > 0) {
|
||||
// There's an assumption here that there won't be very many categorizer
|
||||
// state documents, so the recursion won't go more than, say, 5 levels deep
|
||||
deleteCategorizerState(jobId, client, docNum + 1, finishedHandler);
|
||||
return;
|
||||
}
|
||||
finishedHandler.onResponse(true);
|
||||
},
|
||||
e -> {
|
||||
// It's not a problem for us if the index wasn't found - it's equivalent to document not found
|
||||
if (e instanceof IndexNotFoundException) {
|
||||
finishedHandler.onResponse(true);
|
||||
} else {
|
||||
finishedHandler.onFailure(e);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
private void deleteAliases(String jobId, Client client, ActionListener<AcknowledgedResponse> finishedHandler) {
|
||||
final String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
|
||||
final String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
|
||||
|
||||
// first find the concrete indices associated with the aliases
|
||||
GetAliasesRequest aliasesRequest = new GetAliasesRequest().aliases(readAliasName, writeAliasName)
|
||||
.indicesOptions(IndicesOptions.lenientExpandOpen());
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, aliasesRequest,
|
||||
ActionListener.<GetAliasesResponse>wrap(
|
||||
getAliasesResponse -> {
|
||||
// remove the aliases from the concrete indices found in the first step
|
||||
IndicesAliasesRequest removeRequest = buildRemoveAliasesRequest(getAliasesResponse);
|
||||
if (removeRequest == null) {
|
||||
// don't error if the job's aliases have already been deleted - carry on and delete the
|
||||
// rest of the job's data
|
||||
finishedHandler.onResponse(new AcknowledgedResponse(true));
|
||||
return;
|
||||
}
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, removeRequest,
|
||||
ActionListener.<AcknowledgedResponse>wrap(
|
||||
finishedHandler::onResponse,
|
||||
finishedHandler::onFailure),
|
||||
client.admin().indices()::aliases);
|
||||
},
|
||||
finishedHandler::onFailure), client.admin().indices()::getAliases);
|
||||
}
|
||||
|
||||
private IndicesAliasesRequest buildRemoveAliasesRequest(GetAliasesResponse getAliasesResponse) {
|
||||
Set<String> aliases = new HashSet<>();
|
||||
List<String> indices = new ArrayList<>();
|
||||
for (ObjectObjectCursor<String, List<AliasMetaData>> entry : getAliasesResponse.getAliases()) {
|
||||
// The response includes _all_ indices, but only those associated with
|
||||
// the aliases we asked about will have associated AliasMetaData
|
||||
if (entry.value.isEmpty() == false) {
|
||||
indices.add(entry.key);
|
||||
entry.value.forEach(metadata -> aliases.add(metadata.getAlias()));
|
||||
}
|
||||
}
|
||||
return aliases.isEmpty() ? null : new IndicesAliasesRequest().addAliasAction(
|
||||
IndicesAliasesRequest.AliasActions.remove()
|
||||
.aliases(aliases.toArray(new String[aliases.size()]))
|
||||
.indices(indices.toArray(new String[indices.size()])));
|
||||
}
|
||||
}
|
|
@ -6,7 +6,6 @@
|
|||
package org.elasticsearch.xpack.core.ml.job.config;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
|
||||
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
|
|
@ -5,27 +5,50 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ml.action;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
|
||||
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.CheckedConsumer;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
|
||||
import org.elasticsearch.index.query.IdsQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.index.query.TermQueryBuilder;
|
||||
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryAction;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -33,31 +56,51 @@ import org.elasticsearch.xpack.core.ml.MachineLearningField;
|
|||
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.core.ml.MlTasks;
|
||||
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.JobStorageDeletionTask;
|
||||
import org.elasticsearch.xpack.ml.job.JobManager;
|
||||
import org.elasticsearch.xpack.core.ml.action.util.PageParams;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
|
||||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
||||
|
||||
public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJobAction.Request, AcknowledgedResponse> {
|
||||
|
||||
private static final int MAX_SNAPSHOTS_TO_DELETE = 10000;
|
||||
|
||||
private final Client client;
|
||||
private final JobManager jobManager;
|
||||
private final PersistentTasksService persistentTasksService;
|
||||
private final Auditor auditor;
|
||||
private final JobResultsProvider jobResultsProvider;
|
||||
|
||||
@Inject
|
||||
public TransportDeleteJobAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
ThreadPool threadPool, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager,
|
||||
PersistentTasksService persistentTasksService, Client client) {
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService,
|
||||
Client client, Auditor auditor, JobResultsProvider jobResultsProvider) {
|
||||
super(settings, DeleteJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
||||
indexNameExpressionResolver, DeleteJobAction.Request::new);
|
||||
this.client = client;
|
||||
this.jobManager = jobManager;
|
||||
this.persistentTasksService = persistentTasksService;
|
||||
this.auditor = auditor;
|
||||
this.jobResultsProvider = jobResultsProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -72,14 +115,14 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
|
|||
|
||||
@Override
|
||||
protected void masterOperation(Task task, DeleteJobAction.Request request, ClusterState state,
|
||||
ActionListener<AcknowledgedResponse> listener) throws Exception {
|
||||
ActionListener<AcknowledgedResponse> listener) {
|
||||
|
||||
ActionListener<Boolean> markAsDeletingListener = ActionListener.wrap(
|
||||
response -> {
|
||||
if (request.isForce()) {
|
||||
forceDeleteJob(request, (JobStorageDeletionTask) task, listener);
|
||||
forceDeleteJob(request, listener);
|
||||
} else {
|
||||
normalDeleteJob(request, (JobStorageDeletionTask) task, listener);
|
||||
normalDeleteJob(request, listener);
|
||||
}
|
||||
},
|
||||
e -> {
|
||||
|
@ -95,7 +138,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
|
|||
listener::onResponse,
|
||||
e2 -> {
|
||||
if (request.isForce() && e2 instanceof TimeoutException) {
|
||||
forceDeleteJob(request, (JobStorageDeletionTask) task, listener);
|
||||
forceDeleteJob(request, listener);
|
||||
} else {
|
||||
listener.onFailure(e2);
|
||||
}
|
||||
|
@ -110,8 +153,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(DeleteJobAction.Request request, ClusterState state,
|
||||
ActionListener<AcknowledgedResponse> listener) throws Exception {
|
||||
protected void masterOperation(DeleteJobAction.Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
|
||||
throw new UnsupportedOperationException("the Task parameter is required");
|
||||
}
|
||||
|
||||
|
@ -120,13 +162,290 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
|
|||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||
}
|
||||
|
||||
private void normalDeleteJob(DeleteJobAction.Request request, JobStorageDeletionTask task,
|
||||
ActionListener<AcknowledgedResponse> listener) {
|
||||
jobManager.deleteJob(request, task, listener);
|
||||
private void normalDeleteJob(DeleteJobAction.Request request, ActionListener<AcknowledgedResponse> listener) {
|
||||
String jobId = request.getJobId();
|
||||
logger.debug("Deleting job '" + jobId + "'");
|
||||
|
||||
// Step 4. When the job has been removed from the cluster state, return a response
|
||||
// -------
|
||||
CheckedConsumer<Boolean, Exception> apiResponseHandler = jobDeleted -> {
|
||||
if (jobDeleted) {
|
||||
logger.info("Job [" + jobId + "] deleted");
|
||||
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DELETED));
|
||||
listener.onResponse(new AcknowledgedResponse(true));
|
||||
} else {
|
||||
listener.onResponse(new AcknowledgedResponse(false));
|
||||
}
|
||||
};
|
||||
|
||||
// Step 3. When the physical storage has been deleted, remove from Cluster State
|
||||
// -------
|
||||
CheckedConsumer<Boolean, Exception> deleteJobStateHandler = response -> clusterService.submitStateUpdateTask(
|
||||
"delete-job-" + jobId,
|
||||
new AckedClusterStateUpdateTask<Boolean>(request, ActionListener.wrap(apiResponseHandler, listener::onFailure)) {
|
||||
|
||||
@Override
|
||||
protected Boolean newResponse(boolean acknowledged) {
|
||||
return acknowledged && response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState);
|
||||
if (currentMlMetadata.getJobs().containsKey(jobId) == false) {
|
||||
// We wouldn't have got here if the job never existed so
|
||||
// the Job must have been deleted by another action.
|
||||
// Don't error in this case
|
||||
return currentState;
|
||||
}
|
||||
|
||||
MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata);
|
||||
builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
|
||||
return buildNewClusterState(currentState, builder);
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
// Step 2. Remove the job from any calendars
|
||||
CheckedConsumer<Boolean, Exception> removeFromCalendarsHandler = response -> jobResultsProvider.removeJobFromCalendars(jobId,
|
||||
ActionListener.wrap(deleteJobStateHandler::accept, listener::onFailure ));
|
||||
|
||||
|
||||
// Step 1. Delete the physical storage
|
||||
deleteJobDocuments(jobId, removeFromCalendarsHandler, listener::onFailure);
|
||||
}
|
||||
|
||||
private void forceDeleteJob(DeleteJobAction.Request request, JobStorageDeletionTask task,
|
||||
ActionListener<AcknowledgedResponse> listener) {
|
||||
private void deleteJobDocuments(String jobId, CheckedConsumer<Boolean, Exception> finishedHandler, Consumer<Exception> failureHandler) {
|
||||
|
||||
final String indexName = AnomalyDetectorsIndex.getPhysicalIndexFromState(clusterService.state(), jobId);
|
||||
final String indexPattern = indexName + "-*";
|
||||
|
||||
final ActionListener<AcknowledgedResponse> completionHandler = ActionListener.wrap(
|
||||
response -> finishedHandler.accept(response.isAcknowledged()),
|
||||
failureHandler);
|
||||
|
||||
// Step 7. If we did not drop the index and after DBQ state done, we delete the aliases
|
||||
ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap(
|
||||
bulkByScrollResponse -> {
|
||||
if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume Index was deleted
|
||||
completionHandler.onResponse(new AcknowledgedResponse(true));
|
||||
} else {
|
||||
if (bulkByScrollResponse.isTimedOut()) {
|
||||
logger.warn("[{}] DeleteByQuery for indices [{}, {}] timed out.", jobId, indexName, indexPattern);
|
||||
}
|
||||
if (!bulkByScrollResponse.getBulkFailures().isEmpty()) {
|
||||
logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery on indices [{}, {}].",
|
||||
jobId, bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts(),
|
||||
indexName, indexPattern);
|
||||
for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) {
|
||||
logger.warn("DBQ failure: " + failure);
|
||||
}
|
||||
}
|
||||
deleteAliases(jobId, client, completionHandler);
|
||||
}
|
||||
},
|
||||
failureHandler);
|
||||
|
||||
// Step 6. If we did not delete the index, we run a delete by query
|
||||
ActionListener<Boolean> deleteByQueryExecutor = ActionListener.wrap(
|
||||
response -> {
|
||||
if (response) {
|
||||
logger.info("Running DBQ on [" + indexName + "," + indexPattern + "] for job [" + jobId + "]");
|
||||
DeleteByQueryRequest request = new DeleteByQueryRequest(indexName, indexPattern);
|
||||
ConstantScoreQueryBuilder query =
|
||||
new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
|
||||
request.setQuery(query);
|
||||
request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
|
||||
request.setSlices(5);
|
||||
request.setAbortOnVersionConflict(false);
|
||||
request.setRefresh(true);
|
||||
|
||||
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, dbqHandler);
|
||||
} else { // We did not execute DBQ, no need to delete aliases or check the response
|
||||
dbqHandler.onResponse(null);
|
||||
}
|
||||
},
|
||||
failureHandler);
|
||||
|
||||
// Step 5. If we have any hits, that means we are NOT the only job on this index, and should not delete it
|
||||
// if we do not have any hits, we can drop the index and then skip the DBQ and alias deletion
|
||||
ActionListener<SearchResponse> customIndexSearchHandler = ActionListener.wrap(
|
||||
searchResponse -> {
|
||||
if (searchResponse == null || searchResponse.getHits().totalHits > 0) {
|
||||
deleteByQueryExecutor.onResponse(true); // We need to run DBQ and alias deletion
|
||||
} else {
|
||||
logger.info("Running DELETE Index on [" + indexName + "] for job [" + jobId + "]");
|
||||
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
|
||||
request.indicesOptions(IndicesOptions.lenientExpandOpen());
|
||||
// If we have deleted the index, then we don't need to delete the aliases or run the DBQ
|
||||
executeAsyncWithOrigin(
|
||||
client.threadPool().getThreadContext(),
|
||||
ML_ORIGIN,
|
||||
request,
|
||||
ActionListener.<AcknowledgedResponse>wrap(
|
||||
response -> deleteByQueryExecutor.onResponse(false), // skip DBQ && Alias
|
||||
failureHandler),
|
||||
client.admin().indices()::delete);
|
||||
}
|
||||
},
|
||||
failure -> {
|
||||
if (failure.getClass() == IndexNotFoundException.class) { // assume the index is already deleted
|
||||
deleteByQueryExecutor.onResponse(false); // skip DBQ && Alias
|
||||
} else {
|
||||
failureHandler.accept(failure);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
// Step 4. Determine if we are on a shared index by looking at `.ml-anomalies-shared` or the custom index's aliases
|
||||
ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
|
||||
response -> {
|
||||
if (indexName.equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX +
|
||||
AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) {
|
||||
//don't bother searching the index any further, we are on the default shared
|
||||
customIndexSearchHandler.onResponse(null);
|
||||
} else {
|
||||
SearchSourceBuilder source = new SearchSourceBuilder()
|
||||
.size(1)
|
||||
.query(QueryBuilders.boolQuery().filter(
|
||||
QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))));
|
||||
|
||||
SearchRequest searchRequest = new SearchRequest(indexName);
|
||||
searchRequest.source(source);
|
||||
executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, customIndexSearchHandler);
|
||||
}
|
||||
},
|
||||
failureHandler
|
||||
);
|
||||
|
||||
// Step 3. Delete quantiles done, delete the categorizer state
|
||||
ActionListener<Boolean> deleteQuantilesHandler = ActionListener.wrap(
|
||||
response -> deleteCategorizerState(jobId, client, 1, deleteCategorizerStateHandler),
|
||||
failureHandler);
|
||||
|
||||
// Step 2. Delete state done, delete the quantiles
|
||||
ActionListener<BulkResponse> deleteStateHandler = ActionListener.wrap(
|
||||
bulkResponse -> deleteQuantiles(jobId, client, deleteQuantilesHandler),
|
||||
failureHandler);
|
||||
|
||||
// Step 1. Delete the model state
|
||||
deleteModelState(jobId, client, deleteStateHandler);
|
||||
}
|
||||
|
||||
private void deleteQuantiles(String jobId, Client client, ActionListener<Boolean> finishedHandler) {
|
||||
// The quantiles type and doc ID changed in v5.5 so delete both the old and new format
|
||||
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName());
|
||||
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
|
||||
IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId),
|
||||
// TODO: remove in 7.0
|
||||
Quantiles.v54DocumentId(jobId));
|
||||
request.setQuery(query);
|
||||
request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
|
||||
request.setAbortOnVersionConflict(false);
|
||||
request.setRefresh(true);
|
||||
|
||||
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
|
||||
response -> finishedHandler.onResponse(true),
|
||||
e -> {
|
||||
// It's not a problem for us if the index wasn't found - it's equivalent to document not found
|
||||
if (e instanceof IndexNotFoundException) {
|
||||
finishedHandler.onResponse(true);
|
||||
} else {
|
||||
finishedHandler.onFailure(e);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
private void deleteModelState(String jobId, Client client, ActionListener<BulkResponse> listener) {
|
||||
GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request(jobId, null);
|
||||
request.setPageParams(new PageParams(0, MAX_SNAPSHOTS_TO_DELETE));
|
||||
executeAsyncWithOrigin(client, ML_ORIGIN, GetModelSnapshotsAction.INSTANCE, request, ActionListener.wrap(
|
||||
response -> {
|
||||
List<ModelSnapshot> deleteCandidates = response.getPage().results();
|
||||
JobDataDeleter deleter = new JobDataDeleter(client, jobId);
|
||||
deleter.deleteModelSnapshots(deleteCandidates, listener);
|
||||
},
|
||||
listener::onFailure));
|
||||
}
|
||||
|
||||
private void deleteCategorizerState(String jobId, Client client, int docNum, ActionListener<Boolean> finishedHandler) {
|
||||
// The categorizer state type and doc ID changed in v5.5 so delete both the old and new format
|
||||
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName());
|
||||
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
|
||||
IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum),
|
||||
// TODO: remove in 7.0
|
||||
CategorizerState.v54DocumentId(jobId, docNum));
|
||||
request.setQuery(query);
|
||||
request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
|
||||
request.setAbortOnVersionConflict(false);
|
||||
request.setRefresh(true);
|
||||
|
||||
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
|
||||
response -> {
|
||||
// If we successfully deleted a document try the next one; if not we're done
|
||||
if (response.getDeleted() > 0) {
|
||||
// There's an assumption here that there won't be very many categorizer
|
||||
// state documents, so the recursion won't go more than, say, 5 levels deep
|
||||
deleteCategorizerState(jobId, client, docNum + 1, finishedHandler);
|
||||
return;
|
||||
}
|
||||
finishedHandler.onResponse(true);
|
||||
},
|
||||
e -> {
|
||||
// It's not a problem for us if the index wasn't found - it's equivalent to document not found
|
||||
if (e instanceof IndexNotFoundException) {
|
||||
finishedHandler.onResponse(true);
|
||||
} else {
|
||||
finishedHandler.onFailure(e);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
private void deleteAliases(String jobId, Client client, ActionListener<AcknowledgedResponse> finishedHandler) {
|
||||
final String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
|
||||
final String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
|
||||
|
||||
// first find the concrete indices associated with the aliases
|
||||
GetAliasesRequest aliasesRequest = new GetAliasesRequest().aliases(readAliasName, writeAliasName)
|
||||
.indicesOptions(IndicesOptions.lenientExpandOpen());
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, aliasesRequest,
|
||||
ActionListener.<GetAliasesResponse>wrap(
|
||||
getAliasesResponse -> {
|
||||
// remove the aliases from the concrete indices found in the first step
|
||||
IndicesAliasesRequest removeRequest = buildRemoveAliasesRequest(getAliasesResponse);
|
||||
if (removeRequest == null) {
|
||||
// don't error if the job's aliases have already been deleted - carry on and delete the
|
||||
// rest of the job's data
|
||||
finishedHandler.onResponse(new AcknowledgedResponse(true));
|
||||
return;
|
||||
}
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, removeRequest,
|
||||
ActionListener.<AcknowledgedResponse>wrap(
|
||||
finishedHandler::onResponse,
|
||||
finishedHandler::onFailure),
|
||||
client.admin().indices()::aliases);
|
||||
},
|
||||
finishedHandler::onFailure), client.admin().indices()::getAliases);
|
||||
}
|
||||
|
||||
private IndicesAliasesRequest buildRemoveAliasesRequest(GetAliasesResponse getAliasesResponse) {
|
||||
Set<String> aliases = new HashSet<>();
|
||||
List<String> indices = new ArrayList<>();
|
||||
for (ObjectObjectCursor<String, List<AliasMetaData>> entry : getAliasesResponse.getAliases()) {
|
||||
// The response includes _all_ indices, but only those associated with
|
||||
// the aliases we asked about will have associated AliasMetaData
|
||||
if (entry.value.isEmpty() == false) {
|
||||
indices.add(entry.key);
|
||||
entry.value.forEach(metadata -> aliases.add(metadata.getAlias()));
|
||||
}
|
||||
}
|
||||
return aliases.isEmpty() ? null : new IndicesAliasesRequest().addAliasAction(
|
||||
IndicesAliasesRequest.AliasActions.remove()
|
||||
.aliases(aliases.toArray(new String[aliases.size()]))
|
||||
.indices(indices.toArray(new String[indices.size()])));
|
||||
}
|
||||
|
||||
private void forceDeleteJob(DeleteJobAction.Request request, ActionListener<AcknowledgedResponse> listener) {
|
||||
|
||||
final ClusterState state = clusterService.state();
|
||||
final String jobId = request.getJobId();
|
||||
|
@ -135,13 +454,13 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
|
|||
ActionListener<Boolean> removeTaskListener = new ActionListener<Boolean>() {
|
||||
@Override
|
||||
public void onResponse(Boolean response) {
|
||||
jobManager.deleteJob(request, task, listener);
|
||||
normalDeleteJob(request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
if (e instanceof ResourceNotFoundException) {
|
||||
jobManager.deleteJob(request, task, listener);
|
||||
normalDeleteJob(request, listener);
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
@ -151,9 +470,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
|
|||
// 2. Cancel the persistent task. This closes the process gracefully so
|
||||
// the process should be killed first.
|
||||
ActionListener<KillProcessAction.Response> killJobListener = ActionListener.wrap(
|
||||
response -> {
|
||||
removePersistentTask(request.getJobId(), state, removeTaskListener);
|
||||
},
|
||||
response -> removePersistentTask(request.getJobId(), state, removeTaskListener),
|
||||
e -> {
|
||||
if (e instanceof ElasticsearchStatusException) {
|
||||
// Killing the process marks the task as completed so it
|
||||
|
@ -197,7 +514,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
|
|||
}
|
||||
}
|
||||
|
||||
void markJobAsDeleting(String jobId, ActionListener<Boolean> listener, boolean force) {
|
||||
private void markJobAsDeleting(String jobId, ActionListener<Boolean> listener, boolean force) {
|
||||
clusterService.submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
|
@ -220,7 +537,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
|
|||
});
|
||||
}
|
||||
|
||||
void waitForDeletingJob(String jobId, TimeValue timeout, ActionListener<AcknowledgedResponse> listener) {
|
||||
private void waitForDeletingJob(String jobId, TimeValue timeout, ActionListener<AcknowledgedResponse> listener) {
|
||||
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
|
||||
|
||||
ClusterState clusterState = stateObserver.setAndGetObservedState();
|
||||
|
|
|
@ -20,7 +20,7 @@ import org.elasticsearch.transport.TransportService;
|
|||
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.JobDataDeleter;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
|
||||
import org.elasticsearch.xpack.ml.job.JobManager;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
|
||||
|
|
|
@ -33,7 +33,7 @@ import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
|
|||
import org.elasticsearch.xpack.core.ml.action.util.PageParams;
|
||||
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
|
||||
import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils;
|
||||
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
|
|
@ -29,19 +29,19 @@ import org.elasticsearch.xpack.core.ml.action.GetOverallBucketsAction;
|
|||
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
|
||||
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
|
||||
import org.elasticsearch.xpack.core.ml.job.results.OverallBucket;
|
||||
import org.elasticsearch.xpack.core.ml.job.results.Result;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.core.ml.utils.Intervals;
|
||||
import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils;
|
||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||
import org.elasticsearch.xpack.ml.job.JobManager;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.overallbuckets.OverallBucketsAggregator;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.overallbuckets.OverallBucketsCollector;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.overallbuckets.OverallBucketsProcessor;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.overallbuckets.OverallBucketsProvider;
|
||||
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
|
|||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.JobDataDeleter;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.ml.job.JobManager;
|
||||
|
|
|
@ -9,7 +9,6 @@ import org.elasticsearch.ResourceNotFoundException;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -34,7 +33,6 @@ import org.elasticsearch.xpack.core.XPackPlugin;
|
|||
import org.elasticsearch.xpack.core.ml.MachineLearningField;
|
||||
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.core.ml.MlTasks;
|
||||
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
|
||||
|
@ -47,7 +45,6 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
|||
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
|
||||
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.JobStorageDeletionTask;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
|
@ -489,64 +486,6 @@ public class JobManager extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
public void deleteJob(DeleteJobAction.Request request, JobStorageDeletionTask task,
|
||||
ActionListener<AcknowledgedResponse> actionListener) {
|
||||
|
||||
String jobId = request.getJobId();
|
||||
logger.debug("Deleting job '" + jobId + "'");
|
||||
|
||||
// Step 4. When the job has been removed from the cluster state, return a response
|
||||
// -------
|
||||
CheckedConsumer<Boolean, Exception> apiResponseHandler = jobDeleted -> {
|
||||
if (jobDeleted) {
|
||||
logger.info("Job [" + jobId + "] deleted");
|
||||
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DELETED));
|
||||
actionListener.onResponse(new AcknowledgedResponse(true));
|
||||
} else {
|
||||
actionListener.onResponse(new AcknowledgedResponse(false));
|
||||
}
|
||||
};
|
||||
|
||||
// Step 3. When the physical storage has been deleted, remove from Cluster State
|
||||
// -------
|
||||
CheckedConsumer<Boolean, Exception> deleteJobStateHandler = response -> clusterService.submitStateUpdateTask("delete-job-" + jobId,
|
||||
new AckedClusterStateUpdateTask<Boolean>(request, ActionListener.wrap(apiResponseHandler, actionListener::onFailure)) {
|
||||
|
||||
@Override
|
||||
protected Boolean newResponse(boolean acknowledged) {
|
||||
return acknowledged && response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState);
|
||||
if (currentMlMetadata.getJobs().containsKey(jobId) == false) {
|
||||
// We wouldn't have got here if the job never existed so
|
||||
// the Job must have been deleted by another action.
|
||||
// Don't error in this case
|
||||
return currentState;
|
||||
}
|
||||
|
||||
MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata);
|
||||
builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
|
||||
return buildNewClusterState(currentState, builder);
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
// Step 2. Remove the job from any calendars
|
||||
CheckedConsumer<Boolean, Exception> removeFromCalendarsHandler = response -> {
|
||||
jobResultsProvider.removeJobFromCalendars(jobId, ActionListener.<Boolean>wrap(deleteJobStateHandler::accept,
|
||||
actionListener::onFailure ));
|
||||
};
|
||||
|
||||
|
||||
// Step 1. Delete the physical storage
|
||||
|
||||
// This task manages the physical deletion of the job state and results
|
||||
task.delete(jobId, client, clusterService.state(), removeFromCalendarsHandler, actionListener::onFailure);
|
||||
}
|
||||
|
||||
public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionListener<RevertModelSnapshotAction.Response> actionListener,
|
||||
ModelSnapshot modelSnapshot) {
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ import org.elasticsearch.search.SearchHit;
|
|||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.sort.SortBuilders;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
|
||||
import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils;
|
||||
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Collections;
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ml.job.persistence;
|
||||
package org.elasticsearch.xpack.ml.job.persistence;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
|
@ -21,6 +21,8 @@ import org.elasticsearch.index.query.QueryBuilder;
|
|||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryAction;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
|
||||
import org.elasticsearch.xpack.core.ml.job.results.Result;
|
|
@ -25,7 +25,6 @@ import org.elasticsearch.common.xcontent.ToXContent;
|
|||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.JobDataDeleter;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
|
||||
|
|
|
@ -64,8 +64,8 @@ import org.elasticsearch.search.aggregations.Aggregation;
|
|||
import org.elasticsearch.search.aggregations.AggregationBuilders;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
|
||||
import org.elasticsearch.search.aggregations.metrics.Stats;
|
||||
import org.elasticsearch.search.aggregations.metrics.ExtendedStats;
|
||||
import org.elasticsearch.search.aggregations.metrics.Stats;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.sort.FieldSortBuilder;
|
||||
import org.elasticsearch.search.sort.SortBuilders;
|
||||
|
@ -99,11 +99,11 @@ import org.elasticsearch.xpack.core.ml.stats.CountAccumulator;
|
|||
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
|
||||
import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils;
|
||||
import org.elasticsearch.xpack.core.security.support.Exceptions;
|
||||
import org.elasticsearch.xpack.ml.job.categorization.GrokPatternCreator;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
|
||||
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ml.utils;
|
||||
package org.elasticsearch.xpack.ml.utils;
|
||||
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
|
|
@ -11,11 +11,10 @@ import org.elasticsearch.xpack.core.ml.job.results.ReservedFieldNames;
|
|||
|
||||
public class ReservedFieldNamesTests extends ESTestCase {
|
||||
|
||||
public void testIsValidFieldName() throws Exception {
|
||||
public void testIsValidFieldName() {
|
||||
assertTrue(ReservedFieldNames.isValidFieldName("host"));
|
||||
assertTrue(ReservedFieldNames.isValidFieldName("host.actual"));
|
||||
assertFalse(ReservedFieldNames.isValidFieldName("actual.host"));
|
||||
assertFalse(ReservedFieldNames.isValidFieldName(AnomalyRecord.BUCKET_SPAN.getPreferredName()));
|
||||
}
|
||||
|
||||
}
|
|
@ -6,7 +6,6 @@
|
|||
package org.elasticsearch.xpack.test.rest;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
|
||||
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.client.Request;
|
||||
|
|
Loading…
Reference in New Issue