[ML] Add force delete job option (elastic/x-pack-elasticsearch#1612)

* Add force delete job option

* Can’t kill a process on a 5.4 node

* Address review comments

* Rename KillAutodetectAction -> KillProcessAction

* Review comments

* Cancelling task is superfluous after it has been killed

* Update docs

* Revert "Cancelling task is superfluous after it has been killed"

This reverts commit 576950e2e1ee095b38174d8b71de353c082ae953.

* Remove unnecessary TODOs and logic that doesn't always force close

Original commit: elastic/x-pack-elasticsearch@f8c8b38217
This commit is contained in:
David Kyle 2017-06-06 09:41:33 +01:00 committed by GitHub
parent a12b384906
commit ce0315abc4
19 changed files with 539 additions and 92 deletions

View File

@ -20,7 +20,8 @@ IMPORTANT: Deleting a job must be done via this API only. Do not delete the
privileges are granted to anyone over the `.ml-*` indices.
Before you can delete a job, you must delete the {dfeeds} that are associated
with it. See <<ml-delete-datafeed,Delete {dfeeds-cap}>>.
with it. See <<ml-delete-datafeed,Delete {dfeeds-cap}>>. Unless the `force` parameter
is used, the job must be closed before it can be deleted.
It is not currently possible to delete multiple jobs using wildcards or a comma
separated list.
@ -30,6 +31,12 @@ separated list.
`job_id` (required)::
(string) Identifier for the job
===== Query Parameters
`force`::
(boolean) Use to forcefully delete an opened job; this method is quicker than
closing and deleting the job.
===== Authorization

View File

@ -15,7 +15,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
@ -54,6 +53,7 @@ import org.elasticsearch.xpack.ml.action.GetJobsAction;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.ml.action.KillProcessAction;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.action.PreviewDatafeedAction;
@ -398,6 +398,7 @@ public class MachineLearning implements ActionPlugin {
new ActionHandler<>(GetFiltersAction.INSTANCE, GetFiltersAction.TransportAction.class),
new ActionHandler<>(PutFilterAction.INSTANCE, PutFilterAction.TransportAction.class),
new ActionHandler<>(DeleteFilterAction.INSTANCE, DeleteFilterAction.TransportAction.class),
new ActionHandler<>(KillProcessAction.INSTANCE, KillProcessAction.TransportAction.class),
new ActionHandler<>(GetBucketsAction.INSTANCE, GetBucketsAction.TransportAction.class),
new ActionHandler<>(GetInfluencersAction.INSTANCE, GetInfluencersAction.TransportAction.class),
new ActionHandler<>(GetRecordsAction.INSTANCE, GetRecordsAction.TransportAction.class),

View File

@ -348,7 +348,7 @@ public class MlMetadata implements MetaData.Custom {
return new MlMetadata(jobs, datafeeds);
}
public void markJobAsDeleted(String jobId, PersistentTasksCustomMetaData tasks) {
public void markJobAsDeleted(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) {
Job job = jobs.get(jobId);
if (job == null) {
throw ExceptionsHelper.missingJobException(jobId);
@ -357,19 +357,27 @@ public class MlMetadata implements MetaData.Custom {
// Job still exists
return;
}
checkJobHasNoDatafeed(jobId);
if (allowDeleteOpenJob == false) {
PersistentTask<?> jobTask = getJobTask(jobId, tasks);
if (jobTask != null) {
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because the job is "
+ ((JobTaskStatus) jobTask.getStatus()).getState());
}
}
Job.Builder jobBuilder = new Job.Builder(job);
jobBuilder.setDeleted(true);
putJob(jobBuilder.build(), true);
}
public void checkJobHasNoDatafeed(String jobId) {
Optional<DatafeedConfig> datafeed = getDatafeedByJobId(jobId);
if (datafeed.isPresent()) {
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because datafeed ["
+ datafeed.get().getId() + "] refers to it");
}
PersistentTask<?> jobTask = getJobTask(jobId, tasks);
if (jobTask != null) {
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because the job is "
+ ((JobTaskStatus) jobTask.getStatus()).getState());
}
Job.Builder jobBuilder = new Job.Builder(job);
jobBuilder.setDeleted(true);
putJob(jobBuilder.build(), true);
}
}

View File

@ -5,6 +5,9 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
@ -14,10 +17,13 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -27,10 +33,14 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.security.InternalClient;
import java.io.IOException;
import java.util.Objects;
@ -57,6 +67,7 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
public static class Request extends AcknowledgedRequest<Request> {
private String jobId;
private boolean force;
public Request(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
@ -72,6 +83,14 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
this.jobId = jobId;
}
public boolean isForce() {
return force;
}
public void setForce(boolean force) {
this.force = force;
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -86,17 +105,23 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
if (in.getVersion().onOrAfter(Version.V_5_5_0)) {
force = in.readBoolean();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
if (out.getVersion().onOrAfter(Version.V_5_5_0)) {
out.writeBoolean(force);
}
}
@Override
public int hashCode() {
return Objects.hash(jobId);
return Objects.hash(jobId, force);
}
@Override
@ -108,9 +133,8 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
return false;
}
DeleteJobAction.Request other = (DeleteJobAction.Request) obj;
return Objects.equals(jobId, other.jobId);
return Objects.equals(jobId, other.jobId) && Objects.equals(force, other.force);
}
}
static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder> {
@ -143,15 +167,19 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final InternalClient internalClient;
private final JobManager jobManager;
private final PersistentTasksService persistentTasksService;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobManager jobManager) {
JobManager jobManager, PersistentTasksService persistentTasksService, InternalClient internalClient) {
super(settings, DeleteJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.internalClient = internalClient;
this.jobManager = jobManager;
this.persistentTasksService = persistentTasksService;
}
@Override
@ -166,7 +194,18 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
jobManager.deleteJob(request, (JobStorageDeletionTask) task, listener);
ActionListener<Boolean> markAsDeletingListener = ActionListener.wrap(
response -> {
if (request.isForce()) {
forceDeleteJob(request, (JobStorageDeletionTask) task, listener);
} else {
normalDeleteJob(request, (JobStorageDeletionTask) task, listener);
}
},
listener::onFailure);
markJobAsDeleting(request.getJobId(), markAsDeletingListener, request.isForce());
}
@Override
@ -179,6 +218,109 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
private void normalDeleteJob(Request request, JobStorageDeletionTask task, ActionListener<Response> listener) {
jobManager.deleteJob(request, task, listener);
}
private void forceDeleteJob(Request request, JobStorageDeletionTask task, ActionListener<Response> listener) {
final ClusterState state = clusterService.state();
final String jobId = request.getJobId();
// 3. Delete the job
ActionListener<Boolean> removeTaskListener = new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean response) {
jobManager.deleteJob(request, task, listener);
}
@Override
public void onFailure(Exception e) {
if (e instanceof ResourceNotFoundException) {
jobManager.deleteJob(request, task, listener);
} else {
listener.onFailure(e);
}
}
};
// 2. Cancel the persistent task. This closes the process gracefully so
// the process should be killed first.
ActionListener<KillProcessAction.Response> killJobListener = ActionListener.wrap(
response -> {
removePersistentTask(request.getJobId(), state, removeTaskListener);
},
e -> {
if (e instanceof ElasticsearchStatusException) {
// Killing the process marks the task as completed so it
// may have disappeared when we get here
removePersistentTask(request.getJobId(), state, removeTaskListener);
} else {
listener.onFailure(e);
}
}
);
// 1. Kill the job's process
killProcess(jobId, killJobListener);
}
private void killProcess(String jobId, ActionListener<KillProcessAction.Response> listener) {
KillProcessAction.Request killRequest = new KillProcessAction.Request(jobId);
internalClient.execute(KillProcessAction.INSTANCE, killRequest, listener);
}
private void removePersistentTask(String jobId, ClusterState currentState,
ActionListener<Boolean> listener) {
PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
if (jobTask == null) {
listener.onResponse(null);
} else {
persistentTasksService.cancelPersistentTask(jobTask.getId(),
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {
listener.onResponse(Boolean.TRUE);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}
void markJobAsDeleting(String jobId, ActionListener<Boolean> listener, boolean force) {
clusterService.submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata currentMlMetadata = currentState.metaData().custom(MlMetadata.TYPE);
PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata);
builder.markJobAsDeleted(jobId, tasks, force);
return buildNewClusterState(currentState, builder);
}
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
logger.debug("Job [" + jobId + "] is successfully marked as deleted");
listener.onResponse(true);
}
});
}
private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) {
ClusterState.Builder newState = ClusterState.builder(currentState);
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build());
return newState.build();
}
}
}

View File

@ -230,7 +230,7 @@ public class GetBucketsAction extends Action<GetBucketsAction.Request, GetBucket
end = in.readOptionalString();
anomalyScore = in.readOptionalDouble();
pageParams = in.readOptionalWriteable(PageParams::new);
if (in.getVersion().after(Version.V_5_4_0)) {
if (in.getVersion().onOrAfter(Version.V_5_5_0)) {
sort = in.readString();
descending = in.readBoolean();
}
@ -247,7 +247,7 @@ public class GetBucketsAction extends Action<GetBucketsAction.Request, GetBucket
out.writeOptionalString(end);
out.writeOptionalDouble(anomalyScore);
out.writeOptionalWriteable(pageParams);
if (out.getVersion().after(Version.V_5_4_0)) {
if (out.getVersion().onOrAfter(Version.V_5_5_0)) {
out.writeString(sort);
out.writeBoolean(descending);
}

View File

@ -0,0 +1,183 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import java.io.IOException;
import java.util.Objects;
public class KillProcessAction extends Action<KillProcessAction.Request, KillProcessAction.Response,
KillProcessAction.RequestBuilder> {
public static final KillProcessAction INSTANCE = new KillProcessAction();
public static final String NAME = "cluster:internal/xpack/ml/job/kill/process";
private KillProcessAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
RequestBuilder(ElasticsearchClient client, KillProcessAction action) {
super(client, action, new Request());
}
}
public static class Request extends TransportJobTaskAction.JobTaskRequest<Request> {
public Request(String jobId) {
super(jobId);
}
Request() {
super();
}
}
public static class Response extends BaseTasksResponse implements Writeable {
private boolean killed;
Response() {
}
Response(StreamInput in) throws IOException {
readFrom(in);
}
Response(boolean killed) {
super(null, null);
this.killed = killed;
}
public boolean isKilled() {
return killed;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
killed = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(killed);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return killed == response.killed;
}
@Override
public int hashCode() {
return Objects.hash(killed);
}
}
public static class TransportAction extends TransportJobTaskAction<Request, Response> {
private final Auditor auditor;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutodetectProcessManager processManager, Auditor auditor) {
super(settings, NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
Request::new, Response::new, ThreadPool.Names.SAME, processManager);
// ThreadPool.Names.SAME
this.auditor = auditor;
}
@Override
protected void taskOperation(Request request, OpenJobAction.JobTask jobTask, ActionListener<Response> listener) {
logger.info("[{}] Killing job", jobTask.getJobId());
auditor.info(jobTask.getJobId(), Messages.JOB_AUDIT_KILLING);
try {
processManager.killProcess(jobTask, true);
listener.onResponse(new Response(true));
} catch (Exception e) {
listener.onFailure(e);
}
}
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
DiscoveryNodes nodes = clusterService.state().nodes();
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(request.getJobId(), tasks);
if (jobTask == null || jobTask.getExecutorNode() == null) {
logger.debug("[{}] Cannot kill the process because job is not open", request.getJobId());
listener.onResponse(new Response(false));
return;
}
DiscoveryNode executorNode = nodes.get(jobTask.getExecutorNode());
if (executorNode == null) {
listener.onFailure(ExceptionsHelper.conflictStatusException("Cannot kill process for job {} as" +
"executor node {} cannot be found", request.getJobId(), jobTask.getExecutorNode()));
return;
}
Version nodeVersion = executorNode.getVersion();
if (nodeVersion.before(Version.V_5_5_0)) {
listener.onFailure(new ElasticsearchException("Cannot kill the process on node with version " + nodeVersion));
return;
}
super.doExecute(task, request, listener);
}
@Override
protected Response readTaskResponse(StreamInput in) throws IOException {
return new Response(in);
}
}
}

View File

@ -314,47 +314,11 @@ public class JobManager extends AbstractComponent {
}
});
// Step 1. When the job has been marked as deleted then begin deleting the physical storage
// -------
CheckedConsumer<Boolean, Exception> updateHandler = response -> {
// Successfully updated the status to DELETING, begin actually deleting
if (response) {
logger.info("Job [" + jobId + "] is successfully marked as deleted");
} else {
logger.warn("Job [" + jobId + "] marked as deleted wan't acknowledged");
}
// Step 1. Delete the physical storage
// This task manages the physical deletion of the job (removing the results, then the index)
task.delete(jobId, client, clusterService.state(),
deleteJobStateHandler::accept, actionListener::onFailure);
};
// This task manages the physical deletion of the job state and results
task.delete(jobId, client, clusterService.state(), deleteJobStateHandler::accept, actionListener::onFailure);
// Step 0. Kick off the chain of callbacks with the initial UpdateStatus call
// -------
clusterService.submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata currentMlMetadata = currentState.metaData().custom(MlMetadata.TYPE);
PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata);
builder.markJobAsDeleted(jobId, tasks);
return buildNewClusterState(currentState, builder);
}
@Override
public void onFailure(String source, Exception e) {
actionListener.onFailure(e);
}
@Override
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
try {
updateHandler.accept(true);
} catch (Exception e) {
actionListener.onFailure(e);
}
}
});
}
public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionListener<RevertModelSnapshotAction.Response> actionListener,

View File

@ -49,6 +49,7 @@ public final class Messages {
public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time";
public static final String JOB_AUDIT_DATAFEED_STOPPED = "Datafeed stopped";
public static final String JOB_AUDIT_DELETED = "Job deleted";
public static final String JOB_AUDIT_KILLING = "Killing job";
public static final String JOB_AUDIT_OLD_RESULTS_DELETED = "Deleted results prior to {1}";
public static final String JOB_AUDIT_REVERTED = "Job model snapshot reverted to ''{0}''";
public static final String JOB_AUDIT_SNAPSHOT_DELETED = "Model snapshot [{0}] with description ''{1}'' deleted";

View File

@ -40,6 +40,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ -54,20 +55,20 @@ public class AutodetectCommunicator implements Closeable {
private final DataCountsReporter dataCountsReporter;
private final AutodetectProcess autodetectProcess;
private final AutoDetectResultProcessor autoDetectResultProcessor;
private final Consumer<Exception> handler;
private final Consumer<Exception> onFinishHandler;
private final ExecutorService autodetectWorkerExecutor;
private final NamedXContentRegistry xContentRegistry;
private volatile boolean processKilled;
AutodetectCommunicator(Job job, JobTask jobTask, AutodetectProcess process, DataCountsReporter dataCountsReporter,
AutoDetectResultProcessor autoDetectResultProcessor, Consumer<Exception> handler,
AutoDetectResultProcessor autoDetectResultProcessor, Consumer<Exception> onFinishHandler,
NamedXContentRegistry xContentRegistry, ExecutorService autodetectWorkerExecutor) {
this.job = job;
this.jobTask = jobTask;
this.autodetectProcess = process;
this.dataCountsReporter = dataCountsReporter;
this.autoDetectResultProcessor = autoDetectResultProcessor;
this.handler = handler;
this.onFinishHandler = onFinishHandler;
this.xContentRegistry = xContentRegistry;
this.autodetectWorkerExecutor = autodetectWorkerExecutor;
}
@ -124,14 +125,14 @@ public class AutodetectCommunicator implements Closeable {
* @param restart Whether the job should be restarted by persistent tasks
* @param reason The reason for closing the job
*/
public void close(boolean restart, String reason) throws IOException {
public void close(boolean restart, String reason) {
Future<?> future = autodetectWorkerExecutor.submit(() -> {
checkProcessIsAlive();
try {
autodetectProcess.close();
autoDetectResultProcessor.awaitCompletion();
} finally {
handler.accept(restart ? new ElasticsearchException(reason) : null);
onFinishHandler.accept(restart ? new ElasticsearchException(reason) : null);
}
LOGGER.info("[{}] job closed", job.getId());
return null;
@ -146,10 +147,25 @@ public class AutodetectCommunicator implements Closeable {
}
}
public void killProcess() throws IOException {
processKilled = true;
autoDetectResultProcessor.setProcessKilled();
autodetectProcess.kill();
public void killProcess(boolean awaitCompletion, boolean finish) throws IOException {
try {
processKilled = true;
autoDetectResultProcessor.setProcessKilled();
autodetectProcess.kill();
autodetectWorkerExecutor.shutdown();
if (awaitCompletion) {
try {
autoDetectResultProcessor.awaitCompletion();
} catch (TimeoutException e) {
LOGGER.warn(new ParameterizedMessage("[{}] Timed out waiting for killed job", job.getId()), e);
}
}
} finally {
if (finish) {
onFinishHandler.accept(null);
}
}
}
public void writeUpdateProcessMessage(ModelPlotConfig config, List<JobUpdate.DetectorUpdate> updates,

View File

@ -134,19 +134,31 @@ public class AutodetectProcessManager extends AbstractComponent {
}
}
public void killProcess(JobTask jobTask, boolean awaitCompletion) {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.remove(jobTask.getAllocationId());
if (communicator != null) {
killProcess(communicator, jobTask.getJobId(), awaitCompletion, true);
}
}
public void killAllProcessesOnThisNode() {
Iterator<AutodetectCommunicator> iter = autoDetectCommunicatorByJob.values().iterator();
while (iter.hasNext()) {
AutodetectCommunicator communicator = iter.next();
try {
communicator.killProcess();
iter.remove();
} catch (IOException e) {
logger.error("[{}] Failed to kill autodetect process for job", communicator.getJobTask().getJobId());
}
iter.remove();
killProcess(communicator, communicator.getJobTask().getJobId(), false, false);
}
}
private void killProcess(AutodetectCommunicator communicator, String jobId, boolean awaitCompletion, boolean finish) {
try {
communicator.killProcess(awaitCompletion, finish);
} catch (IOException e) {
logger.error("[{}] Failed to kill autodetect process for job", jobId);
}
}
/**
* Passes data to the native process.
* This is a blocking call that won't return until all the data has been

View File

@ -115,11 +115,10 @@ public class AutoDetectResultProcessor {
}
try {
context.bulkResultsPersister.executeRequest();
} catch (Exception e) {
if (processKilled) {
throw e;
if (processKilled == false) {
context.bulkResultsPersister.executeRequest();
}
} catch (Exception e) {
LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e);
}
@ -132,7 +131,7 @@ public class AutoDetectResultProcessor {
// that it would have been better to close jobs before shutting down,
// but we now fully expect jobs to move between nodes without doing
// all their graceful close activities.
LOGGER.warn("[{}] some results not processed due to node shutdown", jobId);
LOGGER.warn("[{}] some results not processed due to the process being killed", jobId);
} else {
// We should only get here if the iterator throws in which
// case parsing the autodetect output has failed.
@ -160,6 +159,10 @@ public class AutoDetectResultProcessor {
}
void processResult(Context context, AutodetectResult result) {
if (processKilled) {
return;
}
Bucket bucket = result.getBucket();
if (bucket != null) {
if (context.deleteInterimRequired) {

View File

@ -12,6 +12,7 @@ import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.AcknowledgedRestListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.job.config.Job;
@ -33,6 +34,7 @@ public class RestDeleteJobAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
DeleteJobAction.Request deleteJobRequest = new DeleteJobAction.Request(restRequest.param(Job.ID.getPreferredName()));
deleteJobRequest.setForce(restRequest.paramAsBoolean(CloseJobAction.Request.FORCE.getPreferredName(), deleteJobRequest.isForce()));
return channel -> client.execute(DeleteJobAction.INSTANCE, deleteJobRequest, new AcknowledgedRestListener<>(channel));
}
}

View File

@ -12,7 +12,9 @@ public class DeleteJobRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomAlphaOfLengthBetween(1, 20));
Request request = new Request(randomAlphaOfLengthBetween(1, 20));
request.setForce(randomBoolean());
return request;
}
@Override

View File

@ -31,12 +31,16 @@ import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -110,13 +114,26 @@ public class AutodetectCommunicatorTests extends ESTestCase {
Mockito.verify(process).close();
}
public void testKill() throws IOException {
public void testKill() throws IOException, TimeoutException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
communicator.killProcess();
ExecutorService executorService = mock(ExecutorService.class);
AtomicBoolean finishCalled = new AtomicBoolean(false);
AutodetectCommunicator communicator = createAutodetectCommunicator(executorService, process, resultProcessor,
e -> finishCalled.set(true));
boolean awaitCompletion = randomBoolean();
boolean finish = randomBoolean();
communicator.killProcess(awaitCompletion, finish);
Mockito.verify(resultProcessor).setProcessKilled();
Mockito.verify(process).kill();
Mockito.verify(executorService).shutdown();
if (awaitCompletion) {
Mockito.verify(resultProcessor).awaitCompletion();
} else {
Mockito.verify(resultProcessor, never()).awaitCompletion();
}
assertEquals(finish, finishCalled.get());
}
private Job createJobDetails() {
@ -140,6 +157,21 @@ public class AutodetectCommunicatorTests extends ESTestCase {
return process;
}
/**
 * Builds an {@code AutodetectCommunicator} for testing, wired with mocks:
 * the {@code DataCountsReporter} mock acknowledges {@code finishReporting}
 * immediately and the {@code JobTask} mock reports job id "foo". The given
 * executor service and finish handler are passed straight through to the
 * communicator under test.
 */
private AutodetectCommunicator createAutodetectCommunicator(ExecutorService executorService, AutodetectProcess autodetectProcess,
AutoDetectResultProcessor autoDetectResultProcessor,
Consumer<Exception> finishHandler) throws IOException {
DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class);
// Stub finishReporting(listener) so the supplied ActionListener is completed
// with success synchronously, avoiding any real reporting work in tests.
doAnswer(invocation -> {
((ActionListener<Boolean>) invocation.getArguments()[0]).onResponse(true);
return null;
}).when(dataCountsReporter).finishReporting(any());
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
return new AutodetectCommunicator(createJobDetails(), jobTask, autodetectProcess,
dataCountsReporter, autoDetectResultProcessor, finishHandler,
new NamedXContentRegistry(Collections.emptyList()), executorService);
}
private AutodetectCommunicator createAutodetectCommunicator(AutodetectProcess autodetectProcess,
AutoDetectResultProcessor autoDetectResultProcessor) throws IOException {
ExecutorService executorService = mock(ExecutorService.class);
@ -153,16 +185,8 @@ public class AutodetectCommunicatorTests extends ESTestCase {
((Runnable) invocation.getArguments()[0]).run();
return null;
}).when(executorService).execute(any(Runnable.class));
DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class);
doAnswer(invocation -> {
((ActionListener<Boolean>) invocation.getArguments()[0]).onResponse(true);
return null;
}).when(dataCountsReporter).finishReporting(any());
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
return new AutodetectCommunicator(createJobDetails(), jobTask, autodetectProcess,
dataCountsReporter, autoDetectResultProcessor, e -> {
}, new NamedXContentRegistry(Collections.emptyList()), executorService);
return createAutodetectCommunicator(executorService, autodetectProcess, autoDetectResultProcessor, e -> {});
}
}

View File

@ -383,6 +383,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
assertFalse(manager.jobHasActiveAutodetectProcess(jobTask));
when(communicator.getJobTask()).thenReturn(jobTask);
manager.openJob(jobTask, false, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),
@ -392,7 +393,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
manager.killAllProcessesOnThisNode();
verify(communicator).killProcess();
verify(communicator).killProcess(false, false);
}
public void testProcessData_GivenStateNotOpened() throws IOException {

View File

@ -363,6 +363,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
verify(persister, never()).commitResultWrites(JOB_ID);
verify(persister, never()).commitStateWrites(JOB_ID);
verify(renormalizer, never()).renormalize(any());
verify(renormalizer).shutdown();
verify(renormalizer, never()).waitUntilIdle();
verify(flushListener, times(1)).clear();
}

View File

@ -130,6 +130,7 @@ cluster:admin/xpack/ml/job/model_snapshots/revert
cluster:admin/xpack/ml/datafeeds/delete
cluster:admin/xpack/ml/job/data/post
cluster:admin/xpack/ml/job/close
cluster:internal/xpack/ml/job/kill/process
cluster:admin/xpack/ml/filters/put
cluster:admin/xpack/ml/job/put
cluster:monitor/xpack/ml/job/get

View File

@ -10,6 +10,13 @@
"required": true,
"description": "The ID of the job to delete"
}
},
"params": {
"force": {
"type": "boolean",
"required": false,
"description": "True if the job should be forcefully deleted"
}
}
},
"body": null

View File

@ -0,0 +1,72 @@
setup:
- do:
xpack.ml.put_job:
job_id: force-delete-job
body: >
{
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
},
"data_description" : {
}
}
---
"Test force delete a closed job":
- do:
xpack.ml.delete_job:
force: true
job_id: force-delete-job
- match: { acknowledged: true }
- do:
xpack.ml.get_jobs:
job_id: "_all"
- match: { count: 0 }
---
"Test force delete an open job":
- do:
xpack.ml.open_job:
job_id: force-delete-job
- do:
xpack.ml.delete_job:
force: true
job_id: force-delete-job
- match: { acknowledged: true }
- do:
xpack.ml.get_jobs:
job_id: "_all"
- match: { count: 0 }
---
"Test can't force delete a nonexistent job":
- do:
catch: /resource_not_found_exception/
xpack.ml.delete_job:
force: true
job_id: inexistent-job
---
"Test force delete job that is referred to by a datafeed":
- do:
xpack.ml.put_datafeed:
datafeed_id: force-delete-job-datafeed
body: >
{
"job_id":"force-delete-job",
"indexes":["index-foo"],
"types":["type-bar"]
}
- match: { datafeed_id: force-delete-job-datafeed }
- do:
catch: /Cannot delete job \[force-delete-job\] because datafeed \[force-delete-job-datafeed\] refers to it/
xpack.ml.delete_job:
job_id: force-delete-job