Consolidate GetJob/GetJobs (elastic/elasticsearch#342)

In the same vein as the GetBucket/GetBuckets and GetCategory/GetCategories consolidations.

This one was a bit more involved, since GetJobs previously did not support 
configuring the various metrics. Now all metrics (config, data_counts, 
model_size_stats, or _all) can be configured when requesting either a single 
job or a range of jobs. The change also unifies the request body handling 
between the two and adds POST handlers.
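
For illustration, a minimal sketch of how the consolidated request can be built. Class and method names are taken from this diff; the call sites themselves are illustrative, not part of the commit:

// Single job, selected stats -- corresponds to GET _xpack/prelert/jobs/{jobId}/{metric}
GetJobAction.Request single = new GetJobAction.Request();
single.setJobId("farequote");
single.setStats(Strings.splitStringByCommaToSet("config,data_counts"));

// Range of jobs, all stats -- corresponds to GET _xpack/prelert/jobs?from=0&size=100
GetJobAction.Request range = new GetJobAction.Request();
range.setStats(Strings.splitStringByCommaToSet("_all")); // expands to config + data_counts + model_size_stats
range.setPageParams(new PageParams(0, 100));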

And, as with the other refactorings, the SingleDocument response is gone in 
favor of always returning an array of hits (a QueryPage).
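
A minimal sketch of the resulting calling pattern, mirroring the JobManager and DeleteModelSnapshotAction changes further down in this diff; the jobId and clusterState variables are assumed:

// Before: Optional-based, single-document lookup
// Optional<Job> job = jobManager.getJob(jobId, clusterState);
// if (job.isPresent()) { String snapshotId = job.get().getModelSnapshotId(); }

// After: every lookup returns a QueryPage, even for a single job id
QueryPage<Job> jobs = jobManager.getJob(jobId, clusterState);
if (jobs.hitCount() > 0) {
    String snapshotId = jobs.hits().get(0).getModelSnapshotId();
}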

Original commit: elastic/x-pack-elasticsearch@ac47bb9bf6
Zachary Tong authored 2016-11-21 16:31:01 -05:00, committed by GitHub
parent c8ef5c64de
commit 15e8cf7bcb
19 changed files with 231 additions and 489 deletions

View File

@ -33,7 +33,6 @@ import org.elasticsearch.xpack.prelert.action.GetBucketAction;
import org.elasticsearch.xpack.prelert.action.GetCategoryDefinitionAction;
import org.elasticsearch.xpack.prelert.action.GetInfluencersAction;
import org.elasticsearch.xpack.prelert.action.GetJobAction;
import org.elasticsearch.xpack.prelert.action.GetJobsAction;
import org.elasticsearch.xpack.prelert.action.GetListAction;
import org.elasticsearch.xpack.prelert.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.prelert.action.GetRecordsAction;
@ -78,7 +77,6 @@ import org.elasticsearch.xpack.prelert.rest.data.RestPostDataFlushAction;
import org.elasticsearch.xpack.prelert.rest.influencers.RestGetInfluencersAction;
import org.elasticsearch.xpack.prelert.rest.job.RestDeleteJobAction;
import org.elasticsearch.xpack.prelert.rest.job.RestGetJobAction;
import org.elasticsearch.xpack.prelert.rest.job.RestGetJobsAction;
import org.elasticsearch.xpack.prelert.rest.job.RestPauseJobAction;
import org.elasticsearch.xpack.prelert.rest.job.RestPutJobsAction;
import org.elasticsearch.xpack.prelert.rest.job.RestResumeJobAction;
@ -187,7 +185,6 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
public List<Class<? extends RestHandler>> getRestHandlers() {
return Arrays.asList(
RestGetJobAction.class,
RestGetJobsAction.class,
RestPutJobsAction.class,
RestDeleteJobAction.class,
RestPauseJobAction.class,
@ -217,7 +214,6 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(
new ActionHandler<>(GetJobAction.INSTANCE, GetJobAction.TransportAction.class),
new ActionHandler<>(GetJobsAction.INSTANCE, GetJobsAction.TransportAction.class),
new ActionHandler<>(PutJobAction.INSTANCE, PutJobAction.TransportAction.class),
new ActionHandler<>(DeleteJobAction.INSTANCE, DeleteJobAction.TransportAction.class),
new ActionHandler<>(PauseJobAction.INSTANCE, PauseJobAction.TransportAction.class),

View File

@ -33,11 +33,11 @@ import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchBulkDeleter;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchBulkDeleterFactory;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
public class DeleteModelSnapshotAction extends Action<DeleteModelSnapshotAction.Request,
DeleteModelSnapshotAction.Response, DeleteModelSnapshotAction.RequestBuilder> {
@ -171,9 +171,9 @@ public class DeleteModelSnapshotAction extends Action<DeleteModelSnapshotAction.
//
// NORELEASE: technically, this could be stale and refuse a delete, but I think that's acceptable
// since it is non-destructive
Optional<Job> job = jobManager.getJob(request.getJobId(), clusterService.state());
if (job.isPresent()) {
String currentModelInUse = job.get().getModelSnapshotId();
QueryPage<Job> job = jobManager.getJob(request.getJobId(), clusterService.state());
if (job.hitCount() > 0) {
String currentModelInUse = job.hits().get(0).getModelSnapshotId();
if (currentModelInUse != null && currentModelInUse.equals(request.getSnapshotId())) {
throw new IllegalArgumentException(Messages.getMessage(Messages.REST_CANNOT_DELETE_HIGHEST_PRIORITY,
request.getSnapshotId(), request.getJobId()));

View File

@ -20,11 +20,15 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -38,13 +42,18 @@ import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.manager.AutodetectProcessManager;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.job.results.PageParams;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import org.elasticsearch.xpack.prelert.utils.SingleDocument;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Response, GetJobAction.RequestBuilder> {
@ -67,22 +76,44 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
public static class Request extends MasterNodeReadRequest<Request> {
public static final ObjectParser<Request, ParseFieldMatcherSupplier> PARSER = new ObjectParser<>(NAME, Request::new);
public static final ParseField METRIC = new ParseField("metric");
static {
PARSER.declareString(Request::setJobId, Job.ID);
PARSER.declareObject(Request::setPageParams, PageParams.PARSER, PageParams.PAGE);
PARSER.declareString((request, metric) -> {
Set<String> stats = Strings.splitStringByCommaToSet(metric);
request.setStats(stats);
}, METRIC);
}
private String jobId;
private boolean config;
private boolean dataCounts;
private boolean modelSizeStats;
private PageParams pageParams = null;
public Request() {
Request() {
}
public Request(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
public void setJobId(String jobId) {
this.jobId = jobId;
}
public String getJobId() {
return jobId;
}
public PageParams getPageParams() {
return pageParams;
}
public void setPageParams(PageParams pageParams) {
this.pageParams = ExceptionsHelper.requireNonNull(pageParams, PageParams.PAGE.getPreferredName());
}
public Request all() {
this.config = true;
this.dataCounts = true;
@ -117,6 +148,16 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
return this;
}
public void setStats(Set<String> stats) {
if (stats.contains("_all")) {
all();
}
else {
config(stats.contains("config"));
dataCounts(stats.contains("data_counts"));
modelSizeStats(stats.contains("model_size_stats"));
}
}
@Override
public ActionRequestValidationException validate() {
@ -126,24 +167,26 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
jobId = in.readOptionalString();
config = in.readBoolean();
dataCounts = in.readBoolean();
modelSizeStats = in.readBoolean();
pageParams = in.readOptionalWriteable(PageParams::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
out.writeOptionalString(jobId);
out.writeBoolean(config);
out.writeBoolean(dataCounts);
out.writeBoolean(modelSizeStats);
out.writeOptionalWriteable(pageParams);
}
@Override
public int hashCode() {
return Objects.hash(jobId, config, dataCounts, modelSizeStats);
return Objects.hash(jobId, config, dataCounts, modelSizeStats, pageParams);
}
@Override
@ -155,8 +198,11 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
return false;
}
Request other = (Request) obj;
return Objects.equals(jobId, other.jobId) && this.config == other.config
&& this.dataCounts == other.dataCounts && this.modelSizeStats == other.modelSizeStats;
return Objects.equals(jobId, other.jobId)
&& this.config == other.config
&& this.dataCounts == other.dataCounts
&& this.modelSizeStats == other.modelSizeStats
&& Objects.equals(this.pageParams, other.pageParams);
}
}
@ -233,41 +279,43 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
}
}
private SingleDocument<JobInfo> jobResponse;
private QueryPage<JobInfo> jobs;
public Response() {
jobResponse = SingleDocument.empty(Job.TYPE);
public Response(QueryPage<JobInfo> jobs) {
this.jobs = jobs;
}
public Response(JobInfo jobResponse) {
this.jobResponse = new SingleDocument<>(Job.TYPE, jobResponse);
public Response() {}
public QueryPage<JobInfo> getResponse() {
return jobs;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobResponse = new SingleDocument<>(in, JobInfo::new);
jobs = new QueryPage<>(in, JobInfo::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
jobResponse.writeTo(out);
jobs.writeTo(out);
}
@Override
public RestStatus status() {
return jobResponse.status();
return jobs.hitCount() == 0 ? RestStatus.NOT_FOUND : RestStatus.OK;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return jobResponse.toXContent(builder, params);
return jobs.doXContentBody(builder, params);
}
@Override
public int hashCode() {
return Objects.hash(jobResponse);
return Objects.hash(jobs);
}
@Override
@ -279,7 +327,7 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
return false;
}
Response other = (Response) obj;
return Objects.equals(jobResponse, other.jobResponse);
return Objects.equals(jobs, other.jobs);
}
@SuppressWarnings("deprecation")
@ -332,21 +380,45 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
logger.debug("Get job '{}', config={}, data_counts={}, model_size_stats={}",
request.getJobId(), request.config(), request.dataCounts(), request.modelSizeStats());
// always get the job regardless of the request.config param because if the job
// can't be found a different response is returned.
Optional<Job> optionalJob = jobManager.getJob(request.getJobId(), state);
if (optionalJob.isPresent() == false) {
logger.debug(String.format(Locale.ROOT, "Cannot find job '%s'", request.getJobId()));
listener.onResponse(new Response());
return;
QueryPage<Response.JobInfo> response;
// Single Job
if (request.jobId != null && !request.jobId.isEmpty()) {
// always get the job regardless of the request.config param because if the job
// can't be found a different response is returned.
QueryPage<Job> jobs = jobManager.getJob(request.getJobId(), state);
if (jobs.hitCount() == 0) {
logger.debug(String.format(Locale.ROOT, "Cannot find job '%s'", request.getJobId()));
response = new QueryPage<>(Collections.emptyList(), 0);
listener.onResponse(new Response(response));
return;
} else if (jobs.hitCount() > 1) {
logger.error(String.format(Locale.ROOT, "More than one job found for jobId [%s]", request.getJobId()));
}
logger.debug("Returning job [" + request.getJobId() + "]");
Job jobConfig = request.config() ? jobs.hits().get(0) : null;
DataCounts dataCounts = readDataCounts(request.dataCounts(), request.getJobId());
ModelSizeStats modelSizeStats = readModelSizeStats(request.modelSizeStats(), request.getJobId());
Response.JobInfo jobInfo = new Response.JobInfo(jobConfig, dataCounts, modelSizeStats);
response = new QueryPage<>(Collections.singletonList(jobInfo), 1);
} else {
// Multiple Jobs
QueryPage<Job> jobsPage = jobManager.getJobs(request.pageParams.getFrom(), request.pageParams.getSize(), state);
List<Response.JobInfo> jobInfoList = new ArrayList<>();
for (Job job : jobsPage.hits()) {
Job jobConfig = request.config() ? job : null;
DataCounts dataCounts = readDataCounts(request.dataCounts(), job.getJobId());
ModelSizeStats modelSizeStats = readModelSizeStats(request.modelSizeStats(), job.getJobId());
Response.JobInfo jobInfo = new Response.JobInfo(jobConfig, dataCounts, modelSizeStats);
jobInfoList.add(jobInfo);
}
response = new QueryPage<>(jobInfoList, jobsPage.hitCount());
}
logger.debug("Returning job '" + optionalJob.get().getJobId() + "'");
Job job = request.config() && optionalJob.isPresent() ? optionalJob.get() : null;
DataCounts dataCounts = readDataCounts(request);
ModelSizeStats modelSizeStats = readModelSizeStats(request);
listener.onResponse(new Response(new Response.JobInfo(job, dataCounts, modelSizeStats)));
listener.onResponse(new Response(response));
}
@Override
@ -354,21 +426,19 @@ public class GetJobAction extends Action<GetJobAction.Request, GetJobAction.Resp
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
private DataCounts readDataCounts(Request request) {
if (request.dataCounts()) {
Optional<DataCounts> counts = processManager.getDataCounts(request.getJobId());
return counts.orElseGet(() -> jobProvider.dataCounts(request.getJobId()));
private DataCounts readDataCounts(boolean dataCounts, String jobId) {
if (dataCounts) {
Optional<DataCounts> counts = processManager.getDataCounts(jobId);
return counts.orElseGet(() -> jobProvider.dataCounts(jobId));
}
return null;
}
private ModelSizeStats readModelSizeStats(Request request) {
if (request.modelSizeStats()) {
Optional<ModelSizeStats> sizeStats = processManager.getModelSizeStats(request.getJobId());
return sizeStats.orElseGet(() -> jobProvider.modelSizeStats(request.getJobId()).orElse(null));
private ModelSizeStats readModelSizeStats(boolean modelSizeStats, String jobId) {
if (modelSizeStats) {
Optional<ModelSizeStats> sizeStats = processManager.getModelSizeStats(jobId);
return sizeStats.orElseGet(() -> jobProvider.modelSizeStats(jobId).orElse(null));
}
return null;
}
}

View File

@ -1,228 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.job.results.PageParams;
import java.io.IOException;
import java.util.Objects;
public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.Response, GetJobsAction.RequestBuilder> {
public static final GetJobsAction INSTANCE = new GetJobsAction();
public static final String NAME = "cluster:admin/prelert/jobs/get";
private GetJobsAction() {
super(NAME);
}
@Override
public GetJobsAction.RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public GetJobsAction.Response newResponse() {
return new Response();
}
public static class Request extends MasterNodeReadRequest<Request> implements ToXContent {
public static final ObjectParser<Request, ParseFieldMatcherSupplier> PARSER = new ObjectParser<>(NAME, Request::new);
static {
PARSER.declareObject(Request::setPageParams, PageParams.PARSER, PageParams.PAGE);
}
private PageParams pageParams = new PageParams(0, 100);
public PageParams getPageParams() {
return pageParams;
}
public void setPageParams(PageParams pageParams) {
this.pageParams = pageParams;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
pageParams = new PageParams(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
pageParams.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(PageParams.PAGE.getPreferredName(), pageParams);
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(pageParams);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(pageParams, other.pageParams);
}
}
public static class Response extends ActionResponse implements ToXContent {
private QueryPage<Job> jobs;
public Response(QueryPage<Job> jobs) {
this.jobs = jobs;
}
public Response() {}
public QueryPage<Job> getResponse() {
return jobs;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobs = new QueryPage<>(in, Job::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
jobs.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return jobs.doXContentBody(builder, params);
}
@Override
public int hashCode() {
return Objects.hash(jobs);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Response other = (Response) obj;
return Objects.equals(jobs, other.jobs);
}
@SuppressWarnings("deprecation")
@Override
public final String toString() {
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.prettyPrint();
builder.startObject();
toXContent(builder, EMPTY_PARAMS);
builder.endObject();
return builder.string();
} catch (Exception e) {
// So we have a stack trace logged somewhere
return "{ \"error\" : \"" + org.elasticsearch.ExceptionsHelper.detailedMessage(e) + "\"}";
}
}
}
public static class RequestBuilder extends MasterNodeReadOperationRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, GetJobsAction action) {
super(client, action, new Request());
}
}
public static class TransportAction extends TransportMasterNodeReadAction<Request, Response> {
private final JobManager jobManager;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager) {
super(settings, GetJobsAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.jobManager = jobManager;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
QueryPage<Job> jobsPage = jobManager.getJobs(request.pageParams.getFrom(), request.pageParams.getSize(), state);
listener.onResponse(new Response(jobsPage));
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
}
}

View File

@ -46,6 +46,7 @@ import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchBulkDeleterF
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.OldDataRemover;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import org.elasticsearch.xpack.prelert.utils.SingleDocument;
@ -53,7 +54,6 @@ import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import static org.elasticsearch.action.ValidateActions.addValidationError;
@ -338,9 +338,9 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
throw new IllegalStateException(Messages.getMessage(Messages.REST_INVALID_REVERT_PARAMS));
}
Optional<Job> job = jobManager.getJob(request.getJobId(), clusterService.state());
QueryPage<Job> job = jobManager.getJob(request.getJobId(), clusterService.state());
Allocation allocation = jobManager.getJobAllocation(request.getJobId());
if (job.isPresent() && allocation.getStatus().equals(JobStatus.RUNNING)) {
if (job.hitCount() > 0 && allocation.getStatus().equals(JobStatus.RUNNING)) {
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT));
}

View File

@ -40,6 +40,7 @@ import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -93,14 +94,14 @@ public class JobManager {
* with the given {@code jobId} exists, or an empty {@code Optional}
* otherwise
*/
public Optional<Job> getJob(String jobId, ClusterState clusterState) {
public QueryPage<Job> getJob(String jobId, ClusterState clusterState) {
PrelertMetadata prelertMetadata = clusterState.getMetaData().custom(PrelertMetadata.TYPE);
Job job = prelertMetadata.getJobs().get(jobId);
if (job == null) {
return Optional.empty();
return new QueryPage<>(Collections.emptyList(), 0);
}
return Optional.of(job);
return new QueryPage<>(Collections.singletonList(job), 1);
}
/**

View File

@ -7,20 +7,27 @@ package org.elasticsearch.xpack.prelert.rest.job;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.rest.action.RestStatusToXContentListener;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.GetJobAction;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.results.PageParams;
import java.io.IOException;
import java.util.Set;
public class RestGetJobAction extends BaseRestHandler {
private static final int DEFAULT_FROM = 0;
private static final int DEFAULT_SIZE = 100;
private final GetJobAction.TransportAction transportGetJobAction;
@ -28,24 +35,39 @@ public class RestGetJobAction extends BaseRestHandler {
public RestGetJobAction(Settings settings, RestController controller, GetJobAction.TransportAction transportGetJobAction) {
super(settings);
this.transportGetJobAction = transportGetJobAction;
// GETs
controller.registerHandler(RestRequest.Method.GET, PrelertPlugin.BASE_PATH + "jobs/{" + Job.ID.getPreferredName() + "}", this);
controller.registerHandler(RestRequest.Method.GET,
PrelertPlugin.BASE_PATH + "jobs/{" + Job.ID.getPreferredName() + "}/{metric}", this);
controller.registerHandler(RestRequest.Method.GET, PrelertPlugin.BASE_PATH + "jobs", this);
// POSTs
controller.registerHandler(RestRequest.Method.POST, PrelertPlugin.BASE_PATH + "jobs/{" + Job.ID.getPreferredName() + "}", this);
controller.registerHandler(RestRequest.Method.POST,
PrelertPlugin.BASE_PATH + "jobs/{" + Job.ID.getPreferredName() + "}/{metric}", this);
controller.registerHandler(RestRequest.Method.POST, PrelertPlugin.BASE_PATH + "jobs", this);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
GetJobAction.Request getJobRequest = new GetJobAction.Request(restRequest.param(Job.ID.getPreferredName()));
Set<String> stats = Strings.splitStringByCommaToSet(restRequest.param("metric", "config"));
if (stats.contains("_all")) {
getJobRequest.all();
}
else {
getJobRequest.config(stats.contains("config"));
getJobRequest.dataCounts(stats.contains("data_counts"));
getJobRequest.modelSizeStats(stats.contains("model_size_stats"));
}
return channel -> transportGetJobAction.execute(getJobRequest, new RestStatusToXContentListener<>(channel));
final GetJobAction.Request request;
if (RestActions.hasBodyContent(restRequest)) {
BytesReference bodyBytes = RestActions.getRestContent(restRequest);
XContentParser parser = XContentFactory.xContent(bodyBytes).createParser(bodyBytes);
request = GetJobAction.Request.PARSER.apply(parser, () -> parseFieldMatcher);
} else {
request = new GetJobAction.Request();
request.setJobId(restRequest.param(Job.ID.getPreferredName()));
Set<String> stats = Strings.splitStringByCommaToSet(
restRequest.param(GetJobAction.Request.METRIC.getPreferredName(), "config"));
request.setStats(stats);
request.setPageParams(new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), DEFAULT_FROM),
restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), DEFAULT_SIZE)));
}
return channel -> transportGetJobAction.execute(request, new RestStatusToXContentListener<>(channel));
}
}

View File

@ -1,53 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.rest.job;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.GetJobsAction;
import org.elasticsearch.xpack.prelert.action.GetJobsAction.Response;
import org.elasticsearch.xpack.prelert.job.results.PageParams;
import java.io.IOException;
public class RestGetJobsAction extends BaseRestHandler {
private static final int DEFAULT_FROM = 0;
private static final int DEFAULT_SIZE = 100;
private final GetJobsAction.TransportAction transportGetJobsAction;
@Inject
public RestGetJobsAction(Settings settings, RestController controller, GetJobsAction.TransportAction transportGetJobsAction) {
super(settings);
this.transportGetJobsAction = transportGetJobsAction;
controller.registerHandler(RestRequest.Method.GET, PrelertPlugin.BASE_PATH + "jobs", this);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
final GetJobsAction.Request request;
if (RestActions.hasBodyContent(restRequest)) {
BytesReference bodyBytes = RestActions.getRestContent(restRequest);
XContentParser parser = XContentFactory.xContent(bodyBytes).createParser(bodyBytes);
request = GetJobsAction.Request.PARSER.apply(parser, () -> parseFieldMatcher);
} else {
request = new GetJobsAction.Request();
request.setPageParams(new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), DEFAULT_FROM),
restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), DEFAULT_SIZE)));
}
return channel -> transportGetJobsAction.execute(request, new RestToXContentListener<Response>(channel));
}
}

View File

@ -6,16 +6,26 @@
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.xpack.prelert.action.GetJobAction.Request;
import org.elasticsearch.xpack.prelert.job.results.PageParams;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
public class GetJobActionRequestTests extends AbstractStreamableTestCase<GetJobAction.Request> {
@Override
protected Request createTestInstance() {
Request instance = new Request(randomAsciiOfLengthBetween(1, 20));
Request instance = new Request();
instance.config(randomBoolean());
instance.dataCounts(randomBoolean());
instance.modelSizeStats(randomBoolean());
if (randomBoolean()) {
int from = randomInt(PageParams.MAX_FROM_SIZE_SUM);
int maxSize = PageParams.MAX_FROM_SIZE_SUM - from;
int size = randomInt(maxSize);
instance.setPageParams(new PageParams(from, size));
}
if (randomBoolean()) {
instance.setJobId(randomAsciiOfLengthBetween(1, 20));
}
return instance;
}

View File

@ -16,6 +16,7 @@ import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.ModelDebugConfig;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.SchedulerConfig;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.job.transform.TransformConfig;
import org.elasticsearch.xpack.prelert.job.transform.TransformType;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
@ -32,9 +33,10 @@ public class GetJobActionResponseTests extends AbstractStreamableTestCase<GetJob
@Override
protected Response createTestInstance() {
final Response result;
if (randomBoolean()) {
result = new Response();
} else {
int listSize = randomInt(10);
List<Response.JobInfo> jobInfoList = new ArrayList<>(listSize);
for (int j = 0; j < listSize; j++) {
String jobId = randomAsciiOfLength(10);
String description = randomBoolean() ? randomAsciiOfLength(10) : null;
Date createTime = new Date(randomPositiveLong());
@ -71,7 +73,7 @@ public class GetJobActionResponseTests extends AbstractStreamableTestCase<GetJob
ModelSizeStats sizeStats = null;
if (randomBoolean()) {
dataCounts = new DataCounts(randomAsciiOfLength(10), randomIntBetween(1, 1_000_000),
randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
new DateTime(randomDateTimeZone()).toDate(), new DateTime(randomDateTimeZone()).toDate());
@ -79,9 +81,12 @@ public class GetJobActionResponseTests extends AbstractStreamableTestCase<GetJob
if (randomBoolean()) {
sizeStats = new ModelSizeStats.Builder("foo").build();
}
result = new Response(new GetJobAction.Response.JobInfo(job, dataCounts, sizeStats));
Response.JobInfo jobInfo = new Response.JobInfo(job, dataCounts, sizeStats);
jobInfoList.add(jobInfo);
}
result = new Response(new QueryPage<>(jobInfoList, jobInfoList.size()));
return result;
}

View File

@ -1,38 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.prelert.action.GetJobsAction.Request;
import org.elasticsearch.xpack.prelert.job.results.PageParams;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableXContentTestCase;
public class GetJobsActionRequestTests extends AbstractStreamableXContentTestCase<GetJobsAction.Request> {
@Override
protected Request parseInstance(XContentParser parser, ParseFieldMatcher matcher) {
return GetJobsAction.Request.PARSER.apply(parser, () -> matcher);
}
@Override
protected Request createTestInstance() {
Request request = new Request();
if (randomBoolean()) {
int from = randomInt(PageParams.MAX_FROM_SIZE_SUM);
int maxSize = PageParams.MAX_FROM_SIZE_SUM - from;
int size = randomInt(maxSize);
request.setPageParams(new PageParams(from, size));
}
return request;
}
@Override
protected Request createBlankInstance() {
return new Request();
}
}

View File

@ -1,37 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.xpack.prelert.action.GetJobsAction.Response;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder;
import static org.elasticsearch.xpack.prelert.job.JobTests.randomValidJobId;
public class GetJobsActionResponseTests extends AbstractStreamableTestCase<GetJobsAction.Response> {
@Override
protected Response createTestInstance() {
int listSize = randomInt(10);
List<Job> hits = new ArrayList<>(listSize);
for (int j = 0; j < listSize; j++) {
hits.add(buildJobBuilder(randomValidJobId()).build());
}
QueryPage<Job> buckets = new QueryPage<>(hits, listSize);
return new Response(buckets);
}
@Override
protected Response createBlankInstance() {
return new Response();
}
}

View File

@ -44,8 +44,7 @@ public class PrelertJobIT extends ESRestTestCase {
() -> client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/non-existing-job"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));
assertThat(e.getMessage(), containsString("\"exists\":false"));
assertThat(e.getMessage(), containsString("\"type\":\"job\""));
assertThat(e.getMessage(), containsString("\"hitCount\":0"));
}
public void testGetJob_GivenJobExists() throws Exception {
@ -55,8 +54,7 @@ public class PrelertJobIT extends ESRestTestCase {
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"exists\":true"));
assertThat(responseAsString, containsString("\"type\":\"job\""));
assertThat(responseAsString, containsString("\"hitCount\":1"));
assertThat(responseAsString, containsString("\"jobId\":\"farequote\""));
}

View File

@ -58,8 +58,9 @@ public class JobManagerTests extends ESTestCase {
builder.putJob(buildJobBuilder("foo").build(), false);
ClusterState clusterState = ClusterState.builder(new ClusterName("name"))
.metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, builder.build())).build();
Optional<Job> doc = jobManager.getJob("foo", clusterState);
assertTrue(doc.isPresent());
QueryPage<Job> doc = jobManager.getJob("foo", clusterState);
assertTrue(doc.hitCount() > 0);
assertThat(doc.hits().get(0).getJobId(), equalTo("foo"));
}
public void testFilter() {

View File

@ -1,13 +1,16 @@
{
"xpack.prelert.get_job": {
"methods": [ "GET" ],
"methods": [ "GET", "POST" ],
"url": {
"path": "/_xpack/prelert/jobs/{job_id}",
"paths": [ "/_xpack/prelert/jobs/{job_id}", "/_xpack/prelert/jobs/{job_id}/{metric}"],
"path": "/_xpack/prelert/jobs/{jobId}",
"paths": [
"/_xpack/prelert/jobs",
"/_xpack/prelert/jobs/{jobId}",
"/_xpack/prelert/jobs/{jobId}/{metric}"
],
"parts": {
"job_id": {
"jobId": {
"type": "string",
"required": true,
"description": "The ID of the job to fetch"
},
"metric" : {
@ -15,8 +18,20 @@
"options" : ["_all", "config", "data_counts", "model_size_stats"],
"description" : "Limit the information returned to the specified statistics"
}
},
"params": {
"from": {
"type": "int",
"description": "skips a number of jobs"
},
"size": {
"type": "int",
"description": "specifies a max number of jobs to get"
}
}
},
"body": null
"body": {
"description" : "Job selection criteria"
}
}
}

View File

@ -1,19 +0,0 @@
{
"xpack.prelert.get_jobs": {
"methods": [ "GET" ],
"url": {
"path": "/_xpack/prelert/jobs",
"paths": [ "/_xpack/prelert/jobs" ],
"params": {
"from": {
"type": "int",
"description": "skips a number of jobs"
},
"size": {
"type": "int",
"description": "specifies a max number of jobs to get"
}
}
}
}
}

View File

@ -20,11 +20,11 @@ setup:
"Test get job stats after uploading data prompting the creation of some stats":
- do:
xpack.prelert.get_job:
job_id: job-stats-test
jobId: job-stats-test
- is_true: document.config
- is_false: document.data_counts
- is_false: document.model_size_stats
- is_true: hits.0.config
- is_false: hits.0.data_counts
- is_false: hits.0.model_size_stats
- do:
xpack.prelert.post_data:
@ -41,35 +41,35 @@ setup:
- do:
xpack.prelert.get_job:
job_id: job-stats-test
jobId: job-stats-test
metric: data_counts
- match: { document.data_counts.processed_record_count: 2 }
- match: { document.data_counts.processed_field_count: 4}
- match: { document.data_counts.input_field_count: 4 }
- match: { hits.0.data_counts.processed_record_count: 2 }
- match: { hits.0.data_counts.processed_field_count: 4}
- match: { hits.0.data_counts.input_field_count: 4 }
# Test filters
# It's difficult to test for the presence of model_size_stats as they
# won't be created with such a small data sample
- do:
xpack.prelert.get_job:
job_id: "job-stats-test"
jobId: "job-stats-test"
metric: "data_counts"
- is_false: document.config
- is_true: document.data_counts
- is_false: document.model_size_stats
- is_false: hits.0.config
- is_true: hits.0.data_counts
- is_false: hits.0.model_size_stats
- do:
xpack.prelert.get_job:
job_id: "job-stats-test"
jobId: "job-stats-test"
metric: "model_size_stats"
- is_false: document.config
- is_false: document.data_counts
- is_false: hits.0.config
- is_false: hits.0.data_counts
- do:
xpack.prelert.get_job:
job_id: "job-stats-test"
jobId: "job-stats-test"
metric: "_all"
- is_true: document.config
- is_true: document.data_counts
- is_true: hits.0.config
- is_true: hits.0.data_counts

View File

@ -24,18 +24,17 @@
- is_true: "prelertresults-farequote"
- do:
xpack.prelert.get_jobs:
xpack.prelert.get_job:
from: 0
size: 100
- match: { hitCount: 1 }
- match: { hits.0.jobId: "farequote" }
- match: { hits.0.config.jobId: "farequote" }
- do:
xpack.prelert.get_job:
job_id: "farequote"
- match: { exists: true }
- match: { type: "job" }
- match: { document.config.jobId: "farequote" }
jobId: "farequote"
- match: { hitCount: 1 }
- match: { hits.0.config.jobId: "farequote" }
- do:
xpack.prelert.delete_job:
@ -45,7 +44,7 @@
- do:
catch: missing
xpack.prelert.get_job:
job_id: "farequote"
jobId: "farequote"
- do:
indices.exists:
@ -57,7 +56,7 @@
- do:
catch: missing
xpack.prelert.get_job:
job_id: "non-existing"
jobId: "non-existing"
---
"Test put job with id that is already taken":

View File

@ -219,9 +219,9 @@ setup:
- do:
xpack.prelert.get_job:
job_id: foo
jobId: foo
metric: data_counts
- match: { document.data_counts.latest_record_timestamp: 1464739200000 }
- match: { hits.0.data_counts.latest_record_timestamp: 1464739200000 }