From d5bb1f603bdea52c66e97da9c0e29b1ee7db24f6 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 23 Nov 2016 10:00:21 +0000 Subject: [PATCH] Add scheduler status filter to jobs endpoint (elastic/elasticsearch#350) * Add scheduler status filter to jobs endpoint * For scheduled jobs set the initial scheduler state * Add status filter to job endpoint Original commit: elastic/x-pack-elasticsearch@c7ed1627e220631ae572819d744dc3ce09753221 --- .../xpack/prelert/action/GetJobAction.java | 87 ++++++++++++++++--- .../xpack/prelert/job/SchedulerState.java | 17 ++-- .../xpack/prelert/job/manager/JobManager.java | 14 +++ .../prelert/job/metadata/Allocation.java | 3 +- .../prelert/job/metadata/JobAllocator.java | 8 +- .../prelert/job/metadata/PrelertMetadata.java | 11 +++ .../prelert/rest/job/RestGetJobAction.java | 4 +- .../action/GetJobActionRequestTests.java | 2 + .../action/GetJobActionResponseTests.java | 22 ++++- .../xpack/prelert/action/ScheduledJobsIT.java | 4 +- .../prelert/job/SchedulerStateTests.java | 4 +- .../job/metadata/JobAllocatorTests.java | 35 ++++++++ .../scheduler/ScheduledJobServiceTests.java | 4 +- .../api/xpack.prelert.get_job.json | 10 +-- .../rest-api-spec/test/job_get_stats.yaml | 66 ++++++++++++-- .../rest-api-spec/test/jobs_crud.yaml | 6 +- .../test/revert_model_snapshot.yaml | 2 +- 17 files changed, 252 insertions(+), 47 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetJobAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetJobAction.java index 7738ed8696e..26b4b5d089f 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetJobAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetJobAction.java @@ -38,7 +38,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.Job; +import org.elasticsearch.xpack.prelert.job.JobStatus; import org.elasticsearch.xpack.prelert.job.ModelSizeStats; +import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.manager.AutodetectProcessManager; import org.elasticsearch.xpack.prelert.job.manager.JobManager; import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider; @@ -92,10 +94,11 @@ public class GetJobAction extends Action stats) { if (stats.contains("_all")) { all(); - } - else { + } else { config(stats.contains("config")); dataCounts(stats.contains("data_counts")); modelSizeStats(stats.contains("model_size_stats")); + schedulerStatus(stats.contains("scheduler_state")); + status(stats.contains("status")); } } + public boolean status() { + return status; + } + + public Request status(boolean status) { + this.status = status; + return this; + } + @Override public ActionRequestValidationException validate() { return null; @@ -171,6 +195,8 @@ public class GetJobAction extends Action(Collections.singletonList(jobInfo), 1); } else { @@ -412,7 +465,9 @@ public class GetJobAction extends Action(jobInfoList, jobsPage.hitCount()); @@ -441,6 +496,14 @@ public class GetJobAction extends Action JobSchedulerStatus.fromString(p.text()), STATUS, ValueType.STRING); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), START_TIME_MILLIS); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), START_TIME_MILLIS); PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), END_TIME_MILLIS); } private JobSchedulerStatus status; - private long startTimeMillis; + @Nullable + private Long startTimeMillis; @Nullable private Long endTimeMillis; - public SchedulerState(JobSchedulerStatus status, long startTimeMillis, Long endTimeMillis) { + public SchedulerState(JobSchedulerStatus status, Long startTimeMillis, Long endTimeMillis) { this.status = status; this.startTimeMillis = startTimeMillis; this.endTimeMillis = endTimeMillis; @@ -52,7 +53,7 @@ public class SchedulerState extends ToXContentToBytes implements Writeable { public SchedulerState(StreamInput in) throws IOException { status = JobSchedulerStatus.fromStream(in); - startTimeMillis = in.readLong(); + startTimeMillis = in.readOptionalLong(); endTimeMillis = in.readOptionalLong(); } @@ -60,7 +61,7 @@ public class SchedulerState extends ToXContentToBytes implements Writeable { return status; } - public long getStartTimeMillis() { + public Long getStartTimeMillis() { return startTimeMillis; } @@ -98,7 +99,7 @@ public class SchedulerState extends ToXContentToBytes implements Writeable { @Override public void writeTo(StreamOutput out) throws IOException { status.writeTo(out); - out.writeLong(startTimeMillis); + out.writeOptionalLong(startTimeMillis); out.writeOptionalLong(endTimeMillis); } @@ -106,7 +107,9 @@ public class SchedulerState extends ToXContentToBytes implements Writeable { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(STATUS.getPreferredName(), status.name().toUpperCase(Locale.ROOT)); - builder.field(START_TIME_MILLIS.getPreferredName(), startTimeMillis); + if (startTimeMillis != null) { + builder.field(START_TIME_MILLIS.getPreferredName(), startTimeMillis); + } if (endTimeMillis != null) { builder.field(END_TIME_MILLIS.getPreferredName(), endTimeMillis); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java index adfb3414d74..d329c8a5718 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java @@ -340,6 +340,16 @@ public class JobManager { } } + public Optional getSchedulerState(String jobId) { + Job job = getJobOrThrowIfUnknown(clusterService.state(), jobId); + if (job.getSchedulerConfig() == null) { + return Optional.empty(); + } + + Allocation allocation = getAllocation(clusterService.state(), jobId); + return Optional.ofNullable(allocation.getSchedulerState()); + } + public void updateSchedulerStatus(String jobId, JobSchedulerStatus newStatus) { clusterService.submitStateUpdateTask("update-scheduler-status-job-" + jobId, new ClusterStateUpdateTask() { @@ -506,6 +516,10 @@ public class JobManager { }); } + public JobStatus getJobStatus(String jobId) { + return getJobAllocation(jobId).getStatus(); + } + public void setJobStatus(String jobId, JobStatus newStatus) { clusterService.submitStateUpdateTask("set-paused-status-job-" + jobId, new ClusterStateUpdateTask() { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java index 55dbc9ee3de..8eb3927a2c7 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/Allocation.java @@ -186,7 +186,8 @@ public class Allocation extends AbstractDiffable implements ToXConte } break; case STOPPED: - if (currentSchedulerStatus != JobSchedulerStatus.STOPPING) { + if ((currentSchedulerStatus != JobSchedulerStatus.STOPPED || + currentSchedulerStatus != JobSchedulerStatus.STOPPING) == false) { String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_STOP_IN_CURRENT_STATE, jobId, newSchedulerStatus); throw ExceptionsHelper.conflictStatusException(msg); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocator.java index 0ae90cb6aa0..f07b4463301 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocator.java @@ -50,7 +50,13 @@ public class JobAllocator extends AbstractComponent implements ClusterStateListe for (String jobId : prelertMetadata.getJobs().keySet()) { if (prelertMetadata.getAllocations().containsKey(jobId) == false) { - builder.putAllocation(prelertNode.getId(), jobId); + boolean addSchedulderState = prelertMetadata.getJobs().get(jobId).getSchedulerConfig() != null; + if (addSchedulderState) { + builder.putAllocationWithScheduler(prelertNode.getId(), jobId); + } + else { + builder.putAllocation(prelertNode.getId(), jobId); + } } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java index d6ecbeb3217..7af7297de4f 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java @@ -18,6 +18,8 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.prelert.job.Job; +import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; +import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; import java.io.IOException; @@ -219,6 +221,15 @@ public class PrelertMetadata implements MetaData.Custom { return this; } + public Builder putAllocationWithScheduler(String nodeId, String jobId) { + Allocation.Builder builder = new Allocation.Builder(); + builder.setJobId(jobId); + builder.setNodeId(nodeId); + builder.setSchedulerState(new SchedulerState(JobSchedulerStatus.STOPPED, null, null)); + this.allocations.put(jobId, builder.build()); + return this; + } + public Builder updateAllocation(String jobId, Allocation updated) { Allocation previous = this.allocations.put(jobId, updated); if (previous == null) { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/job/RestGetJobAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/job/RestGetJobAction.java index a8e338b41d2..182a2ce7bbb 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/job/RestGetJobAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/job/RestGetJobAction.java @@ -51,7 +51,6 @@ public class RestGetJobAction extends BaseRestHandler { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { - final GetJobAction.Request request; if (RestActions.hasBodyContent(restRequest)) { BytesReference bodyBytes = RestActions.getRestContent(restRequest); @@ -66,8 +65,7 @@ public class RestGetJobAction extends BaseRestHandler { request.setPageParams(new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), DEFAULT_FROM), restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), DEFAULT_SIZE))); } + return channel -> transportGetJobAction.execute(request, new RestStatusToXContentListener<>(channel)); - - } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetJobActionRequestTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetJobActionRequestTests.java index f52a550ac29..b1066149b34 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetJobActionRequestTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetJobActionRequestTests.java @@ -17,6 +17,8 @@ public class GetJobActionRequestTests extends AbstractStreamableTestCase + { + "jobId":"scheduled-job", + "description":"A job with a scheduler", + "analysisConfig" : { + "bucketSpan":3600, + "detectors" :[{"function":"metric","fieldName":"responsetime","byFieldName":"airline"}] + }, + "dataDescription" : { + "format" : "ELASTICSEARCH", + "timeField":"time", + "timeFormat":"yyyy-MM-dd'T'HH:mm:ssX" + }, + "schedulerConfig": { + "dataSource":"ELASTICSEARCH", + "baseUrl":"http://marple:9202", + "indexes":["farequote"], + "types":["response"], + "retrieveWholeSource":true + } + } + --- "Test get job stats after uploading data prompting the creation of some stats": - do: xpack.prelert.get_job: - jobId: job-stats-test + job_id: job-stats-test - is_true: hits.0.config - is_false: hits.0.data_counts - is_false: hits.0.model_size_stats + - is_false: hits.0.scheduler_state + - do: xpack.prelert.post_data: @@ -41,7 +67,7 @@ setup: - do: xpack.prelert.get_job: - jobId: job-stats-test + job_id: job-stats-test metric: data_counts - match: { hits.0.data_counts.processed_record_count: 2 } @@ -53,23 +79,53 @@ setup: # won't be created with such a small data sample - do: xpack.prelert.get_job: - jobId: "job-stats-test" + job_id: "job-stats-test" metric: "data_counts" - is_false: hits.0.config - is_true: hits.0.data_counts - is_false: hits.0.model_size_stats + - is_false: hits.0.scheduler_state - do: xpack.prelert.get_job: - jobId: "job-stats-test" + job_id: "job-stats-test" metric: "model_size_stats" - is_false: hits.0.config - is_false: hits.0.data_counts + - is_false: hits.0.scheduler_state + + - do: + xpack.prelert.get_job: + job_id: "job-stats-test" + metric: "scheduler_state" + - is_false: hits.0.config + - is_false: hits.0.data_counts - do: xpack.prelert.get_job: - jobId: "job-stats-test" + job_id: "job-stats-test" + metric: "status" + - is_false: hits.0.config + - is_false: hits.0.data_counts + - is_false: hits.0.model_size_stats + - is_false: hits.0.scheduler_state + - match: { hits.0.status: CLOSED } + + - do: + xpack.prelert.get_job: + job_id: "job-stats-test" metric: "_all" - is_true: hits.0.config - is_true: hits.0.data_counts + - is_false: hits.0.scheduler_state + - match: { hits.0.status: CLOSED } + + - do: + xpack.prelert.get_job: + job_id: "scheduled-job" + metric: "scheduler_state" + - is_false: hits.0.config + - is_false: hits.0.data_counts + - is_false: hits.0.model_size_stats + - match: { hits.0.scheduler_state.status: STOPPED } diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_crud.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_crud.yaml index 2f2d6888922..ab688be051f 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_crud.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_crud.yaml @@ -32,7 +32,7 @@ - do: xpack.prelert.get_job: - jobId: "farequote" + job_id: "farequote" - match: { hitCount: 1 } - match: { hits.0.config.jobId: "farequote" } @@ -44,7 +44,7 @@ - do: catch: missing xpack.prelert.get_job: - jobId: "farequote" + job_id: "farequote" - do: indices.exists: @@ -56,7 +56,7 @@ - do: catch: missing xpack.prelert.get_job: - jobId: "non-existing" + job_id: "non-existing" --- "Test put job with id that is already taken": diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml index 2fb6c034e16..8a240abaf52 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml @@ -219,7 +219,7 @@ setup: - do: xpack.prelert.get_job: - jobId: foo + job_id: foo metric: data_counts - match: { hits.0.data_counts.latest_record_timestamp: 1464739200000 }