[ML] Calendar jobs endpoints (elastic/x-pack-elasticsearch#3320)

* Calendar jobs endpoints

* Refactor put and delete calendar job to use the same action

* Check jobs exist when creating the calendar

* Address review comments

* Add isGroupOrJobMethod

* Increase default page size for calendar query


Original commit: elastic/x-pack-elasticsearch@7484799fe9
This commit is contained in:
David Kyle 2017-12-19 13:57:32 +00:00 committed by GitHub
parent 3efd35cadf
commit a8997387b7
22 changed files with 1017 additions and 97 deletions

View File

@ -31,7 +31,6 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -53,6 +52,7 @@ import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.DeleteCalendarAction;
import org.elasticsearch.xpack.ml.action.UpdateCalendarJobAction;
import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.action.DeleteFilterAction;
@ -118,8 +118,10 @@ import org.elasticsearch.xpack.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestDeleteCalendarAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestDeleteCalendarJobAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestGetCalendarsAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestPutCalendarAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestPutCalendarJobAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestDeleteDatafeedAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestGetDatafeedStatsAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestGetDatafeedsAction;
@ -467,7 +469,9 @@ public class MachineLearning implements ActionPlugin {
new RestForecastJobAction(settings, restController),
new RestGetCalendarsAction(settings, restController),
new RestPutCalendarAction(settings, restController),
new RestDeleteCalendarAction(settings, restController)
new RestDeleteCalendarAction(settings, restController),
new RestDeleteCalendarJobAction(settings, restController),
new RestPutCalendarJobAction(settings, restController)
);
}
@ -516,7 +520,8 @@ public class MachineLearning implements ActionPlugin {
new ActionHandler<>(ForecastJobAction.INSTANCE, ForecastJobAction.TransportAction.class),
new ActionHandler<>(GetCalendarsAction.INSTANCE, GetCalendarsAction.TransportAction.class),
new ActionHandler<>(PutCalendarAction.INSTANCE, PutCalendarAction.TransportAction.class),
new ActionHandler<>(DeleteCalendarAction.INSTANCE, DeleteCalendarAction.TransportAction.class)
new ActionHandler<>(DeleteCalendarAction.INSTANCE, DeleteCalendarAction.TransportAction.class),
new ActionHandler<>(UpdateCalendarJobAction.INSTANCE, UpdateCalendarJobAction.TransportAction.class)
);
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings;
@ -32,6 +33,12 @@ public final class MlMetaIndex {
builder.startObject(TYPE);
ElasticsearchMappings.addDefaultMapping(builder);
builder.startObject(ElasticsearchMappings.PROPERTIES)
.startObject(Calendar.ID.getPreferredName())
.field(ElasticsearchMappings.TYPE, ElasticsearchMappings.KEYWORD)
.endObject()
.startObject(Calendar.JOB_IDS.getPreferredName())
.field(ElasticsearchMappings.TYPE, ElasticsearchMappings.KEYWORD)
.endObject()
.startObject(SpecialEvent.START_TIME.getPreferredName())
.field(ElasticsearchMappings.TYPE, ElasticsearchMappings.DATE)
.endObject()

View File

@ -81,6 +81,10 @@ public class MlMetadata implements MetaData.Custom {
return jobs;
}
public boolean isGroupOrJob(String id) {
return groupOrJobLookup.isGroupOrJob(id);
}
public Set<String> expandJobIds(String expression, boolean allowNoJobs) {
return groupOrJobLookup.expandJobIds(expression, allowNoJobs);
}

View File

@ -42,6 +42,7 @@ import org.elasticsearch.xpack.ml.MlMetaIndex;
import org.elasticsearch.xpack.ml.action.util.PageParams;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.job.persistence.CalendarQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import java.io.IOException;
@ -212,16 +213,16 @@ public class GetCalendarsAction extends Action<GetCalendarsAction.Request, GetCa
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final Client client;
private final JobProvider jobProvider;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client) {
JobProvider jobProvider) {
super(settings, NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
this.client = client;
this.jobProvider = jobProvider;
}
@Override
@ -239,76 +240,24 @@ public class GetCalendarsAction extends Action<GetCalendarsAction.Request, GetCa
}
private void getCalendar(String calendarId, ActionListener<Response> listener) {
GetRequest getRequest = new GetRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, Calendar.documentId(calendarId));
executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getDocResponse) {
try {
QueryPage<Calendar> calendars;
if (getDocResponse.isExists()) {
BytesReference docSource = getDocResponse.getSourceAsBytesRef();
try (XContentParser parser =
XContentFactory.xContent(docSource).createParser(NamedXContentRegistry.EMPTY, docSource)) {
Calendar calendar = Calendar.PARSER.apply(parser, null).build();
calendars = new QueryPage<>(Collections.singletonList(calendar), 1, Calendar.RESULTS_FIELD);
Response response = new Response(calendars);
listener.onResponse(response);
}
} else {
this.onFailure(QueryPage.emptyQueryPage(Calendar.RESULTS_FIELD));
}
} catch (Exception e) {
this.onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
jobProvider.calendar(calendarId, ActionListener.wrap(
calendar -> {
QueryPage<Calendar> page = new QueryPage<>(Collections.singletonList(calendar), 1, Calendar.RESULTS_FIELD);
listener.onResponse(new Response(page));
},
listener::onFailure
));
}
private void getCalendars(PageParams pageParams, ActionListener<Response> listener) {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
.from(pageParams.getFrom())
.size(pageParams.getSize())
.sort(Calendar.ID.getPreferredName())
.query(QueryBuilders.termQuery(Calendar.TYPE.getPreferredName(), Calendar.CALENDAR_TYPE));
SearchRequest searchRequest = new SearchRequest(MlMetaIndex.INDEX_NAME)
.indicesOptions(JobProvider.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS))
.source(sourceBuilder);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse response) {
List<Calendar> docs = new ArrayList<>();
for (SearchHit hit : response.getHits().getHits()) {
BytesReference docSource = hit.getSourceRef();
try (XContentParser parser = XContentFactory.xContent(docSource).createParser(
NamedXContentRegistry.EMPTY, docSource)) {
docs.add(Calendar.PARSER.apply(parser, null).build());
} catch (IOException e) {
this.onFailure(e);
}
}
Response getResponse = new Response(
new QueryPage<>(docs, docs.size(), Calendar.RESULTS_FIELD));
listener.onResponse(getResponse);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
},
client::search);
CalendarQueryBuilder query = new CalendarQueryBuilder().pageParams(pageParams).sort(true);
jobProvider.calendars(query, ActionListener.wrap(
calendars -> {
listener.onResponse(new Response(calendars));
},
listener::onFailure
));
}
}
}

View File

@ -20,7 +20,9 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -34,13 +36,18 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetaIndex;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.watcher.support.Exceptions;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.xpack.ClientHelper.ML_ORIGIN;
@ -162,36 +169,62 @@ public class PutCalendarAction extends Action<PutCalendarAction.Request, PutCale
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
calendar = new Calendar(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
calendar.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return calendar.toXContent(builder, params);
}
@Override
public int hashCode() {
return Objects.hash(isAcknowledged(), calendar);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Response other = (Response) obj;
return Objects.equals(isAcknowledged(), other.isAcknowledged()) && Objects.equals(calendar, other.calendar);
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final Client client;
private final ClusterService clusterService;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Client client) {
IndexNameExpressionResolver indexNameExpressionResolver,
Client client, ClusterService clusterService) {
super(settings, NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
this.client = client;
this.clusterService = clusterService;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
final Calendar calendar = request.getCalendar();
Calendar calendar = request.getCalendar();
checkJobsExist(calendar.getJobIds(), listener::onFailure);
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, calendar.documentId());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
indexRequest.source(calendar.toXContent(builder,
@ -218,5 +251,17 @@ public class PutCalendarAction extends Action<PutCalendarAction.Request, PutCale
}
});
}
private void checkJobsExist(List<String> jobIds, Consumer<Exception> errorHandler) {
ClusterState state = clusterService.state();
MlMetadata mlMetadata = state.getMetaData().custom(MlMetadata.TYPE);
for (String jobId: jobIds) {
Set<String> jobs = mlMetadata.expandJobIds(jobId, true);
if (jobs.isEmpty()) {
errorHandler.accept(ExceptionsHelper.missingJobException(jobId));
return;
}
}
}
}
}

View File

@ -0,0 +1,168 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
public class UpdateCalendarJobAction extends Action<UpdateCalendarJobAction.Request, PutCalendarAction.Response,
UpdateCalendarJobAction.RequestBuilder> {
public static final UpdateCalendarJobAction INSTANCE = new UpdateCalendarJobAction();
public static final String NAME = "cluster:admin/xpack/ml/calendars/jobs/update";
private UpdateCalendarJobAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client);
}
@Override
public PutCalendarAction.Response newResponse() {
return new PutCalendarAction.Response();
}
public static class Request extends ActionRequest {
private String calendarId;
private Set<String> jobIdsToAdd;
private Set<String> jobIdsToRemove;
Request() {
}
public Request(String calendarId, Set<String> jobIdsToAdd, Set<String> jobIdsToRemove) {
this.calendarId = ExceptionsHelper.requireNonNull(calendarId, Calendar.ID.getPreferredName());
this.jobIdsToAdd = ExceptionsHelper.requireNonNull(jobIdsToAdd, "job_ids_to_add");
this.jobIdsToRemove = ExceptionsHelper.requireNonNull(jobIdsToRemove, "job_ids_to_remove");
}
public String getCalendarId() {
return calendarId;
}
public Set<String> getJobIdsToAdd() {
return jobIdsToAdd;
}
public Set<String> getJobIdsToRemove() {
return jobIdsToRemove;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
calendarId = in.readString();
jobIdsToAdd = new HashSet<>(in.readList(StreamInput::readString));
jobIdsToRemove = new HashSet<>(in.readList(StreamInput::readString));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(calendarId);
out.writeStringList(new ArrayList<>(jobIdsToAdd));
out.writeStringList(new ArrayList<>(jobIdsToRemove));
}
@Override
public int hashCode() {
return Objects.hash(calendarId, jobIdsToAdd, jobIdsToRemove);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(calendarId, other.calendarId) && Objects.equals(jobIdsToAdd, other.jobIdsToAdd)
&& Objects.equals(jobIdsToRemove, other.jobIdsToRemove);
}
}
public static class RequestBuilder extends ActionRequestBuilder<Request, PutCalendarAction.Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client) {
super(client, INSTANCE, new Request());
}
}
public static class TransportAction extends HandledTransportAction<Request, PutCalendarAction.Response> {
private final ClusterService clusterService;
private final JobProvider jobProvider;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, JobProvider jobProvider) {
super(settings, NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
this.clusterService = clusterService;
this.jobProvider = jobProvider;
}
@Override
protected void doExecute(Request request, ActionListener<PutCalendarAction.Response> listener) {
ClusterState state = clusterService.state();
MlMetadata mlMetadata = state.getMetaData().custom(MlMetadata.TYPE);
for (String jobToAdd: request.getJobIdsToAdd()) {
if (mlMetadata.isGroupOrJob(jobToAdd) == false) {
listener.onFailure(ExceptionsHelper.missingJobException(jobToAdd));
return;
}
}
for (String jobToRemove: request.getJobIdsToRemove()) {
if (mlMetadata.isGroupOrJob(jobToRemove) == false) {
listener.onFailure(ExceptionsHelper.missingJobException(jobToRemove));
return;
}
}
jobProvider.updateCalendar(request.getCalendarId(), request.getJobIdsToAdd(), request.getJobIdsToRemove(),
c -> listener.onResponse(new PutCalendarAction.Response(c)), listener::onFailure);
}
}
}

View File

@ -13,10 +13,8 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.ml.MlMetaIndex;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -70,7 +68,7 @@ public class Calendar implements ToXContentObject, Writeable {
}
public List<String> getJobIds() {
return new ArrayList<>(jobIds);
return Collections.unmodifiableList(jobIds);
}
@Override

View File

@ -313,7 +313,7 @@ public class JobManager extends AbstractComponent {
String jobId = request.getJobId();
logger.debug("Deleting job '" + jobId + "'");
// Step 3. When the job has been removed from the cluster state, return a response
// Step 4. When the job has been removed from the cluster state, return a response
// -------
CheckedConsumer<Boolean, Exception> apiResponseHandler = jobDeleted -> {
if (jobDeleted) {
@ -325,7 +325,7 @@ public class JobManager extends AbstractComponent {
}
};
// Step 2. When the physical storage has been deleted, remove from Cluster State
// Step 3. When the physical storage has been deleted, remove from Cluster State
// -------
CheckedConsumer<Boolean, Exception> deleteJobStateHandler = response -> clusterService.submitStateUpdateTask("delete-job-" + jobId,
new AckedClusterStateUpdateTask<Boolean>(request, ActionListener.wrap(apiResponseHandler, actionListener::onFailure)) {
@ -351,11 +351,18 @@ public class JobManager extends AbstractComponent {
}
});
// Step 2. Remove the job from any calendars
CheckedConsumer<Boolean, Exception> removeFromCalendarsHandler = response -> {
jobProvider.removeJobFromCalendars(jobId, ActionListener.<Boolean>wrap(deleteJobStateHandler::accept,
actionListener::onFailure ));
};
// Step 1. Delete the physical storage
// This task manages the physical deletion of the job state and results
task.delete(jobId, client, clusterService.state(), deleteJobStateHandler::accept, actionListener::onFailure);
task.delete(jobId, client, clusterService.state(), removeFromCalendarsHandler, actionListener::onFailure);
}
public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionListener<RevertModelSnapshotAction.Response> actionListener,

View File

@ -59,6 +59,10 @@ public class GroupOrJobLookup {
return new GroupOrJobResolver().expand(expression, allowNoJobs);
}
public boolean isGroupOrJob(String id) {
return groupOrJobLookup.containsKey(id);
}
private class GroupOrJobResolver extends NameResolver {
private GroupOrJobResolver() {

View File

@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ml.action.util.PageParams;
import org.elasticsearch.xpack.ml.calendars.Calendar;
public class CalendarQueryBuilder {
private PageParams pageParams = new PageParams(0, 10000);
private String jobId;
private boolean sort = false;
/**
* Page the query result
* @param params The page parameters
* @return this
*/
public CalendarQueryBuilder pageParams(PageParams params) {
this.pageParams = params;
return this;
}
/**
* Query only calendars used by this job
* @param jobId The job Id
* @return this
*/
public CalendarQueryBuilder jobId(String jobId) {
this.jobId = jobId;
return this;
}
/**
* Sort results by calendar_id
* @param sort Sort if true
* @return this
*/
public CalendarQueryBuilder sort(boolean sort) {
this.sort = sort;
return this;
}
public SearchSourceBuilder build() {
QueryBuilder qb;
if (jobId != null) {
qb = new BoolQueryBuilder()
.filter(new TermsQueryBuilder(Calendar.TYPE.getPreferredName(), Calendar.CALENDAR_TYPE))
.filter(new TermsQueryBuilder(Calendar.JOB_IDS.getPreferredName(), jobId));
} else {
qb = new TermsQueryBuilder(Calendar.TYPE.getPreferredName(), Calendar.CALENDAR_TYPE);
}
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(qb);
if (sort) {
sourceBuilder.sort(Calendar.ID.getPreferredName());
}
sourceBuilder.from(pageParams.getFrom()).size(pageParams.getSize());
return sourceBuilder;
}
}

View File

@ -18,6 +18,11 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.MultiSearchRequestBuilder;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
@ -25,6 +30,9 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
@ -38,6 +46,7 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
@ -62,6 +71,7 @@ import org.elasticsearch.xpack.ml.action.GetCategoriesAction;
import org.elasticsearch.xpack.ml.action.GetInfluencersAction;
import org.elasticsearch.xpack.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
@ -87,13 +97,16 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.ClientHelper.clientWithOrigin;
@ -998,6 +1011,136 @@ public class JobProvider {
, client::search);
}
public void updateCalendar(String calendarId, Set<String> jobIdsToAdd, Set<String> jobIdsToRemove,
Consumer<Calendar> handler, Consumer<Exception> errorHandler) {
ActionListener<Calendar> getCalendarListener = ActionListener.wrap(
calendar -> {
Set<String> currentJobs = new HashSet<>(calendar.getJobIds());
currentJobs.addAll(jobIdsToAdd);
currentJobs.removeAll(jobIdsToRemove);
Calendar updatedCalendar = new Calendar(calendar.getId(), new ArrayList<>(currentJobs));
UpdateRequest updateRequest = new UpdateRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, updatedCalendar.documentId());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
updateRequest.doc(updatedCalendar.toXContent(builder, ToXContent.EMPTY_PARAMS));
} catch (IOException e) {
throw new IllegalStateException("Failed to serialise calendar with id [" + updatedCalendar.getId() + "]", e);
}
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, updateRequest,
ActionListener.<UpdateResponse>wrap(
response -> {
handler.accept(updatedCalendar);
},
errorHandler)
, client::update);
},
errorHandler
);
calendar(calendarId, getCalendarListener);
}
public void calendars(CalendarQueryBuilder queryBuilder, ActionListener<QueryPage<Calendar>> listener) {
SearchRequest searchRequest = client.prepareSearch(MlMetaIndex.INDEX_NAME)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(queryBuilder.build()).request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
ActionListener.<SearchResponse>wrap(
response -> {
List<Calendar> calendars = new ArrayList<>();
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
calendars.add(parseSearchHit(hit, Calendar.PARSER, listener::onFailure).build());
}
listener.onResponse(new QueryPage<Calendar>(calendars, response.getHits().getTotalHits(),
Calendar.RESULTS_FIELD));
},
listener::onFailure)
, client::search);
}
public void removeJobFromCalendars(String jobId, ActionListener<Boolean> listener) {
ActionListener<BulkResponse> updateCalandarsListener = ActionListener.wrap(
r -> {
if (r.hasFailures()) {
listener.onResponse(false);
}
listener.onResponse(true);
},
listener::onFailure
);
ActionListener<QueryPage<Calendar>> getCalendarsListener = ActionListener.wrap(
r -> {
BulkRequestBuilder bulkUpdate = client.prepareBulk();
bulkUpdate.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
r.results().stream()
.map(c -> {
Set<String> ids = new HashSet<>(c.getJobIds());
ids.remove(jobId);
return new Calendar(c.getId(), new ArrayList<>(ids));
}).forEach(c -> {
UpdateRequest updateRequest = new UpdateRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE,
c.documentId());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
updateRequest.doc(c.toXContent(builder, ToXContent.EMPTY_PARAMS));
} catch (IOException e) {
throw new IllegalStateException("Failed to serialise calendar with id [" + c.getId() + "]", e);
}
bulkUpdate.add(updateRequest);
});
if (bulkUpdate.numberOfActions() > 0) {
executeAsyncWithOrigin(client, ML_ORIGIN, BulkAction.INSTANCE, bulkUpdate.request(), updateCalandarsListener);
} else {
listener.onResponse(true);
}
},
listener::onFailure
);
CalendarQueryBuilder query = new CalendarQueryBuilder().jobId(jobId);
calendars(query, getCalendarsListener);
}
public void calendar(String calendarId, ActionListener<Calendar> listener) {
GetRequest getRequest = new GetRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, Calendar.documentId(calendarId));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, getRequest, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getDocResponse) {
try {
if (getDocResponse.isExists()) {
BytesReference docSource = getDocResponse.getSourceAsBytesRef();
try (XContentParser parser =
XContentFactory.xContent(docSource).createParser(NamedXContentRegistry.EMPTY, docSource)) {
Calendar calendar = Calendar.PARSER.apply(parser, null).build();
listener.onResponse(calendar);
}
} else {
this.onFailure(new ResourceNotFoundException("No calendar with id [" + calendarId + "]"));
}
} catch (Exception e) {
this.onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
},
client::get);
}
private void handleLatestModelSizeStats(String jobId, ModelSizeStats latestModelSizeStats, Consumer<Long> handler,
Consumer<Exception> errorHandler) {
if (latestModelSizeStats != null) {

View File

@ -35,5 +35,4 @@ public class RestDeleteCalendarAction extends BaseRestHandler {
DeleteCalendarAction.Request request = new DeleteCalendarAction.Request(restRequest.param(Calendar.ID.getPreferredName()));
return channel -> client.execute(DeleteCalendarAction.INSTANCE, request, new AcknowledgedRestListener<>(channel));
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest.calendar;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.AcknowledgedRestListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.UpdateCalendarJobAction;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.job.config.Job;
import java.io.IOException;
import java.util.Collections;
public class RestDeleteCalendarJobAction extends BaseRestHandler {
public RestDeleteCalendarJobAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.DELETE,
MachineLearning.BASE_PATH + "calendars/{" + Calendar.ID.getPreferredName() + "}/jobs/{" +
Job.ID.getPreferredName() + "}", this);
}
@Override
public String getName() {
return "xpack_ml_delete_calendar_job_action";
}
@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String calendarId = restRequest.param(Calendar.ID.getPreferredName());
String jobId = restRequest.param(Job.ID.getPreferredName());
UpdateCalendarJobAction.Request request =
new UpdateCalendarJobAction.Request(calendarId, Collections.emptySet(), Collections.singleton(jobId));
return channel -> client.execute(UpdateCalendarJobAction.INSTANCE, request, new AcknowledgedRestListener<>(channel));
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest.calendar;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.UpdateCalendarJobAction;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.job.config.Job;
import java.io.IOException;
import java.util.Collections;
public class RestPutCalendarJobAction extends BaseRestHandler {
public RestPutCalendarJobAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.PUT,
MachineLearning.BASE_PATH + "calendars/{" + Calendar.ID.getPreferredName() + "}/jobs/{" +
Job.ID.getPreferredName() + "}", this);
}
@Override
public String getName() {
return "xpack_ml_put_calendar_job_action";
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String calendarId = restRequest.param(Calendar.ID.getPreferredName());
String jobId = restRequest.param(Job.ID.getPreferredName());
UpdateCalendarJobAction.Request putCalendarRequest =
new UpdateCalendarJobAction.Request(calendarId, Collections.singleton(jobId), Collections.emptySet());
return channel -> client.execute(UpdateCalendarJobAction.INSTANCE, putCalendarRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.ml.calendars.CalendarTests;
public class PutCalendarActionResponseTests extends AbstractStreamableTestCase<PutCalendarAction.Response> {
@Override
protected PutCalendarAction.Response createTestInstance() {
return new PutCalendarAction.Response(CalendarTests.testInstance());
}
@Override
protected PutCalendarAction.Response createBlankInstance() {
return new PutCalendarAction.Response();
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import java.util.HashSet;
import java.util.Set;
public class UpdateCalendarJobActionResquestTests extends AbstractStreamableTestCase<UpdateCalendarJobAction.Request> {
@Override
protected UpdateCalendarJobAction.Request createTestInstance() {
int addSize = randomIntBetween(0, 2);
Set<String> toAdd = new HashSet<>();
for (int i=0; i<addSize; i++) {
toAdd.add(randomAlphaOfLength(10));
}
int removeSize = randomIntBetween(0, 2);
Set<String> toRemove = new HashSet<>();
for (int i=0; i<removeSize; i++) {
toRemove.add(randomAlphaOfLength(10));
}
return new UpdateCalendarJobAction.Request(randomAlphaOfLength(10), toAdd, toRemove);
}
@Override
protected UpdateCalendarJobAction.Request createBlankInstance() {
return new UpdateCalendarJobAction.Request();
}
}

View File

@ -18,8 +18,7 @@ import static org.hamcrest.Matchers.equalTo;
public class CalendarTests extends AbstractSerializingTestCase<Calendar> {
@Override
protected Calendar createTestInstance() {
public static Calendar testInstance() {
int size = randomInt(10);
List<String> items = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
@ -28,6 +27,11 @@ public class CalendarTests extends AbstractSerializingTestCase<Calendar> {
return new Calendar(randomAlphaOfLengthBetween(1, 20), items);
}
@Override
protected Calendar createTestInstance() {
return testInstance();
}
@Override
protected Writeable.Reader<Calendar> instanceReader() {
return Calendar::new;

View File

@ -25,6 +25,8 @@ import org.elasticsearch.xpack.XPackSingleNodeTestCase;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetaIndex;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Connective;
@ -36,6 +38,7 @@ import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.config.RuleAction;
import org.elasticsearch.xpack.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.CalendarQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
@ -56,9 +59,21 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.isIn;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.collection.IsEmptyCollection.empty;
import static org.hamcrest.core.Is.is;
public class JobProviderIT extends XPackSingleNodeTestCase {
@ -97,6 +112,160 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
});
}
public void testGetCalandarByJobId() throws Exception {
List<Calendar> calendars = new ArrayList<>();
calendars.add(new Calendar("empty calendar", Collections.emptyList()));
calendars.add(new Calendar("foo calendar", Collections.singletonList("foo")));
calendars.add(new Calendar("foo bar calendar", Arrays.asList("foo", "bar")));
calendars.add(new Calendar("cat calendar", Collections.singletonList("cat")));
calendars.add(new Calendar("cat foo calendar", Arrays.asList("cat", "foo")));
indexCalendars(calendars);
List<Calendar> queryResult = getCalendars("ted");
assertThat(queryResult, is(empty()));
queryResult = getCalendars("foo");
assertThat(queryResult, hasSize(3));
Long matchedCount = queryResult.stream().filter(
c -> c.getId().equals("foo calendar") || c.getId().equals("foo bar calendar") || c.getId().equals("cat foo calendar"))
.collect(Collectors.counting());
assertEquals(new Long(3), matchedCount);
queryResult = getCalendars("bar");
assertThat(queryResult, hasSize(1));
assertEquals("foo bar calendar", queryResult.get(0).getId());
}
public void testUpdateCalendar() throws Exception {
String calendarId = "empty calendar";
Calendar emptyCal = new Calendar(calendarId, Collections.emptyList());
indexCalendars(Collections.singletonList(emptyCal));
Set<String> addedIds = new HashSet<>();
addedIds.add("foo");
addedIds.add("bar");
updateCalendar(calendarId, addedIds, Collections.emptySet());
Calendar updated = getCalendar(calendarId);
assertEquals(calendarId, updated.getId());
assertEquals(addedIds, new HashSet<>(updated.getJobIds()));
Set<String> removedIds = new HashSet<>();
removedIds.add("foo");
updateCalendar(calendarId, Collections.emptySet(), removedIds);
updated = getCalendar(calendarId);
assertEquals(calendarId, updated.getId());
assertEquals(1, updated.getJobIds().size());
assertEquals("bar", updated.getJobIds().get(0));
}
public void testRemoveJobFromCalendar() throws Exception {
List<Calendar> calendars = new ArrayList<>();
calendars.add(new Calendar("empty calendar", Collections.emptyList()));
calendars.add(new Calendar("foo calendar", Collections.singletonList("foo")));
calendars.add(new Calendar("foo bar calendar", Arrays.asList("foo", "bar")));
calendars.add(new Calendar("cat calendar", Collections.singletonList("cat")));
calendars.add(new Calendar("cat foo calendar", Arrays.asList("cat", "foo")));
indexCalendars(calendars);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
jobProvider.removeJobFromCalendars("bar", ActionListener.wrap(
r -> latch.countDown(),
exceptionHolder::set));
latch.await();
if (exceptionHolder.get() != null) {
throw exceptionHolder.get();
}
List<Calendar> updatedCalendars = getCalendars(null);
assertEquals(5, updatedCalendars.size());
for (Calendar cal: updatedCalendars) {
assertThat("bar", not(isIn(cal.getJobIds())));
}
Calendar catFoo = getCalendar("cat foo calendar");
assertThat(catFoo.getJobIds(), contains("cat", "foo"));
CountDownLatch latch2 = new CountDownLatch(1);
exceptionHolder = new AtomicReference<>();
jobProvider.removeJobFromCalendars("cat", ActionListener.wrap(
r -> latch2.countDown(),
exceptionHolder::set));
latch2.await();
if (exceptionHolder.get() != null) {
throw exceptionHolder.get();
}
updatedCalendars = getCalendars(null);
assertEquals(5, updatedCalendars.size());
for (Calendar cal: updatedCalendars) {
assertThat("bar", not(isIn(cal.getJobIds())));
assertThat("cat", not(isIn(cal.getJobIds())));
}
}
private List<Calendar> getCalendars(String jobId) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
AtomicReference<QueryPage<Calendar>> result = new AtomicReference<>();
CalendarQueryBuilder query = new CalendarQueryBuilder();
if (jobId != null) {
query.jobId(jobId);
}
jobProvider.calendars(query, ActionListener.wrap(
r -> {
latch.countDown();
result.set(r);
},
exceptionHolder::set));
latch.await();
if (exceptionHolder.get() != null) {
throw exceptionHolder.get();
}
return result.get().results();
}
private void updateCalendar(String calendarId, Set<String> idsToAdd, Set<String> idsToRemove) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
jobProvider.updateCalendar(calendarId, idsToAdd, idsToRemove,
r -> latch.countDown(),
exceptionHolder::set);
latch.await();
if (exceptionHolder.get() != null) {
throw exceptionHolder.get();
}
client().admin().indices().prepareRefresh(MlMetaIndex.INDEX_NAME).get();
}
private Calendar getCalendar(String calendarId) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
AtomicReference<Calendar> calendarHolder = new AtomicReference<>();
jobProvider.calendar(calendarId, ActionListener.wrap(
c -> { latch.countDown(); calendarHolder.set(c); },
exceptionHolder::set)
);
latch.await();
if (exceptionHolder.get() != null) {
throw exceptionHolder.get();
}
return calendarHolder.get();
}
public void testSpecialEvents() throws Exception {
List<SpecialEvent> events = new ArrayList<>();
events.add(new SpecialEvent("A_and_B_downtime", "downtime", createZonedDateTime(1000L), createZonedDateTime(2000L),
@ -321,7 +490,8 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
for (MlFilter filter : filters) {
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, filter.documentId());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
indexRequest.source(filter.toXContent(builder, ToXContent.EMPTY_PARAMS));
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true"));
indexRequest.source(filter.toXContent(builder, params));
bulkRequest.add(indexRequest);
}
}
@ -341,7 +511,21 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
private void indexQuantiles(Quantiles quantiles) {
JobResultsPersister persister = new JobResultsPersister(nodeSettings(), client());
persister.persistQuantiles(quantiles);
}
private void indexCalendars(List<Calendar> calendars) throws IOException {
BulkRequestBuilder bulkRequest = client().prepareBulk();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (Calendar calendar: calendars) {
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, calendar.documentId());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true"));
indexRequest.source(calendar.toXContent(builder, params));
bulkRequest.add(indexRequest);
}
}
bulkRequest.execute().actionGet();
}
private ZonedDateTime createZonedDateTime(long epochMs) {

View File

@ -89,6 +89,20 @@ public class GroupOrJobLookupTests extends ESTestCase {
assertThat(groupOrJobLookup.expandJobIds("foo-group,*-2", false), contains("bar-2", "foo-1", "foo-2"));
}
public void testIsGroupOrJob() {
List<Job> jobs = new ArrayList<>();
jobs.add(mockJob("foo-1", Arrays.asList("foo-group", "ones")));
jobs.add(mockJob("foo-2", Arrays.asList("foo-group", "twos")));
jobs.add(mockJob("bar-1", Arrays.asList("bar-group", "ones")));
jobs.add(mockJob("nogroup", Collections.emptyList()));
GroupOrJobLookup groupOrJobLookup = new GroupOrJobLookup(jobs);
assertTrue(groupOrJobLookup.isGroupOrJob("foo-1"));
assertTrue(groupOrJobLookup.isGroupOrJob("twos"));
assertTrue(groupOrJobLookup.isGroupOrJob("nogroup"));
assertFalse(groupOrJobLookup.isGroupOrJob("missing"));
}
private static Job mockJob(String jobId, List<String> groups) {
Job job = mock(Job.class);
when(job.getId()).thenReturn(jobId);

View File

@ -0,0 +1,22 @@
{
"xpack.ml.delete_calendar_job": {
"methods": [ "DELETE" ],
"url": {
"path": "/_xpack/ml/calendars/{calendar_id}/jobs/{job_id}",
"paths": [ "/_xpack/ml/calendars/{calendar_id}/jobs/{job_id}" ],
"parts": {
"calendar_id": {
"type" : "string",
"required" : true,
"description" : "The ID of the calendar to modify"
},
"job_id": {
"type": "string",
"required": true,
"description": "The ID of the job to remove from the calendar"
}
}
},
"body": null
}
}

View File

@ -0,0 +1,22 @@
{
"xpack.ml.put_calendar_job": {
"methods": [ "PUT" ],
"url": {
"path": "/_xpack/ml/calendars/{calendar_id}/jobs/{job_id}",
"paths": [ "/_xpack/ml/calendars/{calendar_id}/jobs/{job_id}" ],
"parts": {
"calendar_id": {
"type": "string",
"required": true,
"description": "The ID of the calendar to modify"
},
"job_id": {
"type": "string",
"required": true,
"description": "The ID of the job to add to the calendar"
}
}
},
"body": null
}
}

View File

@ -1,25 +1,52 @@
---
"Test calendar CRUD":
- do:
xpack.ml.put_job:
job_id: cal-job
body: >
{
"analysis_config" : {
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
}
}
- match: { job_id: "cal-job" }
- do:
xpack.ml.put_job:
job_id: cal-job2
body: >
{
"analysis_config" : {
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
}
}
- match: { job_id: "cal-job2" }
- do:
xpack.ml.put_calendar:
calendar_id: "advent"
body: >
{
"job_ids": ["abc", "xyz"]
"job_ids": ["cal-job", "cal-job2"]
}
- match: { calendar_id: advent }
- match: { job_ids.0: abc }
- match: { job_ids.1: xyz }
- match: { job_ids.0: cal-job }
- match: { job_ids.1: cal-job2 }
- do:
xpack.ml.get_calendars:
calendar_id: "advent"
- match: { count: 1 }
- match: { count: 1 }
- match:
calendars.0:
calendar_id: "advent"
job_ids: ["abc", "xyz"]
job_ids: ["cal-job", "cal-job2"]
- is_false: type
- do:
@ -27,7 +54,7 @@
calendar_id: "Dogs of the Year"
body: >
{
"job_ids": ["abc2"]
"job_ids": ["cal-job"]
}
- do:
@ -51,6 +78,15 @@
xpack.ml.get_calendars:
calendar_id: "Dogs of the Year"
- do:
catch: missing
xpack.ml.put_calendar:
calendar_id: "new cal with unknown job"
body: >
{
"job_ids": ["cal-job", "unknown-job"]
}
---
"Test PageParams":
- do:
@ -66,14 +102,16 @@
- do:
xpack.ml.get_calendars:
from: 2
- match: { count: 1 }
- match: { count: 3 }
- length: { calendars: 1}
- match: { calendars.0.calendar_id: Calendar3 }
- do:
xpack.ml.get_calendars:
from: 1
size: 1
- match: { count: 1 }
- match: { count: 3 }
- length: { calendars: 1}
- match: { calendars.0.calendar_id: Calendar2 }
---
@ -90,10 +128,6 @@
- do:
xpack.ml.put_calendar:
calendar_id: "Mayan"
body: >
{
"job_ids": ["apocalypse"]
}
- do:
catch: /version_conflict_engine_exception/
@ -106,3 +140,97 @@
catch: bad_request
xpack.ml.put_calendar:
calendar_id: "_all"
---
"Test deleted job is removed from calendar":
- do:
xpack.ml.put_job:
job_id: cal-crud-test-delete
body: >
{
"analysis_config" : {
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
}
}
- match: { job_id: "cal-crud-test-delete" }
- do:
xpack.ml.put_calendar:
calendar_id: "delete-test"
body: >
{
"job_ids": ["cal-crud-test-delete"]
}
- do:
xpack.ml.delete_job:
job_id: cal-crud-test-delete
- match: { acknowledged: true }
- do:
xpack.ml.get_calendars:
calendar_id: "delete-test"
- match: { count: 1 }
- match: { calendars.0.job_ids: [] }
---
"Test update calendar":
- do:
xpack.ml.put_calendar:
calendar_id: "Wildlife"
- do:
xpack.ml.put_job:
job_id: tiger
body: >
{
"analysis_config" : {
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
}
}
- match: { job_id: "tiger" }
- do:
xpack.ml.put_calendar_job:
calendar_id: "Wildlife"
job_id: "tiger"
- match: { calendar_id: "Wildlife" }
- match: { job_ids.0: "tiger" }
- do:
xpack.ml.get_calendars:
calendar_id: "Wildlife"
- match: { count: 1 }
- match: { calendars.0.calendar_id: "Wildlife" }
- length: { calendars.0.job_ids: 1 }
- match: { calendars.0.job_ids.0: "tiger" }
- do:
xpack.ml.delete_calendar_job:
calendar_id: "Wildlife"
job_id: "tiger"
- do:
xpack.ml.get_calendars:
calendar_id: "Wildlife"
- match: { count: 1 }
- match: { calendars.0.calendar_id: "Wildlife" }
- length: { calendars.0.job_ids: 0 }
- do:
catch: missing
xpack.ml.put_calendar_job:
calendar_id: "Wildlife"
job_id: "missing job"
- do:
catch: missing
xpack.ml.delete_calendar_job:
calendar_id: "Wildlife"
job_id: "missing job"