From e53ac4484ca86d6231baec808ff9e7f0708b8673 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 20 Dec 2017 17:39:44 +0000 Subject: [PATCH] [ML] Calendar event actions (elastic/x-pack-elasticsearch#3365) * Calendar event actions * Add page params and date range tests * Address review comments * Support POSTing params in the body of a request Original commit: elastic/x-pack-elasticsearch@22a7e17a8fbd83d42ecf242f5eae8819d71b9697 --- .../xpack/ml/MachineLearning.java | 12 +- .../ml/action/GetCalendarEventsAction.java | 281 ++++++++++++++++ .../xpack/ml/action/GetCalendarsAction.java | 59 ++-- .../ml/action/PostCalendarEventsAction.java | 311 ++++++++++++++++++ .../xpack/ml/calendars/SpecialEvent.java | 120 ++++--- .../xpack/ml/job/config/Job.java | 10 - .../xpack/ml/job/messages/Messages.java | 2 + .../xpack/ml/job/persistence/JobProvider.java | 150 +++++---- .../SpecialEventsQueryBuilder.java | 97 ++++++ .../autodetect/AutodetectProcessManager.java | 30 +- .../autodetect/params/AutodetectParams.java | 4 + .../calendar/RestGetCalendarEventsAction.java | 57 ++++ .../rest/calendar/RestGetCalendarsAction.java | 36 +- .../calendar/RestPostCalendarEventAction.java | 41 +++ .../GetCalendarEventsActionRequestTests.java | 44 +++ .../GetCalendarsActionRequestTests.java | 23 +- .../PostCalendarEventActionRequestTests.java | 77 +++++ .../xpack/ml/calendars/SpecialEventTests.java | 55 +++- .../xpack/ml/integration/JobProviderIT.java | 90 +++-- .../AutodetectCommunicatorTests.java | 2 +- .../writer/FieldConfigWriterTests.java | 14 +- .../api/xpack.ml.get_calendar_events.json | 37 +++ .../api/xpack.ml.get_calendars.json | 2 +- .../api/xpack.ml.post_calendar_events.json | 20 ++ .../rest-api-spec/test/ml/calendar_crud.yml | 61 +++- 25 files changed, 1423 insertions(+), 212 deletions(-) create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetCalendarEventsAction.java create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostCalendarEventsAction.java create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/SpecialEventsQueryBuilder.java create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/rest/calendar/RestGetCalendarEventsAction.java create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/rest/calendar/RestPostCalendarEventAction.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetCalendarEventsActionRequestTests.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/action/PostCalendarEventActionRequestTests.java create mode 100644 plugin/src/test/resources/rest-api-spec/api/xpack.ml.get_calendar_events.json create mode 100644 plugin/src/test/resources/rest-api-spec/api/xpack.ml.post_calendar_events.json diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index f6bd76593e6..dc506c49d70 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -52,6 +52,8 @@ import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackSettings; import org.elasticsearch.xpack.ml.action.CloseJobAction; import org.elasticsearch.xpack.ml.action.DeleteCalendarAction; +import org.elasticsearch.xpack.ml.action.GetCalendarEventsAction; +import org.elasticsearch.xpack.ml.action.PostCalendarEventsAction; import org.elasticsearch.xpack.ml.action.UpdateCalendarJobAction; import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.ml.action.DeleteExpiredDataAction; @@ -119,7 +121,9 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction; import org.elasticsearch.xpack.ml.rest.calendar.RestDeleteCalendarAction; import org.elasticsearch.xpack.ml.rest.calendar.RestDeleteCalendarJobAction; +import org.elasticsearch.xpack.ml.rest.calendar.RestGetCalendarEventsAction; import org.elasticsearch.xpack.ml.rest.calendar.RestGetCalendarsAction; +import org.elasticsearch.xpack.ml.rest.calendar.RestPostCalendarEventAction; import org.elasticsearch.xpack.ml.rest.calendar.RestPutCalendarAction; import org.elasticsearch.xpack.ml.rest.calendar.RestPutCalendarJobAction; import org.elasticsearch.xpack.ml.rest.datafeeds.RestDeleteDatafeedAction; @@ -471,7 +475,9 @@ public class MachineLearning implements ActionPlugin { new RestPutCalendarAction(settings, restController), new RestDeleteCalendarAction(settings, restController), new RestDeleteCalendarJobAction(settings, restController), - new RestPutCalendarJobAction(settings, restController) + new RestPutCalendarJobAction(settings, restController), + new RestGetCalendarEventsAction(settings, restController), + new RestPostCalendarEventAction(settings, restController) ); } @@ -521,7 +527,9 @@ public class MachineLearning implements ActionPlugin { new ActionHandler<>(GetCalendarsAction.INSTANCE, GetCalendarsAction.TransportAction.class), new ActionHandler<>(PutCalendarAction.INSTANCE, PutCalendarAction.TransportAction.class), new ActionHandler<>(DeleteCalendarAction.INSTANCE, DeleteCalendarAction.TransportAction.class), - new ActionHandler<>(UpdateCalendarJobAction.INSTANCE, UpdateCalendarJobAction.TransportAction.class) + new ActionHandler<>(UpdateCalendarJobAction.INSTANCE, UpdateCalendarJobAction.TransportAction.class), + new ActionHandler<>(GetCalendarEventsAction.INSTANCE, GetCalendarEventsAction.TransportAction.class), + new ActionHandler<>(PostCalendarEventsAction.INSTANCE, PostCalendarEventsAction.TransportAction.class) ); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetCalendarEventsAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetCalendarEventsAction.java new file mode 100644 index 00000000000..9aa3b651567 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetCalendarEventsAction.java @@ -0,0 +1,281 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.action.util.PageParams; +import org.elasticsearch.xpack.ml.action.util.QueryPage; +import org.elasticsearch.xpack.ml.calendars.Calendar; +import org.elasticsearch.xpack.ml.calendars.SpecialEvent; +import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.SpecialEventsQueryBuilder; +import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; + +import java.io.IOException; +import java.util.Collections; +import java.util.Objects; + +public class GetCalendarEventsAction extends Action { + public static final GetCalendarEventsAction INSTANCE = new GetCalendarEventsAction(); + public static final String NAME = "cluster:monitor/xpack/ml/calendars/events/get"; + + private GetCalendarEventsAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends ActionRequest implements ToXContentObject { + + public static final ParseField AFTER = new ParseField("after"); + public static final ParseField BEFORE = new ParseField("before"); + + private static final ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); + + static { + PARSER.declareString(Request::setCalendarId, Calendar.ID); + PARSER.declareString(Request::setAfter, AFTER); + PARSER.declareString(Request::setBefore, BEFORE); + PARSER.declareObject(Request::setPageParams, PageParams.PARSER, PageParams.PAGE); + } + + public static Request parseRequest(String calendarId, XContentParser parser) { + Request request = PARSER.apply(parser, null); + if (calendarId != null) { + request.setCalendarId(calendarId); + } + return request; + } + + private String calendarId; + private String after; + private String before; + private PageParams pageParams = PageParams.defaultParams(); + + Request() { + } + + public Request(String calendarId) { + setCalendarId(calendarId); + } + + public String getCalendarId() { + return calendarId; + } + + private void setCalendarId(String calendarId) { + this.calendarId = ExceptionsHelper.requireNonNull(calendarId, Calendar.ID.getPreferredName()); + } + + public String getAfter() { + return after; + } + public void setAfter(String after) { + this.after = after; + } + + public String getBefore() { + return before; + } + + public void setBefore(String before) { + this.before = before; + } + + public PageParams getPageParams() { + return pageParams; + } + + public void setPageParams(PageParams pageParams) { + this.pageParams = Objects.requireNonNull(pageParams); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + calendarId = in.readString(); + after = in.readOptionalString(); + before = in.readOptionalString(); + pageParams = new PageParams(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(calendarId); + out.writeOptionalString(after); + out.writeOptionalString(before); + pageParams.writeTo(out); + } + + @Override + public int hashCode() { + return Objects.hash(calendarId, after, before, pageParams); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Request other = (Request) obj; + return Objects.equals(calendarId, other.calendarId) && Objects.equals(after, other.after) + && Objects.equals(before, other.before) && Objects.equals(pageParams, other.pageParams); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(Calendar.ID.getPreferredName(), calendarId); + if (after != null) { + builder.field(AFTER.getPreferredName(), after); + } + if (before != null) { + builder.field(BEFORE.getPreferredName(), before); + } + builder.field(PageParams.PAGE.getPreferredName(), pageParams); + builder.endObject(); + return builder; + } + } + + public static class RequestBuilder extends ActionRequestBuilder { + + public RequestBuilder(ElasticsearchClient client) { + super(client, INSTANCE, new Request()); + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + + private QueryPage specialEvents; + + Response() { + } + + public Response(QueryPage specialEvents) { + this.specialEvents = specialEvents; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + specialEvents = new QueryPage<>(in, SpecialEvent::new); + + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + specialEvents.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return specialEvents.toXContent(builder, params); + } + + @Override + public int hashCode() { + return Objects.hash(specialEvents); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Response other = (Response) obj; + return Objects.equals(specialEvents, other.specialEvents); + } + } + + public static class TransportAction extends HandledTransportAction { + + private final JobProvider jobProvider; + + @Inject + public TransportAction(Settings settings, ThreadPool threadPool, + TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + JobProvider jobProvider) { + super(settings, NAME, threadPool, transportService, actionFilters, + indexNameExpressionResolver, Request::new); + this.jobProvider = jobProvider; + } + + @Override + protected void doExecute(Request request, ActionListener listener) { + ActionListener calendarExistsListener = ActionListener.wrap( + r -> { + SpecialEventsQueryBuilder query = new SpecialEventsQueryBuilder() + .calendarIds(Collections.singletonList(request.getCalendarId())) + .after(request.getAfter()) + .before(request.getBefore()) + .from(request.getPageParams().getFrom()) + .size(request.getPageParams().getSize()); + + jobProvider.specialEvents(query, ActionListener.wrap( + events -> { + listener.onResponse(new Response(events)); + }, + listener::onFailure + )); + }, + listener::onFailure); + + checkCalendarExists(request.getCalendarId(), calendarExistsListener); + } + + private void checkCalendarExists(String calendarId, ActionListener listener) { + jobProvider.calendar(calendarId, ActionListener.wrap( + c -> listener.onResponse(true), + listener::onFailure + )); + } + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetCalendarsAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetCalendarsAction.java index c547745bd79..e51941e1c11 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetCalendarsAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetCalendarsAction.java @@ -11,34 +11,23 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.get.GetAction; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.client.Client; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.StatusToXContentObject; +import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ml.MlMetaIndex; import org.elasticsearch.xpack.ml.action.util.PageParams; import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.calendars.Calendar; @@ -46,14 +35,10 @@ import org.elasticsearch.xpack.ml.job.persistence.CalendarQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; -import static org.elasticsearch.xpack.ClientHelper.ML_ORIGIN; -import static org.elasticsearch.xpack.ClientHelper.executeAsyncWithOrigin; public class GetCalendarsAction extends Action { @@ -74,7 +59,22 @@ public class GetCalendarsAction extends Action PARSER = new ObjectParser<>(NAME, Request::new); + + static { + PARSER.declareString(Request::setCalendarId, Calendar.ID); + PARSER.declareObject(Request::setPageParams, PageParams.PARSER, PageParams.PAGE); + } + + public static Request parseRequest(String calendarId, XContentParser parser) { + Request request = PARSER.apply(parser, null); + if (calendarId != null) { + request.setCalendarId(calendarId); + } + return request; + } private String calendarId; private PageParams pageParams; @@ -114,18 +114,20 @@ public class GetCalendarsAction extends Action { + public static final PostCalendarEventsAction INSTANCE = new PostCalendarEventsAction(); + public static final String NAME = "cluster:admin/xpack/ml/calendars/events/post"; + + public static final ParseField SPECIAL_EVENTS = new ParseField("special_events"); + + private PostCalendarEventsAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends ActionRequest { + + public static Request parseRequest(String calendarId, BytesReference data, XContentType contentType) throws IOException { + List events = new ArrayList<>(); + + XContent xContent = contentType.xContent(); + int lineNumber = 0; + int from = 0; + int length = data.length(); + byte marker = xContent.streamSeparator(); + while (true) { + int nextMarker = findNextMarker(marker, from, data, length); + if (nextMarker == -1) { + break; + } + lineNumber++; + + try (XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY, data.slice(from, nextMarker - from))) { + try { + SpecialEvent.Builder event = SpecialEvent.PARSER.apply(parser, null); + events.add(event); + } catch (ParsingException pe) { + throw ExceptionsHelper.badRequestException("Failed to parse special event on line [" + lineNumber + "]", pe); + } + + from = nextMarker + 1; + } + } + + for (SpecialEvent.Builder event: events) { + if (event.getCalendarId() != null && event.getCalendarId().equals(calendarId) == false) { + throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.INCONSISTENT_ID, + Calendar.ID.getPreferredName(), event.getCalendarId(), calendarId)); + } + + // Set the calendar Id in case it is null + event.calendarId(calendarId); + } + return new Request(calendarId, events.stream().map(SpecialEvent.Builder::build).collect(Collectors.toList())); + } + + private static int findNextMarker(byte marker, int from, BytesReference data, int length) { + for (int i = from; i < length; i++) { + if (data.get(i) == marker) { + return i; + } + } + if (from != length) { + throw new IllegalArgumentException("The post calendar events request must be terminated by a newline [\n]"); + } + return -1; + } + + private String calendarId; + private List specialEvents; + + Request() { + } + + public Request(String calendarId, List specialEvents) { + this.calendarId = ExceptionsHelper.requireNonNull(calendarId, Calendar.ID.getPreferredName()); + this.specialEvents = ExceptionsHelper.requireNonNull(specialEvents, SPECIAL_EVENTS.getPreferredName()); + } + + public String getCalendarId() { + return calendarId; + } + + public List getSpecialEvents() { + return specialEvents; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + calendarId = in.readString(); + specialEvents = in.readList(SpecialEvent::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(calendarId); + out.writeList(specialEvents); + } + + @Override + public int hashCode() { + return Objects.hash(calendarId, specialEvents); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Request other = (Request) obj; + return Objects.equals(calendarId, other.calendarId) && Objects.equals(specialEvents, other.specialEvents); + } + } + + public static class RequestBuilder extends ActionRequestBuilder { + + public RequestBuilder(ElasticsearchClient client) { + super(client, INSTANCE, new Request()); + } + } + + public static class Response extends AcknowledgedResponse implements ToXContentObject { + + private List specialEvent; + + Response() { + } + + public Response(List specialEvents) { + super(true); + this.specialEvent = specialEvents; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + readAcknowledged(in); + in.readList(SpecialEvent::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + writeAcknowledged(out); + out.writeList(specialEvent); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(SPECIAL_EVENTS.getPreferredName(), specialEvent); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(isAcknowledged(), specialEvent); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Response other = (Response) obj; + return Objects.equals(isAcknowledged(), other.isAcknowledged()) && Objects.equals(specialEvent, other.specialEvent); + } + } + + public static class TransportAction extends HandledTransportAction { + + private final Client client; + private final JobProvider jobProvider; + + @Inject + public TransportAction(Settings settings, ThreadPool threadPool, + TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + Client client, JobProvider jobProvider) { + super(settings, NAME, threadPool, transportService, actionFilters, + indexNameExpressionResolver, Request::new); + this.client = client; + this.jobProvider = jobProvider; + } + + @Override + protected void doExecute(Request request, ActionListener listener) { + List events = request.getSpecialEvents(); + + ActionListener calendarExistsListener = ActionListener.wrap( + r -> { + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + + for (SpecialEvent event: events) { + IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE); + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + indexRequest.source(event.toXContent(builder, + new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true")))); + } catch (IOException e) { + throw new IllegalStateException("Failed to serialise special event", e); + } + bulkRequestBuilder.add(indexRequest); + } + + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + executeAsyncWithOrigin(client, ML_ORIGIN, BulkAction.INSTANCE, bulkRequestBuilder.request(), + new ActionListener() { + @Override + public void onResponse(BulkResponse response) { + listener.onResponse(new Response(events)); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure( + ExceptionsHelper.serverError("Error indexing special event", e)); + } + }); + }, + listener::onFailure); + + checkCalendarExists(request.getCalendarId(), calendarExistsListener); + } + + private void checkCalendarExists(String calendarId, ActionListener listener) { + jobProvider.calendar(calendarId, ActionListener.wrap( + c -> listener.onResponse(true), + listener::onFailure + )); + } + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/calendars/SpecialEvent.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/calendars/SpecialEvent.java index f65c33f2c9e..f7da0c1b948 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/calendars/SpecialEvent.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/calendars/SpecialEvent.java @@ -10,7 +10,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -21,6 +20,8 @@ import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.Operator; import org.elasticsearch.xpack.ml.job.config.RuleAction; import org.elasticsearch.xpack.ml.job.config.RuleCondition; +import org.elasticsearch.xpack.ml.job.messages.Messages; +import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.Intervals; import org.elasticsearch.xpack.ml.utils.time.TimeUtils; @@ -29,32 +30,27 @@ import java.time.Instant; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Objects; -import java.util.Set; public class SpecialEvent implements ToXContentObject, Writeable { - public static final ParseField ID = new ParseField("id"); public static final ParseField DESCRIPTION = new ParseField("description"); public static final ParseField START_TIME = new ParseField("start_time"); public static final ParseField END_TIME = new ParseField("end_time"); public static final ParseField TYPE = new ParseField("type"); - public static final ParseField JOB_IDS = new ParseField("job_ids"); + + public static final ParseField RESULTS_FIELD = new ParseField("special_events"); public static final String SPECIAL_EVENT_TYPE = "special_event"; public static final String DOCUMENT_ID_PREFIX = "event_"; - public static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>("special_event", a -> new SpecialEvent((String) a[0], (String) a[1], (ZonedDateTime) a[2], - (ZonedDateTime) a[3], (List) a[4])); + public static final ObjectParser PARSER = + new ObjectParser<>("special_event", Builder::new); static { - PARSER.declareString(ConstructingObjectParser.constructorArg(), ID); - PARSER.declareString(ConstructingObjectParser.constructorArg(), DESCRIPTION); - PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> { + PARSER.declareString(SpecialEvent.Builder::description, DESCRIPTION); + PARSER.declareField(SpecialEvent.Builder::startTime, p -> { if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) { return ZonedDateTime.ofInstant(Instant.ofEpochMilli(p.longValue()), ZoneOffset.UTC); } else if (p.currentToken() == XContentParser.Token.VALUE_STRING) { @@ -63,7 +59,7 @@ public class SpecialEvent implements ToXContentObject, Writeable { throw new IllegalArgumentException( "unexpected token [" + p.currentToken() + "] for [" + START_TIME.getPreferredName() + "]"); }, START_TIME, ObjectParser.ValueType.VALUE); - PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> { + PARSER.declareField(SpecialEvent.Builder::endTime, p -> { if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) { return ZonedDateTime.ofInstant(Instant.ofEpochMilli(p.longValue()), ZoneOffset.UTC); } else if (p.currentToken() == XContentParser.Token.VALUE_STRING) { @@ -73,7 +69,7 @@ public class SpecialEvent implements ToXContentObject, Writeable { "unexpected token [" + p.currentToken() + "] for [" + END_TIME.getPreferredName() + "]"); }, END_TIME, ObjectParser.ValueType.VALUE); - PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), JOB_IDS); + PARSER.declareString(SpecialEvent.Builder::calendarId, Calendar.ID); PARSER.declareString((builder, s) -> {}, TYPE); } @@ -81,30 +77,23 @@ public class SpecialEvent implements ToXContentObject, Writeable { return DOCUMENT_ID_PREFIX + eventId; } - private final String id; private final String description; private final ZonedDateTime startTime; private final ZonedDateTime endTime; - private final Set jobIds; + private final String calendarId; - public SpecialEvent(String id, String description, ZonedDateTime startTime, ZonedDateTime endTime, List jobIds) { - this.id = Objects.requireNonNull(id); + SpecialEvent(String description, ZonedDateTime startTime, ZonedDateTime endTime, String calendarId) { this.description = Objects.requireNonNull(description); this.startTime = Objects.requireNonNull(startTime); this.endTime = Objects.requireNonNull(endTime); - this.jobIds = new HashSet<>(jobIds); + this.calendarId = Objects.requireNonNull(calendarId); } public SpecialEvent(StreamInput in) throws IOException { - id = in.readString(); description = in.readString(); startTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(in.readVLong()), ZoneOffset.UTC); endTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(in.readVLong()), ZoneOffset.UTC); - jobIds = new HashSet<>(Arrays.asList(in.readStringArray())); - } - - public String getId() { - return id; + calendarId = in.readString(); } public String getDescription() { @@ -119,12 +108,8 @@ public class SpecialEvent implements ToXContentObject, Writeable { return endTime; } - public Set getJobIds() { - return jobIds; - } - - public String documentId() { - return documentId(id); + public String getCalendarId() { + return calendarId; } /** @@ -157,21 +142,19 @@ public class SpecialEvent implements ToXContentObject, Writeable { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeString(id); out.writeString(description); out.writeVLong(startTime.toInstant().toEpochMilli()); out.writeVLong(endTime.toInstant().toEpochMilli()); - out.writeStringArray(jobIds.toArray(new String [0])); + out.writeString(calendarId); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field(ID.getPreferredName(), id); builder.field(DESCRIPTION.getPreferredName(), description); builder.dateField(START_TIME.getPreferredName(), START_TIME.getPreferredName() + "_string", startTime.toInstant().toEpochMilli()); builder.dateField(END_TIME.getPreferredName(), END_TIME.getPreferredName() + "_string", endTime.toInstant().toEpochMilli()); - builder.field(JOB_IDS.getPreferredName(), jobIds); + builder.field(Calendar.ID.getPreferredName(), calendarId); if (params.paramAsBoolean(MlMetaIndex.INCLUDE_TYPE_KEY, false)) { builder.field(TYPE.getPreferredName(), SPECIAL_EVENT_TYPE); } @@ -190,12 +173,73 @@ public class SpecialEvent implements ToXContentObject, Writeable { } SpecialEvent other = (SpecialEvent) obj; - return id.equals(other.id) && description.equals(other.description) && startTime.equals(other.startTime) - && endTime.equals(other.endTime) && jobIds.equals(other.jobIds); + return description.equals(other.description) && startTime.isEqual(other.startTime) + && endTime.isEqual(other.endTime) && calendarId.equals(other.calendarId); } @Override public int hashCode() { - return Objects.hash(id, description, startTime, endTime, jobIds); + return Objects.hash(description, startTime, endTime, calendarId); + } + + public static class Builder { + private String description; + private ZonedDateTime startTime; + private ZonedDateTime endTime; + private String calendarId; + + + public Builder description(String description) { + this.description = description; + return this; + } + + public Builder startTime(ZonedDateTime startTime) { + this.startTime = startTime; + return this; + } + + public Builder endTime(ZonedDateTime endTime) { + this.endTime = endTime; + return this; + } + + public Builder calendarId(String calendarId) { + this.calendarId = calendarId; + return this; + } + + public String getCalendarId() { + return calendarId; + } + + public SpecialEvent build() { + if (description == null) { + throw ExceptionsHelper.badRequestException( + Messages.getMessage(Messages.FIELD_CANNOT_BE_NULL, DESCRIPTION.getPreferredName())); + } + + if (startTime == null) { + throw ExceptionsHelper.badRequestException( + Messages.getMessage(Messages.FIELD_CANNOT_BE_NULL, START_TIME.getPreferredName())); + } + + if (endTime == null) { + throw ExceptionsHelper.badRequestException( + Messages.getMessage(Messages.FIELD_CANNOT_BE_NULL, END_TIME.getPreferredName())); + } + + if (calendarId == null) { + throw ExceptionsHelper.badRequestException( + Messages.getMessage(Messages.FIELD_CANNOT_BE_NULL, Calendar.ID.getPreferredName())); + } + + if (startTime.isBefore(endTime) == false) { + throw ExceptionsHelper.badRequestException("Special event start time [" + startTime + + "] must come before end time [" + endTime + "]"); + } + + return new SpecialEvent(description, startTime, endTime, calendarId); + } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java index c697ebb02a3..e4416368c96 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java @@ -299,16 +299,6 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO return createTime; } - /** - * The Job creation time. This name is preferred when serialising to the - * data store. - * - * @return The date the job was created - */ - public Date getAtTimestamp() { - return createTime; - } - /** * The time the job was finished or null if not finished. * diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java index c7cd36e13ad..8eecd9b18ff 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java @@ -172,6 +172,8 @@ public final class Messages { public static final String REST_NO_SUCH_MODEL_SNAPSHOT = "No model snapshot with id [{0}] exists for job [{1}]"; public static final String REST_START_AFTER_END = "Invalid time range: end time ''{0}'' is earlier than start time ''{1}''."; + public static final String FIELD_CANNOT_BE_NULL = "Field [{0}] cannot be null"; + private Messages() { } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 66a90e3e969..e1dcbeb9b15 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -360,11 +360,31 @@ public class JobProvider { } public void getAutodetectParams(Job job, Consumer consumer, Consumer errorHandler) { - AutodetectParams.Builder paramsBuilder = new AutodetectParams.Builder(job.getId()); String jobId = job.getId(); + + ActionListener getSpecialEventsListener = ActionListener.wrap( + paramsBuilder -> { + SpecialEventsQueryBuilder specialEventsQuery = new SpecialEventsQueryBuilder(); + Date lastestRecordTime = paramsBuilder.getDataCounts().getLatestRecordTimeStamp(); + if (lastestRecordTime != null) { + specialEventsQuery.after(Long.toString(lastestRecordTime.getTime())); + } + specialEventsForJob(jobId, specialEventsQuery, ActionListener.wrap( + events -> { + paramsBuilder.setSpecialEvents(events.results()); + consumer.accept(paramsBuilder.build()); + }, + errorHandler + )); + }, + errorHandler + ); + + AutodetectParams.Builder paramsBuilder = new AutodetectParams.Builder(job.getId()); String resultsIndex = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); String stateIndex = AnomalyDetectorsIndex.jobStateIndexName(); + MultiSearchRequestBuilder msearch = client.prepareMultiSearch() .add(createLatestDataCountsSearch(resultsIndex, jobId)) .add(createLatestModelSizeStatsSearch(resultsIndex)) @@ -377,44 +397,43 @@ public class JobProvider { msearch.add(createDocIdSearch(MlMetaIndex.INDEX_NAME, MlFilter.documentId(filterId))); } - msearch.add(createSpecialEventSearch(jobId)); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, msearch.request(), - ActionListener.wrap( - response -> { - for (int i = 0; i < response.getResponses().length; i++) { - MultiSearchResponse.Item itemResponse = response.getResponses()[i]; - if (itemResponse.isFailure()) { - errorHandler.accept(itemResponse.getFailure()); - } else { - SearchResponse searchResponse = itemResponse.getResponse(); - ShardSearchFailure[] shardFailures = searchResponse.getShardFailures(); - int unavailableShards = searchResponse.getTotalShards() - searchResponse.getSuccessfulShards(); - if (shardFailures != null && shardFailures.length > 0) { - LOGGER.error("[{}] Search request returned shard failures: {}", jobId, - Arrays.toString(shardFailures)); - errorHandler.accept(new ElasticsearchException( - ExceptionsHelper.shardFailuresToErrorMsg(jobId, shardFailures))); - } else if (unavailableShards > 0) { - errorHandler.accept(new ElasticsearchException("[" + jobId - + "] Search request encountered [" + unavailableShards + "] unavailable shards")); - } else { - SearchHits hits = searchResponse.getHits(); - long hitsCount = hits.getHits().length; - if (hitsCount == 0) { - SearchRequest searchRequest = msearch.request().requests().get(i); - LOGGER.debug("Found 0 hits for [{}]", new Object[]{searchRequest.indices()}); + ActionListener.wrap( + response -> { + for (int i = 0; i < response.getResponses().length; i++) { + MultiSearchResponse.Item itemResponse = response.getResponses()[i]; + if (itemResponse.isFailure()) { + errorHandler.accept(itemResponse.getFailure()); } else { - for (SearchHit hit : hits) { - parseAutodetectParamSearchHit(jobId, paramsBuilder, hit, errorHandler); + SearchResponse searchResponse = itemResponse.getResponse(); + ShardSearchFailure[] shardFailures = searchResponse.getShardFailures(); + int unavailableShards = searchResponse.getTotalShards() - searchResponse.getSuccessfulShards(); + if (shardFailures != null && shardFailures.length > 0) { + LOGGER.error("[{}] Search request returned shard failures: {}", jobId, + Arrays.toString(shardFailures)); + errorHandler.accept(new ElasticsearchException( + ExceptionsHelper.shardFailuresToErrorMsg(jobId, shardFailures))); + } else if (unavailableShards > 0) { + errorHandler.accept(new ElasticsearchException("[" + jobId + + "] Search request encountered [" + unavailableShards + "] unavailable shards")); + } else { + SearchHits hits = searchResponse.getHits(); + long hitsCount = hits.getHits().length; + if (hitsCount == 0) { + SearchRequest searchRequest = msearch.request().requests().get(i); + LOGGER.debug("Found 0 hits for [{}]", new Object[]{searchRequest.indices()}); + } else { + for (SearchHit hit : hits) { + parseAutodetectParamSearchHit(jobId, paramsBuilder, hit, errorHandler); + } + } } } } - } - } - consumer.accept(paramsBuilder.build()); - }, - errorHandler + getSpecialEventsListener.onResponse(paramsBuilder); + }, + errorHandler ), client::multiSearch); } @@ -425,17 +444,6 @@ public class JobProvider { .setRouting(id); } - - private SearchRequestBuilder createSpecialEventSearch(String jobId) { - QueryBuilder qb = new BoolQueryBuilder() - .filter(new TermsQueryBuilder(SpecialEvent.TYPE.getPreferredName(), SpecialEvent.SPECIAL_EVENT_TYPE)) - .filter(new TermsQueryBuilder(SpecialEvent.JOB_IDS.getPreferredName(), jobId)); - - return client.prepareSearch(MlMetaIndex.INDEX_NAME) - .setIndicesOptions(IndicesOptions.lenientExpandOpen()) - .setQuery(qb); - } - private void parseAutodetectParamSearchHit(String jobId, AutodetectParams.Builder paramsBuilder, SearchHit hit, Consumer errorHandler) { String hitId = hit.getId(); @@ -451,10 +459,8 @@ public class JobProvider { paramsBuilder.setQuantiles(parseSearchHit(hit, Quantiles.PARSER, errorHandler)); } else if (hitId.startsWith(MlFilter.DOCUMENT_ID_PREFIX)) { paramsBuilder.addFilter(parseSearchHit(hit, MlFilter.PARSER, errorHandler).build()); - } else if (hitId.startsWith(SpecialEvent.DOCUMENT_ID_PREFIX)) { - paramsBuilder.addSpecialEvent(parseSearchHit(hit, SpecialEvent.PARSER, errorHandler)); } else { - errorHandler.accept(new IllegalStateException("Unexpected type [" + hit.getType() + "]")); + errorHandler.accept(new IllegalStateException("Unexpected Id [" + hitId + "]")); } } @@ -940,6 +946,7 @@ public class JobProvider { .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(searchFromTimeMs)) .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE))) .addAggregation(AggregationBuilders.extendedStats("es").field(ModelSizeStats.MODEL_BYTES_FIELD.getPreferredName())); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, search.request(), ActionListener.wrap( response -> { @@ -994,21 +1001,46 @@ public class JobProvider { }); } - public void specialEvents(String jobId, Consumer> handler, Consumer errorHandler) { - SearchRequestBuilder request = createSpecialEventSearch(jobId); + public void specialEventsForJob(String jobId, SpecialEventsQueryBuilder queryBuilder, ActionListener> handler) { + + // Find all the calendars used by the job then the events for those calendars + + ActionListener> calendarsListener = ActionListener.wrap( + calendars -> { + if (calendars.results().isEmpty()) { + handler.onResponse(new QueryPage<>(Collections.emptyList(), 0, SpecialEvent.RESULTS_FIELD)); + return; + } + List calendarIds = calendars.results().stream().map(Calendar::getId).collect(Collectors.toList()); + queryBuilder.calendarIds(calendarIds); + specialEvents(queryBuilder, handler); + }, + handler::onFailure + ); + + CalendarQueryBuilder query = new CalendarQueryBuilder().jobId(jobId); + calendars(query, calendarsListener); + } + + public void specialEvents(SpecialEventsQueryBuilder query, ActionListener> handler) { + SearchRequestBuilder request = client.prepareSearch(MlMetaIndex.INDEX_NAME) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setSource(query.build()); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request.request(), ActionListener.wrap( - response -> { - List specialEvents = new ArrayList<>(); - SearchHit[] hits = response.getHits().getHits(); - for (SearchHit hit : hits) { - specialEvents.add(parseSearchHit(hit, SpecialEvent.PARSER, errorHandler)); - } + response -> { + List specialEvents = new ArrayList<>(); + SearchHit[] hits = response.getHits().getHits(); + for (SearchHit hit : hits) { + specialEvents.add(parseSearchHit(hit, SpecialEvent.PARSER, handler::onFailure).build()); + } - handler.accept(specialEvents); - }, - errorHandler) - , client::search); + handler.onResponse(new QueryPage<>(specialEvents, response.getHits().getTotalHits(), + SpecialEvent.RESULTS_FIELD)); + }, + handler::onFailure) + , client::search); } public void updateCalendar(String calendarId, Set jobIdsToAdd, Set jobIdsToRemove, diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/SpecialEventsQueryBuilder.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/SpecialEventsQueryBuilder.java new file mode 100644 index 00000000000..18646adcd17 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/SpecialEventsQueryBuilder.java @@ -0,0 +1,97 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.persistence; + +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.query.TermsQueryBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.ml.calendars.Calendar; +import org.elasticsearch.xpack.ml.calendars.SpecialEvent; + +import java.util.ArrayList; +import java.util.List; + +/** + * Query builder for {@link SpecialEvent}s + * If calendarIds are not set then all calendars will match. + */ +public class SpecialEventsQueryBuilder { + public static final int DEFAULT_SIZE = 1000; + + private int from = 0; + private int size = DEFAULT_SIZE; + + private List calendarIds; + private String after; + private String before; + + public SpecialEventsQueryBuilder calendarIds(List calendarIds) { + this.calendarIds = calendarIds; + return this; + } + + public SpecialEventsQueryBuilder after(String after) { + this.after = after; + return this; + } + + public SpecialEventsQueryBuilder before(String before) { + this.before = before; + return this; + } + + public SpecialEventsQueryBuilder from(int from) { + this.from = from; + return this; + } + + public SpecialEventsQueryBuilder size(int size) { + this.size = size; + return this; + } + + public SearchSourceBuilder build() { + List queries = new ArrayList<>(); + + if (after != null) { + RangeQueryBuilder afterQuery = QueryBuilders.rangeQuery(SpecialEvent.END_TIME.getPreferredName()); + afterQuery.gt(after); + queries.add(afterQuery); + } + if (before != null) { + RangeQueryBuilder beforeQuery = QueryBuilders.rangeQuery(SpecialEvent.START_TIME.getPreferredName()); + beforeQuery.lt(before); + queries.add(beforeQuery); + } + + if (calendarIds != null && calendarIds.isEmpty() == false) { + queries.add(new TermsQueryBuilder(Calendar.ID.getPreferredName(), calendarIds)); + } + + QueryBuilder typeQuery = new TermsQueryBuilder(SpecialEvent.TYPE.getPreferredName(), SpecialEvent.SPECIAL_EVENT_TYPE); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.sort(SpecialEvent.START_TIME.getPreferredName()); + searchSourceBuilder.from(from); + searchSourceBuilder.size(size); + + if (queries.isEmpty()) { + searchSourceBuilder.query(typeQuery); + } else { + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); + boolQueryBuilder.filter(typeQuery); + for (QueryBuilder query : queries) { + boolQueryBuilder.filter(query); + } + searchSourceBuilder.query(boolQueryBuilder); + } + + return searchSourceBuilder; + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 89e933ebe28..b2d737ef608 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -24,6 +24,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.OpenJobAction.JobTask; +import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.calendars.SpecialEvent; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.Job; @@ -35,6 +36,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.job.persistence.SpecialEventsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; @@ -261,24 +263,24 @@ public class AutodetectProcessManager extends AbstractComponent { return; } - Consumer> eventConsumer = specialEvents -> { - communicator.writeUpdateProcessMessage(updateParams, specialEvents, (aVoid, e) -> { - if (e == null) { - handler.accept(null); - } else { - handler.accept(e); - } - }); - }; - + ActionListener> eventsListener = ActionListener.wrap( + specialEvents -> { + communicator.writeUpdateProcessMessage(updateParams, specialEvents.results(), (aVoid, e) -> { + if (e == null) { + handler.accept(null); + } else { + handler.accept(e); + } + }); + }, + handler::accept); if (updateParams.isUpdateSpecialEvents()) { - jobProvider.specialEvents(jobTask.getJobId(), eventConsumer, handler::accept); + SpecialEventsQueryBuilder query = new SpecialEventsQueryBuilder().after(Long.toString(new Date().getTime())); + jobProvider.specialEventsForJob(jobTask.getJobId(), query, eventsListener); } else { - eventConsumer.accept(Collections.emptyList()); + eventsListener.onResponse(new QueryPage(Collections.emptyList(), 0, SpecialEvent.RESULTS_FIELD)); } - - } public void openJob(JobTask jobTask, Consumer handler) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParams.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParams.java index 28428ad8c89..c9ca66fb69d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParams.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParams.java @@ -117,6 +117,10 @@ public class AutodetectParams { return this; } + public DataCounts getDataCounts() { + return dataCounts; + } + public Builder setModelSizeStats(ModelSizeStats modelSizeStats) { this.modelSizeStats = modelSizeStats; return this; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/calendar/RestGetCalendarEventsAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/calendar/RestGetCalendarEventsAction.java new file mode 100644 index 00000000000..d6aa900c48b --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/calendar/RestGetCalendarEventsAction.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.rest.calendar; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.action.GetCalendarEventsAction; +import org.elasticsearch.xpack.ml.action.util.PageParams; +import org.elasticsearch.xpack.ml.calendars.Calendar; + +import java.io.IOException; + +public class RestGetCalendarEventsAction extends BaseRestHandler { + public RestGetCalendarEventsAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.GET, + MachineLearning.BASE_PATH + "calendars/{" + Calendar.ID.getPreferredName() + "}/events", this); + } + + @Override + public String getName() { + return "xpack_ml_get_calendar_events_action"; + } + + @Override + protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + String calendarId = restRequest.param(Calendar.ID.getPreferredName()); + + GetCalendarEventsAction.Request request; + + if (restRequest.hasContentOrSourceParam()) { + try (XContentParser parser = restRequest.contentOrSourceParamParser()) { + request = GetCalendarEventsAction.Request.parseRequest(calendarId, parser); + } + } else { + request = new GetCalendarEventsAction.Request(calendarId); + request.setAfter(restRequest.param(GetCalendarEventsAction.Request.AFTER.getPreferredName(), null)); + request.setBefore(restRequest.param(GetCalendarEventsAction.Request.BEFORE.getPreferredName(), null)); + + if (restRequest.hasParam(PageParams.FROM.getPreferredName()) || restRequest.hasParam(PageParams.SIZE.getPreferredName())) { + request.setPageParams(new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM), + restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE))); + } + } + + return channel -> client.execute(GetCalendarEventsAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/calendar/RestGetCalendarsAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/calendar/RestGetCalendarsAction.java index 449ade7d6a0..10d8fc17404 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/calendar/RestGetCalendarsAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/calendar/RestGetCalendarsAction.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.rest.calendar; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; @@ -23,9 +24,14 @@ public class RestGetCalendarsAction extends BaseRestHandler { public RestGetCalendarsAction(Settings settings, RestController controller) { super(settings); - controller.registerHandler(RestRequest.Method.GET, MachineLearning.BASE_PATH + "calendars/{" + Calendar.ID.getPreferredName() + "}", - this); + controller.registerHandler(RestRequest.Method.GET, MachineLearning.BASE_PATH + "calendars/{" + + Calendar.ID.getPreferredName() + "}", this); controller.registerHandler(RestRequest.Method.GET, MachineLearning.BASE_PATH + "calendars/", this); + + // endpoints that support body parameters must also accept POST + controller.registerHandler(RestRequest.Method.POST, MachineLearning.BASE_PATH + "calendars/{" + + Calendar.ID.getPreferredName() + "}", this); + controller.registerHandler(RestRequest.Method.POST, MachineLearning.BASE_PATH + "calendars/", this); } @Override @@ -35,17 +41,25 @@ public class RestGetCalendarsAction extends BaseRestHandler { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { - GetCalendarsAction.Request getRequest = new GetCalendarsAction.Request(); + String calendarId = restRequest.param(Calendar.ID.getPreferredName()); - if (!Strings.isNullOrEmpty(calendarId)) { - getRequest.setCalendarId(calendarId); + + GetCalendarsAction.Request request; + if (restRequest.hasContentOrSourceParam()) { + try (XContentParser parser = restRequest.contentOrSourceParamParser()) { + request = GetCalendarsAction.Request.parseRequest(calendarId, parser); + } + } else { + request = new GetCalendarsAction.Request(); + if (!Strings.isNullOrEmpty(calendarId)) { + request.setCalendarId(calendarId); + } + if (restRequest.hasParam(PageParams.FROM.getPreferredName()) || restRequest.hasParam(PageParams.SIZE.getPreferredName())) { + request.setPageParams(new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM), + restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE))); + } } - if (restRequest.hasParam(PageParams.FROM.getPreferredName()) || restRequest.hasParam(PageParams.SIZE.getPreferredName())) { - getRequest.setPageParams(new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM), - restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE))); - } - - return channel -> client.execute(GetCalendarsAction.INSTANCE, getRequest, new RestStatusToXContentListener<>(channel)); + return channel -> client.execute(GetCalendarsAction.INSTANCE, request, new RestStatusToXContentListener<>(channel)); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/calendar/RestPostCalendarEventAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/calendar/RestPostCalendarEventAction.java new file mode 100644 index 00000000000..0f3cf815fd0 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/calendar/RestPostCalendarEventAction.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.rest.calendar; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.action.PostCalendarEventsAction; +import org.elasticsearch.xpack.ml.calendars.Calendar; + +import java.io.IOException; + +public class RestPostCalendarEventAction extends BaseRestHandler { + + public RestPostCalendarEventAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.POST, + MachineLearning.BASE_PATH + "calendars/{" + Calendar.ID.getPreferredName() + "}/events", this); + } + + @Override + public String getName() { + return "xpack_ml_post_calendar_event_action"; + } + + @Override + protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + String calendarId = restRequest.param(Calendar.ID.getPreferredName()); + + PostCalendarEventsAction.Request request = + PostCalendarEventsAction.Request.parseRequest(calendarId, restRequest.requiredContent(), restRequest.getXContentType()); + return channel -> client.execute(PostCalendarEventsAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetCalendarEventsActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetCalendarEventsActionRequestTests.java new file mode 100644 index 00000000000..92aaf9de295 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetCalendarEventsActionRequestTests.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractStreamableXContentTestCase; +import org.elasticsearch.xpack.ml.action.util.PageParams; + +public class GetCalendarEventsActionRequestTests extends AbstractStreamableXContentTestCase { + + @Override + protected GetCalendarEventsAction.Request createTestInstance() { + String id = randomAlphaOfLengthBetween(1, 20); + GetCalendarEventsAction.Request request = new GetCalendarEventsAction.Request(id); + if (randomBoolean()) { + request.setAfter(randomAlphaOfLengthBetween(1, 20)); + } + if (randomBoolean()) { + request.setBefore(randomAlphaOfLengthBetween(1, 20)); + } + if (randomBoolean()) { + request.setPageParams(new PageParams(randomIntBetween(0, 10), randomIntBetween(1, 10))); + } + return request; + } + + @Override + protected GetCalendarEventsAction.Request createBlankInstance() { + return new GetCalendarEventsAction.Request(); + } + + @Override + protected GetCalendarEventsAction.Request doParseInstance(XContentParser parser) { + return GetCalendarEventsAction.Request.parseRequest(null, parser); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetCalendarsActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetCalendarsActionRequestTests.java index b177f646bc9..c646d835d95 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetCalendarsActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetCalendarsActionRequestTests.java @@ -5,15 +5,21 @@ */ package org.elasticsearch.xpack.ml.action; -import org.elasticsearch.test.AbstractStreamableTestCase; - -public class GetCalendarsActionRequestTests extends AbstractStreamableTestCase { +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractStreamableXContentTestCase; +import org.elasticsearch.xpack.ml.action.util.PageParams; +public class GetCalendarsActionRequestTests extends AbstractStreamableXContentTestCase { @Override protected GetCalendarsAction.Request createTestInstance() { GetCalendarsAction.Request request = new GetCalendarsAction.Request(); - request.setCalendarId(randomAlphaOfLengthBetween(1, 20)); + if (randomBoolean()) { + request.setCalendarId(randomAlphaOfLengthBetween(1, 20)); + } + if (randomBoolean()) { + request.setPageParams(PageParams.defaultParams()); + } return request; } @@ -22,4 +28,13 @@ public class GetCalendarsActionRequestTests extends AbstractStreamableTestCase { + + @Override + protected PostCalendarEventsAction.Request createTestInstance() { + String id = randomAlphaOfLengthBetween(1, 20); + return createTestInstance(id); + } + + private PostCalendarEventsAction.Request createTestInstance(String calendarId) { + int numEvents = randomIntBetween(1, 10); + List events = new ArrayList<>(); + for (int i=0; i PostCalendarEventsAction.Request.parseRequest(request.getCalendarId(), data, XContentType.JSON)); + assertEquals("Inconsistent calendar_id; 'foo' specified in the body differs from 'bar' specified as a URL argument", + e.getMessage()); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/calendars/SpecialEventTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/calendars/SpecialEventTests.java index 6ecd8496ac8..8cf11a300e1 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/calendars/SpecialEventTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/calendars/SpecialEventTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.calendars; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; @@ -21,28 +22,22 @@ import java.io.IOException; import java.time.Instant; import java.time.ZoneOffset; import java.time.ZonedDateTime; -import java.util.ArrayList; import java.util.EnumSet; import java.util.List; +import static org.hamcrest.Matchers.containsString; + public class SpecialEventTests extends AbstractSerializingTestCase { - public static SpecialEvent createSpecialEvent() { - int size = randomInt(10); - List jobIds = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - jobIds.add(randomAlphaOfLengthBetween(1, 20)); - } - - return new SpecialEvent(randomAlphaOfLength(10), randomAlphaOfLength(10), - ZonedDateTime.ofInstant(Instant.ofEpochMilli(new DateTime(randomDateTimeZone()).getMillis()), ZoneOffset.UTC), - ZonedDateTime.ofInstant(Instant.ofEpochMilli(new DateTime(randomDateTimeZone()).getMillis()), ZoneOffset.UTC), - jobIds); + public static SpecialEvent createSpecialEvent(String calendarId) { + ZonedDateTime start = ZonedDateTime.ofInstant(Instant.ofEpochMilli(new DateTime(randomDateTimeZone()).getMillis()), ZoneOffset.UTC); + return new SpecialEvent(randomAlphaOfLength(10), start, start.plusSeconds(randomIntBetween(1, 10000)), + calendarId); } @Override protected SpecialEvent createTestInstance() { - return createSpecialEvent(); + return createSpecialEvent(randomAlphaOfLengthBetween(1, 20)); } @Override @@ -52,7 +47,7 @@ public class SpecialEventTests extends AbstractSerializingTestCase @Override protected SpecialEvent doParseInstance(XContentParser parser) throws IOException { - return SpecialEvent.PARSER.apply(parser, null); + return SpecialEvent.PARSER.apply(parser, null).build(); } public void testToDetectionRule() { @@ -80,6 +75,36 @@ public class SpecialEventTests extends AbstractSerializingTestCase long conditionEndTime = Long.parseLong(conditions.get(1).getCondition().getValue()); assertEquals(0, conditionEndTime % bucketSpanSecs); - assertEquals(bucketSpanSecs * (bucketCount + 1), conditionEndTime); + + long eventTime = event.getEndTime().toEpochSecond() - conditionStartTime; + long numbBucketsInEvent = (eventTime + bucketSpanSecs -1) / bucketSpanSecs; + assertEquals(bucketSpanSecs * (bucketCount + numbBucketsInEvent), conditionEndTime); + } + + public void testBuild() { + SpecialEvent.Builder builder = new SpecialEvent.Builder(); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, builder::build); + assertEquals("Field [description] cannot be null", e.getMessage()); + builder.description("foo"); + e = expectThrows(ElasticsearchStatusException.class, builder::build); + assertEquals("Field [start_time] cannot be null", e.getMessage()); + ZonedDateTime now = ZonedDateTime.now(); + builder.startTime(now); + e = expectThrows(ElasticsearchStatusException.class, builder::build); + assertEquals("Field [end_time] cannot be null", e.getMessage()); + builder.endTime(now.plusHours(1)); + e = expectThrows(ElasticsearchStatusException.class, builder::build); + assertEquals("Field [calendar_id] cannot be null", e.getMessage()); + builder.calendarId("foo"); + builder.build(); + + + builder = new SpecialEvent.Builder().description("f").calendarId("c"); + builder.startTime(now); + builder.endTime(now.minusHours(2)); + + e = expectThrows(ElasticsearchStatusException.class, builder::build); + assertThat(e.getMessage(), containsString("must come before end time")); } } \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/JobProviderIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/JobProviderIT.java index 850ea920857..b78b0504414 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/JobProviderIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/JobProviderIT.java @@ -42,6 +42,7 @@ import org.elasticsearch.xpack.ml.job.persistence.CalendarQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.job.persistence.SpecialEventsQueryBuilder; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCountsTests; @@ -63,12 +64,10 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.not; @@ -267,40 +266,73 @@ public class JobProviderIT extends XPackSingleNodeTestCase { } public void testSpecialEvents() throws Exception { + Job.Builder jobA = createJob("job_a"); + Job.Builder jobB = createJob("job_b"); + Job.Builder jobC = createJob("job_c"); + + String calendarAId = "maintenance_a"; + List calendars = new ArrayList<>(); + calendars.add(new Calendar(calendarAId, Collections.singletonList("job_a"))); + + ZonedDateTime now = ZonedDateTime.now(); List events = new ArrayList<>(); - events.add(new SpecialEvent("A_and_B_downtime", "downtime", createZonedDateTime(1000L), createZonedDateTime(2000L), - Arrays.asList("job_a", "job_b"))); - events.add(new SpecialEvent("A_downtime", "downtime", createZonedDateTime(5000L), createZonedDateTime(10000L), - Collections.singletonList("job_a"))); + events.add(buildSpecialEvent("downtime", now.plusDays(1), now.plusDays(2), calendarAId)); + events.add(buildSpecialEvent("downtime_AA", now.plusDays(8), now.plusDays(9), calendarAId)); + events.add(buildSpecialEvent("downtime_AAA", now.plusDays(15), now.plusDays(16), calendarAId)); + + String calendarABId = "maintenance_a_and_b"; + calendars.add(new Calendar(calendarABId, Arrays.asList("job_a", "job_b"))); + + events.add(buildSpecialEvent("downtime_AB", now.plusDays(12), now.plusDays(13), calendarABId)); + + indexCalendars(calendars); indexSpecialEvents(events); - - Job.Builder job = createJob("job_b"); - List returnedEvents = getSpecialEvents(job.getId()); - assertEquals(1, returnedEvents.size()); - assertEquals(events.get(0), returnedEvents.get(0)); - - job = createJob("job_a"); - returnedEvents = getSpecialEvents(job.getId()); - assertEquals(2, returnedEvents.size()); + SpecialEventsQueryBuilder query = new SpecialEventsQueryBuilder(); + List returnedEvents = getSpecialEventsForJob(jobA.getId(), query); + assertEquals(4, returnedEvents.size()); assertEquals(events.get(0), returnedEvents.get(0)); assertEquals(events.get(1), returnedEvents.get(1)); + assertEquals(events.get(3), returnedEvents.get(2)); + assertEquals(events.get(2), returnedEvents.get(3)); - job = createJob("job_c"); - returnedEvents = getSpecialEvents(job.getId()); + returnedEvents = getSpecialEventsForJob(jobB.getId(), query); + assertEquals(1, returnedEvents.size()); + assertEquals(events.get(3), returnedEvents.get(0)); + + returnedEvents = getSpecialEventsForJob(jobC.getId(), query); assertEquals(0, returnedEvents.size()); + + // Test time filters + // Lands halfway through the second event which should be returned + query.after(Long.toString(now.plusDays(8).plusHours(1).toInstant().toEpochMilli())); + // Lands halfway through the 3rd event which should be returned + query.before(Long.toString(now.plusDays(12).plusHours(1).toInstant().toEpochMilli())); + returnedEvents = getSpecialEventsForJob(jobA.getId(), query); + assertEquals(2, returnedEvents.size()); + assertEquals(events.get(1), returnedEvents.get(0)); + assertEquals(events.get(3), returnedEvents.get(1)); + } + + private SpecialEvent buildSpecialEvent(String description, ZonedDateTime start, ZonedDateTime end, String calendarId) { + return new SpecialEvent.Builder().description(description).startTime(start).endTime(end).calendarId(calendarId).build(); } public void testGetAutodetectParams() throws Exception { String jobId = "test_get_autodetect_params"; Job.Builder job = createJob(jobId, Arrays.asList("fruit", "tea")); + String calendarId = "downtime"; + Calendar calendar = new Calendar(calendarId, Collections.singletonList(jobId)); + indexCalendars(Collections.singletonList(calendar)); + // index the param docs + ZonedDateTime now = ZonedDateTime.now(); List events = new ArrayList<>(); - events.add(new SpecialEvent("A_downtime", "downtime", createZonedDateTime(5000L), createZonedDateTime(10000L), - Collections.singletonList(jobId))); - events.add(new SpecialEvent("A_downtime2", "downtime", createZonedDateTime(20000L), createZonedDateTime(21000L), - Collections.singletonList(jobId))); + // events in the past should be filtered out + events.add(buildSpecialEvent("In the past", now.minusDays(7), now.minusDays(6), calendarId)); + events.add(buildSpecialEvent("A_downtime", now.plusDays(1), now.plusDays(2), calendarId)); + events.add(buildSpecialEvent("A_downtime2", now.plusDays(8), now.plusDays(9), calendarId)); indexSpecialEvents(events); List filters = new ArrayList<>(); @@ -335,9 +367,10 @@ public class JobProviderIT extends XPackSingleNodeTestCase { // special events assertNotNull(params.specialEvents()); - assertEquals(2, params.specialEvents().size()); + assertEquals(3, params.specialEvents().size()); assertEquals(events.get(0), params.specialEvents().get(0)); assertEquals(events.get(1), params.specialEvents().get(1)); + assertEquals(events.get(2), params.specialEvents().get(2)); // filters assertNotNull(params.filters()); @@ -382,24 +415,25 @@ public class JobProviderIT extends XPackSingleNodeTestCase { return searchResultHolder.get(); } - private List getSpecialEvents(String jobId) throws Exception { + private List getSpecialEventsForJob(String jobId, SpecialEventsQueryBuilder query) throws Exception { AtomicReference errorHolder = new AtomicReference<>(); - AtomicReference> searchResultHolder = new AtomicReference<>(); + AtomicReference> searchResultHolder = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - jobProvider.specialEvents(jobId, params -> { + jobProvider.specialEventsForJob(jobId, query, ActionListener.wrap( + params -> { searchResultHolder.set(params); latch.countDown(); }, e -> { errorHolder.set(e); latch.countDown(); - }); + })); latch.await(); if (errorHolder.get() != null) { throw errorHolder.get(); } - return searchResultHolder.get(); + return searchResultHolder.get().results(); } private Job.Builder createJob(String jobId) { @@ -445,7 +479,7 @@ public class JobProviderIT extends XPackSingleNodeTestCase { bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); for (SpecialEvent event : events) { - IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, event.documentId()); + IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true")); indexRequest.source(event.toXContent(builder, params)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index 4c2135c6246..449060a0110 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -91,7 +91,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { Collections.singletonList(new DetectionRule.Builder(conditions).build()))); UpdateParams updateParams = new UpdateParams(null, detectorUpdates, true); - List events = Collections.singletonList(SpecialEventTests.createSpecialEvent()); + List events = Collections.singletonList(SpecialEventTests.createSpecialEvent(randomAlphaOfLength(10))); communicator.writeUpdateProcessMessage(updateParams, events, ((aVoid, e) -> {})); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriterTests.java index 7d83f2589b8..b6ea9e659b2 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriterTests.java @@ -238,12 +238,14 @@ public class FieldConfigWriterTests extends ESTestCase { AnalysisConfig.Builder builder = new AnalysisConfig.Builder(Arrays.asList(d)); analysisConfig = builder.build(); - specialEvents.add( - new SpecialEvent("1", "The Ashes", ZonedDateTime.ofInstant(Instant.ofEpochMilli(1511395200000L), ZoneOffset.UTC), - ZonedDateTime.ofInstant(Instant.ofEpochMilli(1515369600000L), ZoneOffset.UTC), Collections.emptyList())); - specialEvents.add( - new SpecialEvent("2", "elasticon", ZonedDateTime.ofInstant(Instant.ofEpochMilli(1519603200000L), ZoneOffset.UTC), - ZonedDateTime.ofInstant(Instant.ofEpochMilli(1519862400000L), ZoneOffset.UTC), Collections.emptyList())); + specialEvents.add(new SpecialEvent.Builder().description("The Ashes") + .startTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1511395200000L), ZoneOffset.UTC)) + .endTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1515369600000L), ZoneOffset.UTC)) + .calendarId("calendar_id").build()); + specialEvents.add(new SpecialEvent.Builder().description("elasticon") + .startTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1519603200000L), ZoneOffset.UTC)) + .endTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1519862400000L), ZoneOffset.UTC)) + .calendarId("calendar_id").build()); writer = mock(OutputStreamWriter.class); createFieldConfigWriter().write(); diff --git a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.get_calendar_events.json b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.get_calendar_events.json new file mode 100644 index 00000000000..5acf8dc25ec --- /dev/null +++ b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.get_calendar_events.json @@ -0,0 +1,37 @@ +{ + "xpack.ml.get_calendar_events": { + "methods": [ "GET" ], + "url": { + "path": "/_xpack/ml/calendars/{calendar_id}/events", + "paths": [ + "/_xpack/ml/calendars/{calendar_id}/events" + ], + "parts": { + "calendar_id": { + "type": "string", + "description": "The ID of the calendar containing the events", + "required": true + } + }, + "params": { + "after": { + "type": "string", + "description": "Get events after this time" + }, + "before": { + "type": "date", + "description": "Get events before this time" + }, + "from": { + "type": "int", + "description": "Skips a number of events" + }, + "size": { + "type": "int", + "description": "Specifies a max number of events to get" + } + } + }, + "body": null + } +} \ No newline at end of file diff --git a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.get_calendars.json b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.get_calendars.json index 44c06e3501b..5b252a0e89c 100644 --- a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.get_calendars.json +++ b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.get_calendars.json @@ -1,6 +1,6 @@ { "xpack.ml.get_calendars": { - "methods": [ "GET" ], + "methods": [ "GET", "POST" ], "url": { "path": "/_xpack/ml/calendars/{calendar_id}", "paths": [ diff --git a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.post_calendar_events.json b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.post_calendar_events.json new file mode 100644 index 00000000000..7c75c3d6fef --- /dev/null +++ b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.post_calendar_events.json @@ -0,0 +1,20 @@ +{ + "xpack.ml.post_calendar_events": { + "methods": [ "POST" ], + "url": { + "path": "/_xpack/ml/calendars/{calendar_id}/events", + "paths": [ "/_xpack/ml/calendars/{calendar_id}/events" ], + "parts": { + "calendar_id": { + "type": "string", + "required": true, + "description": "The ID of the calendar to modify" + } + } + }, + "body": { + "description" : "A list of special events", + "required" : true + } + } +} \ No newline at end of file diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/calendar_crud.yml b/plugin/src/test/resources/rest-api-spec/test/ml/calendar_crud.yml index f39d88b6f2f..6792939b437 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/calendar_crud.yml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/calendar_crud.yml @@ -177,7 +177,7 @@ - match: { calendars.0.job_ids: [] } --- -"Test update calendar": +"Test update calendar job ids": - do: xpack.ml.put_calendar: @@ -234,3 +234,62 @@ xpack.ml.delete_calendar_job: calendar_id: "Wildlife" job_id: "missing job" + +--- +"Test calendar events": + + - do: + xpack.ml.put_calendar: + calendar_id: "events" + + - do: + xpack.ml.post_calendar_events: + calendar_id: "events" + body: > + { "description": "event 1", "start_time": "2017-12-01T00:00:00Z", "end_time": "2017-12-02T00:00:00Z", "calendar_id": "events"} + { "description": "event 2", "start_time": "2017-12-05T00:00:00Z", "end_time": "2017-12-06T00:00:00Z", "calendar_id": "events"} + { "description": "event 3", "start_time": "2017-12-12T00:00:00Z", "end_time": "2017-12-13T00:00:00Z", "calendar_id": "events"} + { "description": "event 4", "start_time": "2017-12-12T00:00:00Z", "end_time": "2017-12-15T00:00:00Z", "calendar_id": "events"} + + - do: + xpack.ml.get_calendar_events: + calendar_id: "events" + - length: { special_events: 4 } + - match: { special_events.0.description: "event 1" } + - match: { special_events.1.description: "event 2" } + - match: { special_events.2.description: "event 3" } + - match: { special_events.3.description: "event 4" } + + - do: + xpack.ml.get_calendar_events: + calendar_id: "events" + from: 1 + size: 2 + - length: { special_events: 2 } + - match: { special_events.0.description: "event 2" } + - match: { special_events.1.description: "event 3" } + + - do: + xpack.ml.get_calendar_events: + calendar_id: "events" + before: "2017-12-12T00:00:00Z" + - length: { special_events: 2 } + - match: { special_events.0.description: "event 1" } + - match: { special_events.1.description: "event 2" } + + - do: + xpack.ml.get_calendar_events: + calendar_id: "events" + after: "2017-12-05T03:00:00Z" + - length: { special_events: 3 } + - match: { special_events.0.description: "event 2" } + - match: { special_events.1.description: "event 3" } + - match: { special_events.2.description: "event 4" } + + - do: + xpack.ml.get_calendar_events: + calendar_id: "events" + after: "2017-12-02T00:00:00Z" + before: "2017-12-12T00:00:00Z" + - length: { special_events: 1 } + - match: { special_events.0.description: "event 2" }