Hooked scheduler into task api infrastructure.

The start scheduler api call will run until the scheduler has completed. Either when lookback only scheduler completes or the scheduler has been stopped.
 The start scheduler api will first update the scheduler status from STOPPED to STARTED on master node and then start running the scheduler.
 Once a scheduled job completes it updates the scheduler status from STARTED to STOPPED and then the start schedule api returns.
 The STARTING and STOPPING statuses are no longer used, so have been removed.
 The stop scheduler api is a sugar api that uses the task list and cancel apis stop the scheduler.
 Renamed ScheduledJobService to ScheduledJobRunner

Original commit: elastic/x-pack-elasticsearch@ab504fe3d9
This commit is contained in:
Martijn van Groningen 2016-12-02 14:09:09 +01:00
parent 037392ddd4
commit 9c2c831996
20 changed files with 409 additions and 386 deletions

View File

@ -68,7 +68,7 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcessF
import org.elasticsearch.xpack.prelert.job.process.autodetect.BlackHoleAutodetectProcess;
import org.elasticsearch.xpack.prelert.job.process.autodetect.NativeAutodetectProcessFactory;
import org.elasticsearch.xpack.prelert.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobService;
import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunner;
import org.elasticsearch.xpack.prelert.job.scheduler.http.HttpDataExtractorFactory;
import org.elasticsearch.xpack.prelert.job.status.StatusReporter;
import org.elasticsearch.xpack.prelert.job.usage.UsageReporter;
@ -168,7 +168,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier);
DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, processFactory, clusterService.getClusterSettings());
ScheduledJobService scheduledJobService = new ScheduledJobService(threadPool, client, jobProvider, dataProcessor,
ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, jobProvider, dataProcessor,
// norelease: we will no longer need to pass the client here after we switch to a client based data extractor
new HttpDataExtractorFactory(client),
System::currentTimeMillis);
@ -176,11 +176,12 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
jobProvider,
jobManager,
new JobAllocator(settings, clusterService, threadPool),
new JobLifeCycleService(settings, client, clusterService, scheduledJobService, dataProcessor, threadPool.generic()),
new JobLifeCycleService(settings, client, clusterService, dataProcessor, threadPool.generic()),
new JobDataDeleterFactory(client), //NORELEASE: this should use Delete-by-query
dataProcessor,
new PrelertInitializationService(settings, threadPool, clusterService, jobProvider),
jobDataCountsPersister
jobDataCountsPersister,
scheduledJobRunner
);
}
@ -258,7 +259,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
FixedExecutorBuilder prelert = new FixedExecutorBuilder(settings, THREAD_POOL_NAME,
maxNumberOfJobs, 1000, "xpack.prelert.thread_pool");
// fail quick to start autodetect process / scheduler, so no queues
// fail quick to run autodetect process / scheduler, so no queues
// 4 threads: for c++ logging, result processing, state processing and restore state
FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_PROCESS_THREAD_POOL_NAME,
maxNumberOfJobs * 4, 4, "xpack.prelert.autodetect_process_thread_pool");

View File

@ -7,18 +7,14 @@ package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -28,12 +24,17 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunner;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.io.IOException;
@ -43,7 +44,7 @@ public class StartJobSchedulerAction
extends Action<StartJobSchedulerAction.Request, StartJobSchedulerAction.Response, StartJobSchedulerAction.RequestBuilder> {
public static final StartJobSchedulerAction INSTANCE = new StartJobSchedulerAction();
public static final String NAME = "cluster:admin/prelert/job/scheduler/start";
public static final String NAME = "cluster:admin/prelert/job/scheduler/run";
private StartJobSchedulerAction() {
super(NAME);
@ -59,7 +60,7 @@ extends Action<StartJobSchedulerAction.Request, StartJobSchedulerAction.Response
return new Response();
}
public static class Request extends AcknowledgedRequest<Request> implements ToXContent {
public static class Request extends ActionRequest implements ToXContent {
public static ObjectParser<Request, ParseFieldMatcherSupplier> PARSER = new ObjectParser<>(NAME, Request::new);
@ -85,9 +86,9 @@ extends Action<StartJobSchedulerAction.Request, StartJobSchedulerAction.Response
public Request(String jobId, SchedulerState schedulerState) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
this.schedulerState = ExceptionsHelper.requireNonNull(schedulerState, SchedulerState.TYPE_FIELD.getPreferredName());
if (schedulerState.getStatus() != JobSchedulerStatus.STARTING) {
if (schedulerState.getStatus() != JobSchedulerStatus.STARTED) {
throw new IllegalStateException(
"Start job scheduler action requires the scheduler status to be [" + JobSchedulerStatus.STARTING + "]");
"Start job scheduler action requires the scheduler status to be [" + JobSchedulerStatus.STARTED + "]");
}
}
@ -107,6 +108,11 @@ extends Action<StartJobSchedulerAction.Request, StartJobSchedulerAction.Response
return null;
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SchedulerTask(id, type, action, parentTaskId, jobId);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -148,65 +154,77 @@ extends Action<StartJobSchedulerAction.Request, StartJobSchedulerAction.Response
}
}
static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder> {
static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, StartJobSchedulerAction action) {
super(client, action, new Request());
}
}
public static class Response extends AcknowledgedResponse {
public static class Response extends ActionResponse {
public Response(boolean acknowledged) {
super(acknowledged);
Response() {
}
private Response() {
}
public static class SchedulerTask extends CancellableTask {
private volatile ScheduledJobRunner.Holder holder;
public SchedulerTask(long id, String type, String action, TaskId parentTaskId, String jobId) {
super(id, type, action, "job-scheduler-" + jobId, parentTaskId);
}
public void setHolder(ScheduledJobRunner.Holder holder) {
this.holder = holder;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
protected void onCancelled() {
stop();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
/* public for testing */
public void stop() {
if (holder != null) {
holder.stop();
}
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final JobManager jobManager;
private final ScheduledJobRunner scheduledJobRunner;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager) {
super(settings, StartJobSchedulerAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobManager jobManager, ScheduledJobRunner scheduledJobRunner) {
super(settings, StartJobSchedulerAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
Request::new);
this.jobManager = jobManager;
this.scheduledJobRunner = scheduledJobRunner;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
SchedulerTask schedulerTask = (SchedulerTask) task;
Job job = jobManager.getJobOrThrowIfUnknown(request.jobId);
Allocation allocation = jobManager.getJobAllocation(job.getId());
scheduledJobRunner.run(job, request.getSchedulerState(), allocation, schedulerTask, (error) -> {
if (error != null) {
listener.onFailure(error);
} else {
listener.onResponse(new Response());
}
});
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
jobManager.startJobScheduler(request, listener);
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
protected void doExecute(Request request, ActionListener<Response> listener) {
throw new UnsupportedOperationException("the task parameter is required");
}
}
}

View File

@ -5,24 +5,28 @@
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.job.Job;
@ -52,7 +56,7 @@ extends Action<StopJobSchedulerAction.Request, StopJobSchedulerAction.Response,
return new Response();
}
public static class Request extends AcknowledgedRequest<Request> {
public static class Request extends ActionRequest {
private String jobId;
@ -102,7 +106,7 @@ extends Action<StopJobSchedulerAction.Request, StopJobSchedulerAction.Response,
}
}
static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder> {
static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, StopJobSchedulerAction action) {
super(client, action, new Request());
@ -111,56 +115,72 @@ extends Action<StopJobSchedulerAction.Request, StopJobSchedulerAction.Response,
public static class Response extends AcknowledgedResponse {
public Response(boolean acknowledged) {
super(acknowledged);
}
private Response() {
super(true);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final JobManager jobManager;
private final TransportCancelTasksAction cancelTasksAction;
private final TransportListTasksAction listTasksAction;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager) {
super(settings, StopJobSchedulerAction.NAME, transportService, clusterService, threadPool, actionFilters,
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
TransportCancelTasksAction cancelTasksAction, TransportListTasksAction listTasksAction) {
super(settings, StopJobSchedulerAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
this.jobManager = jobManager;
this.cancelTasksAction = cancelTasksAction;
this.listTasksAction = listTasksAction;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
protected void doExecute(Request request, ActionListener<Response> listener) {
String jobId = request.getJobId();
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions(StartJobSchedulerAction.NAME);
listTasksRequest.setDetailed(true);
listTasksAction.execute(listTasksRequest, new ActionListener<ListTasksResponse>() {
@Override
public void onResponse(ListTasksResponse listTasksResponse) {
String expectedJobDescription = "job-scheduler-" + jobId;
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
if (expectedJobDescription.equals(taskInfo.getDescription())) {
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
cancelTasksRequest.setTaskId(taskInfo.getTaskId());
cancelTasksAction.execute(cancelTasksRequest, new ActionListener<CancelTasksResponse>() {
@Override
public void onResponse(CancelTasksResponse cancelTasksResponse) {
listener.onResponse(new Response());
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
return;
}
}
listener.onFailure(new ResourceNotFoundException("No scheduler running for job [" + jobId + "]"));
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
jobManager.stopJobScheduler(request, listener);
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}
}

View File

@ -140,7 +140,7 @@ public class UpdateJobSchedulerStatusAction extends Action<UpdateJobSchedulerSta
super(acknowledged);
}
private Response() {}
public Response() {}
@Override
public void readFrom(StreamInput in) throws IOException {

View File

@ -14,7 +14,7 @@ import java.util.Locale;
public enum JobSchedulerStatus implements Writeable {
STARTING, STARTED, STOPPING, STOPPED;
STARTED, STOPPED;
public static JobSchedulerStatus fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));

View File

@ -19,8 +19,6 @@ import org.elasticsearch.xpack.prelert.action.DeleteJobAction;
import org.elasticsearch.xpack.prelert.action.PutJobAction;
import org.elasticsearch.xpack.prelert.action.OpenJobAction;
import org.elasticsearch.xpack.prelert.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction;
import org.elasticsearch.xpack.prelert.action.StopJobSchedulerAction;
import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction;
import org.elasticsearch.xpack.prelert.action.UpdateJobStatusAction;
import org.elasticsearch.xpack.prelert.job.IgnoreDowntime;
@ -276,41 +274,6 @@ public class JobManager extends AbstractComponent {
return newState.build();
}
public void startJobScheduler(StartJobSchedulerAction.Request request,
ActionListener<StartJobSchedulerAction.Response> actionListener) {
clusterService.submitStateUpdateTask("start-scheduler-job-" + request.getJobId(),
new AckedClusterStateUpdateTask<StartJobSchedulerAction.Response>(request, actionListener) {
@Override
protected StartJobSchedulerAction.Response newResponse(boolean acknowledged) {
return new StartJobSchedulerAction.Response(acknowledged);
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
long startTime = request.getSchedulerState().getStartTimeMillis();
Long endTime = request.getSchedulerState().getEndTimeMillis();
return innerUpdateSchedulerState(currentState, request.getJobId(), JobSchedulerStatus.STARTING, startTime, endTime);
}
});
}
public void stopJobScheduler(StopJobSchedulerAction.Request request, ActionListener<StopJobSchedulerAction.Response> actionListener) {
clusterService.submitStateUpdateTask("stop-scheduler-job-" + request.getJobId(),
new AckedClusterStateUpdateTask<StopJobSchedulerAction.Response>(request, actionListener) {
@Override
protected StopJobSchedulerAction.Response newResponse(boolean acknowledged) {
return new StopJobSchedulerAction.Response(acknowledged);
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return innerUpdateSchedulerState(currentState, request.getJobId(), JobSchedulerStatus.STOPPING, null, null);
}
});
}
private void checkJobIsScheduled(Job job) {
if (job.getSchedulerConfig() == null) {
throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_SCHEDULER_NO_SUCH_SCHEDULED_JOB, job.getId()));
@ -352,7 +315,7 @@ public class JobManager extends AbstractComponent {
checkJobIsScheduled(job);
Allocation allocation = getAllocation(currentState, jobId);
if (allocation.getSchedulerState() == null && status != JobSchedulerStatus.STARTING) {
if (allocation.getSchedulerState() == null && status != JobSchedulerStatus.STARTED) {
throw new IllegalArgumentException("Can't change status to [" + status + "], because job's [" + jobId +
"] scheduler never started");
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
@ -179,6 +180,13 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
public Builder() {
}
public Builder(Job job) {
this.jobId = job.getId();
if (job.getSchedulerConfig() != null) {
schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, null, null);
}
}
public Builder(Allocation allocation) {
this.nodeId = allocation.nodeId;
this.jobId = allocation.jobId;
@ -229,41 +237,30 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
this.statusReason = statusReason;
}
public void setSchedulerState(SchedulerState schedulerState) {
JobSchedulerStatus currentSchedulerStatus = this.schedulerState == null ?
JobSchedulerStatus.STOPPED : this.schedulerState.getStatus();
JobSchedulerStatus newSchedulerStatus = schedulerState.getStatus();
switch (newSchedulerStatus) {
case STARTING:
if (currentSchedulerStatus != JobSchedulerStatus.STOPPED) {
String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_START, jobId, newSchedulerStatus);
throw ExceptionsHelper.conflictStatusException(msg);
public void setSchedulerState(SchedulerState newSchedulerState) {
if (this.schedulerState != null){
JobSchedulerStatus currentSchedulerStatus = this.schedulerState.getStatus();
JobSchedulerStatus newSchedulerStatus = newSchedulerState.getStatus();
switch (newSchedulerStatus) {
case STARTED:
if (currentSchedulerStatus != JobSchedulerStatus.STOPPED) {
String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_START, jobId, newSchedulerStatus);
throw ExceptionsHelper.conflictStatusException(msg);
}
break;
case STOPPED:
if (currentSchedulerStatus != JobSchedulerStatus.STARTED) {
String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_STOP_IN_CURRENT_STATE, jobId,
newSchedulerStatus);
throw ExceptionsHelper.conflictStatusException(msg);
}
break;
default:
throw new IllegalArgumentException("Invalid requested job scheduler status: " + newSchedulerStatus);
}
break;
case STARTED:
if (currentSchedulerStatus != JobSchedulerStatus.STARTING) {
String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_START, jobId, newSchedulerStatus);
throw ExceptionsHelper.conflictStatusException(msg);
}
break;
case STOPPING:
if (currentSchedulerStatus != JobSchedulerStatus.STARTED) {
String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_STOP_IN_CURRENT_STATE, jobId, newSchedulerStatus);
throw ExceptionsHelper.conflictStatusException(msg);
}
break;
case STOPPED:
if ((currentSchedulerStatus != JobSchedulerStatus.STOPPED ||
currentSchedulerStatus != JobSchedulerStatus.STOPPING) == false) {
String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_STOP_IN_CURRENT_STATE, jobId, newSchedulerStatus);
throw ExceptionsHelper.conflictStatusException(msg);
}
break;
default:
throw new IllegalArgumentException("Invalid requested job scheduler status: " + newSchedulerStatus);
}
this.schedulerState = schedulerState;
this.schedulerState = newSchedulerState;
}
public Allocation build() {

View File

@ -16,9 +16,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.prelert.action.UpdateJobStatusAction;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.data.DataProcessor;
import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobService;
import java.util.HashSet;
import java.util.Objects;
@ -29,16 +27,14 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta
volatile Set<String> localAssignedJobs = new HashSet<>();
private final Client client;
private final ScheduledJobService scheduledJobService;
private final DataProcessor dataProcessor;
private final Executor executor;
public JobLifeCycleService(Settings settings, Client client, ClusterService clusterService, ScheduledJobService scheduledJobService,
DataProcessor dataProcessor, Executor executor) {
public JobLifeCycleService(Settings settings, Client client, ClusterService clusterService, DataProcessor dataProcessor,
Executor executor) {
super(settings);
clusterService.add(this);
this.client = Objects.requireNonNull(client);
this.scheduledJobService = Objects.requireNonNull(scheduledJobService);
this.dataProcessor = Objects.requireNonNull(dataProcessor);
this.executor = Objects.requireNonNull(executor);
}
@ -80,28 +76,6 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta
startJob(allocation);
}
}
handleSchedulerStatusChange(job, allocation);
}
private void handleSchedulerStatusChange(Job job, Allocation allocation) {
SchedulerState schedulerState = allocation.getSchedulerState();
if (schedulerState != null) {
switch (schedulerState.getStatus()) {
case STARTING:
executor.execute(() -> scheduledJobService.start(job, allocation));
break;
case STARTED:
break;
case STOPPING:
executor.execute(() -> scheduledJobService.stop(allocation));
break;
case STOPPED:
break;
default:
throw new IllegalStateException("Unhandled scheduler state [" + schedulerState.getStatus() + "]");
}
}
}
void startJob(Allocation allocation) {

View File

@ -208,12 +208,7 @@ public class PrelertMetadata implements MetaData.Custom {
Allocation allocation = allocations.get(job.getId());
if (allocation == null) {
Allocation.Builder builder = new Allocation.Builder();
builder.setJobId(job.getId());
boolean addSchedulderState = job.getSchedulerConfig() != null;
if (addSchedulderState) {
builder.setSchedulerState(new SchedulerState(JobSchedulerStatus.STOPPED, null, null));
}
Allocation.Builder builder = new Allocation.Builder(job);
builder.setStatus(JobStatus.CLOSED);
allocations.put(job.getId(), builder.build());
}

View File

@ -5,20 +5,23 @@
*/
package org.elasticsearch.xpack.prelert.job.scheduler;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction;
import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.config.DefaultFrequency;
@ -33,11 +36,11 @@ import org.elasticsearch.xpack.prelert.job.results.Bucket;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class ScheduledJobService extends AbstractComponent {
public class ScheduledJobRunner extends AbstractComponent {
private final Client client;
private final JobProvider jobProvider;
@ -45,10 +48,9 @@ public class ScheduledJobService extends AbstractComponent {
private final DataExtractorFactory dataExtractorFactory;
private final ThreadPool threadPool;
private final Supplier<Long> currentTimeSupplier;
final ConcurrentMap<String, Holder> registry;
public ScheduledJobService(ThreadPool threadPool, Client client, JobProvider jobProvider, DataProcessor dataProcessor,
DataExtractorFactory dataExtractorFactory, Supplier<Long> currentTimeSupplier) {
public ScheduledJobRunner(ThreadPool threadPool, Client client, JobProvider jobProvider, DataProcessor dataProcessor,
DataExtractorFactory dataExtractorFactory, Supplier<Long> currentTimeSupplier) {
super(Settings.EMPTY);
this.threadPool = threadPool;
this.client = Objects.requireNonNull(client);
@ -56,77 +58,43 @@ public class ScheduledJobService extends AbstractComponent {
this.dataProcessor = Objects.requireNonNull(dataProcessor);
this.dataExtractorFactory = Objects.requireNonNull(dataExtractorFactory);
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
this.registry = ConcurrentCollections.newConcurrentMap();
}
public void start(Job job, Allocation allocation) {
SchedulerState schedulerState = allocation.getSchedulerState();
if (schedulerState == null) {
throw new IllegalStateException("Job [" + job.getId() + "] is not a scheduled job");
}
public void run(Job job, SchedulerState schedulerState, Allocation allocation, StartJobSchedulerAction.SchedulerTask task,
Consumer<Exception> handler) {
validate(job, allocation);
if (schedulerState.getStatus() != JobSchedulerStatus.STARTING) {
throw new IllegalStateException("expected job scheduler status [" + JobSchedulerStatus.STARTING + "], but got [" +
schedulerState.getStatus() + "] instead");
}
if (registry.containsKey(allocation.getJobId())) {
throw new IllegalStateException("job [" + allocation.getJobId() + "] has already been started");
}
logger.info("Starting scheduler [{}]", allocation);
Holder holder = createJobScheduler(job);
registry.put(job.getId(), holder);
holder.future = threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME).submit(() -> {
try {
Long next = holder.scheduledJob.runLookBack(allocation.getSchedulerState());
if (next != null) {
doScheduleRealtime(next, job.getId(), holder);
} else {
holder.scheduledJob.stop();
requestStopping(job.getId());
setJobSchedulerStatus(job.getId(), JobSchedulerStatus.STARTED, error -> {
logger.info("Starting scheduler [{}]", schedulerState);
Holder holder = createJobScheduler(job, task, handler);
task.setHolder(holder);
holder.future = threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME).submit(() -> {
try {
Long next = holder.scheduledJob.runLookBack(schedulerState);
if (next != null) {
doScheduleRealtime(next, job.getId(), holder);
} else {
holder.stop();
}
} catch (ScheduledJob.ExtractionProblemException e) {
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
} catch (ScheduledJob.AnalysisProblemException e) {
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
} catch (ScheduledJob.EmptyDataCountException e) {
if (holder.problemTracker.updateEmptyDataCount(true)) {
holder.stop();
}
} catch (Exception e) {
logger.error("Failed lookback import for job[" + job.getId() + "]", e);
holder.stop();
}
} catch (ScheduledJob.ExtractionProblemException e) {
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
} catch (ScheduledJob.AnalysisProblemException e) {
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
} catch (ScheduledJob.EmptyDataCountException e) {
if (holder.problemTracker.updateEmptyDataCount(true)) {
requestStopping(job.getId());
}
} catch (Exception e) {
logger.error("Failed lookback import for job[" + job.getId() + "]", e);
requestStopping(job.getId());
}
holder.problemTracker.finishReport();
holder.problemTracker.finishReport();
});
});
setJobSchedulerStatus(job.getId(), JobSchedulerStatus.STARTED);
}
public void stop(Allocation allocation) {
SchedulerState schedulerState = allocation.getSchedulerState();
if (schedulerState == null) {
throw new IllegalStateException("Job [" + allocation.getJobId() + "] is not a scheduled job");
}
if (schedulerState.getStatus() != JobSchedulerStatus.STOPPING) {
throw new IllegalStateException("expected job scheduler status [" + JobSchedulerStatus.STOPPING + "], but got [" +
schedulerState.getStatus() + "] instead");
}
Holder holder = registry.remove(allocation.getJobId());
if (holder == null) {
throw new IllegalStateException("job [" + allocation.getJobId() + "] has not been started");
}
logger.info("Stopping scheduler for job [{}]", allocation.getJobId());
holder.stop();
// Don't close the job directly without going via the close data api to change the job status:
// dataProcessor.closeJob(allocation.getJobId());
setJobSchedulerStatus(allocation.getJobId(), JobSchedulerStatus.STOPPED);
}
private void doScheduleRealtime(long delayInMsSinceEpoch, String jobId, Holder holder) {
if (holder.scheduledJob.isRunning()) {
if (holder.isRunning()) {
TimeValue delay = computeNextDelay(delayInMsSinceEpoch);
logger.debug("Waiting [{}] before executing next realtime import for job [{}]", delay, jobId);
holder.future = threadPool.schedule(delay, PrelertPlugin.SCHEDULER_THREAD_POOL_NAME, () -> {
@ -143,25 +111,40 @@ public class ScheduledJobService extends AbstractComponent {
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
if (holder.problemTracker.updateEmptyDataCount(true)) {
holder.problemTracker.finishReport();
requestStopping(jobId);
holder.stop();
return;
}
} catch (Exception e) {
logger.error("Unexpected scheduler failure for job [" + jobId + "] stopping...", e);
requestStopping(jobId);
holder.stop();
return;
}
holder.problemTracker.finishReport();
doScheduleRealtime(nextDelayInMsSinceEpoch, jobId, holder);
});
} else {
holder.stop();
}
}
private void requestStopping(String jobId) {
setJobSchedulerStatus(jobId, JobSchedulerStatus.STOPPING);
public static void validate(Job job, Allocation allocation) {
if (job.getSchedulerConfig() == null) {
throw new IllegalArgumentException("job [" + job.getId() + "] is not a scheduled job");
}
if (allocation.getStatus() != JobStatus.OPENED) {
throw new ElasticsearchStatusException("cannot start scheduler, expected job status [{}], but got [{}]",
RestStatus.CONFLICT, JobStatus.OPENED, allocation.getStatus());
}
if (allocation.getSchedulerState().getStatus() != JobSchedulerStatus.STOPPED) {
throw new ElasticsearchStatusException("scheduler already started, expected scheduler status [{}], but got [{}]",
RestStatus.CONFLICT, JobSchedulerStatus.STOPPED, allocation.getSchedulerState().getStatus());
}
}
Holder createJobScheduler(Job job) {
private Holder createJobScheduler(Job job, StartJobSchedulerAction.SchedulerTask task, Consumer<Exception> handler) {
Auditor auditor = jobProvider.audit(job.getId());
Duration frequency = getFrequencyOrDefault(job);
Duration queryDelay = Duration.ofSeconds(job.getSchedulerConfig().getQueryDelay());
@ -169,7 +152,7 @@ public class ScheduledJobService extends AbstractComponent {
ScheduledJob scheduledJob = new ScheduledJob(job.getId(), frequency.toMillis(), queryDelay.toMillis(),
dataExtractor, dataProcessor, auditor, currentTimeSupplier, getLatestFinalBucketEndTimeMs(job),
getLatestRecordTimestamp(job.getId()));
return new Holder(scheduledJob, new ProblemTracker(() -> auditor));
return new Holder(job, scheduledJob, new ProblemTracker(() -> auditor), handler);
}
private long getLatestFinalBucketEndTimeMs(Job job) {
@ -211,7 +194,7 @@ public class ScheduledJobService extends AbstractComponent {
return new TimeValue(Math.max(1, next - currentTimeSupplier.get()));
}
private void setJobSchedulerStatus(String jobId, JobSchedulerStatus status) {
private void setJobSchedulerStatus(String jobId, JobSchedulerStatus status, Consumer<Exception> supplier) {
UpdateJobSchedulerStatusAction.Request request = new UpdateJobSchedulerStatusAction.Request(jobId, status);
client.execute(UpdateJobSchedulerStatusAction.INSTANCE, request, new ActionListener<UpdateJobSchedulerStatusAction.Response>() {
@Override
@ -221,29 +204,41 @@ public class ScheduledJobService extends AbstractComponent {
} else {
logger.info("set job scheduler status to [{}] for job [{}], but was not acknowledged", status, jobId);
}
supplier.accept(null);
}
@Override
public void onFailure(Exception e) {
logger.error("could not set job scheduler status to [" + status + "] for job [" + jobId +"]", e);
supplier.accept(e);
}
});
}
private static class Holder {
public class Holder {
private final String jobId;
private final ScheduledJob scheduledJob;
private final ProblemTracker problemTracker;
private final Consumer<Exception> handler;
volatile Future<?> future;
private Holder(ScheduledJob scheduledJob, ProblemTracker problemTracker) {
private Holder(Job job, ScheduledJob scheduledJob, ProblemTracker problemTracker, Consumer<Exception> handler) {
this.jobId = job.getId();
this.scheduledJob = scheduledJob;
this.problemTracker = problemTracker;
this.handler = handler;
}
void stop() {
boolean isRunning() {
return scheduledJob.isRunning();
}
public void stop() {
logger.info("Stopping scheduler for job [{}]", jobId);
scheduledJob.stop();
FutureUtils.cancel(future);
setJobSchedulerStatus(jobId, JobSchedulerStatus.STOPPED, error -> handler.accept(null));
}
}

View File

@ -7,23 +7,31 @@ package org.elasticsearch.xpack.prelert.rest.schedulers;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.AcknowledgedRestListener;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.tasks.LoggingTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunner;
import java.io.IOException;
@ -31,13 +39,15 @@ public class RestStartJobSchedulerAction extends BaseRestHandler {
private static final String DEFAULT_START = "0";
private final StartJobSchedulerAction.TransportAction transportJobSchedulerAction;
private final JobManager jobManager;
private final ClusterService clusterService;
@Inject
public RestStartJobSchedulerAction(Settings settings, RestController controller,
StartJobSchedulerAction.TransportAction transportJobSchedulerAction) {
public RestStartJobSchedulerAction(Settings settings, RestController controller, JobManager jobManager,
ClusterService clusterService) {
super(settings);
this.transportJobSchedulerAction = transportJobSchedulerAction;
this.jobManager = jobManager;
this.clusterService = clusterService;
controller.registerHandler(RestRequest.Method.POST,
PrelertPlugin.BASE_PATH + "schedulers/{" + Job.ID.getPreferredName() + "}/_start", this);
}
@ -45,6 +55,14 @@ public class RestStartJobSchedulerAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String jobId = restRequest.param(Job.ID.getPreferredName());
// This validation happens also in ScheduledJobRunner, the reason we do it here too is that if it fails there
// we are unable to provide the user immediate feedback. We would create the task and the validation would fail
// in the background, whereas now the validation failure is part of the response being returned.
Job job = jobManager.getJobOrThrowIfUnknown(jobId);
Allocation allocation = jobManager.getJobAllocation(jobId);
ScheduledJobRunner.validate(job, allocation);
StartJobSchedulerAction.Request jobSchedulerRequest;
if (RestActions.hasBodyContent(restRequest)) {
BytesReference bodyBytes = RestActions.getRestContent(restRequest);
@ -58,10 +76,21 @@ public class RestStartJobSchedulerAction extends BaseRestHandler {
endTimeMillis = parseDateOrThrow(restRequest.param(SchedulerState.END_TIME_MILLIS.getPreferredName()),
SchedulerState.END_TIME_MILLIS.getPreferredName());
}
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTING, startTimeMillis, endTimeMillis);
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, startTimeMillis, endTimeMillis);
jobSchedulerRequest = new StartJobSchedulerAction.Request(jobId, schedulerState);
}
return channel -> transportJobSchedulerAction.execute(jobSchedulerRequest, new AcknowledgedRestListener<>(channel));
return sendTask(client.executeLocally(StartJobSchedulerAction.INSTANCE, jobSchedulerRequest, LoggingTaskListener.instance()));
}
private RestChannelConsumer sendTask(Task task) throws IOException {
return channel -> {
try (XContentBuilder builder = channel.newBuilder()) {
builder.startObject();
builder.field("task", clusterService.localNode().getId() + ":" + task.getId());
builder.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}
};
}
static long parseDateOrThrow(String date, String paramName) {

View File

@ -37,9 +37,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
@ESIntegTestCase.ClusterScope(numDataNodes = 1)
public class ScheduledJobsIT extends ESIntegTestCase {
@ -75,11 +77,9 @@ public class ScheduledJobsIT extends ESIntegTestCase {
OpenJobAction.Response openJobResponse = client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get();
assertTrue(openJobResponse.isAcknowledged());
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTING, 0L, now);
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, now);
StartJobSchedulerAction.Request startSchedulerRequest = new StartJobSchedulerAction.Request("_job_id", schedulerState);
StartJobSchedulerAction.Response startJobResponse = client().execute(StartJobSchedulerAction.INSTANCE, startSchedulerRequest)
.get();
assertTrue(startJobResponse.isAcknowledged());
client().execute(StartJobSchedulerAction.INSTANCE, startSchedulerRequest).get();
assertBusy(() -> {
DataCounts dataCounts = getDataCounts("_job_id");
assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs));
@ -107,11 +107,18 @@ public class ScheduledJobsIT extends ESIntegTestCase {
OpenJobAction.Response openJobResponse = client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get();
assertTrue(openJobResponse.isAcknowledged());
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTING, 0L, null);
StartJobSchedulerAction.Request startSchedulerRequest = new StartJobSchedulerAction.Request("_job_id", schedulerState);
StartJobSchedulerAction.Response startJobResponse = client().execute(StartJobSchedulerAction.INSTANCE, startSchedulerRequest)
.get();
assertTrue(startJobResponse.isAcknowledged());
AtomicReference<Throwable> errorHolder = new AtomicReference<>();
Thread t = new Thread(() -> {
try {
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, null);
StartJobSchedulerAction.Request startSchedulerRequest =
new StartJobSchedulerAction.Request("_job_id", schedulerState);
client().execute(StartJobSchedulerAction.INSTANCE, startSchedulerRequest).get();
} catch (Exception | AssertionError e) {
errorHolder.set(e);
}
});
t.start();
assertBusy(() -> {
DataCounts dataCounts = getDataCounts("_job_id");
assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs1));
@ -126,14 +133,15 @@ public class ScheduledJobsIT extends ESIntegTestCase {
}, 30, TimeUnit.SECONDS);
StopJobSchedulerAction.Request stopSchedulerRequest = new StopJobSchedulerAction.Request("_job_id");
client().execute(StopJobSchedulerAction.INSTANCE, stopSchedulerRequest).get();
assertTrue(startJobResponse.isAcknowledged());
StopJobSchedulerAction.Response stopJobResponse = client().execute(StopJobSchedulerAction.INSTANCE, stopSchedulerRequest).get();
assertTrue(stopJobResponse.isAcknowledged());
assertBusy(() -> {
PrelertMetadata prelertMetadata = client().admin().cluster().prepareState().all().get()
.getState().metaData().custom(PrelertMetadata.TYPE);
assertThat(prelertMetadata.getAllocations().get("_job_id").getSchedulerState().getStatus(),
equalTo(JobSchedulerStatus.STOPPED));
});
assertThat(errorHolder.get(), nullValue());
}
private void indexDocs(long numDocs, long start, long end) {

View File

@ -16,7 +16,7 @@ public class StartJobSchedulerActionRequestTests extends AbstractStreamableXCont
@Override
protected Request createTestInstance() {
SchedulerState state = new SchedulerState(JobSchedulerStatus.STARTING, randomLong(), randomLong());
SchedulerState state = new SchedulerState(JobSchedulerStatus.STARTED, randomLong(), randomLong());
return new Request(randomAsciiOfLengthBetween(1, 20), state);
}

View File

@ -45,7 +45,36 @@ public class ScheduledJobIT extends ESRestTestCase {
() -> client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
String responseAsString = responseEntityToString(e.getResponse());
assertThat(responseAsString, containsString("\"reason\":\"There is no job '" + jobId + "' with a scheduler configured\""));
assertThat(responseAsString, containsString("\"reason\":\"job [" + jobId + "] is not a scheduled job\""));
}
public void testStartJobScheduler_jobNotOpened() throws Exception {
String jobId = "_id1";
createScheduledJob(jobId);
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(409));
String responseAsString = responseEntityToString(e.getResponse());
assertThat(responseAsString, containsString("\"reason\":\"cannot start scheduler, expected job status [OPENED], " +
"but got [CLOSED]\""));
}
public void testStartJobScheduler_schedulerAlreadyStarted() throws Exception {
String jobId = "_id1";
createScheduledJob(jobId);
openJob(client(), jobId);
Response startSchedulerRequest = client().performRequest("post",
PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start?start=2016-06-01T00:00:00Z");
assertThat(startSchedulerRequest.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(startSchedulerRequest), containsString("{\"task\":\""));
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(409));
String responseAsString = responseEntityToString(e.getResponse());
assertThat(responseAsString, containsString("\"reason\":\"scheduler already started, expected scheduler status " +
"[STOPPED], but got [STARTED]\""));
}
public void testStartJobScheduler_GivenLookbackOnly() throws Exception {
@ -57,9 +86,7 @@ public class ScheduledJobIT extends ESRestTestCase {
Response startSchedulerRequest = client().performRequest("post",
PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z");
assertThat(startSchedulerRequest.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(startSchedulerRequest), equalTo("{\"acknowledged\":true}"));
waitForSchedulerStartedState(jobId);
assertThat(responseEntityToString(startSchedulerRequest), containsString("{\"task\":\""));
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId + "/_stats",
@ -69,7 +96,6 @@ public class ScheduledJobIT extends ESRestTestCase {
throw new RuntimeException(e);
}
});
waitForSchedulerStoppedState(client(), jobId);
}
@ -82,8 +108,7 @@ public class ScheduledJobIT extends ESRestTestCase {
Response response = client().performRequest("post",
PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start?start=2016-06-01T00:00:00Z");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}"));
waitForSchedulerStartedState(jobId);
assertThat(responseEntityToString(response), containsString("{\"task\":\""));
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId + "/_stats",
@ -176,23 +201,6 @@ public class ScheduledJobIT extends ESRestTestCase {
}
}
private void waitForSchedulerStartedState(String jobId) throws Exception {
try {
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId + "/_stats",
Collections.singletonMap("metric", "scheduler_state"));
assertThat(responseEntityToString(getJobResponse), containsString("\"status\":\"STARTED\""));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} catch (AssertionError e) {
Response response = client().performRequest("get", "/_nodes/hotthreads");
logger.info("hot_threads: {}", responseEntityToString(response));
}
}
@After
public void clearPrelertState() throws IOException {
clearPrelertMetadata(adminClient());

View File

@ -79,7 +79,7 @@ public class TooManyJobsIT extends ESIntegTestCase {
"[failed to open, max running job capacity [" + maxRunningJobsPerNode + "] reached]", cause.getMessage());
logger.info("good news everybody --> reached maximum number of allowed opened jobs, after trying to open the {}th job", i);
// now manually clean things up and see if we can succeed to start one new job
// now manually clean things up and see if we can succeed to run one new job
clearPrelertMetadata();
putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());

View File

@ -10,17 +10,13 @@ import org.elasticsearch.test.ESTestCase;
public class JobSchedulerStatusTests extends ESTestCase {
public void testForString() {
assertEquals(JobSchedulerStatus.fromString("starting"), JobSchedulerStatus.STARTING);
assertEquals(JobSchedulerStatus.fromString("started"), JobSchedulerStatus.STARTED);
assertEquals(JobSchedulerStatus.fromString("stopping"), JobSchedulerStatus.STOPPING);
assertEquals(JobSchedulerStatus.fromString("stopped"), JobSchedulerStatus.STOPPED);
}
public void testValidOrdinals() {
assertEquals(0, JobSchedulerStatus.STARTING.ordinal());
assertEquals(1, JobSchedulerStatus.STARTED.ordinal());
assertEquals(2, JobSchedulerStatus.STOPPING.ordinal());
assertEquals(3, JobSchedulerStatus.STOPPED.ordinal());
assertEquals(0, JobSchedulerStatus.STARTED.ordinal());
assertEquals(1, JobSchedulerStatus.STOPPED.ordinal());
}
}

View File

@ -22,7 +22,7 @@ public class AllocationTests extends AbstractSerializingTestCase<Allocation> {
boolean ignoreDowntime = randomBoolean();
JobStatus jobStatus = randomFrom(JobStatus.values());
String statusReason = randomBoolean() ? randomAsciiOfLength(10) : null;
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTING, randomPositiveLong(), randomPositiveLong());
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, randomPositiveLong(), randomPositiveLong());
return new Allocation(nodeId, jobId, ignoreDowntime, jobStatus, statusReason, schedulerState);
}

View File

@ -20,7 +20,6 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.action.UpdateJobStatusAction;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.data.DataProcessor;
import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobService;
import org.junit.Before;
import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder;
@ -40,11 +39,9 @@ public class JobLifeCycleServiceTests extends ESTestCase {
@Before
public void instantiateJobAllocator() {
ClusterService clusterService = mock(ClusterService.class);
ScheduledJobService scheduledJobService = mock(ScheduledJobService.class);
dataProcessor = mock(DataProcessor.class);
client = mock(Client.class);
jobLifeCycleService = new JobLifeCycleService(Settings.EMPTY, client, clusterService, scheduledJobService, dataProcessor,
Runnable::run);
jobLifeCycleService = new JobLifeCycleService(Settings.EMPTY, client, clusterService, dataProcessor, Runnable::run);
}
public void testStartStop() {

View File

@ -5,11 +5,15 @@
*/
package org.elasticsearch.xpack.prelert.job.scheduler;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction;
import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction;
import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.DataDescription;
@ -38,9 +42,11 @@ import java.util.Arrays;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction.INSTANCE;
import static org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction.Request;
import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
@ -53,27 +59,30 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ScheduledJobServiceTests extends ESTestCase {
public class ScheduledJobRunnerTests extends ESTestCase {
private Client client;
private ThreadPool threadPool;
private JobProvider jobProvider;
private JobManager jobManager;
private DataProcessor dataProcessor;
private DataExtractorFactory dataExtractorFactory;
private Auditor auditor;
private ScheduledJobService scheduledJobService;
private ScheduledJobRunner scheduledJobRunner;
private long currentTime = 120000;
@Before
public void setUpTests() {
client = mock(Client.class);
jobProvider = mock(JobProvider.class);
doAnswer(invocation -> {
@SuppressWarnings("unchecked")
ActionListener<Object> actionListener = (ActionListener) invocation.getArguments()[2];
actionListener.onResponse(new UpdateJobSchedulerStatusAction.Response());
return null;
}).when(client).execute(eq(UpdateJobSchedulerStatusAction.INSTANCE), any(), any());
JobProvider jobProvider = mock(JobProvider.class);
when(jobProvider.dataCounts(anyString())).thenReturn(new DataCounts("foo"));
jobManager = mock(JobManager.class);
dataProcessor = mock(DataProcessor.class);
dataExtractorFactory = mock(DataExtractorFactory.class);
auditor = mock(Auditor.class);
Auditor auditor = mock(Auditor.class);
threadPool = mock(ThreadPool.class);
ExecutorService executorService = mock(ExecutorService.class);
doAnswer(invocation -> {
@ -82,8 +91,8 @@ public class ScheduledJobServiceTests extends ESTestCase {
}).when(executorService).submit(any(Runnable.class));
when(threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME)).thenReturn(executorService);
scheduledJobService =
new ScheduledJobService(threadPool, client, jobProvider, dataProcessor, dataExtractorFactory, () -> currentTime);
scheduledJobRunner =
new ScheduledJobRunner(threadPool, client, jobProvider, dataProcessor, dataExtractorFactory, () -> currentTime);
when(jobProvider.audit(anyString())).thenReturn(auditor);
when(jobProvider.buckets(anyString(), any(BucketsQueryBuilder.BucketsQuery.class))).thenThrow(
@ -92,84 +101,59 @@ public class ScheduledJobServiceTests extends ESTestCase {
public void testStart_GivenNewlyCreatedJobLoopBack() throws IOException {
Job.Builder builder = createScheduledJob();
Allocation allocation = new Allocation("_nodeId", "foo", false, JobStatus.OPENED, null,
new SchedulerState(JobSchedulerStatus.STARTING, 0L, 60000L));
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, 0L, 60000L);
DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
when(jobManager.getJobAllocation("foo")).thenReturn(allocation);
Job job = builder.build();
Allocation allocation =
new Allocation(null, "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null));
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(builder.build())).thenReturn(dataExtractor);
when(dataExtractorFactory.newExtractor(job)).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8")));
when(dataExtractor.next()).thenReturn(Optional.of(in));
when(dataProcessor.processData(anyString(), eq(in), any(), any())).thenReturn(dataCounts);
scheduledJobService.start(builder.build(), allocation);
Consumer<Exception> handler = mockConsumer();
StartJobSchedulerAction.SchedulerTask task = mock(StartJobSchedulerAction.SchedulerTask.class);
scheduledJobRunner.run(job, schedulerState, allocation, task, handler);
verify(dataExtractor).newSearch(eq(0L), eq(60000L), any());
verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any());
verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STARTED)), any());
verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPING)), any());
verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any());
}
public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws IOException {
Job.Builder builder = createScheduledJob();
Allocation allocation = new Allocation("_nodeId", "foo", false, JobStatus.OPENED, null,
new SchedulerState(JobSchedulerStatus.STARTING, 0L, null));
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, 0L, null);
DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
when(jobManager.getJobAllocation("foo")).thenReturn(allocation);
Job job = builder.build();
Allocation allocation =
new Allocation(null, "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null));
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(builder.build())).thenReturn(dataExtractor);
when(dataExtractorFactory.newExtractor(job)).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8")));
when(dataExtractor.next()).thenReturn(Optional.of(in));
when(dataProcessor.processData(anyString(), eq(in), any(), any())).thenReturn(dataCounts);
scheduledJobService.start(builder.build(), allocation);
Consumer<Exception> handler = mockConsumer();
boolean cancelled = randomBoolean();
StartJobSchedulerAction.SchedulerTask task = new StartJobSchedulerAction.SchedulerTask(1, "type", "action", null, "foo");
scheduledJobRunner.run(job, schedulerState, allocation, task, handler);
verify(dataExtractor).newSearch(eq(0L), eq(60000L), any());
verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME);
verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME), any());
allocation = new Allocation(allocation.getNodeId(), allocation.getJobId(), false, allocation.getStatus(),
null, new SchedulerState(JobSchedulerStatus.STOPPING, 0L, 60000L));
scheduledJobService.stop(allocation);
verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any());
if (cancelled) {
task.stop();
verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any());
} else {
verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME), any());
}
}
public void testStop_GivenNonScheduledJob() {
Allocation allocation = new Allocation.Builder().build();
expectThrows(IllegalStateException.class, () -> scheduledJobService.stop(allocation));
}
public void testStop_GivenStartedScheduledJob() throws IOException {
Job.Builder builder = createScheduledJob();
Allocation allocation1 =
new Allocation("_nodeId", "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STARTED, 0L, null));
when(jobManager.getJobAllocation("foo")).thenReturn(allocation1);
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(builder.build())).thenReturn(dataExtractor);
Exception e = expectThrows(IllegalStateException.class, () -> scheduledJobService.start(builder.build(), allocation1));
assertThat(e.getMessage(), equalTo("expected job scheduler status [STARTING], but got [STARTED] instead"));
e = expectThrows(IllegalStateException.class, () -> scheduledJobService.stop(allocation1));
assertThat(e.getMessage(), equalTo("expected job scheduler status [STOPPING], but got [STARTED] instead"));
// Properly stop it to avoid leaking threads in the test
Allocation allocation2 =
new Allocation("_nodeId", "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPING, 0L, null));
scheduledJobService.registry.put("foo", scheduledJobService.createJobScheduler(builder.build()));
scheduledJobService.stop(allocation2);
// We stopped twice but the first time should have been ignored. We can assert that indirectly
// by verifying that the scheduler status was set to STOPPED.
verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any());
}
private static Job.Builder createScheduledJob() {
public static Job.Builder createScheduledJob() {
AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build()));
acBuilder.setBucketSpan(3600L);
acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build()));
@ -184,4 +168,27 @@ public class ScheduledJobServiceTests extends ESTestCase {
builder.setDataDescription(dataDescription);
return builder;
}
public void testValidate() {
Job job1 = buildJobBuilder("foo").build();
Exception e = expectThrows(IllegalArgumentException.class, () -> ScheduledJobRunner.validate(job1, null));
assertThat(e.getMessage(), equalTo("job [foo] is not a scheduled job"));
Job job2 = createScheduledJob().build();
Allocation allocation1 =
new Allocation("_id", "_id", false, JobStatus.CLOSED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null));
e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate(job2, allocation1));
assertThat(e.getMessage(), equalTo("cannot start scheduler, expected job status [OPENED], but got [CLOSED]"));
Job job3 = createScheduledJob().build();
Allocation allocation2 =
new Allocation("_id", "_id", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STARTED, null, null));
e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate(job3, allocation2));
assertThat(e.getMessage(), equalTo("scheduler already started, expected scheduler status [STOPPED], but got [STARTED]"));
}
@SuppressWarnings("unchecked")
private Consumer<Exception> mockConsumer() {
return mock(Consumer.class);
}
}

View File

@ -7,22 +7,37 @@ package org.elasticsearch.xpack.prelert.rest.schedulers;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunnerTests;
import java.util.Collections;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class RestStartJobSchedulerActionTests extends ESTestCase {
public void testPrepareRequest() throws Exception {
JobManager jobManager = mock(JobManager.class);
Job.Builder job = ScheduledJobRunnerTests.createScheduledJob();
when(jobManager.getJobOrThrowIfUnknown(anyString())).thenReturn(job.build());
Allocation allocation =
new Allocation(null, "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null));
when(jobManager.getJobAllocation(anyString())).thenReturn(allocation);
RestStartJobSchedulerAction action = new RestStartJobSchedulerAction(Settings.EMPTY, mock(RestController.class),
mock(StartJobSchedulerAction.TransportAction.class));
jobManager, mock(ClusterService.class));
RestRequest restRequest1 = new FakeRestRequest.Builder().withParams(Collections.singletonMap("start", "not-a-date")).build();
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class,