From d5412627d2558767897c440881ad40a8aa9ee8fe Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 9 Dec 2016 14:43:41 +0100 Subject: [PATCH] Moved scheduler status to prelertmetadata to make it more independent of job. Removed SchedulerStats as scheduler status is all we need and start and end times are only needed in start scheduler api. Original commit: elastic/x-pack-elasticsearch@80c563cb696444aac3495f821af8dfacb1d7b004 --- .../xpack/prelert/PrelertPlugin.java | 2 +- .../xpack/prelert/action/GetJobsAction.java | 32 ++-- .../action/StartJobSchedulerAction.java | 62 ++++---- .../UpdateJobSchedulerStatusAction.java | 17 +-- .../xpack/prelert/job/IgnoreDowntime.java | 2 +- .../xpack/prelert/job/SchedulerState.java | 118 --------------- ...edulerStatus.java => SchedulerStatus.java} | 8 +- .../xpack/prelert/job/manager/JobManager.java | 133 +++++----------- .../prelert/job/metadata/Allocation.java | 60 +------- .../prelert/job/metadata/PrelertMetadata.java | 143 ++++++++++++++++-- .../prelert/job/scheduler/ScheduledJob.java | 11 +- .../job/scheduler/ScheduledJobRunner.java | 42 +++-- .../RestStartJobSchedulerAction.java | 29 ++-- .../action/GetJobActionResponseTests.java | 10 +- .../xpack/prelert/action/ScheduledJobsIT.java | 20 +-- .../StartJobSchedulerActionRequestTests.java | 9 +- .../UpdateJobSchedulerStatusRequestTests.java | 4 +- .../prelert/integration/ScheduledJobIT.java | 2 +- .../prelert/job/JobSchedulerStatusTests.java | 8 +- .../prelert/job/SchedulerStateTests.java | 75 --------- .../prelert/job/manager/JobManagerTests.java | 19 --- .../prelert/job/metadata/AllocationTests.java | 7 +- .../job/metadata/JobAllocatorTests.java | 32 ---- .../job/metadata/PrelertMetadataTests.java | 30 ++-- .../scheduler/ScheduledJobRunnerTests.java | 58 ++++--- .../job/scheduler/ScheduledJobTests.java | 20 +-- .../RestStartJobSchedulerActionTests.java | 38 +++-- .../rest-api-spec/test/jobs_get_stats.yaml | 2 +- 28 files changed, 394 insertions(+), 599 deletions(-) delete mode 100644 elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/SchedulerState.java rename elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/{JobSchedulerStatus.java => SchedulerStatus.java} (73%) delete mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/SchedulerStateTests.java diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java index 95edef21b6e..a67f1dd8336 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java @@ -168,7 +168,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier); DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, processFactory, clusterService.getClusterSettings()); - ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, jobProvider, + ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, clusterService, jobProvider, // norelease: we will no longer need to pass the client here after we switch to a client based data extractor new HttpDataExtractorFactory(client), System::currentTimeMillis); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetJobsAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetJobsAction.java index 4e2707270ed..09336c29bca 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetJobsAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetJobsAction.java @@ -39,9 +39,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.Job; +import org.elasticsearch.xpack.prelert.job.SchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; import org.elasticsearch.xpack.prelert.job.ModelSizeStats; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.manager.AutodetectProcessManager; import org.elasticsearch.xpack.prelert.job.manager.JobManager; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; @@ -280,19 +280,19 @@ public class GetJobsAction extends Action { + public static final ParseField START_TIME = new ParseField("start"); + public static final ParseField END_TIME = new ParseField("end"); + public static final StartJobSchedulerAction INSTANCE = new StartJobSchedulerAction(); public static final String NAME = "cluster:admin/prelert/job/scheduler/run"; @@ -66,8 +66,8 @@ extends Action request.jobId = jobId, Job.ID); - PARSER.declareObject((request, schedulerState) -> request.schedulerState = schedulerState, SchedulerState.PARSER, - SchedulerState.TYPE_FIELD); + PARSER.declareLong((request, startTime) -> request.startTime = startTime, START_TIME); + PARSER.declareLong(Request::setEndTime, END_TIME); } public static Request parseRequest(String jobId, XContentParser parser, ParseFieldMatcherSupplier parseFieldMatcherSupplier) { @@ -79,17 +79,12 @@ extends Action { - private final JobManager jobManager; private final ScheduledJobRunner scheduledJobRunner; @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - JobManager jobManager, ScheduledJobRunner scheduledJobRunner) { + ScheduledJobRunner scheduledJobRunner) { super(settings, StartJobSchedulerAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); - this.jobManager = jobManager; this.scheduledJobRunner = scheduledJobRunner; } @Override protected void doExecute(Task task, Request request, ActionListener listener) { SchedulerTask schedulerTask = (SchedulerTask) task; - Job job = jobManager.getJobOrThrowIfUnknown(request.jobId); - Allocation allocation = jobManager.getJobAllocation(job.getId()); - scheduledJobRunner.run(job, request.getSchedulerState(), allocation, schedulerTask, (error) -> { + scheduledJobRunner.run(request.getJobId(), request.getStartTime(), request.getEndTime(), schedulerTask, (error) -> { if (error != null) { listener.onFailure(error); } else { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusAction.java index d25e0968d29..33e731c3e0d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusAction.java @@ -26,8 +26,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; +import org.elasticsearch.xpack.prelert.job.SchedulerStatus; import org.elasticsearch.xpack.prelert.job.manager.JobManager; import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; @@ -57,11 +56,11 @@ public class UpdateJobSchedulerStatusAction extends Action { private String jobId; - private JobSchedulerStatus schedulerStatus; + private SchedulerStatus schedulerStatus; - public Request(String jobId, JobSchedulerStatus schedulerStatus) { + public Request(String jobId, SchedulerStatus schedulerStatus) { this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); - this.schedulerStatus = ExceptionsHelper.requireNonNull(schedulerStatus, SchedulerState.STATUS.getPreferredName()); + this.schedulerStatus = ExceptionsHelper.requireNonNull(schedulerStatus, "status"); } Request() {} @@ -74,11 +73,11 @@ public class UpdateJobSchedulerStatusAction extends Action= values().length) { - throw new IOException("Unknown public enum JobSchedulerStatus {\n ordinal [" + ordinal + "]"); + throw new IOException("Unknown public enum SchedulerStatus {\n ordinal [" + ordinal + "]"); } return values()[ordinal]; } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/SchedulerState.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/SchedulerState.java deleted file mode 100644 index 2f7f438d910..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/SchedulerState.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.prelert.job; - -import org.elasticsearch.action.support.ToXContentToBytes; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.ParseFieldMatcherSupplier; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.XContentBuilder; - -import org.elasticsearch.common.xcontent.ObjectParser.ValueType; - -import java.io.IOException; -import java.util.Locale; -import java.util.Objects; - -public class SchedulerState extends ToXContentToBytes implements Writeable { - - public static final ParseField TYPE_FIELD = new ParseField("scheduler_state"); - public static final ParseField STATUS = new ParseField("status"); - public static final ParseField START_TIME_MILLIS = new ParseField("start"); - public static final ParseField END_TIME_MILLIS = new ParseField("end"); - - public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - TYPE_FIELD.getPreferredName(), a -> new SchedulerState((JobSchedulerStatus) a[0], (Long) a[1], (Long) a[2])); - - static { - PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> JobSchedulerStatus.fromString(p.text()), STATUS, - ValueType.STRING); - PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), START_TIME_MILLIS); - PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), END_TIME_MILLIS); - } - - private JobSchedulerStatus status; - @Nullable - private Long startTimeMillis; - @Nullable - private Long endTimeMillis; - - public SchedulerState(JobSchedulerStatus status, Long startTimeMillis, Long endTimeMillis) { - this.status = status; - this.startTimeMillis = startTimeMillis; - this.endTimeMillis = endTimeMillis; - } - - public SchedulerState(StreamInput in) throws IOException { - status = JobSchedulerStatus.fromStream(in); - startTimeMillis = in.readOptionalLong(); - endTimeMillis = in.readOptionalLong(); - } - - public JobSchedulerStatus getStatus() { - return status; - } - - public Long getStartTimeMillis() { - return startTimeMillis; - } - - /** - * The end time as epoch milliseconds. An {@code null} end time indicates - * real-time mode. - * - * @return The optional end time as epoch milliseconds. - */ - @Nullable - public Long getEndTimeMillis() { - return endTimeMillis; - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (other instanceof SchedulerState == false) { - return false; - } - - SchedulerState that = (SchedulerState) other; - - return Objects.equals(this.status, that.status) && Objects.equals(this.startTimeMillis, that.startTimeMillis) - && Objects.equals(this.endTimeMillis, that.endTimeMillis); - } - - @Override - public int hashCode() { - return Objects.hash(status, startTimeMillis, endTimeMillis); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - status.writeTo(out); - out.writeOptionalLong(startTimeMillis); - out.writeOptionalLong(endTimeMillis); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(STATUS.getPreferredName(), status.name().toUpperCase(Locale.ROOT)); - if (startTimeMillis != null) { - builder.field(START_TIME_MILLIS.getPreferredName(), startTimeMillis); - } - if (endTimeMillis != null) { - builder.field(END_TIME_MILLIS.getPreferredName(), endTimeMillis); - } - builder.endObject(); - return builder; - } -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/JobSchedulerStatus.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/SchedulerStatus.java similarity index 73% rename from elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/JobSchedulerStatus.java rename to elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/SchedulerStatus.java index 5d476b95e61..0e6e0fc5eea 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/JobSchedulerStatus.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/SchedulerStatus.java @@ -12,18 +12,18 @@ import org.elasticsearch.common.io.stream.Writeable; import java.io.IOException; import java.util.Locale; -public enum JobSchedulerStatus implements Writeable { +public enum SchedulerStatus implements Writeable { STARTED, STOPPED; - public static JobSchedulerStatus fromString(String name) { + public static SchedulerStatus fromString(String name) { return valueOf(name.trim().toUpperCase(Locale.ROOT)); } - public static JobSchedulerStatus fromStream(StreamInput in) throws IOException { + public static SchedulerStatus fromStream(StreamInput in) throws IOException { int ordinal = in.readVInt(); if (ordinal < 0 || ordinal >= values().length) { - throw new IOException("Unknown public enum JobSchedulerStatus {\n ordinal [" + ordinal + "]"); + throw new IOException("Unknown public enum SchedulerStatus {\n ordinal [" + ordinal + "]"); } return values()[ordinal]; } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java index 056dfa6c7d6..2ee1f1b744e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java @@ -23,10 +23,9 @@ import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction; import org.elasticsearch.xpack.prelert.action.UpdateJobStatusAction; import org.elasticsearch.xpack.prelert.job.IgnoreDowntime; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; +import org.elasticsearch.xpack.prelert.job.SchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; import org.elasticsearch.xpack.prelert.job.ModelSnapshot; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.messages.Messages; import org.elasticsearch.xpack.prelert.job.metadata.Allocation; @@ -152,7 +151,7 @@ public class JobManager extends AbstractComponent { * @throws org.elasticsearch.ResourceNotFoundException * if there is no job with matching the given {@code jobId} */ - public Job getJobOrThrowIfUnknown(ClusterState clusterState, String jobId) { + Job getJobOrThrowIfUnknown(ClusterState clusterState, String jobId) { PrelertMetadata prelertMetadata = clusterState.metaData().custom(PrelertMetadata.TYPE); Job job = prelertMetadata.getJobs().get(jobId); if (job == null) { @@ -166,26 +165,25 @@ public class JobManager extends AbstractComponent { */ public void putJob(PutJobAction.Request request, ActionListener actionListener) { Job job = request.getJob(); - ActionListener delegateListener = ActionListener.wrap(jobSaved -> { - jobProvider.createJobRelatedIndices(job, new ActionListener() { - @Override - public void onResponse(Boolean indicesCreated) { - audit(job.getId()).info(Messages.getMessage(Messages.JOB_AUDIT_CREATED)); + ActionListener delegateListener = ActionListener.wrap(jobSaved -> + jobProvider.createJobRelatedIndices(job, new ActionListener() { + @Override + public void onResponse(Boolean indicesCreated) { + audit(job.getId()).info(Messages.getMessage(Messages.JOB_AUDIT_CREATED)); - // Also I wonder if we need to audit log infra - // structure in prelert as when we merge into xpack - // we can use its audit trailing. See: - // https://github.com/elastic/prelert-legacy/issues/48 - actionListener.onResponse(new PutJobAction.Response(jobSaved && indicesCreated, job)); - } + // Also I wonder if we need to audit log infra + // structure in prelert as when we merge into xpack + // we can use its audit trailing. See: + // https://github.com/elastic/prelert-legacy/issues/48 + actionListener.onResponse(new PutJobAction.Response(jobSaved && indicesCreated, job)); + } - @Override - public void onFailure(Exception e) { - actionListener.onFailure(e); + @Override + public void onFailure(Exception e) { + actionListener.onFailure(e); - } - }); - }, actionListener::onFailure); + } + }), actionListener::onFailure); clusterService.submitStateUpdateTask("put-job-" + job.getId(), new AckedClusterStateUpdateTask(request, delegateListener) { @@ -203,12 +201,9 @@ public class JobManager extends AbstractComponent { } ClusterState innerPutJob(Job job, boolean overwrite, ClusterState currentState) { - PrelertMetadata currentPrelertMetadata = currentState.metaData().custom(PrelertMetadata.TYPE); - PrelertMetadata.Builder builder = new PrelertMetadata.Builder(currentPrelertMetadata); + PrelertMetadata.Builder builder = createPrelertMetadatBuilder(currentState); builder.putJob(job, overwrite); - ClusterState.Builder newState = ClusterState.builder(currentState); - newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(PrelertMetadata.TYPE, builder.build()).build()); - return newState.build(); + return buildNewClusterState(currentState, builder); } /** @@ -254,52 +249,29 @@ public class JobManager extends AbstractComponent { } ClusterState removeJobFromClusterState(String jobId, ClusterState currentState) { - PrelertMetadata currentPrelertMetadata = currentState.metaData().custom(PrelertMetadata.TYPE); - PrelertMetadata.Builder builder = new PrelertMetadata.Builder(currentPrelertMetadata); + PrelertMetadata.Builder builder = createPrelertMetadatBuilder(currentState); builder.removeJob(jobId); - Allocation allocation = currentPrelertMetadata.getAllocations().get(jobId); - if (allocation != null) { - SchedulerState schedulerState = allocation.getSchedulerState(); - if (schedulerState != null && schedulerState.getStatus() != JobSchedulerStatus.STOPPED) { - throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.JOB_CANNOT_DELETE_WHILE_SCHEDULER_RUNS, jobId)); - } - if (!allocation.getStatus().isAnyOf(JobStatus.CLOSED, JobStatus.FAILED)) { - throw ExceptionsHelper.conflictStatusException(Messages.getMessage( - Messages.JOB_CANNOT_DELETE_WHILE_RUNNING, jobId, allocation.getStatus())); - } - } - ClusterState.Builder newState = ClusterState.builder(currentState); - newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(PrelertMetadata.TYPE, builder.build()).build()); - return newState.build(); + return buildNewClusterState(currentState, builder); } - private void checkJobIsScheduled(Job job) { - if (job.getSchedulerConfig() == null) { - throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_SCHEDULER_NO_SUCH_SCHEDULED_JOB, job.getId())); - } - } - - public Optional getSchedulerState(String jobId) { - Job job = getJobOrThrowIfUnknown(clusterService.state(), jobId); - if (job.getSchedulerConfig() == null) { - return Optional.empty(); - } - - Allocation allocation = getAllocation(clusterService.state(), jobId); - return Optional.ofNullable(allocation.getSchedulerState()); + public Optional getSchedulerStatus(String jobId) { + PrelertMetadata prelertMetadata = clusterService.state().metaData().custom(PrelertMetadata.TYPE); + return Optional.ofNullable(prelertMetadata.getSchedulerStatuses().get(jobId)); } public void updateSchedulerStatus(UpdateJobSchedulerStatusAction.Request request, ActionListener actionListener) { String jobId = request.getJobId(); - JobSchedulerStatus newStatus = request.getSchedulerStatus(); + SchedulerStatus newStatus = request.getSchedulerStatus(); clusterService.submitStateUpdateTask("update-scheduler-status-job-" + jobId, new AckedClusterStateUpdateTask(request, actionListener) { @Override public ClusterState execute(ClusterState currentState) throws Exception { - return innerUpdateSchedulerState(currentState, jobId, newStatus, null, null); + PrelertMetadata.Builder builder = createPrelertMetadatBuilder(currentState); + builder.updateSchedulerStatus(jobId, newStatus); + return buildNewClusterState(currentState, builder); } @Override @@ -309,33 +281,6 @@ public class JobManager extends AbstractComponent { }); } - private ClusterState innerUpdateSchedulerState(ClusterState currentState, String jobId, JobSchedulerStatus status, - Long startTime, Long endTime) { - Job job = getJobOrThrowIfUnknown(currentState, jobId); - checkJobIsScheduled(job); - - Allocation allocation = getAllocation(currentState, jobId); - if (allocation.getSchedulerState() == null && status != JobSchedulerStatus.STARTED) { - throw new IllegalArgumentException("Can't change status to [" + status + "], because job's [" + jobId + - "] scheduler never started"); - } - - SchedulerState existingState = allocation.getSchedulerState(); - if (existingState != null) { - if (startTime == null) { - startTime = existingState.getStartTimeMillis(); - } - if (endTime == null) { - endTime = existingState.getEndTimeMillis(); - } - } - - existingState = new SchedulerState(status, startTime, endTime); - Allocation.Builder builder = new Allocation.Builder(allocation); - builder.setSchedulerState(existingState); - return innerUpdateAllocation(builder.build(), currentState); - } - private Allocation getAllocation(ClusterState state, String jobId) { PrelertMetadata prelertMetadata = state.metaData().custom(PrelertMetadata.TYPE); Allocation allocation = prelertMetadata.getAllocations().get(jobId); @@ -345,15 +290,6 @@ public class JobManager extends AbstractComponent { return allocation; } - private ClusterState innerUpdateAllocation(Allocation newAllocation, ClusterState currentState) { - PrelertMetadata currentPrelertMetadata = currentState.metaData().custom(PrelertMetadata.TYPE); - PrelertMetadata.Builder builder = new PrelertMetadata.Builder(currentPrelertMetadata); - builder.updateAllocation(newAllocation.getJobId(), newAllocation); - ClusterState.Builder newState = ClusterState.builder(currentState); - newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(PrelertMetadata.TYPE, builder.build()).build()); - return newState.build(); - } - public Auditor audit(String jobId) { return jobProvider.audit(jobId); } @@ -463,4 +399,15 @@ public class JobManager extends AbstractComponent { jobResultsPersister.commitWrites(jobId); } + private static PrelertMetadata.Builder createPrelertMetadatBuilder(ClusterState currentState) { + PrelertMetadata currentPrelertMetadata = currentState.metaData().custom(PrelertMetadata.TYPE); + return new PrelertMetadata.Builder(currentPrelertMetadata); + } + + private static ClusterState buildNewClusterState(ClusterState currentState, PrelertMetadata.Builder builder) { + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(PrelertMetadata.TYPE, builder.build()).build()); + return newState.build(); + } + } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java index 051eec475de..dfa851e16ea 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java @@ -15,11 +15,7 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; -import org.elasticsearch.xpack.prelert.job.messages.Messages; -import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; import java.io.IOException; import java.util.Objects; @@ -31,9 +27,8 @@ public class Allocation extends AbstractDiffable implements ToXConte private static final ParseField IGNORE_DOWNTIME_FIELD = new ParseField("ignore_downtime"); public static final ParseField STATUS = new ParseField("status"); public static final ParseField STATUS_REASON = new ParseField("status_reason"); - public static final ParseField SCHEDULER_STATE = new ParseField("scheduler_state"); - static final Allocation PROTO = new Allocation(null, null, false, null, null, null); + static final Allocation PROTO = new Allocation(null, null, false, null, null); static final ObjectParser PARSER = new ObjectParser<>("allocation", Builder::new); @@ -43,7 +38,6 @@ public class Allocation extends AbstractDiffable implements ToXConte PARSER.declareBoolean(Builder::setIgnoreDowntime, IGNORE_DOWNTIME_FIELD); PARSER.declareField(Builder::setStatus, (p, c) -> JobStatus.fromString(p.text()), STATUS, ObjectParser.ValueType.STRING); PARSER.declareString(Builder::setStatusReason, STATUS_REASON); - PARSER.declareObject(Builder::setSchedulerState, SchedulerState.PARSER, SCHEDULER_STATE); } private final String nodeId; @@ -51,16 +45,13 @@ public class Allocation extends AbstractDiffable implements ToXConte private final boolean ignoreDowntime; private final JobStatus status; private final String statusReason; - private final SchedulerState schedulerState; - public Allocation(String nodeId, String jobId, boolean ignoreDowntime, JobStatus status, String statusReason, - SchedulerState schedulerState) { + public Allocation(String nodeId, String jobId, boolean ignoreDowntime, JobStatus status, String statusReason) { this.nodeId = nodeId; this.jobId = jobId; this.ignoreDowntime = ignoreDowntime; this.status = status; this.statusReason = statusReason; - this.schedulerState = schedulerState; } public Allocation(StreamInput in) throws IOException { @@ -69,7 +60,6 @@ public class Allocation extends AbstractDiffable implements ToXConte this.ignoreDowntime = in.readBoolean(); this.status = JobStatus.fromStream(in); this.statusReason = in.readOptionalString(); - this.schedulerState = in.readOptionalWriteable(SchedulerState::new); } public String getNodeId() { @@ -97,10 +87,6 @@ public class Allocation extends AbstractDiffable implements ToXConte return statusReason; } - public SchedulerState getSchedulerState() { - return schedulerState; - } - @Override public Allocation readFrom(StreamInput in) throws IOException { return new Allocation(in); @@ -113,7 +99,6 @@ public class Allocation extends AbstractDiffable implements ToXConte out.writeBoolean(ignoreDowntime); status.writeTo(out); out.writeOptionalString(statusReason); - out.writeOptionalWriteable(schedulerState); } @Override @@ -128,9 +113,6 @@ public class Allocation extends AbstractDiffable implements ToXConte if (statusReason != null) { builder.field(STATUS_REASON.getPreferredName(), statusReason); } - if (schedulerState != null) { - builder.field(SCHEDULER_STATE.getPreferredName(), schedulerState); - } builder.endObject(); return builder; } @@ -144,13 +126,12 @@ public class Allocation extends AbstractDiffable implements ToXConte Objects.equals(jobId, that.jobId) && Objects.equals(ignoreDowntime, that.ignoreDowntime) && Objects.equals(status, that.status) && - Objects.equals(statusReason, that.statusReason) && - Objects.equals(schedulerState, that.schedulerState); + Objects.equals(statusReason, that.statusReason); } @Override public int hashCode() { - return Objects.hash(nodeId, jobId, ignoreDowntime, status, statusReason, schedulerState); + return Objects.hash(nodeId, jobId, ignoreDowntime, status, statusReason); } // Class alreadt extends from AbstractDiffable, so copied from ToXContentToBytes#toString() @@ -175,16 +156,12 @@ public class Allocation extends AbstractDiffable implements ToXConte private boolean ignoreDowntime; private JobStatus status; private String statusReason; - private SchedulerState schedulerState; public Builder() { } public Builder(Job job) { this.jobId = job.getId(); - if (job.getSchedulerConfig() != null) { - schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, null, null); - } } public Builder(Allocation allocation) { @@ -193,7 +170,6 @@ public class Allocation extends AbstractDiffable implements ToXConte this.ignoreDowntime = allocation.ignoreDowntime; this.status = allocation.status; this.statusReason = allocation.statusReason; - this.schedulerState = allocation.schedulerState; } public void setNodeId(String nodeId) { @@ -237,34 +213,8 @@ public class Allocation extends AbstractDiffable implements ToXConte this.statusReason = statusReason; } - public void setSchedulerState(SchedulerState newSchedulerState) { - if (this.schedulerState != null){ - JobSchedulerStatus currentSchedulerStatus = this.schedulerState.getStatus(); - JobSchedulerStatus newSchedulerStatus = newSchedulerState.getStatus(); - switch (newSchedulerStatus) { - case STARTED: - if (currentSchedulerStatus != JobSchedulerStatus.STOPPED) { - String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_START, jobId, newSchedulerStatus); - throw ExceptionsHelper.conflictStatusException(msg); - } - break; - case STOPPED: - if (currentSchedulerStatus != JobSchedulerStatus.STARTED) { - String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_STOP_IN_CURRENT_STATE, jobId, - newSchedulerStatus); - throw ExceptionsHelper.conflictStatusException(msg); - } - break; - default: - throw new IllegalArgumentException("Invalid requested job scheduler status: " + newSchedulerStatus); - } - } - - this.schedulerState = newSchedulerState; - } - public Allocation build() { - return new Allocation(nodeId, jobId, ignoreDowntime, status, statusReason, schedulerState); + return new Allocation(nodeId, jobId, ignoreDowntime, status, statusReason); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java index cc15c6c0c7f..c9964c8226b 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.prelert.job.metadata; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.DiffableUtils; import org.elasticsearch.cluster.metadata.MetaData; @@ -15,13 +16,14 @@ import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.ParseFieldMatcherSupplier; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; +import org.elasticsearch.xpack.prelert.job.SchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; +import org.elasticsearch.xpack.prelert.job.messages.Messages; import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; import java.io.IOException; @@ -33,6 +35,7 @@ import java.util.Map; import java.util.Objects; import java.util.SortedMap; import java.util.TreeMap; +import java.util.stream.Collectors; public class PrelertMetadata implements MetaData.Custom { @@ -40,7 +43,8 @@ public class PrelertMetadata implements MetaData.Custom { private static final ParseField ALLOCATIONS_FIELD = new ParseField("allocations"); public static final String TYPE = "prelert"; - public static final PrelertMetadata PROTO = new PrelertMetadata(Collections.emptySortedMap(), Collections.emptySortedMap()); + public static final PrelertMetadata PROTO = new PrelertMetadata(Collections.emptySortedMap(), Collections.emptySortedMap(), + Collections.emptySortedMap()); private static final ObjectParser PRELERT_METADATA_PARSER = new ObjectParser<>("prelert_metadata", Builder::new); @@ -54,10 +58,13 @@ public class PrelertMetadata implements MetaData.Custom { // performance issue will occur if we don't change that private final SortedMap jobs; private final SortedMap allocations; + private final SortedMap schedulerStatuses; - private PrelertMetadata(SortedMap jobs, SortedMap allocations) { + private PrelertMetadata(SortedMap jobs, SortedMap allocations, + SortedMap schedulerStatuses) { this.jobs = Collections.unmodifiableSortedMap(jobs); this.allocations = Collections.unmodifiableSortedMap(allocations); + this.schedulerStatuses = Collections.unmodifiableSortedMap(schedulerStatuses); } public Map getJobs() { @@ -70,6 +77,10 @@ public class PrelertMetadata implements MetaData.Custom { return allocations; } + public SortedMap getSchedulerStatuses() { + return schedulerStatuses; + } + @Override public String type() { return TYPE; @@ -109,7 +120,12 @@ public class PrelertMetadata implements MetaData.Custom { for (int i = 0; i < size; i++) { allocations.put(in.readString(), Allocation.PROTO.readFrom(in)); } - return new PrelertMetadata(jobs, allocations); + size = in.readVInt(); + TreeMap schedulerStatuses = new TreeMap<>(); + for (int i = 0; i < size; i++) { + schedulerStatuses.put(in.readString(), SchedulerStatus.fromStream(in)); + } + return new PrelertMetadata(jobs, allocations, schedulerStatuses); } @Override @@ -124,6 +140,11 @@ public class PrelertMetadata implements MetaData.Custom { out.writeString(entry.getKey()); entry.getValue().writeTo(out); } + out.writeVInt(schedulerStatuses.size()); + for (Map.Entry entry : schedulerStatuses.entrySet()) { + out.writeString(entry.getKey()); + entry.getValue().writeTo(out); + } } @Override @@ -145,28 +166,83 @@ public class PrelertMetadata implements MetaData.Custom { final Diff> jobs; final Diff> allocations; + final Diff> schedulerStatuses; PrelertMetadataDiff(PrelertMetadata before, PrelertMetadata after) { this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer()); this.allocations = DiffableUtils.diff(before.allocations, after.allocations, DiffableUtils.getStringKeySerializer()); + this.schedulerStatuses = DiffableUtils.diff( + toSchedulerDiff(before.schedulerStatuses), + toSchedulerDiff(after.schedulerStatuses), + DiffableUtils.getStringKeySerializer()); } PrelertMetadataDiff(StreamInput in) throws IOException { jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job.PROTO); allocations = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Allocation.PROTO); + schedulerStatuses = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), SchedulerStatusDiff.PROTO); } @Override public MetaData.Custom apply(MetaData.Custom part) { TreeMap newJobs = new TreeMap<>(jobs.apply(((PrelertMetadata) part).jobs)); TreeMap newAllocations = new TreeMap<>(allocations.apply(((PrelertMetadata) part).allocations)); - return new PrelertMetadata(newJobs, newAllocations); + + Map newSchedulerStatuses = + schedulerStatuses.apply(toSchedulerDiff((((PrelertMetadata) part)).schedulerStatuses)); + return new PrelertMetadata(newJobs, newAllocations, new TreeMap<>(toSchedulerStatusMap(newSchedulerStatuses))); } @Override public void writeTo(StreamOutput out) throws IOException { jobs.writeTo(out); allocations.writeTo(out); + schedulerStatuses.writeTo(out); + } + + private static Map toSchedulerDiff(Map from) { + return from.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, (entry) -> new SchedulerStatusDiff(entry.getValue()))); + } + + private static Map toSchedulerStatusMap(Map from) { + return from.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, (entry) -> entry.getValue().status)); + } + + // SchedulerStatus is enum and that can't extend from anything + static class SchedulerStatusDiff extends AbstractDiffable implements Writeable { + + static SchedulerStatusDiff PROTO = new SchedulerStatusDiff(null); + + private final SchedulerStatus status; + + SchedulerStatusDiff(SchedulerStatus status) { + this.status = status; + } + + @Override + public SchedulerStatusDiff readFrom(StreamInput in) throws IOException { + return new SchedulerStatusDiff(SchedulerStatus.fromStream(in)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + status.writeTo(out); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SchedulerStatusDiff that = (SchedulerStatusDiff) o; + return status == that.status; + } + + @Override + public int hashCode() { + return Objects.hash(status); + } } } @@ -177,27 +253,32 @@ public class PrelertMetadata implements MetaData.Custom { if (o == null || getClass() != o.getClass()) return false; PrelertMetadata that = (PrelertMetadata) o; - return Objects.equals(jobs, that.jobs) && Objects.equals(allocations, that.allocations); + return Objects.equals(jobs, that.jobs) && + Objects.equals(allocations, that.allocations) && + Objects.equals(schedulerStatuses, that.schedulerStatuses); } @Override public int hashCode() { - return Objects.hash(jobs, allocations); + return Objects.hash(jobs, allocations, schedulerStatuses); } public static class Builder { private TreeMap jobs; private TreeMap allocations; + private TreeMap schedulerStatuses; public Builder() { this.jobs = new TreeMap<>(); this.allocations = new TreeMap<>(); + this.schedulerStatuses = new TreeMap<>(); } public Builder(PrelertMetadata previous) { jobs = new TreeMap<>(previous.jobs); allocations = new TreeMap<>(previous.allocations); + schedulerStatuses = new TreeMap<>(previous.schedulerStatuses); } public Builder putJob(Job job, boolean overwrite) { @@ -212,6 +293,9 @@ public class PrelertMetadata implements MetaData.Custom { builder.setStatus(JobStatus.CLOSED); allocations.put(job.getId(), builder.build()); } + if (job.getSchedulerConfig() != null) { + schedulerStatuses.put(job.getId(), SchedulerStatus.STOPPED); + } return this; } @@ -219,7 +303,20 @@ public class PrelertMetadata implements MetaData.Custom { if (jobs.remove(jobId) == null) { throw new ResourceNotFoundException("job [" + jobId + "] does not exist"); } - this.allocations.remove(jobId); + Allocation previousAllocation = this.allocations.remove(jobId); + if (previousAllocation != null) { + if (!previousAllocation.getStatus().isAnyOf(JobStatus.CLOSED, JobStatus.FAILED)) { + throw ExceptionsHelper.conflictStatusException(Messages.getMessage( + Messages.JOB_CANNOT_DELETE_WHILE_RUNNING, jobId, previousAllocation.getStatus())); + } + } + SchedulerStatus previousStatus = this.schedulerStatuses.remove(jobId); + if (previousStatus != null) { + if (previousStatus != SchedulerStatus.STOPPED) { + String message = Messages.getMessage(Messages.JOB_CANNOT_DELETE_WHILE_SCHEDULER_RUNS, jobId); + throw ExceptionsHelper.conflictStatusException(message); + } + } return this; } @@ -253,7 +350,7 @@ public class PrelertMetadata implements MetaData.Custom { } public PrelertMetadata build() { - return new PrelertMetadata(jobs, allocations); + return new PrelertMetadata(jobs, allocations, schedulerStatuses); } public Builder assignToNode(String jobId, String nodeId) { @@ -296,6 +393,32 @@ public class PrelertMetadata implements MetaData.Custom { allocations.put(jobId, builder.build()); return this; } + + public Builder updateSchedulerStatus(String jobId, SchedulerStatus newStatus) { + SchedulerStatus currentStatus = schedulerStatuses.get(jobId); + if (currentStatus == null) { + throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_SCHEDULER_NO_SUCH_SCHEDULED_JOB, jobId)); + } + + switch (newStatus) { + case STARTED: + if (currentStatus != SchedulerStatus.STOPPED) { + String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_START, jobId, newStatus); + throw ExceptionsHelper.conflictStatusException(msg); + } + break; + case STOPPED: + if (currentStatus != SchedulerStatus.STARTED) { + String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_STOP_IN_CURRENT_STATE, jobId, newStatus); + throw ExceptionsHelper.conflictStatusException(msg); + } + break; + default: + throw new IllegalArgumentException("[" + jobId + "] invalid requested job scheduler status [" + newStatus + "]"); + } + schedulerStatuses.put(jobId, newStatus); + return this; + } } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJob.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJob.java index d8896385d47..b53db5c2fe9 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJob.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJob.java @@ -14,7 +14,6 @@ import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.xpack.prelert.action.FlushJobAction; import org.elasticsearch.xpack.prelert.action.JobDataAction; import org.elasticsearch.xpack.prelert.job.DataCounts; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor; import org.elasticsearch.xpack.prelert.job.messages.Messages; @@ -60,11 +59,9 @@ class ScheduledJob { } } - Long runLookBack(SchedulerState schedulerState) throws Exception { - long startMs = schedulerState.getStartTimeMillis(); - lookbackStartTimeMs = (lastEndTimeMs != null && lastEndTimeMs + 1 > startMs) ? lastEndTimeMs + 1 : startMs; - - Optional endMs = Optional.ofNullable(schedulerState.getEndTimeMillis()); + Long runLookBack(long startTime, Long endTime) throws Exception { + lookbackStartTimeMs = (lastEndTimeMs != null && lastEndTimeMs + 1 > startTime) ? lastEndTimeMs + 1 : startTime; + Optional endMs = Optional.ofNullable(endTime); long lookbackEnd = endMs.orElse(currentTimeSupplier.get() - queryDelayMs); boolean isLookbackOnly = endMs.isPresent(); if (lookbackEnd <= lookbackStartTimeMs) { @@ -115,7 +112,7 @@ class ScheduledJob { return running; } - private void run(long start, long end, FlushJobAction.Request flushRequest) throws IOException { + private void run(long start, Long end, FlushJobAction.Request flushRequest) throws IOException { if (end <= start) { return; } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunner.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunner.java index a1d1b16266d..d261d534999 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunner.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunner.java @@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -20,18 +21,19 @@ import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction; import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction; import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; +import org.elasticsearch.xpack.prelert.job.SchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.config.DefaultFrequency; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory; import org.elasticsearch.xpack.prelert.job.metadata.Allocation; +import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; import org.elasticsearch.xpack.prelert.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; import org.elasticsearch.xpack.prelert.job.persistence.QueryPage; import org.elasticsearch.xpack.prelert.job.results.Bucket; +import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; import java.time.Duration; import java.util.Objects; @@ -42,32 +44,35 @@ import java.util.function.Supplier; public class ScheduledJobRunner extends AbstractComponent { private final Client client; + private final ClusterService clusterService; private final JobProvider jobProvider; private final DataExtractorFactory dataExtractorFactory; private final ThreadPool threadPool; private final Supplier currentTimeSupplier; - public ScheduledJobRunner(ThreadPool threadPool, Client client, JobProvider jobProvider, DataExtractorFactory dataExtractorFactory, - Supplier currentTimeSupplier) { + public ScheduledJobRunner(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider, + DataExtractorFactory dataExtractorFactory, Supplier currentTimeSupplier) { super(Settings.EMPTY); this.threadPool = threadPool; + this.clusterService = Objects.requireNonNull(clusterService); this.client = Objects.requireNonNull(client); this.jobProvider = Objects.requireNonNull(jobProvider); this.dataExtractorFactory = Objects.requireNonNull(dataExtractorFactory); this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); } - public void run(Job job, SchedulerState schedulerState, Allocation allocation, StartJobSchedulerAction.SchedulerTask task, - Consumer handler) { - validate(job, allocation); + public void run(String jobId, long startTime, Long endTime, StartJobSchedulerAction.SchedulerTask task, Consumer handler) { + PrelertMetadata prelertMetadata = clusterService.state().metaData().custom(PrelertMetadata.TYPE); + validate(jobId, prelertMetadata); - setJobSchedulerStatus(job.getId(), JobSchedulerStatus.STARTED, error -> { - logger.info("Starting scheduler [{}]", schedulerState); + setJobSchedulerStatus(jobId, SchedulerStatus.STARTED, error -> { + logger.info("[{}] Starting scheduler", jobId); + Job job = prelertMetadata.getJobs().get(jobId); Holder holder = createJobScheduler(job, task, handler); task.setHolder(holder); holder.future = threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME).submit(() -> { try { - Long next = holder.scheduledJob.runLookBack(schedulerState); + Long next = holder.scheduledJob.runLookBack(startTime, endTime); if (next != null) { doScheduleRealtime(next, job.getId(), holder); } else { @@ -124,19 +129,26 @@ public class ScheduledJobRunner extends AbstractComponent { } } - public static void validate(Job job, Allocation allocation) { + public static void validate(String jobId, PrelertMetadata prelertMetadata) { + Job job = prelertMetadata.getJobs().get(jobId); + if (job == null) { + throw ExceptionsHelper.missingJobException(jobId); + } + if (job.getSchedulerConfig() == null) { throw new IllegalArgumentException("job [" + job.getId() + "] is not a scheduled job"); } + Allocation allocation = prelertMetadata.getAllocations().get(jobId); if (allocation.getStatus() != JobStatus.OPENED) { throw new ElasticsearchStatusException("cannot start scheduler, expected job status [{}], but got [{}]", RestStatus.CONFLICT, JobStatus.OPENED, allocation.getStatus()); } - if (allocation.getSchedulerState().getStatus() != JobSchedulerStatus.STOPPED) { + SchedulerStatus status = prelertMetadata.getSchedulerStatuses().get(jobId); + if (status != SchedulerStatus.STOPPED) { throw new ElasticsearchStatusException("scheduler already started, expected scheduler status [{}], but got [{}]", - RestStatus.CONFLICT, JobSchedulerStatus.STOPPED, allocation.getSchedulerState().getStatus()); + RestStatus.CONFLICT, SchedulerStatus.STOPPED, status); } } @@ -191,7 +203,7 @@ public class ScheduledJobRunner extends AbstractComponent { return new TimeValue(Math.max(1, next - currentTimeSupplier.get())); } - private void setJobSchedulerStatus(String jobId, JobSchedulerStatus status, Consumer supplier) { + private void setJobSchedulerStatus(String jobId, SchedulerStatus status, Consumer supplier) { UpdateJobSchedulerStatusAction.Request request = new UpdateJobSchedulerStatusAction.Request(jobId, status); client.execute(UpdateJobSchedulerStatusAction.INSTANCE, request, new ActionListener() { @Override @@ -235,7 +247,7 @@ public class ScheduledJobRunner extends AbstractComponent { logger.info("Stopping scheduler for job [{}]", jobId); scheduledJob.stop(); FutureUtils.cancel(future); - setJobSchedulerStatus(jobId, JobSchedulerStatus.STOPPED, error -> handler.accept(null)); + setJobSchedulerStatus(jobId, SchedulerStatus.STOPPED, error -> handler.accept(null)); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerAction.java index 4c1f8b35c73..a7575950c13 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerAction.java @@ -26,11 +26,8 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.xpack.prelert.PrelertPlugin; import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; -import org.elasticsearch.xpack.prelert.job.manager.JobManager; import org.elasticsearch.xpack.prelert.job.messages.Messages; -import org.elasticsearch.xpack.prelert.job.metadata.Allocation; +import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunner; import java.io.IOException; @@ -39,14 +36,11 @@ public class RestStartJobSchedulerAction extends BaseRestHandler { private static final String DEFAULT_START = "0"; - private final JobManager jobManager; private final ClusterService clusterService; @Inject - public RestStartJobSchedulerAction(Settings settings, RestController controller, JobManager jobManager, - ClusterService clusterService) { + public RestStartJobSchedulerAction(Settings settings, RestController controller, ClusterService clusterService) { super(settings); - this.jobManager = jobManager; this.clusterService = clusterService; controller.registerHandler(RestRequest.Method.POST, PrelertPlugin.BASE_PATH + "schedulers/{" + Job.ID.getPreferredName() + "}/_start", this); @@ -59,9 +53,8 @@ public class RestStartJobSchedulerAction extends BaseRestHandler { // This validation happens also in ScheduledJobRunner, the reason we do it here too is that if it fails there // we are unable to provide the user immediate feedback. We would create the task and the validation would fail // in the background, whereas now the validation failure is part of the response being returned. - Job job = jobManager.getJobOrThrowIfUnknown(jobId); - Allocation allocation = jobManager.getJobAllocation(jobId); - ScheduledJobRunner.validate(job, allocation); + PrelertMetadata prelertMetadata = clusterService.state().metaData().custom(PrelertMetadata.TYPE); + ScheduledJobRunner.validate(jobId, prelertMetadata); StartJobSchedulerAction.Request jobSchedulerRequest; if (RestActions.hasBodyContent(restRequest)) { @@ -69,15 +62,15 @@ public class RestStartJobSchedulerAction extends BaseRestHandler { XContentParser parser = XContentFactory.xContent(bodyBytes).createParser(bodyBytes); jobSchedulerRequest = StartJobSchedulerAction.Request.parseRequest(jobId, parser, () -> parseFieldMatcher); } else { - long startTimeMillis = parseDateOrThrow(restRequest.param(SchedulerState.START_TIME_MILLIS.getPreferredName(), DEFAULT_START), - SchedulerState.START_TIME_MILLIS.getPreferredName()); + long startTimeMillis = parseDateOrThrow(restRequest.param(StartJobSchedulerAction.START_TIME.getPreferredName(), + DEFAULT_START), StartJobSchedulerAction.START_TIME.getPreferredName()); Long endTimeMillis = null; - if (restRequest.hasParam(SchedulerState.END_TIME_MILLIS.getPreferredName())) { - endTimeMillis = parseDateOrThrow(restRequest.param(SchedulerState.END_TIME_MILLIS.getPreferredName()), - SchedulerState.END_TIME_MILLIS.getPreferredName()); + if (restRequest.hasParam(StartJobSchedulerAction.END_TIME.getPreferredName())) { + endTimeMillis = parseDateOrThrow(restRequest.param(StartJobSchedulerAction.END_TIME.getPreferredName()), + StartJobSchedulerAction.END_TIME.getPreferredName()); } - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, startTimeMillis, endTimeMillis); - jobSchedulerRequest = new StartJobSchedulerAction.Request(jobId, schedulerState); + jobSchedulerRequest = new StartJobSchedulerAction.Request(jobId, startTimeMillis); + jobSchedulerRequest.setEndTime(endTimeMillis); } return sendTask(client.executeLocally(StartJobSchedulerAction.INSTANCE, jobSchedulerRequest, LoggingTaskListener.instance())); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetJobActionResponseTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetJobActionResponseTests.java index 3d4742db114..5d8ffde7d9e 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetJobActionResponseTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetJobActionResponseTests.java @@ -13,13 +13,12 @@ import org.elasticsearch.xpack.prelert.job.DataDescription; import org.elasticsearch.xpack.prelert.job.Detector; import org.elasticsearch.xpack.prelert.job.IgnoreDowntime; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; +import org.elasticsearch.xpack.prelert.job.SchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; import org.elasticsearch.xpack.prelert.job.ModelDebugConfig; import org.elasticsearch.xpack.prelert.job.ModelSizeStats; import org.elasticsearch.xpack.prelert.job.SchedulerConfig; import org.elasticsearch.xpack.prelert.job.persistence.QueryPage; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.transform.TransformConfig; import org.elasticsearch.xpack.prelert.job.transform.TransformType; import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase; @@ -86,10 +85,9 @@ public class GetJobActionResponseTests extends AbstractStreamableTestCase { DataCounts dataCounts = getDataCounts("_job_id"); @@ -86,8 +84,7 @@ public class ScheduledJobsIT extends ESIntegTestCase { PrelertMetadata prelertMetadata = client().admin().cluster().prepareState().all().get() .getState().metaData().custom(PrelertMetadata.TYPE); - assertThat(prelertMetadata.getAllocations().get("_job_id").getSchedulerState().getStatus(), - equalTo(JobSchedulerStatus.STOPPED)); + assertThat(prelertMetadata.getSchedulerStatuses().get("_job_id"), equalTo(SchedulerStatus.STOPPED)); }); } @@ -110,9 +107,7 @@ public class ScheduledJobsIT extends ESIntegTestCase { AtomicReference errorHolder = new AtomicReference<>(); Thread t = new Thread(() -> { try { - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, null); - StartJobSchedulerAction.Request startSchedulerRequest = - new StartJobSchedulerAction.Request("_job_id", schedulerState); + StartJobSchedulerAction.Request startSchedulerRequest = new StartJobSchedulerAction.Request("_job_id", 0L); client().execute(StartJobSchedulerAction.INSTANCE, startSchedulerRequest).get(); } catch (Exception | AssertionError e) { errorHolder.set(e); @@ -138,8 +133,7 @@ public class ScheduledJobsIT extends ESIntegTestCase { assertBusy(() -> { PrelertMetadata prelertMetadata = client().admin().cluster().prepareState().all().get() .getState().metaData().custom(PrelertMetadata.TYPE); - assertThat(prelertMetadata.getAllocations().get("_job_id").getSchedulerState().getStatus(), - equalTo(JobSchedulerStatus.STOPPED)); + assertThat(prelertMetadata.getSchedulerStatuses().get("_job_id"), equalTo(SchedulerStatus.STOPPED)); }); assertThat(errorHolder.get(), nullValue()); } @@ -217,7 +211,7 @@ public class ScheduledJobsIT extends ESIntegTestCase { } catch (Exception e) { fail(); } - assertThat(r.getResponse().results().get(0).getSchedulerState().getStatus(), equalTo(JobSchedulerStatus.STOPPED)); + assertThat(r.getResponse().results().get(0).getSchedulerStatus(), equalTo(SchedulerStatus.STOPPED)); }); } catch (Exception e) { // ignore diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/StartJobSchedulerActionRequestTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/StartJobSchedulerActionRequestTests.java index 780a2631ffc..b3577697579 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/StartJobSchedulerActionRequestTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/StartJobSchedulerActionRequestTests.java @@ -8,16 +8,17 @@ package org.elasticsearch.xpack.prelert.action; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction.Request; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.support.AbstractStreamableXContentTestCase; public class StartJobSchedulerActionRequestTests extends AbstractStreamableXContentTestCase { @Override protected Request createTestInstance() { - SchedulerState state = new SchedulerState(JobSchedulerStatus.STARTED, randomLong(), randomLong()); - return new Request(randomAsciiOfLengthBetween(1, 20), state); + Request request = new Request(randomAsciiOfLength(10), randomPositiveLong()); + if (randomBoolean()) { + request.setEndTime(randomPositiveLong()); + } + return request; } @Override diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusRequestTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusRequestTests.java index 26344beedc1..f27eed7b651 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusRequestTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusRequestTests.java @@ -6,14 +6,14 @@ package org.elasticsearch.xpack.prelert.action; import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction.Request; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; +import org.elasticsearch.xpack.prelert.job.SchedulerStatus; import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase; public class UpdateJobSchedulerStatusRequestTests extends AbstractStreamableTestCase { @Override protected Request createTestInstance() { - return new Request(randomAsciiOfLengthBetween(1, 20), randomFrom(JobSchedulerStatus.values())); + return new Request(randomAsciiOfLengthBetween(1, 20), randomFrom(SchedulerStatus.values())); } @Override diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java index 2902fc49f07..6c926aa8d99 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java @@ -125,7 +125,7 @@ public class ScheduledJobIT extends ESRestTestCase { () -> client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/" + jobId)); response = e.getResponse(); assertThat(response.getStatusLine().getStatusCode(), equalTo(409)); - assertThat(responseEntityToString(response), containsString("Cannot delete job '" + jobId + "' while the scheduler is running")); + assertThat(responseEntityToString(response), containsString("Cannot delete job '" + jobId + "' while it is OPENED")); response = client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_stop"); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobSchedulerStatusTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobSchedulerStatusTests.java index c08a8e96dbd..3b149e6de13 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobSchedulerStatusTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobSchedulerStatusTests.java @@ -10,13 +10,13 @@ import org.elasticsearch.test.ESTestCase; public class JobSchedulerStatusTests extends ESTestCase { public void testForString() { - assertEquals(JobSchedulerStatus.fromString("started"), JobSchedulerStatus.STARTED); - assertEquals(JobSchedulerStatus.fromString("stopped"), JobSchedulerStatus.STOPPED); + assertEquals(SchedulerStatus.fromString("started"), SchedulerStatus.STARTED); + assertEquals(SchedulerStatus.fromString("stopped"), SchedulerStatus.STOPPED); } public void testValidOrdinals() { - assertEquals(0, JobSchedulerStatus.STARTED.ordinal()); - assertEquals(1, JobSchedulerStatus.STOPPED.ordinal()); + assertEquals(0, SchedulerStatus.STARTED.ordinal()); + assertEquals(1, SchedulerStatus.STOPPED.ordinal()); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/SchedulerStateTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/SchedulerStateTests.java deleted file mode 100644 index a2795c90096..00000000000 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/SchedulerStateTests.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.prelert.job; - -import org.elasticsearch.common.ParseFieldMatcher; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase; - -public class SchedulerStateTests extends AbstractSerializingTestCase { - - @Override - protected SchedulerState createTestInstance() { - return new SchedulerState(randomFrom(JobSchedulerStatus.values()), randomPositiveLong(), - randomBoolean() ? null : randomPositiveLong()); - } - - @Override - protected Writeable.Reader instanceReader() { - return SchedulerState::new; - } - - @Override - protected SchedulerState parseInstance(XContentParser parser, ParseFieldMatcher matcher) { - return SchedulerState.PARSER.apply(parser, () -> matcher); - } - - public void testEquals_GivenDifferentClass() { - - assertFalse(new SchedulerState(JobSchedulerStatus.STARTED, 0L, null).equals("a string")); - } - - public void testEquals_GivenSameReference() { - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 18L, 42L); - assertTrue(schedulerState.equals(schedulerState)); - } - - public void testEquals_GivenEqualObjects() { - SchedulerState schedulerState1 = new SchedulerState(JobSchedulerStatus.STARTED, 18L, 42L); - SchedulerState schedulerState2 = new SchedulerState(schedulerState1.getStatus(), schedulerState1.getStartTimeMillis(), - schedulerState1.getEndTimeMillis()); - - assertTrue(schedulerState1.equals(schedulerState2)); - assertTrue(schedulerState2.equals(schedulerState1)); - assertEquals(schedulerState1.hashCode(), schedulerState2.hashCode()); - } - - public void testEquals_GivenDifferentStatus() { - SchedulerState schedulerState1 = new SchedulerState(JobSchedulerStatus.STARTED, 18L, 42L); - SchedulerState schedulerState2 = new SchedulerState(JobSchedulerStatus.STOPPED, schedulerState1.getStartTimeMillis(), - schedulerState1.getEndTimeMillis()); - - assertFalse(schedulerState1.equals(schedulerState2)); - assertFalse(schedulerState2.equals(schedulerState1)); - } - - public void testEquals_GivenDifferentStartTimeMillis() { - SchedulerState schedulerState1 = new SchedulerState(JobSchedulerStatus.STARTED, null, 42L); - SchedulerState schedulerState2 = new SchedulerState(JobSchedulerStatus.STOPPED, 19L, schedulerState1.getEndTimeMillis()); - - assertFalse(schedulerState1.equals(schedulerState2)); - assertFalse(schedulerState2.equals(schedulerState1)); - } - - public void testEquals_GivenDifferentEndTimeMillis() { - SchedulerState schedulerState1 = new SchedulerState(JobSchedulerStatus.STARTED, 18L, 42L); - SchedulerState schedulerState2 = new SchedulerState(JobSchedulerStatus.STOPPED, schedulerState1.getStartTimeMillis(), 43L); - - assertFalse(schedulerState1.equals(schedulerState2)); - assertFalse(schedulerState2.equals(schedulerState1)); - } -} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java index d303f559444..93533ba7ad3 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.prelert.job.manager; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -34,7 +33,6 @@ import java.util.stream.Collectors; import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -193,23 +191,6 @@ public class JobManagerTests extends ESTestCase { assertThat(result.results().get(0).getId(), equalTo("9")); } - public void testInnerPutJob() { - JobManager jobManager = createJobManager(); - ClusterState cs = createClusterState(); - - Job job1 = buildJobBuilder("_id").build(); - ClusterState result1 = jobManager.innerPutJob(job1, false, cs); - PrelertMetadata pm = result1.getMetaData().custom(PrelertMetadata.TYPE); - assertThat(pm.getJobs().get("_id"), sameInstance(job1)); - - Job job2 = buildJobBuilder("_id").build(); - expectThrows(ResourceAlreadyExistsException.class, () -> jobManager.innerPutJob(job2, false, result1)); - - ClusterState result2 = jobManager.innerPutJob(job2, true, result1); - pm = result2.getMetaData().custom(PrelertMetadata.TYPE); - assertThat(pm.getJobs().get("_id"), sameInstance(job2)); - } - private JobManager createJobManager() { Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); JobResultsPersister jobResultsPersister = mock(JobResultsPersister.class); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java index b11bc83939c..06ef8b25a55 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java @@ -8,9 +8,7 @@ package org.elasticsearch.xpack.prelert.job.metadata; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase; public class AllocationTests extends AbstractSerializingTestCase { @@ -22,8 +20,7 @@ public class AllocationTests extends AbstractSerializingTestCase { boolean ignoreDowntime = randomBoolean(); JobStatus jobStatus = randomFrom(JobStatus.values()); String statusReason = randomBoolean() ? randomAsciiOfLength(10) : null; - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, randomPositiveLong(), randomPositiveLong()); - return new Allocation(nodeId, jobId, ignoreDowntime, jobStatus, statusReason, schedulerState); + return new Allocation(nodeId, jobId, ignoreDowntime, jobStatus, statusReason); } @Override @@ -37,7 +34,7 @@ public class AllocationTests extends AbstractSerializingTestCase { } public void testUnsetIgnoreDownTime() { - Allocation allocation = new Allocation("_node_id", "_job_id", true, JobStatus.OPENING, null, null); + Allocation allocation = new Allocation("_node_id", "_job_id", true, JobStatus.OPENING, null); assertTrue(allocation.isIgnoreDowntime()); Allocation.Builder builder = new Allocation.Builder(allocation); builder.setStatus(JobStatus.OPENED); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java index a91a33e8c50..a38a3f5389c 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java @@ -17,13 +17,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.prelert.job.DataDescription; -import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerConfig; import org.junit.Before; -import java.util.Collections; import java.util.concurrent.ExecutorService; import static org.mockito.Mockito.doAnswer; @@ -181,31 +176,4 @@ public class JobAllocatorTests extends ESTestCase { verify(clusterService, times(1)).submitStateUpdateTask(any(), any()); } - public void testScheduledJobHasDefaultSchedulerState() { - PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder(); - - SchedulerConfig.Builder schedulerConfigBuilder = new SchedulerConfig.Builder(Collections.singletonList("foo"), - Collections.singletonList("bar")); - - Job.Builder jobBuilder = buildJobBuilder("_job_id"); - jobBuilder.setSchedulerConfig(schedulerConfigBuilder); - DataDescription.Builder dataDescriptionBuilder = new DataDescription.Builder(); - dataDescriptionBuilder.setFormat(DataDescription.DataFormat.ELASTICSEARCH); - jobBuilder.setDataDescription(dataDescriptionBuilder); - - pmBuilder.putJob(jobBuilder.build(), false); - ClusterState cs = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() - .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("_id", new LocalTransportAddress("_id"), Version.CURRENT)) - .masterNodeId("_id") - .localNodeId("_id")) - .build(); - - - ClusterState clusterStateWithAllocation = jobAllocator.assignJobsToNodes(cs); - PrelertMetadata metadata = clusterStateWithAllocation.metaData().custom(PrelertMetadata.TYPE); - assertEquals(JobSchedulerStatus.STOPPED, metadata.getAllocations().get("_job_id").getSchedulerState().getStatus()); - } - } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java index 605ff54152b..2d6b28cef2c 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java @@ -27,6 +27,7 @@ import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; public class PrelertMetadataTests extends ESTestCase { @@ -90,20 +91,29 @@ public class PrelertMetadataTests extends ESTestCase { } public void testPutJob() { + Job job1 = buildJobBuilder("1").build(); + Job job2 = buildJobBuilder("2").build(); + PrelertMetadata.Builder builder = new PrelertMetadata.Builder(); - builder.putJob(buildJobBuilder("1").build(), false); - builder.putJob(buildJobBuilder("2").build(), false); - - ResourceAlreadyExistsException e = expectThrows(ResourceAlreadyExistsException.class, - () -> builder.putJob(buildJobBuilder("2").build(), false)); - assertEquals("The job cannot be created with the Id '2'. The Id is already used.", e.getMessage()); - - builder.putJob(buildJobBuilder("2").build(), true); + builder.putJob(job1, false); + builder.putJob(job2, false); PrelertMetadata result = builder.build(); + assertThat(result.getJobs().get("1"), sameInstance(job1)); + assertThat(result.getJobs().get("2"), sameInstance(job2)); + + builder = new PrelertMetadata.Builder(result); + + PrelertMetadata.Builder builderReference = builder; + ResourceAlreadyExistsException e = expectThrows(ResourceAlreadyExistsException.class, () -> builderReference.putJob(job2, false)); + assertEquals("The job cannot be created with the Id '2'. The Id is already used.", e.getMessage()); + Job job2Attempt2 = buildJobBuilder("2").build(); + builder.putJob(job2Attempt2, true); + + result = builder.build(); assertThat(result.getJobs().size(), equalTo(2)); - assertThat(result.getJobs().get("1"), notNullValue()); - assertThat(result.getJobs().get("2"), notNullValue()); + assertThat(result.getJobs().get("1"), sameInstance(job1)); + assertThat(result.getJobs().get("2"), sameInstance(job2Attempt2)); } public void testUpdateAllocation_setFinishedTime() { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunnerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunnerTests.java index 804162c9bd7..869767643cb 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunnerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunnerTests.java @@ -9,6 +9,10 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -22,16 +26,14 @@ import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.DataDescription; import org.elasticsearch.xpack.prelert.job.Detector; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; +import org.elasticsearch.xpack.prelert.job.SchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; import org.elasticsearch.xpack.prelert.job.SchedulerConfig; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.data.DataProcessor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory; -import org.elasticsearch.xpack.prelert.job.manager.JobManager; -import org.elasticsearch.xpack.prelert.job.metadata.Allocation; +import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; import org.elasticsearch.xpack.prelert.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; import org.elasticsearch.xpack.prelert.job.persistence.QueryPage; @@ -67,6 +69,7 @@ public class ScheduledJobRunnerTests extends ESTestCase { private Client client; private ActionFuture jobDataFuture; private ActionFuture flushJobFuture; + private ClusterService clusterService; private ThreadPool threadPool; private DataExtractorFactory dataExtractorFactory; private ScheduledJobRunner scheduledJobRunner; @@ -78,6 +81,7 @@ public class ScheduledJobRunnerTests extends ESTestCase { client = mock(Client.class); jobDataFuture = mock(ActionFuture.class); flushJobFuture = mock(ActionFuture.class); + clusterService = mock(ClusterService.class); doAnswer(invocation -> { @SuppressWarnings("unchecked") ActionListener actionListener = (ActionListener) invocation.getArguments()[2]; @@ -100,7 +104,7 @@ public class ScheduledJobRunnerTests extends ESTestCase { when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); scheduledJobRunner = - new ScheduledJobRunner(threadPool, client, jobProvider, dataExtractorFactory, () -> currentTime); + new ScheduledJobRunner(threadPool, client, clusterService,jobProvider, dataExtractorFactory, () -> currentTime); when(jobProvider.audit(anyString())).thenReturn(auditor); when(jobProvider.buckets(anyString(), any(BucketsQueryBuilder.BucketsQuery.class))).thenThrow( @@ -109,11 +113,14 @@ public class ScheduledJobRunnerTests extends ESTestCase { public void testStart_GivenNewlyCreatedJobLoopBack() throws Exception { Job.Builder builder = createScheduledJob(); - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, 0L, 60000L); DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); Job job = builder.build(); - Allocation allocation = - new Allocation(null, "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null)); + PrelertMetadata prelertMetadata = new PrelertMetadata.Builder().putJob(job, false) + .updateStatus("foo", JobStatus.OPENED, null) + .build(); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, prelertMetadata)) + .build()); DataExtractor dataExtractor = mock(DataExtractor.class); when(dataExtractorFactory.newExtractor(job)).thenReturn(dataExtractor); @@ -123,24 +130,27 @@ public class ScheduledJobRunnerTests extends ESTestCase { when(jobDataFuture.get()).thenReturn(new JobDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); StartJobSchedulerAction.SchedulerTask task = mock(StartJobSchedulerAction.SchedulerTask.class); - scheduledJobRunner.run(job, schedulerState, allocation, task, handler); + scheduledJobRunner.run("foo", 0L, 60000L, task, handler); verify(dataExtractor).newSearch(eq(0L), eq(60000L), any()); verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); verify(client).execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("foo"))); verify(client).execute(same(FlushJobAction.INSTANCE), any()); - verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STARTED)), any()); - verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any()); + verify(client).execute(same(INSTANCE), eq(new Request("foo", SchedulerStatus.STARTED)), any()); + verify(client).execute(same(INSTANCE), eq(new Request("foo", SchedulerStatus.STOPPED)), any()); } public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception { Job.Builder builder = createScheduledJob(); - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, 0L, null); DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); Job job = builder.build(); - Allocation allocation = - new Allocation(null, "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null)); + PrelertMetadata prelertMetadata = new PrelertMetadata.Builder().putJob(job, false) + .updateStatus("foo", JobStatus.OPENED, null) + .build(); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, prelertMetadata)) + .build()); DataExtractor dataExtractor = mock(DataExtractor.class); when(dataExtractorFactory.newExtractor(job)).thenReturn(dataExtractor); @@ -151,13 +161,13 @@ public class ScheduledJobRunnerTests extends ESTestCase { Consumer handler = mockConsumer(); boolean cancelled = randomBoolean(); StartJobSchedulerAction.SchedulerTask task = new StartJobSchedulerAction.SchedulerTask(1, "type", "action", null, "foo"); - scheduledJobRunner.run(job, schedulerState, allocation, task, handler); + scheduledJobRunner.run("foo", 0L, null, task, handler); verify(dataExtractor).newSearch(eq(0L), eq(60000L), any()); verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME); if (cancelled) { task.stop(); - verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any()); + verify(client).execute(same(INSTANCE), eq(new Request("foo", SchedulerStatus.STOPPED)), any()); } else { verify(client).execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("foo"))); verify(client).execute(same(FlushJobAction.INSTANCE), any()); @@ -183,19 +193,21 @@ public class ScheduledJobRunnerTests extends ESTestCase { public void testValidate() { Job job1 = buildJobBuilder("foo").build(); - Exception e = expectThrows(IllegalArgumentException.class, () -> ScheduledJobRunner.validate(job1, null)); + PrelertMetadata prelertMetadata1 = new PrelertMetadata.Builder().putJob(job1, false).build(); + Exception e = expectThrows(IllegalArgumentException.class, () -> ScheduledJobRunner.validate("foo", prelertMetadata1)); assertThat(e.getMessage(), equalTo("job [foo] is not a scheduled job")); Job job2 = createScheduledJob().build(); - Allocation allocation1 = - new Allocation("_id", "_id", false, JobStatus.CLOSED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null)); - e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate(job2, allocation1)); + PrelertMetadata prelertMetadata2 = new PrelertMetadata.Builder().putJob(job2, false).build(); + e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate("foo", prelertMetadata2)); assertThat(e.getMessage(), equalTo("cannot start scheduler, expected job status [OPENED], but got [CLOSED]")); Job job3 = createScheduledJob().build(); - Allocation allocation2 = - new Allocation("_id", "_id", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STARTED, null, null)); - e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate(job3, allocation2)); + PrelertMetadata prelertMetadata3 = new PrelertMetadata.Builder().putJob(job3, false) + .updateStatus("foo", JobStatus.OPENED, null) + .updateSchedulerStatus("foo", SchedulerStatus.STARTED) + .build(); + e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate("foo", prelertMetadata3)); assertThat(e.getMessage(), equalTo("scheduler already started, expected scheduler status [STOPPED], but got [STARTED]")); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobTests.java index 2edbb318cf4..374fc278d54 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobTests.java @@ -11,8 +11,6 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.prelert.action.FlushJobAction; import org.elasticsearch.xpack.prelert.action.JobDataAction; import org.elasticsearch.xpack.prelert.job.DataCounts; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor; import org.junit.Before; @@ -64,8 +62,7 @@ public class ScheduledJobTests extends ESTestCase { public void testLookBackRunWithEndTime() throws Exception { ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1); - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, 1000L); - assertNull(scheduledJob.runLookBack(schedulerState)); + assertNull(scheduledJob.runLookBack(0L, 1000L)); verify(dataExtractor).newSearch(eq(0L), eq(1000L), any()); FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); @@ -78,8 +75,7 @@ public class ScheduledJobTests extends ESTestCase { long frequencyMs = 1000; long queryDelayMs = 500; ScheduledJob scheduledJob = createScheduledJob(frequencyMs, queryDelayMs, -1, -1); - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, null); - long next = scheduledJob.runLookBack(schedulerState); + long next = scheduledJob.runLookBack(0L, null); assertEquals(2000 + frequencyMs + 100, next); verify(dataExtractor).newSearch(eq(0L), eq(1500L), any()); @@ -101,8 +97,7 @@ public class ScheduledJobTests extends ESTestCase { long frequencyMs = 1000; long queryDelayMs = 500; ScheduledJob scheduledJob = createScheduledJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs); - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, null); - long next = scheduledJob.runLookBack(schedulerState); + long next = scheduledJob.runLookBack(0L, null); assertEquals(10000 + frequencyMs + 100, next); verify(dataExtractor).newSearch(eq(5000 + 1L), eq(currentTime - queryDelayMs), any()); @@ -131,8 +126,7 @@ public class ScheduledJobTests extends ESTestCase { when(dataExtractor.hasNext()).thenReturn(false); ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1); - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, 1000L); - expectThrows(ScheduledJob.EmptyDataCountException.class, () -> scheduledJob.runLookBack(schedulerState)); + expectThrows(ScheduledJob.EmptyDataCountException.class, () -> scheduledJob.runLookBack(0L, 1000L)); } public void testExtractionProblem() throws Exception { @@ -141,8 +135,7 @@ public class ScheduledJobTests extends ESTestCase { when(dataExtractor.next()).thenThrow(new IOException()); ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1); - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, 1000L); - expectThrows(ScheduledJob.ExtractionProblemException.class, () -> scheduledJob.runLookBack(schedulerState)); + expectThrows(ScheduledJob.ExtractionProblemException.class, () -> scheduledJob.runLookBack(0L, 1000L)); currentTime = 3001; expectThrows(ScheduledJob.ExtractionProblemException.class, scheduledJob::runRealtime); @@ -162,8 +155,7 @@ public class ScheduledJobTests extends ESTestCase { when(client.execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("_job_id")))).thenThrow(new RuntimeException()); ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1); - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, 1000L); - expectThrows(ScheduledJob.AnalysisProblemException.class, () -> scheduledJob.runLookBack(schedulerState)); + expectThrows(ScheduledJob.AnalysisProblemException.class, () -> scheduledJob.runLookBack(0L, 1000L)); currentTime = 3001; expectThrows(ScheduledJob.EmptyDataCountException.class, scheduledJob::runRealtime); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerActionTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerActionTests.java index 2ac57c76ce2..87faac6829a 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerActionTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerActionTests.java @@ -7,6 +7,9 @@ package org.elasticsearch.xpack.prelert.rest.schedulers; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.RestController; @@ -14,38 +17,43 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; -import org.elasticsearch.xpack.prelert.job.manager.JobManager; -import org.elasticsearch.xpack.prelert.job.metadata.Allocation; +import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunnerTests; -import java.util.Collections; +import java.util.HashMap; +import java.util.Map; -import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class RestStartJobSchedulerActionTests extends ESTestCase { public void testPrepareRequest() throws Exception { - JobManager jobManager = mock(JobManager.class); + ClusterService clusterService = mock(ClusterService.class); Job.Builder job = ScheduledJobRunnerTests.createScheduledJob(); - when(jobManager.getJobOrThrowIfUnknown(anyString())).thenReturn(job.build()); - Allocation allocation = - new Allocation(null, "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null)); - when(jobManager.getJobAllocation(anyString())).thenReturn(allocation); - RestStartJobSchedulerAction action = new RestStartJobSchedulerAction(Settings.EMPTY, mock(RestController.class), - jobManager, mock(ClusterService.class)); + PrelertMetadata prelertMetadata = new PrelertMetadata.Builder() + .putJob(job.build(), false) + .updateStatus("foo", JobStatus.OPENED, null) + .build(); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, prelertMetadata)) + .build()); + RestStartJobSchedulerAction action = new RestStartJobSchedulerAction(Settings.EMPTY, mock(RestController.class), clusterService); - RestRequest restRequest1 = new FakeRestRequest.Builder().withParams(Collections.singletonMap("start", "not-a-date")).build(); + Map params = new HashMap<>(); + params.put("start", "not-a-date"); + params.put("job_id", "foo"); + RestRequest restRequest1 = new FakeRestRequest.Builder().withParams(params).build(); ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> action.prepareRequest(restRequest1, mock(NodeClient.class))); assertEquals("Query param 'start' with value 'not-a-date' cannot be parsed as a date or converted to a number (epoch).", e.getMessage()); - RestRequest restRequest2 = new FakeRestRequest.Builder().withParams(Collections.singletonMap("end", "not-a-date")).build(); + params = new HashMap<>(); + params.put("end", "not-a-date"); + params.put("job_id", "foo"); + RestRequest restRequest2 = new FakeRestRequest.Builder().withParams(params).build(); e = expectThrows(ElasticsearchParseException.class, () -> action.prepareRequest(restRequest2, mock(NodeClient.class))); assertEquals("Query param 'end' with value 'not-a-date' cannot be parsed as a date or converted to a number (epoch).", e.getMessage()); diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml index c235ce1b2dc..14810261c2b 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml @@ -146,7 +146,7 @@ setup: - is_false: jobs.0.config - is_false: jobs.0.data_counts - is_false: jobs.0.model_size_stats - - match: { jobs.0.scheduler_state.status: STOPPED } + - match: { jobs.0.scheduler_state: STOPPED } --- "Test bad metric":