diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteJobAction.java
index 56b7ec2b52f..9fbde4721cd 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteJobAction.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteJobAction.java
@@ -16,7 +16,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
-import org.elasticsearch.xpack.core.ml.job.persistence.JobStorageDeletionTask;
+import org.elasticsearch.xpack.core.ml.job.persistence.JobDeletionTask;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 
 import java.io.IOException;
@@ -71,7 +71,7 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, AcknowledgedResponse, DeleteJobAction.RequestBuilder> {
 
     @Override
     public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
-        return new JobStorageDeletionTask(id, type, action, "delete-job-" + jobId, parentTaskId, headers);
+        return new JobDeletionTask(id, type, action, "delete-job-" + jobId, parentTaskId, headers);
     }
 
     @Override
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDeletionTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDeletionTask.java
new file mode 100644
index 00000000000..2f218cfb2dc
--- /dev/null
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDeletionTask.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.ml.job.persistence;
+
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
+
+import java.util.Map;
+
+public class JobDeletionTask extends Task {
+
+    public JobDeletionTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
+        super(id, type, action, description, parentTask, headers);
+    }
+}
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobStorageDeletionTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobStorageDeletionTask.java
deleted file mode 100644
index 43cc372b6c7..00000000000
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobStorageDeletionTask.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-package org.elasticsearch.xpack.core.ml.job.persistence;
-
-import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
-import org.apache.logging.log4j.Logger;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
-import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
-import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.search.SearchAction;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.metadata.AliasMetaData;
-import org.elasticsearch.common.CheckedConsumer;
-import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.index.IndexNotFoundException;
-import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
-import org.elasticsearch.index.query.IdsQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.query.TermQueryBuilder;
-import org.elasticsearch.index.reindex.BulkByScrollResponse;
-import org.elasticsearch.index.reindex.DeleteByQueryAction;
-import org.elasticsearch.index.reindex.DeleteByQueryRequest;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.tasks.Task;
-import org.elasticsearch.tasks.TaskId;
-import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
-import org.elasticsearch.xpack.core.ml.action.util.PageParams;
-import org.elasticsearch.xpack.core.ml.job.config.Job;
-import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
-import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
-import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
-import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Consumer;
-
-import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
-import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
-
-/*
- Moving this class to plugin-core caused a *lot* of server side logic to be pulled in to plugin-core. This should be considered as needing
- refactoring to move it back to core. See DeleteJobAction for its use.
-*/
-public class JobStorageDeletionTask extends Task {
-
-    private static final int MAX_SNAPSHOTS_TO_DELETE = 10000;
-
-    private final Logger logger;
-
-    public JobStorageDeletionTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
-        super(id, type, action, description, parentTask, headers);
-        this.logger = Loggers.getLogger(getClass());
-    }
-
-    public void delete(String jobId, Client client, ClusterState state,
-                       CheckedConsumer<Boolean, Exception> finishedHandler,
-                       Consumer<Exception> failureHandler) {
-
-        final String indexName = AnomalyDetectorsIndex.getPhysicalIndexFromState(state, jobId);
-        final String indexPattern = indexName + "-*";
-
-        final ActionListener<AcknowledgedResponse> completionHandler = ActionListener.wrap(
-                response -> finishedHandler.accept(response.isAcknowledged()),
-                failureHandler);
-
-        // Step 7. If we did not drop the index and after DBQ state done, we delete the aliases
-        ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap(
-                bulkByScrollResponse -> {
-                    if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume Index was deleted
-                        completionHandler.onResponse(new AcknowledgedResponse(true));
-                    } else {
-                        if (bulkByScrollResponse.isTimedOut()) {
-                            logger.warn("[{}] DeleteByQuery for indices [{}, {}] timed out.", jobId, indexName, indexPattern);
-                        }
-                        if (!bulkByScrollResponse.getBulkFailures().isEmpty()) {
-                            logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery on indices [{}, {}].",
-                                    jobId, bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts(),
-                                    indexName, indexPattern);
-                            for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) {
-                                logger.warn("DBQ failure: " + failure);
-                            }
-                        }
-                        deleteAliases(jobId, client, completionHandler);
-                    }
-                },
-                failureHandler);
-
-        // Step 6. If we did not delete the index, we run a delete by query
-        ActionListener<Boolean> deleteByQueryExecutor = ActionListener.wrap(
-                response -> {
-                    if (response) {
-                        logger.info("Running DBQ on [" + indexName + "," + indexPattern + "] for job [" + jobId + "]");
-                        DeleteByQueryRequest request = new DeleteByQueryRequest(indexName, indexPattern);
-                        ConstantScoreQueryBuilder query =
-                                new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
-                        request.setQuery(query);
-                        request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
-                        request.setSlices(5);
-                        request.setAbortOnVersionConflict(false);
-                        request.setRefresh(true);
-
-                        executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, dbqHandler);
-                    } else { // We did not execute DBQ, no need to delete aliases or check the response
-                        dbqHandler.onResponse(null);
-                    }
-                },
-                failureHandler);
-
-        // Step 5. If we have any hits, that means we are NOT the only job on this index, and should not delete it
-        // if we do not have any hits, we can drop the index and then skip the DBQ and alias deletion
-        ActionListener<SearchResponse> customIndexSearchHandler = ActionListener.wrap(
-                searchResponse -> {
-                    if (searchResponse == null || searchResponse.getHits().totalHits > 0) {
-                        deleteByQueryExecutor.onResponse(true); // We need to run DBQ and alias deletion
-                    } else {
-                        logger.info("Running DELETE Index on [" + indexName + "] for job [" + jobId + "]");
-                        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
-                        request.indicesOptions(IndicesOptions.lenientExpandOpen());
-                        // If we have deleted the index, then we don't need to delete the aliases or run the DBQ
-                        executeAsyncWithOrigin(
-                                client.threadPool().getThreadContext(),
-                                ML_ORIGIN,
-                                request,
-                                ActionListener.wrap(
-                                        response -> deleteByQueryExecutor.onResponse(false), // skip DBQ && Alias
-                                        failureHandler),
-                                client.admin().indices()::delete);
-                    }
-                },
-                failure -> {
-                    if (failure.getClass() == IndexNotFoundException.class) { // assume the index is already deleted
-                        deleteByQueryExecutor.onResponse(false); // skip DBQ && Alias
-                    } else {
-                        failureHandler.accept(failure);
-                    }
-                }
-        );
-
-        // Step 4. Determine if we are on a shared index by looking at `.ml-anomalies-shared` or the custom index's aliases
-        ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
-                response -> {
-                    if (indexName.equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX +
-                            AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) {
-                        customIndexSearchHandler.onResponse(null); //don't bother searching the index any further, we are on the default shared
-                    } else {
-                        SearchSourceBuilder source = new SearchSourceBuilder()
-                                .size(1)
-                                .query(QueryBuilders.boolQuery().filter(
-                                        QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))));
-
-                        SearchRequest searchRequest = new SearchRequest(indexName);
-                        searchRequest.source(source);
-                        executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, customIndexSearchHandler);
-                    }
-                },
-                failureHandler
-        );
-
-        // Step 3. Delete quantiles done, delete the categorizer state
-        ActionListener<Boolean> deleteQuantilesHandler = ActionListener.wrap(
-                response -> deleteCategorizerState(jobId, client, 1, deleteCategorizerStateHandler),
-                failureHandler);
-
-        // Step 2. Delete state done, delete the quantiles
-        ActionListener<BulkResponse> deleteStateHandler = ActionListener.wrap(
-                bulkResponse -> deleteQuantiles(jobId, client, deleteQuantilesHandler),
-                failureHandler);
-
-        // Step 1. Delete the model state
-        deleteModelState(jobId, client, deleteStateHandler);
-    }
-
-    private void deleteQuantiles(String jobId, Client client, ActionListener<Boolean> finishedHandler) {
-        // The quantiles type and doc ID changed in v5.5 so delete both the old and new format
-        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName());
-        // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
-        IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId),
-                // TODO: remove in 7.0
-                Quantiles.v54DocumentId(jobId));
-        request.setQuery(query);
-        request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
-        request.setAbortOnVersionConflict(false);
-        request.setRefresh(true);
-
-        executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
-                response -> finishedHandler.onResponse(true),
-                e -> {
-                    // It's not a problem for us if the index wasn't found - it's equivalent to document not found
-                    if (e instanceof IndexNotFoundException) {
-                        finishedHandler.onResponse(true);
-                    } else {
-                        finishedHandler.onFailure(e);
-                    }
-                }));
-    }
-
-    private void deleteModelState(String jobId, Client client, ActionListener<BulkResponse> listener) {
-        GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request(jobId, null);
-        request.setPageParams(new PageParams(0, MAX_SNAPSHOTS_TO_DELETE));
-        executeAsyncWithOrigin(client, ML_ORIGIN, GetModelSnapshotsAction.INSTANCE, request, ActionListener.wrap(
-                response -> {
-                    List<ModelSnapshot> deleteCandidates = response.getPage().results();
-                    JobDataDeleter deleter = new JobDataDeleter(client, jobId);
-                    deleter.deleteModelSnapshots(deleteCandidates, listener);
-                },
-                listener::onFailure));
-    }
-
-    private void deleteCategorizerState(String jobId, Client client, int docNum, ActionListener<Boolean> finishedHandler) {
-        // The categorizer state type and doc ID changed in v5.5 so delete both the old and new format
-        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName());
-        // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
-        IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum),
-                // TODO: remove in 7.0
-                CategorizerState.v54DocumentId(jobId, docNum));
-        request.setQuery(query);
-        request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
-        request.setAbortOnVersionConflict(false);
-        request.setRefresh(true);
-
-        executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
-                response -> {
-                    // If we successfully deleted a document try the next one; if not we're done
-                    if (response.getDeleted() > 0) {
-                        // There's an assumption here that there won't be very many categorizer
-                        // state documents, so the recursion won't go more than, say, 5 levels deep
-                        deleteCategorizerState(jobId, client, docNum + 1, finishedHandler);
-                        return;
-                    }
-                    finishedHandler.onResponse(true);
-                },
-                e -> {
-                    // It's not a problem for us if the index wasn't found - it's equivalent to document not found
-                    if (e instanceof IndexNotFoundException) {
-                        finishedHandler.onResponse(true);
-                    } else {
-                        finishedHandler.onFailure(e);
-                    }
-                }));
-    }
-
-    private void deleteAliases(String jobId, Client client, ActionListener<AcknowledgedResponse> finishedHandler) {
-        final String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
-        final String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
-
-        // first find the concrete indices associated with the aliases
-        GetAliasesRequest aliasesRequest = new GetAliasesRequest().aliases(readAliasName, writeAliasName)
-                .indicesOptions(IndicesOptions.lenientExpandOpen());
-        executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, aliasesRequest,
-                ActionListener.wrap(
-                        getAliasesResponse -> {
-                            // remove the aliases from the concrete indices found in the first step
-                            IndicesAliasesRequest removeRequest = buildRemoveAliasesRequest(getAliasesResponse);
-                            if (removeRequest == null) {
-                                // don't error if the job's aliases have already been deleted - carry on and delete the
-                                // rest of the job's data
-                                finishedHandler.onResponse(new AcknowledgedResponse(true));
-                                return;
-                            }
-                            executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, removeRequest,
-                                    ActionListener.wrap(
-                                            finishedHandler::onResponse,
-                                            finishedHandler::onFailure),
-                                    client.admin().indices()::aliases);
-                        },
-                        finishedHandler::onFailure), client.admin().indices()::getAliases);
-    }
-
-    private IndicesAliasesRequest buildRemoveAliasesRequest(GetAliasesResponse getAliasesResponse) {
-        Set<String> aliases = new HashSet<>();
-        List<String> indices = new ArrayList<>();
-        for (ObjectObjectCursor<String, List<AliasMetaData>> entry : getAliasesResponse.getAliases()) {
-            // The response includes _all_ indices, but only those associated with
-            // the aliases we asked about will have associated AliasMetaData
-            if (entry.value.isEmpty() == false) {
-                indices.add(entry.key);
-                entry.value.forEach(metadata -> aliases.add(metadata.getAlias()));
-            }
-        }
-        return aliases.isEmpty() ? null : new IndicesAliasesRequest().addAliasAction(
-                IndicesAliasesRequest.AliasActions.remove()
-                        .aliases(aliases.toArray(new String[aliases.size()]))
-                        .indices(indices.toArray(new String[indices.size()])));
-    }
-}
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java
index 7e53478533e..5e793e54da4 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java
@@ -6,7 +6,6 @@
 package org.elasticsearch.xpack.core.ml.job.config;
 
 import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
-
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.Version;
 import org.elasticsearch.common.bytes.BytesReference;
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java
index a74d1498f10..1d285b91f2f 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java
@@ -5,27 +5,50 @@
  */
 package org.elasticsearch.xpack.ml.action;
 
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateObserver;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.AliasMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
+import org.elasticsearch.index.query.IdsQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.DeleteByQueryAction;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.persistent.PersistentTasksService;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -33,31 +56,51 @@ import org.elasticsearch.xpack.core.ml.MachineLearningField;
 import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
+import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
 import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
-import org.elasticsearch.xpack.core.ml.job.persistence.JobStorageDeletionTask;
-import org.elasticsearch.xpack.ml.job.JobManager;
+import org.elasticsearch.xpack.core.ml.action.util.PageParams;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.messages.Messages;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
+import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
+import org.elasticsearch.xpack.ml.notifications.Auditor;
+import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 
 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 
 public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJobAction.Request, AcknowledgedResponse> {
 
+    private static final int MAX_SNAPSHOTS_TO_DELETE = 10000;
+
     private final Client client;
-    private final JobManager jobManager;
     private final PersistentTasksService persistentTasksService;
+    private final Auditor auditor;
+    private final JobResultsProvider jobResultsProvider;
 
     @Inject
     public TransportDeleteJobAction(Settings settings, TransportService transportService, ClusterService clusterService,
                                     ThreadPool threadPool, ActionFilters actionFilters,
-                                    IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager,
-                                    PersistentTasksService persistentTasksService, Client client) {
+                                    IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService,
+                                    Client client, Auditor auditor, JobResultsProvider jobResultsProvider) {
         super(settings, DeleteJobAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
                 DeleteJobAction.Request::new);
         this.client = client;
-        this.jobManager = jobManager;
         this.persistentTasksService = persistentTasksService;
+        this.auditor = auditor;
+        this.jobResultsProvider = jobResultsProvider;
     }
 
     @Override
@@ -72,14 +115,14 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJobAction.Request, AcknowledgedResponse> {
     }
 
     @Override
     protected void masterOperation(Task task, DeleteJobAction.Request request, ClusterState state,
-                                   ActionListener<AcknowledgedResponse> listener) throws Exception {
+                                   ActionListener<AcknowledgedResponse> listener) {
         ActionListener<Boolean> markAsDeletingListener = ActionListener.wrap(
                 response -> {
                     if (request.isForce()) {
-                        forceDeleteJob(request, (JobDeletionTask) task, listener);
+                        forceDeleteJob(request, listener);
                    } else {
-                        normalDeleteJob(request, (JobDeletionTask) task, listener);
+                        normalDeleteJob(request, listener);
                     }
                 },
                 e -> {
@@ -95,7 +138,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJobAction.Request, AcknowledgedResponse> {
                 },
                 e2 -> {
                     if (request.isForce() && e2 instanceof TimeoutException) {
-                        forceDeleteJob(request, (JobDeletionTask) task, listener);
+                        forceDeleteJob(request, listener);
                     } else {
                         listener.onFailure(e2);
                     }
@@ -110,8 +153,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJobAction.Request, AcknowledgedResponse> {
     }
 
     @Override
-    protected void masterOperation(DeleteJobAction.Request request, ClusterState state,
-                                   ActionListener<AcknowledgedResponse> listener) throws Exception {
+    protected void masterOperation(DeleteJobAction.Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
         throw new UnsupportedOperationException("the Task parameter is required");
     }
 
@@ -120,13 +162,290 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJobAction.Request, AcknowledgedResponse> {
     }
 
-    private void normalDeleteJob(DeleteJobAction.Request request, JobStorageDeletionTask task,
-                                 ActionListener<AcknowledgedResponse> listener) {
-        jobManager.deleteJob(request, task, listener);
+    private void normalDeleteJob(DeleteJobAction.Request request, ActionListener<AcknowledgedResponse> listener) {
+        String jobId = request.getJobId();
+        logger.debug("Deleting job '" + jobId + "'");
+
+        // Step 4. When the job has been removed from the cluster state, return a response
+        // -------
+        CheckedConsumer<Boolean, Exception> apiResponseHandler = jobDeleted -> {
+            if (jobDeleted) {
+                logger.info("Job [" + jobId + "] deleted");
+                auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DELETED));
+                listener.onResponse(new AcknowledgedResponse(true));
+            } else {
+                listener.onResponse(new AcknowledgedResponse(false));
+            }
+        };
+
+        // Step 3. When the physical storage has been deleted, remove from Cluster State
+        // -------
+        CheckedConsumer<Boolean, Exception> deleteJobStateHandler = response -> clusterService.submitStateUpdateTask(
+                "delete-job-" + jobId,
+                new AckedClusterStateUpdateTask<Boolean>(request, ActionListener.wrap(apiResponseHandler, listener::onFailure)) {
+
+                    @Override
+                    protected Boolean newResponse(boolean acknowledged) {
+                        return acknowledged && response;
+                    }
+
+                    @Override
+                    public ClusterState execute(ClusterState currentState) {
+                        MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState);
+                        if (currentMlMetadata.getJobs().containsKey(jobId) == false) {
+                            // We wouldn't have got here if the job never existed so
+                            // the Job must have been deleted by another action.
+                            // Don't error in this case
+                            return currentState;
+                        }
+
+                        MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata);
+                        builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
+                        return buildNewClusterState(currentState, builder);
+                    }
+                });
+
+
+        // Step 2. Remove the job from any calendars
+        CheckedConsumer<Boolean, Exception> removeFromCalendarsHandler = response -> jobResultsProvider.removeJobFromCalendars(jobId,
+                ActionListener.wrap(deleteJobStateHandler::accept, listener::onFailure ));
+
+
+        // Step 1. Delete the physical storage
+        deleteJobDocuments(jobId, removeFromCalendarsHandler, listener::onFailure);
     }
 
-    private void forceDeleteJob(DeleteJobAction.Request request, JobStorageDeletionTask task,
-                                ActionListener<AcknowledgedResponse> listener) {
+    private void deleteJobDocuments(String jobId, CheckedConsumer<Boolean, Exception> finishedHandler, Consumer<Exception> failureHandler) {
+
+        final String indexName = AnomalyDetectorsIndex.getPhysicalIndexFromState(clusterService.state(), jobId);
+        final String indexPattern = indexName + "-*";
+
+        final ActionListener<AcknowledgedResponse> completionHandler = ActionListener.wrap(
+                response -> finishedHandler.accept(response.isAcknowledged()),
+                failureHandler);
+
+        // Step 7. If we did not drop the index and after DBQ state done, we delete the aliases
+        ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap(
+                bulkByScrollResponse -> {
+                    if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume Index was deleted
+                        completionHandler.onResponse(new AcknowledgedResponse(true));
+                    } else {
+                        if (bulkByScrollResponse.isTimedOut()) {
+                            logger.warn("[{}] DeleteByQuery for indices [{}, {}] timed out.", jobId, indexName, indexPattern);
+                        }
+                        if (!bulkByScrollResponse.getBulkFailures().isEmpty()) {
+                            logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery on indices [{}, {}].",
+                                    jobId, bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts(),
+                                    indexName, indexPattern);
+                            for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) {
+                                logger.warn("DBQ failure: " + failure);
+                            }
+                        }
+                        deleteAliases(jobId, client, completionHandler);
+                    }
+                },
+                failureHandler);
+
+        // Step 6. If we did not delete the index, we run a delete by query
+        ActionListener<Boolean> deleteByQueryExecutor = ActionListener.wrap(
+                response -> {
+                    if (response) {
+                        logger.info("Running DBQ on [" + indexName + "," + indexPattern + "] for job [" + jobId + "]");
+                        DeleteByQueryRequest request = new DeleteByQueryRequest(indexName, indexPattern);
+                        ConstantScoreQueryBuilder query =
+                                new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
+                        request.setQuery(query);
+                        request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
+                        request.setSlices(5);
+                        request.setAbortOnVersionConflict(false);
+                        request.setRefresh(true);
+
+                        executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, dbqHandler);
+                    } else { // We did not execute DBQ, no need to delete aliases or check the response
+                        dbqHandler.onResponse(null);
+                    }
+                },
+                failureHandler);
+
+        // Step 5. If we have any hits, that means we are NOT the only job on this index, and should not delete it
+        // if we do not have any hits, we can drop the index and then skip the DBQ and alias deletion
+        ActionListener<SearchResponse> customIndexSearchHandler = ActionListener.wrap(
+                searchResponse -> {
+                    if (searchResponse == null || searchResponse.getHits().totalHits > 0) {
+                        deleteByQueryExecutor.onResponse(true); // We need to run DBQ and alias deletion
+                    } else {
+                        logger.info("Running DELETE Index on [" + indexName + "] for job [" + jobId + "]");
+                        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+                        request.indicesOptions(IndicesOptions.lenientExpandOpen());
+                        // If we have deleted the index, then we don't need to delete the aliases or run the DBQ
+                        executeAsyncWithOrigin(
+                                client.threadPool().getThreadContext(),
+                                ML_ORIGIN,
+                                request,
+                                ActionListener.wrap(
+                                        response -> deleteByQueryExecutor.onResponse(false), // skip DBQ && Alias
+                                        failureHandler),
+                                client.admin().indices()::delete);
+                    }
+                },
+                failure -> {
+                    if (failure.getClass() == IndexNotFoundException.class) { // assume the index is already deleted
+                        deleteByQueryExecutor.onResponse(false); // skip DBQ && Alias
+                    } else {
+                        failureHandler.accept(failure);
+                    }
+                }
+        );
+
+        // Step 4. Determine if we are on a shared index by looking at `.ml-anomalies-shared` or the custom index's aliases
+        ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
+                response -> {
+                    if (indexName.equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX +
+                            AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) {
+                        //don't bother searching the index any further, we are on the default shared
+                        customIndexSearchHandler.onResponse(null);
+                    } else {
+                        SearchSourceBuilder source = new SearchSourceBuilder()
+                                .size(1)
+                                .query(QueryBuilders.boolQuery().filter(
+                                        QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))));
+
+                        SearchRequest searchRequest = new SearchRequest(indexName);
+                        searchRequest.source(source);
+                        executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, customIndexSearchHandler);
+                    }
+                },
+                failureHandler
+        );
+
+        // Step 3. Delete quantiles done, delete the categorizer state
+        ActionListener<Boolean> deleteQuantilesHandler = ActionListener.wrap(
+                response -> deleteCategorizerState(jobId, client, 1, deleteCategorizerStateHandler),
+                failureHandler);
+
+        // Step 2. Delete state done, delete the quantiles
+        ActionListener<BulkResponse> deleteStateHandler = ActionListener.wrap(
+                bulkResponse -> deleteQuantiles(jobId, client, deleteQuantilesHandler),
+                failureHandler);
+
+        // Step 1. Delete the model state
+        deleteModelState(jobId, client, deleteStateHandler);
+    }
+
+    private void deleteQuantiles(String jobId, Client client, ActionListener<Boolean> finishedHandler) {
+        // The quantiles type and doc ID changed in v5.5 so delete both the old and new format
+        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName());
+        // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
+        IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId),
+                // TODO: remove in 7.0
+                Quantiles.v54DocumentId(jobId));
+        request.setQuery(query);
+        request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
+        request.setAbortOnVersionConflict(false);
+        request.setRefresh(true);
+
+        executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
+                response -> finishedHandler.onResponse(true),
+                e -> {
+                    // It's not a problem for us if the index wasn't found - it's equivalent to document not found
+                    if (e instanceof IndexNotFoundException) {
+                        finishedHandler.onResponse(true);
+                    } else {
+                        finishedHandler.onFailure(e);
+                    }
+                }));
+    }
+
+    private void deleteModelState(String jobId, Client client, ActionListener<BulkResponse> listener) {
+        GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request(jobId, null);
+        request.setPageParams(new PageParams(0, MAX_SNAPSHOTS_TO_DELETE));
+        executeAsyncWithOrigin(client, ML_ORIGIN, GetModelSnapshotsAction.INSTANCE, request, ActionListener.wrap(
+                response -> {
+                    List<ModelSnapshot> deleteCandidates = response.getPage().results();
+                    JobDataDeleter deleter = new JobDataDeleter(client, jobId);
+                    deleter.deleteModelSnapshots(deleteCandidates, listener);
+                },
+                listener::onFailure));
+    }
+
+    private void deleteCategorizerState(String jobId, Client client, int docNum, ActionListener<Boolean> finishedHandler) {
+        // The categorizer state type and doc ID changed in v5.5 so delete both the old and new format
+        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName());
+        // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
+        IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum),
+                // TODO: remove in 7.0
+                CategorizerState.v54DocumentId(jobId, docNum));
+        request.setQuery(query);
+        request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
+        request.setAbortOnVersionConflict(false);
+        request.setRefresh(true);
+
+        executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
+                response -> {
+                    // If we successfully deleted a document try the next one; if not we're done
+                    if (response.getDeleted() > 0) {
+                        // There's an assumption here that there won't be very many categorizer
+                        // state documents, so the recursion won't go more than, say, 5 levels deep
+                        deleteCategorizerState(jobId, client, docNum + 1, finishedHandler);
+                        return;
+                    }
+                    finishedHandler.onResponse(true);
+                },
+                e -> {
+                    // It's not a problem for us if the index wasn't found - it's equivalent to document not found
+                    if (e instanceof IndexNotFoundException) {
+                        finishedHandler.onResponse(true);
+                    } else {
+                        finishedHandler.onFailure(e);
+                    }
+                }));
+    }
+
+    private void deleteAliases(String jobId, Client client, ActionListener<AcknowledgedResponse> finishedHandler) {
+        final String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
+        final String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
+
+        // first find the concrete indices associated with the aliases
+        GetAliasesRequest aliasesRequest = new GetAliasesRequest().aliases(readAliasName, writeAliasName)
+                .indicesOptions(IndicesOptions.lenientExpandOpen());
+        executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, aliasesRequest,
+                ActionListener.wrap(
+                        getAliasesResponse -> {
+                            // remove the aliases from the concrete indices found in the first step
+                            IndicesAliasesRequest removeRequest = buildRemoveAliasesRequest(getAliasesResponse);
+                            if (removeRequest == null) {
+                                // don't error if the job's aliases have already been deleted - carry on and delete the
+                                // rest of the job's data
+                                finishedHandler.onResponse(new AcknowledgedResponse(true));
+                                return;
+                            }
+                            executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, removeRequest,
+                                    ActionListener.wrap(
+                                            finishedHandler::onResponse,
+                                            finishedHandler::onFailure),
+                                    client.admin().indices()::aliases);
+                        },
+                        finishedHandler::onFailure), client.admin().indices()::getAliases);
+    }
+
+    private IndicesAliasesRequest buildRemoveAliasesRequest(GetAliasesResponse getAliasesResponse) {
+        Set<String> aliases = new HashSet<>();
+        List<String> indices = new ArrayList<>();
+        for (ObjectObjectCursor<String, List<AliasMetaData>> entry : getAliasesResponse.getAliases()) {
+            // The response includes _all_ indices, but only those associated with
+            // the aliases we asked about will have associated AliasMetaData
+            if (entry.value.isEmpty() == false) {
+                indices.add(entry.key);
+                entry.value.forEach(metadata -> aliases.add(metadata.getAlias()));
+            }
+        }
+        return aliases.isEmpty() ? null : new IndicesAliasesRequest().addAliasAction(
+                IndicesAliasesRequest.AliasActions.remove()
+                        .aliases(aliases.toArray(new String[aliases.size()]))
+                        .indices(indices.toArray(new String[indices.size()])));
+    }
+
+    private void forceDeleteJob(DeleteJobAction.Request request, ActionListener<AcknowledgedResponse> listener) {
         final ClusterState state = clusterService.state();
         final String jobId = request.getJobId();
@@ -135,13 +454,13 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJobAction.Request, AcknowledgedResponse> {
         ActionListener<Boolean> removeTaskListener = new ActionListener<Boolean>() {
             @Override
             public void onResponse(Boolean response) {
-                jobManager.deleteJob(request, task, listener);
+                normalDeleteJob(request, listener);
             }
 
             @Override
             public void onFailure(Exception e) {
                 if (e instanceof ResourceNotFoundException) {
-                    jobManager.deleteJob(request, task, listener);
+                    normalDeleteJob(request, listener);
                 } else {
                     listener.onFailure(e);
                 }
@@ -151,9 +470,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJobAction.Request, AcknowledgedResponse> {
         ActionListener<KillProcessAction.Response> killJobListener = ActionListener.wrap(
-                response -> {
-                    removePersistentTask(request.getJobId(), state, removeTaskListener);
-                },
+                response -> removePersistentTask(request.getJobId(), state, removeTaskListener),
                 e -> {
                     if (e instanceof ElasticsearchStatusException) {
                         // Killing the process marks the task as completed so it
@@ -197,7 +514,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJobAction.Request, AcknowledgedResponse> {
     }
 
-    void markJobAsDeleting(String jobId, ActionListener<Boolean> listener, boolean force) {
+    private void markJobAsDeleting(String jobId, ActionListener<Boolean> listener, boolean force) {
         clusterService.submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() {
             @Override
             public ClusterState execute(ClusterState currentState) {
@@ -220,7 +537,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJobAction.Request, AcknowledgedResponse> {
     }
 
-    void waitForDeletingJob(String jobId, TimeValue timeout, ActionListener<AcknowledgedResponse> listener) {
+    private void waitForDeletingJob(String jobId, TimeValue timeout, ActionListener<AcknowledgedResponse> listener) {
         ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
         ClusterState clusterState = stateObserver.setAndGetObservedState();
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java
index b880bf6fa0c..6d0721b03d9 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java
@@ -20,7 +20,7 @@ import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
-import org.elasticsearch.xpack.core.ml.job.persistence.JobDataDeleter;
+import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.ml.job.JobManager;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetFiltersAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetFiltersAction.java
index 83a4c12b819..55fbdfa0f55 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetFiltersAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetFiltersAction.java
@@ -33,7 +33,7 @@ import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
 import org.elasticsearch.xpack.core.ml.action.util.PageParams;
 import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
 import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
-import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils;
+import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetOverallBucketsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetOverallBucketsAction.java
index 4a17a2654c6..1ff5bdecf69 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetOverallBucketsAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetOverallBucketsAction.java
@@ -29,19 +29,19 @@ import org.elasticsearch.xpack.core.ml.action.GetOverallBucketsAction;
 import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
-import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
 import org.elasticsearch.xpack.core.ml.job.results.Bucket;
 import org.elasticsearch.xpack.core.ml.job.results.OverallBucket;
 import org.elasticsearch.xpack.core.ml.job.results.Result;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.core.ml.utils.Intervals;
-import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.job.JobManager;
+import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
 import org.elasticsearch.xpack.ml.job.persistence.overallbuckets.OverallBucketsAggregator;
 import org.elasticsearch.xpack.ml.job.persistence.overallbuckets.OverallBucketsCollector;
 import org.elasticsearch.xpack.ml.job.persistence.overallbuckets.OverallBucketsProcessor;
 import org.elasticsearch.xpack.ml.job.persistence.overallbuckets.OverallBucketsProvider;
+import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
 
 import java.util.HashSet;
 import java.util.List;
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java
index 07b9dade4d8..6d5b8bdb0db 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java
@@ -25,7 +25,7 @@ import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
-import org.elasticsearch.xpack.core.ml.job.persistence.JobDataDeleter;
+import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.job.JobManager;
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java
index d361bb21112..f84c23db5ef 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java
@@ -9,7 +9,6 @@ import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.WriteRequest;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterState;
@@ -34,7 +33,6 @@ import org.elasticsearch.xpack.core.XPackPlugin;
 import org.elasticsearch.xpack.core.ml.MachineLearningField;
 import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.MlTasks;
-import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
 import org.elasticsearch.xpack.core.ml.action.PutJobAction;
 import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
@@ -47,7 +45,6 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
 import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
-import org.elasticsearch.xpack.core.ml.job.persistence.JobStorageDeletionTask;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -489,64 +486,6 @@ public class JobManager extends AbstractComponent {
         }
     }
 
-    public void deleteJob(DeleteJobAction.Request request, JobStorageDeletionTask task,
-                          ActionListener<AcknowledgedResponse> actionListener) {
-
-        String jobId = request.getJobId();
-        logger.debug("Deleting job '" + jobId + "'");
-
-        // Step 4. When the job has been removed from the cluster state, return a response
-        // -------
-        CheckedConsumer<Boolean, Exception> apiResponseHandler = jobDeleted -> {
-            if (jobDeleted) {
-                logger.info("Job [" + jobId + "] deleted");
-                auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DELETED));
-                actionListener.onResponse(new AcknowledgedResponse(true));
-            } else {
-                actionListener.onResponse(new AcknowledgedResponse(false));
-            }
-        };
-
-        // Step 3. When the physical storage has been deleted, remove from Cluster State
-        // -------
-        CheckedConsumer<Boolean, Exception> deleteJobStateHandler = response -> clusterService.submitStateUpdateTask("delete-job-" + jobId,
-                new AckedClusterStateUpdateTask<Boolean>(request, ActionListener.wrap(apiResponseHandler, actionListener::onFailure)) {
-
-                    @Override
-                    protected Boolean newResponse(boolean acknowledged) {
-                        return acknowledged && response;
-                    }
-
-                    @Override
-                    public ClusterState execute(ClusterState currentState) {
-                        MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState);
-                        if (currentMlMetadata.getJobs().containsKey(jobId) == false) {
-                            // We wouldn't have got here if the job never existed so
-                            // the Job must have been deleted by another action.
-                            // Don't error in this case
-                            return currentState;
-                        }
-
-                        MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata);
-                        builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
-                        return buildNewClusterState(currentState, builder);
-                    }
-                });
-
-
-        // Step 2. Remove the job from any calendars
-        CheckedConsumer<Boolean, Exception> removeFromCalendarsHandler = response -> {
-            jobResultsProvider.removeJobFromCalendars(jobId, ActionListener.wrap(deleteJobStateHandler::accept,
-                    actionListener::onFailure ));
-        };
-
-
-        // Step 1. Delete the physical storage
-
-        // This task manages the physical deletion of the job state and results
-        task.delete(jobId, client, clusterService.state(), removeFromCalendarsHandler, actionListener::onFailure);
-    }
-
     public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionListener<RevertModelSnapshotAction.Response> actionListener,
                                ModelSnapshot modelSnapshot) {
 
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java
index d50a7c3f8c2..e8655548592 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java
@@ -16,7 +16,7 @@ import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
-import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils;
+import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
 
 import java.util.ArrayDeque;
 import java.util.Collections;
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDataDeleter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java
similarity index 97%
rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDataDeleter.java
rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java
index cc86ce17bb9..1e0825d14f9 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDataDeleter.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java
@@ -3,7 +3,7 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.core.ml.job.persistence;
+package org.elasticsearch.xpack.ml.job.persistence;
 
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
@@ -21,6 +21,8 @@ import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.reindex.DeleteByQueryAction;
 import org.elasticsearch.index.reindex.DeleteByQueryRequest;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
+import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
 import org.elasticsearch.xpack.core.ml.job.results.Result;
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java
index 5f0043b8645..233a2b4078a 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java
@@ -25,7 +25,6 @@ import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
-import org.elasticsearch.xpack.core.ml.job.persistence.JobDataDeleter;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java
index 09a0a25cc4d..9338d24dd68 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java
@@ -64,8 +64,8 @@ import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
-import org.elasticsearch.search.aggregations.metrics.Stats;
 import org.elasticsearch.search.aggregations.metrics.ExtendedStats;
+import org.elasticsearch.search.aggregations.metrics.Stats;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.elasticsearch.search.sort.SortBuilders;
@@ -99,11 +99,11 @@ import org.elasticsearch.xpack.core.ml.stats.CountAccumulator;
 import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
 import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
-import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils;
 import org.elasticsearch.xpack.core.security.support.Exceptions;
 import org.elasticsearch.xpack.ml.job.categorization.GrokPatternCreator;
 import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
+import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndicesUtils.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlIndicesUtils.java
similarity index 93%
rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndicesUtils.java
rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlIndicesUtils.java
index c916b6664d2..63557c3bf23 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndicesUtils.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlIndicesUtils.java
@@ -3,7 +3,7 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.core.ml.utils;
+package org.elasticsearch.xpack.ml.utils;
 
 import org.elasticsearch.action.support.IndicesOptions;
 
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNamesTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNamesTests.java
index 2fa4834d1ec..a08b53fba3c 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNamesTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNamesTests.java
@@ -11,11 +11,10 @@ import org.elasticsearch.xpack.core.ml.job.results.ReservedFieldNames;
 
 public class ReservedFieldNamesTests extends ESTestCase {
 
-    public void testIsValidFieldName() throws Exception {
+    public void testIsValidFieldName() {
         assertTrue(ReservedFieldNames.isValidFieldName("host"));
         assertTrue(ReservedFieldNames.isValidFieldName("host.actual"));
         assertFalse(ReservedFieldNames.isValidFieldName("actual.host"));
         assertFalse(ReservedFieldNames.isValidFieldName(AnomalyRecord.BUCKET_SPAN.getPreferredName()));
     }
-
 }
\ No newline at end of file
diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java
index 31d6312f662..b020109ca29 100644
--- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java
+++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java
@@ -6,7 +6,6 @@
 package org.elasticsearch.xpack.test.rest;
 
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
-
 import org.apache.http.HttpStatus;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.client.Request;
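
The deleteJobDocuments() method moved into TransportDeleteJobAction above builds its seven steps as chained ActionListeners that are declared in reverse: the last step's listener must exist before the step that feeds it, so Step 7 is written first and Step 1 last, even though execution runs 1 through 7. The following self-contained sketch shows the shape of that pattern; it uses a toy Listener stand-in rather than Elasticsearch's ActionListener, and the step names are illustrative only.

import java.util.function.Consumer;

public class ListenerChainSketch {

    /** Minimal stand-in for org.elasticsearch.action.ActionListener, just enough for the pattern. */
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);

        static <T> Listener<T> wrap(Consumer<T> responseConsumer, Consumer<Exception> failureConsumer) {
            return new Listener<T>() {
                @Override
                public void onResponse(T response) {
                    responseConsumer.accept(response);
                }

                @Override
                public void onFailure(Exception e) {
                    failureConsumer.accept(e);
                }
            };
        }
    }

    public static void main(String[] args) {
        Consumer<Exception> failureHandler = e -> System.err.println("job deletion failed: " + e);

        // Step 3 (declared first, runs last): report the final outcome.
        Listener<Boolean> completionHandler = Listener.wrap(
                acknowledged -> System.out.println("step 3: done, acknowledged=" + acknowledged),
                failureHandler);

        // Step 2 (runs second): on success, hand off to step 3.
        Listener<Boolean> deleteQuantilesHandler = Listener.wrap(
                response -> {
                    System.out.println("step 2: delete quantiles");
                    completionHandler.onResponse(true);
                },
                failureHandler);

        // Step 1 (declared last, runs first): kick the chain off.
        System.out.println("step 1: delete model state");
        deleteQuantilesHandler.onResponse(true);
    }
}

Every listener shares one failureHandler, which is why any step can abort the whole chain without the later steps running.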
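Step 6 of deleteJobDocuments() configures the delete-by-query carefully: sliced for parallelism, tolerant of version conflicts, and refreshing on completion. Below is a sketch of just that request construction, using only the 6.x API calls that already appear in this patch. It is simplified, not the committed code: the literal "job_id" stands in for Job.ID.getPreferredName(), and the MlIndicesUtils.addIgnoreUnavailable() wrapper from the real code is omitted.

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;

public class JobResultsDbqSketch {

    static DeleteByQueryRequest buildJobResultsDbq(String jobId, String indexName) {
        // Target the job's physical results index plus any "-*" siblings.
        DeleteByQueryRequest request = new DeleteByQueryRequest(indexName, indexName + "-*");
        // constant_score: this is a pure filter on the job id, no relevance scoring needed.
        request.setQuery(new ConstantScoreQueryBuilder(new TermQueryBuilder("job_id", jobId)));
        // Tolerate missing or closed indices rather than failing the whole delete.
        request.setIndicesOptions(IndicesOptions.lenientExpandOpen());
        // Split the deletion into five parallel slices.
        request.setSlices(5);
        // A version conflict just means the doc changed or vanished concurrently; keep going.
        request.setAbortOnVersionConflict(false);
        // Refresh afterwards so searches immediately stop seeing the deleted results.
        request.setRefresh(true);
        return request;
    }
}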
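Step 4's "are we on a shared index?" check works by searching the job's index for a single document whose job_id is anything other than this job's: one hit is enough to prove another job shares the index, so it must not be dropped. A sketch of that probe query, again with the literal "job_id" assumed as the value of Job.ID.getPreferredName():

import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class SharedIndexProbeSketch {

    // size(1): we only need to know whether at least one other job's document exists.
    static SearchSourceBuilder buildOtherJobsProbe(String jobId) {
        return new SearchSourceBuilder()
                .size(1)
                .query(QueryBuilders.boolQuery().filter(
                        QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery("job_id", jobId))));
    }
}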
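Finally, deleteCategorizerState() relies on a delete-until-miss loop: delete document #1, and as long as each delete-by-query reports getDeleted() > 0, recurse to the next numbered document. The toy below mirrors that control flow in plain Java; the document-id format is illustrative, not the exact output of CategorizerState.documentId().

import java.util.HashSet;
import java.util.Set;

public class NumberedStateDocsSketch {

    public static void main(String[] args) {
        Set<String> stateIndex = new HashSet<>();
        stateIndex.add("my-job_categorizer_state#1");
        stateIndex.add("my-job_categorizer_state#2");
        stateIndex.add("my-job_categorizer_state#3");

        deleteCategorizerState(stateIndex, "my-job", 1);
        System.out.println("remaining: " + stateIndex); // prints: remaining: []
    }

    // Mirrors deleteCategorizerState(): delete doc #n, and only if something
    // was actually deleted, recurse to doc #n+1; the first miss ends the loop.
    static void deleteCategorizerState(Set<String> stateIndex, String jobId, int docNum) {
        boolean deleted = stateIndex.remove(jobId + "_categorizer_state#" + docNum);
        if (deleted) {
            // Same assumption as the real code: only a handful of these documents
            // exist per job, so the recursion stays shallow.
            deleteCategorizerState(stateIndex, jobId, docNum + 1);
        }
    }
}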