Redesign the get anomaly_detectors APIs (elastic/elasticsearch#570)

* Redesign the get anomaly_detectors APIs

This commit redesigns the APIs to get anomaly_detectors.

The new design has 2 GET APIs:

- An API to get the configurations: /anomaly_detectors/{job_id}
- An API to get the stats: /anomaly_detectors/{job_id}/_stats

For both APIs entering "_all" as the job_id returns results for
all jobs.

Note that page params have been removed. They were useful
when the configs were stored in an index. Now that they are part
of cluster state there is no need. Additionally, future support
for wildcard job_id expressions will give users a tool to narrow
down the GET actions to a certain subset of jobs which will be
more useful than the from/size approach.

Follow up:

- Implement similar GET APIs for schedulers
- Remove scheduler_stats from the anomaly_detectors _stats API
as it will be part of the schedulers _stats API

Closes elastic/elasticsearch#548

Original commit: elastic/x-pack-elasticsearch@046a0db8f5
This commit is contained in:
Dimitris Athanasiou 2016-12-19 13:09:54 +00:00 committed by GitHub
parent 4eda09d24a
commit 8750e5f926
23 changed files with 659 additions and 753 deletions

View File

@ -37,6 +37,7 @@ import org.elasticsearch.xpack.prelert.action.GetBucketsAction;
import org.elasticsearch.xpack.prelert.action.GetCategoriesDefinitionAction;
import org.elasticsearch.xpack.prelert.action.GetInfluencersAction;
import org.elasticsearch.xpack.prelert.action.GetJobsAction;
import org.elasticsearch.xpack.prelert.action.GetJobsStatsAction;
import org.elasticsearch.xpack.prelert.action.GetListAction;
import org.elasticsearch.xpack.prelert.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.prelert.action.GetRecordsAction;
@ -72,18 +73,17 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcessF
import org.elasticsearch.xpack.prelert.job.process.autodetect.BlackHoleAutodetectProcess;
import org.elasticsearch.xpack.prelert.job.process.autodetect.NativeAutodetectProcessFactory;
import org.elasticsearch.xpack.prelert.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.prelert.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.prelert.scheduler.ScheduledJobRunner;
import org.elasticsearch.xpack.prelert.job.process.normalizer.NativeNormalizerProcessFactory;
import org.elasticsearch.xpack.prelert.job.process.normalizer.MultiplyingNormalizerProcess;
import org.elasticsearch.xpack.prelert.job.process.normalizer.NativeNormalizerProcessFactory;
import org.elasticsearch.xpack.prelert.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.prelert.job.process.normalizer.NormalizerProcessFactory;
import org.elasticsearch.xpack.prelert.scheduler.http.HttpDataExtractorFactory;
import org.elasticsearch.xpack.prelert.job.status.StatusReporter;
import org.elasticsearch.xpack.prelert.job.usage.UsageReporter;
import org.elasticsearch.xpack.prelert.rest.job.RestCloseJobAction;
import org.elasticsearch.xpack.prelert.rest.job.RestDeleteJobAction;
import org.elasticsearch.xpack.prelert.rest.job.RestFlushJobAction;
import org.elasticsearch.xpack.prelert.rest.job.RestGetJobsAction;
import org.elasticsearch.xpack.prelert.rest.job.RestGetJobsStatsAction;
import org.elasticsearch.xpack.prelert.rest.job.RestJobDataAction;
import org.elasticsearch.xpack.prelert.rest.job.RestOpenJobAction;
import org.elasticsearch.xpack.prelert.rest.job.RestPutJobAction;
@ -105,6 +105,8 @@ import org.elasticsearch.xpack.prelert.rest.schedulers.RestStopSchedulerAction;
import org.elasticsearch.xpack.prelert.rest.validate.RestValidateDetectorAction;
import org.elasticsearch.xpack.prelert.rest.validate.RestValidateTransformAction;
import org.elasticsearch.xpack.prelert.rest.validate.RestValidateTransformsAction;
import org.elasticsearch.xpack.prelert.scheduler.ScheduledJobRunner;
import org.elasticsearch.xpack.prelert.scheduler.http.HttpDataExtractorFactory;
import org.elasticsearch.xpack.prelert.utils.NamedPipeHelper;
import java.io.IOException;
@ -224,6 +226,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
public List<Class<? extends RestHandler>> getRestHandlers() {
return Arrays.asList(
RestGetJobsAction.class,
RestGetJobsStatsAction.class,
RestPutJobAction.class,
RestDeleteJobAction.class,
RestOpenJobAction.class,
@ -255,6 +258,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(
new ActionHandler<>(GetJobsAction.INSTANCE, GetJobsAction.TransportAction.class),
new ActionHandler<>(GetJobsStatsAction.INSTANCE, GetJobsStatsAction.TransportAction.class),
new ActionHandler<>(PutJobAction.INSTANCE, PutJobAction.TransportAction.class),
new ActionHandler<>(DeleteJobAction.INSTANCE, DeleteJobAction.TransportAction.class),
new ActionHandler<>(OpenJobAction.INSTANCE, OpenJobAction.TransportAction.class),

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
@ -20,60 +19,30 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerStatus;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.manager.AutodetectProcessManager;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.job.results.PageParams;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.Response, GetJobsAction.RequestBuilder> {
public static final GetJobsAction INSTANCE = new GetJobsAction();
public static final String NAME = "cluster:admin/prelert/jobs/get";
private static final String ALL = "_all";
private static final String CONFIG = "config";
private static final String DATA_COUNTS = "data_counts";
private static final String MODEL_SIZE_STATS = "model_size_stats";
private static final String SCHEDULER_STATUS = "scheduler_status";
private static final String STATUS = "status";
private static final List<String> METRIC_WHITELIST = Arrays.asList(ALL, CONFIG, DATA_COUNTS,
MODEL_SIZE_STATS, SCHEDULER_STATUS, STATUS);
private GetJobsAction() {
super(NAME);
}
@ -90,125 +59,18 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
public static class Request extends MasterNodeReadRequest<Request> {
public static final ObjectParser<Request, ParseFieldMatcherSupplier> PARSER = new ObjectParser<>(NAME, Request::new);
public static final ParseField METRIC = new ParseField("metric");
private String jobId;
static {
PARSER.declareString(Request::setJobId, Job.ID);
PARSER.declareObject(Request::setPageParams, PageParams.PARSER, PageParams.PAGE);
PARSER.declareString((request, metric) -> {
Set<String> stats = Strings.splitStringByCommaToSet(metric);
request.setStats(stats);
}, METRIC);
public Request(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
}
private String jobId = null;
private PageParams pageParams = null;
private boolean config;
private boolean dataCounts;
private boolean modelSizeStats;
private boolean schedulerStatus;
private boolean status;
public Request() {
config = true;
}
public void setJobId(String jobId) {
if (pageParams != null) {
throw new IllegalArgumentException("Cannot set [from, size] when getting a single job.");
}
this.jobId = jobId;
}
Request() {}
public String getJobId() {
return jobId;
}
public PageParams getPageParams() {
return pageParams;
}
public void setPageParams(PageParams pageParams) {
if (jobId != null) {
throw new IllegalArgumentException("Cannot set [jobId] when getting multiple jobs.");
}
this.pageParams = ExceptionsHelper.requireNonNull(pageParams, PageParams.PAGE.getPreferredName());
}
public Request all() {
config = true;
dataCounts = true;
modelSizeStats = true;
schedulerStatus = true;
status = true;
return this;
}
public boolean config() {
return config;
}
public Request config(boolean config) {
this.config = config;
return this;
}
public boolean dataCounts() {
return dataCounts;
}
public Request dataCounts(boolean dataCounts) {
this.dataCounts = dataCounts;
return this;
}
public boolean modelSizeStats() {
return modelSizeStats;
}
public Request modelSizeStats(boolean modelSizeStats) {
this.modelSizeStats = modelSizeStats;
return this;
}
public boolean schedulerStatus() {
return schedulerStatus;
}
public Request schedulerStatus(boolean schedulerStatus) {
this.schedulerStatus = schedulerStatus;
return this;
}
public void setStats(Set<String> stats) {
for (String s : stats) {
if (!METRIC_WHITELIST.contains(s)) {
throw new ElasticsearchStatusException("Metric [" + s + "] is not a valid metric. "
+ "Accepted metrics are: [" + METRIC_WHITELIST + "]", RestStatus.BAD_REQUEST);
}
}
if (stats.contains(ALL)) {
all();
} else {
config(stats.contains(CONFIG));
dataCounts(stats.contains(DATA_COUNTS));
modelSizeStats(stats.contains(MODEL_SIZE_STATS));
schedulerStatus(stats.contains(SCHEDULER_STATUS));
status(stats.contains(STATUS));
}
}
public boolean status() {
return status;
}
public Request status(boolean status) {
this.status = status;
return this;
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -217,30 +79,18 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readOptionalString();
config = in.readBoolean();
dataCounts = in.readBoolean();
modelSizeStats = in.readBoolean();
schedulerStatus = in.readBoolean();
status = in.readBoolean();
pageParams = in.readOptionalWriteable(PageParams::new);
jobId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(jobId);
out.writeBoolean(config);
out.writeBoolean(dataCounts);
out.writeBoolean(modelSizeStats);
out.writeBoolean(schedulerStatus);
out.writeBoolean(status);
out.writeOptionalWriteable(pageParams);
out.writeString(jobId);
}
@Override
public int hashCode() {
return Objects.hash(jobId, config, dataCounts, modelSizeStats, schedulerStatus, status, pageParams);
return Objects.hash(jobId);
}
@Override
@ -252,13 +102,7 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
return false;
}
Request other = (Request) obj;
return Objects.equals(jobId, other.jobId)
&& this.config == other.config
&& this.dataCounts == other.dataCounts
&& this.modelSizeStats == other.modelSizeStats
&& this.schedulerStatus == other.schedulerStatus
&& this.status == other.status
&& Objects.equals(this.pageParams, other.pageParams);
return Objects.equals(jobId, other.jobId);
}
}
@ -271,137 +115,22 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
public static class Response extends ActionResponse implements StatusToXContent {
public static class JobInfo implements ToXContent, Writeable {
private final String jobId;
@Nullable
private Job jobConfig;
@Nullable
private DataCounts dataCounts;
@Nullable
private ModelSizeStats modelSizeStats;
@Nullable
private SchedulerStatus schedulerStatus;
@Nullable
private JobStatus status;
private QueryPage<Job> jobs;
JobInfo(String jobId, @Nullable Job job, @Nullable DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats,
@Nullable SchedulerStatus schedulerStatus, @Nullable JobStatus status) {
this.jobId = jobId;
this.jobConfig = job;
this.dataCounts = dataCounts;
this.modelSizeStats = modelSizeStats;
this.schedulerStatus = schedulerStatus;
this.status = status;
}
JobInfo(StreamInput in) throws IOException {
jobId = in.readString();
jobConfig = in.readOptionalWriteable(Job::new);
dataCounts = in.readOptionalWriteable(DataCounts::new);
modelSizeStats = in.readOptionalWriteable(ModelSizeStats::new);
schedulerStatus = in.readOptionalWriteable(SchedulerStatus::fromStream);
status = in.readOptionalWriteable(JobStatus::fromStream);
}
public String getJobid() {
return jobId;
}
public Job getJobConfig() {
return jobConfig;
}
public DataCounts getDataCounts() {
return dataCounts;
}
public ModelSizeStats getModelSizeStats() {
return modelSizeStats;
}
public SchedulerStatus getSchedulerStatus() {
return schedulerStatus;
}
public JobStatus getStatus() {
return status;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
if (jobConfig != null) {
builder.field(CONFIG, jobConfig);
}
if (dataCounts != null) {
builder.field(DATA_COUNTS, dataCounts);
}
if (modelSizeStats != null) {
builder.field(MODEL_SIZE_STATS, modelSizeStats);
}
if (schedulerStatus != null) {
builder.field(SCHEDULER_STATUS, schedulerStatus);
}
if (status != null) {
builder.field(STATUS, status);
}
builder.endObject();
return builder;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
out.writeOptionalWriteable(jobConfig);
out.writeOptionalWriteable(dataCounts);
out.writeOptionalWriteable(modelSizeStats);
out.writeOptionalWriteable(schedulerStatus);
out.writeOptionalWriteable(status);
}
@Override
public int hashCode() {
return Objects.hash(jobId, jobConfig, dataCounts, modelSizeStats, schedulerStatus, status);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
JobInfo other = (JobInfo) obj;
return Objects.equals(jobId, other.jobId)
&& Objects.equals(jobConfig, other.jobConfig)
&& Objects.equals(this.dataCounts, other.dataCounts)
&& Objects.equals(this.modelSizeStats, other.modelSizeStats)
&& Objects.equals(this.schedulerStatus, other.schedulerStatus)
&& Objects.equals(this.status, other.status);
}
}
private QueryPage<JobInfo> jobs;
public Response(QueryPage<JobInfo> jobs) {
public Response(QueryPage<Job> jobs) {
this.jobs = jobs;
}
public Response() {}
public QueryPage<JobInfo> getResponse() {
public QueryPage<Job> getResponse() {
return jobs;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobs = new QueryPage<>(in, JobInfo::new);
jobs = new QueryPage<>(in, Job::new);
}
@Override
@ -411,13 +140,13 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
}
@Override
public RestStatus status() {
return jobs.count() == 0 ? RestStatus.NOT_FOUND : RestStatus.OK;
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return jobs.doXContentBody(builder, params);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return jobs.doXContentBody(builder, params);
public RestStatus status() {
return jobs.count() == 0 ? RestStatus.NOT_FOUND : RestStatus.OK;
}
@Override
@ -458,18 +187,14 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
public static class TransportAction extends TransportMasterNodeReadAction<Request, Response> {
private final JobManager jobManager;
private final AutodetectProcessManager processManager;
private final JobProvider jobProvider;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobManager jobManager, AutodetectProcessManager processManager, JobProvider jobProvider) {
JobManager jobManager) {
super(settings, GetJobsAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.jobManager = jobManager;
this.processManager = processManager;
this.jobProvider = jobProvider;
}
@Override
@ -484,86 +209,14 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
logger.debug("Get job '{}', config={}, data_counts={}, model_size_stats={}",
request.getJobId(), request.config(), request.dataCounts(), request.modelSizeStats());
QueryPage<Response.JobInfo> response;
// Single Job
if (request.jobId != null && !request.jobId.isEmpty()) {
// always get the job regardless of the request.config param because if the job
// can't be found a different response is returned.
QueryPage<Job> jobs = jobManager.getJob(request.getJobId(), state);
if (jobs.count() == 0) {
logger.debug(String.format(Locale.ROOT, "Cannot find job '%s'", request.getJobId()));
throw QueryPage.emptyQueryPage(Job.RESULTS_FIELD);
} else if (jobs.count() > 1) {
logger.error("More than one job found for {} [{}]", Job.ID.getPreferredName(), request.getJobId());
}
logger.debug("Returning job [" + request.getJobId() + "]");
Job jobConfig = request.config() ? jobs.results().get(0) : null;
DataCounts dataCounts = readDataCounts(request.dataCounts(), request.getJobId());
ModelSizeStats modelSizeStats = readModelSizeStats(request.modelSizeStats(), request.getJobId());
SchedulerStatus schedulerStatus = readSchedulerStatus(request.schedulerStatus(), request.getJobId());
JobStatus jobStatus = readJobStatus(request.status(), request.getJobId());
Response.JobInfo jobInfo = new Response.JobInfo(
request.getJobId(), jobConfig, dataCounts, modelSizeStats, schedulerStatus, jobStatus);
response = new QueryPage<>(Collections.singletonList(jobInfo), 1, Job.RESULTS_FIELD);
} else if (request.getPageParams() != null) {
// Multiple Jobs
PageParams pageParams = request.getPageParams();
QueryPage<Job> jobsPage = jobManager.getJobs(pageParams.getFrom(), pageParams.getSize(), state);
List<Response.JobInfo> jobInfoList = new ArrayList<>();
for (Job job : jobsPage.results()) {
Job jobConfig = request.config() ? job : null;
DataCounts dataCounts = readDataCounts(request.dataCounts(), job.getId());
ModelSizeStats modelSizeStats = readModelSizeStats(request.modelSizeStats(), job.getId());
SchedulerStatus schedulerStatus = readSchedulerStatus(request.schedulerStatus(), job.getId());
JobStatus jobStatus = readJobStatus(request.status(), job.getId());
Response.JobInfo jobInfo = new Response.JobInfo(job.getId(), jobConfig, dataCounts, modelSizeStats,
schedulerStatus, jobStatus);
jobInfoList.add(jobInfo);
}
response = new QueryPage<>(jobInfoList, jobsPage.count(), Job.RESULTS_FIELD);
} else {
throw new IllegalStateException("Both jobId and pageParams are null");
}
listener.onResponse(new Response(response));
logger.debug("Get job '{}'", request.getJobId());
QueryPage<Job> jobs = jobManager.getJob(request.getJobId(), state);
listener.onResponse(new Response(jobs));
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
private DataCounts readDataCounts(boolean dataCounts, String jobId) {
if (dataCounts) {
Optional<DataCounts> counts = processManager.getDataCounts(jobId);
return counts.orElseGet(() -> jobProvider.dataCounts(jobId));
}
return null;
}
private ModelSizeStats readModelSizeStats(boolean modelSizeStats, String jobId) {
if (modelSizeStats) {
Optional<ModelSizeStats> sizeStats = processManager.getModelSizeStats(jobId);
return sizeStats.orElseGet(() -> jobProvider.modelSizeStats(jobId).orElse(null));
}
return null;
}
private SchedulerStatus readSchedulerStatus(boolean schedulerState, String jobId) {
return schedulerState ? jobManager.getSchedulerStatus(jobId).orElse(null) : null;
}
private JobStatus readJobStatus(boolean status, String jobId) {
return status ? jobManager.getJobStatus(jobId) : null;
}
}
}

View File

@ -0,0 +1,344 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.manager.AutodetectProcessManager;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerStatus;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJobsStatsAction.Response, GetJobsStatsAction.RequestBuilder> {
public static final GetJobsStatsAction INSTANCE = new GetJobsStatsAction();
public static final String NAME = "cluster:admin/prelert/jobs/stats/get";
private static final String ALL = "_all";
private static final String CONFIG = "config";
private static final String DATA_COUNTS = "data_counts";
private static final String MODEL_SIZE_STATS = "model_size_stats";
private static final String SCHEDULER_STATUS = "scheduler_status";
private static final String STATUS = "status";
private GetJobsStatsAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends ActionRequest {
private String jobId;
public Request(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
}
Request() {}
public String getJobId() {
return jobId;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
}
@Override
public int hashCode() {
return Objects.hash(jobId);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(jobId, other.jobId);
}
}
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, GetJobsStatsAction action) {
super(client, action, new Request());
}
}
public static class Response extends ActionResponse implements StatusToXContent {
public static class JobStats implements ToXContent, Writeable {
private final String jobId;
private DataCounts dataCounts;
@Nullable
private ModelSizeStats modelSizeStats;
private JobStatus status;
@Nullable
private SchedulerStatus schedulerStatus;
JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, JobStatus status,
@Nullable SchedulerStatus schedulerStatus) {
this.jobId = Objects.requireNonNull(jobId);
this.dataCounts = Objects.requireNonNull(dataCounts);
this.modelSizeStats = modelSizeStats;
this.status = Objects.requireNonNull(status);
this.schedulerStatus = schedulerStatus;
}
JobStats(StreamInput in) throws IOException {
jobId = in.readString();
dataCounts = new DataCounts(in);
modelSizeStats = in.readOptionalWriteable(ModelSizeStats::new);
status = JobStatus.fromStream(in);
schedulerStatus = in.readOptionalWriteable(SchedulerStatus::fromStream);
}
public String getJobid() {
return jobId;
}
public DataCounts getDataCounts() {
return dataCounts;
}
public ModelSizeStats getModelSizeStats() {
return modelSizeStats;
}
public SchedulerStatus getSchedulerStatus() {
return schedulerStatus;
}
public JobStatus getStatus() {
return status;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(DATA_COUNTS, dataCounts);
if (modelSizeStats != null) {
builder.field(MODEL_SIZE_STATS, modelSizeStats);
}
builder.field(STATUS, status);
if (schedulerStatus != null) {
builder.field(SCHEDULER_STATUS, schedulerStatus);
}
builder.endObject();
return builder;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
dataCounts.writeTo(out);
out.writeOptionalWriteable(modelSizeStats);
status.writeTo(out);
out.writeOptionalWriteable(schedulerStatus);
}
@Override
public int hashCode() {
return Objects.hash(jobId, dataCounts, modelSizeStats, schedulerStatus, status);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
JobStats other = (JobStats) obj;
return Objects.equals(jobId, other.jobId)
&& Objects.equals(this.dataCounts, other.dataCounts)
&& Objects.equals(this.modelSizeStats, other.modelSizeStats)
&& Objects.equals(this.status, other.status)
&& Objects.equals(this.schedulerStatus, other.schedulerStatus);
}
}
private QueryPage<JobStats> jobsStats;
public Response(QueryPage<JobStats> jobsStats) {
this.jobsStats = jobsStats;
}
public Response() {}
public QueryPage<JobStats> getResponse() {
return jobsStats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobsStats = new QueryPage<>(in, JobStats::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
jobsStats.writeTo(out);
}
@Override
public RestStatus status() {
return jobsStats.count() == 0 ? RestStatus.NOT_FOUND : RestStatus.OK;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return jobsStats.doXContentBody(builder, params);
}
@Override
public int hashCode() {
return Objects.hash(jobsStats);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Response other = (Response) obj;
return Objects.equals(jobsStats, other.jobsStats);
}
@SuppressWarnings("deprecation")
@Override
public final String toString() {
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.prettyPrint();
builder.startObject();
toXContent(builder, EMPTY_PARAMS);
builder.endObject();
return builder.string();
} catch (Exception e) {
// So we have a stack trace logged somewhere
return "{ \"error\" : \"" + org.elasticsearch.ExceptionsHelper.detailedMessage(e) + "\"}";
}
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final ClusterService clusterService;
private final JobManager jobManager;
private final AutodetectProcessManager processManager;
private final JobProvider jobProvider;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
JobManager jobManager, AutodetectProcessManager processManager, JobProvider jobProvider) {
super(settings, GetJobsStatsAction.NAME, false, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
this.clusterService = clusterService;
this.jobManager = jobManager;
this.processManager = processManager;
this.jobProvider = jobProvider;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
logger.debug("Get stats for job '{}'", request.getJobId());
List<Response.JobStats> jobsStats = new ArrayList<>();
QueryPage<Job> jobs = jobManager.getJob(request.getJobId(), clusterService.state());
PrelertMetadata prelertMetadata = clusterService.state().metaData().custom(PrelertMetadata.TYPE);
for (Job job : jobs.results()) {
DataCounts dataCounts = readDataCounts(job.getId());
ModelSizeStats modelSizeStats = readModelSizeStats(job.getId());
JobStatus status = prelertMetadata.getAllocations().get(job.getId()).getStatus();
Optional<SchedulerStatus> schedulerStatus = prelertMetadata.getSchedulerStatusByJobId(job.getId());
jobsStats.add(new Response.JobStats(job.getId(), dataCounts, modelSizeStats, status, schedulerStatus.orElse(null)));
}
QueryPage<Response.JobStats> jobsStatsPage = new QueryPage<>(jobsStats, jobsStats.size(), Job.RESULTS_FIELD);
listener.onResponse(new GetJobsStatsAction.Response(jobsStatsPage));
}
private DataCounts readDataCounts(String jobId) {
Optional<DataCounts> counts = processManager.getDataCounts(jobId);
return counts.orElseGet(() -> jobProvider.dataCounts(jobId));
}
private ModelSizeStats readModelSizeStats(String jobId) {
Optional<ModelSizeStats> sizeStats = processManager.getModelSizeStats(jobId);
return sizeStats.orElseGet(() -> jobProvider.modelSizeStats(jobId).orElse(null));
}
}
}

View File

@ -78,6 +78,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
// Used for QueryPage
public static final ParseField RESULTS_FIELD = new ParseField("jobs");
public static final String ALL = "_all";
public static final ObjectParser<Builder, ParseFieldMatcherSupplier> PARSER = new ObjectParser<>("job_details", Builder::new);
static {

View File

@ -25,7 +25,6 @@ import org.elasticsearch.xpack.prelert.job.IgnoreDowntime;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerStatus;
import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
@ -34,13 +33,14 @@ import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerStatus;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/**
@ -79,41 +79,38 @@ public class JobManager extends AbstractComponent {
}
/**
* Get the details of the specific job wrapped in a <code>Optional</code>
* Get the jobs that match the given {@code jobId}.
* Note that when the {@code jocId} is {@link Job#ALL} all jobs are returned.
*
* @param jobId
* the jobId
* @return An {@code Optional} containing the {@code Job} if a job
* with the given {@code jobId} exists, or an empty {@code Optional}
* otherwise
* @return A {@link QueryPage} containing the matching {@code Job}s
*/
public QueryPage<Job> getJob(String jobId, ClusterState clusterState) {
if (jobId.equals(Job.ALL)) {
return getJobs(clusterState);
}
PrelertMetadata prelertMetadata = clusterState.getMetaData().custom(PrelertMetadata.TYPE);
Job job = prelertMetadata.getJobs().get(jobId);
if (job == null) {
logger.debug(String.format(Locale.ROOT, "Cannot find job '%s'", jobId));
throw QueryPage.emptyQueryPage(Job.RESULTS_FIELD);
}
logger.debug("Returning job [" + jobId + "]");
return new QueryPage<>(Collections.singletonList(job), 1, Job.RESULTS_FIELD);
}
/**
* Get details of all Jobs.
*
* @param from
* Skip the first N Jobs. This parameter is for paging results if
* not required set to 0.
* @param size
* Take only this number of Jobs
* @return A query page object with hitCount set to the total number of jobs
* not the only the number returned here as determined by the
* <code>size</code> parameter.
*/
public QueryPage<Job> getJobs(int from, int size, ClusterState clusterState) {
public QueryPage<Job> getJobs(ClusterState clusterState) {
PrelertMetadata prelertMetadata = clusterState.getMetaData().custom(PrelertMetadata.TYPE);
List<Job> jobs = prelertMetadata.getJobs().entrySet().stream()
.skip(from)
.limit(size)
.map(Map.Entry::getValue)
.collect(Collectors.toList());
return new QueryPage<>(jobs, prelertMetadata.getJobs().size(), Job.RESULTS_FIELD);
@ -254,11 +251,6 @@ public class JobManager extends AbstractComponent {
return buildNewClusterState(currentState, builder);
}
public Optional<SchedulerStatus> getSchedulerStatus(String jobId) {
PrelertMetadata prelertMetadata = clusterService.state().metaData().custom(PrelertMetadata.TYPE);
return prelertMetadata.getSchedulerStatusByJobId(jobId);
}
public void updateSchedulerStatus(UpdateSchedulerStatusAction.Request request,
ActionListener<UpdateSchedulerStatusAction.Response> actionListener) {
String schedulerId = request.getSchedulerId();
@ -348,10 +340,6 @@ public class JobManager extends AbstractComponent {
});
}
public JobStatus getJobStatus(String jobId) {
return getJobAllocation(jobId).getStatus();
}
public void setJobStatus(UpdateJobStatusAction.Request request, ActionListener<UpdateJobStatusAction.Response> actionListener) {
clusterService.submitStateUpdateTask("set-job-status-" + request.getStatus() + "-" + request.getJobId(),
new AckedClusterStateUpdateTask<UpdateJobStatusAction.Response>(request, actionListener) {

View File

@ -6,12 +6,8 @@
package org.elasticsearch.xpack.prelert.rest.job;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
@ -19,10 +15,8 @@ import org.elasticsearch.rest.action.RestStatusToXContentListener;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.GetJobsAction;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.results.PageParams;
import java.io.IOException;
import java.util.Set;
public class RestGetJobsAction extends BaseRestHandler {
@ -33,46 +27,13 @@ public class RestGetJobsAction extends BaseRestHandler {
super(settings);
this.transportGetJobAction = transportGetJobAction;
// GETs
controller.registerHandler(RestRequest.Method.GET, PrelertPlugin.BASE_PATH
+ "anomaly_detectors/{" + Job.ID.getPreferredName() + "}/_stats", this);
controller.registerHandler(RestRequest.Method.GET,
PrelertPlugin.BASE_PATH + "anomaly_detectors/{" + Job.ID.getPreferredName() + "}/_stats/{metric}", this);
controller.registerHandler(RestRequest.Method.GET, PrelertPlugin.BASE_PATH + "anomaly_detectors/_stats", this);
// POSTs
controller.registerHandler(RestRequest.Method.POST, PrelertPlugin.BASE_PATH
+ "anomaly_detectors/{" + Job.ID.getPreferredName() + "}/_stats", this);
controller.registerHandler(RestRequest.Method.POST,
PrelertPlugin.BASE_PATH + "anomaly_detectors/{" + Job.ID.getPreferredName() + "}/_stats/{metric}", this);
controller.registerHandler(RestRequest.Method.POST, PrelertPlugin.BASE_PATH + "anomaly_detectors/_stats", this);
+ "anomaly_detectors/{" + Job.ID.getPreferredName() + "}", this);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
final GetJobsAction.Request request;
if (restRequest.hasContentOrSourceParam()) {
BytesReference bodyBytes = restRequest.contentOrSourceParam();
XContentParser parser = XContentFactory.xContent(bodyBytes).createParser(bodyBytes);
request = GetJobsAction.Request.PARSER.apply(parser, () -> parseFieldMatcher);
} else {
String jobId = restRequest.param(Job.ID.getPreferredName());
request = new GetJobsAction.Request();
if (jobId != null && !jobId.isEmpty()) {
request.setJobId(jobId);
}
if (restRequest.hasParam(PageParams.FROM.getPreferredName())
|| restRequest.hasParam(PageParams.SIZE.getPreferredName())
|| jobId == null) {
request.setPageParams(new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM),
restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE)));
}
Set<String> stats = Strings.splitStringByCommaToSet(
restRequest.param(GetJobsAction.Request.METRIC.getPreferredName(), "config"));
request.setStats(stats);
}
GetJobsAction.Request request = new GetJobsAction.Request(restRequest.param(Job.ID.getPreferredName()));
return channel -> transportGetJobAction.execute(request, new RestStatusToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.rest.job;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestStatusToXContentListener;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.GetJobsStatsAction;
import org.elasticsearch.xpack.prelert.job.Job;
import java.io.IOException;
public class RestGetJobsStatsAction extends BaseRestHandler {
private final GetJobsStatsAction.TransportAction transportGetJobsStatsAction;
@Inject
public RestGetJobsStatsAction(Settings settings, RestController controller,
GetJobsStatsAction.TransportAction transportGetJobsStatsAction) {
super(settings);
this.transportGetJobsStatsAction = transportGetJobsStatsAction;
controller.registerHandler(RestRequest.Method.GET, PrelertPlugin.BASE_PATH
+ "anomaly_detectors/{" + Job.ID.getPreferredName() + "}/_stats", this);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(restRequest.param(Job.ID.getPreferredName()));
return channel -> transportGetJobsStatsAction.execute(request, new RestStatusToXContentListener<>(channel));
}
}

View File

@ -6,28 +6,14 @@
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.xpack.prelert.action.GetJobsAction.Request;
import org.elasticsearch.xpack.prelert.job.results.PageParams;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
public class GetJobActionRequestTests extends AbstractStreamableTestCase<GetJobsAction.Request> {
@Override
protected Request createTestInstance() {
Request instance = new Request();
instance.config(randomBoolean());
instance.dataCounts(randomBoolean());
instance.modelSizeStats(randomBoolean());
instance.schedulerStatus(randomBoolean());
instance.status(randomBoolean());
if (randomBoolean()) {
int from = randomInt(PageParams.MAX_FROM_SIZE_SUM);
int maxSize = PageParams.MAX_FROM_SIZE_SUM - from;
int size = randomInt(maxSize);
instance.setPageParams(new PageParams(from, size));
} else {
instance.setJobId(randomAsciiOfLengthBetween(1, 20));
}
return instance;
return new Request(randomBoolean() ? Job.ALL : randomAsciiOfLengthBetween(1, 20));
}
@Override

View File

@ -8,25 +8,19 @@ package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.xpack.prelert.action.GetJobsAction.Response;
import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
import org.elasticsearch.xpack.prelert.job.AnalysisLimits;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.DataDescription;
import org.elasticsearch.xpack.prelert.job.Detector;
import org.elasticsearch.xpack.prelert.job.IgnoreDowntime;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.ModelDebugConfig;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerStatus;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.job.transform.TransformConfig;
import org.elasticsearch.xpack.prelert.job.transform.TransformType;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@ -37,7 +31,7 @@ public class GetJobActionResponseTests extends AbstractStreamableTestCase<GetJob
final Response result;
int listSize = randomInt(10);
List<Response.JobInfo> jobInfoList = new ArrayList<>(listSize);
List<Job> jobList = new ArrayList<>(listSize);
for (int j = 0; j < listSize; j++) {
String jobId = randomAsciiOfLength(10);
String description = randomBoolean() ? randomAsciiOfLength(10) : null;
@ -68,35 +62,10 @@ public class GetJobActionResponseTests extends AbstractStreamableTestCase<GetJob
modelDebugConfig, ignoreDowntime, normalizationWindowDays, backgroundPersistInterval,
modelSnapshotRetentionDays, resultsRetentionDays, customConfig, modelSnapshotId);
DataCounts dataCounts = null;
if (randomBoolean()) {
dataCounts = new DataCounts(randomAsciiOfLength(10), randomIntBetween(1, 1_000_000),
randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
new DateTime(randomDateTimeZone()).toDate(), new DateTime(randomDateTimeZone()).toDate());
}
ModelSizeStats sizeStats = null;
if (randomBoolean()) {
sizeStats = new ModelSizeStats.Builder("foo").build();
}
SchedulerStatus schedulerStatus = null;
if (randomBoolean()) {
schedulerStatus = randomFrom(SchedulerStatus.values());
}
JobStatus jobStatus = null;
if (randomBoolean()) {
jobStatus = randomFrom(EnumSet.allOf(JobStatus.class));
}
Response.JobInfo jobInfo = new Response.JobInfo(jobId, job, dataCounts, sizeStats, schedulerStatus, jobStatus);
jobInfoList.add(jobInfo);
jobList.add(job);
}
result = new Response(new QueryPage<>(jobInfoList, jobInfoList.size(), Job.RESULTS_FIELD));
result = new Response(new QueryPage<>(jobList, jobList.size(), Job.RESULTS_FIELD));
return result;
}

View File

@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.xpack.prelert.action.GetJobsStatsAction.Request;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
public class GetJobsStatsActionRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomBoolean() ? Job.ALL : randomAsciiOfLengthBetween(1, 20));
}
@Override
protected Request createBlankInstance() {
return new Request();
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.xpack.prelert.action.GetJobsStatsAction.Response;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerStatus;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
public class GetJobsStatsActionResponseTests extends AbstractStreamableTestCase<Response> {
@Override
protected Response createTestInstance() {
final Response result;
int listSize = randomInt(10);
List<Response.JobStats> jobStatsList = new ArrayList<>(listSize);
for (int j = 0; j < listSize; j++) {
String jobId = randomAsciiOfLength(10);
DataCounts dataCounts = new DataCounts(randomAsciiOfLength(10), randomIntBetween(1, 1_000_000),
randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
new DateTime(randomDateTimeZone()).toDate(), new DateTime(randomDateTimeZone()).toDate());
ModelSizeStats sizeStats = null;
if (randomBoolean()) {
sizeStats = new ModelSizeStats.Builder("foo").build();
}
JobStatus jobStatus = randomFrom(EnumSet.allOf(JobStatus.class));
SchedulerStatus schedulerStatus = null;
if (randomBoolean()) {
schedulerStatus = randomFrom(SchedulerStatus.values());
}
Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, jobStatus, schedulerStatus);
jobStatsList.add(jobStats);
}
result = new Response(new QueryPage<>(jobStatsList, jobStatsList.size(), Job.RESULTS_FIELD));
return result;
}
@Override
protected Response createBlankInstance() {
return new Response();
}
}

View File

@ -23,12 +23,11 @@ import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.DataDescription;
import org.elasticsearch.xpack.prelert.job.Detector;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.prelert.scheduler.Scheduler;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerStatus;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.scheduler.Scheduler;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.junit.After;
import java.io.IOException;
@ -221,12 +220,10 @@ public class ScheduledJobsIT extends ESIntegTestCase {
client.execute(StopSchedulerAction.INSTANCE, new StopSchedulerAction.Request(schedulerId)).get();
assertTrue(response.isAcknowledged());
assertBusy(() -> {
GetJobsAction.Response r = null;
GetJobsStatsAction.Response r = null;
try {
GetJobsAction.Request request = new GetJobsAction.Request();
request.setJobId(jobId);
request.schedulerStatus(true);
r = client.execute(GetJobsAction.INSTANCE, request).get();
GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId);
r = client.execute(GetJobsStatsAction.INSTANCE, request).get();
} catch (Exception e) {
fail();
}

View File

@ -68,34 +68,10 @@ public class PrelertJobIT extends ESRestTestCase {
assertThat(responseAsString, containsString("\"job_id\":\"farequote\""));
}
public void testGetJobs_GivenNegativeFrom() throws Exception {
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("get", PrelertPlugin.BASE_PATH + "anomaly_detectors/_stats?from=-1"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
assertThat(e.getMessage(), containsString("\"reason\":\"Parameter [from] cannot be < 0\""));
}
public void testGetJobs_GivenNegativeSize() throws Exception {
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("get", PrelertPlugin.BASE_PATH + "anomaly_detectors/_stats?size=-1"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
assertThat(e.getMessage(), containsString("\"reason\":\"Parameter [size] cannot be < 0\""));
}
public void testGetJobs_GivenFromAndSizeSumTo10001() throws Exception {
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("get", PrelertPlugin.BASE_PATH + "anomaly_detectors/_stats?from=1000&size=11001"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
assertThat(e.getMessage(), containsString("\"reason\":\"The sum of parameters [from] and [size] cannot be higher than 10000."));
}
public void testGetJobs_GivenSingleJob() throws Exception {
createFarequoteJob();
Response response = client().performRequest("get", PrelertPlugin.BASE_PATH + "anomaly_detectors/_stats");
Response response = client().performRequest("get", PrelertPlugin.BASE_PATH + "anomaly_detectors/_all");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
String responseAsString = responseEntityToString(response);
@ -108,7 +84,7 @@ public class PrelertJobIT extends ESRestTestCase {
createFarequoteJob("farequote_2");
createFarequoteJob("farequote_3");
Response response = client().performRequest("get", PrelertPlugin.BASE_PATH + "anomaly_detectors/_stats");
Response response = client().performRequest("get", PrelertPlugin.BASE_PATH + "anomaly_detectors/_all");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
String responseAsString = responseEntityToString(response);
@ -118,51 +94,6 @@ public class PrelertJobIT extends ESRestTestCase {
assertThat(responseAsString, containsString("\"job_id\":\"farequote_3\""));
}
public void testGetJobs_GivenMultipleJobsAndFromIsOne() throws Exception {
createFarequoteJob("farequote_1");
createFarequoteJob("farequote_2");
createFarequoteJob("farequote_3");
Response response = client().performRequest("get", PrelertPlugin.BASE_PATH + "anomaly_detectors/_stats?from=1");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":3"));
assertThat(responseAsString, not(containsString("\"job_id\":\"farequote_1\"")));
assertThat(responseAsString, containsString("\"job_id\":\"farequote_2\""));
assertThat(responseAsString, containsString("\"job_id\":\"farequote_3\""));
}
public void testGetJobs_GivenMultipleJobsAndSizeIsOne() throws Exception {
createFarequoteJob("farequote_1");
createFarequoteJob("farequote_2");
createFarequoteJob("farequote_3");
Response response = client().performRequest("get", PrelertPlugin.BASE_PATH + "anomaly_detectors/_stats?size=1");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":3"));
assertThat(responseAsString, containsString("\"job_id\":\"farequote_1\""));
assertThat(responseAsString, not(containsString("\"job_id\":\"farequote_2\"")));
assertThat(responseAsString, not(containsString("\"job_id\":\"farequote_3\"")));
}
public void testGetJobs_GivenMultipleJobsAndFromIsOneAndSizeIsOne() throws Exception {
createFarequoteJob("farequote_1");
createFarequoteJob("farequote_2");
createFarequoteJob("farequote_3");
Response response = client().performRequest("get", PrelertPlugin.BASE_PATH + "anomaly_detectors/_stats?from=1&size=1");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":3"));
assertThat(responseAsString, not(containsString("\"job_id\":\"farequote_1\"")));
assertThat(responseAsString, containsString("\"job_id\":\"farequote_2\""));
assertThat(responseAsString, not(containsString("\"job_id\":\"farequote_3\"")));
}
private Response createFarequoteJob() throws Exception {
return createFarequoteJob("farequote");
}

View File

@ -39,8 +39,8 @@ public class ScheduledJobIT extends ESRestTestCase {
assertThat(responseEntityToString(startSchedulerRequest), containsString("{\"task\":\""));
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats",
Collections.singletonMap("metric", "data_counts"));
Response getJobResponse = client().performRequest("get",
PrelertPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
assertThat(responseEntityToString(getJobResponse), containsString("\"input_record_count\":2"));
} catch (Exception e) {
throw new RuntimeException(e);
@ -62,8 +62,8 @@ public class ScheduledJobIT extends ESRestTestCase {
assertThat(responseEntityToString(response), containsString("{\"task\":\""));
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats",
Collections.singletonMap("metric", "data_counts"));
Response getJobResponse = client().performRequest("get",
PrelertPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String responseAsString = responseEntityToString(getJobResponse);
assertThat(responseAsString, containsString("\"input_record_count\":2"));
} catch (Exception e1) {

View File

@ -96,6 +96,22 @@ public class JobManagerTests extends ESTestCase {
expectThrows(ResourceNotFoundException.class, () -> jobManager.getJobAllocation("bar"));
}
public void testGetJob_GivenJobIdIsAll() {
PrelertMetadata.Builder prelertMetadata = new PrelertMetadata.Builder();
for (int i = 0; i < 3; i++) {
prelertMetadata.putJob(buildJobBuilder(Integer.toString(i)).build(), false);
}
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, prelertMetadata.build())).build();
JobManager jobManager = createJobManager();
QueryPage<Job> result = jobManager.getJob("_all", clusterState);
assertThat(result.count(), equalTo(3L));
assertThat(result.results().get(0).getId(), equalTo("0"));
assertThat(result.results().get(1).getId(), equalTo("1"));
assertThat(result.results().get(2).getId(), equalTo("2"));
}
public void testGetJobs() {
PrelertMetadata.Builder prelertMetadata = new PrelertMetadata.Builder();
for (int i = 0; i < 10; i++) {
@ -105,7 +121,7 @@ public class JobManagerTests extends ESTestCase {
.metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, prelertMetadata.build())).build();
JobManager jobManager = createJobManager();
QueryPage<Job> result = jobManager.getJobs(0, 10, clusterState);
QueryPage<Job> result = jobManager.getJobs(clusterState);
assertThat(result.count(), equalTo(10L));
assertThat(result.results().get(0).getId(), equalTo("0"));
assertThat(result.results().get(1).getId(), equalTo("1"));
@ -117,30 +133,6 @@ public class JobManagerTests extends ESTestCase {
assertThat(result.results().get(7).getId(), equalTo("7"));
assertThat(result.results().get(8).getId(), equalTo("8"));
assertThat(result.results().get(9).getId(), equalTo("9"));
result = jobManager.getJobs(0, 5, clusterState);
assertThat(result.count(), equalTo(10L));
assertThat(result.results().get(0).getId(), equalTo("0"));
assertThat(result.results().get(1).getId(), equalTo("1"));
assertThat(result.results().get(2).getId(), equalTo("2"));
assertThat(result.results().get(3).getId(), equalTo("3"));
assertThat(result.results().get(4).getId(), equalTo("4"));
result = jobManager.getJobs(5, 5, clusterState);
assertThat(result.count(), equalTo(10L));
assertThat(result.results().get(0).getId(), equalTo("5"));
assertThat(result.results().get(1).getId(), equalTo("6"));
assertThat(result.results().get(2).getId(), equalTo("7"));
assertThat(result.results().get(3).getId(), equalTo("8"));
assertThat(result.results().get(4).getId(), equalTo("9"));
result = jobManager.getJobs(9, 1, clusterState);
assertThat(result.count(), equalTo(10L));
assertThat(result.results().get(0).getId(), equalTo("9"));
result = jobManager.getJobs(9, 10, clusterState);
assertThat(result.count(), equalTo(10L));
assertThat(result.results().get(0).getId(), equalTo("9"));
}
private JobManager createJobManager() {

View File

@ -1,37 +1,19 @@
{
"xpack.prelert.get_jobs": {
"methods": [ "GET", "POST" ],
"methods": [ "GET"],
"url": {
"path": "/_xpack/prelert/anomaly_detectors/{job_id}",
"paths": [
"/_xpack/prelert/anomaly_detectors/_stats",
"/_xpack/prelert/anomaly_detectors/{job_id}/_stats",
"/_xpack/prelert/anomaly_detectors/{job_id}/_stats/{metric}"
"/_xpack/prelert/anomaly_detectors/{job_id}"
],
"parts": {
"job_id": {
"type": "string",
"description": "The ID of the job to fetch"
},
"metric" : {
"type" : "list",
"options" : ["_all", "config", "data_counts", "model_size_stats", "scheduler_status", "status"],
"description" : "Limit the information returned to the specified statistics"
}
},
"params": {
"from": {
"type": "int",
"description": "skips a number of jobs"
},
"size": {
"type": "int",
"description": "specifies a max number of jobs to get"
"required": true,
"description": "The ID of the jobs to fetch"
}
}
},
"body": {
"description": "Job selection criteria"
}
"body": null
}
}

View File

@ -0,0 +1,19 @@
{
"xpack.prelert.get_jobs_stats": {
"methods": [ "GET"],
"url": {
"path": "/_xpack/prelert/anomaly_detectors/{job_id}/_stats",
"paths": [
"/_xpack/prelert/anomaly_detectors/{job_id}/_stats"
],
"parts": {
"job_id": {
"type": "string",
"required": true,
"description": "The ID of the jobs stats to fetch"
}
}
},
"body": null
}
}

View File

@ -23,34 +23,17 @@
index: "prelertresults-farequote"
- is_true: "prelertresults-farequote"
- do:
xpack.prelert.get_jobs:
from: 0
size: 100
- match: { count: 1 }
- match: { jobs.0.config.job_id: "farequote" }
- do:
xpack.prelert.get_jobs:
job_id: "farequote"
- match: { count: 1 }
- match: { jobs.0.config.job_id: "farequote" }
- do:
xpack.prelert.get_jobs: {}
- match: { count: 1 }
- match: { jobs.0.config.job_id: "farequote" }
- match: { jobs.0.job_id: "farequote" }
- do:
xpack.prelert.delete_job:
job_id: "farequote"
- match: { acknowledged: true }
- do:
catch: missing
xpack.prelert.get_jobs:
job_id: "farequote"
- do:
indices.exists:
index: "prelertresults-farequote"
@ -136,48 +119,3 @@
catch: /Cannot delete job \[scheduler-job\] while scheduler \[test-scheduler-1\] refers to it/
xpack.prelert.delete_job:
job_id: scheduler-job
---
"Test jobid + from and/or size":
- do:
catch: request
xpack.prelert.get_jobs:
job_id: "job-stats-test"
from: 0
- do:
catch: request
xpack.prelert.get_jobs:
job_id: "job-stats-test"
size: 100
- do:
catch: request
xpack.prelert.get_jobs:
job_id: "job-stats-test"
from: 0
size: 100
---
"Test jobid + from and/or size (via body)":
- do:
catch: request
xpack.prelert.get_jobs:
body:
job_id: "job-stats-test"
from: 0
- do:
catch: request
xpack.prelert.get_jobs:
body:
job_id: "job-stats-test"
size: 100
- do:
catch: request
xpack.prelert.get_jobs:
body:
job_id: "job-stats-test"
from: 0
size: 100

View File

@ -0,0 +1,69 @@
setup:
- do:
xpack.prelert.put_job:
body: >
{
"job_id":"job-1",
"description":"Job 1",
"analysis_config" : {
"bucket_span":300,
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"JSON",
"time_field":"time",
"time_format":"epoch"
}
}
- do:
xpack.prelert.put_job:
body: >
{
"job_id":"job-2",
"description":"Job 2",
"analysis_config" : {
"bucket_span":600,
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format" : "JSON",
"time_field":"time",
"time_format":"yyyy-MM-dd'T'HH:mm:ssX"
}
}
---
"Test get job given missing job_id":
- do:
catch: missing
xpack.prelert.get_jobs:
job_id: missing-job
---
"Test get single job":
- do:
xpack.prelert.get_jobs:
job_id: job-1
- match: { jobs.0.job_id: "job-1"}
- match: { jobs.0.description: "Job 1"}
- do:
xpack.prelert.get_jobs:
job_id: job-2
- match: { jobs.0.job_id: "job-2"}
- match: { jobs.0.description: "Job 2"}
---
"Test get all jobs":
- do:
xpack.prelert.get_jobs:
job_id: _all
- match: { count: 2 }
- match: { jobs.0.job_id: "job-1"}
- match: { jobs.0.description: "Job 1"}
- match: { jobs.1.job_id: "job-2"}
- match: { jobs.1.description: "Job 2"}

View File

@ -66,15 +66,6 @@ setup:
---
"Test get job stats after uploading data prompting the creation of some stats":
- do:
xpack.prelert.get_jobs:
job_id: job-stats-test
- is_true: jobs.0.config
- is_false: jobs.0.data_counts
- is_false: jobs.0.model_size_stats
- is_false: jobs.0.scheduler_status
- do:
xpack.prelert.job_data:
@ -90,75 +81,32 @@ setup:
- do:
xpack.prelert.get_jobs:
xpack.prelert.get_jobs_stats:
job_id: job-stats-test
metric: data_counts
- match: { jobs.0.job_id : job-stats-test }
- match: { jobs.0.data_counts.processed_record_count: 2 }
- match: { jobs.0.data_counts.processed_field_count: 4}
- match: { jobs.0.data_counts.input_field_count: 4 }
# Test filters
- do:
xpack.prelert.get_jobs:
job_id: "job-stats-test"
metric: "data_counts"
- is_false: jobs.0.config
- is_true: jobs.0.data_counts
- is_false: jobs.0.model_size_stats
- is_false: jobs.0.scheduler_status
- do:
xpack.prelert.get_jobs:
job_id: "job-stats-test"
metric: "model_size_stats"
- is_false: jobs.0.config
- is_false: jobs.0.data_counts
- is_false: jobs.0.scheduler_status
- match: { jobs.0.model_size_stats.model_bytes: 100 }
- do:
xpack.prelert.get_jobs:
job_id: "job-stats-test"
metric: "scheduler_status"
- is_false: jobs.0.config
- is_false: jobs.0.data_counts
- do:
xpack.prelert.get_jobs:
job_id: "job-stats-test"
metric: "status"
- is_false: jobs.0.config
- is_false: jobs.0.data_counts
- is_false: jobs.0.model_size_stats
- is_false: jobs.0.scheduler_status
- match: { jobs.0.status: OPENED }
- do:
xpack.prelert.get_jobs:
job_id: "job-stats-test"
metric: "_all"
- is_true: jobs.0.config
- is_true: jobs.0.data_counts
- is_false: jobs.0.scheduler_status
- match: { jobs.0.job_id: job-stats-test }
- match: { jobs.0.status: OPENED }
---
"Test get job stats of scheduled job that has not received and data":
- do:
xpack.prelert.get_jobs:
job_id: "scheduled-job"
metric: "scheduler_status"
- is_false: jobs.0.config
- is_false: jobs.0.data_counts
xpack.prelert.get_jobs_stats:
job_id: scheduled-job
- match: { jobs.0.job_id : scheduled-job }
- match: { jobs.0.data_counts.processed_record_count: 0 }
- is_false: jobs.0.model_size_stats
- match: { jobs.0.status: OPENED }
- match: { jobs.0.scheduler_status: STOPPED }
---
"Test bad metric":
- do:
catch: request
xpack.prelert.get_jobs:
job_id: "job-stats-test"
metric: "foobar"
"Test get job stats given missing job":
- do:
catch: missing
xpack.prelert.get_jobs_stats:
job_id: unknown-job

View File

@ -50,9 +50,8 @@ setup:
- match: { acknowledged: true }
- do:
xpack.prelert.get_jobs:
xpack.prelert.get_jobs_stats:
job_id: farequote
metric: status
- match: { jobs.0.status: "CLOSED" }
- do:

View File

@ -288,9 +288,8 @@ setup:
- match: { influencers.0.timestamp: 1462060800000 }
- do:
xpack.prelert.get_jobs:
xpack.prelert.get_jobs_stats:
job_id: foo
metric: data_counts
- match: { jobs.0.data_counts.latest_record_timestamp: 1464739200000 }

View File

@ -34,17 +34,15 @@ setup:
"scheduler_id": "scheduler-1"
"start": 0
- do:
xpack.prelert.get_jobs:
xpack.prelert.get_jobs_stats:
job_id: "scheduled-job"
metric: "scheduler_status"
- match: { jobs.0.scheduler_status: STARTED }
- do:
xpack.prelert.stop_scheduler:
"scheduler_id": "scheduler-1"
- do:
xpack.prelert.get_jobs:
xpack.prelert.get_jobs_stats:
job_id: "scheduled-job"
metric: "scheduler_status"
- match: { jobs.0.scheduler_status: STOPPED }
---
"Test start non existing scheduler":