From ce0315abc4e4592e0b4096409cd2da237214889c Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 6 Jun 2017 09:41:33 +0100 Subject: [PATCH] [ML] Add force delete job option (elastic/x-pack-elasticsearch#1612) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add force delete job option * Can’t kill a process on a 5.4 node * Address review comments * Rename KillAutodetectAction -> KillProcessAction * Review comments * Cancelling task is superfluous after it has been killed * Update docs * Revert "Cancelling task is superfluous after it has been killed" This reverts commit 576950e2e1ee095b38174d8b71de353c082ae953. * Remove unnecessary TODOs and logic that doesn't alwasys force close Original commit: elastic/x-pack-elasticsearch@f8c8b38217c5608c4eb9206e30a77f6d7db1eb74 --- docs/en/rest-api/ml/delete-job.asciidoc | 9 +- .../xpack/ml/MachineLearning.java | 3 +- .../elasticsearch/xpack/ml/MlMetadata.java | 26 ++- .../xpack/ml/action/DeleteJobAction.java | 152 ++++++++++++++- .../xpack/ml/action/GetBucketsAction.java | 4 +- .../xpack/ml/action/KillProcessAction.java | 183 ++++++++++++++++++ .../xpack/ml/job/JobManager.java | 42 +--- .../xpack/ml/job/messages/Messages.java | 1 + .../autodetect/AutodetectCommunicator.java | 34 +++- .../autodetect/AutodetectProcessManager.java | 24 ++- .../output/AutoDetectResultProcessor.java | 13 +- .../ml/rest/job/RestDeleteJobAction.java | 2 + .../ml/action/DeleteJobRequestTests.java | 4 +- .../AutodetectCommunicatorTests.java | 50 +++-- .../AutodetectProcessManagerTests.java | 3 +- .../AutoDetectResultProcessorTests.java | 1 + .../org/elasticsearch/transport/actions | 1 + .../api/xpack.ml.delete_job.json | 7 + .../test/ml/delete_job_force.yml | 72 +++++++ 19 files changed, 539 insertions(+), 92 deletions(-) create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/action/KillProcessAction.java create mode 100644 plugin/src/test/resources/rest-api-spec/test/ml/delete_job_force.yml diff --git a/docs/en/rest-api/ml/delete-job.asciidoc b/docs/en/rest-api/ml/delete-job.asciidoc index d97118dd7b9..e8f8280e6ae 100644 --- a/docs/en/rest-api/ml/delete-job.asciidoc +++ b/docs/en/rest-api/ml/delete-job.asciidoc @@ -20,7 +20,8 @@ IMPORTANT: Deleting a job must be done via this API only. Do not delete the privileges are granted to anyone over the `.ml-*` indices. Before you can delete a job, you must delete the {dfeeds} that are associated -with it. See <>. +with it. See <>. Unless the `force` parameter +is used the job must be closed before it can be deleted. It is not currently possible to delete multiple jobs using wildcards or a comma separated list. @@ -30,6 +31,12 @@ separated list. `job_id` (required):: (string) Identifier for the job +===== Query Parameters + +`force`:: + (boolean) Use to forcefully delete an opened job; this method is quicker than + closing and deleting the job. + ===== Authorization diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 1436dabf40d..d5e1a7b2974 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -15,7 +15,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.inject.util.Providers; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -54,6 +53,7 @@ import org.elasticsearch.xpack.ml.action.GetJobsAction; import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.ml.action.GetModelSnapshotsAction; import org.elasticsearch.xpack.ml.action.GetRecordsAction; +import org.elasticsearch.xpack.ml.action.KillProcessAction; import org.elasticsearch.xpack.ml.action.OpenJobAction; import org.elasticsearch.xpack.ml.action.PostDataAction; import org.elasticsearch.xpack.ml.action.PreviewDatafeedAction; @@ -398,6 +398,7 @@ public class MachineLearning implements ActionPlugin { new ActionHandler<>(GetFiltersAction.INSTANCE, GetFiltersAction.TransportAction.class), new ActionHandler<>(PutFilterAction.INSTANCE, PutFilterAction.TransportAction.class), new ActionHandler<>(DeleteFilterAction.INSTANCE, DeleteFilterAction.TransportAction.class), + new ActionHandler<>(KillProcessAction.INSTANCE, KillProcessAction.TransportAction.class), new ActionHandler<>(GetBucketsAction.INSTANCE, GetBucketsAction.TransportAction.class), new ActionHandler<>(GetInfluencersAction.INSTANCE, GetInfluencersAction.TransportAction.class), new ActionHandler<>(GetRecordsAction.INSTANCE, GetRecordsAction.TransportAction.class), diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java index f6bdcd0d2be..bbcef5275ce 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java @@ -348,7 +348,7 @@ public class MlMetadata implements MetaData.Custom { return new MlMetadata(jobs, datafeeds); } - public void markJobAsDeleted(String jobId, PersistentTasksCustomMetaData tasks) { + public void markJobAsDeleted(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) { Job job = jobs.get(jobId); if (job == null) { throw ExceptionsHelper.missingJobException(jobId); @@ -357,19 +357,27 @@ public class MlMetadata implements MetaData.Custom { // Job still exists return; } + + checkJobHasNoDatafeed(jobId); + + if (allowDeleteOpenJob == false) { + PersistentTask jobTask = getJobTask(jobId, tasks); + if (jobTask != null) { + throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because the job is " + + ((JobTaskStatus) jobTask.getStatus()).getState()); + } + } + Job.Builder jobBuilder = new Job.Builder(job); + jobBuilder.setDeleted(true); + putJob(jobBuilder.build(), true); + } + + public void checkJobHasNoDatafeed(String jobId) { Optional datafeed = getDatafeedByJobId(jobId); if (datafeed.isPresent()) { throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because datafeed [" + datafeed.get().getId() + "] refers to it"); } - PersistentTask jobTask = getJobTask(jobId, tasks); - if (jobTask != null) { - throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because the job is " - + ((JobTaskStatus) jobTask.getStatus()).getState()); - } - Job.Builder jobBuilder = new Job.Builder(job); - jobBuilder.setDeleted(true); - putJob(jobBuilder.build(), true); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java index 24a7f8a9643..be9625166c0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java @@ -5,6 +5,9 @@ */ package org.elasticsearch.xpack.ml.action; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; @@ -14,10 +17,13 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; @@ -27,10 +33,14 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.xpack.persistent.PersistentTasksService; +import org.elasticsearch.xpack.security.InternalClient; import java.io.IOException; import java.util.Objects; @@ -57,6 +67,7 @@ public class DeleteJobAction extends Action { private String jobId; + private boolean force; public Request(String jobId) { this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); @@ -72,6 +83,14 @@ public class DeleteJobAction extends Action { @@ -143,15 +167,19 @@ public class DeleteJobAction extends Action { + private final InternalClient internalClient; private final JobManager jobManager; + private final PersistentTasksService persistentTasksService; @Inject public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - JobManager jobManager) { + JobManager jobManager, PersistentTasksService persistentTasksService, InternalClient internalClient) { super(settings, DeleteJobAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new); + this.internalClient = internalClient; this.jobManager = jobManager; + this.persistentTasksService = persistentTasksService; } @Override @@ -166,7 +194,18 @@ public class DeleteJobAction extends Action listener) throws Exception { - jobManager.deleteJob(request, (JobStorageDeletionTask) task, listener); + + ActionListener markAsDeletingListener = ActionListener.wrap( + response -> { + if (request.isForce()) { + forceDeleteJob(request, (JobStorageDeletionTask) task, listener); + } else { + normalDeleteJob(request, (JobStorageDeletionTask) task, listener); + } + }, + listener::onFailure); + + markJobAsDeleting(request.getJobId(), markAsDeletingListener, request.isForce()); } @Override @@ -179,6 +218,109 @@ public class DeleteJobAction extends Action listener) { + jobManager.deleteJob(request, task, listener); + } + private void forceDeleteJob(Request request, JobStorageDeletionTask task, ActionListener listener) { + + final ClusterState state = clusterService.state(); + final String jobId = request.getJobId(); + + // 3. Delete the job + ActionListener removeTaskListener = new ActionListener() { + @Override + public void onResponse(Boolean response) { + jobManager.deleteJob(request, task, listener); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof ResourceNotFoundException) { + jobManager.deleteJob(request, task, listener); + } else { + listener.onFailure(e); + } + } + }; + + // 2. Cancel the persistent task. This closes the process gracefully so + // the process should be killed first. + ActionListener killJobListener = ActionListener.wrap( + response -> { + removePersistentTask(request.getJobId(), state, removeTaskListener); + }, + e -> { + if (e instanceof ElasticsearchStatusException) { + // Killing the process marks the task as completed so it + // may have disappeared when we get here + removePersistentTask(request.getJobId(), state, removeTaskListener); + } else { + listener.onFailure(e); + } + } + ); + + // 1. Kill the job's process + killProcess(jobId, killJobListener); + } + + private void killProcess(String jobId, ActionListener listener) { + KillProcessAction.Request killRequest = new KillProcessAction.Request(jobId); + internalClient.execute(KillProcessAction.INSTANCE, killRequest, listener); + } + + private void removePersistentTask(String jobId, ClusterState currentState, + ActionListener listener) { + PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + + PersistentTasksCustomMetaData.PersistentTask jobTask = MlMetadata.getJobTask(jobId, tasks); + if (jobTask == null) { + listener.onResponse(null); + } else { + persistentTasksService.cancelPersistentTask(jobTask.getId(), + new ActionListener>() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { + listener.onResponse(Boolean.TRUE); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + } + + void markJobAsDeleting(String jobId, ActionListener listener, boolean force) { + clusterService.submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + MlMetadata currentMlMetadata = currentState.metaData().custom(MlMetadata.TYPE); + PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE); + MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata); + builder.markJobAsDeleted(jobId, tasks, force); + return buildNewClusterState(currentState, builder); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { + logger.debug("Job [" + jobId + "] is successfully marked as deleted"); + listener.onResponse(true); + } + }); + } + + private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) { + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build()); + return newState.build(); + } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetBucketsAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetBucketsAction.java index 3a354b8eabd..30b8227ac8c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetBucketsAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetBucketsAction.java @@ -230,7 +230,7 @@ public class GetBucketsAction extends Action { + + public static final KillProcessAction INSTANCE = new KillProcessAction(); + public static final String NAME = "cluster:internal/xpack/ml/job/kill/process"; + + private KillProcessAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public Response newResponse() { + return new Response(); + } + + static class RequestBuilder extends ActionRequestBuilder { + + RequestBuilder(ElasticsearchClient client, KillProcessAction action) { + super(client, action, new Request()); + } + } + + public static class Request extends TransportJobTaskAction.JobTaskRequest { + + public Request(String jobId) { + super(jobId); + } + + Request() { + super(); + } + } + + public static class Response extends BaseTasksResponse implements Writeable { + + private boolean killed; + + Response() { + } + + Response(StreamInput in) throws IOException { + readFrom(in); + } + + Response(boolean killed) { + super(null, null); + this.killed = killed; + } + + public boolean isKilled() { + return killed; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + killed = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(killed); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return killed == response.killed; + } + + @Override + public int hashCode() { + return Objects.hash(killed); + } + } + + public static class TransportAction extends TransportJobTaskAction { + + private final Auditor auditor; + + @Inject + public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + AutodetectProcessManager processManager, Auditor auditor) { + super(settings, NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, + Request::new, Response::new, ThreadPool.Names.SAME, processManager); + // ThreadPool.Names.SAME + + this.auditor = auditor; + } + + @Override + protected void taskOperation(Request request, OpenJobAction.JobTask jobTask, ActionListener listener) { + logger.info("[{}] Killing job", jobTask.getJobId()); + auditor.info(jobTask.getJobId(), Messages.JOB_AUDIT_KILLING); + + try { + processManager.killProcess(jobTask, true); + listener.onResponse(new Response(true)); + } catch (Exception e) { + listener.onFailure(e); + } + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + DiscoveryNodes nodes = clusterService.state().nodes(); + PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData.PersistentTask jobTask = MlMetadata.getJobTask(request.getJobId(), tasks); + if (jobTask == null || jobTask.getExecutorNode() == null) { + logger.debug("[{}] Cannot kill the process because job is not open", request.getJobId()); + listener.onResponse(new Response(false)); + return; + } + + DiscoveryNode executorNode = nodes.get(jobTask.getExecutorNode()); + if (executorNode == null) { + listener.onFailure(ExceptionsHelper.conflictStatusException("Cannot kill process for job {} as" + + "executor node {} cannot be found", request.getJobId(), jobTask.getExecutorNode())); + return; + } + + Version nodeVersion = executorNode.getVersion(); + if (nodeVersion.before(Version.V_5_5_0)) { + listener.onFailure(new ElasticsearchException("Cannot kill the process on node with version " + nodeVersion)); + return; + } + + super.doExecute(task, request, listener); + } + + + @Override + protected Response readTaskResponse(StreamInput in) throws IOException { + return new Response(in); + } + } +} \ No newline at end of file diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 25c8598dc7d..c79f07da170 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -314,47 +314,11 @@ public class JobManager extends AbstractComponent { } }); - // Step 1. When the job has been marked as deleted then begin deleting the physical storage - // ------- - CheckedConsumer updateHandler = response -> { - // Successfully updated the status to DELETING, begin actually deleting - if (response) { - logger.info("Job [" + jobId + "] is successfully marked as deleted"); - } else { - logger.warn("Job [" + jobId + "] marked as deleted wan't acknowledged"); - } + // Step 1. Delete the physical storage - // This task manages the physical deletion of the job (removing the results, then the index) - task.delete(jobId, client, clusterService.state(), - deleteJobStateHandler::accept, actionListener::onFailure); - }; + // This task manages the physical deletion of the job state and results + task.delete(jobId, client, clusterService.state(), deleteJobStateHandler::accept, actionListener::onFailure); - // Step 0. Kick off the chain of callbacks with the initial UpdateStatus call - // ------- - clusterService.submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - MlMetadata currentMlMetadata = currentState.metaData().custom(MlMetadata.TYPE); - PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE); - MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata); - builder.markJobAsDeleted(jobId, tasks); - return buildNewClusterState(currentState, builder); - } - - @Override - public void onFailure(String source, Exception e) { - actionListener.onFailure(e); - } - - @Override - public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { - try { - updateHandler.accept(true); - } catch (Exception e) { - actionListener.onFailure(e); - } - } - }); } public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionListener actionListener, diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java index a75117489d2..ff96f3b78fe 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java @@ -49,6 +49,7 @@ public final class Messages { public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time"; public static final String JOB_AUDIT_DATAFEED_STOPPED = "Datafeed stopped"; public static final String JOB_AUDIT_DELETED = "Job deleted"; + public static final String JOB_AUDIT_KILLING = "Killing job"; public static final String JOB_AUDIT_OLD_RESULTS_DELETED = "Deleted results prior to {1}"; public static final String JOB_AUDIT_REVERTED = "Job model snapshot reverted to ''{0}''"; public static final String JOB_AUDIT_SNAPSHOT_DELETED = "Model snapshot [{0}] with description ''{1}'' deleted"; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index e559a550a9c..55e2f2cedeb 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -40,6 +40,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -54,20 +55,20 @@ public class AutodetectCommunicator implements Closeable { private final DataCountsReporter dataCountsReporter; private final AutodetectProcess autodetectProcess; private final AutoDetectResultProcessor autoDetectResultProcessor; - private final Consumer handler; + private final Consumer onFinishHandler; private final ExecutorService autodetectWorkerExecutor; private final NamedXContentRegistry xContentRegistry; private volatile boolean processKilled; AutodetectCommunicator(Job job, JobTask jobTask, AutodetectProcess process, DataCountsReporter dataCountsReporter, - AutoDetectResultProcessor autoDetectResultProcessor, Consumer handler, + AutoDetectResultProcessor autoDetectResultProcessor, Consumer onFinishHandler, NamedXContentRegistry xContentRegistry, ExecutorService autodetectWorkerExecutor) { this.job = job; this.jobTask = jobTask; this.autodetectProcess = process; this.dataCountsReporter = dataCountsReporter; this.autoDetectResultProcessor = autoDetectResultProcessor; - this.handler = handler; + this.onFinishHandler = onFinishHandler; this.xContentRegistry = xContentRegistry; this.autodetectWorkerExecutor = autodetectWorkerExecutor; } @@ -124,14 +125,14 @@ public class AutodetectCommunicator implements Closeable { * @param restart Whether the job should be restarted by persistent tasks * @param reason The reason for closing the job */ - public void close(boolean restart, String reason) throws IOException { + public void close(boolean restart, String reason) { Future future = autodetectWorkerExecutor.submit(() -> { checkProcessIsAlive(); try { autodetectProcess.close(); autoDetectResultProcessor.awaitCompletion(); } finally { - handler.accept(restart ? new ElasticsearchException(reason) : null); + onFinishHandler.accept(restart ? new ElasticsearchException(reason) : null); } LOGGER.info("[{}] job closed", job.getId()); return null; @@ -146,10 +147,25 @@ public class AutodetectCommunicator implements Closeable { } } - public void killProcess() throws IOException { - processKilled = true; - autoDetectResultProcessor.setProcessKilled(); - autodetectProcess.kill(); + public void killProcess(boolean awaitCompletion, boolean finish) throws IOException { + try { + processKilled = true; + autoDetectResultProcessor.setProcessKilled(); + autodetectProcess.kill(); + autodetectWorkerExecutor.shutdown(); + + if (awaitCompletion) { + try { + autoDetectResultProcessor.awaitCompletion(); + } catch (TimeoutException e) { + LOGGER.warn(new ParameterizedMessage("[{}] Timed out waiting for killed job", job.getId()), e); + } + } + } finally { + if (finish) { + onFinishHandler.accept(null); + } + } } public void writeUpdateProcessMessage(ModelPlotConfig config, List updates, diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 728ed76e29d..222cc4a476f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -134,19 +134,31 @@ public class AutodetectProcessManager extends AbstractComponent { } } + public void killProcess(JobTask jobTask, boolean awaitCompletion) { + AutodetectCommunicator communicator = autoDetectCommunicatorByJob.remove(jobTask.getAllocationId()); + if (communicator != null) { + killProcess(communicator, jobTask.getJobId(), awaitCompletion, true); + } + } + public void killAllProcessesOnThisNode() { Iterator iter = autoDetectCommunicatorByJob.values().iterator(); while (iter.hasNext()) { AutodetectCommunicator communicator = iter.next(); - try { - communicator.killProcess(); - iter.remove(); - } catch (IOException e) { - logger.error("[{}] Failed to kill autodetect process for job", communicator.getJobTask().getJobId()); - } + iter.remove(); + killProcess(communicator, communicator.getJobTask().getJobId(), false, false); } } + private void killProcess(AutodetectCommunicator communicator, String jobId, boolean awaitCompletion, boolean finish) { + try { + communicator.killProcess(awaitCompletion, finish); + } catch (IOException e) { + logger.error("[{}] Failed to kill autodetect process for job", jobId); + } + } + + /** * Passes data to the native process. * This is a blocking call that won't return until all the data has been diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index be2d47fdf25..99d389977af 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -115,11 +115,10 @@ public class AutoDetectResultProcessor { } try { - context.bulkResultsPersister.executeRequest(); - } catch (Exception e) { - if (processKilled) { - throw e; + if (processKilled == false) { + context.bulkResultsPersister.executeRequest(); } + } catch (Exception e) { LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e); } @@ -132,7 +131,7 @@ public class AutoDetectResultProcessor { // that it would have been better to close jobs before shutting down, // but we now fully expect jobs to move between nodes without doing // all their graceful close activities. - LOGGER.warn("[{}] some results not processed due to node shutdown", jobId); + LOGGER.warn("[{}] some results not processed due to the process being killed", jobId); } else { // We should only get here if the iterator throws in which // case parsing the autodetect output has failed. @@ -160,6 +159,10 @@ public class AutoDetectResultProcessor { } void processResult(Context context, AutodetectResult result) { + if (processKilled) { + return; + } + Bucket bucket = result.getBucket(); if (bucket != null) { if (context.deleteInterimRequired) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestDeleteJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestDeleteJobAction.java index 0fc0cfeb1a9..a3b0582c9cc 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestDeleteJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestDeleteJobAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.AcknowledgedRestListener; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.action.CloseJobAction; import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.job.config.Job; @@ -33,6 +34,7 @@ public class RestDeleteJobAction extends BaseRestHandler { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { DeleteJobAction.Request deleteJobRequest = new DeleteJobAction.Request(restRequest.param(Job.ID.getPreferredName())); + deleteJobRequest.setForce(restRequest.paramAsBoolean(CloseJobAction.Request.FORCE.getPreferredName(), deleteJobRequest.isForce())); return channel -> client.execute(DeleteJobAction.INSTANCE, deleteJobRequest, new AcknowledgedRestListener<>(channel)); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DeleteJobRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DeleteJobRequestTests.java index 091abfb699b..cbaba6446f6 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DeleteJobRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DeleteJobRequestTests.java @@ -12,7 +12,9 @@ public class DeleteJobRequestTests extends AbstractStreamableTestCase { @Override protected Request createTestInstance() { - return new Request(randomAlphaOfLengthBetween(1, 20)); + Request request = new Request(randomAlphaOfLengthBetween(1, 20)); + request.setForce(randomBoolean()); + return request; } @Override diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index f75525fd482..404c420ea44 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -31,12 +31,16 @@ import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -110,13 +114,26 @@ public class AutodetectCommunicatorTests extends ESTestCase { Mockito.verify(process).close(); } - public void testKill() throws IOException { + public void testKill() throws IOException, TimeoutException { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class); - AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); - communicator.killProcess(); + ExecutorService executorService = mock(ExecutorService.class); + + AtomicBoolean finishCalled = new AtomicBoolean(false); + AutodetectCommunicator communicator = createAutodetectCommunicator(executorService, process, resultProcessor, + e -> finishCalled.set(true)); + boolean awaitCompletion = randomBoolean(); + boolean finish = randomBoolean(); + communicator.killProcess(awaitCompletion, finish); Mockito.verify(resultProcessor).setProcessKilled(); Mockito.verify(process).kill(); + Mockito.verify(executorService).shutdown(); + if (awaitCompletion) { + Mockito.verify(resultProcessor).awaitCompletion(); + } else { + Mockito.verify(resultProcessor, never()).awaitCompletion(); + } + assertEquals(finish, finishCalled.get()); } private Job createJobDetails() { @@ -140,6 +157,21 @@ public class AutodetectCommunicatorTests extends ESTestCase { return process; } + private AutodetectCommunicator createAutodetectCommunicator(ExecutorService executorService, AutodetectProcess autodetectProcess, + AutoDetectResultProcessor autoDetectResultProcessor, + Consumer finishHandler) throws IOException { + DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class); + doAnswer(invocation -> { + ((ActionListener) invocation.getArguments()[0]).onResponse(true); + return null; + }).when(dataCountsReporter).finishReporting(any()); + JobTask jobTask = mock(JobTask.class); + when(jobTask.getJobId()).thenReturn("foo"); + return new AutodetectCommunicator(createJobDetails(), jobTask, autodetectProcess, + dataCountsReporter, autoDetectResultProcessor, finishHandler, + new NamedXContentRegistry(Collections.emptyList()), executorService); + } + private AutodetectCommunicator createAutodetectCommunicator(AutodetectProcess autodetectProcess, AutoDetectResultProcessor autoDetectResultProcessor) throws IOException { ExecutorService executorService = mock(ExecutorService.class); @@ -153,16 +185,8 @@ public class AutodetectCommunicatorTests extends ESTestCase { ((Runnable) invocation.getArguments()[0]).run(); return null; }).when(executorService).execute(any(Runnable.class)); - DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class); - doAnswer(invocation -> { - ((ActionListener) invocation.getArguments()[0]).onResponse(true); - return null; - }).when(dataCountsReporter).finishReporting(any()); - JobTask jobTask = mock(JobTask.class); - when(jobTask.getJobId()).thenReturn("foo"); - return new AutodetectCommunicator(createJobDetails(), jobTask, autodetectProcess, - dataCountsReporter, autoDetectResultProcessor, e -> { - }, new NamedXContentRegistry(Collections.emptyList()), executorService); + + return createAutodetectCommunicator(executorService, autodetectProcess, autoDetectResultProcessor, e -> {}); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 83c5489d9c4..be89b1b1dcf 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -383,6 +383,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); assertFalse(manager.jobHasActiveAutodetectProcess(jobTask)); + when(communicator.getJobTask()).thenReturn(jobTask); manager.openJob(jobTask, false, e -> {}); manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()), @@ -392,7 +393,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { manager.killAllProcessesOnThisNode(); - verify(communicator).killProcess(); + verify(communicator).killProcess(false, false); } public void testProcessData_GivenStateNotOpened() throws IOException { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 14d1c09e374..cc54eb71908 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -363,6 +363,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { verify(persister, never()).commitResultWrites(JOB_ID); verify(persister, never()).commitStateWrites(JOB_ID); verify(renormalizer, never()).renormalize(any()); + verify(renormalizer).shutdown(); verify(renormalizer, never()).waitUntilIdle(); verify(flushListener, times(1)).clear(); } diff --git a/plugin/src/test/resources/org/elasticsearch/transport/actions b/plugin/src/test/resources/org/elasticsearch/transport/actions index 8b06ffc4f3f..79b5361a060 100644 --- a/plugin/src/test/resources/org/elasticsearch/transport/actions +++ b/plugin/src/test/resources/org/elasticsearch/transport/actions @@ -130,6 +130,7 @@ cluster:admin/xpack/ml/job/model_snapshots/revert cluster:admin/xpack/ml/datafeeds/delete cluster:admin/xpack/ml/job/data/post cluster:admin/xpack/ml/job/close +cluster:internal/xpack/ml/job/kill/process cluster:admin/xpack/ml/filters/put cluster:admin/xpack/ml/job/put cluster:monitor/xpack/ml/job/get diff --git a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.delete_job.json b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.delete_job.json index 2ad1706338f..b94cda77216 100644 --- a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.delete_job.json +++ b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.delete_job.json @@ -10,6 +10,13 @@ "required": true, "description": "The ID of the job to delete" } + }, + "params": { + "force": { + "type": "boolean", + "required": false, + "description": "True if the job should be forcefully deleted" + } } }, "body": null diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/delete_job_force.yml b/plugin/src/test/resources/rest-api-spec/test/ml/delete_job_force.yml new file mode 100644 index 00000000000..dbfb1b50a08 --- /dev/null +++ b/plugin/src/test/resources/rest-api-spec/test/ml/delete_job_force.yml @@ -0,0 +1,72 @@ +setup: + - do: + xpack.ml.put_job: + job_id: force-delete-job + body: > + { + "analysis_config" : { + "bucket_span": "1h", + "detectors" :[{"function":"count"}] + }, + "data_description" : { + } + } + +--- +"Test force delete a closed job": + - do: + xpack.ml.delete_job: + force: true + job_id: force-delete-job + - match: { acknowledged: true } + + - do: + xpack.ml.get_jobs: + job_id: "_all" + - match: { count: 0 } + +--- +"Test force delete an open job": + + - do: + xpack.ml.open_job: + job_id: force-delete-job + + - do: + xpack.ml.delete_job: + force: true + job_id: force-delete-job + - match: { acknowledged: true } + + - do: + xpack.ml.get_jobs: + job_id: "_all" + - match: { count: 0 } + +--- +"Test can't force delete an inexistent job": + + - do: + catch: /resource_not_found_exception/ + xpack.ml.delete_job: + force: true + job_id: inexistent-job + +--- +"Test force delete job that is referred by a datafeed": + + - do: + xpack.ml.put_datafeed: + datafeed_id: force-delete-job-datafeed + body: > + { + "job_id":"force-delete-job", + "indexes":["index-foo"], + "types":["type-bar"] + } + - match: { datafeed_id: force-delete-job-datafeed } + + - do: + catch: /Cannot delete job \[force-delete-job\] because datafeed \[force-delete-job-datafeed\] refers to it/ + xpack.ml.delete_job: + job_id: force-delete-job