diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java index 95edef21b6e..a67f1dd8336 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java @@ -168,7 +168,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier); DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, processFactory, clusterService.getClusterSettings()); - ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, jobProvider, + ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, clusterService, jobProvider, // norelease: we will no longer need to pass the client here after we switch to a client based data extractor new HttpDataExtractorFactory(client), System::currentTimeMillis); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetJobsAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetJobsAction.java index 4e2707270ed..09336c29bca 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetJobsAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetJobsAction.java @@ -39,9 +39,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.Job; +import org.elasticsearch.xpack.prelert.job.SchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; import org.elasticsearch.xpack.prelert.job.ModelSizeStats; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.manager.AutodetectProcessManager; import org.elasticsearch.xpack.prelert.job.manager.JobManager; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; @@ -280,19 +280,19 @@ public class GetJobsAction extends Action { + public static final ParseField START_TIME = new ParseField("start"); + public static final ParseField END_TIME = new ParseField("end"); + public static final StartJobSchedulerAction INSTANCE = new StartJobSchedulerAction(); public static final String NAME = "cluster:admin/prelert/job/scheduler/run"; @@ -66,8 +66,8 @@ extends Action request.jobId = jobId, Job.ID); - PARSER.declareObject((request, schedulerState) -> request.schedulerState = schedulerState, SchedulerState.PARSER, - SchedulerState.TYPE_FIELD); + PARSER.declareLong((request, startTime) -> request.startTime = startTime, START_TIME); + PARSER.declareLong(Request::setEndTime, END_TIME); } public static Request parseRequest(String jobId, XContentParser parser, ParseFieldMatcherSupplier parseFieldMatcherSupplier) { @@ -79,17 +79,12 @@ extends Action { - private final JobManager jobManager; private final ScheduledJobRunner scheduledJobRunner; @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - JobManager jobManager, ScheduledJobRunner scheduledJobRunner) { + ScheduledJobRunner scheduledJobRunner) { super(settings, StartJobSchedulerAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); - this.jobManager = jobManager; this.scheduledJobRunner = scheduledJobRunner; } @Override protected void doExecute(Task task, Request request, ActionListener listener) { SchedulerTask schedulerTask = (SchedulerTask) task; - Job job = jobManager.getJobOrThrowIfUnknown(request.jobId); - Allocation allocation = jobManager.getJobAllocation(job.getId()); - scheduledJobRunner.run(job, request.getSchedulerState(), allocation, schedulerTask, (error) -> { + scheduledJobRunner.run(request.getJobId(), request.getStartTime(), request.getEndTime(), schedulerTask, (error) -> { if (error != null) { listener.onFailure(error); } else { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusAction.java index d25e0968d29..33e731c3e0d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusAction.java @@ -26,8 +26,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; +import org.elasticsearch.xpack.prelert.job.SchedulerStatus; import org.elasticsearch.xpack.prelert.job.manager.JobManager; import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; @@ -57,11 +56,11 @@ public class UpdateJobSchedulerStatusAction extends Action { private String jobId; - private JobSchedulerStatus schedulerStatus; + private SchedulerStatus schedulerStatus; - public Request(String jobId, JobSchedulerStatus schedulerStatus) { + public Request(String jobId, SchedulerStatus schedulerStatus) { this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); - this.schedulerStatus = ExceptionsHelper.requireNonNull(schedulerStatus, SchedulerState.STATUS.getPreferredName()); + this.schedulerStatus = ExceptionsHelper.requireNonNull(schedulerStatus, "status"); } Request() {} @@ -74,11 +73,11 @@ public class UpdateJobSchedulerStatusAction extends Action= values().length) { - throw new IOException("Unknown public enum JobSchedulerStatus {\n ordinal [" + ordinal + "]"); + throw new IOException("Unknown public enum SchedulerStatus {\n ordinal [" + ordinal + "]"); } return values()[ordinal]; } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/SchedulerState.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/SchedulerState.java deleted file mode 100644 index 2f7f438d910..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/SchedulerState.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.prelert.job; - -import org.elasticsearch.action.support.ToXContentToBytes; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.ParseFieldMatcherSupplier; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.XContentBuilder; - -import org.elasticsearch.common.xcontent.ObjectParser.ValueType; - -import java.io.IOException; -import java.util.Locale; -import java.util.Objects; - -public class SchedulerState extends ToXContentToBytes implements Writeable { - - public static final ParseField TYPE_FIELD = new ParseField("scheduler_state"); - public static final ParseField STATUS = new ParseField("status"); - public static final ParseField START_TIME_MILLIS = new ParseField("start"); - public static final ParseField END_TIME_MILLIS = new ParseField("end"); - - public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - TYPE_FIELD.getPreferredName(), a -> new SchedulerState((JobSchedulerStatus) a[0], (Long) a[1], (Long) a[2])); - - static { - PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> JobSchedulerStatus.fromString(p.text()), STATUS, - ValueType.STRING); - PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), START_TIME_MILLIS); - PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), END_TIME_MILLIS); - } - - private JobSchedulerStatus status; - @Nullable - private Long startTimeMillis; - @Nullable - private Long endTimeMillis; - - public SchedulerState(JobSchedulerStatus status, Long startTimeMillis, Long endTimeMillis) { - this.status = status; - this.startTimeMillis = startTimeMillis; - this.endTimeMillis = endTimeMillis; - } - - public SchedulerState(StreamInput in) throws IOException { - status = JobSchedulerStatus.fromStream(in); - startTimeMillis = in.readOptionalLong(); - endTimeMillis = in.readOptionalLong(); - } - - public JobSchedulerStatus getStatus() { - return status; - } - - public Long getStartTimeMillis() { - return startTimeMillis; - } - - /** - * The end time as epoch milliseconds. An {@code null} end time indicates - * real-time mode. - * - * @return The optional end time as epoch milliseconds. - */ - @Nullable - public Long getEndTimeMillis() { - return endTimeMillis; - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (other instanceof SchedulerState == false) { - return false; - } - - SchedulerState that = (SchedulerState) other; - - return Objects.equals(this.status, that.status) && Objects.equals(this.startTimeMillis, that.startTimeMillis) - && Objects.equals(this.endTimeMillis, that.endTimeMillis); - } - - @Override - public int hashCode() { - return Objects.hash(status, startTimeMillis, endTimeMillis); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - status.writeTo(out); - out.writeOptionalLong(startTimeMillis); - out.writeOptionalLong(endTimeMillis); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(STATUS.getPreferredName(), status.name().toUpperCase(Locale.ROOT)); - if (startTimeMillis != null) { - builder.field(START_TIME_MILLIS.getPreferredName(), startTimeMillis); - } - if (endTimeMillis != null) { - builder.field(END_TIME_MILLIS.getPreferredName(), endTimeMillis); - } - builder.endObject(); - return builder; - } -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/JobSchedulerStatus.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/SchedulerStatus.java similarity index 73% rename from elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/JobSchedulerStatus.java rename to elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/SchedulerStatus.java index 5d476b95e61..0e6e0fc5eea 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/JobSchedulerStatus.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/SchedulerStatus.java @@ -12,18 +12,18 @@ import org.elasticsearch.common.io.stream.Writeable; import java.io.IOException; import java.util.Locale; -public enum JobSchedulerStatus implements Writeable { +public enum SchedulerStatus implements Writeable { STARTED, STOPPED; - public static JobSchedulerStatus fromString(String name) { + public static SchedulerStatus fromString(String name) { return valueOf(name.trim().toUpperCase(Locale.ROOT)); } - public static JobSchedulerStatus fromStream(StreamInput in) throws IOException { + public static SchedulerStatus fromStream(StreamInput in) throws IOException { int ordinal = in.readVInt(); if (ordinal < 0 || ordinal >= values().length) { - throw new IOException("Unknown public enum JobSchedulerStatus {\n ordinal [" + ordinal + "]"); + throw new IOException("Unknown public enum SchedulerStatus {\n ordinal [" + ordinal + "]"); } return values()[ordinal]; } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java index 056dfa6c7d6..2ee1f1b744e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java @@ -23,10 +23,9 @@ import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction; import org.elasticsearch.xpack.prelert.action.UpdateJobStatusAction; import org.elasticsearch.xpack.prelert.job.IgnoreDowntime; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; +import org.elasticsearch.xpack.prelert.job.SchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; import org.elasticsearch.xpack.prelert.job.ModelSnapshot; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.messages.Messages; import org.elasticsearch.xpack.prelert.job.metadata.Allocation; @@ -152,7 +151,7 @@ public class JobManager extends AbstractComponent { * @throws org.elasticsearch.ResourceNotFoundException * if there is no job with matching the given {@code jobId} */ - public Job getJobOrThrowIfUnknown(ClusterState clusterState, String jobId) { + Job getJobOrThrowIfUnknown(ClusterState clusterState, String jobId) { PrelertMetadata prelertMetadata = clusterState.metaData().custom(PrelertMetadata.TYPE); Job job = prelertMetadata.getJobs().get(jobId); if (job == null) { @@ -166,26 +165,25 @@ public class JobManager extends AbstractComponent { */ public void putJob(PutJobAction.Request request, ActionListener actionListener) { Job job = request.getJob(); - ActionListener delegateListener = ActionListener.wrap(jobSaved -> { - jobProvider.createJobRelatedIndices(job, new ActionListener() { - @Override - public void onResponse(Boolean indicesCreated) { - audit(job.getId()).info(Messages.getMessage(Messages.JOB_AUDIT_CREATED)); + ActionListener delegateListener = ActionListener.wrap(jobSaved -> + jobProvider.createJobRelatedIndices(job, new ActionListener() { + @Override + public void onResponse(Boolean indicesCreated) { + audit(job.getId()).info(Messages.getMessage(Messages.JOB_AUDIT_CREATED)); - // Also I wonder if we need to audit log infra - // structure in prelert as when we merge into xpack - // we can use its audit trailing. See: - // https://github.com/elastic/prelert-legacy/issues/48 - actionListener.onResponse(new PutJobAction.Response(jobSaved && indicesCreated, job)); - } + // Also I wonder if we need to audit log infra + // structure in prelert as when we merge into xpack + // we can use its audit trailing. See: + // https://github.com/elastic/prelert-legacy/issues/48 + actionListener.onResponse(new PutJobAction.Response(jobSaved && indicesCreated, job)); + } - @Override - public void onFailure(Exception e) { - actionListener.onFailure(e); + @Override + public void onFailure(Exception e) { + actionListener.onFailure(e); - } - }); - }, actionListener::onFailure); + } + }), actionListener::onFailure); clusterService.submitStateUpdateTask("put-job-" + job.getId(), new AckedClusterStateUpdateTask(request, delegateListener) { @@ -203,12 +201,9 @@ public class JobManager extends AbstractComponent { } ClusterState innerPutJob(Job job, boolean overwrite, ClusterState currentState) { - PrelertMetadata currentPrelertMetadata = currentState.metaData().custom(PrelertMetadata.TYPE); - PrelertMetadata.Builder builder = new PrelertMetadata.Builder(currentPrelertMetadata); + PrelertMetadata.Builder builder = createPrelertMetadatBuilder(currentState); builder.putJob(job, overwrite); - ClusterState.Builder newState = ClusterState.builder(currentState); - newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(PrelertMetadata.TYPE, builder.build()).build()); - return newState.build(); + return buildNewClusterState(currentState, builder); } /** @@ -254,52 +249,29 @@ public class JobManager extends AbstractComponent { } ClusterState removeJobFromClusterState(String jobId, ClusterState currentState) { - PrelertMetadata currentPrelertMetadata = currentState.metaData().custom(PrelertMetadata.TYPE); - PrelertMetadata.Builder builder = new PrelertMetadata.Builder(currentPrelertMetadata); + PrelertMetadata.Builder builder = createPrelertMetadatBuilder(currentState); builder.removeJob(jobId); - Allocation allocation = currentPrelertMetadata.getAllocations().get(jobId); - if (allocation != null) { - SchedulerState schedulerState = allocation.getSchedulerState(); - if (schedulerState != null && schedulerState.getStatus() != JobSchedulerStatus.STOPPED) { - throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.JOB_CANNOT_DELETE_WHILE_SCHEDULER_RUNS, jobId)); - } - if (!allocation.getStatus().isAnyOf(JobStatus.CLOSED, JobStatus.FAILED)) { - throw ExceptionsHelper.conflictStatusException(Messages.getMessage( - Messages.JOB_CANNOT_DELETE_WHILE_RUNNING, jobId, allocation.getStatus())); - } - } - ClusterState.Builder newState = ClusterState.builder(currentState); - newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(PrelertMetadata.TYPE, builder.build()).build()); - return newState.build(); + return buildNewClusterState(currentState, builder); } - private void checkJobIsScheduled(Job job) { - if (job.getSchedulerConfig() == null) { - throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_SCHEDULER_NO_SUCH_SCHEDULED_JOB, job.getId())); - } - } - - public Optional getSchedulerState(String jobId) { - Job job = getJobOrThrowIfUnknown(clusterService.state(), jobId); - if (job.getSchedulerConfig() == null) { - return Optional.empty(); - } - - Allocation allocation = getAllocation(clusterService.state(), jobId); - return Optional.ofNullable(allocation.getSchedulerState()); + public Optional getSchedulerStatus(String jobId) { + PrelertMetadata prelertMetadata = clusterService.state().metaData().custom(PrelertMetadata.TYPE); + return Optional.ofNullable(prelertMetadata.getSchedulerStatuses().get(jobId)); } public void updateSchedulerStatus(UpdateJobSchedulerStatusAction.Request request, ActionListener actionListener) { String jobId = request.getJobId(); - JobSchedulerStatus newStatus = request.getSchedulerStatus(); + SchedulerStatus newStatus = request.getSchedulerStatus(); clusterService.submitStateUpdateTask("update-scheduler-status-job-" + jobId, new AckedClusterStateUpdateTask(request, actionListener) { @Override public ClusterState execute(ClusterState currentState) throws Exception { - return innerUpdateSchedulerState(currentState, jobId, newStatus, null, null); + PrelertMetadata.Builder builder = createPrelertMetadatBuilder(currentState); + builder.updateSchedulerStatus(jobId, newStatus); + return buildNewClusterState(currentState, builder); } @Override @@ -309,33 +281,6 @@ public class JobManager extends AbstractComponent { }); } - private ClusterState innerUpdateSchedulerState(ClusterState currentState, String jobId, JobSchedulerStatus status, - Long startTime, Long endTime) { - Job job = getJobOrThrowIfUnknown(currentState, jobId); - checkJobIsScheduled(job); - - Allocation allocation = getAllocation(currentState, jobId); - if (allocation.getSchedulerState() == null && status != JobSchedulerStatus.STARTED) { - throw new IllegalArgumentException("Can't change status to [" + status + "], because job's [" + jobId + - "] scheduler never started"); - } - - SchedulerState existingState = allocation.getSchedulerState(); - if (existingState != null) { - if (startTime == null) { - startTime = existingState.getStartTimeMillis(); - } - if (endTime == null) { - endTime = existingState.getEndTimeMillis(); - } - } - - existingState = new SchedulerState(status, startTime, endTime); - Allocation.Builder builder = new Allocation.Builder(allocation); - builder.setSchedulerState(existingState); - return innerUpdateAllocation(builder.build(), currentState); - } - private Allocation getAllocation(ClusterState state, String jobId) { PrelertMetadata prelertMetadata = state.metaData().custom(PrelertMetadata.TYPE); Allocation allocation = prelertMetadata.getAllocations().get(jobId); @@ -345,15 +290,6 @@ public class JobManager extends AbstractComponent { return allocation; } - private ClusterState innerUpdateAllocation(Allocation newAllocation, ClusterState currentState) { - PrelertMetadata currentPrelertMetadata = currentState.metaData().custom(PrelertMetadata.TYPE); - PrelertMetadata.Builder builder = new PrelertMetadata.Builder(currentPrelertMetadata); - builder.updateAllocation(newAllocation.getJobId(), newAllocation); - ClusterState.Builder newState = ClusterState.builder(currentState); - newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(PrelertMetadata.TYPE, builder.build()).build()); - return newState.build(); - } - public Auditor audit(String jobId) { return jobProvider.audit(jobId); } @@ -463,4 +399,15 @@ public class JobManager extends AbstractComponent { jobResultsPersister.commitWrites(jobId); } + private static PrelertMetadata.Builder createPrelertMetadatBuilder(ClusterState currentState) { + PrelertMetadata currentPrelertMetadata = currentState.metaData().custom(PrelertMetadata.TYPE); + return new PrelertMetadata.Builder(currentPrelertMetadata); + } + + private static ClusterState buildNewClusterState(ClusterState currentState, PrelertMetadata.Builder builder) { + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(PrelertMetadata.TYPE, builder.build()).build()); + return newState.build(); + } + } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java index 051eec475de..dfa851e16ea 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java @@ -15,11 +15,7 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; -import org.elasticsearch.xpack.prelert.job.messages.Messages; -import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; import java.io.IOException; import java.util.Objects; @@ -31,9 +27,8 @@ public class Allocation extends AbstractDiffable implements ToXConte private static final ParseField IGNORE_DOWNTIME_FIELD = new ParseField("ignore_downtime"); public static final ParseField STATUS = new ParseField("status"); public static final ParseField STATUS_REASON = new ParseField("status_reason"); - public static final ParseField SCHEDULER_STATE = new ParseField("scheduler_state"); - static final Allocation PROTO = new Allocation(null, null, false, null, null, null); + static final Allocation PROTO = new Allocation(null, null, false, null, null); static final ObjectParser PARSER = new ObjectParser<>("allocation", Builder::new); @@ -43,7 +38,6 @@ public class Allocation extends AbstractDiffable implements ToXConte PARSER.declareBoolean(Builder::setIgnoreDowntime, IGNORE_DOWNTIME_FIELD); PARSER.declareField(Builder::setStatus, (p, c) -> JobStatus.fromString(p.text()), STATUS, ObjectParser.ValueType.STRING); PARSER.declareString(Builder::setStatusReason, STATUS_REASON); - PARSER.declareObject(Builder::setSchedulerState, SchedulerState.PARSER, SCHEDULER_STATE); } private final String nodeId; @@ -51,16 +45,13 @@ public class Allocation extends AbstractDiffable implements ToXConte private final boolean ignoreDowntime; private final JobStatus status; private final String statusReason; - private final SchedulerState schedulerState; - public Allocation(String nodeId, String jobId, boolean ignoreDowntime, JobStatus status, String statusReason, - SchedulerState schedulerState) { + public Allocation(String nodeId, String jobId, boolean ignoreDowntime, JobStatus status, String statusReason) { this.nodeId = nodeId; this.jobId = jobId; this.ignoreDowntime = ignoreDowntime; this.status = status; this.statusReason = statusReason; - this.schedulerState = schedulerState; } public Allocation(StreamInput in) throws IOException { @@ -69,7 +60,6 @@ public class Allocation extends AbstractDiffable implements ToXConte this.ignoreDowntime = in.readBoolean(); this.status = JobStatus.fromStream(in); this.statusReason = in.readOptionalString(); - this.schedulerState = in.readOptionalWriteable(SchedulerState::new); } public String getNodeId() { @@ -97,10 +87,6 @@ public class Allocation extends AbstractDiffable implements ToXConte return statusReason; } - public SchedulerState getSchedulerState() { - return schedulerState; - } - @Override public Allocation readFrom(StreamInput in) throws IOException { return new Allocation(in); @@ -113,7 +99,6 @@ public class Allocation extends AbstractDiffable implements ToXConte out.writeBoolean(ignoreDowntime); status.writeTo(out); out.writeOptionalString(statusReason); - out.writeOptionalWriteable(schedulerState); } @Override @@ -128,9 +113,6 @@ public class Allocation extends AbstractDiffable implements ToXConte if (statusReason != null) { builder.field(STATUS_REASON.getPreferredName(), statusReason); } - if (schedulerState != null) { - builder.field(SCHEDULER_STATE.getPreferredName(), schedulerState); - } builder.endObject(); return builder; } @@ -144,13 +126,12 @@ public class Allocation extends AbstractDiffable implements ToXConte Objects.equals(jobId, that.jobId) && Objects.equals(ignoreDowntime, that.ignoreDowntime) && Objects.equals(status, that.status) && - Objects.equals(statusReason, that.statusReason) && - Objects.equals(schedulerState, that.schedulerState); + Objects.equals(statusReason, that.statusReason); } @Override public int hashCode() { - return Objects.hash(nodeId, jobId, ignoreDowntime, status, statusReason, schedulerState); + return Objects.hash(nodeId, jobId, ignoreDowntime, status, statusReason); } // Class alreadt extends from AbstractDiffable, so copied from ToXContentToBytes#toString() @@ -175,16 +156,12 @@ public class Allocation extends AbstractDiffable implements ToXConte private boolean ignoreDowntime; private JobStatus status; private String statusReason; - private SchedulerState schedulerState; public Builder() { } public Builder(Job job) { this.jobId = job.getId(); - if (job.getSchedulerConfig() != null) { - schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, null, null); - } } public Builder(Allocation allocation) { @@ -193,7 +170,6 @@ public class Allocation extends AbstractDiffable implements ToXConte this.ignoreDowntime = allocation.ignoreDowntime; this.status = allocation.status; this.statusReason = allocation.statusReason; - this.schedulerState = allocation.schedulerState; } public void setNodeId(String nodeId) { @@ -237,34 +213,8 @@ public class Allocation extends AbstractDiffable implements ToXConte this.statusReason = statusReason; } - public void setSchedulerState(SchedulerState newSchedulerState) { - if (this.schedulerState != null){ - JobSchedulerStatus currentSchedulerStatus = this.schedulerState.getStatus(); - JobSchedulerStatus newSchedulerStatus = newSchedulerState.getStatus(); - switch (newSchedulerStatus) { - case STARTED: - if (currentSchedulerStatus != JobSchedulerStatus.STOPPED) { - String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_START, jobId, newSchedulerStatus); - throw ExceptionsHelper.conflictStatusException(msg); - } - break; - case STOPPED: - if (currentSchedulerStatus != JobSchedulerStatus.STARTED) { - String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_STOP_IN_CURRENT_STATE, jobId, - newSchedulerStatus); - throw ExceptionsHelper.conflictStatusException(msg); - } - break; - default: - throw new IllegalArgumentException("Invalid requested job scheduler status: " + newSchedulerStatus); - } - } - - this.schedulerState = newSchedulerState; - } - public Allocation build() { - return new Allocation(nodeId, jobId, ignoreDowntime, status, statusReason, schedulerState); + return new Allocation(nodeId, jobId, ignoreDowntime, status, statusReason); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java index cc15c6c0c7f..c9964c8226b 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.prelert.job.metadata; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.DiffableUtils; import org.elasticsearch.cluster.metadata.MetaData; @@ -15,13 +16,14 @@ import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.ParseFieldMatcherSupplier; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; +import org.elasticsearch.xpack.prelert.job.SchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; +import org.elasticsearch.xpack.prelert.job.messages.Messages; import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; import java.io.IOException; @@ -33,6 +35,7 @@ import java.util.Map; import java.util.Objects; import java.util.SortedMap; import java.util.TreeMap; +import java.util.stream.Collectors; public class PrelertMetadata implements MetaData.Custom { @@ -40,7 +43,8 @@ public class PrelertMetadata implements MetaData.Custom { private static final ParseField ALLOCATIONS_FIELD = new ParseField("allocations"); public static final String TYPE = "prelert"; - public static final PrelertMetadata PROTO = new PrelertMetadata(Collections.emptySortedMap(), Collections.emptySortedMap()); + public static final PrelertMetadata PROTO = new PrelertMetadata(Collections.emptySortedMap(), Collections.emptySortedMap(), + Collections.emptySortedMap()); private static final ObjectParser PRELERT_METADATA_PARSER = new ObjectParser<>("prelert_metadata", Builder::new); @@ -54,10 +58,13 @@ public class PrelertMetadata implements MetaData.Custom { // performance issue will occur if we don't change that private final SortedMap jobs; private final SortedMap allocations; + private final SortedMap schedulerStatuses; - private PrelertMetadata(SortedMap jobs, SortedMap allocations) { + private PrelertMetadata(SortedMap jobs, SortedMap allocations, + SortedMap schedulerStatuses) { this.jobs = Collections.unmodifiableSortedMap(jobs); this.allocations = Collections.unmodifiableSortedMap(allocations); + this.schedulerStatuses = Collections.unmodifiableSortedMap(schedulerStatuses); } public Map getJobs() { @@ -70,6 +77,10 @@ public class PrelertMetadata implements MetaData.Custom { return allocations; } + public SortedMap getSchedulerStatuses() { + return schedulerStatuses; + } + @Override public String type() { return TYPE; @@ -109,7 +120,12 @@ public class PrelertMetadata implements MetaData.Custom { for (int i = 0; i < size; i++) { allocations.put(in.readString(), Allocation.PROTO.readFrom(in)); } - return new PrelertMetadata(jobs, allocations); + size = in.readVInt(); + TreeMap schedulerStatuses = new TreeMap<>(); + for (int i = 0; i < size; i++) { + schedulerStatuses.put(in.readString(), SchedulerStatus.fromStream(in)); + } + return new PrelertMetadata(jobs, allocations, schedulerStatuses); } @Override @@ -124,6 +140,11 @@ public class PrelertMetadata implements MetaData.Custom { out.writeString(entry.getKey()); entry.getValue().writeTo(out); } + out.writeVInt(schedulerStatuses.size()); + for (Map.Entry entry : schedulerStatuses.entrySet()) { + out.writeString(entry.getKey()); + entry.getValue().writeTo(out); + } } @Override @@ -145,28 +166,83 @@ public class PrelertMetadata implements MetaData.Custom { final Diff> jobs; final Diff> allocations; + final Diff> schedulerStatuses; PrelertMetadataDiff(PrelertMetadata before, PrelertMetadata after) { this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer()); this.allocations = DiffableUtils.diff(before.allocations, after.allocations, DiffableUtils.getStringKeySerializer()); + this.schedulerStatuses = DiffableUtils.diff( + toSchedulerDiff(before.schedulerStatuses), + toSchedulerDiff(after.schedulerStatuses), + DiffableUtils.getStringKeySerializer()); } PrelertMetadataDiff(StreamInput in) throws IOException { jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job.PROTO); allocations = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Allocation.PROTO); + schedulerStatuses = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), SchedulerStatusDiff.PROTO); } @Override public MetaData.Custom apply(MetaData.Custom part) { TreeMap newJobs = new TreeMap<>(jobs.apply(((PrelertMetadata) part).jobs)); TreeMap newAllocations = new TreeMap<>(allocations.apply(((PrelertMetadata) part).allocations)); - return new PrelertMetadata(newJobs, newAllocations); + + Map newSchedulerStatuses = + schedulerStatuses.apply(toSchedulerDiff((((PrelertMetadata) part)).schedulerStatuses)); + return new PrelertMetadata(newJobs, newAllocations, new TreeMap<>(toSchedulerStatusMap(newSchedulerStatuses))); } @Override public void writeTo(StreamOutput out) throws IOException { jobs.writeTo(out); allocations.writeTo(out); + schedulerStatuses.writeTo(out); + } + + private static Map toSchedulerDiff(Map from) { + return from.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, (entry) -> new SchedulerStatusDiff(entry.getValue()))); + } + + private static Map toSchedulerStatusMap(Map from) { + return from.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, (entry) -> entry.getValue().status)); + } + + // SchedulerStatus is enum and that can't extend from anything + static class SchedulerStatusDiff extends AbstractDiffable implements Writeable { + + static SchedulerStatusDiff PROTO = new SchedulerStatusDiff(null); + + private final SchedulerStatus status; + + SchedulerStatusDiff(SchedulerStatus status) { + this.status = status; + } + + @Override + public SchedulerStatusDiff readFrom(StreamInput in) throws IOException { + return new SchedulerStatusDiff(SchedulerStatus.fromStream(in)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + status.writeTo(out); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SchedulerStatusDiff that = (SchedulerStatusDiff) o; + return status == that.status; + } + + @Override + public int hashCode() { + return Objects.hash(status); + } } } @@ -177,27 +253,32 @@ public class PrelertMetadata implements MetaData.Custom { if (o == null || getClass() != o.getClass()) return false; PrelertMetadata that = (PrelertMetadata) o; - return Objects.equals(jobs, that.jobs) && Objects.equals(allocations, that.allocations); + return Objects.equals(jobs, that.jobs) && + Objects.equals(allocations, that.allocations) && + Objects.equals(schedulerStatuses, that.schedulerStatuses); } @Override public int hashCode() { - return Objects.hash(jobs, allocations); + return Objects.hash(jobs, allocations, schedulerStatuses); } public static class Builder { private TreeMap jobs; private TreeMap allocations; + private TreeMap schedulerStatuses; public Builder() { this.jobs = new TreeMap<>(); this.allocations = new TreeMap<>(); + this.schedulerStatuses = new TreeMap<>(); } public Builder(PrelertMetadata previous) { jobs = new TreeMap<>(previous.jobs); allocations = new TreeMap<>(previous.allocations); + schedulerStatuses = new TreeMap<>(previous.schedulerStatuses); } public Builder putJob(Job job, boolean overwrite) { @@ -212,6 +293,9 @@ public class PrelertMetadata implements MetaData.Custom { builder.setStatus(JobStatus.CLOSED); allocations.put(job.getId(), builder.build()); } + if (job.getSchedulerConfig() != null) { + schedulerStatuses.put(job.getId(), SchedulerStatus.STOPPED); + } return this; } @@ -219,7 +303,20 @@ public class PrelertMetadata implements MetaData.Custom { if (jobs.remove(jobId) == null) { throw new ResourceNotFoundException("job [" + jobId + "] does not exist"); } - this.allocations.remove(jobId); + Allocation previousAllocation = this.allocations.remove(jobId); + if (previousAllocation != null) { + if (!previousAllocation.getStatus().isAnyOf(JobStatus.CLOSED, JobStatus.FAILED)) { + throw ExceptionsHelper.conflictStatusException(Messages.getMessage( + Messages.JOB_CANNOT_DELETE_WHILE_RUNNING, jobId, previousAllocation.getStatus())); + } + } + SchedulerStatus previousStatus = this.schedulerStatuses.remove(jobId); + if (previousStatus != null) { + if (previousStatus != SchedulerStatus.STOPPED) { + String message = Messages.getMessage(Messages.JOB_CANNOT_DELETE_WHILE_SCHEDULER_RUNS, jobId); + throw ExceptionsHelper.conflictStatusException(message); + } + } return this; } @@ -253,7 +350,7 @@ public class PrelertMetadata implements MetaData.Custom { } public PrelertMetadata build() { - return new PrelertMetadata(jobs, allocations); + return new PrelertMetadata(jobs, allocations, schedulerStatuses); } public Builder assignToNode(String jobId, String nodeId) { @@ -296,6 +393,32 @@ public class PrelertMetadata implements MetaData.Custom { allocations.put(jobId, builder.build()); return this; } + + public Builder updateSchedulerStatus(String jobId, SchedulerStatus newStatus) { + SchedulerStatus currentStatus = schedulerStatuses.get(jobId); + if (currentStatus == null) { + throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_SCHEDULER_NO_SUCH_SCHEDULED_JOB, jobId)); + } + + switch (newStatus) { + case STARTED: + if (currentStatus != SchedulerStatus.STOPPED) { + String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_START, jobId, newStatus); + throw ExceptionsHelper.conflictStatusException(msg); + } + break; + case STOPPED: + if (currentStatus != SchedulerStatus.STARTED) { + String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_STOP_IN_CURRENT_STATE, jobId, newStatus); + throw ExceptionsHelper.conflictStatusException(msg); + } + break; + default: + throw new IllegalArgumentException("[" + jobId + "] invalid requested job scheduler status [" + newStatus + "]"); + } + schedulerStatuses.put(jobId, newStatus); + return this; + } } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJob.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJob.java index d8896385d47..b53db5c2fe9 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJob.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJob.java @@ -14,7 +14,6 @@ import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.xpack.prelert.action.FlushJobAction; import org.elasticsearch.xpack.prelert.action.JobDataAction; import org.elasticsearch.xpack.prelert.job.DataCounts; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor; import org.elasticsearch.xpack.prelert.job.messages.Messages; @@ -60,11 +59,9 @@ class ScheduledJob { } } - Long runLookBack(SchedulerState schedulerState) throws Exception { - long startMs = schedulerState.getStartTimeMillis(); - lookbackStartTimeMs = (lastEndTimeMs != null && lastEndTimeMs + 1 > startMs) ? lastEndTimeMs + 1 : startMs; - - Optional endMs = Optional.ofNullable(schedulerState.getEndTimeMillis()); + Long runLookBack(long startTime, Long endTime) throws Exception { + lookbackStartTimeMs = (lastEndTimeMs != null && lastEndTimeMs + 1 > startTime) ? lastEndTimeMs + 1 : startTime; + Optional endMs = Optional.ofNullable(endTime); long lookbackEnd = endMs.orElse(currentTimeSupplier.get() - queryDelayMs); boolean isLookbackOnly = endMs.isPresent(); if (lookbackEnd <= lookbackStartTimeMs) { @@ -115,7 +112,7 @@ class ScheduledJob { return running; } - private void run(long start, long end, FlushJobAction.Request flushRequest) throws IOException { + private void run(long start, Long end, FlushJobAction.Request flushRequest) throws IOException { if (end <= start) { return; } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunner.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunner.java index a1d1b16266d..d261d534999 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunner.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunner.java @@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -20,18 +21,19 @@ import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction; import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction; import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; +import org.elasticsearch.xpack.prelert.job.SchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.config.DefaultFrequency; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory; import org.elasticsearch.xpack.prelert.job.metadata.Allocation; +import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; import org.elasticsearch.xpack.prelert.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; import org.elasticsearch.xpack.prelert.job.persistence.QueryPage; import org.elasticsearch.xpack.prelert.job.results.Bucket; +import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; import java.time.Duration; import java.util.Objects; @@ -42,32 +44,35 @@ import java.util.function.Supplier; public class ScheduledJobRunner extends AbstractComponent { private final Client client; + private final ClusterService clusterService; private final JobProvider jobProvider; private final DataExtractorFactory dataExtractorFactory; private final ThreadPool threadPool; private final Supplier currentTimeSupplier; - public ScheduledJobRunner(ThreadPool threadPool, Client client, JobProvider jobProvider, DataExtractorFactory dataExtractorFactory, - Supplier currentTimeSupplier) { + public ScheduledJobRunner(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider, + DataExtractorFactory dataExtractorFactory, Supplier currentTimeSupplier) { super(Settings.EMPTY); this.threadPool = threadPool; + this.clusterService = Objects.requireNonNull(clusterService); this.client = Objects.requireNonNull(client); this.jobProvider = Objects.requireNonNull(jobProvider); this.dataExtractorFactory = Objects.requireNonNull(dataExtractorFactory); this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); } - public void run(Job job, SchedulerState schedulerState, Allocation allocation, StartJobSchedulerAction.SchedulerTask task, - Consumer handler) { - validate(job, allocation); + public void run(String jobId, long startTime, Long endTime, StartJobSchedulerAction.SchedulerTask task, Consumer handler) { + PrelertMetadata prelertMetadata = clusterService.state().metaData().custom(PrelertMetadata.TYPE); + validate(jobId, prelertMetadata); - setJobSchedulerStatus(job.getId(), JobSchedulerStatus.STARTED, error -> { - logger.info("Starting scheduler [{}]", schedulerState); + setJobSchedulerStatus(jobId, SchedulerStatus.STARTED, error -> { + logger.info("[{}] Starting scheduler", jobId); + Job job = prelertMetadata.getJobs().get(jobId); Holder holder = createJobScheduler(job, task, handler); task.setHolder(holder); holder.future = threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME).submit(() -> { try { - Long next = holder.scheduledJob.runLookBack(schedulerState); + Long next = holder.scheduledJob.runLookBack(startTime, endTime); if (next != null) { doScheduleRealtime(next, job.getId(), holder); } else { @@ -124,19 +129,26 @@ public class ScheduledJobRunner extends AbstractComponent { } } - public static void validate(Job job, Allocation allocation) { + public static void validate(String jobId, PrelertMetadata prelertMetadata) { + Job job = prelertMetadata.getJobs().get(jobId); + if (job == null) { + throw ExceptionsHelper.missingJobException(jobId); + } + if (job.getSchedulerConfig() == null) { throw new IllegalArgumentException("job [" + job.getId() + "] is not a scheduled job"); } + Allocation allocation = prelertMetadata.getAllocations().get(jobId); if (allocation.getStatus() != JobStatus.OPENED) { throw new ElasticsearchStatusException("cannot start scheduler, expected job status [{}], but got [{}]", RestStatus.CONFLICT, JobStatus.OPENED, allocation.getStatus()); } - if (allocation.getSchedulerState().getStatus() != JobSchedulerStatus.STOPPED) { + SchedulerStatus status = prelertMetadata.getSchedulerStatuses().get(jobId); + if (status != SchedulerStatus.STOPPED) { throw new ElasticsearchStatusException("scheduler already started, expected scheduler status [{}], but got [{}]", - RestStatus.CONFLICT, JobSchedulerStatus.STOPPED, allocation.getSchedulerState().getStatus()); + RestStatus.CONFLICT, SchedulerStatus.STOPPED, status); } } @@ -191,7 +203,7 @@ public class ScheduledJobRunner extends AbstractComponent { return new TimeValue(Math.max(1, next - currentTimeSupplier.get())); } - private void setJobSchedulerStatus(String jobId, JobSchedulerStatus status, Consumer supplier) { + private void setJobSchedulerStatus(String jobId, SchedulerStatus status, Consumer supplier) { UpdateJobSchedulerStatusAction.Request request = new UpdateJobSchedulerStatusAction.Request(jobId, status); client.execute(UpdateJobSchedulerStatusAction.INSTANCE, request, new ActionListener() { @Override @@ -235,7 +247,7 @@ public class ScheduledJobRunner extends AbstractComponent { logger.info("Stopping scheduler for job [{}]", jobId); scheduledJob.stop(); FutureUtils.cancel(future); - setJobSchedulerStatus(jobId, JobSchedulerStatus.STOPPED, error -> handler.accept(null)); + setJobSchedulerStatus(jobId, SchedulerStatus.STOPPED, error -> handler.accept(null)); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerAction.java index 4c1f8b35c73..a7575950c13 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerAction.java @@ -26,11 +26,8 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.xpack.prelert.PrelertPlugin; import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; -import org.elasticsearch.xpack.prelert.job.manager.JobManager; import org.elasticsearch.xpack.prelert.job.messages.Messages; -import org.elasticsearch.xpack.prelert.job.metadata.Allocation; +import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunner; import java.io.IOException; @@ -39,14 +36,11 @@ public class RestStartJobSchedulerAction extends BaseRestHandler { private static final String DEFAULT_START = "0"; - private final JobManager jobManager; private final ClusterService clusterService; @Inject - public RestStartJobSchedulerAction(Settings settings, RestController controller, JobManager jobManager, - ClusterService clusterService) { + public RestStartJobSchedulerAction(Settings settings, RestController controller, ClusterService clusterService) { super(settings); - this.jobManager = jobManager; this.clusterService = clusterService; controller.registerHandler(RestRequest.Method.POST, PrelertPlugin.BASE_PATH + "schedulers/{" + Job.ID.getPreferredName() + "}/_start", this); @@ -59,9 +53,8 @@ public class RestStartJobSchedulerAction extends BaseRestHandler { // This validation happens also in ScheduledJobRunner, the reason we do it here too is that if it fails there // we are unable to provide the user immediate feedback. We would create the task and the validation would fail // in the background, whereas now the validation failure is part of the response being returned. - Job job = jobManager.getJobOrThrowIfUnknown(jobId); - Allocation allocation = jobManager.getJobAllocation(jobId); - ScheduledJobRunner.validate(job, allocation); + PrelertMetadata prelertMetadata = clusterService.state().metaData().custom(PrelertMetadata.TYPE); + ScheduledJobRunner.validate(jobId, prelertMetadata); StartJobSchedulerAction.Request jobSchedulerRequest; if (RestActions.hasBodyContent(restRequest)) { @@ -69,15 +62,15 @@ public class RestStartJobSchedulerAction extends BaseRestHandler { XContentParser parser = XContentFactory.xContent(bodyBytes).createParser(bodyBytes); jobSchedulerRequest = StartJobSchedulerAction.Request.parseRequest(jobId, parser, () -> parseFieldMatcher); } else { - long startTimeMillis = parseDateOrThrow(restRequest.param(SchedulerState.START_TIME_MILLIS.getPreferredName(), DEFAULT_START), - SchedulerState.START_TIME_MILLIS.getPreferredName()); + long startTimeMillis = parseDateOrThrow(restRequest.param(StartJobSchedulerAction.START_TIME.getPreferredName(), + DEFAULT_START), StartJobSchedulerAction.START_TIME.getPreferredName()); Long endTimeMillis = null; - if (restRequest.hasParam(SchedulerState.END_TIME_MILLIS.getPreferredName())) { - endTimeMillis = parseDateOrThrow(restRequest.param(SchedulerState.END_TIME_MILLIS.getPreferredName()), - SchedulerState.END_TIME_MILLIS.getPreferredName()); + if (restRequest.hasParam(StartJobSchedulerAction.END_TIME.getPreferredName())) { + endTimeMillis = parseDateOrThrow(restRequest.param(StartJobSchedulerAction.END_TIME.getPreferredName()), + StartJobSchedulerAction.END_TIME.getPreferredName()); } - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, startTimeMillis, endTimeMillis); - jobSchedulerRequest = new StartJobSchedulerAction.Request(jobId, schedulerState); + jobSchedulerRequest = new StartJobSchedulerAction.Request(jobId, startTimeMillis); + jobSchedulerRequest.setEndTime(endTimeMillis); } return sendTask(client.executeLocally(StartJobSchedulerAction.INSTANCE, jobSchedulerRequest, LoggingTaskListener.instance())); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetJobActionResponseTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetJobActionResponseTests.java index 3d4742db114..5d8ffde7d9e 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetJobActionResponseTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetJobActionResponseTests.java @@ -13,13 +13,12 @@ import org.elasticsearch.xpack.prelert.job.DataDescription; import org.elasticsearch.xpack.prelert.job.Detector; import org.elasticsearch.xpack.prelert.job.IgnoreDowntime; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; +import org.elasticsearch.xpack.prelert.job.SchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; import org.elasticsearch.xpack.prelert.job.ModelDebugConfig; import org.elasticsearch.xpack.prelert.job.ModelSizeStats; import org.elasticsearch.xpack.prelert.job.SchedulerConfig; import org.elasticsearch.xpack.prelert.job.persistence.QueryPage; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.transform.TransformConfig; import org.elasticsearch.xpack.prelert.job.transform.TransformType; import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase; @@ -86,10 +85,9 @@ public class GetJobActionResponseTests extends AbstractStreamableTestCase { DataCounts dataCounts = getDataCounts("_job_id"); @@ -86,8 +84,7 @@ public class ScheduledJobsIT extends ESIntegTestCase { PrelertMetadata prelertMetadata = client().admin().cluster().prepareState().all().get() .getState().metaData().custom(PrelertMetadata.TYPE); - assertThat(prelertMetadata.getAllocations().get("_job_id").getSchedulerState().getStatus(), - equalTo(JobSchedulerStatus.STOPPED)); + assertThat(prelertMetadata.getSchedulerStatuses().get("_job_id"), equalTo(SchedulerStatus.STOPPED)); }); } @@ -110,9 +107,7 @@ public class ScheduledJobsIT extends ESIntegTestCase { AtomicReference errorHolder = new AtomicReference<>(); Thread t = new Thread(() -> { try { - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, null); - StartJobSchedulerAction.Request startSchedulerRequest = - new StartJobSchedulerAction.Request("_job_id", schedulerState); + StartJobSchedulerAction.Request startSchedulerRequest = new StartJobSchedulerAction.Request("_job_id", 0L); client().execute(StartJobSchedulerAction.INSTANCE, startSchedulerRequest).get(); } catch (Exception | AssertionError e) { errorHolder.set(e); @@ -138,8 +133,7 @@ public class ScheduledJobsIT extends ESIntegTestCase { assertBusy(() -> { PrelertMetadata prelertMetadata = client().admin().cluster().prepareState().all().get() .getState().metaData().custom(PrelertMetadata.TYPE); - assertThat(prelertMetadata.getAllocations().get("_job_id").getSchedulerState().getStatus(), - equalTo(JobSchedulerStatus.STOPPED)); + assertThat(prelertMetadata.getSchedulerStatuses().get("_job_id"), equalTo(SchedulerStatus.STOPPED)); }); assertThat(errorHolder.get(), nullValue()); } @@ -217,7 +211,7 @@ public class ScheduledJobsIT extends ESIntegTestCase { } catch (Exception e) { fail(); } - assertThat(r.getResponse().results().get(0).getSchedulerState().getStatus(), equalTo(JobSchedulerStatus.STOPPED)); + assertThat(r.getResponse().results().get(0).getSchedulerStatus(), equalTo(SchedulerStatus.STOPPED)); }); } catch (Exception e) { // ignore diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/StartJobSchedulerActionRequestTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/StartJobSchedulerActionRequestTests.java index 780a2631ffc..b3577697579 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/StartJobSchedulerActionRequestTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/StartJobSchedulerActionRequestTests.java @@ -8,16 +8,17 @@ package org.elasticsearch.xpack.prelert.action; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction.Request; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.support.AbstractStreamableXContentTestCase; public class StartJobSchedulerActionRequestTests extends AbstractStreamableXContentTestCase { @Override protected Request createTestInstance() { - SchedulerState state = new SchedulerState(JobSchedulerStatus.STARTED, randomLong(), randomLong()); - return new Request(randomAsciiOfLengthBetween(1, 20), state); + Request request = new Request(randomAsciiOfLength(10), randomPositiveLong()); + if (randomBoolean()) { + request.setEndTime(randomPositiveLong()); + } + return request; } @Override diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusRequestTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusRequestTests.java index 26344beedc1..f27eed7b651 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusRequestTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/UpdateJobSchedulerStatusRequestTests.java @@ -6,14 +6,14 @@ package org.elasticsearch.xpack.prelert.action; import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction.Request; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; +import org.elasticsearch.xpack.prelert.job.SchedulerStatus; import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase; public class UpdateJobSchedulerStatusRequestTests extends AbstractStreamableTestCase { @Override protected Request createTestInstance() { - return new Request(randomAsciiOfLengthBetween(1, 20), randomFrom(JobSchedulerStatus.values())); + return new Request(randomAsciiOfLengthBetween(1, 20), randomFrom(SchedulerStatus.values())); } @Override diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java index 2902fc49f07..6c926aa8d99 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java @@ -125,7 +125,7 @@ public class ScheduledJobIT extends ESRestTestCase { () -> client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/" + jobId)); response = e.getResponse(); assertThat(response.getStatusLine().getStatusCode(), equalTo(409)); - assertThat(responseEntityToString(response), containsString("Cannot delete job '" + jobId + "' while the scheduler is running")); + assertThat(responseEntityToString(response), containsString("Cannot delete job '" + jobId + "' while it is OPENED")); response = client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_stop"); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobSchedulerStatusTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobSchedulerStatusTests.java index c08a8e96dbd..3b149e6de13 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobSchedulerStatusTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobSchedulerStatusTests.java @@ -10,13 +10,13 @@ import org.elasticsearch.test.ESTestCase; public class JobSchedulerStatusTests extends ESTestCase { public void testForString() { - assertEquals(JobSchedulerStatus.fromString("started"), JobSchedulerStatus.STARTED); - assertEquals(JobSchedulerStatus.fromString("stopped"), JobSchedulerStatus.STOPPED); + assertEquals(SchedulerStatus.fromString("started"), SchedulerStatus.STARTED); + assertEquals(SchedulerStatus.fromString("stopped"), SchedulerStatus.STOPPED); } public void testValidOrdinals() { - assertEquals(0, JobSchedulerStatus.STARTED.ordinal()); - assertEquals(1, JobSchedulerStatus.STOPPED.ordinal()); + assertEquals(0, SchedulerStatus.STARTED.ordinal()); + assertEquals(1, SchedulerStatus.STOPPED.ordinal()); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/SchedulerStateTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/SchedulerStateTests.java deleted file mode 100644 index a2795c90096..00000000000 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/SchedulerStateTests.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.prelert.job; - -import org.elasticsearch.common.ParseFieldMatcher; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase; - -public class SchedulerStateTests extends AbstractSerializingTestCase { - - @Override - protected SchedulerState createTestInstance() { - return new SchedulerState(randomFrom(JobSchedulerStatus.values()), randomPositiveLong(), - randomBoolean() ? null : randomPositiveLong()); - } - - @Override - protected Writeable.Reader instanceReader() { - return SchedulerState::new; - } - - @Override - protected SchedulerState parseInstance(XContentParser parser, ParseFieldMatcher matcher) { - return SchedulerState.PARSER.apply(parser, () -> matcher); - } - - public void testEquals_GivenDifferentClass() { - - assertFalse(new SchedulerState(JobSchedulerStatus.STARTED, 0L, null).equals("a string")); - } - - public void testEquals_GivenSameReference() { - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 18L, 42L); - assertTrue(schedulerState.equals(schedulerState)); - } - - public void testEquals_GivenEqualObjects() { - SchedulerState schedulerState1 = new SchedulerState(JobSchedulerStatus.STARTED, 18L, 42L); - SchedulerState schedulerState2 = new SchedulerState(schedulerState1.getStatus(), schedulerState1.getStartTimeMillis(), - schedulerState1.getEndTimeMillis()); - - assertTrue(schedulerState1.equals(schedulerState2)); - assertTrue(schedulerState2.equals(schedulerState1)); - assertEquals(schedulerState1.hashCode(), schedulerState2.hashCode()); - } - - public void testEquals_GivenDifferentStatus() { - SchedulerState schedulerState1 = new SchedulerState(JobSchedulerStatus.STARTED, 18L, 42L); - SchedulerState schedulerState2 = new SchedulerState(JobSchedulerStatus.STOPPED, schedulerState1.getStartTimeMillis(), - schedulerState1.getEndTimeMillis()); - - assertFalse(schedulerState1.equals(schedulerState2)); - assertFalse(schedulerState2.equals(schedulerState1)); - } - - public void testEquals_GivenDifferentStartTimeMillis() { - SchedulerState schedulerState1 = new SchedulerState(JobSchedulerStatus.STARTED, null, 42L); - SchedulerState schedulerState2 = new SchedulerState(JobSchedulerStatus.STOPPED, 19L, schedulerState1.getEndTimeMillis()); - - assertFalse(schedulerState1.equals(schedulerState2)); - assertFalse(schedulerState2.equals(schedulerState1)); - } - - public void testEquals_GivenDifferentEndTimeMillis() { - SchedulerState schedulerState1 = new SchedulerState(JobSchedulerStatus.STARTED, 18L, 42L); - SchedulerState schedulerState2 = new SchedulerState(JobSchedulerStatus.STOPPED, schedulerState1.getStartTimeMillis(), 43L); - - assertFalse(schedulerState1.equals(schedulerState2)); - assertFalse(schedulerState2.equals(schedulerState1)); - } -} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java index d303f559444..93533ba7ad3 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.prelert.job.manager; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -34,7 +33,6 @@ import java.util.stream.Collectors; import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -193,23 +191,6 @@ public class JobManagerTests extends ESTestCase { assertThat(result.results().get(0).getId(), equalTo("9")); } - public void testInnerPutJob() { - JobManager jobManager = createJobManager(); - ClusterState cs = createClusterState(); - - Job job1 = buildJobBuilder("_id").build(); - ClusterState result1 = jobManager.innerPutJob(job1, false, cs); - PrelertMetadata pm = result1.getMetaData().custom(PrelertMetadata.TYPE); - assertThat(pm.getJobs().get("_id"), sameInstance(job1)); - - Job job2 = buildJobBuilder("_id").build(); - expectThrows(ResourceAlreadyExistsException.class, () -> jobManager.innerPutJob(job2, false, result1)); - - ClusterState result2 = jobManager.innerPutJob(job2, true, result1); - pm = result2.getMetaData().custom(PrelertMetadata.TYPE); - assertThat(pm.getJobs().get("_id"), sameInstance(job2)); - } - private JobManager createJobManager() { Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); JobResultsPersister jobResultsPersister = mock(JobResultsPersister.class); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java index b11bc83939c..06ef8b25a55 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java @@ -8,9 +8,7 @@ package org.elasticsearch.xpack.prelert.job.metadata; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase; public class AllocationTests extends AbstractSerializingTestCase { @@ -22,8 +20,7 @@ public class AllocationTests extends AbstractSerializingTestCase { boolean ignoreDowntime = randomBoolean(); JobStatus jobStatus = randomFrom(JobStatus.values()); String statusReason = randomBoolean() ? randomAsciiOfLength(10) : null; - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, randomPositiveLong(), randomPositiveLong()); - return new Allocation(nodeId, jobId, ignoreDowntime, jobStatus, statusReason, schedulerState); + return new Allocation(nodeId, jobId, ignoreDowntime, jobStatus, statusReason); } @Override @@ -37,7 +34,7 @@ public class AllocationTests extends AbstractSerializingTestCase { } public void testUnsetIgnoreDownTime() { - Allocation allocation = new Allocation("_node_id", "_job_id", true, JobStatus.OPENING, null, null); + Allocation allocation = new Allocation("_node_id", "_job_id", true, JobStatus.OPENING, null); assertTrue(allocation.isIgnoreDowntime()); Allocation.Builder builder = new Allocation.Builder(allocation); builder.setStatus(JobStatus.OPENED); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java index a91a33e8c50..a38a3f5389c 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java @@ -17,13 +17,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.prelert.job.DataDescription; -import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerConfig; import org.junit.Before; -import java.util.Collections; import java.util.concurrent.ExecutorService; import static org.mockito.Mockito.doAnswer; @@ -181,31 +176,4 @@ public class JobAllocatorTests extends ESTestCase { verify(clusterService, times(1)).submitStateUpdateTask(any(), any()); } - public void testScheduledJobHasDefaultSchedulerState() { - PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder(); - - SchedulerConfig.Builder schedulerConfigBuilder = new SchedulerConfig.Builder(Collections.singletonList("foo"), - Collections.singletonList("bar")); - - Job.Builder jobBuilder = buildJobBuilder("_job_id"); - jobBuilder.setSchedulerConfig(schedulerConfigBuilder); - DataDescription.Builder dataDescriptionBuilder = new DataDescription.Builder(); - dataDescriptionBuilder.setFormat(DataDescription.DataFormat.ELASTICSEARCH); - jobBuilder.setDataDescription(dataDescriptionBuilder); - - pmBuilder.putJob(jobBuilder.build(), false); - ClusterState cs = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() - .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("_id", new LocalTransportAddress("_id"), Version.CURRENT)) - .masterNodeId("_id") - .localNodeId("_id")) - .build(); - - - ClusterState clusterStateWithAllocation = jobAllocator.assignJobsToNodes(cs); - PrelertMetadata metadata = clusterStateWithAllocation.metaData().custom(PrelertMetadata.TYPE); - assertEquals(JobSchedulerStatus.STOPPED, metadata.getAllocations().get("_job_id").getSchedulerState().getStatus()); - } - } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java index 605ff54152b..2d6b28cef2c 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java @@ -27,6 +27,7 @@ import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; public class PrelertMetadataTests extends ESTestCase { @@ -90,20 +91,29 @@ public class PrelertMetadataTests extends ESTestCase { } public void testPutJob() { + Job job1 = buildJobBuilder("1").build(); + Job job2 = buildJobBuilder("2").build(); + PrelertMetadata.Builder builder = new PrelertMetadata.Builder(); - builder.putJob(buildJobBuilder("1").build(), false); - builder.putJob(buildJobBuilder("2").build(), false); - - ResourceAlreadyExistsException e = expectThrows(ResourceAlreadyExistsException.class, - () -> builder.putJob(buildJobBuilder("2").build(), false)); - assertEquals("The job cannot be created with the Id '2'. The Id is already used.", e.getMessage()); - - builder.putJob(buildJobBuilder("2").build(), true); + builder.putJob(job1, false); + builder.putJob(job2, false); PrelertMetadata result = builder.build(); + assertThat(result.getJobs().get("1"), sameInstance(job1)); + assertThat(result.getJobs().get("2"), sameInstance(job2)); + + builder = new PrelertMetadata.Builder(result); + + PrelertMetadata.Builder builderReference = builder; + ResourceAlreadyExistsException e = expectThrows(ResourceAlreadyExistsException.class, () -> builderReference.putJob(job2, false)); + assertEquals("The job cannot be created with the Id '2'. The Id is already used.", e.getMessage()); + Job job2Attempt2 = buildJobBuilder("2").build(); + builder.putJob(job2Attempt2, true); + + result = builder.build(); assertThat(result.getJobs().size(), equalTo(2)); - assertThat(result.getJobs().get("1"), notNullValue()); - assertThat(result.getJobs().get("2"), notNullValue()); + assertThat(result.getJobs().get("1"), sameInstance(job1)); + assertThat(result.getJobs().get("2"), sameInstance(job2Attempt2)); } public void testUpdateAllocation_setFinishedTime() { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunnerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunnerTests.java index 804162c9bd7..869767643cb 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunnerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunnerTests.java @@ -9,6 +9,10 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -22,16 +26,14 @@ import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.DataDescription; import org.elasticsearch.xpack.prelert.job.Detector; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; +import org.elasticsearch.xpack.prelert.job.SchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; import org.elasticsearch.xpack.prelert.job.SchedulerConfig; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.data.DataProcessor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory; -import org.elasticsearch.xpack.prelert.job.manager.JobManager; -import org.elasticsearch.xpack.prelert.job.metadata.Allocation; +import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; import org.elasticsearch.xpack.prelert.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; import org.elasticsearch.xpack.prelert.job.persistence.QueryPage; @@ -67,6 +69,7 @@ public class ScheduledJobRunnerTests extends ESTestCase { private Client client; private ActionFuture jobDataFuture; private ActionFuture flushJobFuture; + private ClusterService clusterService; private ThreadPool threadPool; private DataExtractorFactory dataExtractorFactory; private ScheduledJobRunner scheduledJobRunner; @@ -78,6 +81,7 @@ public class ScheduledJobRunnerTests extends ESTestCase { client = mock(Client.class); jobDataFuture = mock(ActionFuture.class); flushJobFuture = mock(ActionFuture.class); + clusterService = mock(ClusterService.class); doAnswer(invocation -> { @SuppressWarnings("unchecked") ActionListener actionListener = (ActionListener) invocation.getArguments()[2]; @@ -100,7 +104,7 @@ public class ScheduledJobRunnerTests extends ESTestCase { when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); scheduledJobRunner = - new ScheduledJobRunner(threadPool, client, jobProvider, dataExtractorFactory, () -> currentTime); + new ScheduledJobRunner(threadPool, client, clusterService,jobProvider, dataExtractorFactory, () -> currentTime); when(jobProvider.audit(anyString())).thenReturn(auditor); when(jobProvider.buckets(anyString(), any(BucketsQueryBuilder.BucketsQuery.class))).thenThrow( @@ -109,11 +113,14 @@ public class ScheduledJobRunnerTests extends ESTestCase { public void testStart_GivenNewlyCreatedJobLoopBack() throws Exception { Job.Builder builder = createScheduledJob(); - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, 0L, 60000L); DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); Job job = builder.build(); - Allocation allocation = - new Allocation(null, "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null)); + PrelertMetadata prelertMetadata = new PrelertMetadata.Builder().putJob(job, false) + .updateStatus("foo", JobStatus.OPENED, null) + .build(); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, prelertMetadata)) + .build()); DataExtractor dataExtractor = mock(DataExtractor.class); when(dataExtractorFactory.newExtractor(job)).thenReturn(dataExtractor); @@ -123,24 +130,27 @@ public class ScheduledJobRunnerTests extends ESTestCase { when(jobDataFuture.get()).thenReturn(new JobDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); StartJobSchedulerAction.SchedulerTask task = mock(StartJobSchedulerAction.SchedulerTask.class); - scheduledJobRunner.run(job, schedulerState, allocation, task, handler); + scheduledJobRunner.run("foo", 0L, 60000L, task, handler); verify(dataExtractor).newSearch(eq(0L), eq(60000L), any()); verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); verify(client).execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("foo"))); verify(client).execute(same(FlushJobAction.INSTANCE), any()); - verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STARTED)), any()); - verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any()); + verify(client).execute(same(INSTANCE), eq(new Request("foo", SchedulerStatus.STARTED)), any()); + verify(client).execute(same(INSTANCE), eq(new Request("foo", SchedulerStatus.STOPPED)), any()); } public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception { Job.Builder builder = createScheduledJob(); - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, 0L, null); DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); Job job = builder.build(); - Allocation allocation = - new Allocation(null, "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null)); + PrelertMetadata prelertMetadata = new PrelertMetadata.Builder().putJob(job, false) + .updateStatus("foo", JobStatus.OPENED, null) + .build(); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, prelertMetadata)) + .build()); DataExtractor dataExtractor = mock(DataExtractor.class); when(dataExtractorFactory.newExtractor(job)).thenReturn(dataExtractor); @@ -151,13 +161,13 @@ public class ScheduledJobRunnerTests extends ESTestCase { Consumer handler = mockConsumer(); boolean cancelled = randomBoolean(); StartJobSchedulerAction.SchedulerTask task = new StartJobSchedulerAction.SchedulerTask(1, "type", "action", null, "foo"); - scheduledJobRunner.run(job, schedulerState, allocation, task, handler); + scheduledJobRunner.run("foo", 0L, null, task, handler); verify(dataExtractor).newSearch(eq(0L), eq(60000L), any()); verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME); if (cancelled) { task.stop(); - verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any()); + verify(client).execute(same(INSTANCE), eq(new Request("foo", SchedulerStatus.STOPPED)), any()); } else { verify(client).execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("foo"))); verify(client).execute(same(FlushJobAction.INSTANCE), any()); @@ -183,19 +193,21 @@ public class ScheduledJobRunnerTests extends ESTestCase { public void testValidate() { Job job1 = buildJobBuilder("foo").build(); - Exception e = expectThrows(IllegalArgumentException.class, () -> ScheduledJobRunner.validate(job1, null)); + PrelertMetadata prelertMetadata1 = new PrelertMetadata.Builder().putJob(job1, false).build(); + Exception e = expectThrows(IllegalArgumentException.class, () -> ScheduledJobRunner.validate("foo", prelertMetadata1)); assertThat(e.getMessage(), equalTo("job [foo] is not a scheduled job")); Job job2 = createScheduledJob().build(); - Allocation allocation1 = - new Allocation("_id", "_id", false, JobStatus.CLOSED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null)); - e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate(job2, allocation1)); + PrelertMetadata prelertMetadata2 = new PrelertMetadata.Builder().putJob(job2, false).build(); + e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate("foo", prelertMetadata2)); assertThat(e.getMessage(), equalTo("cannot start scheduler, expected job status [OPENED], but got [CLOSED]")); Job job3 = createScheduledJob().build(); - Allocation allocation2 = - new Allocation("_id", "_id", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STARTED, null, null)); - e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate(job3, allocation2)); + PrelertMetadata prelertMetadata3 = new PrelertMetadata.Builder().putJob(job3, false) + .updateStatus("foo", JobStatus.OPENED, null) + .updateSchedulerStatus("foo", SchedulerStatus.STARTED) + .build(); + e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate("foo", prelertMetadata3)); assertThat(e.getMessage(), equalTo("scheduler already started, expected scheduler status [STOPPED], but got [STARTED]")); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobTests.java index 2edbb318cf4..374fc278d54 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobTests.java @@ -11,8 +11,6 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.prelert.action.FlushJobAction; import org.elasticsearch.xpack.prelert.action.JobDataAction; import org.elasticsearch.xpack.prelert.job.DataCounts; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor; import org.junit.Before; @@ -64,8 +62,7 @@ public class ScheduledJobTests extends ESTestCase { public void testLookBackRunWithEndTime() throws Exception { ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1); - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, 1000L); - assertNull(scheduledJob.runLookBack(schedulerState)); + assertNull(scheduledJob.runLookBack(0L, 1000L)); verify(dataExtractor).newSearch(eq(0L), eq(1000L), any()); FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); @@ -78,8 +75,7 @@ public class ScheduledJobTests extends ESTestCase { long frequencyMs = 1000; long queryDelayMs = 500; ScheduledJob scheduledJob = createScheduledJob(frequencyMs, queryDelayMs, -1, -1); - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, null); - long next = scheduledJob.runLookBack(schedulerState); + long next = scheduledJob.runLookBack(0L, null); assertEquals(2000 + frequencyMs + 100, next); verify(dataExtractor).newSearch(eq(0L), eq(1500L), any()); @@ -101,8 +97,7 @@ public class ScheduledJobTests extends ESTestCase { long frequencyMs = 1000; long queryDelayMs = 500; ScheduledJob scheduledJob = createScheduledJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs); - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, null); - long next = scheduledJob.runLookBack(schedulerState); + long next = scheduledJob.runLookBack(0L, null); assertEquals(10000 + frequencyMs + 100, next); verify(dataExtractor).newSearch(eq(5000 + 1L), eq(currentTime - queryDelayMs), any()); @@ -131,8 +126,7 @@ public class ScheduledJobTests extends ESTestCase { when(dataExtractor.hasNext()).thenReturn(false); ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1); - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, 1000L); - expectThrows(ScheduledJob.EmptyDataCountException.class, () -> scheduledJob.runLookBack(schedulerState)); + expectThrows(ScheduledJob.EmptyDataCountException.class, () -> scheduledJob.runLookBack(0L, 1000L)); } public void testExtractionProblem() throws Exception { @@ -141,8 +135,7 @@ public class ScheduledJobTests extends ESTestCase { when(dataExtractor.next()).thenThrow(new IOException()); ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1); - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, 1000L); - expectThrows(ScheduledJob.ExtractionProblemException.class, () -> scheduledJob.runLookBack(schedulerState)); + expectThrows(ScheduledJob.ExtractionProblemException.class, () -> scheduledJob.runLookBack(0L, 1000L)); currentTime = 3001; expectThrows(ScheduledJob.ExtractionProblemException.class, scheduledJob::runRealtime); @@ -162,8 +155,7 @@ public class ScheduledJobTests extends ESTestCase { when(client.execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("_job_id")))).thenThrow(new RuntimeException()); ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1); - SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, 1000L); - expectThrows(ScheduledJob.AnalysisProblemException.class, () -> scheduledJob.runLookBack(schedulerState)); + expectThrows(ScheduledJob.AnalysisProblemException.class, () -> scheduledJob.runLookBack(0L, 1000L)); currentTime = 3001; expectThrows(ScheduledJob.EmptyDataCountException.class, scheduledJob::runRealtime); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerActionTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerActionTests.java index 2ac57c76ce2..87faac6829a 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerActionTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerActionTests.java @@ -7,6 +7,9 @@ package org.elasticsearch.xpack.prelert.rest.schedulers; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.RestController; @@ -14,38 +17,43 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobStatus; -import org.elasticsearch.xpack.prelert.job.SchedulerState; -import org.elasticsearch.xpack.prelert.job.manager.JobManager; -import org.elasticsearch.xpack.prelert.job.metadata.Allocation; +import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunnerTests; -import java.util.Collections; +import java.util.HashMap; +import java.util.Map; -import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class RestStartJobSchedulerActionTests extends ESTestCase { public void testPrepareRequest() throws Exception { - JobManager jobManager = mock(JobManager.class); + ClusterService clusterService = mock(ClusterService.class); Job.Builder job = ScheduledJobRunnerTests.createScheduledJob(); - when(jobManager.getJobOrThrowIfUnknown(anyString())).thenReturn(job.build()); - Allocation allocation = - new Allocation(null, "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null)); - when(jobManager.getJobAllocation(anyString())).thenReturn(allocation); - RestStartJobSchedulerAction action = new RestStartJobSchedulerAction(Settings.EMPTY, mock(RestController.class), - jobManager, mock(ClusterService.class)); + PrelertMetadata prelertMetadata = new PrelertMetadata.Builder() + .putJob(job.build(), false) + .updateStatus("foo", JobStatus.OPENED, null) + .build(); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, prelertMetadata)) + .build()); + RestStartJobSchedulerAction action = new RestStartJobSchedulerAction(Settings.EMPTY, mock(RestController.class), clusterService); - RestRequest restRequest1 = new FakeRestRequest.Builder().withParams(Collections.singletonMap("start", "not-a-date")).build(); + Map params = new HashMap<>(); + params.put("start", "not-a-date"); + params.put("job_id", "foo"); + RestRequest restRequest1 = new FakeRestRequest.Builder().withParams(params).build(); ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> action.prepareRequest(restRequest1, mock(NodeClient.class))); assertEquals("Query param 'start' with value 'not-a-date' cannot be parsed as a date or converted to a number (epoch).", e.getMessage()); - RestRequest restRequest2 = new FakeRestRequest.Builder().withParams(Collections.singletonMap("end", "not-a-date")).build(); + params = new HashMap<>(); + params.put("end", "not-a-date"); + params.put("job_id", "foo"); + RestRequest restRequest2 = new FakeRestRequest.Builder().withParams(params).build(); e = expectThrows(ElasticsearchParseException.class, () -> action.prepareRequest(restRequest2, mock(NodeClient.class))); assertEquals("Query param 'end' with value 'not-a-date' cannot be parsed as a date or converted to a number (epoch).", e.getMessage()); diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml index c235ce1b2dc..14810261c2b 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_stats.yaml @@ -146,7 +146,7 @@ setup: - is_false: jobs.0.config - is_false: jobs.0.data_counts - is_false: jobs.0.model_size_stats - - match: { jobs.0.scheduler_state.status: STOPPED } + - match: { jobs.0.scheduler_state: STOPPED } --- "Test bad metric":