diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java index b7657f1b676..1c7bc6f2f1a 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java @@ -68,7 +68,7 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcessF import org.elasticsearch.xpack.prelert.job.process.autodetect.BlackHoleAutodetectProcess; import org.elasticsearch.xpack.prelert.job.process.autodetect.NativeAutodetectProcessFactory; import org.elasticsearch.xpack.prelert.job.process.autodetect.output.AutodetectResultsParser; -import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobService; +import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunner; import org.elasticsearch.xpack.prelert.job.scheduler.http.HttpDataExtractorFactory; import org.elasticsearch.xpack.prelert.job.status.StatusReporter; import org.elasticsearch.xpack.prelert.job.usage.UsageReporter; @@ -168,7 +168,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier); DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, processFactory, clusterService.getClusterSettings()); - ScheduledJobService scheduledJobService = new ScheduledJobService(threadPool, client, jobProvider, dataProcessor, + ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, jobProvider, dataProcessor, // norelease: we will no longer need to pass the client here after we switch to a client based data extractor new HttpDataExtractorFactory(client), System::currentTimeMillis); @@ -176,11 +176,12 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { jobProvider, jobManager, new JobAllocator(settings, clusterService, threadPool), - new JobLifeCycleService(settings, client, clusterService, scheduledJobService, dataProcessor, threadPool.generic()), + new JobLifeCycleService(settings, client, clusterService, dataProcessor, threadPool.generic()), new JobDataDeleterFactory(client), //NORELEASE: this should use Delete-by-query dataProcessor, new PrelertInitializationService(settings, threadPool, clusterService, jobProvider), - jobDataCountsPersister + jobDataCountsPersister, + scheduledJobRunner ); } @@ -258,7 +259,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { FixedExecutorBuilder prelert = new FixedExecutorBuilder(settings, THREAD_POOL_NAME, maxNumberOfJobs, 1000, "xpack.prelert.thread_pool"); - // fail quick to start autodetect process / scheduler, so no queues + // fail quick to run autodetect process / scheduler, so no queues // 4 threads: for c++ logging, result processing, state processing and restore state FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_PROCESS_THREAD_POOL_NAME, maxNumberOfJobs * 4, 4, "xpack.prelert.autodetect_process_thread_pool"); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/StartJobSchedulerAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/StartJobSchedulerAction.java index 1f7ee21c138..d9cb780bcfd 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/StartJobSchedulerAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/StartJobSchedulerAction.java @@ -7,18 +7,14 @@ package org.elasticsearch.xpack.prelert.action; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.AcknowledgedRequest; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseFieldMatcherSupplier; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; @@ -28,12 +24,17 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.manager.JobManager; +import org.elasticsearch.xpack.prelert.job.metadata.Allocation; +import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunner; import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; import java.io.IOException; @@ -43,7 +44,7 @@ public class StartJobSchedulerAction extends Action { public static final StartJobSchedulerAction INSTANCE = new StartJobSchedulerAction(); - public static final String NAME = "cluster:admin/prelert/job/scheduler/start"; + public static final String NAME = "cluster:admin/prelert/job/scheduler/run"; private StartJobSchedulerAction() { super(NAME); @@ -59,7 +60,7 @@ extends Action implements ToXContent { + public static class Request extends ActionRequest implements ToXContent { public static ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); @@ -85,9 +86,9 @@ extends Action { + static class RequestBuilder extends ActionRequestBuilder { public RequestBuilder(ElasticsearchClient client, StartJobSchedulerAction action) { super(client, action, new Request()); } } - public static class Response extends AcknowledgedResponse { + public static class Response extends ActionResponse { - public Response(boolean acknowledged) { - super(acknowledged); + Response() { } - private Response() { + } + + public static class SchedulerTask extends CancellableTask { + + private volatile ScheduledJobRunner.Holder holder; + + public SchedulerTask(long id, String type, String action, TaskId parentTaskId, String jobId) { + super(id, type, action, "job-scheduler-" + jobId, parentTaskId); + } + + public void setHolder(ScheduledJobRunner.Holder holder) { + this.holder = holder; } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - readAcknowledged(in); + protected void onCancelled() { + stop(); } - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - writeAcknowledged(out); + /* public for testing */ + public void stop() { + if (holder != null) { + holder.stop(); + } } } - public static class TransportAction extends TransportMasterNodeAction { + public static class TransportAction extends HandledTransportAction { private final JobManager jobManager; + private final ScheduledJobRunner scheduledJobRunner; @Inject - public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager) { - super(settings, StartJobSchedulerAction.NAME, transportService, clusterService, threadPool, actionFilters, - indexNameExpressionResolver, Request::new); + public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + JobManager jobManager, ScheduledJobRunner scheduledJobRunner) { + super(settings, StartJobSchedulerAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, + Request::new); this.jobManager = jobManager; + this.scheduledJobRunner = scheduledJobRunner; } @Override - protected String executor() { - return ThreadPool.Names.SAME; + protected void doExecute(Task task, Request request, ActionListener listener) { + SchedulerTask schedulerTask = (SchedulerTask) task; + Job job = jobManager.getJobOrThrowIfUnknown(request.jobId); + Allocation allocation = jobManager.getJobAllocation(job.getId()); + scheduledJobRunner.run(job, request.getSchedulerState(), allocation, schedulerTask, (error) -> { + if (error != null) { + listener.onFailure(error); + } else { + listener.onResponse(new Response()); + } + }); } @Override - protected Response newResponse() { - return new Response(); - } - - @Override - protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { - jobManager.startJobScheduler(request, listener); - } - - @Override - protected ClusterBlockException checkBlock(Request request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + protected void doExecute(Request request, ActionListener listener) { + throw new UnsupportedOperationException("the task parameter is required"); } } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/StopJobSchedulerAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/StopJobSchedulerAction.java index 0e2a2da360b..9e7dae3ea1e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/StopJobSchedulerAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/StopJobSchedulerAction.java @@ -5,24 +5,28 @@ */ package org.elasticsearch.xpack.prelert.action; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.prelert.job.Job; @@ -52,7 +56,7 @@ extends Action { + public static class Request extends ActionRequest { private String jobId; @@ -102,7 +106,7 @@ extends Action { + static class RequestBuilder extends ActionRequestBuilder { public RequestBuilder(ElasticsearchClient client, StopJobSchedulerAction action) { super(client, action, new Request()); @@ -111,56 +115,72 @@ extends Action { + public static class TransportAction extends HandledTransportAction { - private final JobManager jobManager; + private final TransportCancelTasksAction cancelTasksAction; + private final TransportListTasksAction listTasksAction; @Inject - public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager) { - super(settings, StopJobSchedulerAction.NAME, transportService, clusterService, threadPool, actionFilters, + public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + TransportCancelTasksAction cancelTasksAction, TransportListTasksAction listTasksAction) { + super(settings, StopJobSchedulerAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); - this.jobManager = jobManager; + this.cancelTasksAction = cancelTasksAction; + this.listTasksAction = listTasksAction; } @Override - protected String executor() { - return ThreadPool.Names.SAME; - } + protected void doExecute(Request request, ActionListener listener) { + String jobId = request.getJobId(); + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setActions(StartJobSchedulerAction.NAME); + listTasksRequest.setDetailed(true); + listTasksAction.execute(listTasksRequest, new ActionListener() { + @Override + public void onResponse(ListTasksResponse listTasksResponse) { + String expectedJobDescription = "job-scheduler-" + jobId; + for (TaskInfo taskInfo : listTasksResponse.getTasks()) { + if (expectedJobDescription.equals(taskInfo.getDescription())) { + CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); + cancelTasksRequest.setTaskId(taskInfo.getTaskId()); + cancelTasksAction.execute(cancelTasksRequest, new ActionListener() { + @Override + public void onResponse(CancelTasksResponse cancelTasksResponse) { + listener.onResponse(new Response()); + } - @Override - protected Response newResponse() { - return new Response(); - } + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + return; + } + } + listener.onFailure(new ResourceNotFoundException("No scheduler running for job [" + jobId + "]")); + } - @Override - protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { - jobManager.stopJobScheduler(request, listener); - } - - @Override - protected ClusterBlockException checkBlock(Request request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); } } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusAction.java index 886222a6df3..d25e0968d29 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusAction.java @@ -140,7 +140,7 @@ public class UpdateJobSchedulerStatusAction extends Action actionListener) { - clusterService.submitStateUpdateTask("start-scheduler-job-" + request.getJobId(), - new AckedClusterStateUpdateTask(request, actionListener) { - - @Override - protected StartJobSchedulerAction.Response newResponse(boolean acknowledged) { - return new StartJobSchedulerAction.Response(acknowledged); - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - long startTime = request.getSchedulerState().getStartTimeMillis(); - Long endTime = request.getSchedulerState().getEndTimeMillis(); - return innerUpdateSchedulerState(currentState, request.getJobId(), JobSchedulerStatus.STARTING, startTime, endTime); - } - }); - } - - public void stopJobScheduler(StopJobSchedulerAction.Request request, ActionListener actionListener) { - clusterService.submitStateUpdateTask("stop-scheduler-job-" + request.getJobId(), - new AckedClusterStateUpdateTask(request, actionListener) { - - @Override - protected StopJobSchedulerAction.Response newResponse(boolean acknowledged) { - return new StopJobSchedulerAction.Response(acknowledged); - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return innerUpdateSchedulerState(currentState, request.getJobId(), JobSchedulerStatus.STOPPING, null, null); - } - }); - } - private void checkJobIsScheduled(Job job) { if (job.getSchedulerConfig() == null) { throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_SCHEDULER_NO_SUCH_SCHEDULED_JOB, job.getId())); @@ -352,7 +315,7 @@ public class JobManager extends AbstractComponent { checkJobIsScheduled(job); Allocation allocation = getAllocation(currentState, jobId); - if (allocation.getSchedulerState() == null && status != JobSchedulerStatus.STARTING) { + if (allocation.getSchedulerState() == null && status != JobSchedulerStatus.STARTED) { throw new IllegalArgumentException("Can't change status to [" + status + "], because job's [" + jobId + "] scheduler never started"); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java index 37f6b27cb3e..051eec475de 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; import org.elasticsearch.xpack.prelert.job.SchedulerState; @@ -179,6 +180,13 @@ public class Allocation extends AbstractDiffable implements ToXConte public Builder() { } + public Builder(Job job) { + this.jobId = job.getId(); + if (job.getSchedulerConfig() != null) { + schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, null, null); + } + } + public Builder(Allocation allocation) { this.nodeId = allocation.nodeId; this.jobId = allocation.jobId; @@ -229,41 +237,30 @@ public class Allocation extends AbstractDiffable implements ToXConte this.statusReason = statusReason; } - public void setSchedulerState(SchedulerState schedulerState) { - JobSchedulerStatus currentSchedulerStatus = this.schedulerState == null ? - JobSchedulerStatus.STOPPED : this.schedulerState.getStatus(); - JobSchedulerStatus newSchedulerStatus = schedulerState.getStatus(); - switch (newSchedulerStatus) { - case STARTING: - if (currentSchedulerStatus != JobSchedulerStatus.STOPPED) { - String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_START, jobId, newSchedulerStatus); - throw ExceptionsHelper.conflictStatusException(msg); + public void setSchedulerState(SchedulerState newSchedulerState) { + if (this.schedulerState != null){ + JobSchedulerStatus currentSchedulerStatus = this.schedulerState.getStatus(); + JobSchedulerStatus newSchedulerStatus = newSchedulerState.getStatus(); + switch (newSchedulerStatus) { + case STARTED: + if (currentSchedulerStatus != JobSchedulerStatus.STOPPED) { + String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_START, jobId, newSchedulerStatus); + throw ExceptionsHelper.conflictStatusException(msg); + } + break; + case STOPPED: + if (currentSchedulerStatus != JobSchedulerStatus.STARTED) { + String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_STOP_IN_CURRENT_STATE, jobId, + newSchedulerStatus); + throw ExceptionsHelper.conflictStatusException(msg); + } + break; + default: + throw new IllegalArgumentException("Invalid requested job scheduler status: " + newSchedulerStatus); } - break; - case STARTED: - if (currentSchedulerStatus != JobSchedulerStatus.STARTING) { - String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_START, jobId, newSchedulerStatus); - throw ExceptionsHelper.conflictStatusException(msg); - } - break; - case STOPPING: - if (currentSchedulerStatus != JobSchedulerStatus.STARTED) { - String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_STOP_IN_CURRENT_STATE, jobId, newSchedulerStatus); - throw ExceptionsHelper.conflictStatusException(msg); - } - break; - case STOPPED: - if ((currentSchedulerStatus != JobSchedulerStatus.STOPPED || - currentSchedulerStatus != JobSchedulerStatus.STOPPING) == false) { - String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_STOP_IN_CURRENT_STATE, jobId, newSchedulerStatus); - throw ExceptionsHelper.conflictStatusException(msg); - } - break; - default: - throw new IllegalArgumentException("Invalid requested job scheduler status: " + newSchedulerStatus); } - this.schedulerState = schedulerState; + this.schedulerState = newSchedulerState; } public Allocation build() { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleService.java index e8d4035940a..a9e80e88491 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleService.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleService.java @@ -16,9 +16,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.xpack.prelert.action.UpdateJobStatusAction; import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.JobStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.data.DataProcessor; -import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobService; import java.util.HashSet; import java.util.Objects; @@ -29,16 +27,14 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta volatile Set localAssignedJobs = new HashSet<>(); private final Client client; - private final ScheduledJobService scheduledJobService; private final DataProcessor dataProcessor; private final Executor executor; - public JobLifeCycleService(Settings settings, Client client, ClusterService clusterService, ScheduledJobService scheduledJobService, - DataProcessor dataProcessor, Executor executor) { + public JobLifeCycleService(Settings settings, Client client, ClusterService clusterService, DataProcessor dataProcessor, + Executor executor) { super(settings); clusterService.add(this); this.client = Objects.requireNonNull(client); - this.scheduledJobService = Objects.requireNonNull(scheduledJobService); this.dataProcessor = Objects.requireNonNull(dataProcessor); this.executor = Objects.requireNonNull(executor); } @@ -80,28 +76,6 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta startJob(allocation); } } - - handleSchedulerStatusChange(job, allocation); - } - - private void handleSchedulerStatusChange(Job job, Allocation allocation) { - SchedulerState schedulerState = allocation.getSchedulerState(); - if (schedulerState != null) { - switch (schedulerState.getStatus()) { - case STARTING: - executor.execute(() -> scheduledJobService.start(job, allocation)); - break; - case STARTED: - break; - case STOPPING: - executor.execute(() -> scheduledJobService.stop(allocation)); - break; - case STOPPED: - break; - default: - throw new IllegalStateException("Unhandled scheduler state [" + schedulerState.getStatus() + "]"); - } - } } void startJob(Allocation allocation) { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java index d8c9ee8039f..cc15c6c0c7f 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java @@ -208,12 +208,7 @@ public class PrelertMetadata implements MetaData.Custom { Allocation allocation = allocations.get(job.getId()); if (allocation == null) { - Allocation.Builder builder = new Allocation.Builder(); - builder.setJobId(job.getId()); - boolean addSchedulderState = job.getSchedulerConfig() != null; - if (addSchedulderState) { - builder.setSchedulerState(new SchedulerState(JobSchedulerStatus.STOPPED, null, null)); - } + Allocation.Builder builder = new Allocation.Builder(job); builder.setStatus(JobStatus.CLOSED); allocations.put(job.getId(), builder.build()); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunner.java similarity index 65% rename from elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobService.java rename to elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunner.java index a2717888c75..6f3b8c467a3 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobService.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunner.java @@ -5,20 +5,23 @@ */ package org.elasticsearch.xpack.prelert.job.scheduler; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.prelert.PrelertPlugin; +import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction; import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction; import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; +import org.elasticsearch.xpack.prelert.job.JobStatus; import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.config.DefaultFrequency; @@ -33,11 +36,11 @@ import org.elasticsearch.xpack.prelert.job.results.Bucket; import java.time.Duration; import java.util.Objects; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; +import java.util.function.Consumer; import java.util.function.Supplier; -public class ScheduledJobService extends AbstractComponent { +public class ScheduledJobRunner extends AbstractComponent { private final Client client; private final JobProvider jobProvider; @@ -45,10 +48,9 @@ public class ScheduledJobService extends AbstractComponent { private final DataExtractorFactory dataExtractorFactory; private final ThreadPool threadPool; private final Supplier currentTimeSupplier; - final ConcurrentMap registry; - public ScheduledJobService(ThreadPool threadPool, Client client, JobProvider jobProvider, DataProcessor dataProcessor, - DataExtractorFactory dataExtractorFactory, Supplier currentTimeSupplier) { + public ScheduledJobRunner(ThreadPool threadPool, Client client, JobProvider jobProvider, DataProcessor dataProcessor, + DataExtractorFactory dataExtractorFactory, Supplier currentTimeSupplier) { super(Settings.EMPTY); this.threadPool = threadPool; this.client = Objects.requireNonNull(client); @@ -56,77 +58,43 @@ public class ScheduledJobService extends AbstractComponent { this.dataProcessor = Objects.requireNonNull(dataProcessor); this.dataExtractorFactory = Objects.requireNonNull(dataExtractorFactory); this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); - this.registry = ConcurrentCollections.newConcurrentMap(); } - public void start(Job job, Allocation allocation) { - SchedulerState schedulerState = allocation.getSchedulerState(); - if (schedulerState == null) { - throw new IllegalStateException("Job [" + job.getId() + "] is not a scheduled job"); - } + public void run(Job job, SchedulerState schedulerState, Allocation allocation, StartJobSchedulerAction.SchedulerTask task, + Consumer handler) { + validate(job, allocation); - if (schedulerState.getStatus() != JobSchedulerStatus.STARTING) { - throw new IllegalStateException("expected job scheduler status [" + JobSchedulerStatus.STARTING + "], but got [" + - schedulerState.getStatus() + "] instead"); - } - - if (registry.containsKey(allocation.getJobId())) { - throw new IllegalStateException("job [" + allocation.getJobId() + "] has already been started"); - } - - logger.info("Starting scheduler [{}]", allocation); - Holder holder = createJobScheduler(job); - registry.put(job.getId(), holder); - - holder.future = threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME).submit(() -> { - try { - Long next = holder.scheduledJob.runLookBack(allocation.getSchedulerState()); - if (next != null) { - doScheduleRealtime(next, job.getId(), holder); - } else { - holder.scheduledJob.stop(); - requestStopping(job.getId()); + setJobSchedulerStatus(job.getId(), JobSchedulerStatus.STARTED, error -> { + logger.info("Starting scheduler [{}]", schedulerState); + Holder holder = createJobScheduler(job, task, handler); + task.setHolder(holder); + holder.future = threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME).submit(() -> { + try { + Long next = holder.scheduledJob.runLookBack(schedulerState); + if (next != null) { + doScheduleRealtime(next, job.getId(), holder); + } else { + holder.stop(); + } + } catch (ScheduledJob.ExtractionProblemException e) { + holder.problemTracker.reportExtractionProblem(e.getCause().getMessage()); + } catch (ScheduledJob.AnalysisProblemException e) { + holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage()); + } catch (ScheduledJob.EmptyDataCountException e) { + if (holder.problemTracker.updateEmptyDataCount(true)) { + holder.stop(); + } + } catch (Exception e) { + logger.error("Failed lookback import for job[" + job.getId() + "]", e); + holder.stop(); } - } catch (ScheduledJob.ExtractionProblemException e) { - holder.problemTracker.reportExtractionProblem(e.getCause().getMessage()); - } catch (ScheduledJob.AnalysisProblemException e) { - holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage()); - } catch (ScheduledJob.EmptyDataCountException e) { - if (holder.problemTracker.updateEmptyDataCount(true)) { - requestStopping(job.getId()); - } - } catch (Exception e) { - logger.error("Failed lookback import for job[" + job.getId() + "]", e); - requestStopping(job.getId()); - } - holder.problemTracker.finishReport(); + holder.problemTracker.finishReport(); + }); }); - setJobSchedulerStatus(job.getId(), JobSchedulerStatus.STARTED); - } - - public void stop(Allocation allocation) { - SchedulerState schedulerState = allocation.getSchedulerState(); - if (schedulerState == null) { - throw new IllegalStateException("Job [" + allocation.getJobId() + "] is not a scheduled job"); - } - if (schedulerState.getStatus() != JobSchedulerStatus.STOPPING) { - throw new IllegalStateException("expected job scheduler status [" + JobSchedulerStatus.STOPPING + "], but got [" + - schedulerState.getStatus() + "] instead"); - } - - Holder holder = registry.remove(allocation.getJobId()); - if (holder == null) { - throw new IllegalStateException("job [" + allocation.getJobId() + "] has not been started"); - } - logger.info("Stopping scheduler for job [{}]", allocation.getJobId()); - holder.stop(); - // Don't close the job directly without going via the close data api to change the job status: -// dataProcessor.closeJob(allocation.getJobId()); - setJobSchedulerStatus(allocation.getJobId(), JobSchedulerStatus.STOPPED); } private void doScheduleRealtime(long delayInMsSinceEpoch, String jobId, Holder holder) { - if (holder.scheduledJob.isRunning()) { + if (holder.isRunning()) { TimeValue delay = computeNextDelay(delayInMsSinceEpoch); logger.debug("Waiting [{}] before executing next realtime import for job [{}]", delay, jobId); holder.future = threadPool.schedule(delay, PrelertPlugin.SCHEDULER_THREAD_POOL_NAME, () -> { @@ -143,25 +111,40 @@ public class ScheduledJobService extends AbstractComponent { nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch; if (holder.problemTracker.updateEmptyDataCount(true)) { holder.problemTracker.finishReport(); - requestStopping(jobId); + holder.stop(); return; } } catch (Exception e) { logger.error("Unexpected scheduler failure for job [" + jobId + "] stopping...", e); - requestStopping(jobId); + holder.stop(); return; } holder.problemTracker.finishReport(); doScheduleRealtime(nextDelayInMsSinceEpoch, jobId, holder); }); + } else { + holder.stop(); } } - private void requestStopping(String jobId) { - setJobSchedulerStatus(jobId, JobSchedulerStatus.STOPPING); + public static void validate(Job job, Allocation allocation) { + if (job.getSchedulerConfig() == null) { + throw new IllegalArgumentException("job [" + job.getId() + "] is not a scheduled job"); + } + + if (allocation.getStatus() != JobStatus.OPENED) { + throw new ElasticsearchStatusException("cannot start scheduler, expected job status [{}], but got [{}]", + RestStatus.CONFLICT, JobStatus.OPENED, allocation.getStatus()); + } + + if (allocation.getSchedulerState().getStatus() != JobSchedulerStatus.STOPPED) { + throw new ElasticsearchStatusException("scheduler already started, expected scheduler status [{}], but got [{}]", + RestStatus.CONFLICT, JobSchedulerStatus.STOPPED, allocation.getSchedulerState().getStatus()); + } + } - Holder createJobScheduler(Job job) { + private Holder createJobScheduler(Job job, StartJobSchedulerAction.SchedulerTask task, Consumer handler) { Auditor auditor = jobProvider.audit(job.getId()); Duration frequency = getFrequencyOrDefault(job); Duration queryDelay = Duration.ofSeconds(job.getSchedulerConfig().getQueryDelay()); @@ -169,7 +152,7 @@ public class ScheduledJobService extends AbstractComponent { ScheduledJob scheduledJob = new ScheduledJob(job.getId(), frequency.toMillis(), queryDelay.toMillis(), dataExtractor, dataProcessor, auditor, currentTimeSupplier, getLatestFinalBucketEndTimeMs(job), getLatestRecordTimestamp(job.getId())); - return new Holder(scheduledJob, new ProblemTracker(() -> auditor)); + return new Holder(job, scheduledJob, new ProblemTracker(() -> auditor), handler); } private long getLatestFinalBucketEndTimeMs(Job job) { @@ -211,7 +194,7 @@ public class ScheduledJobService extends AbstractComponent { return new TimeValue(Math.max(1, next - currentTimeSupplier.get())); } - private void setJobSchedulerStatus(String jobId, JobSchedulerStatus status) { + private void setJobSchedulerStatus(String jobId, JobSchedulerStatus status, Consumer supplier) { UpdateJobSchedulerStatusAction.Request request = new UpdateJobSchedulerStatusAction.Request(jobId, status); client.execute(UpdateJobSchedulerStatusAction.INSTANCE, request, new ActionListener() { @Override @@ -221,29 +204,41 @@ public class ScheduledJobService extends AbstractComponent { } else { logger.info("set job scheduler status to [{}] for job [{}], but was not acknowledged", status, jobId); } + supplier.accept(null); } @Override public void onFailure(Exception e) { logger.error("could not set job scheduler status to [" + status + "] for job [" + jobId +"]", e); + supplier.accept(e); } }); } - private static class Holder { + public class Holder { + private final String jobId; private final ScheduledJob scheduledJob; private final ProblemTracker problemTracker; + private final Consumer handler; volatile Future future; - private Holder(ScheduledJob scheduledJob, ProblemTracker problemTracker) { + private Holder(Job job, ScheduledJob scheduledJob, ProblemTracker problemTracker, Consumer handler) { + this.jobId = job.getId(); this.scheduledJob = scheduledJob; this.problemTracker = problemTracker; + this.handler = handler; } - void stop() { + boolean isRunning() { + return scheduledJob.isRunning(); + } + + public void stop() { + logger.info("Stopping scheduler for job [{}]", jobId); scheduledJob.stop(); FutureUtils.cancel(future); + setJobSchedulerStatus(jobId, JobSchedulerStatus.STOPPED, error -> handler.accept(null)); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerAction.java index a667250b2b7..4c1f8b35c73 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerAction.java @@ -7,23 +7,31 @@ package org.elasticsearch.xpack.prelert.rest.schedulers; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.AcknowledgedRestListener; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.RestActions; +import org.elasticsearch.tasks.LoggingTaskListener; +import org.elasticsearch.tasks.Task; import org.elasticsearch.xpack.prelert.PrelertPlugin; import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction; import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; import org.elasticsearch.xpack.prelert.job.SchedulerState; +import org.elasticsearch.xpack.prelert.job.manager.JobManager; import org.elasticsearch.xpack.prelert.job.messages.Messages; +import org.elasticsearch.xpack.prelert.job.metadata.Allocation; +import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunner; import java.io.IOException; @@ -31,13 +39,15 @@ public class RestStartJobSchedulerAction extends BaseRestHandler { private static final String DEFAULT_START = "0"; - private final StartJobSchedulerAction.TransportAction transportJobSchedulerAction; + private final JobManager jobManager; + private final ClusterService clusterService; @Inject - public RestStartJobSchedulerAction(Settings settings, RestController controller, - StartJobSchedulerAction.TransportAction transportJobSchedulerAction) { + public RestStartJobSchedulerAction(Settings settings, RestController controller, JobManager jobManager, + ClusterService clusterService) { super(settings); - this.transportJobSchedulerAction = transportJobSchedulerAction; + this.jobManager = jobManager; + this.clusterService = clusterService; controller.registerHandler(RestRequest.Method.POST, PrelertPlugin.BASE_PATH + "schedulers/{" + Job.ID.getPreferredName() + "}/_start", this); } @@ -45,6 +55,14 @@ public class RestStartJobSchedulerAction extends BaseRestHandler { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { String jobId = restRequest.param(Job.ID.getPreferredName()); + + // This validation happens also in ScheduledJobRunner, the reason we do it here too is that if it fails there + // we are unable to provide the user immediate feedback. We would create the task and the validation would fail + // in the background, whereas now the validation failure is part of the response being returned. + Job job = jobManager.getJobOrThrowIfUnknown(jobId); + Allocation allocation = jobManager.getJobAllocation(jobId); + ScheduledJobRunner.validate(job, allocation); + StartJobSchedulerAction.Request jobSchedulerRequest; if (RestActions.hasBodyContent(restRequest)) { BytesReference bodyBytes = RestActions.getRestContent(restRequest); @@ -58,10 +76,21 @@ public class RestStartJobSchedulerAction extends BaseRestHandler { endTimeMillis = parseDateOrThrow(restRequest.param(SchedulerState.END_TIME_MILLIS.getPreferredName()), SchedulerState.END_TIME_MILLIS.getPreferredName()); } - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTING, startTimeMillis, endTimeMillis); + SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, startTimeMillis, endTimeMillis); jobSchedulerRequest = new StartJobSchedulerAction.Request(jobId, schedulerState); } - return channel -> transportJobSchedulerAction.execute(jobSchedulerRequest, new AcknowledgedRestListener<>(channel)); + return sendTask(client.executeLocally(StartJobSchedulerAction.INSTANCE, jobSchedulerRequest, LoggingTaskListener.instance())); + } + + private RestChannelConsumer sendTask(Task task) throws IOException { + return channel -> { + try (XContentBuilder builder = channel.newBuilder()) { + builder.startObject(); + builder.field("task", clusterService.localNode().getId() + ":" + task.getId()); + builder.endObject(); + channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); + } + }; } static long parseDateOrThrow(String date, String paramName) { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ScheduledJobsIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ScheduledJobsIT.java index 9ea51b4cc1b..d7ad5ed953b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ScheduledJobsIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ScheduledJobsIT.java @@ -37,9 +37,11 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; @ESIntegTestCase.ClusterScope(numDataNodes = 1) public class ScheduledJobsIT extends ESIntegTestCase { @@ -75,11 +77,9 @@ public class ScheduledJobsIT extends ESIntegTestCase { OpenJobAction.Response openJobResponse = client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get(); assertTrue(openJobResponse.isAcknowledged()); - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTING, 0L, now); + SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, now); StartJobSchedulerAction.Request startSchedulerRequest = new StartJobSchedulerAction.Request("_job_id", schedulerState); - StartJobSchedulerAction.Response startJobResponse = client().execute(StartJobSchedulerAction.INSTANCE, startSchedulerRequest) - .get(); - assertTrue(startJobResponse.isAcknowledged()); + client().execute(StartJobSchedulerAction.INSTANCE, startSchedulerRequest).get(); assertBusy(() -> { DataCounts dataCounts = getDataCounts("_job_id"); assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs)); @@ -107,11 +107,18 @@ public class ScheduledJobsIT extends ESIntegTestCase { OpenJobAction.Response openJobResponse = client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get(); assertTrue(openJobResponse.isAcknowledged()); - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTING, 0L, null); - StartJobSchedulerAction.Request startSchedulerRequest = new StartJobSchedulerAction.Request("_job_id", schedulerState); - StartJobSchedulerAction.Response startJobResponse = client().execute(StartJobSchedulerAction.INSTANCE, startSchedulerRequest) - .get(); - assertTrue(startJobResponse.isAcknowledged()); + AtomicReference errorHolder = new AtomicReference<>(); + Thread t = new Thread(() -> { + try { + SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, null); + StartJobSchedulerAction.Request startSchedulerRequest = + new StartJobSchedulerAction.Request("_job_id", schedulerState); + client().execute(StartJobSchedulerAction.INSTANCE, startSchedulerRequest).get(); + } catch (Exception | AssertionError e) { + errorHolder.set(e); + } + }); + t.start(); assertBusy(() -> { DataCounts dataCounts = getDataCounts("_job_id"); assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs1)); @@ -126,14 +133,15 @@ public class ScheduledJobsIT extends ESIntegTestCase { }, 30, TimeUnit.SECONDS); StopJobSchedulerAction.Request stopSchedulerRequest = new StopJobSchedulerAction.Request("_job_id"); - client().execute(StopJobSchedulerAction.INSTANCE, stopSchedulerRequest).get(); - assertTrue(startJobResponse.isAcknowledged()); + StopJobSchedulerAction.Response stopJobResponse = client().execute(StopJobSchedulerAction.INSTANCE, stopSchedulerRequest).get(); + assertTrue(stopJobResponse.isAcknowledged()); assertBusy(() -> { PrelertMetadata prelertMetadata = client().admin().cluster().prepareState().all().get() .getState().metaData().custom(PrelertMetadata.TYPE); assertThat(prelertMetadata.getAllocations().get("_job_id").getSchedulerState().getStatus(), equalTo(JobSchedulerStatus.STOPPED)); }); + assertThat(errorHolder.get(), nullValue()); } private void indexDocs(long numDocs, long start, long end) { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/StartJobSchedulerActionRequestTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/StartJobSchedulerActionRequestTests.java index 4019f00945e..780a2631ffc 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/StartJobSchedulerActionRequestTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/StartJobSchedulerActionRequestTests.java @@ -16,7 +16,7 @@ public class StartJobSchedulerActionRequestTests extends AbstractStreamableXCont @Override protected Request createTestInstance() { - SchedulerState state = new SchedulerState(JobSchedulerStatus.STARTING, randomLong(), randomLong()); + SchedulerState state = new SchedulerState(JobSchedulerStatus.STARTED, randomLong(), randomLong()); return new Request(randomAsciiOfLengthBetween(1, 20), state); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java index 4e8a8100c8f..2902fc49f07 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java @@ -45,7 +45,36 @@ public class ScheduledJobIT extends ESRestTestCase { () -> client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start")); assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400)); String responseAsString = responseEntityToString(e.getResponse()); - assertThat(responseAsString, containsString("\"reason\":\"There is no job '" + jobId + "' with a scheduler configured\"")); + assertThat(responseAsString, containsString("\"reason\":\"job [" + jobId + "] is not a scheduled job\"")); + } + + public void testStartJobScheduler_jobNotOpened() throws Exception { + String jobId = "_id1"; + createScheduledJob(jobId); + + ResponseException e = expectThrows(ResponseException.class, + () -> client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start")); + assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(409)); + String responseAsString = responseEntityToString(e.getResponse()); + assertThat(responseAsString, containsString("\"reason\":\"cannot start scheduler, expected job status [OPENED], " + + "but got [CLOSED]\"")); + } + + public void testStartJobScheduler_schedulerAlreadyStarted() throws Exception { + String jobId = "_id1"; + createScheduledJob(jobId); + openJob(client(), jobId); + Response startSchedulerRequest = client().performRequest("post", + PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start?start=2016-06-01T00:00:00Z"); + assertThat(startSchedulerRequest.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(responseEntityToString(startSchedulerRequest), containsString("{\"task\":\"")); + + ResponseException e = expectThrows(ResponseException.class, + () -> client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start")); + assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(409)); + String responseAsString = responseEntityToString(e.getResponse()); + assertThat(responseAsString, containsString("\"reason\":\"scheduler already started, expected scheduler status " + + "[STOPPED], but got [STARTED]\"")); } public void testStartJobScheduler_GivenLookbackOnly() throws Exception { @@ -57,9 +86,7 @@ public class ScheduledJobIT extends ESRestTestCase { Response startSchedulerRequest = client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z"); assertThat(startSchedulerRequest.getStatusLine().getStatusCode(), equalTo(200)); - assertThat(responseEntityToString(startSchedulerRequest), equalTo("{\"acknowledged\":true}")); - waitForSchedulerStartedState(jobId); - + assertThat(responseEntityToString(startSchedulerRequest), containsString("{\"task\":\"")); assertBusy(() -> { try { Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId + "/_stats", @@ -69,7 +96,6 @@ public class ScheduledJobIT extends ESRestTestCase { throw new RuntimeException(e); } }); - waitForSchedulerStoppedState(client(), jobId); } @@ -82,8 +108,7 @@ public class ScheduledJobIT extends ESRestTestCase { Response response = client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start?start=2016-06-01T00:00:00Z"); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); - assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}")); - waitForSchedulerStartedState(jobId); + assertThat(responseEntityToString(response), containsString("{\"task\":\"")); assertBusy(() -> { try { Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId + "/_stats", @@ -176,23 +201,6 @@ public class ScheduledJobIT extends ESRestTestCase { } } - private void waitForSchedulerStartedState(String jobId) throws Exception { - try { - assertBusy(() -> { - try { - Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId + "/_stats", - Collections.singletonMap("metric", "scheduler_state")); - assertThat(responseEntityToString(getJobResponse), containsString("\"status\":\"STARTED\"")); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } catch (AssertionError e) { - Response response = client().performRequest("get", "/_nodes/hotthreads"); - logger.info("hot_threads: {}", responseEntityToString(response)); - } - } - @After public void clearPrelertState() throws IOException { clearPrelertMetadata(adminClient()); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/TooManyJobsIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/TooManyJobsIT.java index 4ef4b4287ff..e086a15bcb8 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/TooManyJobsIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/TooManyJobsIT.java @@ -79,7 +79,7 @@ public class TooManyJobsIT extends ESIntegTestCase { "[failed to open, max running job capacity [" + maxRunningJobsPerNode + "] reached]", cause.getMessage()); logger.info("good news everybody --> reached maximum number of allowed opened jobs, after trying to open the {}th job", i); - // now manually clean things up and see if we can succeed to start one new job + // now manually clean things up and see if we can succeed to run one new job clearPrelertMetadata(); putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get(); assertTrue(putJobResponse.isAcknowledged()); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobSchedulerStatusTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobSchedulerStatusTests.java index 18508f621da..c08a8e96dbd 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobSchedulerStatusTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobSchedulerStatusTests.java @@ -10,17 +10,13 @@ import org.elasticsearch.test.ESTestCase; public class JobSchedulerStatusTests extends ESTestCase { public void testForString() { - assertEquals(JobSchedulerStatus.fromString("starting"), JobSchedulerStatus.STARTING); assertEquals(JobSchedulerStatus.fromString("started"), JobSchedulerStatus.STARTED); - assertEquals(JobSchedulerStatus.fromString("stopping"), JobSchedulerStatus.STOPPING); assertEquals(JobSchedulerStatus.fromString("stopped"), JobSchedulerStatus.STOPPED); } public void testValidOrdinals() { - assertEquals(0, JobSchedulerStatus.STARTING.ordinal()); - assertEquals(1, JobSchedulerStatus.STARTED.ordinal()); - assertEquals(2, JobSchedulerStatus.STOPPING.ordinal()); - assertEquals(3, JobSchedulerStatus.STOPPED.ordinal()); + assertEquals(0, JobSchedulerStatus.STARTED.ordinal()); + assertEquals(1, JobSchedulerStatus.STOPPED.ordinal()); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java index 9309d5a9dad..b11bc83939c 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java @@ -22,7 +22,7 @@ public class AllocationTests extends AbstractSerializingTestCase { boolean ignoreDowntime = randomBoolean(); JobStatus jobStatus = randomFrom(JobStatus.values()); String statusReason = randomBoolean() ? randomAsciiOfLength(10) : null; - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTING, randomPositiveLong(), randomPositiveLong()); + SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, randomPositiveLong(), randomPositiveLong()); return new Allocation(nodeId, jobId, ignoreDowntime, jobStatus, statusReason, schedulerState); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java index b7f89ac785e..f0bbfd9fd93 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java @@ -20,7 +20,6 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.prelert.action.UpdateJobStatusAction; import org.elasticsearch.xpack.prelert.job.JobStatus; import org.elasticsearch.xpack.prelert.job.data.DataProcessor; -import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobService; import org.junit.Before; import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder; @@ -40,11 +39,9 @@ public class JobLifeCycleServiceTests extends ESTestCase { @Before public void instantiateJobAllocator() { ClusterService clusterService = mock(ClusterService.class); - ScheduledJobService scheduledJobService = mock(ScheduledJobService.class); dataProcessor = mock(DataProcessor.class); client = mock(Client.class); - jobLifeCycleService = new JobLifeCycleService(Settings.EMPTY, client, clusterService, scheduledJobService, dataProcessor, - Runnable::run); + jobLifeCycleService = new JobLifeCycleService(Settings.EMPTY, client, clusterService, dataProcessor, Runnable::run); } public void testStartStop() { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunnerTests.java similarity index 61% rename from elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java rename to elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunnerTests.java index a00b76e6860..84b2ff82a86 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunnerTests.java @@ -5,11 +5,15 @@ */ package org.elasticsearch.xpack.prelert.job.scheduler; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.prelert.PrelertPlugin; +import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction; +import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction; import org.elasticsearch.xpack.prelert.job.AnalysisConfig; import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.DataDescription; @@ -38,9 +42,11 @@ import java.util.Arrays; import java.util.Date; import java.util.Optional; import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; import static org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction.INSTANCE; import static org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction.Request; +import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; @@ -53,27 +59,30 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class ScheduledJobServiceTests extends ESTestCase { +public class ScheduledJobRunnerTests extends ESTestCase { private Client client; private ThreadPool threadPool; - private JobProvider jobProvider; - private JobManager jobManager; private DataProcessor dataProcessor; private DataExtractorFactory dataExtractorFactory; - private Auditor auditor; - private ScheduledJobService scheduledJobService; + private ScheduledJobRunner scheduledJobRunner; private long currentTime = 120000; @Before public void setUpTests() { client = mock(Client.class); - jobProvider = mock(JobProvider.class); + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + ActionListener actionListener = (ActionListener) invocation.getArguments()[2]; + actionListener.onResponse(new UpdateJobSchedulerStatusAction.Response()); + return null; + }).when(client).execute(eq(UpdateJobSchedulerStatusAction.INSTANCE), any(), any()); + + JobProvider jobProvider = mock(JobProvider.class); when(jobProvider.dataCounts(anyString())).thenReturn(new DataCounts("foo")); - jobManager = mock(JobManager.class); dataProcessor = mock(DataProcessor.class); dataExtractorFactory = mock(DataExtractorFactory.class); - auditor = mock(Auditor.class); + Auditor auditor = mock(Auditor.class); threadPool = mock(ThreadPool.class); ExecutorService executorService = mock(ExecutorService.class); doAnswer(invocation -> { @@ -82,8 +91,8 @@ public class ScheduledJobServiceTests extends ESTestCase { }).when(executorService).submit(any(Runnable.class)); when(threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME)).thenReturn(executorService); - scheduledJobService = - new ScheduledJobService(threadPool, client, jobProvider, dataProcessor, dataExtractorFactory, () -> currentTime); + scheduledJobRunner = + new ScheduledJobRunner(threadPool, client, jobProvider, dataProcessor, dataExtractorFactory, () -> currentTime); when(jobProvider.audit(anyString())).thenReturn(auditor); when(jobProvider.buckets(anyString(), any(BucketsQueryBuilder.BucketsQuery.class))).thenThrow( @@ -92,84 +101,59 @@ public class ScheduledJobServiceTests extends ESTestCase { public void testStart_GivenNewlyCreatedJobLoopBack() throws IOException { Job.Builder builder = createScheduledJob(); - Allocation allocation = new Allocation("_nodeId", "foo", false, JobStatus.OPENED, null, - new SchedulerState(JobSchedulerStatus.STARTING, 0L, 60000L)); + SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, 0L, 60000L); DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); - - when(jobManager.getJobAllocation("foo")).thenReturn(allocation); + Job job = builder.build(); + Allocation allocation = + new Allocation(null, "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null)); DataExtractor dataExtractor = mock(DataExtractor.class); - when(dataExtractorFactory.newExtractor(builder.build())).thenReturn(dataExtractor); + when(dataExtractorFactory.newExtractor(job)).thenReturn(dataExtractor); when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8"))); when(dataExtractor.next()).thenReturn(Optional.of(in)); when(dataProcessor.processData(anyString(), eq(in), any(), any())).thenReturn(dataCounts); - scheduledJobService.start(builder.build(), allocation); + Consumer handler = mockConsumer(); + StartJobSchedulerAction.SchedulerTask task = mock(StartJobSchedulerAction.SchedulerTask.class); + scheduledJobRunner.run(job, schedulerState, allocation, task, handler); verify(dataExtractor).newSearch(eq(0L), eq(60000L), any()); verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STARTED)), any()); - verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPING)), any()); + verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any()); } public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws IOException { Job.Builder builder = createScheduledJob(); - Allocation allocation = new Allocation("_nodeId", "foo", false, JobStatus.OPENED, null, - new SchedulerState(JobSchedulerStatus.STARTING, 0L, null)); + SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, 0L, null); DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); - when(jobManager.getJobAllocation("foo")).thenReturn(allocation); + Job job = builder.build(); + Allocation allocation = + new Allocation(null, "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null)); DataExtractor dataExtractor = mock(DataExtractor.class); - when(dataExtractorFactory.newExtractor(builder.build())).thenReturn(dataExtractor); + when(dataExtractorFactory.newExtractor(job)).thenReturn(dataExtractor); when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8"))); when(dataExtractor.next()).thenReturn(Optional.of(in)); when(dataProcessor.processData(anyString(), eq(in), any(), any())).thenReturn(dataCounts); - scheduledJobService.start(builder.build(), allocation); + Consumer handler = mockConsumer(); + boolean cancelled = randomBoolean(); + StartJobSchedulerAction.SchedulerTask task = new StartJobSchedulerAction.SchedulerTask(1, "type", "action", null, "foo"); + scheduledJobRunner.run(job, schedulerState, allocation, task, handler); verify(dataExtractor).newSearch(eq(0L), eq(60000L), any()); verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME); - verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME), any()); - - allocation = new Allocation(allocation.getNodeId(), allocation.getJobId(), false, allocation.getStatus(), - null, new SchedulerState(JobSchedulerStatus.STOPPING, 0L, 60000L)); - scheduledJobService.stop(allocation); - verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any()); + if (cancelled) { + task.stop(); + verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any()); + } else { + verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME), any()); + } } - public void testStop_GivenNonScheduledJob() { - Allocation allocation = new Allocation.Builder().build(); - expectThrows(IllegalStateException.class, () -> scheduledJobService.stop(allocation)); - } - - public void testStop_GivenStartedScheduledJob() throws IOException { - Job.Builder builder = createScheduledJob(); - Allocation allocation1 = - new Allocation("_nodeId", "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STARTED, 0L, null)); - when(jobManager.getJobAllocation("foo")).thenReturn(allocation1); - - DataExtractor dataExtractor = mock(DataExtractor.class); - when(dataExtractorFactory.newExtractor(builder.build())).thenReturn(dataExtractor); - - Exception e = expectThrows(IllegalStateException.class, () -> scheduledJobService.start(builder.build(), allocation1)); - assertThat(e.getMessage(), equalTo("expected job scheduler status [STARTING], but got [STARTED] instead")); - - e = expectThrows(IllegalStateException.class, () -> scheduledJobService.stop(allocation1)); - assertThat(e.getMessage(), equalTo("expected job scheduler status [STOPPING], but got [STARTED] instead")); - - // Properly stop it to avoid leaking threads in the test - Allocation allocation2 = - new Allocation("_nodeId", "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPING, 0L, null)); - scheduledJobService.registry.put("foo", scheduledJobService.createJobScheduler(builder.build())); - scheduledJobService.stop(allocation2); - - // We stopped twice but the first time should have been ignored. We can assert that indirectly - // by verifying that the scheduler status was set to STOPPED. - verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any()); - } - - private static Job.Builder createScheduledJob() { + public static Job.Builder createScheduledJob() { AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build())); acBuilder.setBucketSpan(3600L); acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build())); @@ -184,4 +168,27 @@ public class ScheduledJobServiceTests extends ESTestCase { builder.setDataDescription(dataDescription); return builder; } + + public void testValidate() { + Job job1 = buildJobBuilder("foo").build(); + Exception e = expectThrows(IllegalArgumentException.class, () -> ScheduledJobRunner.validate(job1, null)); + assertThat(e.getMessage(), equalTo("job [foo] is not a scheduled job")); + + Job job2 = createScheduledJob().build(); + Allocation allocation1 = + new Allocation("_id", "_id", false, JobStatus.CLOSED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null)); + e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate(job2, allocation1)); + assertThat(e.getMessage(), equalTo("cannot start scheduler, expected job status [OPENED], but got [CLOSED]")); + + Job job3 = createScheduledJob().build(); + Allocation allocation2 = + new Allocation("_id", "_id", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STARTED, null, null)); + e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate(job3, allocation2)); + assertThat(e.getMessage(), equalTo("scheduler already started, expected scheduler status [STOPPED], but got [STARTED]")); + } + + @SuppressWarnings("unchecked") + private Consumer mockConsumer() { + return mock(Consumer.class); + } } \ No newline at end of file diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerActionTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerActionTests.java index d4bdf1975f9..2ac57c76ce2 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerActionTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerActionTests.java @@ -7,22 +7,37 @@ package org.elasticsearch.xpack.prelert.rest.schedulers; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.FakeRestRequest; -import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction; +import org.elasticsearch.xpack.prelert.job.Job; +import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; +import org.elasticsearch.xpack.prelert.job.JobStatus; +import org.elasticsearch.xpack.prelert.job.SchedulerState; +import org.elasticsearch.xpack.prelert.job.manager.JobManager; +import org.elasticsearch.xpack.prelert.job.metadata.Allocation; +import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunnerTests; import java.util.Collections; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class RestStartJobSchedulerActionTests extends ESTestCase { public void testPrepareRequest() throws Exception { + JobManager jobManager = mock(JobManager.class); + Job.Builder job = ScheduledJobRunnerTests.createScheduledJob(); + when(jobManager.getJobOrThrowIfUnknown(anyString())).thenReturn(job.build()); + Allocation allocation = + new Allocation(null, "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null)); + when(jobManager.getJobAllocation(anyString())).thenReturn(allocation); RestStartJobSchedulerAction action = new RestStartJobSchedulerAction(Settings.EMPTY, mock(RestController.class), - mock(StartJobSchedulerAction.TransportAction.class)); + jobManager, mock(ClusterService.class)); RestRequest restRequest1 = new FakeRestRequest.Builder().withParams(Collections.singletonMap("start", "not-a-date")).build(); ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class,