Add get API for schedulers (elastic/elasticsearch#582)

Original commit: elastic/x-pack-elasticsearch@59c555d60a
This commit is contained in:
Dimitris Athanasiou 2016-12-20 10:09:45 +00:00 committed by GitHub
parent 9b3764b7fa
commit bee58f8b81
27 changed files with 1016 additions and 72 deletions

View File

@ -41,6 +41,8 @@ import org.elasticsearch.xpack.prelert.action.GetJobsStatsAction;
import org.elasticsearch.xpack.prelert.action.GetListAction; import org.elasticsearch.xpack.prelert.action.GetListAction;
import org.elasticsearch.xpack.prelert.action.GetModelSnapshotsAction; import org.elasticsearch.xpack.prelert.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.prelert.action.GetRecordsAction; import org.elasticsearch.xpack.prelert.action.GetRecordsAction;
import org.elasticsearch.xpack.prelert.action.GetSchedulersAction;
import org.elasticsearch.xpack.prelert.action.GetSchedulersStatsAction;
import org.elasticsearch.xpack.prelert.action.JobDataAction; import org.elasticsearch.xpack.prelert.action.JobDataAction;
import org.elasticsearch.xpack.prelert.action.OpenJobAction; import org.elasticsearch.xpack.prelert.action.OpenJobAction;
import org.elasticsearch.xpack.prelert.action.PutJobAction; import org.elasticsearch.xpack.prelert.action.PutJobAction;
@ -99,6 +101,8 @@ import org.elasticsearch.xpack.prelert.rest.results.RestGetCategoriesAction;
import org.elasticsearch.xpack.prelert.rest.results.RestGetInfluencersAction; import org.elasticsearch.xpack.prelert.rest.results.RestGetInfluencersAction;
import org.elasticsearch.xpack.prelert.rest.results.RestGetRecordsAction; import org.elasticsearch.xpack.prelert.rest.results.RestGetRecordsAction;
import org.elasticsearch.xpack.prelert.rest.schedulers.RestDeleteSchedulerAction; import org.elasticsearch.xpack.prelert.rest.schedulers.RestDeleteSchedulerAction;
import org.elasticsearch.xpack.prelert.rest.schedulers.RestGetSchedulersAction;
import org.elasticsearch.xpack.prelert.rest.schedulers.RestGetSchedulersStatsAction;
import org.elasticsearch.xpack.prelert.rest.schedulers.RestPutSchedulerAction; import org.elasticsearch.xpack.prelert.rest.schedulers.RestPutSchedulerAction;
import org.elasticsearch.xpack.prelert.rest.schedulers.RestStartSchedulerAction; import org.elasticsearch.xpack.prelert.rest.schedulers.RestStartSchedulerAction;
import org.elasticsearch.xpack.prelert.rest.schedulers.RestStopSchedulerAction; import org.elasticsearch.xpack.prelert.rest.schedulers.RestStopSchedulerAction;
@ -246,6 +250,8 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
RestGetModelSnapshotsAction.class, RestGetModelSnapshotsAction.class,
RestRevertModelSnapshotAction.class, RestRevertModelSnapshotAction.class,
RestPutModelSnapshotDescriptionAction.class, RestPutModelSnapshotDescriptionAction.class,
RestGetSchedulersAction.class,
RestGetSchedulersStatsAction.class,
RestPutSchedulerAction.class, RestPutSchedulerAction.class,
RestDeleteSchedulerAction.class, RestDeleteSchedulerAction.class,
RestStartSchedulerAction.class, RestStartSchedulerAction.class,
@ -280,6 +286,8 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
new ActionHandler<>(GetModelSnapshotsAction.INSTANCE, GetModelSnapshotsAction.TransportAction.class), new ActionHandler<>(GetModelSnapshotsAction.INSTANCE, GetModelSnapshotsAction.TransportAction.class),
new ActionHandler<>(RevertModelSnapshotAction.INSTANCE, RevertModelSnapshotAction.TransportAction.class), new ActionHandler<>(RevertModelSnapshotAction.INSTANCE, RevertModelSnapshotAction.TransportAction.class),
new ActionHandler<>(PutModelSnapshotDescriptionAction.INSTANCE, PutModelSnapshotDescriptionAction.TransportAction.class), new ActionHandler<>(PutModelSnapshotDescriptionAction.INSTANCE, PutModelSnapshotDescriptionAction.TransportAction.class),
new ActionHandler<>(GetSchedulersAction.INSTANCE, GetSchedulersAction.TransportAction.class),
new ActionHandler<>(GetSchedulersStatsAction.INSTANCE, GetSchedulersStatsAction.TransportAction.class),
new ActionHandler<>(PutSchedulerAction.INSTANCE, PutSchedulerAction.TransportAction.class), new ActionHandler<>(PutSchedulerAction.INSTANCE, PutSchedulerAction.TransportAction.class),
new ActionHandler<>(DeleteSchedulerAction.INSTANCE, DeleteSchedulerAction.TransportAction.class), new ActionHandler<>(DeleteSchedulerAction.INSTANCE, DeleteSchedulerAction.TransportAction.class),
new ActionHandler<>(StartSchedulerAction.INSTANCE, StartSchedulerAction.TransportAction.class), new ActionHandler<>(StartSchedulerAction.INSTANCE, StartSchedulerAction.TransportAction.class),

View File

@ -38,7 +38,6 @@ import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage; import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerStatus;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.io.IOException; import java.io.IOException;
@ -54,7 +53,6 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
private static final String DATA_COUNTS = "data_counts"; private static final String DATA_COUNTS = "data_counts";
private static final String MODEL_SIZE_STATS = "model_size_stats"; private static final String MODEL_SIZE_STATS = "model_size_stats";
private static final String SCHEDULER_STATUS = "scheduler_status";
private static final String STATUS = "status"; private static final String STATUS = "status";
private GetJobsStatsAction() { private GetJobsStatsAction() {
@ -135,16 +133,12 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
@Nullable @Nullable
private ModelSizeStats modelSizeStats; private ModelSizeStats modelSizeStats;
private JobStatus status; private JobStatus status;
@Nullable
private SchedulerStatus schedulerStatus;
JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, JobStatus status, JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, JobStatus status) {
@Nullable SchedulerStatus schedulerStatus) {
this.jobId = Objects.requireNonNull(jobId); this.jobId = Objects.requireNonNull(jobId);
this.dataCounts = Objects.requireNonNull(dataCounts); this.dataCounts = Objects.requireNonNull(dataCounts);
this.modelSizeStats = modelSizeStats; this.modelSizeStats = modelSizeStats;
this.status = Objects.requireNonNull(status); this.status = Objects.requireNonNull(status);
this.schedulerStatus = schedulerStatus;
} }
JobStats(StreamInput in) throws IOException { JobStats(StreamInput in) throws IOException {
@ -152,7 +146,6 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
dataCounts = new DataCounts(in); dataCounts = new DataCounts(in);
modelSizeStats = in.readOptionalWriteable(ModelSizeStats::new); modelSizeStats = in.readOptionalWriteable(ModelSizeStats::new);
status = JobStatus.fromStream(in); status = JobStatus.fromStream(in);
schedulerStatus = in.readOptionalWriteable(SchedulerStatus::fromStream);
} }
public String getJobid() { public String getJobid() {
@ -167,10 +160,6 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
return modelSizeStats; return modelSizeStats;
} }
public SchedulerStatus getSchedulerStatus() {
return schedulerStatus;
}
public JobStatus getStatus() { public JobStatus getStatus() {
return status; return status;
} }
@ -184,9 +173,6 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
builder.field(MODEL_SIZE_STATS, modelSizeStats); builder.field(MODEL_SIZE_STATS, modelSizeStats);
} }
builder.field(STATUS, status); builder.field(STATUS, status);
if (schedulerStatus != null) {
builder.field(SCHEDULER_STATUS, schedulerStatus);
}
builder.endObject(); builder.endObject();
return builder; return builder;
@ -198,12 +184,11 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
dataCounts.writeTo(out); dataCounts.writeTo(out);
out.writeOptionalWriteable(modelSizeStats); out.writeOptionalWriteable(modelSizeStats);
status.writeTo(out); status.writeTo(out);
out.writeOptionalWriteable(schedulerStatus);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(jobId, dataCounts, modelSizeStats, schedulerStatus, status); return Objects.hash(jobId, dataCounts, modelSizeStats, status);
} }
@Override @Override
@ -218,8 +203,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
return Objects.equals(jobId, other.jobId) return Objects.equals(jobId, other.jobId)
&& Objects.equals(this.dataCounts, other.dataCounts) && Objects.equals(this.dataCounts, other.dataCounts)
&& Objects.equals(this.modelSizeStats, other.modelSizeStats) && Objects.equals(this.modelSizeStats, other.modelSizeStats)
&& Objects.equals(this.status, other.status) && Objects.equals(this.status, other.status);
&& Objects.equals(this.schedulerStatus, other.schedulerStatus);
} }
} }
@ -321,8 +305,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
DataCounts dataCounts = readDataCounts(job.getId()); DataCounts dataCounts = readDataCounts(job.getId());
ModelSizeStats modelSizeStats = readModelSizeStats(job.getId()); ModelSizeStats modelSizeStats = readModelSizeStats(job.getId());
JobStatus status = prelertMetadata.getAllocations().get(job.getId()).getStatus(); JobStatus status = prelertMetadata.getAllocations().get(job.getId()).getStatus();
Optional<SchedulerStatus> schedulerStatus = prelertMetadata.getSchedulerStatusByJobId(job.getId()); jobsStats.add(new Response.JobStats(job.getId(), dataCounts, modelSizeStats, status));
jobsStats.add(new Response.JobStats(job.getId(), dataCounts, modelSizeStats, status, schedulerStatus.orElse(null)));
} }
QueryPage<Response.JobStats> jobsStatsPage = new QueryPage<>(jobsStats, jobsStats.size(), Job.RESULTS_FIELD); QueryPage<Response.JobStats> jobsStatsPage = new QueryPage<>(jobsStats, jobsStats.size(), Job.RESULTS_FIELD);

View File

@ -0,0 +1,237 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.scheduler.Scheduler;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
public class GetSchedulersAction extends Action<GetSchedulersAction.Request, GetSchedulersAction.Response,
GetSchedulersAction.RequestBuilder> {
public static final GetSchedulersAction INSTANCE = new GetSchedulersAction();
public static final String NAME = "cluster:admin/prelert/schedulers/get";
private static final String ALL = "_all";
private GetSchedulersAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends MasterNodeReadRequest<Request> {
private String schedulerId;
public Request(String schedulerId) {
this.schedulerId = ExceptionsHelper.requireNonNull(schedulerId, SchedulerConfig.ID.getPreferredName());
}
Request() {}
public String getSchedulerId() {
return schedulerId;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
schedulerId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(schedulerId);
}
@Override
public int hashCode() {
return Objects.hash(schedulerId);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(schedulerId, other.schedulerId);
}
}
public static class RequestBuilder extends MasterNodeReadOperationRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, GetSchedulersAction action) {
super(client, action, new Request());
}
}
public static class Response extends ActionResponse implements StatusToXContent {
private QueryPage<SchedulerConfig> schedulers;
public Response(QueryPage<SchedulerConfig> schedulers) {
this.schedulers = schedulers;
}
public Response() {}
public QueryPage<SchedulerConfig> getResponse() {
return schedulers;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
schedulers = new QueryPage<>(in, SchedulerConfig::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
schedulers.writeTo(out);
}
@Override
public RestStatus status() {
return schedulers.count() == 0 ? RestStatus.NOT_FOUND : RestStatus.OK;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return schedulers.doXContentBody(builder, params);
}
@Override
public int hashCode() {
return Objects.hash(schedulers);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Response other = (Response) obj;
return Objects.equals(schedulers, other.schedulers);
}
@SuppressWarnings("deprecation")
@Override
public final String toString() {
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.prettyPrint();
builder.startObject();
toXContent(builder, EMPTY_PARAMS);
builder.endObject();
return builder.string();
} catch (Exception e) {
// So we have a stack trace logged somewhere
return "{ \"error\" : \"" + org.elasticsearch.ExceptionsHelper.detailedMessage(e) + "\"}";
}
}
}
public static class TransportAction extends TransportMasterNodeReadAction<Request, Response> {
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, GetSchedulersAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
logger.debug("Get scheduler '{}'", request.getSchedulerId());
QueryPage<SchedulerConfig> response = null;
PrelertMetadata prelertMetadata = state.metaData().custom(PrelertMetadata.TYPE);
if (ALL.equals(request.getSchedulerId())) {
List<SchedulerConfig> schedulerConfigs = prelertMetadata.getSchedulers().values().stream().map(
s -> s.getConfig()).collect(Collectors.toList());
response = new QueryPage<>(schedulerConfigs, schedulerConfigs.size(), Scheduler.RESULTS_FIELD);
} else {
Scheduler scheduler = prelertMetadata.getScheduler(request.getSchedulerId());
if (scheduler == null) {
throw ExceptionsHelper.missingSchedulerException(request.getSchedulerId());
}
response = new QueryPage<>(Collections.singletonList(scheduler.getConfig()), 1, Scheduler.RESULTS_FIELD);
}
listener.onResponse(new Response(response));
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
}
}

View File

@ -0,0 +1,300 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.scheduler.Scheduler;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerStatus;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
public class GetSchedulersStatsAction extends Action<GetSchedulersStatsAction.Request, GetSchedulersStatsAction.Response,
GetSchedulersStatsAction.RequestBuilder> {
public static final GetSchedulersStatsAction INSTANCE = new GetSchedulersStatsAction();
public static final String NAME = "cluster:admin/prelert/schedulers/stats/get";
private static final String ALL = "_all";
private static final String STATUS = "status";
private GetSchedulersStatsAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends MasterNodeReadRequest<Request> {
private String schedulerId;
public Request(String schedulerId) {
this.schedulerId = ExceptionsHelper.requireNonNull(schedulerId, SchedulerConfig.ID.getPreferredName());
}
Request() {}
public String getSchedulerId() {
return schedulerId;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
schedulerId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(schedulerId);
}
@Override
public int hashCode() {
return Objects.hash(schedulerId);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(schedulerId, other.schedulerId);
}
}
public static class RequestBuilder extends MasterNodeReadOperationRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, GetSchedulersStatsAction action) {
super(client, action, new Request());
}
}
public static class Response extends ActionResponse implements StatusToXContent {
public static class SchedulerStats implements ToXContent, Writeable {
private final String schedulerId;
private final SchedulerStatus schedulerStatus;
SchedulerStats(String schedulerId, SchedulerStatus schedulerStatus) {
this.schedulerId = Objects.requireNonNull(schedulerId);
this.schedulerStatus = Objects.requireNonNull(schedulerStatus);
}
SchedulerStats(StreamInput in) throws IOException {
schedulerId = in.readString();
schedulerStatus = SchedulerStatus.fromStream(in);
}
public String getSchedulerId() {
return schedulerId;
}
public SchedulerStatus getSchedulerStatus() {
return schedulerStatus;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SchedulerConfig.ID.getPreferredName(), schedulerId);
builder.field(STATUS, schedulerStatus);
builder.endObject();
return builder;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(schedulerId);
schedulerStatus.writeTo(out);
}
@Override
public int hashCode() {
return Objects.hash(schedulerId, schedulerStatus);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
GetSchedulersStatsAction.Response.SchedulerStats other = (GetSchedulersStatsAction.Response.SchedulerStats) obj;
return Objects.equals(schedulerId, other.schedulerId) && Objects.equals(this.schedulerStatus, other.schedulerStatus);
}
}
private QueryPage<SchedulerStats> schedulersStats;
public Response(QueryPage<SchedulerStats> schedulersStats) {
this.schedulersStats = schedulersStats;
}
public Response() {}
public QueryPage<SchedulerStats> getResponse() {
return schedulersStats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
schedulersStats = new QueryPage<>(in, SchedulerStats::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
schedulersStats.writeTo(out);
}
@Override
public RestStatus status() {
return schedulersStats.count() == 0 ? RestStatus.NOT_FOUND : RestStatus.OK;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return schedulersStats.doXContentBody(builder, params);
}
@Override
public int hashCode() {
return Objects.hash(schedulersStats);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Response other = (Response) obj;
return Objects.equals(schedulersStats, other.schedulersStats);
}
@SuppressWarnings("deprecation")
@Override
public final String toString() {
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.prettyPrint();
builder.startObject();
toXContent(builder, EMPTY_PARAMS);
builder.endObject();
return builder.string();
} catch (Exception e) {
// So we have a stack trace logged somewhere
return "{ \"error\" : \"" + org.elasticsearch.ExceptionsHelper.detailedMessage(e) + "\"}";
}
}
}
public static class TransportAction extends TransportMasterNodeReadAction<Request, Response> {
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, GetSchedulersStatsAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
logger.debug("Get stats for scheduler '{}'", request.getSchedulerId());
List<Response.SchedulerStats> stats = new ArrayList<>();
PrelertMetadata prelertMetadata = state.metaData().custom(PrelertMetadata.TYPE);
if (ALL.equals(request.getSchedulerId())) {
Collection<Scheduler> schedulers = prelertMetadata.getSchedulers().values();
for (Scheduler scheduler : schedulers) {
stats.add(new Response.SchedulerStats(scheduler.getId(), scheduler.getStatus()));
}
} else {
Scheduler scheduler = prelertMetadata.getScheduler(request.getSchedulerId());
if (scheduler == null) {
throw ExceptionsHelper.missingSchedulerException(request.getSchedulerId());
}
stats.add(new Response.SchedulerStats(scheduler.getId(), scheduler.getStatus()));
}
QueryPage<Response.SchedulerStats> statsPage = new QueryPage<>(stats, stats.size(), Scheduler.RESULTS_FIELD);
listener.onResponse(new Response(statsPage));
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
}
}

View File

@ -188,7 +188,6 @@ public final class Messages {
public static final String JOB_DATA_CONCURRENT_USE_UPDATE = "job.data.concurrent.use.update"; public static final String JOB_DATA_CONCURRENT_USE_UPDATE = "job.data.concurrent.use.update";
public static final String JOB_DATA_CONCURRENT_USE_UPLOAD = "job.data.concurrent.use.upload"; public static final String JOB_DATA_CONCURRENT_USE_UPLOAD = "job.data.concurrent.use.upload";
public static final String SCHEDULER_CONFIG_FIELD_NOT_SUPPORTED = "scheduler.config.field.not.supported";
public static final String SCHEDULER_CONFIG_INVALID_OPTION_VALUE = "scheduler.config.invalid.option.value"; public static final String SCHEDULER_CONFIG_INVALID_OPTION_VALUE = "scheduler.config.invalid.option.value";
public static final String SCHEDULER_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "scheduler.does.not.support.job.with.latency"; public static final String SCHEDULER_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "scheduler.does.not.support.job.with.latency";

View File

@ -307,7 +307,7 @@ public class PrelertMetadata implements MetaData.Custom {
public Builder removeScheduler(String schedulerId) { public Builder removeScheduler(String schedulerId) {
Scheduler scheduler = schedulers.get(schedulerId); Scheduler scheduler = schedulers.get(schedulerId);
if (scheduler == null) { if (scheduler == null) {
throw new ResourceNotFoundException(Messages.getMessage(Messages.SCHEDULER_NOT_FOUND, schedulerId)); throw ExceptionsHelper.missingSchedulerException(schedulerId);
} }
if (scheduler.getStatus() != SchedulerStatus.STOPPED) { if (scheduler.getStatus() != SchedulerStatus.STOPPED) {
String msg = Messages.getMessage(Messages.SCHEDULER_CANNOT_DELETE_IN_CURRENT_STATE, schedulerId, scheduler.getStatus()); String msg = Messages.getMessage(Messages.SCHEDULER_CANNOT_DELETE_IN_CURRENT_STATE, schedulerId, scheduler.getStatus());
@ -400,7 +400,7 @@ public class PrelertMetadata implements MetaData.Custom {
public Builder updateSchedulerStatus(String schedulerId, SchedulerStatus newStatus) { public Builder updateSchedulerStatus(String schedulerId, SchedulerStatus newStatus) {
Scheduler scheduler = schedulers.get(schedulerId); Scheduler scheduler = schedulers.get(schedulerId);
if (scheduler == null) { if (scheduler == null) {
throw new ResourceNotFoundException(Messages.getMessage(Messages.SCHEDULER_NOT_FOUND, schedulerId)); throw ExceptionsHelper.missingSchedulerException(schedulerId);
} }
SchedulerStatus currentStatus = scheduler.getStatus(); SchedulerStatus currentStatus = scheduler.getStatus();

View File

@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.rest.schedulers;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestStatusToXContentListener;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.GetSchedulersAction;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
import java.io.IOException;
public class RestGetSchedulersAction extends BaseRestHandler {
private final GetSchedulersAction.TransportAction transportGetSchedulersAction;
@Inject
public RestGetSchedulersAction(Settings settings, RestController controller,
GetSchedulersAction.TransportAction transportGetSchedulersAction) {
super(settings);
this.transportGetSchedulersAction = transportGetSchedulersAction;
controller.registerHandler(RestRequest.Method.GET, PrelertPlugin.BASE_PATH
+ "schedulers/{" + SchedulerConfig.ID.getPreferredName() + "}", this);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
GetSchedulersAction.Request request = new GetSchedulersAction.Request(restRequest.param(SchedulerConfig.ID.getPreferredName()));
return channel -> transportGetSchedulersAction.execute(request, new RestStatusToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.rest.schedulers;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestStatusToXContentListener;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.GetSchedulersStatsAction;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
import java.io.IOException;
public class RestGetSchedulersStatsAction extends BaseRestHandler {
private final GetSchedulersStatsAction.TransportAction transportGetSchedulersStatsAction;
@Inject
public RestGetSchedulersStatsAction(Settings settings, RestController controller,
GetSchedulersStatsAction.TransportAction transportGetSchedulersStatsAction) {
super(settings);
this.transportGetSchedulersStatsAction = transportGetSchedulersStatsAction;
controller.registerHandler(RestRequest.Method.GET, PrelertPlugin.BASE_PATH
+ "schedulers/{" + SchedulerConfig.ID.getPreferredName() + "}/_stats", this);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
GetSchedulersStatsAction.Request request = new GetSchedulersStatsAction.Request(
restRequest.param(SchedulerConfig.ID.getPreferredName()));
return channel -> transportGetSchedulersStatsAction.execute(request, new RestStatusToXContentListener<>(channel));
}
}

View File

@ -134,7 +134,7 @@ public class ScheduledJobRunner extends AbstractComponent {
public static void validate(String schedulerId, PrelertMetadata prelertMetadata) { public static void validate(String schedulerId, PrelertMetadata prelertMetadata) {
Scheduler scheduler = prelertMetadata.getScheduler(schedulerId); Scheduler scheduler = prelertMetadata.getScheduler(schedulerId);
if (scheduler == null) { if (scheduler == null) {
throw new ResourceNotFoundException(Messages.getMessage(Messages.SCHEDULER_NOT_FOUND, schedulerId)); throw ExceptionsHelper.missingSchedulerException(schedulerId);
} }
Job job = prelertMetadata.getJobs().get(scheduler.getJobId()); Job job = prelertMetadata.getJobs().get(scheduler.getJobId());
if (job == null) { if (job == null) {

View File

@ -25,6 +25,9 @@ public class Scheduler extends AbstractDiffable<Scheduler> implements ToXContent
public static final Scheduler PROTO = new Scheduler(null, null); public static final Scheduler PROTO = new Scheduler(null, null);
// Used for QueryPage
public static final ParseField RESULTS_FIELD = new ParseField("schedulers");
public static final ConstructingObjectParser<Scheduler, ParseFieldMatcherSupplier> PARSER = new ConstructingObjectParser<>("scheduler", public static final ConstructingObjectParser<Scheduler, ParseFieldMatcherSupplier> PARSER = new ConstructingObjectParser<>("scheduler",
a -> new Scheduler((SchedulerConfig) a[0], (SchedulerStatus) a[1])); a -> new Scheduler((SchedulerConfig) a[0], (SchedulerStatus) a[1]));

View File

@ -419,7 +419,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
private Map<String, Object> query = null; private Map<String, Object> query = null;
private Map<String, Object> aggregations = null; private Map<String, Object> aggregations = null;
private Map<String, Object> scriptFields = null; private Map<String, Object> scriptFields = null;
private Boolean retrieveWholeSource;
private Integer scrollSize; private Integer scrollSize;
public Builder() { public Builder() {
@ -427,7 +426,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
query.put(MATCH_ALL_ES_QUERY, new HashMap<String, Object>()); query.put(MATCH_ALL_ES_QUERY, new HashMap<String, Object>());
setQuery(query); setQuery(query);
setQueryDelay(DEFAULT_ELASTICSEARCH_QUERY_DELAY); setQueryDelay(DEFAULT_ELASTICSEARCH_QUERY_DELAY);
setRetrieveWholeSource(false);
setScrollSize(DEFAULT_SCROLL_SIZE); setScrollSize(DEFAULT_SCROLL_SIZE);
} }
@ -499,10 +497,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
this.scriptFields = Objects.requireNonNull(scriptFields); this.scriptFields = Objects.requireNonNull(scriptFields);
} }
public void setRetrieveWholeSource(boolean retrieveWholeSource) {
this.retrieveWholeSource = retrieveWholeSource;
}
public void setScrollSize(int scrollSize) { public void setScrollSize(int scrollSize) {
if (scrollSize < 0) { if (scrollSize < 0) {
String msg = Messages.getMessage(Messages.SCHEDULER_CONFIG_INVALID_OPTION_VALUE, String msg = Messages.getMessage(Messages.SCHEDULER_CONFIG_INVALID_OPTION_VALUE,
@ -524,11 +518,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
if (types == null || types.isEmpty() || types.contains(null) || types.contains("")) { if (types == null || types.isEmpty() || types.contains(null) || types.contains("")) {
throw invalidOptionValue(TYPES.getPreferredName(), types); throw invalidOptionValue(TYPES.getPreferredName(), types);
} }
if (Boolean.TRUE.equals(retrieveWholeSource)) {
if (scriptFields != null) {
throw notSupportedValue(SCRIPT_FIELDS, Messages.SCHEDULER_CONFIG_FIELD_NOT_SUPPORTED);
}
}
return new SchedulerConfig(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize); return new SchedulerConfig(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize);
} }

View File

@ -23,6 +23,10 @@ public class ExceptionsHelper {
throw new ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_CONFIG_ID_ALREADY_TAKEN, jobId)); throw new ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_CONFIG_ID_ALREADY_TAKEN, jobId));
} }
public static ResourceNotFoundException missingSchedulerException(String schedulerId) {
throw new ResourceNotFoundException(Messages.getMessage(Messages.SCHEDULER_NOT_FOUND, schedulerId));
}
public static ElasticsearchException serverError(String msg) { public static ElasticsearchException serverError(String msg) {
return new ElasticsearchException(msg); return new ElasticsearchException(msg);
} }

View File

@ -137,7 +137,6 @@ job.config.transform.output.name.used.more.than.once = Transform output name ''{
job.config.transform.unknown.type = Unknown TransformType ''{0}'' job.config.transform.unknown.type = Unknown TransformType ''{0}''
job.config.unknown.function = Unknown function ''{0}'' job.config.unknown.function = Unknown function ''{0}''
scheduler.config.field.not.supported = Scheduler configuration field {0} not supported
scheduler.config.invalid.option.value = Invalid {0} value ''{1}'' in scheduler configuration scheduler.config.invalid.option.value = Invalid {0} value ''{1}'' in scheduler configuration
scheduler.does.not.support.job.with.latency = A job configured with scheduler cannot support latency scheduler.does.not.support.job.with.latency = A job configured with scheduler cannot support latency

View File

@ -9,7 +9,7 @@ import org.elasticsearch.xpack.prelert.action.GetJobsAction.Request;
import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase; import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
public class GetJobActionRequestTests extends AbstractStreamableTestCase<GetJobsAction.Request> { public class GetJobsActionRequestTests extends AbstractStreamableTestCase<GetJobsAction.Request> {
@Override @Override
protected Request createTestInstance() { protected Request createTestInstance() {

View File

@ -24,7 +24,7 @@ import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
public class GetJobActionResponseTests extends AbstractStreamableTestCase<GetJobsAction.Response> { public class GetJobsActionResponseTests extends AbstractStreamableTestCase<GetJobsAction.Response> {
@Override @Override
protected Response createTestInstance() { protected Response createTestInstance() {

View File

@ -41,12 +41,7 @@ public class GetJobsStatsActionResponseTests extends AbstractStreamableTestCase<
} }
JobStatus jobStatus = randomFrom(EnumSet.allOf(JobStatus.class)); JobStatus jobStatus = randomFrom(EnumSet.allOf(JobStatus.class));
SchedulerStatus schedulerStatus = null; Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, jobStatus);
if (randomBoolean()) {
schedulerStatus = randomFrom(SchedulerStatus.values());
}
Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, jobStatus, schedulerStatus);
jobStatsList.add(jobStats); jobStatsList.add(jobStats);
} }

View File

@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.xpack.prelert.action.GetSchedulersAction.Request;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
public class GetSchedulersActionRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomBoolean() ? Job.ALL : randomAsciiOfLengthBetween(1, 20));
}
@Override
protected Request createBlankInstance() {
return new Request();
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.xpack.prelert.action.GetSchedulersAction.Response;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.scheduler.Scheduler;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfigTests;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class GetSchedulersActionResponseTests extends AbstractStreamableTestCase<Response> {
@Override
protected Response createTestInstance() {
final Response result;
int listSize = randomInt(10);
List<SchedulerConfig> schedulerList = new ArrayList<>(listSize);
for (int j = 0; j < listSize; j++) {
String schedulerId = SchedulerConfigTests.randomValidSchedulerId();
String jobId = randomAsciiOfLength(10);
SchedulerConfig.Builder schedulerConfig = new SchedulerConfig.Builder(schedulerId, jobId);
schedulerConfig.setIndexes(randomSubsetOf(2, Arrays.asList("index-1", "index-2", "index-3")));
schedulerConfig.setTypes(randomSubsetOf(2, Arrays.asList("type-1", "type-2", "type-3")));
schedulerConfig.setFrequency(randomPositiveLong());
schedulerConfig.setQueryDelay(randomPositiveLong());
if (randomBoolean()) {
schedulerConfig.setQuery(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
if (randomBoolean()) {
schedulerConfig.setScriptFields(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
if (randomBoolean()) {
schedulerConfig.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
schedulerConfig.setAggregations(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
schedulerList.add(schedulerConfig.build());
}
result = new Response(new QueryPage<>(schedulerList, schedulerList.size(), Scheduler.RESULTS_FIELD));
return result;
}
@Override
protected Response createBlankInstance() {
return new Response();
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.xpack.prelert.action.GetSchedulersStatsAction.Request;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
public class GetSchedulersStatsActionRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomBoolean() ? Job.ALL : randomAsciiOfLengthBetween(1, 20));
}
@Override
protected Request createBlankInstance() {
return new Request();
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.xpack.prelert.action.GetSchedulersStatsAction.Response;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.scheduler.Scheduler;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerStatus;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
import java.util.ArrayList;
import java.util.List;
public class GetSchedulersStatsActionResponseTests extends AbstractStreamableTestCase<Response> {
@Override
protected Response createTestInstance() {
final Response result;
int listSize = randomInt(10);
List<Response.SchedulerStats> schedulerStatsList = new ArrayList<>(listSize);
for (int j = 0; j < listSize; j++) {
String schedulerId = randomAsciiOfLength(10);
SchedulerStatus schedulerStatus = randomFrom(SchedulerStatus.values());
Response.SchedulerStats schedulerStats = new Response.SchedulerStats(schedulerId, schedulerStatus);
schedulerStatsList.add(schedulerStats);
}
result = new Response(new QueryPage<>(schedulerStatsList, schedulerStatsList.size(), Scheduler.RESULTS_FIELD));
return result;
}
@Override
protected Response createBlankInstance() {
return new Response();
}
}

View File

@ -34,10 +34,7 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
if (randomBoolean()) { if (randomBoolean()) {
builder.setQuery(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10))); builder.setQuery(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
} }
boolean retrieveWholeSource = randomBoolean(); if (randomBoolean()) {
if (retrieveWholeSource) {
builder.setRetrieveWholeSource(randomBoolean());
} else if (randomBoolean()) {
builder.setScriptFields(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10))); builder.setScriptFields(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
} }
if (randomBoolean()) { if (randomBoolean()) {
@ -149,7 +146,6 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
defaultQuery.put("match_all", new HashMap<String, Object>()); defaultQuery.put("match_all", new HashMap<String, Object>());
expectedSchedulerConfig.setQuery(defaultQuery); expectedSchedulerConfig.setQuery(defaultQuery);
expectedSchedulerConfig.setQueryDelay(60L); expectedSchedulerConfig.setQueryDelay(60L);
expectedSchedulerConfig.setRetrieveWholeSource(false);
expectedSchedulerConfig.setScrollSize(1000); expectedSchedulerConfig.setScrollSize(1000);
SchedulerConfig.Builder defaultedSchedulerConfig = new SchedulerConfig.Builder("scheduler1", "job1"); SchedulerConfig.Builder defaultedSchedulerConfig = new SchedulerConfig.Builder("scheduler1", "job1");
defaultedSchedulerConfig.setIndexes(Arrays.asList("index")); defaultedSchedulerConfig.setIndexes(Arrays.asList("index"));
@ -266,7 +262,7 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
assertEquals(Collections.singletonMap("match_all", new HashMap<>()), conf.build().getQuery()); assertEquals(Collections.singletonMap("match_all", new HashMap<>()), conf.build().getQuery());
} }
public void testCheckValid_GivenScriptFieldsNotWholeSource() throws IOException { public void testCheckValid_GivenScriptFields() throws IOException {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder("scheduler1", "job1"); SchedulerConfig.Builder conf = new SchedulerConfig.Builder("scheduler1", "job1");
conf.setIndexes(Arrays.asList("myindex")); conf.setIndexes(Arrays.asList("myindex"));
conf.setTypes(Arrays.asList("mytype")); conf.setTypes(Arrays.asList("mytype"));
@ -274,22 +270,9 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
+ "\"inline\" : \"doc['responsetime'].value * 2\" } } }"; + "\"inline\" : \"doc['responsetime'].value * 2\" } } }";
XContentParser parser = XContentFactory.xContent(json).createParser(json); XContentParser parser = XContentFactory.xContent(json).createParser(json);
conf.setScriptFields(parser.map()); conf.setScriptFields(parser.map());
conf.setRetrieveWholeSource(false);
assertEquals(1, conf.build().getScriptFields().size()); assertEquals(1, conf.build().getScriptFields().size());
} }
public void testCheckValid_GivenScriptFieldsAndWholeSource() throws IOException {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder("scheduler1", "job1");
conf.setIndexes(Arrays.asList("myindex"));
conf.setTypes(Arrays.asList("mytype"));
String json = "{ \"twiceresponsetime\" : { \"script\" : { \"lang\" : \"expression\", "
+ "\"inline\" : \"doc['responsetime'].value * 2\" } } }";
XContentParser parser = XContentFactory.xContent(json).createParser(json);
conf.setScriptFields(parser.map());
conf.setRetrieveWholeSource(true);
expectThrows(IllegalArgumentException.class, conf::build);
}
public void testCheckValid_GivenNullIndexes() throws IOException { public void testCheckValid_GivenNullIndexes() throws IOException {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder("scheduler1", "job1"); SchedulerConfig.Builder conf = new SchedulerConfig.Builder("scheduler1", "job1");
expectThrows(IllegalArgumentException.class, () -> conf.setIndexes(null)); expectThrows(IllegalArgumentException.class, () -> conf.setIndexes(null));

View File

@ -0,0 +1,19 @@
{
"xpack.prelert.get_schedulers": {
"methods": [ "GET"],
"url": {
"path": "/_xpack/prelert/schedulers/{scheduler_id}",
"paths": [
"/_xpack/prelert/schedulers/{scheduler_id}"
],
"parts": {
"scheduler_id": {
"type": "string",
"required": true,
"description": "The ID of the schedulers to fetch"
}
}
},
"body": null
}
}

View File

@ -0,0 +1,19 @@
{
"xpack.prelert.get_schedulers_stats": {
"methods": [ "GET"],
"url": {
"path": "/_xpack/prelert/schedulers/{scheduler_id}/_stats",
"paths": [
"/_xpack/prelert/schedulers/{scheduler_id}/_stats"
],
"parts": {
"scheduler_id": {
"type": "string",
"required": true,
"description": "The ID of the schedulers stats to fetch"
}
}
},
"body": null
}
}

View File

@ -0,0 +1,87 @@
setup:
- do:
xpack.prelert.put_job:
body: >
{
"job_id":"job-1",
"analysis_config" : {
"bucket_span":3600,
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"ELASTICSEARCH",
"time_field":"time",
"time_format":"epoch"
}
}
- do:
xpack.prelert.put_job:
body: >
{
"job_id":"job-2",
"analysis_config" : {
"bucket_span":3600,
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"ELASTICSEARCH",
"time_field":"time",
"time_format":"epoch"
}
}
- do:
xpack.prelert.put_scheduler:
scheduler_id: scheduler-1
body: >
{
"job_id":"job-1",
"indexes":["index-1"],
"types":["type-1"]
}
- do:
xpack.prelert.put_scheduler:
scheduler_id: scheduler-2
body: >
{
"job_id":"job-2",
"indexes":["index-2"],
"types":["type-2"]
}
---
"Test get scheduler given missing scheduler_id":
- do:
catch: missing
xpack.prelert.get_schedulers:
scheduler_id: missing-scheduler
---
"Test get single scheduler":
- do:
xpack.prelert.get_schedulers:
scheduler_id: scheduler-1
- match: { schedulers.0.scheduler_id: "scheduler-1"}
- match: { schedulers.0.job_id: "job-1"}
- do:
xpack.prelert.get_schedulers:
scheduler_id: scheduler-2
- match: { schedulers.0.scheduler_id: "scheduler-2"}
- match: { schedulers.0.job_id: "job-2"}
---
"Test get all schedulers":
- do:
xpack.prelert.get_schedulers:
scheduler_id: _all
- match: { count: 2 }
- match: { schedulers.0.scheduler_id: "scheduler-1"}
- match: { schedulers.0.job_id: "job-1"}
- match: { schedulers.1.scheduler_id: "scheduler-2"}
- match: { schedulers.1.job_id: "job-2"}

View File

@ -0,0 +1,87 @@
setup:
- do:
xpack.prelert.put_job:
body: >
{
"job_id":"job-1",
"analysis_config" : {
"bucket_span":3600,
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"ELASTICSEARCH",
"time_field":"time",
"time_format":"epoch"
}
}
- do:
xpack.prelert.put_job:
body: >
{
"job_id":"job-2",
"analysis_config" : {
"bucket_span":3600,
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"ELASTICSEARCH",
"time_field":"time",
"time_format":"epoch"
}
}
- do:
xpack.prelert.put_scheduler:
scheduler_id: scheduler-1
body: >
{
"job_id":"job-1",
"indexes":["index-1"],
"types":["type-1"]
}
- do:
xpack.prelert.put_scheduler:
scheduler_id: scheduler-2
body: >
{
"job_id":"job-2",
"indexes":["index-2"],
"types":["type-2"]
}
---
"Test get scheduler stats given missing scheduler_id":
- do:
catch: missing
xpack.prelert.get_schedulers_stats:
scheduler_id: missing-scheduler
---
"Test get single scheduler stats":
- do:
xpack.prelert.get_schedulers_stats:
scheduler_id: scheduler-1
- match: { schedulers.0.scheduler_id: "scheduler-1"}
- match: { schedulers.0.status: "STOPPED"}
- do:
xpack.prelert.get_schedulers_stats:
scheduler_id: scheduler-2
- match: { schedulers.0.scheduler_id: "scheduler-2"}
- match: { schedulers.0.status: "STOPPED"}
---
"Test get all schedulers stats":
- do:
xpack.prelert.get_schedulers_stats:
scheduler_id: _all
- match: { count: 2 }
- match: { schedulers.0.scheduler_id: "scheduler-1"}
- match: { schedulers.0.status: "STOPPED"}
- match: { schedulers.1.scheduler_id: "scheduler-2"}
- match: { schedulers.1.status: "STOPPED"}

View File

@ -89,7 +89,6 @@ setup:
- match: { jobs.0.data_counts.input_field_count: 4 } - match: { jobs.0.data_counts.input_field_count: 4 }
- match: { jobs.0.model_size_stats.model_bytes: 100 } - match: { jobs.0.model_size_stats.model_bytes: 100 }
- match: { jobs.0.status: OPENED } - match: { jobs.0.status: OPENED }
- is_false: jobs.0.scheduler_status
--- ---
"Test get job stats of scheduled job that has not received and data": "Test get job stats of scheduled job that has not received and data":
@ -101,7 +100,6 @@ setup:
- match: { jobs.0.data_counts.processed_record_count: 0 } - match: { jobs.0.data_counts.processed_record_count: 0 }
- is_false: jobs.0.model_size_stats - is_false: jobs.0.model_size_stats
- match: { jobs.0.status: OPENED } - match: { jobs.0.status: OPENED }
- match: { jobs.0.scheduler_status: STOPPED }
--- ---
"Test get job stats given missing job": "Test get job stats given missing job":

View File

@ -34,16 +34,16 @@ setup:
"scheduler_id": "scheduler-1" "scheduler_id": "scheduler-1"
"start": 0 "start": 0
- do: - do:
xpack.prelert.get_jobs_stats: xpack.prelert.get_schedulers_stats:
job_id: "scheduled-job" scheduler_id: "scheduler-1"
- match: { jobs.0.scheduler_status: STARTED } - match: { schedulers.0.status: STARTED }
- do: - do:
xpack.prelert.stop_scheduler: xpack.prelert.stop_scheduler:
"scheduler_id": "scheduler-1" "scheduler_id": "scheduler-1"
- do: - do:
xpack.prelert.get_jobs_stats: xpack.prelert.get_schedulers_stats:
job_id: "scheduled-job" scheduler_id: "scheduler-1"
- match: { jobs.0.scheduler_status: STOPPED } - match: { schedulers.0.status: STOPPED }
--- ---
"Test start non existing scheduler": "Test start non existing scheduler":
- do: - do: