Add scheduler status filter to jobs endpoint (elastic/elasticsearch#350)

* Add scheduler status filter to jobs endpoint

* For scheduled jobs set the initial scheduler state

* Add status filter to job endpoint


Original commit: elastic/x-pack-elasticsearch@c7ed1627e2
David Kyle 2016-11-23 10:00:21 +00:00 committed by GitHub
parent 423a9cf7b2
commit d5bb1f603b
17 changed files with 252 additions and 47 deletions
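
In short, the GET jobs endpoint gains two new filterable fields, scheduler_state and status, alongside the existing config, data_counts and model_size_stats metrics, and scheduled jobs now get a default STOPPED scheduler state when allocated. As a rough illustration, adapted from the REST tests added further down (the job id is simply the one those tests create), requesting the new field looks like:

  - do:
      xpack.prelert.get_job:
        job_id: "scheduled-job"
        metric: "scheduler_state"
  - match: { hits.0.scheduler_state.status: STOPPED }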

View File

@@ -38,7 +38,9 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.prelert.job.DataCounts;
 import org.elasticsearch.xpack.prelert.job.Job;
+import org.elasticsearch.xpack.prelert.job.JobStatus;
 import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
+import org.elasticsearch.xpack.prelert.job.SchedulerState;
 import org.elasticsearch.xpack.prelert.job.manager.AutodetectProcessManager;
 import org.elasticsearch.xpack.prelert.job.manager.JobManager;
 import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider;
@@ -92,10 +94,11 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
         private boolean config;
         private boolean dataCounts;
         private boolean modelSizeStats;
+        private boolean schedulerStatus;
+        private boolean status;
         private PageParams pageParams = null;

         public Request() {
         }

         public void setJobId(String jobId) {
@@ -115,9 +118,11 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
         }

         public Request all() {
-            this.config = true;
-            this.dataCounts = true;
-            this.modelSizeStats = true;
+            config = true;
+            dataCounts = true;
+            modelSizeStats = true;
+            schedulerStatus = true;
+            status = true;
             return this;
         }
@@ -148,17 +153,36 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
             return this;
         }

+        public boolean schedulerStatus() {
+            return schedulerStatus;
+        }
+
+        public Request schedulerStatus(boolean schedulerStatus) {
+            this.schedulerStatus = schedulerStatus;
+            return this;
+        }
+
         public void setStats(Set<String> stats) {
             if (stats.contains("_all")) {
                 all();
-            }
-            else {
+            } else {
                 config(stats.contains("config"));
                 dataCounts(stats.contains("data_counts"));
                 modelSizeStats(stats.contains("model_size_stats"));
+                schedulerStatus(stats.contains("scheduler_state"));
+                status(stats.contains("status"));
             }
         }

+        public boolean status() {
+            return status;
+        }
+
+        public Request status(boolean status) {
+            this.status = status;
+            return this;
+        }
+
         @Override
         public ActionRequestValidationException validate() {
             return null;
@@ -171,6 +195,8 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
             config = in.readBoolean();
             dataCounts = in.readBoolean();
             modelSizeStats = in.readBoolean();
+            schedulerStatus = in.readBoolean();
+            status = in.readBoolean();
             pageParams = in.readOptionalWriteable(PageParams::new);
         }
@@ -181,12 +207,14 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
             out.writeBoolean(config);
             out.writeBoolean(dataCounts);
             out.writeBoolean(modelSizeStats);
+            out.writeBoolean(schedulerStatus);
+            out.writeBoolean(status);
             out.writeOptionalWriteable(pageParams);
         }

         @Override
         public int hashCode() {
-            return Objects.hash(jobId, config, dataCounts, modelSizeStats, pageParams);
+            return Objects.hash(jobId, config, dataCounts, modelSizeStats, schedulerStatus, status, pageParams);
         }

         @Override
@@ -202,6 +230,8 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
                     && this.config == other.config
                     && this.dataCounts == other.dataCounts
                     && this.modelSizeStats == other.modelSizeStats
+                    && this.schedulerStatus == other.schedulerStatus
+                    && this.status == other.status
                     && Objects.equals(this.pageParams, other.pageParams);
         }
     }
@@ -222,17 +252,28 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
         private DataCounts dataCounts;
         @Nullable
         private ModelSizeStats modelSizeStats;
+        @Nullable
+        private SchedulerState schedulerState;
+        @Nullable
+        private JobStatus status;

-        JobInfo(@Nullable Job job, @Nullable DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats) {
+        JobInfo(@Nullable Job job, @Nullable DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats,
+                @Nullable SchedulerState schedulerStatus, @Nullable JobStatus status) {
             this.jobConfig = job;
             this.dataCounts = dataCounts;
             this.modelSizeStats = modelSizeStats;
+            this.schedulerState = schedulerStatus;
+            this.status = status;
         }

         JobInfo(StreamInput in) throws IOException {
             jobConfig = in.readOptionalWriteable(Job::new);
             dataCounts = in.readOptionalWriteable(DataCounts::new);
             modelSizeStats = in.readOptionalWriteable(ModelSizeStats::new);
+            schedulerState = in.readOptionalWriteable(SchedulerState::new);
+            status = in.readOptionalWriteable(JobStatus::fromStream);
         }

         @Override
@@ -247,6 +288,12 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
             if (modelSizeStats != null) {
                 builder.field("model_size_stats", modelSizeStats);
             }
+            if (schedulerState != null) {
+                builder.field("scheduler_state", schedulerState);
+            }
+            if (status != null) {
+                builder.field("status", status);
+            }
             builder.endObject();
             return builder;
@@ -257,11 +304,13 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
             out.writeOptionalWriteable(jobConfig);
             out.writeOptionalWriteable(dataCounts);
             out.writeOptionalWriteable(modelSizeStats);
+            out.writeOptionalWriteable(schedulerState);
+            out.writeOptionalWriteable(status);
         }

         @Override
         public int hashCode() {
-            return Objects.hash(jobConfig, dataCounts, modelSizeStats);
+            return Objects.hash(jobConfig, dataCounts, modelSizeStats, schedulerState, status);
         }

         @Override
@@ -275,7 +324,9 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
             JobInfo other = (JobInfo) obj;
             return Objects.equals(jobConfig, other.jobConfig)
                     && Objects.equals(this.dataCounts, other.dataCounts)
-                    && Objects.equals(this.modelSizeStats, other.modelSizeStats);
+                    && Objects.equals(this.modelSizeStats, other.modelSizeStats)
+                    && Objects.equals(this.schedulerState, other.schedulerState)
+                    && Objects.equals(this.status, other.status);
         }
     }
@@ -400,8 +451,10 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
                 Job jobConfig = request.config() ? jobs.hits().get(0) : null;
                 DataCounts dataCounts = readDataCounts(request.dataCounts(), request.getJobId());
                 ModelSizeStats modelSizeStats = readModelSizeStats(request.modelSizeStats(), request.getJobId());
+                SchedulerState schedulerStatus = readSchedulerState(request.schedulerStatus(), request.getJobId());
+                JobStatus jobStatus = readJobStatus(request.status(), request.getJobId());

-                Response.JobInfo jobInfo = new Response.JobInfo(jobConfig, dataCounts, modelSizeStats);
+                Response.JobInfo jobInfo = new Response.JobInfo(jobConfig, dataCounts, modelSizeStats, schedulerStatus, jobStatus);
                 response = new QueryPage<>(Collections.singletonList(jobInfo), 1);

             } else {
@@ -412,7 +465,9 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
                     Job jobConfig = request.config() ? job : null;
                     DataCounts dataCounts = readDataCounts(request.dataCounts(), job.getJobId());
                     ModelSizeStats modelSizeStats = readModelSizeStats(request.modelSizeStats(), job.getJobId());
-                    Response.JobInfo jobInfo = new Response.JobInfo(jobConfig, dataCounts, modelSizeStats);
+                    SchedulerState schedulerStatus = readSchedulerState(request.schedulerStatus(), job.getJobId());
+                    JobStatus jobStatus = readJobStatus(request.status(), job.getJobId());
+                    Response.JobInfo jobInfo = new Response.JobInfo(jobConfig, dataCounts, modelSizeStats, schedulerStatus, jobStatus);
                     jobInfoList.add(jobInfo);
                 }
                 response = new QueryPage<>(jobInfoList, jobsPage.hitCount());
@@ -441,6 +496,14 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
             }
             return null;
         }
+
+        private SchedulerState readSchedulerState(boolean schedulerState, String jobId) {
+            return schedulerState ? jobManager.getSchedulerState(jobId).orElse(null) : null;
+        }
+
+        private JobStatus readJobStatus(boolean status, String jobId) {
+            return status ? jobManager.getJobStatus(jobId) : null;
+        }
     }
 }
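
For orientation, the two new request flags can be set either directly or through setStats(); the following client-side sketch is not part of the commit and assumes the usual java.util imports plus GetJobAction from the prelert action package:

    import java.util.Arrays;
    import java.util.HashSet;

    // Illustrative only: ask for just the two fields introduced by this change.
    GetJobAction.Request request = new GetJobAction.Request();
    request.setJobId("scheduled-job");   // placeholder job id
    request.setStats(new HashSet<>(Arrays.asList("scheduler_state", "status")));
    // equivalent to calling request.schedulerStatus(true) and request.status(true)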

View File

@@ -35,16 +35,17 @@ public class SchedulerState extends ToXContentToBytes implements Writeable {
     static {
         PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> JobSchedulerStatus.fromString(p.text()), STATUS,
                 ValueType.STRING);
-        PARSER.declareLong(ConstructingObjectParser.constructorArg(), START_TIME_MILLIS);
+        PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), START_TIME_MILLIS);
         PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), END_TIME_MILLIS);
     }

     private JobSchedulerStatus status;
-    private long startTimeMillis;
+    @Nullable
+    private Long startTimeMillis;
     @Nullable
     private Long endTimeMillis;

-    public SchedulerState(JobSchedulerStatus status, long startTimeMillis, Long endTimeMillis) {
+    public SchedulerState(JobSchedulerStatus status, Long startTimeMillis, Long endTimeMillis) {
         this.status = status;
         this.startTimeMillis = startTimeMillis;
         this.endTimeMillis = endTimeMillis;
@@ -52,7 +53,7 @@ public class SchedulerState extends ToXContentToBytes implements Writeable {
     public SchedulerState(StreamInput in) throws IOException {
         status = JobSchedulerStatus.fromStream(in);
-        startTimeMillis = in.readLong();
+        startTimeMillis = in.readOptionalLong();
         endTimeMillis = in.readOptionalLong();
     }
@@ -60,7 +61,7 @@ public class SchedulerState extends ToXContentToBytes implements Writeable {
         return status;
     }

-    public long getStartTimeMillis() {
+    public Long getStartTimeMillis() {
         return startTimeMillis;
     }
@@ -98,7 +99,7 @@ public class SchedulerState extends ToXContentToBytes implements Writeable {
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         status.writeTo(out);
-        out.writeLong(startTimeMillis);
+        out.writeOptionalLong(startTimeMillis);
         out.writeOptionalLong(endTimeMillis);
     }
@@ -106,7 +107,9 @@ public class SchedulerState extends ToXContentToBytes implements Writeable {
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
         builder.field(STATUS.getPreferredName(), status.name().toUpperCase(Locale.ROOT));
-        builder.field(START_TIME_MILLIS.getPreferredName(), startTimeMillis);
+        if (startTimeMillis != null) {
+            builder.field(START_TIME_MILLIS.getPreferredName(), startTimeMillis);
+        }
         if (endTimeMillis != null) {
             builder.field(END_TIME_MILLIS.getPreferredName(), endTimeMillis);
         }
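
Relaxing startTimeMillis to an optional Long is what lets a scheduler state exist before the scheduler has ever been started, which the new default state for scheduled jobs relies on. A small sketch, illustrative only and mirroring the putAllocationWithScheduler call further down:

    // Initial state for a scheduled job that has never been started:
    // a status only, with no start or end time.
    SchedulerState initial = new SchedulerState(JobSchedulerStatus.STOPPED, null, null);
    // toXContent() now skips the start/end time fields when they are null,
    // and the wire format uses writeOptionalLong/readOptionalLong for both.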

View File

@@ -340,6 +340,16 @@ public class JobManager {
         }
     }

+    public Optional<SchedulerState> getSchedulerState(String jobId) {
+        Job job = getJobOrThrowIfUnknown(clusterService.state(), jobId);
+        if (job.getSchedulerConfig() == null) {
+            return Optional.empty();
+        }
+
+        Allocation allocation = getAllocation(clusterService.state(), jobId);
+        return Optional.ofNullable(allocation.getSchedulerState());
+    }
+
     public void updateSchedulerStatus(String jobId, JobSchedulerStatus newStatus) {
         clusterService.submitStateUpdateTask("update-scheduler-status-job-" + jobId, new ClusterStateUpdateTask() {
@@ -506,6 +516,10 @@
         });
     }

+    public JobStatus getJobStatus(String jobId) {
+        return getJobAllocation(jobId).getStatus();
+    }
+
     public void setJobStatus(String jobId, JobStatus newStatus) {
         clusterService.submitStateUpdateTask("set-paused-status-job-" + jobId, new ClusterStateUpdateTask() {

View File

@@ -186,7 +186,8 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
                 }
                 break;
             case STOPPED:
-                if (currentSchedulerStatus != JobSchedulerStatus.STOPPING) {
+                if ((currentSchedulerStatus != JobSchedulerStatus.STOPPED ||
+                        currentSchedulerStatus != JobSchedulerStatus.STOPPING) == false) {
                     String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_STOP_IN_CURRENT_STATE, jobId, newSchedulerStatus);
                     throw ExceptionsHelper.conflictStatusException(msg);
                 }

View File

@@ -50,7 +50,13 @@ public class JobAllocator extends AbstractComponent implements ClusterStateListe
             for (String jobId : prelertMetadata.getJobs().keySet()) {
                 if (prelertMetadata.getAllocations().containsKey(jobId) == false) {
-                    builder.putAllocation(prelertNode.getId(), jobId);
+                    boolean addSchedulderState = prelertMetadata.getJobs().get(jobId).getSchedulerConfig() != null;
+                    if (addSchedulderState) {
+                        builder.putAllocationWithScheduler(prelertNode.getId(), jobId);
+                    }
+                    else {
+                        builder.putAllocation(prelertNode.getId(), jobId);
+                    }
                 }
             }

View File

@@ -18,6 +18,8 @@ import org.elasticsearch.common.xcontent.ObjectParser;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.xpack.prelert.job.Job;
+import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
+import org.elasticsearch.xpack.prelert.job.SchedulerState;
 import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;

 import java.io.IOException;
@@ -219,6 +221,15 @@ public class PrelertMetadata implements MetaData.Custom {
             return this;
         }

+        public Builder putAllocationWithScheduler(String nodeId, String jobId) {
+            Allocation.Builder builder = new Allocation.Builder();
+            builder.setJobId(jobId);
+            builder.setNodeId(nodeId);
+            builder.setSchedulerState(new SchedulerState(JobSchedulerStatus.STOPPED, null, null));
+            this.allocations.put(jobId, builder.build());
+            return this;
+        }
+
         public Builder updateAllocation(String jobId, Allocation updated) {
             Allocation previous = this.allocations.put(jobId, updated);
             if (previous == null) {

View File

@@ -51,7 +51,6 @@ public class RestGetJobAction extends BaseRestHandler {
     @Override
     protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
         final GetJobAction.Request request;
-
         if (RestActions.hasBodyContent(restRequest)) {
             BytesReference bodyBytes = RestActions.getRestContent(restRequest);
@@ -66,8 +65,7 @@ public class RestGetJobAction extends BaseRestHandler {
             request.setPageParams(new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), DEFAULT_FROM),
                     restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), DEFAULT_SIZE)));
         }
-
         return channel -> transportGetJobAction.execute(request, new RestStatusToXContentListener<>(channel));
     }
 }

View File

@@ -17,6 +17,8 @@ public class GetJobActionRequestTests extends AbstractStreamableTestCase<GetJobA
         instance.config(randomBoolean());
         instance.dataCounts(randomBoolean());
         instance.modelSizeStats(randomBoolean());
+        instance.schedulerStatus(randomBoolean());
+        instance.status(randomBoolean());
         if (randomBoolean()) {
             int from = randomInt(PageParams.MAX_FROM_SIZE_SUM);
             int maxSize = PageParams.MAX_FROM_SIZE_SUM - from;

View File

@@ -13,10 +13,13 @@ import org.elasticsearch.xpack.prelert.job.DataDescription;
 import org.elasticsearch.xpack.prelert.job.Detector;
 import org.elasticsearch.xpack.prelert.job.IgnoreDowntime;
 import org.elasticsearch.xpack.prelert.job.Job;
+import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
+import org.elasticsearch.xpack.prelert.job.JobStatus;
 import org.elasticsearch.xpack.prelert.job.ModelDebugConfig;
 import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
 import org.elasticsearch.xpack.prelert.job.SchedulerConfig;
 import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
+import org.elasticsearch.xpack.prelert.job.SchedulerState;
 import org.elasticsearch.xpack.prelert.job.transform.TransformConfig;
 import org.elasticsearch.xpack.prelert.job.transform.TransformType;
 import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
@@ -25,6 +28,7 @@ import org.joda.time.DateTime;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -70,18 +74,30 @@ public class GetJobActionResponseTests extends AbstractStreamableTestCase<GetJob
             DataCounts dataCounts = null;
-            ModelSizeStats sizeStats = null;
             if (randomBoolean()) {
                 dataCounts = new DataCounts(randomAsciiOfLength(10), randomIntBetween(1, 1_000_000),
                         randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
                         randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
                         new DateTime(randomDateTimeZone()).toDate(), new DateTime(randomDateTimeZone()).toDate());
             }
+            ModelSizeStats sizeStats = null;
             if (randomBoolean()) {
                 sizeStats = new ModelSizeStats.Builder("foo").build();
             }
-            Response.JobInfo jobInfo = new Response.JobInfo(job, dataCounts, sizeStats);
+            SchedulerState schedulerState = null;
+            if (randomBoolean()) {
+                schedulerState = new SchedulerState(randomFrom(EnumSet.allOf(JobSchedulerStatus.class)), randomPositiveLong(),
+                        randomPositiveLong());
+            }
+            JobStatus jobStatus = null;
+            if (randomBoolean()) {
+                jobStatus = randomFrom(EnumSet.allOf(JobStatus.class));
+            }
+            Response.JobInfo jobInfo = new Response.JobInfo(job, dataCounts, sizeStats, schedulerState, jobStatus);
             jobInfoList.add(jobInfo);
         }

View File

@@ -93,7 +93,7 @@ public class ScheduledJobsIT extends ESIntegTestCase {
         PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
         assertTrue(putJobResponse.isAcknowledged());

-        SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTING, 0, now);
+        SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTING, 0L, now);
         StartJobSchedulerAction.Request startSchedulerRequest = new StartJobSchedulerAction.Request("_job_id", schedulerState);
         StartJobSchedulerAction.Response startJobResponse = client().execute(StartJobSchedulerAction.INSTANCE, startSchedulerRequest)
                 .get();
@@ -123,7 +123,7 @@ public class ScheduledJobsIT extends ESIntegTestCase {
         PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
         assertTrue(putJobResponse.isAcknowledged());

-        SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTING, 0, null);
+        SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTING, 0L, null);
         StartJobSchedulerAction.Request startSchedulerRequest = new StartJobSchedulerAction.Request("_job_id", schedulerState);
         StartJobSchedulerAction.Response startJobResponse = client().execute(StartJobSchedulerAction.INSTANCE, startSchedulerRequest)
                 .get();

View File

@@ -30,7 +30,7 @@ public class SchedulerStateTests extends AbstractSerializingTestCase<SchedulerSt
     public void testEquals_GivenDifferentClass() {
-        assertFalse(new SchedulerState(JobSchedulerStatus.STARTED, 0, null).equals("a string"));
+        assertFalse(new SchedulerState(JobSchedulerStatus.STARTED, 0L, null).equals("a string"));
     }

     public void testEquals_GivenSameReference() {
@@ -58,7 +58,7 @@ public class SchedulerStateTests extends AbstractSerializingTestCase<SchedulerSt
     }

     public void testEquals_GivenDifferentStartTimeMillis() {
-        SchedulerState schedulerState1 = new SchedulerState(JobSchedulerStatus.STARTED, 18L, 42L);
+        SchedulerState schedulerState1 = new SchedulerState(JobSchedulerStatus.STARTED, null, 42L);
         SchedulerState schedulerState2 = new SchedulerState(JobSchedulerStatus.STOPPED, 19L, schedulerState1.getEndTimeMillis());

         assertFalse(schedulerState1.equals(schedulerState2));

View File

@@ -17,8 +17,13 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.LocalTransportAddress;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.prelert.job.DataDescription;
+import org.elasticsearch.xpack.prelert.job.Job;
+import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
+import org.elasticsearch.xpack.prelert.job.SchedulerConfig;
 import org.junit.Before;

+import java.util.Collections;
 import java.util.concurrent.ExecutorService;

 import static org.mockito.Mockito.doAnswer;
@@ -176,4 +181,34 @@ public class JobAllocatorTests extends ESTestCase {
         verify(clusterService, times(1)).submitStateUpdateTask(any(), any());
     }

+    public void testScheduledJobHasDefaultSchedulerState() {
+        PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder();
+        SchedulerConfig.Builder schedulerConfigBuilder = new SchedulerConfig.Builder(SchedulerConfig.DataSource.ELASTICSEARCH);
+        schedulerConfigBuilder.setBaseUrl("http://server");
+        schedulerConfigBuilder.setIndexes(Collections.singletonList("foo"));
+        schedulerConfigBuilder.setTypes(Collections.singletonList("bar"));
+
+        Job.Builder jobBuilder = buildJobBuilder("_job_id");
+        jobBuilder.setSchedulerConfig(schedulerConfigBuilder);
+
+        DataDescription.Builder dataDescriptionBuilder = new DataDescription.Builder();
+        dataDescriptionBuilder.setFormat(DataDescription.DataFormat.ELASTICSEARCH);
+        jobBuilder.setDataDescription(dataDescriptionBuilder);
+
+        pmBuilder.putJob(jobBuilder.build(), false);
+
+        ClusterState cs = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
+                .putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
+                .nodes(DiscoveryNodes.builder()
+                        .add(new DiscoveryNode("_id", new LocalTransportAddress("_id"), Version.CURRENT))
+                        .masterNodeId("_id")
+                        .localNodeId("_id"))
+                .build();
+
+        ClusterState clusterStateWithAllocation = jobAllocator.allocateJobs(cs);
+        PrelertMetadata metadata = clusterStateWithAllocation.metaData().custom(PrelertMetadata.TYPE);
+        assertEquals(JobSchedulerStatus.STOPPED, metadata.getAllocations().get("_job_id").getSchedulerState().getStatus());
+    }
 }

View File

@@ -147,7 +147,7 @@ public class ScheduledJobServiceTests extends ESTestCase {
     public void testStop_GivenStartedScheduledJob() throws IOException {
         Job.Builder builder = createScheduledJob();
         Allocation allocation1 =
-                new Allocation("_nodeId", "foo", JobStatus.RUNNING, new SchedulerState(JobSchedulerStatus.STARTED, 0, null));
+                new Allocation("_nodeId", "foo", JobStatus.RUNNING, new SchedulerState(JobSchedulerStatus.STARTED, 0L, null));
         when(jobManager.getJobAllocation("foo")).thenReturn(allocation1);

         DataExtractor dataExtractor = mock(DataExtractor.class);
@@ -161,7 +161,7 @@ public class ScheduledJobServiceTests extends ESTestCase {
         // Properly stop it to avoid leaking threads in the test
         Allocation allocation2 =
-                new Allocation("_nodeId", "foo", JobStatus.RUNNING, new SchedulerState(JobSchedulerStatus.STOPPING, 0, null));
+                new Allocation("_nodeId", "foo", JobStatus.RUNNING, new SchedulerState(JobSchedulerStatus.STOPPING, 0L, null));
         scheduledJobService.registry.put("foo", scheduledJobService.createJobScheduler(builder.build()));
         scheduledJobService.stop(allocation2);

View File

@@ -2,20 +2,20 @@
   "xpack.prelert.get_job": {
     "methods": [ "GET", "POST" ],
     "url": {
-      "path": "/_xpack/prelert/jobs/{jobId}",
+      "path": "/_xpack/prelert/jobs/{job_id}",
       "paths": [
         "/_xpack/prelert/jobs",
-        "/_xpack/prelert/jobs/{jobId}",
-        "/_xpack/prelert/jobs/{jobId}/{metric}"
+        "/_xpack/prelert/jobs/{job_id}",
+        "/_xpack/prelert/jobs/{job_id}/{metric}"
       ],
       "parts": {
-        "jobId": {
+        "job_id": {
           "type": "string",
           "description": "The ID of the job to fetch"
         },
         "metric" : {
           "type" : "list",
-          "options" : ["_all", "config", "data_counts", "model_size_stats"],
+          "options" : ["_all", "config", "data_counts", "model_size_stats", "scheduler_state", "status"],
           "description" : "Limit the information returned to the specified statistics"
         }
       },
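
With the path parameter renamed from jobId to job_id and the two new metric options registered, the metric can still be supplied on the URL; for example (the job name is a placeholder):

    GET /_xpack/prelert/jobs/scheduled-job/scheduler_state
    GET /_xpack/prelert/jobs/scheduled-job/status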

View File

@@ -16,15 +16,41 @@ setup:
         }
       }

+  - do:
+      xpack.prelert.put_job:
+        body: >
+          {
+            "jobId":"scheduled-job",
+            "description":"A job with a scheduler",
+            "analysisConfig" : {
+                "bucketSpan":3600,
+                "detectors" :[{"function":"metric","fieldName":"responsetime","byFieldName":"airline"}]
+            },
+            "dataDescription" : {
+                "format" : "ELASTICSEARCH",
+                "timeField":"time",
+                "timeFormat":"yyyy-MM-dd'T'HH:mm:ssX"
+            },
+            "schedulerConfig": {
+                "dataSource":"ELASTICSEARCH",
+                "baseUrl":"http://marple:9202",
+                "indexes":["farequote"],
+                "types":["response"],
+                "retrieveWholeSource":true
+            }
+          }
+
 ---
 "Test get job stats after uploading data prompting the creation of some stats":

   - do:
       xpack.prelert.get_job:
-        jobId: job-stats-test
+        job_id: job-stats-test
   - is_true: hits.0.config
   - is_false: hits.0.data_counts
   - is_false: hits.0.model_size_stats
+  - is_false: hits.0.scheduler_state

   - do:
       xpack.prelert.post_data:
@@ -41,7 +67,7 @@ setup:
   - do:
       xpack.prelert.get_job:
-        jobId: job-stats-test
+        job_id: job-stats-test
         metric: data_counts

   - match: { hits.0.data_counts.processed_record_count: 2 }
@@ -53,23 +79,53 @@ setup:
 # won't be created with such a small data sample
   - do:
       xpack.prelert.get_job:
-        jobId: "job-stats-test"
+        job_id: "job-stats-test"
        metric: "data_counts"
   - is_false: hits.0.config
   - is_true: hits.0.data_counts
   - is_false: hits.0.model_size_stats
+  - is_false: hits.0.scheduler_state

   - do:
       xpack.prelert.get_job:
-        jobId: "job-stats-test"
+        job_id: "job-stats-test"
        metric: "model_size_stats"
   - is_false: hits.0.config
   - is_false: hits.0.data_counts
+  - is_false: hits.0.scheduler_state
+
+  - do:
+      xpack.prelert.get_job:
+        job_id: "job-stats-test"
+        metric: "scheduler_state"
+  - is_false: hits.0.config
+  - is_false: hits.0.data_counts
+
+  - do:
+      xpack.prelert.get_job:
+        job_id: "job-stats-test"
+        metric: "status"
+  - is_false: hits.0.config
+  - is_false: hits.0.data_counts
+  - is_false: hits.0.model_size_stats
+  - is_false: hits.0.scheduler_state
+  - match: { hits.0.status: CLOSED }

   - do:
       xpack.prelert.get_job:
-        jobId: "job-stats-test"
+        job_id: "job-stats-test"
        metric: "_all"
   - is_true: hits.0.config
   - is_true: hits.0.data_counts
+  - is_false: hits.0.scheduler_state
+  - match: { hits.0.status: CLOSED }
+
+  - do:
+      xpack.prelert.get_job:
+        job_id: "scheduled-job"
+        metric: "scheduler_state"
+  - is_false: hits.0.config
+  - is_false: hits.0.data_counts
+  - is_false: hits.0.model_size_stats
+  - match: { hits.0.scheduler_state.status: STOPPED }

View File

@@ -32,7 +32,7 @@
   - do:
       xpack.prelert.get_job:
-        jobId: "farequote"
+        job_id: "farequote"

   - match: { hitCount: 1 }
   - match: { hits.0.config.jobId: "farequote" }
@@ -44,7 +44,7 @@
   - do:
       catch: missing
       xpack.prelert.get_job:
-        jobId: "farequote"
+        job_id: "farequote"

   - do:
       indices.exists:
@@ -56,7 +56,7 @@
   - do:
       catch: missing
       xpack.prelert.get_job:
-        jobId: "non-existing"
+        job_id: "non-existing"

 ---
 "Test put job with id that is already taken":

View File

@@ -219,7 +219,7 @@ setup:
   - do:
       xpack.prelert.get_job:
-        jobId: foo
+        job_id: foo
         metric: data_counts
   - match: { hits.0.data_counts.latest_record_timestamp: 1464739200000 }