* Calendar event actions

* Add page params and date range tests

* Address review comments

* Support POSTing params in the body of a request


Original commit: elastic/x-pack-elasticsearch@22a7e17a8f
This commit is contained in:
David Kyle 2017-12-20 17:39:44 +00:00 committed by GitHub
parent 5e51422f4d
commit e53ac4484c
25 changed files with 1423 additions and 212 deletions

View File

@ -52,6 +52,8 @@ import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.DeleteCalendarAction;
import org.elasticsearch.xpack.ml.action.GetCalendarEventsAction;
import org.elasticsearch.xpack.ml.action.PostCalendarEventsAction;
import org.elasticsearch.xpack.ml.action.UpdateCalendarJobAction;
import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.ml.action.DeleteExpiredDataAction;
@ -119,7 +121,9 @@ import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestDeleteCalendarAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestDeleteCalendarJobAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestGetCalendarEventsAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestGetCalendarsAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestPostCalendarEventAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestPutCalendarAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestPutCalendarJobAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestDeleteDatafeedAction;
@ -471,7 +475,9 @@ public class MachineLearning implements ActionPlugin {
new RestPutCalendarAction(settings, restController),
new RestDeleteCalendarAction(settings, restController),
new RestDeleteCalendarJobAction(settings, restController),
new RestPutCalendarJobAction(settings, restController)
new RestPutCalendarJobAction(settings, restController),
new RestGetCalendarEventsAction(settings, restController),
new RestPostCalendarEventAction(settings, restController)
);
}
@ -521,7 +527,9 @@ public class MachineLearning implements ActionPlugin {
new ActionHandler<>(GetCalendarsAction.INSTANCE, GetCalendarsAction.TransportAction.class),
new ActionHandler<>(PutCalendarAction.INSTANCE, PutCalendarAction.TransportAction.class),
new ActionHandler<>(DeleteCalendarAction.INSTANCE, DeleteCalendarAction.TransportAction.class),
new ActionHandler<>(UpdateCalendarJobAction.INSTANCE, UpdateCalendarJobAction.TransportAction.class)
new ActionHandler<>(UpdateCalendarJobAction.INSTANCE, UpdateCalendarJobAction.TransportAction.class),
new ActionHandler<>(GetCalendarEventsAction.INSTANCE, GetCalendarEventsAction.TransportAction.class),
new ActionHandler<>(PostCalendarEventsAction.INSTANCE, PostCalendarEventsAction.TransportAction.class)
);
}

View File

@ -0,0 +1,281 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.action.util.PageParams;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.SpecialEventsQueryBuilder;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
public class GetCalendarEventsAction extends Action<GetCalendarEventsAction.Request, GetCalendarEventsAction.Response,
GetCalendarEventsAction.RequestBuilder> {
public static final GetCalendarEventsAction INSTANCE = new GetCalendarEventsAction();
public static final String NAME = "cluster:monitor/xpack/ml/calendars/events/get";
private GetCalendarEventsAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends ActionRequest implements ToXContentObject {
public static final ParseField AFTER = new ParseField("after");
public static final ParseField BEFORE = new ParseField("before");
private static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
static {
PARSER.declareString(Request::setCalendarId, Calendar.ID);
PARSER.declareString(Request::setAfter, AFTER);
PARSER.declareString(Request::setBefore, BEFORE);
PARSER.declareObject(Request::setPageParams, PageParams.PARSER, PageParams.PAGE);
}
public static Request parseRequest(String calendarId, XContentParser parser) {
Request request = PARSER.apply(parser, null);
if (calendarId != null) {
request.setCalendarId(calendarId);
}
return request;
}
private String calendarId;
private String after;
private String before;
private PageParams pageParams = PageParams.defaultParams();
Request() {
}
public Request(String calendarId) {
setCalendarId(calendarId);
}
public String getCalendarId() {
return calendarId;
}
private void setCalendarId(String calendarId) {
this.calendarId = ExceptionsHelper.requireNonNull(calendarId, Calendar.ID.getPreferredName());
}
public String getAfter() {
return after;
}
public void setAfter(String after) {
this.after = after;
}
public String getBefore() {
return before;
}
public void setBefore(String before) {
this.before = before;
}
public PageParams getPageParams() {
return pageParams;
}
public void setPageParams(PageParams pageParams) {
this.pageParams = Objects.requireNonNull(pageParams);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
calendarId = in.readString();
after = in.readOptionalString();
before = in.readOptionalString();
pageParams = new PageParams(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(calendarId);
out.writeOptionalString(after);
out.writeOptionalString(before);
pageParams.writeTo(out);
}
@Override
public int hashCode() {
return Objects.hash(calendarId, after, before, pageParams);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(calendarId, other.calendarId) && Objects.equals(after, other.after)
&& Objects.equals(before, other.before) && Objects.equals(pageParams, other.pageParams);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Calendar.ID.getPreferredName(), calendarId);
if (after != null) {
builder.field(AFTER.getPreferredName(), after);
}
if (before != null) {
builder.field(BEFORE.getPreferredName(), before);
}
builder.field(PageParams.PAGE.getPreferredName(), pageParams);
builder.endObject();
return builder;
}
}
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client) {
super(client, INSTANCE, new Request());
}
}
public static class Response extends ActionResponse implements ToXContentObject {
private QueryPage<SpecialEvent> specialEvents;
Response() {
}
public Response(QueryPage<SpecialEvent> specialEvents) {
this.specialEvents = specialEvents;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
specialEvents = new QueryPage<>(in, SpecialEvent::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
specialEvents.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return specialEvents.toXContent(builder, params);
}
@Override
public int hashCode() {
return Objects.hash(specialEvents);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Response other = (Response) obj;
return Objects.equals(specialEvents, other.specialEvents);
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final JobProvider jobProvider;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
JobProvider jobProvider) {
super(settings, NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
this.jobProvider = jobProvider;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
ActionListener<Boolean> calendarExistsListener = ActionListener.wrap(
r -> {
SpecialEventsQueryBuilder query = new SpecialEventsQueryBuilder()
.calendarIds(Collections.singletonList(request.getCalendarId()))
.after(request.getAfter())
.before(request.getBefore())
.from(request.getPageParams().getFrom())
.size(request.getPageParams().getSize());
jobProvider.specialEvents(query, ActionListener.wrap(
events -> {
listener.onResponse(new Response(events));
},
listener::onFailure
));
},
listener::onFailure);
checkCalendarExists(request.getCalendarId(), calendarExistsListener);
}
private void checkCalendarExists(String calendarId, ActionListener<Boolean> listener) {
jobProvider.calendar(calendarId, ActionListener.wrap(
c -> listener.onResponse(true),
listener::onFailure
));
}
}
}

View File

@ -11,34 +11,23 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetaIndex;
import org.elasticsearch.xpack.ml.action.util.PageParams;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.calendars.Calendar;
@ -46,14 +35,10 @@ import org.elasticsearch.xpack.ml.job.persistence.CalendarQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.xpack.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.ClientHelper.executeAsyncWithOrigin;
public class GetCalendarsAction extends Action<GetCalendarsAction.Request, GetCalendarsAction.Response, GetCalendarsAction.RequestBuilder> {
@ -74,7 +59,22 @@ public class GetCalendarsAction extends Action<GetCalendarsAction.Request, GetCa
return new Response();
}
public static class Request extends ActionRequest {
public static class Request extends ActionRequest implements ToXContentObject {
private static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
static {
PARSER.declareString(Request::setCalendarId, Calendar.ID);
PARSER.declareObject(Request::setPageParams, PageParams.PARSER, PageParams.PAGE);
}
public static Request parseRequest(String calendarId, XContentParser parser) {
Request request = PARSER.apply(parser, null);
if (calendarId != null) {
request.setCalendarId(calendarId);
}
return request;
}
private String calendarId;
private PageParams pageParams;
@ -114,18 +114,20 @@ public class GetCalendarsAction extends Action<GetCalendarsAction.Request, GetCa
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
calendarId = in.readString();
calendarId = in.readOptionalString();
pageParams = in.readOptionalWriteable(PageParams::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(calendarId);
out.writeOptionalString(calendarId);
out.writeOptionalWriteable(pageParams);
}
@Override
public int hashCode() {
return Objects.hash(calendarId);
return Objects.hash(calendarId, pageParams);
}
@Override
@ -137,7 +139,20 @@ public class GetCalendarsAction extends Action<GetCalendarsAction.Request, GetCa
return false;
}
Request other = (Request) obj;
return Objects.equals(calendarId, other.calendarId);
return Objects.equals(calendarId, other.calendarId) && Objects.equals(pageParams, other.pageParams);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (calendarId != null) {
builder.field(Calendar.ID.getPreferredName(), calendarId);
}
if (pageParams != null) {
builder.field(PageParams.PAGE.getPreferredName(), pageParams);
}
builder.endObject();
return builder;
}
}

View File

@ -0,0 +1,311 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetaIndex;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.ClientHelper.executeAsyncWithOrigin;
public class PostCalendarEventsAction extends Action<PostCalendarEventsAction.Request, PostCalendarEventsAction.Response,
PostCalendarEventsAction.RequestBuilder> {
public static final PostCalendarEventsAction INSTANCE = new PostCalendarEventsAction();
public static final String NAME = "cluster:admin/xpack/ml/calendars/events/post";
public static final ParseField SPECIAL_EVENTS = new ParseField("special_events");
private PostCalendarEventsAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends ActionRequest {
public static Request parseRequest(String calendarId, BytesReference data, XContentType contentType) throws IOException {
List<SpecialEvent.Builder> events = new ArrayList<>();
XContent xContent = contentType.xContent();
int lineNumber = 0;
int from = 0;
int length = data.length();
byte marker = xContent.streamSeparator();
while (true) {
int nextMarker = findNextMarker(marker, from, data, length);
if (nextMarker == -1) {
break;
}
lineNumber++;
try (XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY, data.slice(from, nextMarker - from))) {
try {
SpecialEvent.Builder event = SpecialEvent.PARSER.apply(parser, null);
events.add(event);
} catch (ParsingException pe) {
throw ExceptionsHelper.badRequestException("Failed to parse special event on line [" + lineNumber + "]", pe);
}
from = nextMarker + 1;
}
}
for (SpecialEvent.Builder event: events) {
if (event.getCalendarId() != null && event.getCalendarId().equals(calendarId) == false) {
throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.INCONSISTENT_ID,
Calendar.ID.getPreferredName(), event.getCalendarId(), calendarId));
}
// Set the calendar Id in case it is null
event.calendarId(calendarId);
}
return new Request(calendarId, events.stream().map(SpecialEvent.Builder::build).collect(Collectors.toList()));
}
private static int findNextMarker(byte marker, int from, BytesReference data, int length) {
for (int i = from; i < length; i++) {
if (data.get(i) == marker) {
return i;
}
}
if (from != length) {
throw new IllegalArgumentException("The post calendar events request must be terminated by a newline [\n]");
}
return -1;
}
private String calendarId;
private List<SpecialEvent> specialEvents;
Request() {
}
public Request(String calendarId, List<SpecialEvent> specialEvents) {
this.calendarId = ExceptionsHelper.requireNonNull(calendarId, Calendar.ID.getPreferredName());
this.specialEvents = ExceptionsHelper.requireNonNull(specialEvents, SPECIAL_EVENTS.getPreferredName());
}
public String getCalendarId() {
return calendarId;
}
public List<SpecialEvent> getSpecialEvents() {
return specialEvents;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
calendarId = in.readString();
specialEvents = in.readList(SpecialEvent::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(calendarId);
out.writeList(specialEvents);
}
@Override
public int hashCode() {
return Objects.hash(calendarId, specialEvents);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(calendarId, other.calendarId) && Objects.equals(specialEvents, other.specialEvents);
}
}
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client) {
super(client, INSTANCE, new Request());
}
}
public static class Response extends AcknowledgedResponse implements ToXContentObject {
private List<SpecialEvent> specialEvent;
Response() {
}
public Response(List<SpecialEvent> specialEvents) {
super(true);
this.specialEvent = specialEvents;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
in.readList(SpecialEvent::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
out.writeList(specialEvent);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SPECIAL_EVENTS.getPreferredName(), specialEvent);
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(isAcknowledged(), specialEvent);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Response other = (Response) obj;
return Objects.equals(isAcknowledged(), other.isAcknowledged()) && Objects.equals(specialEvent, other.specialEvent);
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final Client client;
private final JobProvider jobProvider;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client, JobProvider jobProvider) {
super(settings, NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
this.client = client;
this.jobProvider = jobProvider;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
List<SpecialEvent> events = request.getSpecialEvents();
ActionListener<Boolean> calendarExistsListener = ActionListener.wrap(
r -> {
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
for (SpecialEvent event: events) {
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE);
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
indexRequest.source(event.toXContent(builder,
new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true"))));
} catch (IOException e) {
throw new IllegalStateException("Failed to serialise special event", e);
}
bulkRequestBuilder.add(indexRequest);
}
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
executeAsyncWithOrigin(client, ML_ORIGIN, BulkAction.INSTANCE, bulkRequestBuilder.request(),
new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
listener.onResponse(new Response(events));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(
ExceptionsHelper.serverError("Error indexing special event", e));
}
});
},
listener::onFailure);
checkCalendarExists(request.getCalendarId(), calendarExistsListener);
}
private void checkCalendarExists(String calendarId, ActionListener<Boolean> listener) {
jobProvider.calendar(calendarId, ActionListener.wrap(
c -> listener.onResponse(true),
listener::onFailure
));
}
}
}

View File

@ -10,7 +10,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -21,6 +20,8 @@ import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Operator;
import org.elasticsearch.xpack.ml.job.config.RuleAction;
import org.elasticsearch.xpack.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.Intervals;
import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
@ -29,32 +30,27 @@ import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
public class SpecialEvent implements ToXContentObject, Writeable {
public static final ParseField ID = new ParseField("id");
public static final ParseField DESCRIPTION = new ParseField("description");
public static final ParseField START_TIME = new ParseField("start_time");
public static final ParseField END_TIME = new ParseField("end_time");
public static final ParseField TYPE = new ParseField("type");
public static final ParseField JOB_IDS = new ParseField("job_ids");
public static final ParseField RESULTS_FIELD = new ParseField("special_events");
public static final String SPECIAL_EVENT_TYPE = "special_event";
public static final String DOCUMENT_ID_PREFIX = "event_";
public static final ConstructingObjectParser<SpecialEvent, Void> PARSER =
new ConstructingObjectParser<>("special_event", a -> new SpecialEvent((String) a[0], (String) a[1], (ZonedDateTime) a[2],
(ZonedDateTime) a[3], (List<String>) a[4]));
public static final ObjectParser<SpecialEvent.Builder, Void> PARSER =
new ObjectParser<>("special_event", Builder::new);
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID);
PARSER.declareString(ConstructingObjectParser.constructorArg(), DESCRIPTION);
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> {
PARSER.declareString(SpecialEvent.Builder::description, DESCRIPTION);
PARSER.declareField(SpecialEvent.Builder::startTime, p -> {
if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) {
return ZonedDateTime.ofInstant(Instant.ofEpochMilli(p.longValue()), ZoneOffset.UTC);
} else if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
@ -63,7 +59,7 @@ public class SpecialEvent implements ToXContentObject, Writeable {
throw new IllegalArgumentException(
"unexpected token [" + p.currentToken() + "] for [" + START_TIME.getPreferredName() + "]");
}, START_TIME, ObjectParser.ValueType.VALUE);
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> {
PARSER.declareField(SpecialEvent.Builder::endTime, p -> {
if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) {
return ZonedDateTime.ofInstant(Instant.ofEpochMilli(p.longValue()), ZoneOffset.UTC);
} else if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
@ -73,7 +69,7 @@ public class SpecialEvent implements ToXContentObject, Writeable {
"unexpected token [" + p.currentToken() + "] for [" + END_TIME.getPreferredName() + "]");
}, END_TIME, ObjectParser.ValueType.VALUE);
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), JOB_IDS);
PARSER.declareString(SpecialEvent.Builder::calendarId, Calendar.ID);
PARSER.declareString((builder, s) -> {}, TYPE);
}
@ -81,30 +77,23 @@ public class SpecialEvent implements ToXContentObject, Writeable {
return DOCUMENT_ID_PREFIX + eventId;
}
private final String id;
private final String description;
private final ZonedDateTime startTime;
private final ZonedDateTime endTime;
private final Set<String> jobIds;
private final String calendarId;
public SpecialEvent(String id, String description, ZonedDateTime startTime, ZonedDateTime endTime, List<String> jobIds) {
this.id = Objects.requireNonNull(id);
SpecialEvent(String description, ZonedDateTime startTime, ZonedDateTime endTime, String calendarId) {
this.description = Objects.requireNonNull(description);
this.startTime = Objects.requireNonNull(startTime);
this.endTime = Objects.requireNonNull(endTime);
this.jobIds = new HashSet<>(jobIds);
this.calendarId = Objects.requireNonNull(calendarId);
}
public SpecialEvent(StreamInput in) throws IOException {
id = in.readString();
description = in.readString();
startTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(in.readVLong()), ZoneOffset.UTC);
endTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(in.readVLong()), ZoneOffset.UTC);
jobIds = new HashSet<>(Arrays.asList(in.readStringArray()));
}
public String getId() {
return id;
calendarId = in.readString();
}
public String getDescription() {
@ -119,12 +108,8 @@ public class SpecialEvent implements ToXContentObject, Writeable {
return endTime;
}
public Set<String> getJobIds() {
return jobIds;
}
public String documentId() {
return documentId(id);
public String getCalendarId() {
return calendarId;
}
/**
@ -157,21 +142,19 @@ public class SpecialEvent implements ToXContentObject, Writeable {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
out.writeString(description);
out.writeVLong(startTime.toInstant().toEpochMilli());
out.writeVLong(endTime.toInstant().toEpochMilli());
out.writeStringArray(jobIds.toArray(new String [0]));
out.writeString(calendarId);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ID.getPreferredName(), id);
builder.field(DESCRIPTION.getPreferredName(), description);
builder.dateField(START_TIME.getPreferredName(), START_TIME.getPreferredName() + "_string", startTime.toInstant().toEpochMilli());
builder.dateField(END_TIME.getPreferredName(), END_TIME.getPreferredName() + "_string", endTime.toInstant().toEpochMilli());
builder.field(JOB_IDS.getPreferredName(), jobIds);
builder.field(Calendar.ID.getPreferredName(), calendarId);
if (params.paramAsBoolean(MlMetaIndex.INCLUDE_TYPE_KEY, false)) {
builder.field(TYPE.getPreferredName(), SPECIAL_EVENT_TYPE);
}
@ -190,12 +173,73 @@ public class SpecialEvent implements ToXContentObject, Writeable {
}
SpecialEvent other = (SpecialEvent) obj;
return id.equals(other.id) && description.equals(other.description) && startTime.equals(other.startTime)
&& endTime.equals(other.endTime) && jobIds.equals(other.jobIds);
return description.equals(other.description) && startTime.isEqual(other.startTime)
&& endTime.isEqual(other.endTime) && calendarId.equals(other.calendarId);
}
@Override
public int hashCode() {
return Objects.hash(id, description, startTime, endTime, jobIds);
return Objects.hash(description, startTime, endTime, calendarId);
}
public static class Builder {
private String description;
private ZonedDateTime startTime;
private ZonedDateTime endTime;
private String calendarId;
public Builder description(String description) {
this.description = description;
return this;
}
public Builder startTime(ZonedDateTime startTime) {
this.startTime = startTime;
return this;
}
public Builder endTime(ZonedDateTime endTime) {
this.endTime = endTime;
return this;
}
public Builder calendarId(String calendarId) {
this.calendarId = calendarId;
return this;
}
public String getCalendarId() {
return calendarId;
}
public SpecialEvent build() {
if (description == null) {
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.FIELD_CANNOT_BE_NULL, DESCRIPTION.getPreferredName()));
}
if (startTime == null) {
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.FIELD_CANNOT_BE_NULL, START_TIME.getPreferredName()));
}
if (endTime == null) {
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.FIELD_CANNOT_BE_NULL, END_TIME.getPreferredName()));
}
if (calendarId == null) {
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.FIELD_CANNOT_BE_NULL, Calendar.ID.getPreferredName()));
}
if (startTime.isBefore(endTime) == false) {
throw ExceptionsHelper.badRequestException("Special event start time [" + startTime +
"] must come before end time [" + endTime + "]");
}
return new SpecialEvent(description, startTime, endTime, calendarId);
}
}
}

View File

@ -299,16 +299,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return createTime;
}
/**
* The Job creation time. This name is preferred when serialising to the
* data store.
*
* @return The date the job was created
*/
public Date getAtTimestamp() {
return createTime;
}
/**
* The time the job was finished or <code>null</code> if not finished.
*

View File

@ -172,6 +172,8 @@ public final class Messages {
public static final String REST_NO_SUCH_MODEL_SNAPSHOT = "No model snapshot with id [{0}] exists for job [{1}]";
public static final String REST_START_AFTER_END = "Invalid time range: end time ''{0}'' is earlier than start time ''{1}''.";
public static final String FIELD_CANNOT_BE_NULL = "Field [{0}] cannot be null";
private Messages() {
}

View File

@ -360,11 +360,31 @@ public class JobProvider {
}
public void getAutodetectParams(Job job, Consumer<AutodetectParams> consumer, Consumer<Exception> errorHandler) {
AutodetectParams.Builder paramsBuilder = new AutodetectParams.Builder(job.getId());
String jobId = job.getId();
ActionListener<AutodetectParams.Builder> getSpecialEventsListener = ActionListener.wrap(
paramsBuilder -> {
SpecialEventsQueryBuilder specialEventsQuery = new SpecialEventsQueryBuilder();
Date lastestRecordTime = paramsBuilder.getDataCounts().getLatestRecordTimeStamp();
if (lastestRecordTime != null) {
specialEventsQuery.after(Long.toString(lastestRecordTime.getTime()));
}
specialEventsForJob(jobId, specialEventsQuery, ActionListener.wrap(
events -> {
paramsBuilder.setSpecialEvents(events.results());
consumer.accept(paramsBuilder.build());
},
errorHandler
));
},
errorHandler
);
AutodetectParams.Builder paramsBuilder = new AutodetectParams.Builder(job.getId());
String resultsIndex = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
String stateIndex = AnomalyDetectorsIndex.jobStateIndexName();
MultiSearchRequestBuilder msearch = client.prepareMultiSearch()
.add(createLatestDataCountsSearch(resultsIndex, jobId))
.add(createLatestModelSizeStatsSearch(resultsIndex))
@ -377,44 +397,43 @@ public class JobProvider {
msearch.add(createDocIdSearch(MlMetaIndex.INDEX_NAME, MlFilter.documentId(filterId)));
}
msearch.add(createSpecialEventSearch(jobId));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, msearch.request(),
ActionListener.<MultiSearchResponse>wrap(
response -> {
for (int i = 0; i < response.getResponses().length; i++) {
MultiSearchResponse.Item itemResponse = response.getResponses()[i];
if (itemResponse.isFailure()) {
errorHandler.accept(itemResponse.getFailure());
} else {
SearchResponse searchResponse = itemResponse.getResponse();
ShardSearchFailure[] shardFailures = searchResponse.getShardFailures();
int unavailableShards = searchResponse.getTotalShards() - searchResponse.getSuccessfulShards();
if (shardFailures != null && shardFailures.length > 0) {
LOGGER.error("[{}] Search request returned shard failures: {}", jobId,
Arrays.toString(shardFailures));
errorHandler.accept(new ElasticsearchException(
ExceptionsHelper.shardFailuresToErrorMsg(jobId, shardFailures)));
} else if (unavailableShards > 0) {
errorHandler.accept(new ElasticsearchException("[" + jobId
+ "] Search request encountered [" + unavailableShards + "] unavailable shards"));
} else {
SearchHits hits = searchResponse.getHits();
long hitsCount = hits.getHits().length;
if (hitsCount == 0) {
SearchRequest searchRequest = msearch.request().requests().get(i);
LOGGER.debug("Found 0 hits for [{}]", new Object[]{searchRequest.indices()});
ActionListener.<MultiSearchResponse>wrap(
response -> {
for (int i = 0; i < response.getResponses().length; i++) {
MultiSearchResponse.Item itemResponse = response.getResponses()[i];
if (itemResponse.isFailure()) {
errorHandler.accept(itemResponse.getFailure());
} else {
for (SearchHit hit : hits) {
parseAutodetectParamSearchHit(jobId, paramsBuilder, hit, errorHandler);
SearchResponse searchResponse = itemResponse.getResponse();
ShardSearchFailure[] shardFailures = searchResponse.getShardFailures();
int unavailableShards = searchResponse.getTotalShards() - searchResponse.getSuccessfulShards();
if (shardFailures != null && shardFailures.length > 0) {
LOGGER.error("[{}] Search request returned shard failures: {}", jobId,
Arrays.toString(shardFailures));
errorHandler.accept(new ElasticsearchException(
ExceptionsHelper.shardFailuresToErrorMsg(jobId, shardFailures)));
} else if (unavailableShards > 0) {
errorHandler.accept(new ElasticsearchException("[" + jobId
+ "] Search request encountered [" + unavailableShards + "] unavailable shards"));
} else {
SearchHits hits = searchResponse.getHits();
long hitsCount = hits.getHits().length;
if (hitsCount == 0) {
SearchRequest searchRequest = msearch.request().requests().get(i);
LOGGER.debug("Found 0 hits for [{}]", new Object[]{searchRequest.indices()});
} else {
for (SearchHit hit : hits) {
parseAutodetectParamSearchHit(jobId, paramsBuilder, hit, errorHandler);
}
}
}
}
}
}
}
consumer.accept(paramsBuilder.build());
},
errorHandler
getSpecialEventsListener.onResponse(paramsBuilder);
},
errorHandler
), client::multiSearch);
}
@ -425,17 +444,6 @@ public class JobProvider {
.setRouting(id);
}
private SearchRequestBuilder createSpecialEventSearch(String jobId) {
QueryBuilder qb = new BoolQueryBuilder()
.filter(new TermsQueryBuilder(SpecialEvent.TYPE.getPreferredName(), SpecialEvent.SPECIAL_EVENT_TYPE))
.filter(new TermsQueryBuilder(SpecialEvent.JOB_IDS.getPreferredName(), jobId));
return client.prepareSearch(MlMetaIndex.INDEX_NAME)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(qb);
}
private void parseAutodetectParamSearchHit(String jobId, AutodetectParams.Builder paramsBuilder, SearchHit hit,
Consumer<Exception> errorHandler) {
String hitId = hit.getId();
@ -451,10 +459,8 @@ public class JobProvider {
paramsBuilder.setQuantiles(parseSearchHit(hit, Quantiles.PARSER, errorHandler));
} else if (hitId.startsWith(MlFilter.DOCUMENT_ID_PREFIX)) {
paramsBuilder.addFilter(parseSearchHit(hit, MlFilter.PARSER, errorHandler).build());
} else if (hitId.startsWith(SpecialEvent.DOCUMENT_ID_PREFIX)) {
paramsBuilder.addSpecialEvent(parseSearchHit(hit, SpecialEvent.PARSER, errorHandler));
} else {
errorHandler.accept(new IllegalStateException("Unexpected type [" + hit.getType() + "]"));
errorHandler.accept(new IllegalStateException("Unexpected Id [" + hitId + "]"));
}
}
@ -940,6 +946,7 @@ public class JobProvider {
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(searchFromTimeMs))
.filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE)))
.addAggregation(AggregationBuilders.extendedStats("es").field(ModelSizeStats.MODEL_BYTES_FIELD.getPreferredName()));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, search.request(),
ActionListener.<SearchResponse>wrap(
response -> {
@ -994,21 +1001,46 @@ public class JobProvider {
});
}
public void specialEvents(String jobId, Consumer<List<SpecialEvent>> handler, Consumer<Exception> errorHandler) {
SearchRequestBuilder request = createSpecialEventSearch(jobId);
public void specialEventsForJob(String jobId, SpecialEventsQueryBuilder queryBuilder, ActionListener<QueryPage<SpecialEvent>> handler) {
// Find all the calendars used by the job then the events for those calendars
ActionListener<QueryPage<Calendar>> calendarsListener = ActionListener.wrap(
calendars -> {
if (calendars.results().isEmpty()) {
handler.onResponse(new QueryPage<>(Collections.emptyList(), 0, SpecialEvent.RESULTS_FIELD));
return;
}
List<String> calendarIds = calendars.results().stream().map(Calendar::getId).collect(Collectors.toList());
queryBuilder.calendarIds(calendarIds);
specialEvents(queryBuilder, handler);
},
handler::onFailure
);
CalendarQueryBuilder query = new CalendarQueryBuilder().jobId(jobId);
calendars(query, calendarsListener);
}
public void specialEvents(SpecialEventsQueryBuilder query, ActionListener<QueryPage<SpecialEvent>> handler) {
SearchRequestBuilder request = client.prepareSearch(MlMetaIndex.INDEX_NAME)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(query.build());
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request.request(),
ActionListener.<SearchResponse>wrap(
response -> {
List<SpecialEvent> specialEvents = new ArrayList<>();
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
specialEvents.add(parseSearchHit(hit, SpecialEvent.PARSER, errorHandler));
}
response -> {
List<SpecialEvent> specialEvents = new ArrayList<>();
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
specialEvents.add(parseSearchHit(hit, SpecialEvent.PARSER, handler::onFailure).build());
}
handler.accept(specialEvents);
},
errorHandler)
, client::search);
handler.onResponse(new QueryPage<>(specialEvents, response.getHits().getTotalHits(),
SpecialEvent.RESULTS_FIELD));
},
handler::onFailure)
, client::search);
}
public void updateCalendar(String calendarId, Set<String> jobIdsToAdd, Set<String> jobIdsToRemove,

View File

@ -0,0 +1,97 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import java.util.ArrayList;
import java.util.List;
/**
* Query builder for {@link SpecialEvent}s
* If <code>calendarIds</code> are not set then all calendars will match.
*/
public class SpecialEventsQueryBuilder {
public static final int DEFAULT_SIZE = 1000;
private int from = 0;
private int size = DEFAULT_SIZE;
private List<String> calendarIds;
private String after;
private String before;
public SpecialEventsQueryBuilder calendarIds(List<String> calendarIds) {
this.calendarIds = calendarIds;
return this;
}
public SpecialEventsQueryBuilder after(String after) {
this.after = after;
return this;
}
public SpecialEventsQueryBuilder before(String before) {
this.before = before;
return this;
}
public SpecialEventsQueryBuilder from(int from) {
this.from = from;
return this;
}
public SpecialEventsQueryBuilder size(int size) {
this.size = size;
return this;
}
public SearchSourceBuilder build() {
List<QueryBuilder> queries = new ArrayList<>();
if (after != null) {
RangeQueryBuilder afterQuery = QueryBuilders.rangeQuery(SpecialEvent.END_TIME.getPreferredName());
afterQuery.gt(after);
queries.add(afterQuery);
}
if (before != null) {
RangeQueryBuilder beforeQuery = QueryBuilders.rangeQuery(SpecialEvent.START_TIME.getPreferredName());
beforeQuery.lt(before);
queries.add(beforeQuery);
}
if (calendarIds != null && calendarIds.isEmpty() == false) {
queries.add(new TermsQueryBuilder(Calendar.ID.getPreferredName(), calendarIds));
}
QueryBuilder typeQuery = new TermsQueryBuilder(SpecialEvent.TYPE.getPreferredName(), SpecialEvent.SPECIAL_EVENT_TYPE);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.sort(SpecialEvent.START_TIME.getPreferredName());
searchSourceBuilder.from(from);
searchSourceBuilder.size(size);
if (queries.isEmpty()) {
searchSourceBuilder.query(typeQuery);
} else {
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
boolQueryBuilder.filter(typeQuery);
for (QueryBuilder query : queries) {
boolQueryBuilder.filter(query);
}
searchSourceBuilder.query(boolQueryBuilder);
}
return searchSourceBuilder;
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.OpenJobAction.JobTask;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
@ -35,6 +36,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.SpecialEventsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
@ -261,24 +263,24 @@ public class AutodetectProcessManager extends AbstractComponent {
return;
}
Consumer<List<SpecialEvent>> eventConsumer = specialEvents -> {
communicator.writeUpdateProcessMessage(updateParams, specialEvents, (aVoid, e) -> {
if (e == null) {
handler.accept(null);
} else {
handler.accept(e);
}
});
};
ActionListener<QueryPage<SpecialEvent>> eventsListener = ActionListener.wrap(
specialEvents -> {
communicator.writeUpdateProcessMessage(updateParams, specialEvents.results(), (aVoid, e) -> {
if (e == null) {
handler.accept(null);
} else {
handler.accept(e);
}
});
},
handler::accept);
if (updateParams.isUpdateSpecialEvents()) {
jobProvider.specialEvents(jobTask.getJobId(), eventConsumer, handler::accept);
SpecialEventsQueryBuilder query = new SpecialEventsQueryBuilder().after(Long.toString(new Date().getTime()));
jobProvider.specialEventsForJob(jobTask.getJobId(), query, eventsListener);
} else {
eventConsumer.accept(Collections.emptyList());
eventsListener.onResponse(new QueryPage<SpecialEvent>(Collections.emptyList(), 0, SpecialEvent.RESULTS_FIELD));
}
}
public void openJob(JobTask jobTask, Consumer<Exception> handler) {

View File

@ -117,6 +117,10 @@ public class AutodetectParams {
return this;
}
public DataCounts getDataCounts() {
return dataCounts;
}
public Builder setModelSizeStats(ModelSizeStats modelSizeStats) {
this.modelSizeStats = modelSizeStats;
return this;

View File

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest.calendar;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.GetCalendarEventsAction;
import org.elasticsearch.xpack.ml.action.util.PageParams;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import java.io.IOException;
public class RestGetCalendarEventsAction extends BaseRestHandler {
public RestGetCalendarEventsAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.GET,
MachineLearning.BASE_PATH + "calendars/{" + Calendar.ID.getPreferredName() + "}/events", this);
}
@Override
public String getName() {
return "xpack_ml_get_calendar_events_action";
}
@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String calendarId = restRequest.param(Calendar.ID.getPreferredName());
GetCalendarEventsAction.Request request;
if (restRequest.hasContentOrSourceParam()) {
try (XContentParser parser = restRequest.contentOrSourceParamParser()) {
request = GetCalendarEventsAction.Request.parseRequest(calendarId, parser);
}
} else {
request = new GetCalendarEventsAction.Request(calendarId);
request.setAfter(restRequest.param(GetCalendarEventsAction.Request.AFTER.getPreferredName(), null));
request.setBefore(restRequest.param(GetCalendarEventsAction.Request.BEFORE.getPreferredName(), null));
if (restRequest.hasParam(PageParams.FROM.getPreferredName()) || restRequest.hasParam(PageParams.SIZE.getPreferredName())) {
request.setPageParams(new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM),
restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE)));
}
}
return channel -> client.execute(GetCalendarEventsAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.rest.calendar;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
@ -23,9 +24,14 @@ public class RestGetCalendarsAction extends BaseRestHandler {
public RestGetCalendarsAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.GET, MachineLearning.BASE_PATH + "calendars/{" + Calendar.ID.getPreferredName() + "}",
this);
controller.registerHandler(RestRequest.Method.GET, MachineLearning.BASE_PATH + "calendars/{" +
Calendar.ID.getPreferredName() + "}", this);
controller.registerHandler(RestRequest.Method.GET, MachineLearning.BASE_PATH + "calendars/", this);
// endpoints that support body parameters must also accept POST
controller.registerHandler(RestRequest.Method.POST, MachineLearning.BASE_PATH + "calendars/{" +
Calendar.ID.getPreferredName() + "}", this);
controller.registerHandler(RestRequest.Method.POST, MachineLearning.BASE_PATH + "calendars/", this);
}
@Override
@ -35,17 +41,25 @@ public class RestGetCalendarsAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
GetCalendarsAction.Request getRequest = new GetCalendarsAction.Request();
String calendarId = restRequest.param(Calendar.ID.getPreferredName());
if (!Strings.isNullOrEmpty(calendarId)) {
getRequest.setCalendarId(calendarId);
GetCalendarsAction.Request request;
if (restRequest.hasContentOrSourceParam()) {
try (XContentParser parser = restRequest.contentOrSourceParamParser()) {
request = GetCalendarsAction.Request.parseRequest(calendarId, parser);
}
} else {
request = new GetCalendarsAction.Request();
if (!Strings.isNullOrEmpty(calendarId)) {
request.setCalendarId(calendarId);
}
if (restRequest.hasParam(PageParams.FROM.getPreferredName()) || restRequest.hasParam(PageParams.SIZE.getPreferredName())) {
request.setPageParams(new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM),
restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE)));
}
}
if (restRequest.hasParam(PageParams.FROM.getPreferredName()) || restRequest.hasParam(PageParams.SIZE.getPreferredName())) {
getRequest.setPageParams(new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM),
restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE)));
}
return channel -> client.execute(GetCalendarsAction.INSTANCE, getRequest, new RestStatusToXContentListener<>(channel));
return channel -> client.execute(GetCalendarsAction.INSTANCE, request, new RestStatusToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest.calendar;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.PostCalendarEventsAction;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import java.io.IOException;
public class RestPostCalendarEventAction extends BaseRestHandler {
public RestPostCalendarEventAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.POST,
MachineLearning.BASE_PATH + "calendars/{" + Calendar.ID.getPreferredName() + "}/events", this);
}
@Override
public String getName() {
return "xpack_ml_post_calendar_event_action";
}
@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String calendarId = restRequest.param(Calendar.ID.getPreferredName());
PostCalendarEventsAction.Request request =
PostCalendarEventsAction.Request.parseRequest(calendarId, restRequest.requiredContent(), restRequest.getXContentType());
return channel -> client.execute(PostCalendarEventsAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.ml.action.util.PageParams;
public class GetCalendarEventsActionRequestTests extends AbstractStreamableXContentTestCase<GetCalendarEventsAction.Request> {
@Override
protected GetCalendarEventsAction.Request createTestInstance() {
String id = randomAlphaOfLengthBetween(1, 20);
GetCalendarEventsAction.Request request = new GetCalendarEventsAction.Request(id);
if (randomBoolean()) {
request.setAfter(randomAlphaOfLengthBetween(1, 20));
}
if (randomBoolean()) {
request.setBefore(randomAlphaOfLengthBetween(1, 20));
}
if (randomBoolean()) {
request.setPageParams(new PageParams(randomIntBetween(0, 10), randomIntBetween(1, 10)));
}
return request;
}
@Override
protected GetCalendarEventsAction.Request createBlankInstance() {
return new GetCalendarEventsAction.Request();
}
@Override
protected GetCalendarEventsAction.Request doParseInstance(XContentParser parser) {
return GetCalendarEventsAction.Request.parseRequest(null, parser);
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
}

View File

@ -5,15 +5,21 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
public class GetCalendarsActionRequestTests extends AbstractStreamableTestCase<GetCalendarsAction.Request> {
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.ml.action.util.PageParams;
public class GetCalendarsActionRequestTests extends AbstractStreamableXContentTestCase<GetCalendarsAction.Request> {
@Override
protected GetCalendarsAction.Request createTestInstance() {
GetCalendarsAction.Request request = new GetCalendarsAction.Request();
request.setCalendarId(randomAlphaOfLengthBetween(1, 20));
if (randomBoolean()) {
request.setCalendarId(randomAlphaOfLengthBetween(1, 20));
}
if (randomBoolean()) {
request.setPageParams(PageParams.defaultParams());
}
return request;
}
@ -22,4 +28,13 @@ public class GetCalendarsActionRequestTests extends AbstractStreamableTestCase<G
return new GetCalendarsAction.Request();
}
@Override
protected GetCalendarsAction.Request doParseInstance(XContentParser parser) {
return GetCalendarsAction.Request.parseRequest(null, parser);
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.calendars.SpecialEventTests;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class PostCalendarEventActionRequestTests extends AbstractStreamableTestCase<PostCalendarEventsAction.Request> {
@Override
protected PostCalendarEventsAction.Request createTestInstance() {
String id = randomAlphaOfLengthBetween(1, 20);
return createTestInstance(id);
}
private PostCalendarEventsAction.Request createTestInstance(String calendarId) {
int numEvents = randomIntBetween(1, 10);
List<SpecialEvent> events = new ArrayList<>();
for (int i=0; i<numEvents; i++) {
events.add(SpecialEventTests.createSpecialEvent(calendarId));
}
PostCalendarEventsAction.Request request = new PostCalendarEventsAction.Request(calendarId, events);
return request;
}
@Override
protected PostCalendarEventsAction.Request createBlankInstance() {
return new PostCalendarEventsAction.Request();
}
public void testParseRequest() throws IOException {
PostCalendarEventsAction.Request sourceRequest = createTestInstance();
StringBuilder requestString = new StringBuilder();
for (SpecialEvent event: sourceRequest.getSpecialEvents()) {
requestString.append(Strings.toString(event)).append("\r\n");
}
BytesArray data = new BytesArray(requestString.toString().getBytes(StandardCharsets.UTF_8), 0, requestString.length());
PostCalendarEventsAction.Request parsedRequest = PostCalendarEventsAction.Request.parseRequest(
sourceRequest.getCalendarId(), data, XContentType.JSON);
assertEquals(sourceRequest, parsedRequest);
}
public void testParseRequest_throwsIfCalendarIdsAreDifferent() throws IOException {
PostCalendarEventsAction.Request sourceRequest = createTestInstance("foo");
PostCalendarEventsAction.Request request = new PostCalendarEventsAction.Request("bar", sourceRequest.getSpecialEvents());
StringBuilder requestString = new StringBuilder();
for (SpecialEvent event: sourceRequest.getSpecialEvents()) {
requestString.append(Strings.toString(event)).append("\r\n");
}
BytesArray data = new BytesArray(requestString.toString().getBytes(StandardCharsets.UTF_8), 0, requestString.length());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> PostCalendarEventsAction.Request.parseRequest(request.getCalendarId(), data, XContentType.JSON));
assertEquals("Inconsistent calendar_id; 'foo' specified in the body differs from 'bar' specified as a URL argument",
e.getMessage());
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.calendars;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
@ -21,28 +22,22 @@ import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import static org.hamcrest.Matchers.containsString;
public class SpecialEventTests extends AbstractSerializingTestCase<SpecialEvent> {
public static SpecialEvent createSpecialEvent() {
int size = randomInt(10);
List<String> jobIds = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
jobIds.add(randomAlphaOfLengthBetween(1, 20));
}
return new SpecialEvent(randomAlphaOfLength(10), randomAlphaOfLength(10),
ZonedDateTime.ofInstant(Instant.ofEpochMilli(new DateTime(randomDateTimeZone()).getMillis()), ZoneOffset.UTC),
ZonedDateTime.ofInstant(Instant.ofEpochMilli(new DateTime(randomDateTimeZone()).getMillis()), ZoneOffset.UTC),
jobIds);
public static SpecialEvent createSpecialEvent(String calendarId) {
ZonedDateTime start = ZonedDateTime.ofInstant(Instant.ofEpochMilli(new DateTime(randomDateTimeZone()).getMillis()), ZoneOffset.UTC);
return new SpecialEvent(randomAlphaOfLength(10), start, start.plusSeconds(randomIntBetween(1, 10000)),
calendarId);
}
@Override
protected SpecialEvent createTestInstance() {
return createSpecialEvent();
return createSpecialEvent(randomAlphaOfLengthBetween(1, 20));
}
@Override
@ -52,7 +47,7 @@ public class SpecialEventTests extends AbstractSerializingTestCase<SpecialEvent>
@Override
protected SpecialEvent doParseInstance(XContentParser parser) throws IOException {
return SpecialEvent.PARSER.apply(parser, null);
return SpecialEvent.PARSER.apply(parser, null).build();
}
public void testToDetectionRule() {
@ -80,6 +75,36 @@ public class SpecialEventTests extends AbstractSerializingTestCase<SpecialEvent>
long conditionEndTime = Long.parseLong(conditions.get(1).getCondition().getValue());
assertEquals(0, conditionEndTime % bucketSpanSecs);
assertEquals(bucketSpanSecs * (bucketCount + 1), conditionEndTime);
long eventTime = event.getEndTime().toEpochSecond() - conditionStartTime;
long numbBucketsInEvent = (eventTime + bucketSpanSecs -1) / bucketSpanSecs;
assertEquals(bucketSpanSecs * (bucketCount + numbBucketsInEvent), conditionEndTime);
}
public void testBuild() {
SpecialEvent.Builder builder = new SpecialEvent.Builder();
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, builder::build);
assertEquals("Field [description] cannot be null", e.getMessage());
builder.description("foo");
e = expectThrows(ElasticsearchStatusException.class, builder::build);
assertEquals("Field [start_time] cannot be null", e.getMessage());
ZonedDateTime now = ZonedDateTime.now();
builder.startTime(now);
e = expectThrows(ElasticsearchStatusException.class, builder::build);
assertEquals("Field [end_time] cannot be null", e.getMessage());
builder.endTime(now.plusHours(1));
e = expectThrows(ElasticsearchStatusException.class, builder::build);
assertEquals("Field [calendar_id] cannot be null", e.getMessage());
builder.calendarId("foo");
builder.build();
builder = new SpecialEvent.Builder().description("f").calendarId("c");
builder.startTime(now);
builder.endTime(now.minusHours(2));
e = expectThrows(ElasticsearchStatusException.class, builder::build);
assertThat(e.getMessage(), containsString("must come before end time"));
}
}

View File

@ -42,6 +42,7 @@ import org.elasticsearch.xpack.ml.job.persistence.CalendarQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.SpecialEventsQueryBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCountsTests;
@ -63,12 +64,10 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.isIn;
import static org.hamcrest.Matchers.not;
@ -267,40 +266,73 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
}
public void testSpecialEvents() throws Exception {
Job.Builder jobA = createJob("job_a");
Job.Builder jobB = createJob("job_b");
Job.Builder jobC = createJob("job_c");
String calendarAId = "maintenance_a";
List<Calendar> calendars = new ArrayList<>();
calendars.add(new Calendar(calendarAId, Collections.singletonList("job_a")));
ZonedDateTime now = ZonedDateTime.now();
List<SpecialEvent> events = new ArrayList<>();
events.add(new SpecialEvent("A_and_B_downtime", "downtime", createZonedDateTime(1000L), createZonedDateTime(2000L),
Arrays.asList("job_a", "job_b")));
events.add(new SpecialEvent("A_downtime", "downtime", createZonedDateTime(5000L), createZonedDateTime(10000L),
Collections.singletonList("job_a")));
events.add(buildSpecialEvent("downtime", now.plusDays(1), now.plusDays(2), calendarAId));
events.add(buildSpecialEvent("downtime_AA", now.plusDays(8), now.plusDays(9), calendarAId));
events.add(buildSpecialEvent("downtime_AAA", now.plusDays(15), now.plusDays(16), calendarAId));
String calendarABId = "maintenance_a_and_b";
calendars.add(new Calendar(calendarABId, Arrays.asList("job_a", "job_b")));
events.add(buildSpecialEvent("downtime_AB", now.plusDays(12), now.plusDays(13), calendarABId));
indexCalendars(calendars);
indexSpecialEvents(events);
Job.Builder job = createJob("job_b");
List<SpecialEvent> returnedEvents = getSpecialEvents(job.getId());
assertEquals(1, returnedEvents.size());
assertEquals(events.get(0), returnedEvents.get(0));
job = createJob("job_a");
returnedEvents = getSpecialEvents(job.getId());
assertEquals(2, returnedEvents.size());
SpecialEventsQueryBuilder query = new SpecialEventsQueryBuilder();
List<SpecialEvent> returnedEvents = getSpecialEventsForJob(jobA.getId(), query);
assertEquals(4, returnedEvents.size());
assertEquals(events.get(0), returnedEvents.get(0));
assertEquals(events.get(1), returnedEvents.get(1));
assertEquals(events.get(3), returnedEvents.get(2));
assertEquals(events.get(2), returnedEvents.get(3));
job = createJob("job_c");
returnedEvents = getSpecialEvents(job.getId());
returnedEvents = getSpecialEventsForJob(jobB.getId(), query);
assertEquals(1, returnedEvents.size());
assertEquals(events.get(3), returnedEvents.get(0));
returnedEvents = getSpecialEventsForJob(jobC.getId(), query);
assertEquals(0, returnedEvents.size());
// Test time filters
// Lands halfway through the second event which should be returned
query.after(Long.toString(now.plusDays(8).plusHours(1).toInstant().toEpochMilli()));
// Lands halfway through the 3rd event which should be returned
query.before(Long.toString(now.plusDays(12).plusHours(1).toInstant().toEpochMilli()));
returnedEvents = getSpecialEventsForJob(jobA.getId(), query);
assertEquals(2, returnedEvents.size());
assertEquals(events.get(1), returnedEvents.get(0));
assertEquals(events.get(3), returnedEvents.get(1));
}
private SpecialEvent buildSpecialEvent(String description, ZonedDateTime start, ZonedDateTime end, String calendarId) {
return new SpecialEvent.Builder().description(description).startTime(start).endTime(end).calendarId(calendarId).build();
}
public void testGetAutodetectParams() throws Exception {
String jobId = "test_get_autodetect_params";
Job.Builder job = createJob(jobId, Arrays.asList("fruit", "tea"));
String calendarId = "downtime";
Calendar calendar = new Calendar(calendarId, Collections.singletonList(jobId));
indexCalendars(Collections.singletonList(calendar));
// index the param docs
ZonedDateTime now = ZonedDateTime.now();
List<SpecialEvent> events = new ArrayList<>();
events.add(new SpecialEvent("A_downtime", "downtime", createZonedDateTime(5000L), createZonedDateTime(10000L),
Collections.singletonList(jobId)));
events.add(new SpecialEvent("A_downtime2", "downtime", createZonedDateTime(20000L), createZonedDateTime(21000L),
Collections.singletonList(jobId)));
// events in the past should be filtered out
events.add(buildSpecialEvent("In the past", now.minusDays(7), now.minusDays(6), calendarId));
events.add(buildSpecialEvent("A_downtime", now.plusDays(1), now.plusDays(2), calendarId));
events.add(buildSpecialEvent("A_downtime2", now.plusDays(8), now.plusDays(9), calendarId));
indexSpecialEvents(events);
List<MlFilter> filters = new ArrayList<>();
@ -335,9 +367,10 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
// special events
assertNotNull(params.specialEvents());
assertEquals(2, params.specialEvents().size());
assertEquals(3, params.specialEvents().size());
assertEquals(events.get(0), params.specialEvents().get(0));
assertEquals(events.get(1), params.specialEvents().get(1));
assertEquals(events.get(2), params.specialEvents().get(2));
// filters
assertNotNull(params.filters());
@ -382,24 +415,25 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
return searchResultHolder.get();
}
private List<SpecialEvent> getSpecialEvents(String jobId) throws Exception {
private List<SpecialEvent> getSpecialEventsForJob(String jobId, SpecialEventsQueryBuilder query) throws Exception {
AtomicReference<Exception> errorHolder = new AtomicReference<>();
AtomicReference<List<SpecialEvent>> searchResultHolder = new AtomicReference<>();
AtomicReference<QueryPage<SpecialEvent>> searchResultHolder = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
jobProvider.specialEvents(jobId, params -> {
jobProvider.specialEventsForJob(jobId, query, ActionListener.wrap(
params -> {
searchResultHolder.set(params);
latch.countDown();
}, e -> {
errorHolder.set(e);
latch.countDown();
});
}));
latch.await();
if (errorHolder.get() != null) {
throw errorHolder.get();
}
return searchResultHolder.get();
return searchResultHolder.get().results();
}
private Job.Builder createJob(String jobId) {
@ -445,7 +479,7 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (SpecialEvent event : events) {
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, event.documentId());
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE);
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true"));
indexRequest.source(event.toXContent(builder, params));

View File

@ -91,7 +91,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
Collections.singletonList(new DetectionRule.Builder(conditions).build())));
UpdateParams updateParams = new UpdateParams(null, detectorUpdates, true);
List<SpecialEvent> events = Collections.singletonList(SpecialEventTests.createSpecialEvent());
List<SpecialEvent> events = Collections.singletonList(SpecialEventTests.createSpecialEvent(randomAlphaOfLength(10)));
communicator.writeUpdateProcessMessage(updateParams, events, ((aVoid, e) -> {}));

View File

@ -238,12 +238,14 @@ public class FieldConfigWriterTests extends ESTestCase {
AnalysisConfig.Builder builder = new AnalysisConfig.Builder(Arrays.asList(d));
analysisConfig = builder.build();
specialEvents.add(
new SpecialEvent("1", "The Ashes", ZonedDateTime.ofInstant(Instant.ofEpochMilli(1511395200000L), ZoneOffset.UTC),
ZonedDateTime.ofInstant(Instant.ofEpochMilli(1515369600000L), ZoneOffset.UTC), Collections.emptyList()));
specialEvents.add(
new SpecialEvent("2", "elasticon", ZonedDateTime.ofInstant(Instant.ofEpochMilli(1519603200000L), ZoneOffset.UTC),
ZonedDateTime.ofInstant(Instant.ofEpochMilli(1519862400000L), ZoneOffset.UTC), Collections.emptyList()));
specialEvents.add(new SpecialEvent.Builder().description("The Ashes")
.startTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1511395200000L), ZoneOffset.UTC))
.endTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1515369600000L), ZoneOffset.UTC))
.calendarId("calendar_id").build());
specialEvents.add(new SpecialEvent.Builder().description("elasticon")
.startTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1519603200000L), ZoneOffset.UTC))
.endTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1519862400000L), ZoneOffset.UTC))
.calendarId("calendar_id").build());
writer = mock(OutputStreamWriter.class);
createFieldConfigWriter().write();

View File

@ -0,0 +1,37 @@
{
"xpack.ml.get_calendar_events": {
"methods": [ "GET" ],
"url": {
"path": "/_xpack/ml/calendars/{calendar_id}/events",
"paths": [
"/_xpack/ml/calendars/{calendar_id}/events"
],
"parts": {
"calendar_id": {
"type": "string",
"description": "The ID of the calendar containing the events",
"required": true
}
},
"params": {
"after": {
"type": "string",
"description": "Get events after this time"
},
"before": {
"type": "date",
"description": "Get events before this time"
},
"from": {
"type": "int",
"description": "Skips a number of events"
},
"size": {
"type": "int",
"description": "Specifies a max number of events to get"
}
}
},
"body": null
}
}

View File

@ -1,6 +1,6 @@
{
"xpack.ml.get_calendars": {
"methods": [ "GET" ],
"methods": [ "GET", "POST" ],
"url": {
"path": "/_xpack/ml/calendars/{calendar_id}",
"paths": [

View File

@ -0,0 +1,20 @@
{
"xpack.ml.post_calendar_events": {
"methods": [ "POST" ],
"url": {
"path": "/_xpack/ml/calendars/{calendar_id}/events",
"paths": [ "/_xpack/ml/calendars/{calendar_id}/events" ],
"parts": {
"calendar_id": {
"type": "string",
"required": true,
"description": "The ID of the calendar to modify"
}
}
},
"body": {
"description" : "A list of special events",
"required" : true
}
}
}

View File

@ -177,7 +177,7 @@
- match: { calendars.0.job_ids: [] }
---
"Test update calendar":
"Test update calendar job ids":
- do:
xpack.ml.put_calendar:
@ -234,3 +234,62 @@
xpack.ml.delete_calendar_job:
calendar_id: "Wildlife"
job_id: "missing job"
---
"Test calendar events":
- do:
xpack.ml.put_calendar:
calendar_id: "events"
- do:
xpack.ml.post_calendar_events:
calendar_id: "events"
body: >
{ "description": "event 1", "start_time": "2017-12-01T00:00:00Z", "end_time": "2017-12-02T00:00:00Z", "calendar_id": "events"}
{ "description": "event 2", "start_time": "2017-12-05T00:00:00Z", "end_time": "2017-12-06T00:00:00Z", "calendar_id": "events"}
{ "description": "event 3", "start_time": "2017-12-12T00:00:00Z", "end_time": "2017-12-13T00:00:00Z", "calendar_id": "events"}
{ "description": "event 4", "start_time": "2017-12-12T00:00:00Z", "end_time": "2017-12-15T00:00:00Z", "calendar_id": "events"}
- do:
xpack.ml.get_calendar_events:
calendar_id: "events"
- length: { special_events: 4 }
- match: { special_events.0.description: "event 1" }
- match: { special_events.1.description: "event 2" }
- match: { special_events.2.description: "event 3" }
- match: { special_events.3.description: "event 4" }
- do:
xpack.ml.get_calendar_events:
calendar_id: "events"
from: 1
size: 2
- length: { special_events: 2 }
- match: { special_events.0.description: "event 2" }
- match: { special_events.1.description: "event 3" }
- do:
xpack.ml.get_calendar_events:
calendar_id: "events"
before: "2017-12-12T00:00:00Z"
- length: { special_events: 2 }
- match: { special_events.0.description: "event 1" }
- match: { special_events.1.description: "event 2" }
- do:
xpack.ml.get_calendar_events:
calendar_id: "events"
after: "2017-12-05T03:00:00Z"
- length: { special_events: 3 }
- match: { special_events.0.description: "event 2" }
- match: { special_events.1.description: "event 3" }
- match: { special_events.2.description: "event 4" }
- do:
xpack.ml.get_calendar_events:
calendar_id: "events"
after: "2017-12-02T00:00:00Z"
before: "2017-12-12T00:00:00Z"
- length: { special_events: 1 }
- match: { special_events.0.description: "event 2" }