From f2654b58724ee5eaca0fdc96213691cd2299114d Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 28 Mar 2017 14:55:50 +0200 Subject: [PATCH] [ML] Changed job and datafeed lifecycle management * Removed OPENING and CLOSING job states. Instead when persistent task has been created and status hasn't been set then this means we haven't yet started, when the executor changes it to STARTED we have. The coordinating node will monitor cs for a period of time until that happens and then returns or times out. * Refactored job close api to go to node running job task and close job there. * Changed unexpected job and datafeed exception messages to not mention the state and instead mention that job/datafeed haven't yet started/stopped. Original commit: elastic/x-pack-elasticsearch@37e778b58593c5d5d52208f72891f65fb9668c46 --- .../xpack/ml/MachineLearning.java | 4 +- .../elasticsearch/xpack/ml/MlMetadata.java | 9 +- .../xpack/ml/action/CloseJobAction.java | 186 ++++++------------ .../xpack/ml/action/CloseJobService.java | 60 ------ .../ml/action/FinalizeJobExecutionAction.java | 179 +++++++++++++++++ .../xpack/ml/action/FlushJobAction.java | 3 +- .../xpack/ml/action/OpenJobAction.java | 37 ++-- .../xpack/ml/action/PostDataAction.java | 3 +- .../xpack/ml/action/StartDatafeedAction.java | 26 ++- .../xpack/ml/action/StopDatafeedAction.java | 19 +- .../ml/action/TransportJobTaskAction.java | 14 +- .../xpack/ml/action/UpdateProcessAction.java | 4 +- .../xpack/ml/datafeed/DatafeedJobRunner.java | 15 +- .../xpack/ml/job/config/Job.java | 8 +- .../xpack/ml/job/config/JobState.java | 2 +- .../xpack/ml/rest/job/RestCloseJobAction.java | 2 +- .../PersistentTasksCustomMetaData.java | 2 +- .../MachineLearningLicensingTests.java | 4 +- .../ml/action/CloseJobActionRequestTests.java | 66 +++++++ .../xpack/ml/action/CloseJobActionTests.java | 113 ----------- .../xpack/ml/action/OpenJobActionTests.java | 52 ++--- .../ml/action/StartDatafeedActionTests.java | 17 +- .../StopDatafeedActionRequestTests.java | 5 +- .../integration/BasicDistributedJobsIT.java | 2 +- .../xpack/ml/integration/DatafeedJobIT.java | 2 +- .../integration/MlRestTestStateCleaner.java | 9 +- .../xpack/ml/job/config/JobStateTests.java | 22 +-- .../xpack/ml/support/BaseMlIntegTestCase.java | 9 +- .../org/elasticsearch/transport/actions | 1 + .../rest-api-spec/test/ml/jobs_crud.yaml | 2 +- .../test/ml/start_stop_datafeed.yaml | 6 +- .../integration/MlRestTestStateCleaner.java | 9 +- 32 files changed, 420 insertions(+), 472 deletions(-) delete mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobService.java create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/action/FinalizeJobExecutionAction.java delete mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index daec1dac9a3..dab75ef8769 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -37,12 +37,12 @@ import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackSettings; import org.elasticsearch.xpack.ml.action.CloseJobAction; -import org.elasticsearch.xpack.ml.action.CloseJobService; import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.ml.action.DeleteExpiredDataAction; import org.elasticsearch.xpack.ml.action.DeleteFilterAction; import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.DeleteModelSnapshotAction; +import org.elasticsearch.xpack.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.ml.action.FlushJobAction; import org.elasticsearch.xpack.ml.action.GetBucketsAction; import org.elasticsearch.xpack.ml.action.GetCategoriesAction; @@ -324,7 +324,6 @@ public class MachineLearning implements ActionPlugin { persistentTasksExecutorRegistry, new PersistentTasksClusterService(Settings.EMPTY, persistentTasksExecutorRegistry, clusterService), auditor, - new CloseJobService(internalClient, threadPool, clusterService), invalidLicenseEnforcer ); } @@ -406,6 +405,7 @@ public class MachineLearning implements ActionPlugin { new ActionHandler<>(GetRecordsAction.INSTANCE, GetRecordsAction.TransportAction.class), new ActionHandler<>(PostDataAction.INSTANCE, PostDataAction.TransportAction.class), new ActionHandler<>(CloseJobAction.INSTANCE, CloseJobAction.TransportAction.class), + new ActionHandler<>(FinalizeJobExecutionAction.INSTANCE, FinalizeJobExecutionAction.TransportAction.class), new ActionHandler<>(FlushJobAction.INSTANCE, FlushJobAction.TransportAction.class), new ActionHandler<>(ValidateDetectorAction.INSTANCE, ValidateDetectorAction.TransportAction.class), new ActionHandler<>(ValidateJobConfigAction.INSTANCE, ValidateJobConfigAction.TransportAction.class), diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java index 057ca8c5a16..f13c59cf889 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java @@ -357,13 +357,12 @@ public class MlMetadata implements MetaData.Custom { } Optional datafeed = getDatafeedByJobId(jobId); if (datafeed.isPresent()) { - throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] while datafeed [" + throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because datafeed [" + datafeed.get().getId() + "] refers to it"); } - JobState jobState = getJobState(jobId, tasks); - if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) { - throw ExceptionsHelper.conflictStatusException("Unexpected job state [" + jobState + "], expected [" + - JobState.CLOSED + "]"); + PersistentTask jobTask = getJobTask(jobId, tasks); + if (jobTask != null) { + throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because the job hasn't been closed"); } Job.Builder jobBuilder = new Job.Builder(job); jobBuilder.setDeleted(true); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index b89a9676db6..62ccbd11366 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -9,33 +9,27 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.MasterNodeRequest; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MlMetadata; @@ -43,15 +37,14 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; -import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; +import org.elasticsearch.xpack.ml.utils.JobStateObserver; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; +import org.elasticsearch.xpack.security.InternalClient; import java.io.IOException; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -74,7 +67,7 @@ public class CloseJobAction extends Action implements ToXContent { + public static class Request extends TransportJobTaskAction.JobTaskRequest implements ToXContent { public static final ParseField TIMEOUT = new ParseField("timeout"); public static final ParseField FORCE = new ParseField("force"); @@ -83,7 +76,7 @@ public class CloseJobAction extends Action - request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); + request.setCloseTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); PARSER.declareBoolean(Request::setForce, FORCE); } @@ -95,29 +88,24 @@ public class CloseJobAction extends Action { + public static class TransportAction extends TransportJobTaskAction { + private final InternalClient client; private final ClusterService clusterService; - private final CloseJobService closeJobService; - private final Client client; @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - ClusterService clusterService, CloseJobService closeJobService, Client client) { - super(settings, CloseJobAction.NAME, transportService, clusterService, threadPool, actionFilters, - indexNameExpressionResolver, Request::new); + ClusterService clusterService, AutodetectProcessManager manager, Client client) { + super(settings, CloseJobAction.NAME, threadPool, clusterService, transportService, actionFilters, + indexNameExpressionResolver, Request::new, Response::new, ThreadPool.Names.MANAGEMENT, manager); + this.client = new InternalClient(settings, threadPool, client); this.clusterService = clusterService; - this.closeJobService = closeJobService; - this.client = client; } @Override - protected String executor() { - return ThreadPool.Names.SAME; - } - - @Override - protected Response newResponse() { - return new Response(); - } - - @Override - protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { + protected void doExecute(Task task, Request request, ActionListener listener) { if (request.isForce()) { - forceCloseJob(client, request.getJobId(), state, listener); + forceCloseJob(request.getJobId(), listener); } else { - closeJob(request, listener); + ActionListener finalListener = + ActionListener.wrap(r -> waitForJobClosed(request, r, listener), listener::onFailure); + super.doExecute(task, request, finalListener); } } @Override - protected ClusterBlockException checkBlock(Request request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener, + ClusterState state) { + validate(request.getJobId(), state); + task.closeJob("close job (api)"); + listener.onResponse(new Response(true)); } - private void closeJob(Request request, ActionListener listener) { - clusterService.submitStateUpdateTask("closing job [" + request.getJobId() + "]", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return moveJobToClosingState(request.getJobId(), currentState); - } - - @Override - public void onFailure(String source, Exception e) { - listener.onFailure(e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - - @Override - protected void doRun() throws Exception { - closeJobService.closeJob(request, listener); - } - }); - } - }); + @Override + protected Response readTaskResponse(StreamInput in) throws IOException { + return new Response(in); } - private void forceCloseJob(Client client, String jobId, ClusterState currentState, - ActionListener listener) { + private void forceCloseJob(String jobId, ActionListener listener) { + ClusterState currentState = clusterService.state(); PersistentTask task = MlMetadata.getJobTask(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); if (task != null) { @@ -323,58 +279,36 @@ public class CloseJobAction extends Action validateAndFindTask(String jobId, ClusterState state) { - MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); - if (mlMetadata.getJobs().containsKey(jobId) == false) { - throw ExceptionsHelper.missingJobException(jobId); + // Wait for job to be marked as closed in cluster state, which means the job persistent task has been removed + // This api returns when job has been closed, but that doesn't mean the persistent task has been removed from cluster state, + // so wait for that to happen here. + void waitForJobClosed(Request request, Response response, ActionListener listener) { + JobStateObserver observer = new JobStateObserver(threadPool, clusterService); + observer.waitForState(request.getJobId(), request.getCloseTimeout(), JobState.CLOSED, e -> { + if (e != null) { + listener.onFailure(e); + } else { + FinalizeJobExecutionAction.Request finalizeRequest = + new FinalizeJobExecutionAction.Request(request.getJobId()); + client.execute(FinalizeJobExecutionAction.INSTANCE, finalizeRequest, + ActionListener.wrap(r-> listener.onResponse(response), listener::onFailure)); + } + }); } + } + + static void validate(String jobId, ClusterState state) { + MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); Optional datafeed = mlMetadata.getDatafeedByJobId(jobId); if (datafeed.isPresent()) { DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeed.get().getId(), tasks); if (datafeedState != DatafeedState.STOPPED) { throw new ElasticsearchStatusException("cannot close job [{}], datafeed hasn't been stopped", - RestStatus.CONFLICT, jobId); + RestStatus.CONFLICT, jobId); } } - - PersistentTask jobTask = MlMetadata.getJobTask(jobId, tasks); - if (jobTask != null) { - JobState jobState = (JobState) jobTask.getStatus(); - if (jobState.isAnyOf(JobState.OPENED, JobState.FAILED) == false) { - throw new ElasticsearchStatusException("cannot close job [{}], expected job state [{}], but got [{}]", - RestStatus.CONFLICT, jobId, JobState.OPENED, jobState); - } - return jobTask; - } - throw new ElasticsearchStatusException("cannot close job [{}], expected job state [{}], but got [{}]", - RestStatus.CONFLICT, jobId, JobState.OPENED, JobState.CLOSED); - } - - static ClusterState moveJobToClosingState(String jobId, ClusterState currentState) { - PersistentTask task = validateAndFindTask(jobId, currentState); - PersistentTasksCustomMetaData currentTasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - Map> updatedTasks = new HashMap<>(currentTasks.taskMap()); - PersistentTask taskToUpdate = currentTasks.getTask(task.getId()); - taskToUpdate = new PersistentTask<>(taskToUpdate, JobState.CLOSING); - updatedTasks.put(taskToUpdate.getId(), taskToUpdate); - PersistentTasksCustomMetaData newTasks = new PersistentTasksCustomMetaData(currentTasks.getCurrentId(), updatedTasks); - - MlMetadata mlMetadata = currentState.metaData().custom(MlMetadata.TYPE); - Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); - jobBuilder.setFinishedTime(new Date()); - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); - mlMetadataBuilder.putJob(jobBuilder.build(), true); - - ClusterState.Builder builder = ClusterState.builder(currentState); - return builder - .metaData(new MetaData.Builder(currentState.metaData()) - .putCustom(MlMetadata.TYPE, mlMetadataBuilder.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, newTasks)) - .build(); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobService.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobService.java deleted file mode 100644 index 1e2134cbae1..00000000000 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobService.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.action; - -import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.tasks.TaskInfo; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.ml.job.config.JobState; -import org.elasticsearch.xpack.ml.utils.JobStateObserver; - -/** - * Service that interacts with a client to close jobs remotely. - */ -// Ideally this would sit in CloseJobAction.TransportAction, but we can't inject a client there as -// it would lead to cyclic dependency issue, so we isolate it here. -public class CloseJobService { - - private final Client client; - private final JobStateObserver observer; - - public CloseJobService(Client client, ThreadPool threadPool, ClusterService clusterService) { - this.client = client; - this.observer = new JobStateObserver(threadPool, clusterService); - } - - void closeJob(CloseJobAction.Request request, ActionListener listener) { - ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setDetailed(true); - listTasksRequest.setActions(OpenJobAction.NAME + "[c]"); - client.admin().cluster().listTasks(listTasksRequest, ActionListener.wrap(listTasksResponse -> { - String expectedDescription = "job-" + request.getJobId(); - for (TaskInfo taskInfo : listTasksResponse.getTasks()) { - if (expectedDescription.equals(taskInfo.getDescription())) { - CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); - cancelTasksRequest.setTaskId(taskInfo.getTaskId()); - client.admin().cluster().cancelTasks(cancelTasksRequest, ActionListener.wrap(cancelTaskResponse -> { - observer.waitForState(request.getJobId(), request.getTimeout(), JobState.CLOSED, e -> { - if (e == null) { - listener.onResponse(new CloseJobAction.Response(true)); - } else { - listener.onFailure(e); - } - }); - }, listener::onFailure)); - return; - } - } - listener.onFailure(new ResourceNotFoundException("task not found for job [" + request.getJobId() + "]")); - }, listener::onFailure)); - } - -} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FinalizeJobExecutionAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FinalizeJobExecutionAction.java new file mode 100644 index 00000000000..0218bc5e466 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FinalizeJobExecutionAction.java @@ -0,0 +1,179 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.MlMetadata; +import org.elasticsearch.xpack.ml.job.config.Job; + +import java.io.IOException; +import java.util.Date; + +public class FinalizeJobExecutionAction extends Action { + + public static final FinalizeJobExecutionAction INSTANCE = new FinalizeJobExecutionAction(); + public static final String NAME = "cluster:internal/ml/job/finalize_job_execution"; + + private FinalizeJobExecutionAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, INSTANCE); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends MasterNodeRequest { + + private String jobId; + + public Request(String jobId) { + this.jobId = jobId; + } + + Request() { + } + + public String getJobId() { + return jobId; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + jobId = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(jobId); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + } + + public static class RequestBuilder + extends MasterNodeOperationRequestBuilder { + + public RequestBuilder(ElasticsearchClient client, FinalizeJobExecutionAction action) { + super(client, action, new Request()); + } + } + + public static class Response extends AcknowledgedResponse { + + Response(boolean acknowledged) { + super(acknowledged); + } + + Response() { + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + readAcknowledged(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + writeAcknowledged(out); + } + } + + public static class TransportAction extends TransportMasterNodeAction { + + @Inject + public TransportAction(Settings settings, TransportService transportService, + ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, Request::new); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected Response newResponse() { + return new Response(); + } + + @Override + protected void masterOperation(Request request, ClusterState state, + ActionListener listener) throws Exception { + String jobId = request.getJobId(); + String source = "finalize_job_execution [" + jobId + "]"; + clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + MlMetadata mlMetadata = currentState.metaData().custom(MlMetadata.TYPE); + Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); + jobBuilder.setFinishedTime(new Date()); + MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); + mlMetadataBuilder.putJob(jobBuilder.build(), true); + + ClusterState.Builder builder = ClusterState.builder(currentState); + return builder.metaData(new MetaData.Builder(currentState.metaData()) + .putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())) + .build(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, + ClusterState newState) { + listener.onResponse(new Response(true)); + } + }); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } + +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java index 19776a0d5a8..7410e486047 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; @@ -255,7 +256,7 @@ public class FlushJobAction extends Action listener) { + ActionListener listener, ClusterState state) { InterimResultsParams.Builder paramsBuilder = InterimResultsParams.builder(); paramsBuilder.calcInterim(request.getCalcInterim()); if (request.getAdvanceTime() != null) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 1697e7a0861..f953fbf633d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -18,7 +18,6 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; @@ -268,6 +267,10 @@ public class OpenJobAction extends Action listener) { JobTask jobTask = (JobTask) task; jobTask.autodetectProcessManager = autodetectProcessManager; - autodetectProcessManager.setJobState(task.getPersistentTaskId(), JobState.OPENING, e1 -> { - if (e1 != null) { - listener.onFailure(e1); - return; + autodetectProcessManager.openJob(request.getJobId(), task.getPersistentTaskId(), request.isIgnoreDowntime(), e2 -> { + if (e2 == null) { + listener.onResponse(new TransportResponse.Empty()); + } else { + listener.onFailure(e2); } - - autodetectProcessManager.openJob(request.getJobId(), task.getPersistentTaskId(), request.isIgnoreDowntime(), e2 -> { - if (e2 == null) { - listener.onResponse(new TransportResponse.Empty()); - } else { - listener.onFailure(e2); - } - }); }); } @@ -431,19 +427,17 @@ public class OpenJobAction extends Action task = MlMetadata.getJobTask(jobId, tasks); + if (task != null) { + throw ExceptionsHelper.conflictStatusException("Cannot open job [" + jobId + "] because it has already been opened"); } } @@ -481,7 +475,6 @@ public class OpenJobAction extends Action listener) { + protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener, ClusterState state) { TimeRange timeRange = TimeRange.builder().startTime(request.getResetStart()).endTime(request.getResetEnd()).build(); DataLoadParams params = new DataLoadParams(timeRange, Optional.ofNullable(request.getDataDescription())); try { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index e8ac9e6da9b..2fd11d56841 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.action; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestBuilder; @@ -38,7 +37,6 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; @@ -100,7 +98,7 @@ public class StartDatafeedAction static { PARSER.declareString((request, datafeedId) -> request.datafeedId = datafeedId, DatafeedConfig.ID); PARSER.declareString((request, startTime) -> request.startTime = parseDateOrThrow( - startTime, START_TIME, () -> System.currentTimeMillis()), START_TIME); + startTime, START_TIME, System::currentTimeMillis), START_TIME); PARSER.declareString(Request::setEndTime, END_TIME); PARSER.declareString((request, val) -> request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); @@ -140,7 +138,7 @@ public class StartDatafeedAction } public Request(String datafeedId, String startTime) { - this(datafeedId, parseDateOrThrow(startTime, START_TIME, () -> System.currentTimeMillis())); + this(datafeedId, parseDateOrThrow(startTime, START_TIME, System::currentTimeMillis)); } public Request(StreamInput in) throws IOException { @@ -163,7 +161,7 @@ public class StartDatafeedAction } public void setEndTime(String endTime) { - setEndTime(parseDateOrThrow(endTime, END_TIME, () -> System.currentTimeMillis())); + setEndTime(parseDateOrThrow(endTime, END_TIME, System::currentTimeMillis)); } public void setEndTime(Long endTime) { @@ -413,8 +411,7 @@ public class StartDatafeedAction if (licenseState.isMachineLearningAllowed()) { MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - DiscoveryNodes nodes = clusterState.getNodes(); - StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata, tasks, nodes); + StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata, tasks); } else { throw LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING); } @@ -462,7 +459,7 @@ public class StartDatafeedAction } - static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks, DiscoveryNodes nodes) { + static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks) { DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); if (datafeed == null) { throw ExceptionsHelper.missingDatafeedException(datafeedId); @@ -474,18 +471,17 @@ public class StartDatafeedAction DatafeedJobValidator.validate(datafeed, job); JobState jobState = MlMetadata.getJobState(datafeed.getJobId(), tasks); if (jobState != JobState.OPENED) { - throw new ElasticsearchStatusException("cannot start datafeed, expected job state [{}], but got [{}]", - RestStatus.CONFLICT, JobState.OPENED, jobState); + throw ExceptionsHelper.conflictStatusException("cannot start datafeed [" + datafeedId + "] because job [" + job.getId() + + "] hasn't been opened"); } - DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks); - if (datafeedState == DatafeedState.STARTED) { - throw new ElasticsearchStatusException("datafeed [{}] already started, expected datafeed state [{}], but got [{}]", - RestStatus.CONFLICT, datafeedId, DatafeedState.STOPPED, DatafeedState.STARTED); + PersistentTask datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks); + if (datafeedTask != null) { + throw ExceptionsHelper.conflictStatusException("cannot start datafeed [" + datafeedId + "] because it has already been started"); } } - public static Assignment selectNode(Logger logger, String datafeedId, ClusterState clusterState) { + static Assignment selectNode(Logger logger, String datafeedId, ClusterState clusterState) { MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java index 01ae43cb892..231b77c7835 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.action; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; @@ -34,7 +33,6 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -48,6 +46,7 @@ import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; +import org.elasticsearch.xpack.security.InternalClient; import java.io.IOException; import java.util.List; @@ -220,7 +219,7 @@ public class StopDatafeedAction public static class TransportAction extends TransportTasksAction { - private final Client client; + private final InternalClient client; @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, @@ -228,7 +227,7 @@ public class StopDatafeedAction ClusterService clusterService, Client client) { super(settings, StopDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, Request::new, Response::new, MachineLearning.THREAD_POOL_NAME); - this.client = client; + this.client = new InternalClient(settings, threadPool, client); } @Override @@ -240,7 +239,7 @@ public class StopDatafeedAction if (request.force) { PersistentTask datafeedTask = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasks); if (datafeedTask != null) { - forceStopTask(client, datafeedTask.getId(), listener); + forceStopTask(datafeedTask.getId(), listener); } else { String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " + "datafeed's task could not be found."; @@ -271,7 +270,7 @@ public class StopDatafeedAction }); } - private void forceStopTask(Client client, long taskId, ActionListener listener) { + private void forceStopTask(long taskId, ActionListener listener) { RemovePersistentTaskAction.Request request = new RemovePersistentTaskAction.Request(taskId); client.execute(RemovePersistentTaskAction.INSTANCE, request, @@ -293,7 +292,7 @@ public class StopDatafeedAction @Override protected void taskOperation(Request request, StartDatafeedAction.DatafeedTask task, ActionListener listener) { - task.stop("stop_datafeed_api", request.getTimeout()); + task.stop("stop_datafeed (api)", request.getTimeout()); listener.onResponse(new Response(true)); } @@ -309,9 +308,9 @@ public class StopDatafeedAction throw new ResourceNotFoundException(Messages.getMessage(Messages.DATAFEED_NOT_FOUND, datafeedId)); } PersistentTask task = MlMetadata.getDatafeedTask(datafeedId, tasks); - if (task == null || task.getStatus() != DatafeedState.STARTED) { - throw new ElasticsearchStatusException("datafeed already stopped, expected datafeed state [{}], but got [{}]", - RestStatus.CONFLICT, DatafeedState.STARTED, DatafeedState.STOPPED); + if (task == null) { + throw ExceptionsHelper.conflictStatusException("Cannot stop datafeed [" + datafeedId + + "] because it has already been stopped"); } return task.getExecutorNode(); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java index e67bcaf37d4..23b71eefbcc 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.action; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; @@ -20,7 +19,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -65,8 +63,8 @@ public abstract class TransportJobTaskAction jobTask = MlMetadata.getJobTask(jobId, tasks); if (jobTask == null || jobTask.isAssigned() == false) { - listener.onFailure( new ElasticsearchStatusException("job [" + jobId + "] state is [" + JobState.CLOSED + - "], but must be [" + JobState.OPENED + "] to perform requested action", RestStatus.CONFLICT)); + String message = "Cannot perform requested action because job [" + jobId + "] hasn't been opened"; + listener.onFailure(ExceptionsHelper.conflictStatusException(message)); } else { request.setNodes(jobTask.getExecutorNode()); super.doExecute(task, request, listener); @@ -79,15 +77,15 @@ public abstract class TransportJobTaskAction listener); + protected abstract void innerTaskOperation(Request request, OperationTask task, ActionListener listener, ClusterState state); @Override protected Response newResponse(Request request, List tasks, List taskOperationFailures, diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java index ffa85a6e501..c06be6680a1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -64,6 +65,7 @@ public class UpdateProcessAction extends private boolean isUpdated; private Response() { + super(null, null); this.isUpdated = true; } @@ -196,7 +198,7 @@ public class UpdateProcessAction extends } @Override - protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener) { + protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener, ClusterState state) { threadPool.executor(MachineLearning.THREAD_POOL_NAME).execute(() -> { try { processManager.writeUpdateProcessMessage(request.getJobId(), request.getDetectorUpdates(), diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java index 29f4916eca4..0ceb0ac6ec1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.datafeed; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; @@ -17,7 +16,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.mapper.DateFieldMapper; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MlMetadata; @@ -34,10 +32,9 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.Result; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver; import org.elasticsearch.xpack.persistent.PersistentTasksService; import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener; -import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; -import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver; import java.time.Duration; import java.util.Collections; @@ -80,16 +77,8 @@ public class DatafeedJobRunner extends AbstractComponent { public void run(StartDatafeedAction.DatafeedTask task, Consumer handler) { String datafeedId = task.getDatafeedId(); ClusterState state = clusterService.state(); - // CS on master node can be ahead on the node where job and datafeed tasks run, - // so check again and fail if in case of unexpected cs. Persist tasks will retry later then. - Assignment assignment = StartDatafeedAction.selectNode(logger, datafeedId, state); - if (assignment.getExecutorNode() == null) { - handler.accept(new ElasticsearchStatusException("cannot start datafeed [{}] yet, local cs [{}], allocation explanation [{}]", - RestStatus.CONFLICT, datafeedId, state.getVersion(), assignment.getExplanation())); - return; - } - logger.info("Attempt to start datafeed based on cluster state version [{}]", state.getVersion()); MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); + DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); Job job = mlMetadata.getJobs().get(datafeed.getJobId()); gatherInformation(job.getId(), (buckets, dataCounts) -> { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java index 29338d94784..c2572621a73 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java @@ -35,11 +35,9 @@ import java.util.concurrent.TimeUnit; /** * This class represents a configured and created Job. The creation time is set - * to the time the object was constructed, state is set to - * {@link JobState#OPENING} and the finished time and last data time fields are - * {@code null} until the job has seen some data or it is finished respectively. - * If the job was created to read data from a list of files FileUrls will be a - * non-empty list else the expects data to be streamed to it. + * to the time the object was constructed and the finished time and last + * data time fields are {@code null} until the job has seen some data or it is + * finished respectively. */ public class Job extends AbstractDiffable implements Writeable, ToXContent { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobState.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobState.java index b2c179fedc7..716fb89fd74 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobState.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobState.java @@ -26,7 +26,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constru */ public enum JobState implements Task.Status { - CLOSING, CLOSED, OPENING, OPENED, FAILED; + CLOSED, OPENED, FAILED; public static final String NAME = "JobState"; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestCloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestCloseJobAction.java index 9d09be48a6e..ee901c0bd0f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestCloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestCloseJobAction.java @@ -31,7 +31,7 @@ public class RestCloseJobAction extends BaseRestHandler { protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { Request request = new Request(restRequest.param(Job.ID.getPreferredName())); if (restRequest.hasParam(Request.TIMEOUT.getPreferredName())) { - request.setTimeout(TimeValue.parseTimeValue( + request.setCloseTimeout(TimeValue.parseTimeValue( restRequest.param(Request.TIMEOUT.getPreferredName()), Request.TIMEOUT.getPreferredName())); } if (restRequest.hasParam(Request.FORCE.getPreferredName())) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java index 1da7a6f19b7..a3092b2c72a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java @@ -353,7 +353,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable { @@ -18,6 +35,9 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa if (randomBoolean()) { request.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); } + if (randomBoolean()) { + request.setForce(randomBoolean()); + } return request; } @@ -30,4 +50,50 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa protected Request parseInstance(XContentParser parser) { return Request.parseRequest(null, parser); } + + public void testValidate() { + MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); + mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id").build(), false); + mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id", + Collections.singletonList("*"))); + Map> tasks = new HashMap<>(); + PersistentTask jobTask = createJobTask(1L, "job_id", null, JobState.OPENED); + tasks.put(1L, jobTask); + tasks.put(2L, createTask(2L, "datafeed_id", 0L, null, DatafeedState.STARTED)); + ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, + new PersistentTasksCustomMetaData(1L, tasks))).build(); + + ElasticsearchStatusException e = + expectThrows(ElasticsearchStatusException.class, + () -> CloseJobAction.validate("job_id", cs1)); + assertEquals(RestStatus.CONFLICT, e.status()); + assertEquals("cannot close job [job_id], datafeed hasn't been stopped", e.getMessage()); + + tasks = new HashMap<>(); + tasks.put(1L, jobTask); + if (randomBoolean()) { + tasks.put(2L, createTask(2L, "datafeed_id", 0L, null, DatafeedState.STOPPED)); + } + ClusterState cs2 = ClusterState.builder(new ClusterName("_name")) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, + new PersistentTasksCustomMetaData(1L, tasks))).build(); + CloseJobAction.validate("job_id", cs2); + } + + public static PersistentTask createTask(long id, + String datafeedId, + long startTime, + String nodeId, + DatafeedState state) { + PersistentTask task = + new PersistentTask<>(id, StartDatafeedAction.NAME, + new StartDatafeedAction.Request(datafeedId, startTime), + new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment")); + task = new PersistentTask<>(task, state); + return task; + } + } \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java deleted file mode 100644 index 1fd9a2030b3..00000000000 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.action; - -import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.ml.MlMetadata; -import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; -import org.elasticsearch.xpack.ml.datafeed.DatafeedState; -import org.elasticsearch.xpack.ml.job.config.JobState; -import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; -import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; - -public class CloseJobActionTests extends ESTestCase { - - public void testMoveJobToClosingState() { - MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); - mlBuilder.putJob(buildJobBuilder("job_id").build(), false); - PersistentTask task = - createJobTask(1L, "job_id", null, randomFrom(JobState.OPENED, JobState.FAILED)); - ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, - new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)))); - ClusterState result = CloseJobAction.moveJobToClosingState("job_id", csBuilder.build()); - - PersistentTasksCustomMetaData actualTasks = result.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - assertEquals(JobState.CLOSING, actualTasks.getTask(1L).getStatus()); - - MlMetadata actualMetadata = result.metaData().custom(MlMetadata.TYPE); - assertNotNull(actualMetadata.getJobs().get("job_id").getFinishedTime()); - } - - public void testMoveJobToClosingState_jobMissing() { - MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); - ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(1L, Collections.emptyMap()))); - expectThrows(ResourceNotFoundException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder.build())); - } - - public void testMoveJobToClosingState_unexpectedJobState() { - MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); - mlBuilder.putJob(buildJobBuilder("job_id").build(), false); - PersistentTask task = createJobTask(1L, "job_id", null, JobState.OPENING); - ClusterState.Builder csBuilder1 = ClusterState.builder(new ClusterName("_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, - new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)))); - ElasticsearchStatusException result = - expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder1.build())); - assertEquals("cannot close job [job_id], expected job state [opened], but got [opening]", result.getMessage()); - - ClusterState.Builder csBuilder2 = ClusterState.builder(new ClusterName("_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(1L, Collections.emptyMap()))); - result = expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder2.build())); - assertEquals("cannot close job [job_id], expected job state [opened], but got [closed]", result.getMessage()); - } - - public void testCloseJob_datafeedNotStopped() { - MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); - mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id").build(), false); - mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id", Collections.singletonList("*"))); - Map> tasks = new HashMap<>(); - PersistentTask jobTask = createJobTask(1L, "job_id", null, JobState.OPENED); - tasks.put(1L, jobTask); - tasks.put(2L, createDatafeedTask(2L, "datafeed_id", 0L, null, DatafeedState.STARTED)); - ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(1L, tasks))).build(); - - ElasticsearchStatusException e = - expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.validateAndFindTask("job_id", cs1)); - assertEquals(RestStatus.CONFLICT, e.status()); - assertEquals("cannot close job [job_id], datafeed hasn't been stopped", e.getMessage()); - - tasks = new HashMap<>(); - tasks.put(1L, jobTask); - if (randomBoolean()) { - tasks.put(2L, createDatafeedTask(2L, "datafeed_id", 0L, null, DatafeedState.STOPPED)); - } - ClusterState cs2 = ClusterState.builder(new ClusterName("_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(1L, tasks))).build(); - assertEquals(jobTask, CloseJobAction.validateAndFindTask("job_id", cs2)); - } - - public static PersistentTask createDatafeedTask(long id, String datafeedId, long startTime, - String nodeId, DatafeedState datafeedState) { - PersistentTask task = - new PersistentTask<>(id, StartDatafeedAction.NAME, new StartDatafeedAction.Request(datafeedId, startTime), - new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment")); - task = new PersistentTask<>(task, datafeedState); - return task; - } - -} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java index b760ca693ae..de98cd72d1f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java @@ -51,24 +51,20 @@ public class OpenJobActionTests extends ESTestCase { public void testValidate() { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(buildJobBuilder("job_id").build(), false); - DiscoveryNodes nodes = DiscoveryNodes.builder() - .add(new DiscoveryNode("_node_name", "_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), - Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) - .build(); PersistentTask task = - createJobTask(1L, "job_id", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED)); + createJobTask(1L, "job_id2", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)); - OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes); - OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksCustomMetaData(1L, Collections.emptyMap()), nodes); - OpenJobAction.validate("job_id", mlBuilder.build(), null, nodes); + OpenJobAction.validate("job_id", mlBuilder.build(), tasks); + OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksCustomMetaData(1L, Collections.emptyMap())); + OpenJobAction.validate("job_id", mlBuilder.build(), null); } public void testValidate_jobMissing() { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(buildJobBuilder("job_id1").build(), false); - expectThrows(ResourceNotFoundException.class, () -> OpenJobAction.validate("job_id2", mlBuilder.build(), null, null)); + expectThrows(ResourceNotFoundException.class, () -> OpenJobAction.validate("job_id2", mlBuilder.build(), null)); } public void testValidate_jobMarkedAsDeleted() { @@ -77,33 +73,20 @@ public class OpenJobActionTests extends ESTestCase { jobBuilder.setDeleted(true); mlBuilder.putJob(jobBuilder.build(), false); Exception e = expectThrows(ElasticsearchStatusException.class, - () -> OpenJobAction.validate("job_id", mlBuilder.build(), null, null)); + () -> OpenJobAction.validate("job_id", mlBuilder.build(), null)); assertEquals("Cannot open job [job_id] because it has been marked as deleted", e.getMessage()); } public void testValidate_unexpectedState() { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(buildJobBuilder("job_id").build(), false); - DiscoveryNodes nodes = DiscoveryNodes.builder() - .add(new DiscoveryNode("_node_name", "_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), - Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) - .build(); - JobState jobState = randomFrom(JobState.OPENING, JobState.OPENED, JobState.CLOSING); - PersistentTask task = createJobTask(1L, "job_id", "_node_id", jobState); + PersistentTask task = createJobTask(1L, "job_id", "_node_id", JobState.OPENED); PersistentTasksCustomMetaData tasks1 = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)); Exception e = expectThrows(ElasticsearchStatusException.class, - () -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks1, nodes)); - assertEquals("[job_id] expected state [closed] or [failed], but got [" + jobState +"]", e.getMessage()); - - jobState = randomFrom(JobState.OPENING, JobState.CLOSING); - task = createJobTask(1L, "job_id", "_other_node_id", jobState); - PersistentTasksCustomMetaData tasks2 = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)); - - e = expectThrows(ElasticsearchStatusException.class, - () -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks2, nodes)); - assertEquals("[job_id] expected state [closed] or [failed], but got [" + jobState +"]", e.getMessage()); + () -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks1)); + assertEquals("Cannot open job [job_id] because it has already been opened", e.getMessage()); } public void testSelectLeastLoadedMlNode() { @@ -208,11 +191,11 @@ public class OpenJobActionTests extends ESTestCase { .build(); Map> taskMap = new HashMap<>(); - taskMap.put(0L, createJobTask(0L, "job_id1", "_node_id1", JobState.OPENING)); - taskMap.put(1L, createJobTask(1L, "job_id2", "_node_id1", JobState.OPENING)); - taskMap.put(2L, createJobTask(2L, "job_id3", "_node_id2", JobState.OPENING)); - taskMap.put(3L, createJobTask(3L, "job_id4", "_node_id2", JobState.OPENING)); - taskMap.put(4L, createJobTask(4L, "job_id5", "_node_id3", JobState.OPENING)); + taskMap.put(0L, createJobTask(0L, "job_id1", "_node_id1", null)); + taskMap.put(1L, createJobTask(1L, "job_id2", "_node_id1", null)); + taskMap.put(2L, createJobTask(2L, "job_id3", "_node_id2", null)); + taskMap.put(3L, createJobTask(3L, "job_id4", "_node_id2", null)); + taskMap.put(4L, createJobTask(4L, "job_id5", "_node_id3", null)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(5L, taskMap); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); @@ -228,7 +211,7 @@ public class OpenJobActionTests extends ESTestCase { Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, logger); assertEquals("_node_id3", result.getExecutorNode()); - PersistentTask lastTask = createJobTask(5L, "job_id6", "_node_id3", JobState.OPENING); + PersistentTask lastTask = createJobTask(5L, "job_id6", "_node_id3", null); taskMap.put(5L, lastTask); tasks = new PersistentTasksCustomMetaData(6L, taskMap); @@ -275,7 +258,6 @@ public class OpenJobActionTests extends ESTestCase { metaData = new MetaData.Builder(cs.metaData()); routingTable = new RoutingTable.Builder(cs.routingTable()); - MlMetadata mlMetadata = cs.metaData().custom(MlMetadata.TYPE); String indexToRemove = randomFrom(OpenJobAction.indicesOfInterest(cs, "job_id")); if (randomBoolean()) { routingTable.remove(indexToRemove); @@ -299,7 +281,9 @@ public class OpenJobActionTests extends ESTestCase { public static PersistentTask createJobTask(long id, String jobId, String nodeId, JobState jobState) { PersistentTask task = new PersistentTask<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), new Assignment(nodeId, "test assignment")); - task = new PersistentTask<>(task, jobState); + if (jobState != null) { + task = new PersistentTask<>(task, jobState); + } return task; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java index c3f4f4020f9..3334c9a2ef4 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java @@ -46,7 +46,7 @@ public class StartDatafeedActionTests extends ESTestCase { mlMetadata.putJob(job, false); mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*"))); - JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED, JobState.CLOSING, JobState.OPENING); + JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED); PersistentTask task = createJobTask(0L, job.getId(), "node_id", jobState); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); @@ -116,7 +116,7 @@ public class StartDatafeedActionTests extends ESTestCase { .putJob(job1, false) .build(); Exception e = expectThrows(ResourceNotFoundException.class, - () -> StartDatafeedAction.validate("some-datafeed", mlMetadata1, null, null)); + () -> StartDatafeedAction.validate("some-datafeed", mlMetadata1, null)); assertThat(e.getMessage(), equalTo("No datafeed with id [some-datafeed] exists")); } @@ -133,8 +133,8 @@ public class StartDatafeedActionTests extends ESTestCase { .putDatafeed(datafeedConfig1) .build(); Exception e = expectThrows(ElasticsearchStatusException.class, - () -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks, null)); - assertThat(e.getMessage(), equalTo("cannot start datafeed, expected job state [opened], but got [closed]")); + () -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks)); + assertThat(e.getMessage(), equalTo("cannot start datafeed [foo-datafeed] because job [job_id] hasn't been opened")); } public void testValidate_dataFeedAlreadyStarted() { @@ -144,10 +144,6 @@ public class StartDatafeedActionTests extends ESTestCase { .putJob(job1, false) .putDatafeed(datafeedConfig) .build(); - DiscoveryNodes nodes = DiscoveryNodes.builder() - .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), - Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) - .build(); PersistentTask jobTask = createJobTask(0L, "job_id", "node_id", JobState.OPENED); PersistentTask datafeedTask = @@ -160,9 +156,8 @@ public class StartDatafeedActionTests extends ESTestCase { PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(2L, taskMap); Exception e = expectThrows(ElasticsearchStatusException.class, - () -> StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes)); - assertThat(e.getMessage(), equalTo("datafeed [datafeed_id] already started, expected datafeed state [stopped], " + - "but got [started]")); + () -> StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks)); + assertThat(e.getMessage(), equalTo("cannot start datafeed [datafeed_id] because it has already been started")); } public static StartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action, diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java index 2a1e3a79454..62de82a9285 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java @@ -69,8 +69,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe PersistentTasksCustomMetaData tasks; if (randomBoolean()) { PersistentTask task = new PersistentTask(1L, StartDatafeedAction.NAME, - new StartDatafeedAction.Request("foo", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); - task = new PersistentTask<>(task, DatafeedState.STOPPED); + new StartDatafeedAction.Request("foo2", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)); } else { tasks = randomBoolean() ? null : new PersistentTasksCustomMetaData(0L, Collections.emptyMap()); @@ -84,7 +83,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe .build(); Exception e = expectThrows(ElasticsearchStatusException.class, () -> StopDatafeedAction.validateAndReturnNodeId("foo", mlMetadata1, tasks)); - assertThat(e.getMessage(), equalTo("datafeed already stopped, expected datafeed state [started], but got [stopped]")); + assertThat(e.getMessage(), equalTo("Cannot stop datafeed [foo] because it has already been stopped")); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index 98ea012190b..4c599462e23 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -226,7 +226,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { for (DiscoveryNode node : event.state().nodes()) { Collection> foundTasks = tasks.findTasks(OpenJobAction.NAME, task -> { return node.getId().equals(task.getExecutorNode()) && - (task.getStatus() == null || task.getStatus() == JobState.OPENING || task.isCurrentStatus() == false); + (task.getStatus() == null || task.isCurrentStatus() == false); }); int count = foundTasks.size(); if (count > maxConcurrentJobAllocations) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java index 4b4d8ed5496..a848e2b455c 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java @@ -373,7 +373,7 @@ public class DatafeedJobIT extends ESRestTestCase { () -> client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId)); response = e.getResponse(); assertThat(response.getStatusLine().getStatusCode(), equalTo(409)); - assertThat(responseEntityToString(response), containsString("Cannot delete job [" + jobId + "] while datafeed [" + datafeedId + assertThat(responseEntityToString(response), containsString("Cannot delete job [" + jobId + "] because datafeed [" + datafeedId + "] refers to it")); response = client().performRequest("post", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stop"); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java index 32b03c203c8..52edf7daa79 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java @@ -52,7 +52,7 @@ public class MlRestTestStateCleaner { logger.error("Got status code " + statusCode + " when stopping datafeed " + datafeedId); } } catch (Exception e) { - if (e.getMessage().contains("datafeed already stopped, expected datafeed state [started], but got [stopped]")) { + if (e.getMessage().contains("Cannot stop datafeed [" + datafeedId + "] because it has already been stopped")) { logger.debug("failed to stop datafeed [" + datafeedId + "]", e); } else { logger.warn("failed to stop datafeed [" + datafeedId + "]", e); @@ -84,15 +84,14 @@ public class MlRestTestStateCleaner { logger.error("Got status code " + statusCode + " when closing job " + jobId); } } catch (Exception e1) { - if (e1.getMessage().contains("expected job state [opened], but got [closed]") - || e1.getMessage().contains("expected job state [opened], but got [closing]")) { + if (e1.getMessage().contains("because job [" + jobId + "] hasn't been opened")) { logger.debug("job [" + jobId + "] has already been closed", e1); } else { - logger.warn("failed to close job [" + jobId + "]. Forcing closed.", e1); + logger.warn("failed to close job [" + jobId + "]. Forcing closed", e1); try { adminClient.performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close?force=true"); } catch (Exception e2) { - throw new RuntimeException("Force-closing job [" + jobId + "] failed.", e2); + logger.warn("Force-closing job [" + jobId + "] failed", e2); } throw new RuntimeException("Had to resort to force-closing job, something went wrong?", e1); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobStateTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobStateTests.java index e6d9b4b4975..55adff4e298 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobStateTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobStateTests.java @@ -11,42 +11,32 @@ public class JobStateTests extends ESTestCase { public void testFromString() { assertEquals(JobState.fromString("closed"), JobState.CLOSED); - assertEquals(JobState.fromString("closing"), JobState.CLOSING); assertEquals(JobState.fromString("failed"), JobState.FAILED); - assertEquals(JobState.fromString("opening"), JobState.OPENING); assertEquals(JobState.fromString("opened"), JobState.OPENED); assertEquals(JobState.fromString("CLOSED"), JobState.CLOSED); - assertEquals(JobState.fromString("CLOSING"), JobState.CLOSING); assertEquals(JobState.fromString("FAILED"), JobState.FAILED); - assertEquals(JobState.fromString("OPENING"), JobState.OPENING); assertEquals(JobState.fromString("OPENED"), JobState.OPENED); } public void testToString() { assertEquals("closed", JobState.CLOSED.toString()); - assertEquals("closing", JobState.CLOSING.toString()); assertEquals("failed", JobState.FAILED.toString()); - assertEquals("opening", JobState.OPENING.toString()); assertEquals("opened", JobState.OPENED.toString()); } public void testValidOrdinals() { - assertEquals(0, JobState.CLOSING.ordinal()); - assertEquals(1, JobState.CLOSED.ordinal()); - assertEquals(2, JobState.OPENING.ordinal()); - assertEquals(3, JobState.OPENED.ordinal()); - assertEquals(4, JobState.FAILED.ordinal()); + assertEquals(0, JobState.CLOSED.ordinal()); + assertEquals(1, JobState.OPENED.ordinal()); + assertEquals(2, JobState.FAILED.ordinal()); } public void testIsAnyOf() { assertFalse(JobState.OPENED.isAnyOf()); - assertFalse(JobState.OPENED.isAnyOf(JobState.CLOSED, JobState.CLOSING, JobState.FAILED, - JobState.OPENING)); - assertFalse(JobState.CLOSED.isAnyOf(JobState.CLOSING, JobState.FAILED, JobState.OPENING, JobState.OPENED)); + assertFalse(JobState.OPENED.isAnyOf(JobState.CLOSED, JobState.FAILED)); + assertFalse(JobState.CLOSED.isAnyOf(JobState.FAILED, JobState.OPENED)); assertTrue(JobState.OPENED.isAnyOf(JobState.OPENED)); assertTrue(JobState.OPENED.isAnyOf(JobState.OPENED, JobState.CLOSED)); - assertTrue(JobState.CLOSING.isAnyOf(JobState.CLOSED, JobState.CLOSING)); - assertTrue(JobState.CLOSED.isAnyOf(JobState.CLOSED, JobState.CLOSING)); + assertTrue(JobState.CLOSED.isAnyOf(JobState.CLOSED)); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 687625f7e8c..27f5d2411fa 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -252,7 +252,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { client.execute(StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request(datafeedId)).get(); assertTrue(stopResponse.isStopped()); } catch (ExecutionException e1) { - if (e1.getMessage().contains("datafeed already stopped, expected datafeed state [started], but got [stopped]")) { + if (e1.getMessage().contains("Cannot stop datafeed [" + datafeedId + "] because it has already been stopped")) { logger.debug("failed to stop datafeed [" + datafeedId + "], already stopped", e1); } else { try { @@ -261,7 +261,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { StopDatafeedAction.Response stopResponse = client.execute(StopDatafeedAction.INSTANCE, request).get(); assertTrue(stopResponse.isStopped()); } catch (Exception e2) { - throw new RuntimeException("Force-stopping datafeed [" + datafeedId + "] failed.", e2); + logger.warn("Force-stopping datafeed [" + datafeedId + "] failed.", e2); } throw new RuntimeException("Had to resort to force-stopping datafeed, something went wrong?", e1); } @@ -293,8 +293,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { client.execute(CloseJobAction.INSTANCE, closeRequest).get(); assertTrue(response.isClosed()); } catch (Exception e1) { - if (e1.getMessage().contains("expected job state [opened], but got [closed]") - || e1.getMessage().contains("expected job state [opened], but got [closing]")) { + if (e1.getMessage().contains("because job [" + jobId + "] hasn't been opened")) { logger.debug("job [" + jobId + "] has already been closed", e1); } else { try { @@ -305,7 +304,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { client.execute(CloseJobAction.INSTANCE, closeRequest).get(); assertTrue(response.isClosed()); } catch (Exception e2) { - throw new RuntimeException("Force-closing datafeed [" + jobId + "] failed.", e2); + logger.warn("Force-closing datafeed [" + jobId + "] failed.", e2); } throw new RuntimeException("Had to resort to force-closing job, something went wrong?", e1); } diff --git a/plugin/src/test/resources/org/elasticsearch/transport/actions b/plugin/src/test/resources/org/elasticsearch/transport/actions index d7cd201c781..e0d3d83cd14 100644 --- a/plugin/src/test/resources/org/elasticsearch/transport/actions +++ b/plugin/src/test/resources/org/elasticsearch/transport/actions @@ -142,3 +142,4 @@ cluster:admin/persistent/start cluster:admin/persistent/completion cluster:admin/persistent/update_status cluster:admin/persistent/remove +cluster:internal/ml/job/finalize_job_execution diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml index 3a24832ec97..cf976e82bc8 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml @@ -292,7 +292,7 @@ - match: { datafeed_id: "test-datafeed-1" } - do: - catch: /Cannot delete job \[datafeed-job\] while datafeed \[test-datafeed-1\] refers to it/ + catch: /Cannot delete job \[datafeed-job\] because datafeed \[test-datafeed-1\] refers to it/ xpack.ml.delete_job: job_id: datafeed-job --- diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml index 4ea5750b0ff..fc20a86d9e8 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml @@ -122,7 +122,7 @@ setup: datafeed_id: "datafeed-1" start: 0 - do: - catch: /cannot start datafeed, expected job state \[opened\], but got \[closed\]/ + catch: /cannot start datafeed \[datafeed-1\] because job \[datafeed-job\] hasn't been opened/ xpack.ml.start_datafeed: datafeed_id: "datafeed-1" start: 0 @@ -143,7 +143,7 @@ setup: start: 0 - do: - catch: /datafeed \[datafeed\-1\] already started, expected datafeed state \[stopped\], but got \[started\]/ + catch: /cannot start datafeed \[datafeed-1\] because it has already been started/ xpack.ml.start_datafeed: datafeed_id: "datafeed-1" start: 0 @@ -162,7 +162,7 @@ setup: xpack.ml.stop_datafeed: datafeed_id: "datafeed-1" - do: - catch: /datafeed already stopped, expected datafeed state \[started\], but got \[stopped\]/ + catch: /Cannot stop datafeed \[datafeed-1\] because it has already been stopped/ xpack.ml.stop_datafeed: datafeed_id: "datafeed-1" diff --git a/qa/smoke-test-ml-with-security/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java b/qa/smoke-test-ml-with-security/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java index 32b03c203c8..52edf7daa79 100644 --- a/qa/smoke-test-ml-with-security/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java +++ b/qa/smoke-test-ml-with-security/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java @@ -52,7 +52,7 @@ public class MlRestTestStateCleaner { logger.error("Got status code " + statusCode + " when stopping datafeed " + datafeedId); } } catch (Exception e) { - if (e.getMessage().contains("datafeed already stopped, expected datafeed state [started], but got [stopped]")) { + if (e.getMessage().contains("Cannot stop datafeed [" + datafeedId + "] because it has already been stopped")) { logger.debug("failed to stop datafeed [" + datafeedId + "]", e); } else { logger.warn("failed to stop datafeed [" + datafeedId + "]", e); @@ -84,15 +84,14 @@ public class MlRestTestStateCleaner { logger.error("Got status code " + statusCode + " when closing job " + jobId); } } catch (Exception e1) { - if (e1.getMessage().contains("expected job state [opened], but got [closed]") - || e1.getMessage().contains("expected job state [opened], but got [closing]")) { + if (e1.getMessage().contains("because job [" + jobId + "] hasn't been opened")) { logger.debug("job [" + jobId + "] has already been closed", e1); } else { - logger.warn("failed to close job [" + jobId + "]. Forcing closed.", e1); + logger.warn("failed to close job [" + jobId + "]. Forcing closed", e1); try { adminClient.performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close?force=true"); } catch (Exception e2) { - throw new RuntimeException("Force-closing job [" + jobId + "] failed.", e2); + logger.warn("Force-closing job [" + jobId + "] failed", e2); } throw new RuntimeException("Had to resort to force-closing job, something went wrong?", e1); }