Moved scheduler status to prelertmetadata to make it more independent of job.

Removed SchedulerStats as scheduler status is all we need and start and end times are only needed in start scheduler api.

Original commit: elastic/x-pack-elasticsearch@80c563cb69
This commit is contained in:
Martijn van Groningen 2016-12-09 14:43:41 +01:00
parent e067008a21
commit d5412627d2
28 changed files with 394 additions and 599 deletions

View File

@ -168,7 +168,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier);
DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, processFactory, clusterService.getClusterSettings());
ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, jobProvider,
ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, clusterService, jobProvider,
// norelease: we will no longer need to pass the client here after we switch to a client based data extractor
new HttpDataExtractorFactory(client),
System::currentTimeMillis);

View File

@ -39,9 +39,9 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.SchedulerStatus;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.manager.AutodetectProcessManager;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
@ -280,19 +280,19 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
@Nullable
private ModelSizeStats modelSizeStats;
@Nullable
private SchedulerState schedulerState;
private SchedulerStatus schedulerStatus;
@Nullable
private JobStatus status;
JobInfo(String jobId, @Nullable Job job, @Nullable DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats,
@Nullable SchedulerState schedulerStatus, @Nullable JobStatus status) {
@Nullable SchedulerStatus schedulerStatus, @Nullable JobStatus status) {
this.jobId = jobId;
this.jobConfig = job;
this.dataCounts = dataCounts;
this.modelSizeStats = modelSizeStats;
this.schedulerState = schedulerStatus;
this.schedulerStatus = schedulerStatus;
this.status = status;
}
@ -301,7 +301,7 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
jobConfig = in.readOptionalWriteable(Job::new);
dataCounts = in.readOptionalWriteable(DataCounts::new);
modelSizeStats = in.readOptionalWriteable(ModelSizeStats::new);
schedulerState = in.readOptionalWriteable(SchedulerState::new);
schedulerStatus = in.readOptionalWriteable(SchedulerStatus::fromStream);
status = in.readOptionalWriteable(JobStatus::fromStream);
}
@ -321,8 +321,8 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
return modelSizeStats;
}
public SchedulerState getSchedulerState() {
return schedulerState;
public SchedulerStatus getSchedulerStatus() {
return schedulerStatus;
}
public JobStatus getStatus() {
@ -342,8 +342,8 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
if (modelSizeStats != null) {
builder.field(MODEL_SIZE_STATS, modelSizeStats);
}
if (schedulerState != null) {
builder.field(SCHEDULER_STATE, schedulerState);
if (schedulerStatus != null) {
builder.field(SCHEDULER_STATE, schedulerStatus);
}
if (status != null) {
builder.field(STATUS, status);
@ -359,13 +359,13 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
out.writeOptionalWriteable(jobConfig);
out.writeOptionalWriteable(dataCounts);
out.writeOptionalWriteable(modelSizeStats);
out.writeOptionalWriteable(schedulerState);
out.writeOptionalWriteable(schedulerStatus);
out.writeOptionalWriteable(status);
}
@Override
public int hashCode() {
return Objects.hash(jobId, jobConfig, dataCounts, modelSizeStats, schedulerState, status);
return Objects.hash(jobId, jobConfig, dataCounts, modelSizeStats, schedulerStatus, status);
}
@Override
@ -381,7 +381,7 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
&& Objects.equals(jobConfig, other.jobConfig)
&& Objects.equals(this.dataCounts, other.dataCounts)
&& Objects.equals(this.modelSizeStats, other.modelSizeStats)
&& Objects.equals(this.schedulerState, other.schedulerState)
&& Objects.equals(this.schedulerStatus, other.schedulerStatus)
&& Objects.equals(this.status, other.status);
}
}
@ -505,7 +505,7 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
Job jobConfig = request.config() ? jobs.results().get(0) : null;
DataCounts dataCounts = readDataCounts(request.dataCounts(), request.getJobId());
ModelSizeStats modelSizeStats = readModelSizeStats(request.modelSizeStats(), request.getJobId());
SchedulerState schedulerStatus = readSchedulerState(request.schedulerStatus(), request.getJobId());
SchedulerStatus schedulerStatus = readSchedulerState(request.schedulerStatus(), request.getJobId());
JobStatus jobStatus = readJobStatus(request.status(), request.getJobId());
Response.JobInfo jobInfo = new Response.JobInfo(
@ -522,7 +522,7 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
Job jobConfig = request.config() ? job : null;
DataCounts dataCounts = readDataCounts(request.dataCounts(), job.getId());
ModelSizeStats modelSizeStats = readModelSizeStats(request.modelSizeStats(), job.getId());
SchedulerState schedulerStatus = readSchedulerState(request.schedulerStatus(), job.getId());
SchedulerStatus schedulerStatus = readSchedulerState(request.schedulerStatus(), job.getId());
JobStatus jobStatus = readJobStatus(request.status(), job.getId());
Response.JobInfo jobInfo = new Response.JobInfo(job.getId(), jobConfig, dataCounts, modelSizeStats,
schedulerStatus, jobStatus);
@ -557,8 +557,8 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
return null;
}
private SchedulerState readSchedulerState(boolean schedulerState, String jobId) {
return schedulerState ? jobManager.getSchedulerState(jobId).orElse(null) : null;
private SchedulerStatus readSchedulerState(boolean schedulerState, String jobId) {
return schedulerState ? jobManager.getSchedulerStatus(jobId).orElse(null) : null;
}
private JobStatus readJobStatus(boolean status, String jobId) {

View File

@ -15,6 +15,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -30,10 +31,6 @@ import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunner;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
@ -43,6 +40,9 @@ import java.util.Objects;
public class StartJobSchedulerAction
extends Action<StartJobSchedulerAction.Request, StartJobSchedulerAction.Response, StartJobSchedulerAction.RequestBuilder> {
public static final ParseField START_TIME = new ParseField("start");
public static final ParseField END_TIME = new ParseField("end");
public static final StartJobSchedulerAction INSTANCE = new StartJobSchedulerAction();
public static final String NAME = "cluster:admin/prelert/job/scheduler/run";
@ -66,8 +66,8 @@ extends Action<StartJobSchedulerAction.Request, StartJobSchedulerAction.Response
static {
PARSER.declareString((request, jobId) -> request.jobId = jobId, Job.ID);
PARSER.declareObject((request, schedulerState) -> request.schedulerState = schedulerState, SchedulerState.PARSER,
SchedulerState.TYPE_FIELD);
PARSER.declareLong((request, startTime) -> request.startTime = startTime, START_TIME);
PARSER.declareLong(Request::setEndTime, END_TIME);
}
public static Request parseRequest(String jobId, XContentParser parser, ParseFieldMatcherSupplier parseFieldMatcherSupplier) {
@ -79,17 +79,12 @@ extends Action<StartJobSchedulerAction.Request, StartJobSchedulerAction.Response
}
private String jobId;
// TODO (norelease): instead of providing a scheduler state, the user should just provide: startTimeMillis and endTimeMillis
// the state is useless here as it should always be STARTING
private SchedulerState schedulerState;
private long startTime;
private Long endTime;
public Request(String jobId, SchedulerState schedulerState) {
public Request(String jobId, long startTime) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
this.schedulerState = ExceptionsHelper.requireNonNull(schedulerState, SchedulerState.TYPE_FIELD.getPreferredName());
if (schedulerState.getStatus() != JobSchedulerStatus.STARTED) {
throw new IllegalStateException(
"Start job scheduler action requires the scheduler status to be [" + JobSchedulerStatus.STARTED + "]");
}
this.startTime = startTime;
}
Request() {
@ -99,8 +94,16 @@ extends Action<StartJobSchedulerAction.Request, StartJobSchedulerAction.Response
return jobId;
}
public SchedulerState getSchedulerState() {
return schedulerState;
public long getStartTime() {
return startTime;
}
public Long getEndTime() {
return endTime;
}
public void setEndTime(Long endTime) {
this.endTime = endTime;
}
@Override
@ -117,28 +120,33 @@ extends Action<StartJobSchedulerAction.Request, StartJobSchedulerAction.Response
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
schedulerState = new SchedulerState(in);
startTime = in.readVLong();
endTime = in.readOptionalLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
schedulerState.writeTo(out);
out.writeVLong(startTime);
out.writeOptionalLong(endTime);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(SchedulerState.TYPE_FIELD.getPreferredName(), schedulerState);
builder.field(START_TIME.getPreferredName(), startTime);
if (endTime != null) {
builder.field(END_TIME.getPreferredName(), endTime);
}
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(jobId, schedulerState);
return Objects.hash(jobId, startTime, endTime);
}
@Override
@ -150,7 +158,9 @@ extends Action<StartJobSchedulerAction.Request, StartJobSchedulerAction.Response
return false;
}
Request other = (Request) obj;
return Objects.equals(jobId, other.jobId) && Objects.equals(schedulerState, other.schedulerState);
return Objects.equals(jobId, other.jobId) &&
Objects.equals(startTime, other.startTime) &&
Objects.equals(endTime, other.endTime);
}
}
@ -195,25 +205,21 @@ extends Action<StartJobSchedulerAction.Request, StartJobSchedulerAction.Response
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final JobManager jobManager;
private final ScheduledJobRunner scheduledJobRunner;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobManager jobManager, ScheduledJobRunner scheduledJobRunner) {
ScheduledJobRunner scheduledJobRunner) {
super(settings, StartJobSchedulerAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
Request::new);
this.jobManager = jobManager;
this.scheduledJobRunner = scheduledJobRunner;
}
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
SchedulerTask schedulerTask = (SchedulerTask) task;
Job job = jobManager.getJobOrThrowIfUnknown(request.jobId);
Allocation allocation = jobManager.getJobAllocation(job.getId());
scheduledJobRunner.run(job, request.getSchedulerState(), allocation, schedulerTask, (error) -> {
scheduledJobRunner.run(request.getJobId(), request.getStartTime(), request.getEndTime(), schedulerTask, (error) -> {
if (error != null) {
listener.onFailure(error);
} else {

View File

@ -26,8 +26,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.SchedulerStatus;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
@ -57,11 +56,11 @@ public class UpdateJobSchedulerStatusAction extends Action<UpdateJobSchedulerSta
public static class Request extends AcknowledgedRequest<Request> {
private String jobId;
private JobSchedulerStatus schedulerStatus;
private SchedulerStatus schedulerStatus;
public Request(String jobId, JobSchedulerStatus schedulerStatus) {
public Request(String jobId, SchedulerStatus schedulerStatus) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
this.schedulerStatus = ExceptionsHelper.requireNonNull(schedulerStatus, SchedulerState.STATUS.getPreferredName());
this.schedulerStatus = ExceptionsHelper.requireNonNull(schedulerStatus, "status");
}
Request() {}
@ -74,11 +73,11 @@ public class UpdateJobSchedulerStatusAction extends Action<UpdateJobSchedulerSta
this.jobId = jobId;
}
public JobSchedulerStatus getSchedulerStatus() {
public SchedulerStatus getSchedulerStatus() {
return schedulerStatus;
}
public void setSchedulerStatus(JobSchedulerStatus schedulerStatus) {
public void setSchedulerStatus(SchedulerStatus schedulerStatus) {
this.schedulerStatus = schedulerStatus;
}
@ -91,7 +90,7 @@ public class UpdateJobSchedulerStatusAction extends Action<UpdateJobSchedulerSta
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
schedulerStatus = JobSchedulerStatus.fromStream(in);
schedulerStatus = SchedulerStatus.fromStream(in);
}
@Override
@ -122,7 +121,7 @@ public class UpdateJobSchedulerStatusAction extends Action<UpdateJobSchedulerSta
public String toString() {
return "Request{" +
Job.ID.getPreferredName() + "='" + jobId + "', " +
SchedulerState.TYPE_FIELD.getPreferredName() + '=' + schedulerStatus +
"status=" + schedulerStatus +
'}';
}
}

View File

@ -39,7 +39,7 @@ public enum IgnoreDowntime implements Writeable {
public static IgnoreDowntime fromStream(StreamInput in) throws IOException {
int ordinal = in.readVInt();
if (ordinal < 0 || ordinal >= values().length) {
throw new IOException("Unknown public enum JobSchedulerStatus {\n ordinal [" + ordinal + "]");
throw new IOException("Unknown public enum SchedulerStatus {\n ordinal [" + ordinal + "]");
}
return values()[ordinal];
}

View File

@ -1,118 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job;
import org.elasticsearch.action.support.ToXContentToBytes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
public class SchedulerState extends ToXContentToBytes implements Writeable {
public static final ParseField TYPE_FIELD = new ParseField("scheduler_state");
public static final ParseField STATUS = new ParseField("status");
public static final ParseField START_TIME_MILLIS = new ParseField("start");
public static final ParseField END_TIME_MILLIS = new ParseField("end");
public static final ConstructingObjectParser<SchedulerState, ParseFieldMatcherSupplier> PARSER = new ConstructingObjectParser<>(
TYPE_FIELD.getPreferredName(), a -> new SchedulerState((JobSchedulerStatus) a[0], (Long) a[1], (Long) a[2]));
static {
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> JobSchedulerStatus.fromString(p.text()), STATUS,
ValueType.STRING);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), START_TIME_MILLIS);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), END_TIME_MILLIS);
}
private JobSchedulerStatus status;
@Nullable
private Long startTimeMillis;
@Nullable
private Long endTimeMillis;
public SchedulerState(JobSchedulerStatus status, Long startTimeMillis, Long endTimeMillis) {
this.status = status;
this.startTimeMillis = startTimeMillis;
this.endTimeMillis = endTimeMillis;
}
public SchedulerState(StreamInput in) throws IOException {
status = JobSchedulerStatus.fromStream(in);
startTimeMillis = in.readOptionalLong();
endTimeMillis = in.readOptionalLong();
}
public JobSchedulerStatus getStatus() {
return status;
}
public Long getStartTimeMillis() {
return startTimeMillis;
}
/**
* The end time as epoch milliseconds. An {@code null} end time indicates
* real-time mode.
*
* @return The optional end time as epoch milliseconds.
*/
@Nullable
public Long getEndTimeMillis() {
return endTimeMillis;
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof SchedulerState == false) {
return false;
}
SchedulerState that = (SchedulerState) other;
return Objects.equals(this.status, that.status) && Objects.equals(this.startTimeMillis, that.startTimeMillis)
&& Objects.equals(this.endTimeMillis, that.endTimeMillis);
}
@Override
public int hashCode() {
return Objects.hash(status, startTimeMillis, endTimeMillis);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
status.writeTo(out);
out.writeOptionalLong(startTimeMillis);
out.writeOptionalLong(endTimeMillis);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(STATUS.getPreferredName(), status.name().toUpperCase(Locale.ROOT));
if (startTimeMillis != null) {
builder.field(START_TIME_MILLIS.getPreferredName(), startTimeMillis);
}
if (endTimeMillis != null) {
builder.field(END_TIME_MILLIS.getPreferredName(), endTimeMillis);
}
builder.endObject();
return builder;
}
}

View File

@ -12,18 +12,18 @@ import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
import java.util.Locale;
public enum JobSchedulerStatus implements Writeable {
public enum SchedulerStatus implements Writeable {
STARTED, STOPPED;
public static JobSchedulerStatus fromString(String name) {
public static SchedulerStatus fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));
}
public static JobSchedulerStatus fromStream(StreamInput in) throws IOException {
public static SchedulerStatus fromStream(StreamInput in) throws IOException {
int ordinal = in.readVInt();
if (ordinal < 0 || ordinal >= values().length) {
throw new IOException("Unknown public enum JobSchedulerStatus {\n ordinal [" + ordinal + "]");
throw new IOException("Unknown public enum SchedulerStatus {\n ordinal [" + ordinal + "]");
}
return values()[ordinal];
}

View File

@ -23,10 +23,9 @@ import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction;
import org.elasticsearch.xpack.prelert.action.UpdateJobStatusAction;
import org.elasticsearch.xpack.prelert.job.IgnoreDowntime;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerStatus;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
@ -152,7 +151,7 @@ public class JobManager extends AbstractComponent {
* @throws org.elasticsearch.ResourceNotFoundException
* if there is no job with matching the given {@code jobId}
*/
public Job getJobOrThrowIfUnknown(ClusterState clusterState, String jobId) {
Job getJobOrThrowIfUnknown(ClusterState clusterState, String jobId) {
PrelertMetadata prelertMetadata = clusterState.metaData().custom(PrelertMetadata.TYPE);
Job job = prelertMetadata.getJobs().get(jobId);
if (job == null) {
@ -166,26 +165,25 @@ public class JobManager extends AbstractComponent {
*/
public void putJob(PutJobAction.Request request, ActionListener<PutJobAction.Response> actionListener) {
Job job = request.getJob();
ActionListener<Boolean> delegateListener = ActionListener.wrap(jobSaved -> {
jobProvider.createJobRelatedIndices(job, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean indicesCreated) {
audit(job.getId()).info(Messages.getMessage(Messages.JOB_AUDIT_CREATED));
ActionListener<Boolean> delegateListener = ActionListener.wrap(jobSaved ->
jobProvider.createJobRelatedIndices(job, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean indicesCreated) {
audit(job.getId()).info(Messages.getMessage(Messages.JOB_AUDIT_CREATED));
// Also I wonder if we need to audit log infra
// structure in prelert as when we merge into xpack
// we can use its audit trailing. See:
// https://github.com/elastic/prelert-legacy/issues/48
actionListener.onResponse(new PutJobAction.Response(jobSaved && indicesCreated, job));
}
// Also I wonder if we need to audit log infra
// structure in prelert as when we merge into xpack
// we can use its audit trailing. See:
// https://github.com/elastic/prelert-legacy/issues/48
actionListener.onResponse(new PutJobAction.Response(jobSaved && indicesCreated, job));
}
@Override
public void onFailure(Exception e) {
actionListener.onFailure(e);
@Override
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
});
}, actionListener::onFailure);
}
}), actionListener::onFailure);
clusterService.submitStateUpdateTask("put-job-" + job.getId(),
new AckedClusterStateUpdateTask<Boolean>(request, delegateListener) {
@ -203,12 +201,9 @@ public class JobManager extends AbstractComponent {
}
ClusterState innerPutJob(Job job, boolean overwrite, ClusterState currentState) {
PrelertMetadata currentPrelertMetadata = currentState.metaData().custom(PrelertMetadata.TYPE);
PrelertMetadata.Builder builder = new PrelertMetadata.Builder(currentPrelertMetadata);
PrelertMetadata.Builder builder = createPrelertMetadatBuilder(currentState);
builder.putJob(job, overwrite);
ClusterState.Builder newState = ClusterState.builder(currentState);
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(PrelertMetadata.TYPE, builder.build()).build());
return newState.build();
return buildNewClusterState(currentState, builder);
}
/**
@ -254,52 +249,29 @@ public class JobManager extends AbstractComponent {
}
ClusterState removeJobFromClusterState(String jobId, ClusterState currentState) {
PrelertMetadata currentPrelertMetadata = currentState.metaData().custom(PrelertMetadata.TYPE);
PrelertMetadata.Builder builder = new PrelertMetadata.Builder(currentPrelertMetadata);
PrelertMetadata.Builder builder = createPrelertMetadatBuilder(currentState);
builder.removeJob(jobId);
Allocation allocation = currentPrelertMetadata.getAllocations().get(jobId);
if (allocation != null) {
SchedulerState schedulerState = allocation.getSchedulerState();
if (schedulerState != null && schedulerState.getStatus() != JobSchedulerStatus.STOPPED) {
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.JOB_CANNOT_DELETE_WHILE_SCHEDULER_RUNS, jobId));
}
if (!allocation.getStatus().isAnyOf(JobStatus.CLOSED, JobStatus.FAILED)) {
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(
Messages.JOB_CANNOT_DELETE_WHILE_RUNNING, jobId, allocation.getStatus()));
}
}
ClusterState.Builder newState = ClusterState.builder(currentState);
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(PrelertMetadata.TYPE, builder.build()).build());
return newState.build();
return buildNewClusterState(currentState, builder);
}
private void checkJobIsScheduled(Job job) {
if (job.getSchedulerConfig() == null) {
throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_SCHEDULER_NO_SUCH_SCHEDULED_JOB, job.getId()));
}
}
public Optional<SchedulerState> getSchedulerState(String jobId) {
Job job = getJobOrThrowIfUnknown(clusterService.state(), jobId);
if (job.getSchedulerConfig() == null) {
return Optional.empty();
}
Allocation allocation = getAllocation(clusterService.state(), jobId);
return Optional.ofNullable(allocation.getSchedulerState());
public Optional<SchedulerStatus> getSchedulerStatus(String jobId) {
PrelertMetadata prelertMetadata = clusterService.state().metaData().custom(PrelertMetadata.TYPE);
return Optional.ofNullable(prelertMetadata.getSchedulerStatuses().get(jobId));
}
public void updateSchedulerStatus(UpdateJobSchedulerStatusAction.Request request,
ActionListener<UpdateJobSchedulerStatusAction.Response> actionListener) {
String jobId = request.getJobId();
JobSchedulerStatus newStatus = request.getSchedulerStatus();
SchedulerStatus newStatus = request.getSchedulerStatus();
clusterService.submitStateUpdateTask("update-scheduler-status-job-" + jobId,
new AckedClusterStateUpdateTask<UpdateJobSchedulerStatusAction.Response>(request, actionListener) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return innerUpdateSchedulerState(currentState, jobId, newStatus, null, null);
PrelertMetadata.Builder builder = createPrelertMetadatBuilder(currentState);
builder.updateSchedulerStatus(jobId, newStatus);
return buildNewClusterState(currentState, builder);
}
@Override
@ -309,33 +281,6 @@ public class JobManager extends AbstractComponent {
});
}
private ClusterState innerUpdateSchedulerState(ClusterState currentState, String jobId, JobSchedulerStatus status,
Long startTime, Long endTime) {
Job job = getJobOrThrowIfUnknown(currentState, jobId);
checkJobIsScheduled(job);
Allocation allocation = getAllocation(currentState, jobId);
if (allocation.getSchedulerState() == null && status != JobSchedulerStatus.STARTED) {
throw new IllegalArgumentException("Can't change status to [" + status + "], because job's [" + jobId +
"] scheduler never started");
}
SchedulerState existingState = allocation.getSchedulerState();
if (existingState != null) {
if (startTime == null) {
startTime = existingState.getStartTimeMillis();
}
if (endTime == null) {
endTime = existingState.getEndTimeMillis();
}
}
existingState = new SchedulerState(status, startTime, endTime);
Allocation.Builder builder = new Allocation.Builder(allocation);
builder.setSchedulerState(existingState);
return innerUpdateAllocation(builder.build(), currentState);
}
private Allocation getAllocation(ClusterState state, String jobId) {
PrelertMetadata prelertMetadata = state.metaData().custom(PrelertMetadata.TYPE);
Allocation allocation = prelertMetadata.getAllocations().get(jobId);
@ -345,15 +290,6 @@ public class JobManager extends AbstractComponent {
return allocation;
}
private ClusterState innerUpdateAllocation(Allocation newAllocation, ClusterState currentState) {
PrelertMetadata currentPrelertMetadata = currentState.metaData().custom(PrelertMetadata.TYPE);
PrelertMetadata.Builder builder = new PrelertMetadata.Builder(currentPrelertMetadata);
builder.updateAllocation(newAllocation.getJobId(), newAllocation);
ClusterState.Builder newState = ClusterState.builder(currentState);
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(PrelertMetadata.TYPE, builder.build()).build());
return newState.build();
}
public Auditor audit(String jobId) {
return jobProvider.audit(jobId);
}
@ -463,4 +399,15 @@ public class JobManager extends AbstractComponent {
jobResultsPersister.commitWrites(jobId);
}
private static PrelertMetadata.Builder createPrelertMetadatBuilder(ClusterState currentState) {
PrelertMetadata currentPrelertMetadata = currentState.metaData().custom(PrelertMetadata.TYPE);
return new PrelertMetadata.Builder(currentPrelertMetadata);
}
private static ClusterState buildNewClusterState(ClusterState currentState, PrelertMetadata.Builder builder) {
ClusterState.Builder newState = ClusterState.builder(currentState);
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(PrelertMetadata.TYPE, builder.build()).build());
return newState.build();
}
}

View File

@ -15,11 +15,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Objects;
@ -31,9 +27,8 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
private static final ParseField IGNORE_DOWNTIME_FIELD = new ParseField("ignore_downtime");
public static final ParseField STATUS = new ParseField("status");
public static final ParseField STATUS_REASON = new ParseField("status_reason");
public static final ParseField SCHEDULER_STATE = new ParseField("scheduler_state");
static final Allocation PROTO = new Allocation(null, null, false, null, null, null);
static final Allocation PROTO = new Allocation(null, null, false, null, null);
static final ObjectParser<Builder, ParseFieldMatcherSupplier> PARSER = new ObjectParser<>("allocation", Builder::new);
@ -43,7 +38,6 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
PARSER.declareBoolean(Builder::setIgnoreDowntime, IGNORE_DOWNTIME_FIELD);
PARSER.declareField(Builder::setStatus, (p, c) -> JobStatus.fromString(p.text()), STATUS, ObjectParser.ValueType.STRING);
PARSER.declareString(Builder::setStatusReason, STATUS_REASON);
PARSER.declareObject(Builder::setSchedulerState, SchedulerState.PARSER, SCHEDULER_STATE);
}
private final String nodeId;
@ -51,16 +45,13 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
private final boolean ignoreDowntime;
private final JobStatus status;
private final String statusReason;
private final SchedulerState schedulerState;
public Allocation(String nodeId, String jobId, boolean ignoreDowntime, JobStatus status, String statusReason,
SchedulerState schedulerState) {
public Allocation(String nodeId, String jobId, boolean ignoreDowntime, JobStatus status, String statusReason) {
this.nodeId = nodeId;
this.jobId = jobId;
this.ignoreDowntime = ignoreDowntime;
this.status = status;
this.statusReason = statusReason;
this.schedulerState = schedulerState;
}
public Allocation(StreamInput in) throws IOException {
@ -69,7 +60,6 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
this.ignoreDowntime = in.readBoolean();
this.status = JobStatus.fromStream(in);
this.statusReason = in.readOptionalString();
this.schedulerState = in.readOptionalWriteable(SchedulerState::new);
}
public String getNodeId() {
@ -97,10 +87,6 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
return statusReason;
}
public SchedulerState getSchedulerState() {
return schedulerState;
}
@Override
public Allocation readFrom(StreamInput in) throws IOException {
return new Allocation(in);
@ -113,7 +99,6 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
out.writeBoolean(ignoreDowntime);
status.writeTo(out);
out.writeOptionalString(statusReason);
out.writeOptionalWriteable(schedulerState);
}
@Override
@ -128,9 +113,6 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
if (statusReason != null) {
builder.field(STATUS_REASON.getPreferredName(), statusReason);
}
if (schedulerState != null) {
builder.field(SCHEDULER_STATE.getPreferredName(), schedulerState);
}
builder.endObject();
return builder;
}
@ -144,13 +126,12 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
Objects.equals(jobId, that.jobId) &&
Objects.equals(ignoreDowntime, that.ignoreDowntime) &&
Objects.equals(status, that.status) &&
Objects.equals(statusReason, that.statusReason) &&
Objects.equals(schedulerState, that.schedulerState);
Objects.equals(statusReason, that.statusReason);
}
@Override
public int hashCode() {
return Objects.hash(nodeId, jobId, ignoreDowntime, status, statusReason, schedulerState);
return Objects.hash(nodeId, jobId, ignoreDowntime, status, statusReason);
}
// Class alreadt extends from AbstractDiffable, so copied from ToXContentToBytes#toString()
@ -175,16 +156,12 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
private boolean ignoreDowntime;
private JobStatus status;
private String statusReason;
private SchedulerState schedulerState;
public Builder() {
}
public Builder(Job job) {
this.jobId = job.getId();
if (job.getSchedulerConfig() != null) {
schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, null, null);
}
}
public Builder(Allocation allocation) {
@ -193,7 +170,6 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
this.ignoreDowntime = allocation.ignoreDowntime;
this.status = allocation.status;
this.statusReason = allocation.statusReason;
this.schedulerState = allocation.schedulerState;
}
public void setNodeId(String nodeId) {
@ -237,34 +213,8 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
this.statusReason = statusReason;
}
public void setSchedulerState(SchedulerState newSchedulerState) {
if (this.schedulerState != null){
JobSchedulerStatus currentSchedulerStatus = this.schedulerState.getStatus();
JobSchedulerStatus newSchedulerStatus = newSchedulerState.getStatus();
switch (newSchedulerStatus) {
case STARTED:
if (currentSchedulerStatus != JobSchedulerStatus.STOPPED) {
String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_START, jobId, newSchedulerStatus);
throw ExceptionsHelper.conflictStatusException(msg);
}
break;
case STOPPED:
if (currentSchedulerStatus != JobSchedulerStatus.STARTED) {
String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_STOP_IN_CURRENT_STATE, jobId,
newSchedulerStatus);
throw ExceptionsHelper.conflictStatusException(msg);
}
break;
default:
throw new IllegalArgumentException("Invalid requested job scheduler status: " + newSchedulerStatus);
}
}
this.schedulerState = newSchedulerState;
}
public Allocation build() {
return new Allocation(nodeId, jobId, ignoreDowntime, status, statusReason, schedulerState);
return new Allocation(nodeId, jobId, ignoreDowntime, status, statusReason);
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.prelert.job.metadata;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.metadata.MetaData;
@ -15,13 +16,14 @@ import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerStatus;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.io.IOException;
@ -33,6 +35,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;
public class PrelertMetadata implements MetaData.Custom {
@ -40,7 +43,8 @@ public class PrelertMetadata implements MetaData.Custom {
private static final ParseField ALLOCATIONS_FIELD = new ParseField("allocations");
public static final String TYPE = "prelert";
public static final PrelertMetadata PROTO = new PrelertMetadata(Collections.emptySortedMap(), Collections.emptySortedMap());
public static final PrelertMetadata PROTO = new PrelertMetadata(Collections.emptySortedMap(), Collections.emptySortedMap(),
Collections.emptySortedMap());
private static final ObjectParser<Builder, ParseFieldMatcherSupplier> PRELERT_METADATA_PARSER = new ObjectParser<>("prelert_metadata",
Builder::new);
@ -54,10 +58,13 @@ public class PrelertMetadata implements MetaData.Custom {
// performance issue will occur if we don't change that
private final SortedMap<String, Job> jobs;
private final SortedMap<String, Allocation> allocations;
private final SortedMap<String, SchedulerStatus> schedulerStatuses;
private PrelertMetadata(SortedMap<String, Job> jobs, SortedMap<String, Allocation> allocations) {
private PrelertMetadata(SortedMap<String, Job> jobs, SortedMap<String, Allocation> allocations,
SortedMap<String, SchedulerStatus> schedulerStatuses) {
this.jobs = Collections.unmodifiableSortedMap(jobs);
this.allocations = Collections.unmodifiableSortedMap(allocations);
this.schedulerStatuses = Collections.unmodifiableSortedMap(schedulerStatuses);
}
public Map<String, Job> getJobs() {
@ -70,6 +77,10 @@ public class PrelertMetadata implements MetaData.Custom {
return allocations;
}
public SortedMap<String, SchedulerStatus> getSchedulerStatuses() {
return schedulerStatuses;
}
@Override
public String type() {
return TYPE;
@ -109,7 +120,12 @@ public class PrelertMetadata implements MetaData.Custom {
for (int i = 0; i < size; i++) {
allocations.put(in.readString(), Allocation.PROTO.readFrom(in));
}
return new PrelertMetadata(jobs, allocations);
size = in.readVInt();
TreeMap<String, SchedulerStatus> schedulerStatuses = new TreeMap<>();
for (int i = 0; i < size; i++) {
schedulerStatuses.put(in.readString(), SchedulerStatus.fromStream(in));
}
return new PrelertMetadata(jobs, allocations, schedulerStatuses);
}
@Override
@ -124,6 +140,11 @@ public class PrelertMetadata implements MetaData.Custom {
out.writeString(entry.getKey());
entry.getValue().writeTo(out);
}
out.writeVInt(schedulerStatuses.size());
for (Map.Entry<String, SchedulerStatus> entry : schedulerStatuses.entrySet()) {
out.writeString(entry.getKey());
entry.getValue().writeTo(out);
}
}
@Override
@ -145,28 +166,83 @@ public class PrelertMetadata implements MetaData.Custom {
final Diff<Map<String, Job>> jobs;
final Diff<Map<String, Allocation>> allocations;
final Diff<Map<String, SchedulerStatusDiff>> schedulerStatuses;
PrelertMetadataDiff(PrelertMetadata before, PrelertMetadata after) {
this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer());
this.allocations = DiffableUtils.diff(before.allocations, after.allocations, DiffableUtils.getStringKeySerializer());
this.schedulerStatuses = DiffableUtils.diff(
toSchedulerDiff(before.schedulerStatuses),
toSchedulerDiff(after.schedulerStatuses),
DiffableUtils.getStringKeySerializer());
}
PrelertMetadataDiff(StreamInput in) throws IOException {
jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job.PROTO);
allocations = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Allocation.PROTO);
schedulerStatuses = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), SchedulerStatusDiff.PROTO);
}
@Override
public MetaData.Custom apply(MetaData.Custom part) {
TreeMap<String, Job> newJobs = new TreeMap<>(jobs.apply(((PrelertMetadata) part).jobs));
TreeMap<String, Allocation> newAllocations = new TreeMap<>(allocations.apply(((PrelertMetadata) part).allocations));
return new PrelertMetadata(newJobs, newAllocations);
Map<String, SchedulerStatusDiff> newSchedulerStatuses =
schedulerStatuses.apply(toSchedulerDiff((((PrelertMetadata) part)).schedulerStatuses));
return new PrelertMetadata(newJobs, newAllocations, new TreeMap<>(toSchedulerStatusMap(newSchedulerStatuses)));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
jobs.writeTo(out);
allocations.writeTo(out);
schedulerStatuses.writeTo(out);
}
private static Map<String, SchedulerStatusDiff> toSchedulerDiff(Map<String, SchedulerStatus> from) {
return from.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, (entry) -> new SchedulerStatusDiff(entry.getValue())));
}
private static Map<String, SchedulerStatus> toSchedulerStatusMap(Map<String, SchedulerStatusDiff> from) {
return from.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, (entry) -> entry.getValue().status));
}
// SchedulerStatus is enum and that can't extend from anything
static class SchedulerStatusDiff extends AbstractDiffable<SchedulerStatusDiff> implements Writeable {
static SchedulerStatusDiff PROTO = new SchedulerStatusDiff(null);
private final SchedulerStatus status;
SchedulerStatusDiff(SchedulerStatus status) {
this.status = status;
}
@Override
public SchedulerStatusDiff readFrom(StreamInput in) throws IOException {
return new SchedulerStatusDiff(SchedulerStatus.fromStream(in));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
status.writeTo(out);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SchedulerStatusDiff that = (SchedulerStatusDiff) o;
return status == that.status;
}
@Override
public int hashCode() {
return Objects.hash(status);
}
}
}
@ -177,27 +253,32 @@ public class PrelertMetadata implements MetaData.Custom {
if (o == null || getClass() != o.getClass())
return false;
PrelertMetadata that = (PrelertMetadata) o;
return Objects.equals(jobs, that.jobs) && Objects.equals(allocations, that.allocations);
return Objects.equals(jobs, that.jobs) &&
Objects.equals(allocations, that.allocations) &&
Objects.equals(schedulerStatuses, that.schedulerStatuses);
}
@Override
public int hashCode() {
return Objects.hash(jobs, allocations);
return Objects.hash(jobs, allocations, schedulerStatuses);
}
public static class Builder {
private TreeMap<String, Job> jobs;
private TreeMap<String, Allocation> allocations;
private TreeMap<String, SchedulerStatus> schedulerStatuses;
public Builder() {
this.jobs = new TreeMap<>();
this.allocations = new TreeMap<>();
this.schedulerStatuses = new TreeMap<>();
}
public Builder(PrelertMetadata previous) {
jobs = new TreeMap<>(previous.jobs);
allocations = new TreeMap<>(previous.allocations);
schedulerStatuses = new TreeMap<>(previous.schedulerStatuses);
}
public Builder putJob(Job job, boolean overwrite) {
@ -212,6 +293,9 @@ public class PrelertMetadata implements MetaData.Custom {
builder.setStatus(JobStatus.CLOSED);
allocations.put(job.getId(), builder.build());
}
if (job.getSchedulerConfig() != null) {
schedulerStatuses.put(job.getId(), SchedulerStatus.STOPPED);
}
return this;
}
@ -219,7 +303,20 @@ public class PrelertMetadata implements MetaData.Custom {
if (jobs.remove(jobId) == null) {
throw new ResourceNotFoundException("job [" + jobId + "] does not exist");
}
this.allocations.remove(jobId);
Allocation previousAllocation = this.allocations.remove(jobId);
if (previousAllocation != null) {
if (!previousAllocation.getStatus().isAnyOf(JobStatus.CLOSED, JobStatus.FAILED)) {
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(
Messages.JOB_CANNOT_DELETE_WHILE_RUNNING, jobId, previousAllocation.getStatus()));
}
}
SchedulerStatus previousStatus = this.schedulerStatuses.remove(jobId);
if (previousStatus != null) {
if (previousStatus != SchedulerStatus.STOPPED) {
String message = Messages.getMessage(Messages.JOB_CANNOT_DELETE_WHILE_SCHEDULER_RUNS, jobId);
throw ExceptionsHelper.conflictStatusException(message);
}
}
return this;
}
@ -253,7 +350,7 @@ public class PrelertMetadata implements MetaData.Custom {
}
public PrelertMetadata build() {
return new PrelertMetadata(jobs, allocations);
return new PrelertMetadata(jobs, allocations, schedulerStatuses);
}
public Builder assignToNode(String jobId, String nodeId) {
@ -296,6 +393,32 @@ public class PrelertMetadata implements MetaData.Custom {
allocations.put(jobId, builder.build());
return this;
}
public Builder updateSchedulerStatus(String jobId, SchedulerStatus newStatus) {
SchedulerStatus currentStatus = schedulerStatuses.get(jobId);
if (currentStatus == null) {
throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_SCHEDULER_NO_SUCH_SCHEDULED_JOB, jobId));
}
switch (newStatus) {
case STARTED:
if (currentStatus != SchedulerStatus.STOPPED) {
String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_START, jobId, newStatus);
throw ExceptionsHelper.conflictStatusException(msg);
}
break;
case STOPPED:
if (currentStatus != SchedulerStatus.STARTED) {
String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_STOP_IN_CURRENT_STATE, jobId, newStatus);
throw ExceptionsHelper.conflictStatusException(msg);
}
break;
default:
throw new IllegalArgumentException("[" + jobId + "] invalid requested job scheduler status [" + newStatus + "]");
}
schedulerStatuses.put(jobId, newStatus);
return this;
}
}
}

View File

@ -14,7 +14,6 @@ import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xpack.prelert.action.FlushJobAction;
import org.elasticsearch.xpack.prelert.action.JobDataAction;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
@ -60,11 +59,9 @@ class ScheduledJob {
}
}
Long runLookBack(SchedulerState schedulerState) throws Exception {
long startMs = schedulerState.getStartTimeMillis();
lookbackStartTimeMs = (lastEndTimeMs != null && lastEndTimeMs + 1 > startMs) ? lastEndTimeMs + 1 : startMs;
Optional<Long> endMs = Optional.ofNullable(schedulerState.getEndTimeMillis());
Long runLookBack(long startTime, Long endTime) throws Exception {
lookbackStartTimeMs = (lastEndTimeMs != null && lastEndTimeMs + 1 > startTime) ? lastEndTimeMs + 1 : startTime;
Optional<Long> endMs = Optional.ofNullable(endTime);
long lookbackEnd = endMs.orElse(currentTimeSupplier.get() - queryDelayMs);
boolean isLookbackOnly = endMs.isPresent();
if (lookbackEnd <= lookbackStartTimeMs) {
@ -115,7 +112,7 @@ class ScheduledJob {
return running;
}
private void run(long start, long end, FlushJobAction.Request flushRequest) throws IOException {
private void run(long start, Long end, FlushJobAction.Request flushRequest) throws IOException {
if (end <= start) {
return;
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -20,18 +21,19 @@ import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction;
import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerStatus;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.config.DefaultFrequency;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.time.Duration;
import java.util.Objects;
@ -42,32 +44,35 @@ import java.util.function.Supplier;
public class ScheduledJobRunner extends AbstractComponent {
private final Client client;
private final ClusterService clusterService;
private final JobProvider jobProvider;
private final DataExtractorFactory dataExtractorFactory;
private final ThreadPool threadPool;
private final Supplier<Long> currentTimeSupplier;
public ScheduledJobRunner(ThreadPool threadPool, Client client, JobProvider jobProvider, DataExtractorFactory dataExtractorFactory,
Supplier<Long> currentTimeSupplier) {
public ScheduledJobRunner(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider,
DataExtractorFactory dataExtractorFactory, Supplier<Long> currentTimeSupplier) {
super(Settings.EMPTY);
this.threadPool = threadPool;
this.clusterService = Objects.requireNonNull(clusterService);
this.client = Objects.requireNonNull(client);
this.jobProvider = Objects.requireNonNull(jobProvider);
this.dataExtractorFactory = Objects.requireNonNull(dataExtractorFactory);
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
}
public void run(Job job, SchedulerState schedulerState, Allocation allocation, StartJobSchedulerAction.SchedulerTask task,
Consumer<Exception> handler) {
validate(job, allocation);
public void run(String jobId, long startTime, Long endTime, StartJobSchedulerAction.SchedulerTask task, Consumer<Exception> handler) {
PrelertMetadata prelertMetadata = clusterService.state().metaData().custom(PrelertMetadata.TYPE);
validate(jobId, prelertMetadata);
setJobSchedulerStatus(job.getId(), JobSchedulerStatus.STARTED, error -> {
logger.info("Starting scheduler [{}]", schedulerState);
setJobSchedulerStatus(jobId, SchedulerStatus.STARTED, error -> {
logger.info("[{}] Starting scheduler", jobId);
Job job = prelertMetadata.getJobs().get(jobId);
Holder holder = createJobScheduler(job, task, handler);
task.setHolder(holder);
holder.future = threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME).submit(() -> {
try {
Long next = holder.scheduledJob.runLookBack(schedulerState);
Long next = holder.scheduledJob.runLookBack(startTime, endTime);
if (next != null) {
doScheduleRealtime(next, job.getId(), holder);
} else {
@ -124,19 +129,26 @@ public class ScheduledJobRunner extends AbstractComponent {
}
}
public static void validate(Job job, Allocation allocation) {
public static void validate(String jobId, PrelertMetadata prelertMetadata) {
Job job = prelertMetadata.getJobs().get(jobId);
if (job == null) {
throw ExceptionsHelper.missingJobException(jobId);
}
if (job.getSchedulerConfig() == null) {
throw new IllegalArgumentException("job [" + job.getId() + "] is not a scheduled job");
}
Allocation allocation = prelertMetadata.getAllocations().get(jobId);
if (allocation.getStatus() != JobStatus.OPENED) {
throw new ElasticsearchStatusException("cannot start scheduler, expected job status [{}], but got [{}]",
RestStatus.CONFLICT, JobStatus.OPENED, allocation.getStatus());
}
if (allocation.getSchedulerState().getStatus() != JobSchedulerStatus.STOPPED) {
SchedulerStatus status = prelertMetadata.getSchedulerStatuses().get(jobId);
if (status != SchedulerStatus.STOPPED) {
throw new ElasticsearchStatusException("scheduler already started, expected scheduler status [{}], but got [{}]",
RestStatus.CONFLICT, JobSchedulerStatus.STOPPED, allocation.getSchedulerState().getStatus());
RestStatus.CONFLICT, SchedulerStatus.STOPPED, status);
}
}
@ -191,7 +203,7 @@ public class ScheduledJobRunner extends AbstractComponent {
return new TimeValue(Math.max(1, next - currentTimeSupplier.get()));
}
private void setJobSchedulerStatus(String jobId, JobSchedulerStatus status, Consumer<Exception> supplier) {
private void setJobSchedulerStatus(String jobId, SchedulerStatus status, Consumer<Exception> supplier) {
UpdateJobSchedulerStatusAction.Request request = new UpdateJobSchedulerStatusAction.Request(jobId, status);
client.execute(UpdateJobSchedulerStatusAction.INSTANCE, request, new ActionListener<UpdateJobSchedulerStatusAction.Response>() {
@Override
@ -235,7 +247,7 @@ public class ScheduledJobRunner extends AbstractComponent {
logger.info("Stopping scheduler for job [{}]", jobId);
scheduledJob.stop();
FutureUtils.cancel(future);
setJobSchedulerStatus(jobId, JobSchedulerStatus.STOPPED, error -> handler.accept(null));
setJobSchedulerStatus(jobId, SchedulerStatus.STOPPED, error -> handler.accept(null));
}
}

View File

@ -26,11 +26,8 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunner;
import java.io.IOException;
@ -39,14 +36,11 @@ public class RestStartJobSchedulerAction extends BaseRestHandler {
private static final String DEFAULT_START = "0";
private final JobManager jobManager;
private final ClusterService clusterService;
@Inject
public RestStartJobSchedulerAction(Settings settings, RestController controller, JobManager jobManager,
ClusterService clusterService) {
public RestStartJobSchedulerAction(Settings settings, RestController controller, ClusterService clusterService) {
super(settings);
this.jobManager = jobManager;
this.clusterService = clusterService;
controller.registerHandler(RestRequest.Method.POST,
PrelertPlugin.BASE_PATH + "schedulers/{" + Job.ID.getPreferredName() + "}/_start", this);
@ -59,9 +53,8 @@ public class RestStartJobSchedulerAction extends BaseRestHandler {
// This validation happens also in ScheduledJobRunner, the reason we do it here too is that if it fails there
// we are unable to provide the user immediate feedback. We would create the task and the validation would fail
// in the background, whereas now the validation failure is part of the response being returned.
Job job = jobManager.getJobOrThrowIfUnknown(jobId);
Allocation allocation = jobManager.getJobAllocation(jobId);
ScheduledJobRunner.validate(job, allocation);
PrelertMetadata prelertMetadata = clusterService.state().metaData().custom(PrelertMetadata.TYPE);
ScheduledJobRunner.validate(jobId, prelertMetadata);
StartJobSchedulerAction.Request jobSchedulerRequest;
if (RestActions.hasBodyContent(restRequest)) {
@ -69,15 +62,15 @@ public class RestStartJobSchedulerAction extends BaseRestHandler {
XContentParser parser = XContentFactory.xContent(bodyBytes).createParser(bodyBytes);
jobSchedulerRequest = StartJobSchedulerAction.Request.parseRequest(jobId, parser, () -> parseFieldMatcher);
} else {
long startTimeMillis = parseDateOrThrow(restRequest.param(SchedulerState.START_TIME_MILLIS.getPreferredName(), DEFAULT_START),
SchedulerState.START_TIME_MILLIS.getPreferredName());
long startTimeMillis = parseDateOrThrow(restRequest.param(StartJobSchedulerAction.START_TIME.getPreferredName(),
DEFAULT_START), StartJobSchedulerAction.START_TIME.getPreferredName());
Long endTimeMillis = null;
if (restRequest.hasParam(SchedulerState.END_TIME_MILLIS.getPreferredName())) {
endTimeMillis = parseDateOrThrow(restRequest.param(SchedulerState.END_TIME_MILLIS.getPreferredName()),
SchedulerState.END_TIME_MILLIS.getPreferredName());
if (restRequest.hasParam(StartJobSchedulerAction.END_TIME.getPreferredName())) {
endTimeMillis = parseDateOrThrow(restRequest.param(StartJobSchedulerAction.END_TIME.getPreferredName()),
StartJobSchedulerAction.END_TIME.getPreferredName());
}
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, startTimeMillis, endTimeMillis);
jobSchedulerRequest = new StartJobSchedulerAction.Request(jobId, schedulerState);
jobSchedulerRequest = new StartJobSchedulerAction.Request(jobId, startTimeMillis);
jobSchedulerRequest.setEndTime(endTimeMillis);
}
return sendTask(client.executeLocally(StartJobSchedulerAction.INSTANCE, jobSchedulerRequest, LoggingTaskListener.instance()));
}

View File

@ -13,13 +13,12 @@ import org.elasticsearch.xpack.prelert.job.DataDescription;
import org.elasticsearch.xpack.prelert.job.Detector;
import org.elasticsearch.xpack.prelert.job.IgnoreDowntime;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerStatus;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.ModelDebugConfig;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.SchedulerConfig;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.transform.TransformConfig;
import org.elasticsearch.xpack.prelert.job.transform.TransformType;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
@ -86,10 +85,9 @@ public class GetJobActionResponseTests extends AbstractStreamableTestCase<GetJob
sizeStats = new ModelSizeStats.Builder("foo").build();
}
SchedulerState schedulerState = null;
SchedulerStatus schedulerStatus = null;
if (randomBoolean()) {
schedulerState = new SchedulerState(randomFrom(EnumSet.allOf(JobSchedulerStatus.class)), randomPositiveLong(),
randomPositiveLong());
schedulerStatus = randomFrom(SchedulerStatus.values());
}
JobStatus jobStatus = null;
@ -97,7 +95,7 @@ public class GetJobActionResponseTests extends AbstractStreamableTestCase<GetJob
jobStatus = randomFrom(EnumSet.allOf(JobStatus.class));
}
Response.JobInfo jobInfo = new Response.JobInfo(jobId, job, dataCounts, sizeStats, schedulerState, jobStatus);
Response.JobInfo jobInfo = new Response.JobInfo(jobId, job, dataCounts, sizeStats, schedulerStatus, jobStatus);
jobInfoList.add(jobInfo);
}

View File

@ -13,7 +13,6 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.plugins.Plugin;
@ -24,9 +23,8 @@ import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.DataDescription;
import org.elasticsearch.xpack.prelert.job.Detector;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerConfig;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.junit.After;
@ -77,8 +75,8 @@ public class ScheduledJobsIT extends ESIntegTestCase {
OpenJobAction.Response openJobResponse = client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get();
assertTrue(openJobResponse.isAcknowledged());
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, now);
StartJobSchedulerAction.Request startSchedulerRequest = new StartJobSchedulerAction.Request("_job_id", schedulerState);
StartJobSchedulerAction.Request startSchedulerRequest = new StartJobSchedulerAction.Request("_job_id", 0L);
startSchedulerRequest.setEndTime(now);
client().execute(StartJobSchedulerAction.INSTANCE, startSchedulerRequest).get();
assertBusy(() -> {
DataCounts dataCounts = getDataCounts("_job_id");
@ -86,8 +84,7 @@ public class ScheduledJobsIT extends ESIntegTestCase {
PrelertMetadata prelertMetadata = client().admin().cluster().prepareState().all().get()
.getState().metaData().custom(PrelertMetadata.TYPE);
assertThat(prelertMetadata.getAllocations().get("_job_id").getSchedulerState().getStatus(),
equalTo(JobSchedulerStatus.STOPPED));
assertThat(prelertMetadata.getSchedulerStatuses().get("_job_id"), equalTo(SchedulerStatus.STOPPED));
});
}
@ -110,9 +107,7 @@ public class ScheduledJobsIT extends ESIntegTestCase {
AtomicReference<Throwable> errorHolder = new AtomicReference<>();
Thread t = new Thread(() -> {
try {
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, null);
StartJobSchedulerAction.Request startSchedulerRequest =
new StartJobSchedulerAction.Request("_job_id", schedulerState);
StartJobSchedulerAction.Request startSchedulerRequest = new StartJobSchedulerAction.Request("_job_id", 0L);
client().execute(StartJobSchedulerAction.INSTANCE, startSchedulerRequest).get();
} catch (Exception | AssertionError e) {
errorHolder.set(e);
@ -138,8 +133,7 @@ public class ScheduledJobsIT extends ESIntegTestCase {
assertBusy(() -> {
PrelertMetadata prelertMetadata = client().admin().cluster().prepareState().all().get()
.getState().metaData().custom(PrelertMetadata.TYPE);
assertThat(prelertMetadata.getAllocations().get("_job_id").getSchedulerState().getStatus(),
equalTo(JobSchedulerStatus.STOPPED));
assertThat(prelertMetadata.getSchedulerStatuses().get("_job_id"), equalTo(SchedulerStatus.STOPPED));
});
assertThat(errorHolder.get(), nullValue());
}
@ -217,7 +211,7 @@ public class ScheduledJobsIT extends ESIntegTestCase {
} catch (Exception e) {
fail();
}
assertThat(r.getResponse().results().get(0).getSchedulerState().getStatus(), equalTo(JobSchedulerStatus.STOPPED));
assertThat(r.getResponse().results().get(0).getSchedulerStatus(), equalTo(SchedulerStatus.STOPPED));
});
} catch (Exception e) {
// ignore

View File

@ -8,16 +8,17 @@ package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction.Request;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableXContentTestCase;
public class StartJobSchedulerActionRequestTests extends AbstractStreamableXContentTestCase<StartJobSchedulerAction.Request> {
@Override
protected Request createTestInstance() {
SchedulerState state = new SchedulerState(JobSchedulerStatus.STARTED, randomLong(), randomLong());
return new Request(randomAsciiOfLengthBetween(1, 20), state);
Request request = new Request(randomAsciiOfLength(10), randomPositiveLong());
if (randomBoolean()) {
request.setEndTime(randomPositiveLong());
}
return request;
}
@Override

View File

@ -6,14 +6,14 @@
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction.Request;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerStatus;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
public class UpdateJobSchedulerStatusRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomAsciiOfLengthBetween(1, 20), randomFrom(JobSchedulerStatus.values()));
return new Request(randomAsciiOfLengthBetween(1, 20), randomFrom(SchedulerStatus.values()));
}
@Override

View File

@ -125,7 +125,7 @@ public class ScheduledJobIT extends ESRestTestCase {
() -> client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/" + jobId));
response = e.getResponse();
assertThat(response.getStatusLine().getStatusCode(), equalTo(409));
assertThat(responseEntityToString(response), containsString("Cannot delete job '" + jobId + "' while the scheduler is running"));
assertThat(responseEntityToString(response), containsString("Cannot delete job '" + jobId + "' while it is OPENED"));
response = client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_stop");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));

View File

@ -10,13 +10,13 @@ import org.elasticsearch.test.ESTestCase;
public class JobSchedulerStatusTests extends ESTestCase {
public void testForString() {
assertEquals(JobSchedulerStatus.fromString("started"), JobSchedulerStatus.STARTED);
assertEquals(JobSchedulerStatus.fromString("stopped"), JobSchedulerStatus.STOPPED);
assertEquals(SchedulerStatus.fromString("started"), SchedulerStatus.STARTED);
assertEquals(SchedulerStatus.fromString("stopped"), SchedulerStatus.STOPPED);
}
public void testValidOrdinals() {
assertEquals(0, JobSchedulerStatus.STARTED.ordinal());
assertEquals(1, JobSchedulerStatus.STOPPED.ordinal());
assertEquals(0, SchedulerStatus.STARTED.ordinal());
assertEquals(1, SchedulerStatus.STOPPED.ordinal());
}
}

View File

@ -1,75 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase;
public class SchedulerStateTests extends AbstractSerializingTestCase<SchedulerState> {
@Override
protected SchedulerState createTestInstance() {
return new SchedulerState(randomFrom(JobSchedulerStatus.values()), randomPositiveLong(),
randomBoolean() ? null : randomPositiveLong());
}
@Override
protected Writeable.Reader<SchedulerState> instanceReader() {
return SchedulerState::new;
}
@Override
protected SchedulerState parseInstance(XContentParser parser, ParseFieldMatcher matcher) {
return SchedulerState.PARSER.apply(parser, () -> matcher);
}
public void testEquals_GivenDifferentClass() {
assertFalse(new SchedulerState(JobSchedulerStatus.STARTED, 0L, null).equals("a string"));
}
public void testEquals_GivenSameReference() {
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 18L, 42L);
assertTrue(schedulerState.equals(schedulerState));
}
public void testEquals_GivenEqualObjects() {
SchedulerState schedulerState1 = new SchedulerState(JobSchedulerStatus.STARTED, 18L, 42L);
SchedulerState schedulerState2 = new SchedulerState(schedulerState1.getStatus(), schedulerState1.getStartTimeMillis(),
schedulerState1.getEndTimeMillis());
assertTrue(schedulerState1.equals(schedulerState2));
assertTrue(schedulerState2.equals(schedulerState1));
assertEquals(schedulerState1.hashCode(), schedulerState2.hashCode());
}
public void testEquals_GivenDifferentStatus() {
SchedulerState schedulerState1 = new SchedulerState(JobSchedulerStatus.STARTED, 18L, 42L);
SchedulerState schedulerState2 = new SchedulerState(JobSchedulerStatus.STOPPED, schedulerState1.getStartTimeMillis(),
schedulerState1.getEndTimeMillis());
assertFalse(schedulerState1.equals(schedulerState2));
assertFalse(schedulerState2.equals(schedulerState1));
}
public void testEquals_GivenDifferentStartTimeMillis() {
SchedulerState schedulerState1 = new SchedulerState(JobSchedulerStatus.STARTED, null, 42L);
SchedulerState schedulerState2 = new SchedulerState(JobSchedulerStatus.STOPPED, 19L, schedulerState1.getEndTimeMillis());
assertFalse(schedulerState1.equals(schedulerState2));
assertFalse(schedulerState2.equals(schedulerState1));
}
public void testEquals_GivenDifferentEndTimeMillis() {
SchedulerState schedulerState1 = new SchedulerState(JobSchedulerStatus.STARTED, 18L, 42L);
SchedulerState schedulerState2 = new SchedulerState(JobSchedulerStatus.STOPPED, schedulerState1.getStartTimeMillis(), 43L);
assertFalse(schedulerState1.equals(schedulerState2));
assertFalse(schedulerState2.equals(schedulerState1));
}
}

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.prelert.job.manager;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@ -34,7 +33,6 @@ import java.util.stream.Collectors;
import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -193,23 +191,6 @@ public class JobManagerTests extends ESTestCase {
assertThat(result.results().get(0).getId(), equalTo("9"));
}
public void testInnerPutJob() {
JobManager jobManager = createJobManager();
ClusterState cs = createClusterState();
Job job1 = buildJobBuilder("_id").build();
ClusterState result1 = jobManager.innerPutJob(job1, false, cs);
PrelertMetadata pm = result1.getMetaData().custom(PrelertMetadata.TYPE);
assertThat(pm.getJobs().get("_id"), sameInstance(job1));
Job job2 = buildJobBuilder("_id").build();
expectThrows(ResourceAlreadyExistsException.class, () -> jobManager.innerPutJob(job2, false, result1));
ClusterState result2 = jobManager.innerPutJob(job2, true, result1);
pm = result2.getMetaData().custom(PrelertMetadata.TYPE);
assertThat(pm.getJobs().get("_id"), sameInstance(job2));
}
private JobManager createJobManager() {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
JobResultsPersister jobResultsPersister = mock(JobResultsPersister.class);

View File

@ -8,9 +8,7 @@ package org.elasticsearch.xpack.prelert.job.metadata;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase;
public class AllocationTests extends AbstractSerializingTestCase<Allocation> {
@ -22,8 +20,7 @@ public class AllocationTests extends AbstractSerializingTestCase<Allocation> {
boolean ignoreDowntime = randomBoolean();
JobStatus jobStatus = randomFrom(JobStatus.values());
String statusReason = randomBoolean() ? randomAsciiOfLength(10) : null;
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, randomPositiveLong(), randomPositiveLong());
return new Allocation(nodeId, jobId, ignoreDowntime, jobStatus, statusReason, schedulerState);
return new Allocation(nodeId, jobId, ignoreDowntime, jobStatus, statusReason);
}
@Override
@ -37,7 +34,7 @@ public class AllocationTests extends AbstractSerializingTestCase<Allocation> {
}
public void testUnsetIgnoreDownTime() {
Allocation allocation = new Allocation("_node_id", "_job_id", true, JobStatus.OPENING, null, null);
Allocation allocation = new Allocation("_node_id", "_job_id", true, JobStatus.OPENING, null);
assertTrue(allocation.isIgnoreDowntime());
Allocation.Builder builder = new Allocation.Builder(allocation);
builder.setStatus(JobStatus.OPENED);

View File

@ -17,13 +17,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.job.DataDescription;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerConfig;
import org.junit.Before;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import static org.mockito.Mockito.doAnswer;
@ -181,31 +176,4 @@ public class JobAllocatorTests extends ESTestCase {
verify(clusterService, times(1)).submitStateUpdateTask(any(), any());
}
public void testScheduledJobHasDefaultSchedulerState() {
PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder();
SchedulerConfig.Builder schedulerConfigBuilder = new SchedulerConfig.Builder(Collections.singletonList("foo"),
Collections.singletonList("bar"));
Job.Builder jobBuilder = buildJobBuilder("_job_id");
jobBuilder.setSchedulerConfig(schedulerConfigBuilder);
DataDescription.Builder dataDescriptionBuilder = new DataDescription.Builder();
dataDescriptionBuilder.setFormat(DataDescription.DataFormat.ELASTICSEARCH);
jobBuilder.setDataDescription(dataDescriptionBuilder);
pmBuilder.putJob(jobBuilder.build(), false);
ClusterState cs = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_id", new LocalTransportAddress("_id"), Version.CURRENT))
.masterNodeId("_id")
.localNodeId("_id"))
.build();
ClusterState clusterStateWithAllocation = jobAllocator.assignJobsToNodes(cs);
PrelertMetadata metadata = clusterStateWithAllocation.metaData().custom(PrelertMetadata.TYPE);
assertEquals(JobSchedulerStatus.STOPPED, metadata.getAllocations().get("_job_id").getSchedulerState().getStatus());
}
}

View File

@ -27,6 +27,7 @@ import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
public class PrelertMetadataTests extends ESTestCase {
@ -90,20 +91,29 @@ public class PrelertMetadataTests extends ESTestCase {
}
public void testPutJob() {
Job job1 = buildJobBuilder("1").build();
Job job2 = buildJobBuilder("2").build();
PrelertMetadata.Builder builder = new PrelertMetadata.Builder();
builder.putJob(buildJobBuilder("1").build(), false);
builder.putJob(buildJobBuilder("2").build(), false);
ResourceAlreadyExistsException e = expectThrows(ResourceAlreadyExistsException.class,
() -> builder.putJob(buildJobBuilder("2").build(), false));
assertEquals("The job cannot be created with the Id '2'. The Id is already used.", e.getMessage());
builder.putJob(buildJobBuilder("2").build(), true);
builder.putJob(job1, false);
builder.putJob(job2, false);
PrelertMetadata result = builder.build();
assertThat(result.getJobs().get("1"), sameInstance(job1));
assertThat(result.getJobs().get("2"), sameInstance(job2));
builder = new PrelertMetadata.Builder(result);
PrelertMetadata.Builder builderReference = builder;
ResourceAlreadyExistsException e = expectThrows(ResourceAlreadyExistsException.class, () -> builderReference.putJob(job2, false));
assertEquals("The job cannot be created with the Id '2'. The Id is already used.", e.getMessage());
Job job2Attempt2 = buildJobBuilder("2").build();
builder.putJob(job2Attempt2, true);
result = builder.build();
assertThat(result.getJobs().size(), equalTo(2));
assertThat(result.getJobs().get("1"), notNullValue());
assertThat(result.getJobs().get("2"), notNullValue());
assertThat(result.getJobs().get("1"), sameInstance(job1));
assertThat(result.getJobs().get("2"), sameInstance(job2Attempt2));
}
public void testUpdateAllocation_setFinishedTime() {

View File

@ -9,6 +9,10 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
@ -22,16 +26,14 @@ import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.DataDescription;
import org.elasticsearch.xpack.prelert.job.Detector;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerStatus;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerConfig;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.data.DataProcessor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
@ -67,6 +69,7 @@ public class ScheduledJobRunnerTests extends ESTestCase {
private Client client;
private ActionFuture<JobDataAction.Response> jobDataFuture;
private ActionFuture<FlushJobAction.Response> flushJobFuture;
private ClusterService clusterService;
private ThreadPool threadPool;
private DataExtractorFactory dataExtractorFactory;
private ScheduledJobRunner scheduledJobRunner;
@ -78,6 +81,7 @@ public class ScheduledJobRunnerTests extends ESTestCase {
client = mock(Client.class);
jobDataFuture = mock(ActionFuture.class);
flushJobFuture = mock(ActionFuture.class);
clusterService = mock(ClusterService.class);
doAnswer(invocation -> {
@SuppressWarnings("unchecked")
ActionListener<Object> actionListener = (ActionListener) invocation.getArguments()[2];
@ -100,7 +104,7 @@ public class ScheduledJobRunnerTests extends ESTestCase {
when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
scheduledJobRunner =
new ScheduledJobRunner(threadPool, client, jobProvider, dataExtractorFactory, () -> currentTime);
new ScheduledJobRunner(threadPool, client, clusterService,jobProvider, dataExtractorFactory, () -> currentTime);
when(jobProvider.audit(anyString())).thenReturn(auditor);
when(jobProvider.buckets(anyString(), any(BucketsQueryBuilder.BucketsQuery.class))).thenThrow(
@ -109,11 +113,14 @@ public class ScheduledJobRunnerTests extends ESTestCase {
public void testStart_GivenNewlyCreatedJobLoopBack() throws Exception {
Job.Builder builder = createScheduledJob();
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, 0L, 60000L);
DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
Job job = builder.build();
Allocation allocation =
new Allocation(null, "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null));
PrelertMetadata prelertMetadata = new PrelertMetadata.Builder().putJob(job, false)
.updateStatus("foo", JobStatus.OPENED, null)
.build();
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, prelertMetadata))
.build());
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(job)).thenReturn(dataExtractor);
@ -123,24 +130,27 @@ public class ScheduledJobRunnerTests extends ESTestCase {
when(jobDataFuture.get()).thenReturn(new JobDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer();
StartJobSchedulerAction.SchedulerTask task = mock(StartJobSchedulerAction.SchedulerTask.class);
scheduledJobRunner.run(job, schedulerState, allocation, task, handler);
scheduledJobRunner.run("foo", 0L, 60000L, task, handler);
verify(dataExtractor).newSearch(eq(0L), eq(60000L), any());
verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any());
verify(client).execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("foo")));
verify(client).execute(same(FlushJobAction.INSTANCE), any());
verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STARTED)), any());
verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any());
verify(client).execute(same(INSTANCE), eq(new Request("foo", SchedulerStatus.STARTED)), any());
verify(client).execute(same(INSTANCE), eq(new Request("foo", SchedulerStatus.STOPPED)), any());
}
public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception {
Job.Builder builder = createScheduledJob();
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, 0L, null);
DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
Job job = builder.build();
Allocation allocation =
new Allocation(null, "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null));
PrelertMetadata prelertMetadata = new PrelertMetadata.Builder().putJob(job, false)
.updateStatus("foo", JobStatus.OPENED, null)
.build();
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, prelertMetadata))
.build());
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(job)).thenReturn(dataExtractor);
@ -151,13 +161,13 @@ public class ScheduledJobRunnerTests extends ESTestCase {
Consumer<Exception> handler = mockConsumer();
boolean cancelled = randomBoolean();
StartJobSchedulerAction.SchedulerTask task = new StartJobSchedulerAction.SchedulerTask(1, "type", "action", null, "foo");
scheduledJobRunner.run(job, schedulerState, allocation, task, handler);
scheduledJobRunner.run("foo", 0L, null, task, handler);
verify(dataExtractor).newSearch(eq(0L), eq(60000L), any());
verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME);
if (cancelled) {
task.stop();
verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any());
verify(client).execute(same(INSTANCE), eq(new Request("foo", SchedulerStatus.STOPPED)), any());
} else {
verify(client).execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("foo")));
verify(client).execute(same(FlushJobAction.INSTANCE), any());
@ -183,19 +193,21 @@ public class ScheduledJobRunnerTests extends ESTestCase {
public void testValidate() {
Job job1 = buildJobBuilder("foo").build();
Exception e = expectThrows(IllegalArgumentException.class, () -> ScheduledJobRunner.validate(job1, null));
PrelertMetadata prelertMetadata1 = new PrelertMetadata.Builder().putJob(job1, false).build();
Exception e = expectThrows(IllegalArgumentException.class, () -> ScheduledJobRunner.validate("foo", prelertMetadata1));
assertThat(e.getMessage(), equalTo("job [foo] is not a scheduled job"));
Job job2 = createScheduledJob().build();
Allocation allocation1 =
new Allocation("_id", "_id", false, JobStatus.CLOSED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null));
e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate(job2, allocation1));
PrelertMetadata prelertMetadata2 = new PrelertMetadata.Builder().putJob(job2, false).build();
e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate("foo", prelertMetadata2));
assertThat(e.getMessage(), equalTo("cannot start scheduler, expected job status [OPENED], but got [CLOSED]"));
Job job3 = createScheduledJob().build();
Allocation allocation2 =
new Allocation("_id", "_id", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STARTED, null, null));
e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate(job3, allocation2));
PrelertMetadata prelertMetadata3 = new PrelertMetadata.Builder().putJob(job3, false)
.updateStatus("foo", JobStatus.OPENED, null)
.updateSchedulerStatus("foo", SchedulerStatus.STARTED)
.build();
e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate("foo", prelertMetadata3));
assertThat(e.getMessage(), equalTo("scheduler already started, expected scheduler status [STOPPED], but got [STARTED]"));
}

View File

@ -11,8 +11,6 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.action.FlushJobAction;
import org.elasticsearch.xpack.prelert.action.JobDataAction;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor;
import org.junit.Before;
@ -64,8 +62,7 @@ public class ScheduledJobTests extends ESTestCase {
public void testLookBackRunWithEndTime() throws Exception {
ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1);
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, 1000L);
assertNull(scheduledJob.runLookBack(schedulerState));
assertNull(scheduledJob.runLookBack(0L, 1000L));
verify(dataExtractor).newSearch(eq(0L), eq(1000L), any());
FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");
@ -78,8 +75,7 @@ public class ScheduledJobTests extends ESTestCase {
long frequencyMs = 1000;
long queryDelayMs = 500;
ScheduledJob scheduledJob = createScheduledJob(frequencyMs, queryDelayMs, -1, -1);
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, null);
long next = scheduledJob.runLookBack(schedulerState);
long next = scheduledJob.runLookBack(0L, null);
assertEquals(2000 + frequencyMs + 100, next);
verify(dataExtractor).newSearch(eq(0L), eq(1500L), any());
@ -101,8 +97,7 @@ public class ScheduledJobTests extends ESTestCase {
long frequencyMs = 1000;
long queryDelayMs = 500;
ScheduledJob scheduledJob = createScheduledJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs);
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, null);
long next = scheduledJob.runLookBack(schedulerState);
long next = scheduledJob.runLookBack(0L, null);
assertEquals(10000 + frequencyMs + 100, next);
verify(dataExtractor).newSearch(eq(5000 + 1L), eq(currentTime - queryDelayMs), any());
@ -131,8 +126,7 @@ public class ScheduledJobTests extends ESTestCase {
when(dataExtractor.hasNext()).thenReturn(false);
ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1);
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, 1000L);
expectThrows(ScheduledJob.EmptyDataCountException.class, () -> scheduledJob.runLookBack(schedulerState));
expectThrows(ScheduledJob.EmptyDataCountException.class, () -> scheduledJob.runLookBack(0L, 1000L));
}
public void testExtractionProblem() throws Exception {
@ -141,8 +135,7 @@ public class ScheduledJobTests extends ESTestCase {
when(dataExtractor.next()).thenThrow(new IOException());
ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1);
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, 1000L);
expectThrows(ScheduledJob.ExtractionProblemException.class, () -> scheduledJob.runLookBack(schedulerState));
expectThrows(ScheduledJob.ExtractionProblemException.class, () -> scheduledJob.runLookBack(0L, 1000L));
currentTime = 3001;
expectThrows(ScheduledJob.ExtractionProblemException.class, scheduledJob::runRealtime);
@ -162,8 +155,7 @@ public class ScheduledJobTests extends ESTestCase {
when(client.execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("_job_id")))).thenThrow(new RuntimeException());
ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1);
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, 1000L);
expectThrows(ScheduledJob.AnalysisProblemException.class, () -> scheduledJob.runLookBack(schedulerState));
expectThrows(ScheduledJob.AnalysisProblemException.class, () -> scheduledJob.runLookBack(0L, 1000L));
currentTime = 3001;
expectThrows(ScheduledJob.EmptyDataCountException.class, scheduledJob::runRealtime);

View File

@ -7,6 +7,9 @@ package org.elasticsearch.xpack.prelert.rest.schedulers;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestController;
@ -14,38 +17,43 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunnerTests;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class RestStartJobSchedulerActionTests extends ESTestCase {
public void testPrepareRequest() throws Exception {
JobManager jobManager = mock(JobManager.class);
ClusterService clusterService = mock(ClusterService.class);
Job.Builder job = ScheduledJobRunnerTests.createScheduledJob();
when(jobManager.getJobOrThrowIfUnknown(anyString())).thenReturn(job.build());
Allocation allocation =
new Allocation(null, "foo", false, JobStatus.OPENED, null, new SchedulerState(JobSchedulerStatus.STOPPED, null, null));
when(jobManager.getJobAllocation(anyString())).thenReturn(allocation);
RestStartJobSchedulerAction action = new RestStartJobSchedulerAction(Settings.EMPTY, mock(RestController.class),
jobManager, mock(ClusterService.class));
PrelertMetadata prelertMetadata = new PrelertMetadata.Builder()
.putJob(job.build(), false)
.updateStatus("foo", JobStatus.OPENED, null)
.build();
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, prelertMetadata))
.build());
RestStartJobSchedulerAction action = new RestStartJobSchedulerAction(Settings.EMPTY, mock(RestController.class), clusterService);
RestRequest restRequest1 = new FakeRestRequest.Builder().withParams(Collections.singletonMap("start", "not-a-date")).build();
Map<String, String> params = new HashMap<>();
params.put("start", "not-a-date");
params.put("job_id", "foo");
RestRequest restRequest1 = new FakeRestRequest.Builder().withParams(params).build();
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class,
() -> action.prepareRequest(restRequest1, mock(NodeClient.class)));
assertEquals("Query param 'start' with value 'not-a-date' cannot be parsed as a date or converted to a number (epoch).",
e.getMessage());
RestRequest restRequest2 = new FakeRestRequest.Builder().withParams(Collections.singletonMap("end", "not-a-date")).build();
params = new HashMap<>();
params.put("end", "not-a-date");
params.put("job_id", "foo");
RestRequest restRequest2 = new FakeRestRequest.Builder().withParams(params).build();
e = expectThrows(ElasticsearchParseException.class, () -> action.prepareRequest(restRequest2, mock(NodeClient.class)));
assertEquals("Query param 'end' with value 'not-a-date' cannot be parsed as a date or converted to a number (epoch).",
e.getMessage());

View File

@ -146,7 +146,7 @@ setup:
- is_false: jobs.0.config
- is_false: jobs.0.data_counts
- is_false: jobs.0.model_size_stats
- match: { jobs.0.scheduler_state.status: STOPPED }
- match: { jobs.0.scheduler_state: STOPPED }
---
"Test bad metric":