diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java index 41e19b69b8c..8753303f2a0 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java @@ -51,6 +51,7 @@ import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.ml.action.GetModelSnapshotsAction; import org.elasticsearch.xpack.ml.action.GetRecordsAction; import org.elasticsearch.xpack.ml.action.InternalOpenJobAction; +import org.elasticsearch.xpack.ml.action.MlDeleteByQueryAction; import org.elasticsearch.xpack.ml.action.OpenJobAction; import org.elasticsearch.xpack.ml.action.PostDataAction; import org.elasticsearch.xpack.ml.action.PutDatafeedAction; @@ -326,7 +327,8 @@ public class MlPlugin extends Plugin implements ActionPlugin { new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class), new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class), new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class), - new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class) + new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class), + new ActionHandler<>(MlDeleteByQueryAction.INSTANCE, MlDeleteByQueryAction.TransportAction.class) ); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java index 1e6e747ae38..223e8291946 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.JobManager; +import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.io.IOException; @@ -79,7 +80,7 @@ public class DeleteJobAction extends Action { @@ -167,7 +169,7 @@ public class DeleteJobAction extends Action listener) throws Exception { - jobManager.deleteJob(client, request, listener); + jobManager.deleteJob(request, client, (JobStorageDeletionTask) task, taskManager, listener); } @Override diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/MlDeleteByQueryAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/MlDeleteByQueryAction.java new file mode 100644 index 00000000000..5a90978ad45 --- /dev/null +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/MlDeleteByQueryAction.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequestBuilder; +import org.elasticsearch.action.bulk.byscroll.AsyncDeleteByQueryAction; +import org.elasticsearch.action.bulk.byscroll.BulkByScrollParallelizationHelper; +import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse; +import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest; +import org.elasticsearch.action.bulk.byscroll.ParentBulkByScrollTask; +import org.elasticsearch.action.bulk.byscroll.WorkingBulkByScrollTask; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.client.ParentTaskAssigningClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +public class MlDeleteByQueryAction extends Action { + + public static final MlDeleteByQueryAction INSTANCE = new MlDeleteByQueryAction(); + public static final String NAME = "indices:data/write/delete/mlbyquery"; + + private MlDeleteByQueryAction() { + super(NAME); + } + + @Override + public MlDeleteByQueryRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new MlDeleteByQueryRequestBuilder(client, this); + } + + @Override + public BulkByScrollResponse newResponse() { + return new BulkByScrollResponse(); + } + + public static class MlDeleteByQueryRequestBuilder extends + AbstractBulkByScrollRequestBuilder { + + public MlDeleteByQueryRequestBuilder(ElasticsearchClient client, + Action action) { + this(client, action, new SearchRequestBuilder(client, SearchAction.INSTANCE)); + } + + private MlDeleteByQueryRequestBuilder(ElasticsearchClient client, + Action action, + SearchRequestBuilder search) { + super(client, action, search, new DeleteByQueryRequest(search.request())); + } + + @Override + protected MlDeleteByQueryRequestBuilder self() { + return this; + } + + @Override + public MlDeleteByQueryRequestBuilder abortOnVersionConflict(boolean abortOnVersionConflict) { + request.setAbortOnVersionConflict(abortOnVersionConflict); + return this; + } + } + + public static class TransportAction extends HandledTransportAction { + private final Client client; + private final ScriptService scriptService; + private final ClusterService clusterService; + + @Inject + public TransportAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver resolver, Client client, TransportService transportService, + ScriptService scriptService, ClusterService clusterService) { + super(settings, MlDeleteByQueryAction.NAME, threadPool, transportService, actionFilters, resolver, DeleteByQueryRequest::new); + this.client = client; + this.scriptService = scriptService; + this.clusterService = clusterService; + } + + @Override + public void doExecute(Task task, DeleteByQueryRequest request, ActionListener listener) { + if (request.getSlices() > 1) { + BulkByScrollParallelizationHelper.startSlices(client, taskManager, MlDeleteByQueryAction.INSTANCE, + clusterService.localNode().getId(), (ParentBulkByScrollTask) task, request, listener); + } else { + ClusterState state = clusterService.state(); + ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task); + new AsyncDeleteByQueryAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, scriptService, state, + listener).start(); + } + } + + @Override + protected void doExecute(DeleteByQueryRequest request, ActionListener listener) { + throw new UnsupportedOperationException("task required"); + } + } +} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 066bb502590..83f31b621ac 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.job; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; @@ -18,7 +17,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.PutJobAction; import org.elasticsearch.xpack.ml.action.RevertModelSnapshotAction; @@ -34,6 +32,7 @@ import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.notifications.Auditor; @@ -229,14 +228,17 @@ public class JobManager extends AbstractComponent { return buildNewClusterState(currentState, builder); } - public void deleteJob(Client client, DeleteJobAction.Request request, ActionListener actionListener) { + + public void deleteJob(DeleteJobAction.Request request, Client client, JobStorageDeletionTask task, + ActionListener actionListener) { + String jobId = request.getJobId(); String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); logger.debug("Deleting job '" + jobId + "'"); - // Step 3. Listen for the Cluster State job state change - // Chain acknowledged state onto original actionListener - CheckedConsumer deleteStateConsumer = jobDeleted -> { + // Step 3. When the job has been removed from the cluster state, return a response + // ------- + CheckedConsumer apiResponseHandler = jobDeleted -> { if (jobDeleted) { logger.info("Job [" + jobId + "] deleted."); actionListener.onResponse(new DeleteJobAction.Response(true)); @@ -246,64 +248,41 @@ public class JobManager extends AbstractComponent { } }; + // Step 2. When the physical storage has been deleted, remove from Cluster State + // ------- + CheckedConsumer deleteJobStateHandler = response -> clusterService.submitStateUpdateTask("delete-job-" + jobId, + new AckedClusterStateUpdateTask(request, ActionListener.wrap(apiResponseHandler, actionListener::onFailure)) { - // Step 2. Listen for the Deleted Index response - // If successful, delete from cluster state and chain onto deleteStateListener - CheckedConsumer deleteIndexConsumer = response -> { - clusterService.submitStateUpdateTask("delete-job-" + jobId, - new AckedClusterStateUpdateTask(request, ActionListener.wrap(deleteStateConsumer, actionListener::onFailure)) { + @Override + protected Boolean newResponse(boolean acknowledged) { + return acknowledged && response; + } - @Override - protected Boolean newResponse(boolean acknowledged) { - return acknowledged && response; - } + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return removeJobFromState(jobId, currentState); + } + }); - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return removeJobFromState(jobId, currentState); - } - }); - - }; - - // Step 1. Update the CS to DELETING - // If successful, attempt to delete the physical index and chain - // onto deleteIndexConsumer - CheckedConsumer updateConsumer = response -> { - // Sucessfully updated the state to DELETING, begin actually deleting + // Step 1. When the job's status updates to DELETING, begin deleting the physical storage + // ------- + CheckedConsumer updateHandler = response -> { + // Successfully updated the status to DELETING, begin actually deleting if (response.isAcknowledged()) { logger.info("Job [" + jobId + "] set to [" + JobState.DELETING + "]"); } else { logger.warn("Job [" + jobId + "] change to [" + JobState.DELETING + "] was not acknowledged."); } - DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName); - client.admin().indices().delete(deleteIndexRequest, ActionListener.wrap(deleteIndexResponse -> { - logger.info("Deleting index [" + indexName + "] successful"); + // This task manages the physical deletion of the job (removing the results, then the index) + task.delete(jobId, indexName, client, deleteJobStateHandler::accept, actionListener::onFailure); - if (deleteIndexResponse.isAcknowledged()) { - logger.info("Index deletion acknowledged"); - } else { - logger.warn("Index deletion not acknowledged"); - } - deleteIndexConsumer.accept(deleteIndexResponse.isAcknowledged()); - }, e -> { - if (e instanceof IndexNotFoundException) { - logger.warn("Physical index [" + indexName + "] not found. Continuing to delete job."); - try { - deleteIndexConsumer.accept(false); - } catch (Exception e1) { - actionListener.onFailure(e1); - } - } else { - // all other exceptions should die - actionListener.onFailure(e); - } - })); }; + // Step 0. Kick off the chain of callbacks with the initial UpdateStatus call + // ------- UpdateJobStateAction.Request updateStateListener = new UpdateJobStateAction.Request(jobId, JobState.DELETING); - setJobState(updateStateListener, ActionListener.wrap(updateConsumer, actionListener::onFailure)); + setJobState(updateStateListener, ActionListener.wrap(updateHandler, actionListener::onFailure)); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java new file mode 100644 index 00000000000..f0307348286 --- /dev/null +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java @@ -0,0 +1,94 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.persistence; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest; +import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xpack.ml.action.MlDeleteByQueryAction; + + +import java.util.function.Consumer; + +public class JobStorageDeletionTask extends Task { + private final Logger logger; + + public JobStorageDeletionTask(long id, String type, String action, String description, TaskId parentTask) { + super(id, type, action, description, parentTask); + this.logger = Loggers.getLogger(getClass()); + } + + public void delete(String jobId, String indexName, Client client, + CheckedConsumer finishedHandler, + Consumer failureHandler) { + + // Step 2. Regardless of if the DBQ succeeds, we delete the physical index + // ------- + CheckedConsumer dbqHandler = bulkByScrollResponse -> { + if (bulkByScrollResponse.isTimedOut()) { + logger.warn("DeleteByQuery for index [" + indexName + "] timed out. Continuing to delete index."); + } + if (!bulkByScrollResponse.getBulkFailures().isEmpty()) { + logger.warn("[" + bulkByScrollResponse.getBulkFailures().size() + + "] failures encountered while running DeleteByQuery on index [" + indexName + "]. " + + "Continuing to delete index"); + } + + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName); + client.admin().indices().delete(deleteIndexRequest, ActionListener.wrap(deleteIndexResponse -> { + logger.info("Deleting index [" + indexName + "] successful"); + + if (deleteIndexResponse.isAcknowledged()) { + logger.info("Index deletion acknowledged"); + } else { + logger.warn("Index deletion not acknowledged"); + } + finishedHandler.accept(deleteIndexResponse.isAcknowledged()); + }, missingIndexHandler(indexName, finishedHandler, failureHandler))); + }; + + // Step 1. DeleteByQuery on the index, matching all docs with the right job_id + // ------- + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.source(new SearchSourceBuilder().query(new TermQueryBuilder("job_id", jobId))); + DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); + request.setSlices(5); + + client.execute(MlDeleteByQueryAction.INSTANCE, request, + ActionListener.wrap(dbqHandler, missingIndexHandler(indexName, finishedHandler, failureHandler))); + } + + // If the index doesn't exist, we need to catch the exception and carry onwards so that the cluster + // state is properly updated + private Consumer missingIndexHandler(String indexName, CheckedConsumer finishedHandler, + Consumer failureHandler) { + return e -> { + if (e instanceof IndexNotFoundException) { + logger.warn("Physical index [" + indexName + "] not found. Continuing to delete job."); + try { + finishedHandler.accept(false); + } catch (Exception e1) { + failureHandler.accept(e1); + } + } else { + // all other exceptions should die + failureHandler.accept(e); + } + }; + } + +}