Merge branch 'master' into feature/sql_2

Original commit: elastic/x-pack-elasticsearch@5b23edc3ef
This commit is contained in:
Nik Everett 2017-12-22 09:49:25 -05:00
commit 5b30bc6c7a
63 changed files with 2363 additions and 389 deletions

View File

@ -0,0 +1,69 @@
[float]
[[ml-forecasting]]
=== Forecasting the Future
After the {xpackml} features create baselines of normal behavior for your data,
you can use that information to extrapolate future behavior.
You can use a forecast to estimate a time series value at a specific future date.
For example, you might want to determine how many users you can expect to visit
your website next Sunday at 0900.
You can also use it to estimate the probability of a time series value occurring
at a future date. For example, you might want to determine how likely it is that
your disk utilization will reach 100% before the end of next week.
Each forecast has a unique ID, which you can use to distinguish between forecasts
that you created at different times. You can create a forecast by using the
{ref}/ml-forecast.html[Forecast Jobs API] or by using {kib}. For example:
[role="screenshot"]
image::images/ml-gs-job-forecast.jpg["Example screenshot from the Machine Learning Single Metric Viewer in Kibana"]
//For a more detailed walk-through of {xpackml} features, see <<ml-getting-started>>.
The yellow line in the chart represents the predicted data values. The
shaded yellow area represents the bounds for the predicted values, which also
gives an indication of the confidence of the predictions.
When you create a forecast, you specify its _duration_, which indicates how far
the forecast extends beyond the last record that was processed. By default, the
duration is 1 day. Typically the farther into the future that you forecast, the
lower the confidence levels become (that is to say, the bounds increase).
Eventually if the confidence levels are too low, the forecast stops.
You can also optionally specify when the forecast expires. By default, it
expires in 14 days and is deleted automatically thereafter. You can specify a
different expiration period by using the `expires_in` parameter in the
{ref}/ml-forecast.html[Forecast Jobs API].
//Add examples of forecast_request_stats and forecast documents?
There are some limitations that affect your ability to create a forecast:
* You can generate only three forecasts concurrently. There is no limit to the
number of forecasts that you retain. Existing forecasts are not overwritten when
you create new forecasts. Rather, they are automatically deleted when they expire.
* If you use an `over_field_name` property in your job (that is to say, it's a
_population job_), you cannot create a forecast.
* If you use any of the following analytical functions in your job, you
cannot create a forecast:
** `lat_long`
** `rare` and `freq_rare`
** `time_of_day` and `time_of_week`
+
--
For more information about any of these functions, see <<ml-functions>>.
--
* Forecasts run concurrently with real-time {ml} analysis. That is to say, {ml}
analysis does not stop while forecasts are generated. Forecasts can have an
impact on {ml} jobs, however, especially in terms of memory usage. For this
reason, forecasts run only if the model memory status is acceptable and the
snapshot models for the forecast do not require more than 20 MB. If these memory
limits are reached, consider splitting the job into multiple smaller jobs and
creating forecasts for these.
* The job must be open when you create a forecast. Otherwise, an error occurs.
* If there is insufficient data to generate any meaningful predictions, an
error occurs. In general, forecasts that are created early in the learning phase
of the data analysis are less accurate.

View File

@ -6,6 +6,8 @@ input data.
The {xpackml} features include the following geographic function: `lat_long`.
NOTE: You cannot create forecasts for jobs that contain geographic functions.
[float]
[[ml-lat-long]]
==== Lat_long

View File

@ -12,6 +12,8 @@ number of times (frequency) rare values occur.
====
* The `rare` and `freq_rare` functions should not be used in conjunction with
`exclude_frequent`.
* You cannot create forecasts for jobs that contain `rare` or `freq_rare`
functions.
* Shorter bucket spans (less than 1 hour, for example) are recommended when
looking for rare events. The functions model whether something happens in a
bucket at least once. With longer bucket spans, it is more likely that

View File

@ -13,6 +13,7 @@ The {xpackml} features include the following time functions:
[NOTE]
====
* NOTE: You cannot create forecasts for jobs that contain time functions.
* The `time_of_day` function is not aware of the difference between days, for instance
work days and weekends. When modeling different days, use the `time_of_week` function.
In general, the `time_of_week` function is more suited to modeling the behavior of people

Binary file not shown.

After

Width:  |  Height:  |  Size: 262 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 152 KiB

After

Width:  |  Height:  |  Size: 176 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 62 KiB

After

Width:  |  Height:  |  Size: 96 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 215 KiB

After

Width:  |  Height:  |  Size: 205 KiB

View File

@ -2,6 +2,7 @@
== Overview
include::analyzing.asciidoc[]
include::forecasting.asciidoc[]
[[ml-concepts]]
=== Basic Machine Learning Terms

View File

@ -58,15 +58,16 @@ PUT _xpack/ml/anomaly_detectors/population
//include only workstations as servers and printers would behave differently
//from the population
If your data is stored in {es}, you can create an advanced job with these same
properties. In particular, you specify the `over_field_name` property when you
add detectors:
If your data is stored in {es}, you can use the population job wizard in {kib}
to create a job with these same properties. For example, the population job
wizard provides the following job settings:
[role="screenshot"]
image::images/ml-population-job.jpg["Create a detector for population analysis]
image::images/ml-population-job.jpg["Job settings in the population job wizard]
After you open the job and start the {dfeed} or supply data to the job, you can
view the results in {kib}. For example:
view the results in {kib}. For example, you can view the results in the
**Anomaly Explorer**:
[role="screenshot"]
image::images/ml-population-results.jpg["Population analysis results in the Anomaly Explorer"]
@ -76,7 +77,7 @@ data points for the selected time period. Population analysis is particularly
useful when you have many entities and the data for specific entitles is sporadic
or sparse.
If you click on a section in the time line or swim lanes, you can see more
If you click on a section in the timeline or swimlanes, you can see more
details about the anomalies:
[role="screenshot"]

View File

@ -6,6 +6,14 @@ You can use the following APIs to perform {ml} activities.
See <<ml-api-definitions, Definitions>> for the resource definitions used by the
machine learning APIs and in advanced job configuration options in Kibana.
[float]
[[ml-api-calendar-endpoint]]
=== Calendars
* <<ml-put-calendar,Create calendar>>, <<ml-delete-calendar,Delete calendar>>
* <<ml-put-calendar-job,Add job to calendar>>, <<ml-delete-calendar-job,Delete job from calendar>>
* <<ml-get-calendar,Get calendars>>
[float]
[[ml-api-datafeed-endpoint]]
=== {dfeeds-cap}
@ -24,6 +32,7 @@ machine learning APIs and in advanced job configuration options in Kibana.
//* <<ml-valid-detector,Validate detectors>>, <<ml-valid-job,Validate job>>
* <<ml-put-job,Create job>>, <<ml-delete-job,Delete job>>
* <<ml-put-calendar-job,Add job to calendar>>, <<ml-delete-calendar-job,Delete job from calendar>>
* <<ml-open-job,Open job>>, <<ml-close-job,Close job>>
* <<ml-get-job,Get job info>>, <<ml-get-job-stats,Get job statistics>>
* <<ml-flush-job,Flush job>>
@ -51,21 +60,26 @@ machine learning APIs and in advanced job configuration options in Kibana.
* <<ml-get-influencer,Get influencers>>
* <<ml-get-record,Get records>>
//ADD
include::ml/put-calendar-job.asciidoc[]
//CLOSE
include::ml/close-job.asciidoc[]
//CREATE
include::ml/put-calendar.asciidoc[]
include::ml/put-datafeed.asciidoc[]
include::ml/put-job.asciidoc[]
//DELETE
include::ml/delete-calendar.asciidoc[]
include::ml/delete-datafeed.asciidoc[]
include::ml/delete-job.asciidoc[]
include::ml/delete-calendar-job.asciidoc[]
include::ml/delete-snapshot.asciidoc[]
//FLUSH
include::ml/flush-job.asciidoc[]
//FORECAST
include::ml/forecast.asciidoc[]
//GET
include::ml/get-calendar.asciidoc[]
include::ml/get-bucket.asciidoc[]
include::ml/get-overall-buckets.asciidoc[]
include::ml/get-category.asciidoc[]

View File

@ -0,0 +1,35 @@
[role="xpack"]
[[ml-delete-calendar-job]]
=== Delete Jobs from Calendar API
++++
<titleabbrev>Delete Jobs from Calendar</titleabbrev>
++++
The delete jobs from calendar API enables you to remove jobs from a calendar.
==== Request
`DELETE _xpack/ml/calendars/<calendar_id>/jobs/<job_id>`
//==== Description
==== Path Parameters
`calendar_id`(required)::
(string) Identifier for the calendar.
`job_id` (required)::
(string) Identifier for the job.
//===== Query Parameters
==== Authorization
You must have `manage_ml`, or `manage` cluster privileges to use this API.
For more information, see {xpack-ref}/security-privileges.html[Security Privileges].
//==== Examples

View File

@ -0,0 +1,30 @@
[role="xpack"]
[[ml-delete-calendar]]
=== Delete Calendar API
++++
<titleabbrev>Delete Calendar</titleabbrev>
++++
The delete calendar API enables you to delete a calendar.
==== Request
`DELETE _xpack/ml/calendars/<calendar_id>`
//==== Description
==== Path Parameters
`calendar_id` (required)::
(string) Identifier for the calendar.
//===== Query Parameters
==== Authorization
You must have `manage_ml`, or `manage` cluster privileges to use this API.
For more information, see {xpack-ref}/security-privileges.html[Security Privileges].
//==== Examples

View File

@ -15,17 +15,7 @@ a time series.
==== Description
You can use the API to estimate a time series value at a specific future date.
For example, you might want to determine how many users you can expect to visit
your website next Sunday at 0900.
You can also use it to estimate the probability of a time series value occurring
at a future date. For example, you might want to determine how likely it is that
your disk utilization will reach 100% before the end of next week.
Each time you call the API, it generates a new forecast and returns a unique ID.
Existing forecasts for the same job are not overwritten. You can use the forecast
ID to distinguish between forecasts that you generated at different times.
See {xpack-ref}/ml-overview.html#ml-forecasting[Forecasting the Future].
[NOTE]
===============================
@ -45,9 +35,9 @@ forecast. For more information about this property, see <<ml-job-resource>>.
`duration`::
(time units) A period of time that indicates how far into the future to
forecast. For example, `30d` corresponds to 30 days. The forecast starts at the
last record that was processed. For more information about time units, see
<<time-units>>.
forecast. For example, `30d` corresponds to 30 days. The default value is 1
day. The forecast starts at the last record that was processed. For more
information about time units, see <<time-units>>.
`expires_in`::
(time units) The period of time that forecast results are retained.
@ -84,6 +74,6 @@ When the forecast is created, you receive the following results:
}
----
You can subsequently see the forecast in the *Single Metric Viewer* in {kib}
and in the results that you retrieve by using {ml} APIs such as the
<<ml-get-bucket,get bucket API>> and <<ml-get-record,get records API>>.
You can subsequently see the forecast in the *Single Metric Viewer* in {kib}.
//and in the results that you retrieve by using {ml} APIs such as the
//<<ml-get-bucket,get bucket API>> and <<ml-get-record,get records API>>.

View File

@ -0,0 +1,65 @@
[role="xpack"]
[[ml-get-calendar]]
=== Get Calendars API
++++
<titleabbrev>Get Calendars</titleabbrev>
++++
The get calendars API enables you to retrieve configuration information for
calendars.
==== Request
`GET _xpack/ml/calendars/<calendar_id>` +
`GET _xpack/ml/calendars/<calendar_id>,<calendar_id>` +
`GET _xpack/ml/calendars/` +
`GET _xpack/ml/calendars/_all`
//===== Description
////
You can get information for multiple jobs in a single API request by using a
group name, a comma-separated list of jobs, or a wildcard expression. You can
get information for all jobs by using `_all`, by specifying `*` as the
`<job_id>`, or by omitting the `<job_id>`.
////
==== Path Parameters
`calendar_id`::
(string) Identifier for the calendar. It can be a calendar identifier or a
wildcard expression. If you do not specify one of these options, the API
returns information for all calendars.
==== Request Body
`from`:::
(integer) Skips the specified number of calendars.
`size`:::
(integer) Specifies the maximum number of calendars to obtain.
//==== Results
////
The API returns the following information:
`jobs`::
(array) An array of job resources.
For more information, see <<ml-job-resource,Job Resources>>.
////
==== Authorization
You must have `monitor_ml`, `monitor`, `manage_ml`, or `manage` cluster
privileges to use this API. For more information, see
{xpack-ref}/security-privileges.html[Security Privileges].
//==== Examples

View File

@ -0,0 +1,34 @@
[role="xpack"]
[[ml-put-calendar-job]]
=== Add Jobs to Calendar API
++++
<titleabbrev>Add Jobs to Calendar</titleabbrev>
++++
The add jobs to calendar API enables you to add jobs to a calendar.
==== Request
`PUT _xpack/ml/calendars/<calendar_id>/jobs/<job_id>`
//===== Description
==== Path Parameters
`calendar_id` (required)::
(string) Identifier for the calendar.
`job_id` (required)::
(string) Identifier for the job.
//==== Request Body
==== Authorization
You must have `manage_ml`, or `manage` cluster privileges to use this API.
For more information, see
{xpack-ref}/security-privileges.html[Security Privileges].
//==== Examples

View File

@ -0,0 +1,35 @@
[role="xpack"]
[[ml-put-calendar]]
=== Create Calendar API
++++
<titleabbrev>Create Calendar</titleabbrev>
++++
The create calendar API enables you to instantiate a calendar.
==== Request
`PUT _xpack/ml/calendars/<calendar_id>`
//===== Description
==== Path Parameters
`calendar_id` (required)::
(string) Identifier for the calendar.
==== Request Body
`description`::
(string) A description of the calendar.
==== Authorization
You must have `manage_ml`, or `manage` cluster privileges to use this API.
For more information, see
{xpack-ref}/security-privileges.html[Security Privileges].
//==== Examples
//See plugin/src/test/resources/rest-api-spec/test/ml/calendar_crud.yml

View File

@ -51,6 +51,9 @@ The following snippet shows a simple `index` action definition:
the index action times out and fails. This setting
overrides the default timeouts.
| `refresh` | no | - | Optional setting of the {ref}/docs-refresh.html[refresh policy]
for the write request
|======
[[anatomy-actions-index-multi-doc-support]]

View File

@ -52,6 +52,8 @@ import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.DeleteCalendarAction;
import org.elasticsearch.xpack.ml.action.GetCalendarEventsAction;
import org.elasticsearch.xpack.ml.action.PostCalendarEventsAction;
import org.elasticsearch.xpack.ml.action.UpdateCalendarJobAction;
import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.ml.action.DeleteExpiredDataAction;
@ -119,7 +121,9 @@ import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestDeleteCalendarAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestDeleteCalendarJobAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestGetCalendarEventsAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestGetCalendarsAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestPostCalendarEventAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestPutCalendarAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestPutCalendarJobAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestDeleteDatafeedAction;
@ -471,7 +475,9 @@ public class MachineLearning implements ActionPlugin {
new RestPutCalendarAction(settings, restController),
new RestDeleteCalendarAction(settings, restController),
new RestDeleteCalendarJobAction(settings, restController),
new RestPutCalendarJobAction(settings, restController)
new RestPutCalendarJobAction(settings, restController),
new RestGetCalendarEventsAction(settings, restController),
new RestPostCalendarEventAction(settings, restController)
);
}
@ -521,7 +527,9 @@ public class MachineLearning implements ActionPlugin {
new ActionHandler<>(GetCalendarsAction.INSTANCE, GetCalendarsAction.TransportAction.class),
new ActionHandler<>(PutCalendarAction.INSTANCE, PutCalendarAction.TransportAction.class),
new ActionHandler<>(DeleteCalendarAction.INSTANCE, DeleteCalendarAction.TransportAction.class),
new ActionHandler<>(UpdateCalendarJobAction.INSTANCE, UpdateCalendarJobAction.TransportAction.class)
new ActionHandler<>(UpdateCalendarJobAction.INSTANCE, UpdateCalendarJobAction.TransportAction.class),
new ActionHandler<>(GetCalendarEventsAction.INSTANCE, GetCalendarEventsAction.TransportAction.class),
new ActionHandler<>(PostCalendarEventsAction.INSTANCE, PostCalendarEventsAction.TransportAction.class)
);
}
@ -568,6 +576,7 @@ public class MachineLearning implements ActionPlugin {
// Our indexes are small and one shard puts the
// least possible burden on Elasticsearch
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting))
.build();
templates.put(Auditor.NOTIFICATIONS_INDEX, notificationMessageTemplate);
@ -582,6 +591,7 @@ public class MachineLearning implements ActionPlugin {
// Our indexes are small and one shard puts the
// least possible burden on Elasticsearch
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting))
.version(Version.CURRENT.id)
.putMapping(MlMetaIndex.TYPE, docMapping.string())
@ -596,6 +606,7 @@ public class MachineLearning implements ActionPlugin {
.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName()))
// TODO review these settings
.settings(Settings.builder()
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting)
// Sacrifice durability for performance: in the event of power
// failure we can lose the last 5 seconds of changes, but it's
@ -613,6 +624,7 @@ public class MachineLearning implements ActionPlugin {
IndexTemplateMetaData jobResultsTemplate = IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobResultsIndexPrefix())
.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*"))
.settings(Settings.builder()
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting)
// Sacrifice durability for performance: in the event of power
// failure we can lose the last 5 seconds of changes, but it's

View File

@ -5,12 +5,14 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
@ -31,6 +33,7 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.ml.job.results.Forecast;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Objects;
@ -247,16 +250,13 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
public static class TransportAction extends TransportJobTaskAction<Request, Response> {
private final JobManager jobManager;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobManager jobManager, AutodetectProcessManager processManager) {
AutodetectProcessManager processManager) {
super(settings, ForecastJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, ForecastJobAction.Request::new, ForecastJobAction.Response::new, ThreadPool.Names.SAME,
processManager);
this.jobManager = jobManager;
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
}
@ -269,20 +269,16 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
@Override
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
Job job = JobManager.getJobOrThrowIfUnknown(task.getJobId(), state);
validate(job, request);
ForecastParams.Builder paramsBuilder = ForecastParams.builder();
if (request.getDuration() != null) {
TimeValue duration = request.getDuration();
TimeValue bucketSpan = jobManager.getJobOrThrowIfUnknown(task.getJobId()).getAnalysisConfig().getBucketSpan();
if (duration.compareTo(bucketSpan) < 0) {
throw new IllegalArgumentException(
"[" + DURATION.getPreferredName()
+ "] must be greater or equal to the bucket span: ["
+ duration.getStringRep() + "/" + bucketSpan.getStringRep() + "]");
}
paramsBuilder.duration(request.getDuration());
}
if (request.getExpiresIn() != null) {
paramsBuilder.expiresIn(request.getExpiresIn());
}
@ -296,6 +292,24 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
}
});
}
static void validate(Job job, Request request) {
if (job.getJobVersion() == null || job.getJobVersion().before(Version.V_6_1_0)) {
throw ExceptionsHelper.badRequestException(
"Cannot run forecast because jobs created prior to version 6.1 are not supported");
}
if (request.getDuration() != null) {
TimeValue duration = request.getDuration();
TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan();
if (duration.compareTo(bucketSpan) < 0) {
throw ExceptionsHelper.badRequestException(
"[" + DURATION.getPreferredName() + "] must be greater or equal to the bucket span: [" + duration.getStringRep()
+ "/" + bucketSpan.getStringRep() + "]");
}
}
}
}
}

View File

@ -0,0 +1,319 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.action.util.PageParams;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.SpecialEventsQueryBuilder;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
public class GetCalendarEventsAction extends Action<GetCalendarEventsAction.Request, GetCalendarEventsAction.Response,
GetCalendarEventsAction.RequestBuilder> {
public static final GetCalendarEventsAction INSTANCE = new GetCalendarEventsAction();
public static final String NAME = "cluster:monitor/xpack/ml/calendars/events/get";
private GetCalendarEventsAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends ActionRequest implements ToXContentObject {
public static final ParseField AFTER = new ParseField("after");
public static final ParseField BEFORE = new ParseField("before");
private static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
static {
PARSER.declareString(Request::setCalendarId, Calendar.ID);
PARSER.declareString(Request::setAfter, AFTER);
PARSER.declareString(Request::setBefore, BEFORE);
PARSER.declareString(Request::setJobId, Job.ID);
PARSER.declareObject(Request::setPageParams, PageParams.PARSER, PageParams.PAGE);
}
public static Request parseRequest(String calendarId, XContentParser parser) {
Request request = PARSER.apply(parser, null);
if (calendarId != null) {
request.setCalendarId(calendarId);
}
return request;
}
private String calendarId;
private String after;
private String before;
private String jobId;
private PageParams pageParams = PageParams.defaultParams();
Request() {
}
public Request(String calendarId) {
setCalendarId(calendarId);
}
public String getCalendarId() {
return calendarId;
}
private void setCalendarId(String calendarId) {
this.calendarId = ExceptionsHelper.requireNonNull(calendarId, Calendar.ID.getPreferredName());
}
public String getAfter() {
return after;
}
public void setAfter(String after) {
this.after = after;
}
public String getBefore() {
return before;
}
public void setBefore(String before) {
this.before = before;
}
public String getJobId() {
return jobId;
}
public void setJobId(String jobId) {
this.jobId = jobId;
}
public PageParams getPageParams() {
return pageParams;
}
public void setPageParams(PageParams pageParams) {
this.pageParams = Objects.requireNonNull(pageParams);
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException e = null;
boolean calendarIdIsAll = GetCalendarsAction.Request.ALL.equals(calendarId);
if (jobId != null && calendarIdIsAll == false) {
e = ValidateActions.addValidationError("If " + Job.ID.getPreferredName() + " is used " +
Calendar.ID.getPreferredName() + " must be '" + GetCalendarsAction.Request.ALL + "'", e);
}
return e;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
calendarId = in.readString();
after = in.readOptionalString();
before = in.readOptionalString();
jobId = in.readOptionalString();
pageParams = new PageParams(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(calendarId);
out.writeOptionalString(after);
out.writeOptionalString(before);
out.writeOptionalString(jobId);
pageParams.writeTo(out);
}
@Override
public int hashCode() {
return Objects.hash(calendarId, after, before, pageParams, jobId);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(calendarId, other.calendarId) && Objects.equals(after, other.after)
&& Objects.equals(before, other.before) && Objects.equals(pageParams, other.pageParams)
&& Objects.equals(jobId, other.jobId);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Calendar.ID.getPreferredName(), calendarId);
if (after != null) {
builder.field(AFTER.getPreferredName(), after);
}
if (before != null) {
builder.field(BEFORE.getPreferredName(), before);
}
if (jobId != null) {
builder.field(Job.ID.getPreferredName(), jobId);
}
builder.field(PageParams.PAGE.getPreferredName(), pageParams);
builder.endObject();
return builder;
}
}
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client) {
super(client, INSTANCE, new Request());
}
}
public static class Response extends ActionResponse implements ToXContentObject {
private QueryPage<SpecialEvent> specialEvents;
Response() {
}
public Response(QueryPage<SpecialEvent> specialEvents) {
this.specialEvents = specialEvents;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
specialEvents = new QueryPage<>(in, SpecialEvent::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
specialEvents.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return specialEvents.toXContent(builder, params);
}
@Override
public int hashCode() {
return Objects.hash(specialEvents);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Response other = (Response) obj;
return Objects.equals(specialEvents, other.specialEvents);
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final JobProvider jobProvider;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
JobProvider jobProvider) {
super(settings, NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
this.jobProvider = jobProvider;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
ActionListener<Boolean> calendarExistsListener = ActionListener.wrap(
r -> {
SpecialEventsQueryBuilder query = new SpecialEventsQueryBuilder()
.after(request.getAfter())
.before(request.getBefore())
.from(request.getPageParams().getFrom())
.size(request.getPageParams().getSize());
if (GetCalendarsAction.Request.ALL.equals(request.getCalendarId()) == false) {
query.calendarIds(Collections.singletonList(request.getCalendarId()));
}
ActionListener<QueryPage<SpecialEvent>> eventsListener = ActionListener.wrap(
events -> {
listener.onResponse(new Response(events));
},
listener::onFailure
);
if (request.getJobId() != null) {
jobProvider.specialEventsForJob(request.getJobId(), query, eventsListener);
} else {
jobProvider.specialEvents(query, eventsListener);
}
},
listener::onFailure);
checkCalendarExists(request.getCalendarId(), calendarExistsListener);
}
private void checkCalendarExists(String calendarId, ActionListener<Boolean> listener) {
if (GetCalendarsAction.Request.ALL.equals(calendarId)) {
listener.onResponse(true);
return;
}
jobProvider.calendar(calendarId, ActionListener.wrap(
c -> listener.onResponse(true),
listener::onFailure
));
}
}
}

View File

@ -11,34 +11,23 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetaIndex;
import org.elasticsearch.xpack.ml.action.util.PageParams;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.calendars.Calendar;
@ -46,14 +35,10 @@ import org.elasticsearch.xpack.ml.job.persistence.CalendarQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.xpack.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.ClientHelper.executeAsyncWithOrigin;
public class GetCalendarsAction extends Action<GetCalendarsAction.Request, GetCalendarsAction.Response, GetCalendarsAction.RequestBuilder> {
@ -74,7 +59,24 @@ public class GetCalendarsAction extends Action<GetCalendarsAction.Request, GetCa
return new Response();
}
public static class Request extends ActionRequest {
public static class Request extends ActionRequest implements ToXContentObject {
public static final String ALL = "_all";
private static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
static {
PARSER.declareString(Request::setCalendarId, Calendar.ID);
PARSER.declareObject(Request::setPageParams, PageParams.PARSER, PageParams.PAGE);
}
public static Request parseRequest(String calendarId, XContentParser parser) {
Request request = PARSER.apply(parser, null);
if (calendarId != null) {
request.setCalendarId(calendarId);
}
return request;
}
private String calendarId;
private PageParams pageParams;
@ -114,18 +116,20 @@ public class GetCalendarsAction extends Action<GetCalendarsAction.Request, GetCa
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
calendarId = in.readString();
calendarId = in.readOptionalString();
pageParams = in.readOptionalWriteable(PageParams::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(calendarId);
out.writeOptionalString(calendarId);
out.writeOptionalWriteable(pageParams);
}
@Override
public int hashCode() {
return Objects.hash(calendarId);
return Objects.hash(calendarId, pageParams);
}
@Override
@ -137,7 +141,20 @@ public class GetCalendarsAction extends Action<GetCalendarsAction.Request, GetCa
return false;
}
Request other = (Request) obj;
return Objects.equals(calendarId, other.calendarId);
return Objects.equals(calendarId, other.calendarId) && Objects.equals(pageParams, other.pageParams);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (calendarId != null) {
builder.field(Calendar.ID.getPreferredName(), calendarId);
}
if (pageParams != null) {
builder.field(PageParams.PAGE.getPreferredName(), pageParams);
}
builder.endObject();
return builder;
}
}
@ -228,7 +245,7 @@ public class GetCalendarsAction extends Action<GetCalendarsAction.Request, GetCa
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
final String calendarId = request.getCalendarId();
if (request.getCalendarId() != null) {
if (request.getCalendarId() != null && Request.ALL.equals(request.getCalendarId()) == false) {
getCalendar(calendarId, listener);
} else {
PageParams pageParams = request.getPageParams();

View File

@ -0,0 +1,310 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetaIndex;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.ClientHelper.executeAsyncWithOrigin;
public class PostCalendarEventsAction extends Action<PostCalendarEventsAction.Request, PostCalendarEventsAction.Response,
PostCalendarEventsAction.RequestBuilder> {
public static final PostCalendarEventsAction INSTANCE = new PostCalendarEventsAction();
public static final String NAME = "cluster:admin/xpack/ml/calendars/events/post";
public static final ParseField SPECIAL_EVENTS = new ParseField("special_events");
private PostCalendarEventsAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends ActionRequest {
public static Request parseRequest(String calendarId, BytesReference data, XContentType contentType) throws IOException {
List<SpecialEvent.Builder> events = new ArrayList<>();
XContent xContent = contentType.xContent();
int lineNumber = 0;
int from = 0;
int length = data.length();
byte marker = xContent.streamSeparator();
while (true) {
int nextMarker = findNextMarker(marker, from, data, length);
if (nextMarker == -1) {
break;
}
lineNumber++;
try (XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY, data.slice(from, nextMarker - from))) {
try {
SpecialEvent.Builder event = SpecialEvent.PARSER.apply(parser, null);
events.add(event);
} catch (ParsingException pe) {
throw ExceptionsHelper.badRequestException("Failed to parse special event on line [" + lineNumber + "]", pe);
}
from = nextMarker + 1;
}
}
for (SpecialEvent.Builder event: events) {
if (event.getCalendarId() != null && event.getCalendarId().equals(calendarId) == false) {
throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.INCONSISTENT_ID,
Calendar.ID.getPreferredName(), event.getCalendarId(), calendarId));
}
// Set the calendar Id in case it is null
event.calendarId(calendarId);
}
return new Request(calendarId, events.stream().map(SpecialEvent.Builder::build).collect(Collectors.toList()));
}
private static int findNextMarker(byte marker, int from, BytesReference data, int length) {
for (int i = from; i < length; i++) {
if (data.get(i) == marker) {
return i;
}
}
if (from != length) {
throw new IllegalArgumentException("The post calendar events request must be terminated by a newline [\n]");
}
return -1;
}
private String calendarId;
private List<SpecialEvent> specialEvents;
Request() {
}
public Request(String calendarId, List<SpecialEvent> specialEvents) {
this.calendarId = ExceptionsHelper.requireNonNull(calendarId, Calendar.ID.getPreferredName());
this.specialEvents = ExceptionsHelper.requireNonNull(specialEvents, SPECIAL_EVENTS.getPreferredName());
}
public String getCalendarId() {
return calendarId;
}
public List<SpecialEvent> getSpecialEvents() {
return specialEvents;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
calendarId = in.readString();
specialEvents = in.readList(SpecialEvent::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(calendarId);
out.writeList(specialEvents);
}
@Override
public int hashCode() {
return Objects.hash(calendarId, specialEvents);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(calendarId, other.calendarId) && Objects.equals(specialEvents, other.specialEvents);
}
}
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client) {
super(client, INSTANCE, new Request());
}
}
public static class Response extends AcknowledgedResponse implements ToXContentObject {
private List<SpecialEvent> specialEvent;
Response() {
}
public Response(List<SpecialEvent> specialEvents) {
super(true);
this.specialEvent = specialEvents;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
in.readList(SpecialEvent::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
out.writeList(specialEvent);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SPECIAL_EVENTS.getPreferredName(), specialEvent);
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(isAcknowledged(), specialEvent);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Response other = (Response) obj;
return Objects.equals(isAcknowledged(), other.isAcknowledged()) && Objects.equals(specialEvent, other.specialEvent);
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final Client client;
private final JobProvider jobProvider;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client, JobProvider jobProvider) {
super(settings, NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
this.client = client;
this.jobProvider = jobProvider;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
List<SpecialEvent> events = request.getSpecialEvents();
ActionListener<Boolean> calendarExistsListener = ActionListener.wrap(
r -> {
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
for (SpecialEvent event: events) {
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE);
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
indexRequest.source(event.toXContent(builder,
new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true"))));
} catch (IOException e) {
throw new IllegalStateException("Failed to serialise special event", e);
}
bulkRequestBuilder.add(indexRequest);
}
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
executeAsyncWithOrigin(client, ML_ORIGIN, BulkAction.INSTANCE, bulkRequestBuilder.request(),
new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
listener.onResponse(new Response(events));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(
ExceptionsHelper.serverError("Error indexing special event", e));
}
});
},
listener::onFailure);
checkCalendarExists(request.getCalendarId(), calendarExistsListener);
}
private void checkCalendarExists(String calendarId, ActionListener<Boolean> listener) {
jobProvider.calendar(calendarId, ActionListener.wrap(
c -> listener.onResponse(true),
listener::onFailure
));
}
}
}

View File

@ -10,7 +10,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -21,6 +20,8 @@ import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Operator;
import org.elasticsearch.xpack.ml.job.config.RuleAction;
import org.elasticsearch.xpack.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.Intervals;
import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
@ -29,32 +30,27 @@ import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
public class SpecialEvent implements ToXContentObject, Writeable {
public static final ParseField ID = new ParseField("id");
public static final ParseField DESCRIPTION = new ParseField("description");
public static final ParseField START_TIME = new ParseField("start_time");
public static final ParseField END_TIME = new ParseField("end_time");
public static final ParseField TYPE = new ParseField("type");
public static final ParseField JOB_IDS = new ParseField("job_ids");
public static final ParseField RESULTS_FIELD = new ParseField("special_events");
public static final String SPECIAL_EVENT_TYPE = "special_event";
public static final String DOCUMENT_ID_PREFIX = "event_";
public static final ConstructingObjectParser<SpecialEvent, Void> PARSER =
new ConstructingObjectParser<>("special_event", a -> new SpecialEvent((String) a[0], (String) a[1], (ZonedDateTime) a[2],
(ZonedDateTime) a[3], (List<String>) a[4]));
public static final ObjectParser<SpecialEvent.Builder, Void> PARSER =
new ObjectParser<>("special_event", Builder::new);
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID);
PARSER.declareString(ConstructingObjectParser.constructorArg(), DESCRIPTION);
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> {
PARSER.declareString(SpecialEvent.Builder::description, DESCRIPTION);
PARSER.declareField(SpecialEvent.Builder::startTime, p -> {
if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) {
return ZonedDateTime.ofInstant(Instant.ofEpochMilli(p.longValue()), ZoneOffset.UTC);
} else if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
@ -63,7 +59,7 @@ public class SpecialEvent implements ToXContentObject, Writeable {
throw new IllegalArgumentException(
"unexpected token [" + p.currentToken() + "] for [" + START_TIME.getPreferredName() + "]");
}, START_TIME, ObjectParser.ValueType.VALUE);
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> {
PARSER.declareField(SpecialEvent.Builder::endTime, p -> {
if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) {
return ZonedDateTime.ofInstant(Instant.ofEpochMilli(p.longValue()), ZoneOffset.UTC);
} else if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
@ -73,7 +69,7 @@ public class SpecialEvent implements ToXContentObject, Writeable {
"unexpected token [" + p.currentToken() + "] for [" + END_TIME.getPreferredName() + "]");
}, END_TIME, ObjectParser.ValueType.VALUE);
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), JOB_IDS);
PARSER.declareString(SpecialEvent.Builder::calendarId, Calendar.ID);
PARSER.declareString((builder, s) -> {}, TYPE);
}
@ -81,30 +77,23 @@ public class SpecialEvent implements ToXContentObject, Writeable {
return DOCUMENT_ID_PREFIX + eventId;
}
private final String id;
private final String description;
private final ZonedDateTime startTime;
private final ZonedDateTime endTime;
private final Set<String> jobIds;
private final String calendarId;
public SpecialEvent(String id, String description, ZonedDateTime startTime, ZonedDateTime endTime, List<String> jobIds) {
this.id = Objects.requireNonNull(id);
SpecialEvent(String description, ZonedDateTime startTime, ZonedDateTime endTime, String calendarId) {
this.description = Objects.requireNonNull(description);
this.startTime = Objects.requireNonNull(startTime);
this.endTime = Objects.requireNonNull(endTime);
this.jobIds = new HashSet<>(jobIds);
this.calendarId = Objects.requireNonNull(calendarId);
}
public SpecialEvent(StreamInput in) throws IOException {
id = in.readString();
description = in.readString();
startTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(in.readVLong()), ZoneOffset.UTC);
endTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(in.readVLong()), ZoneOffset.UTC);
jobIds = new HashSet<>(Arrays.asList(in.readStringArray()));
}
public String getId() {
return id;
calendarId = in.readString();
}
public String getDescription() {
@ -119,12 +108,8 @@ public class SpecialEvent implements ToXContentObject, Writeable {
return endTime;
}
public Set<String> getJobIds() {
return jobIds;
}
public String documentId() {
return documentId(id);
public String getCalendarId() {
return calendarId;
}
/**
@ -157,21 +142,19 @@ public class SpecialEvent implements ToXContentObject, Writeable {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
out.writeString(description);
out.writeVLong(startTime.toInstant().toEpochMilli());
out.writeVLong(endTime.toInstant().toEpochMilli());
out.writeStringArray(jobIds.toArray(new String [0]));
out.writeString(calendarId);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ID.getPreferredName(), id);
builder.field(DESCRIPTION.getPreferredName(), description);
builder.dateField(START_TIME.getPreferredName(), START_TIME.getPreferredName() + "_string", startTime.toInstant().toEpochMilli());
builder.dateField(END_TIME.getPreferredName(), END_TIME.getPreferredName() + "_string", endTime.toInstant().toEpochMilli());
builder.field(JOB_IDS.getPreferredName(), jobIds);
builder.field(Calendar.ID.getPreferredName(), calendarId);
if (params.paramAsBoolean(MlMetaIndex.INCLUDE_TYPE_KEY, false)) {
builder.field(TYPE.getPreferredName(), SPECIAL_EVENT_TYPE);
}
@ -190,12 +173,83 @@ public class SpecialEvent implements ToXContentObject, Writeable {
}
SpecialEvent other = (SpecialEvent) obj;
return id.equals(other.id) && description.equals(other.description) && startTime.equals(other.startTime)
&& endTime.equals(other.endTime) && jobIds.equals(other.jobIds);
// In Java 8 the tests pass with ZonedDateTime.isEquals() or ZonedDateTime.toInstant.equals()
// but in Java 9 & 10 the same tests fail.
// Both isEquals() and toInstant.equals() work the same; convert to epoch seconds and
// compare seconds and nanos are equal. For some reason the nanos are different in Java 9 & 10.
// It's sufficient to compare just the epoch seconds for the purpose of establishing equality
// which only occurs in testing.
// Note ZonedDataTime.equals() fails because the time zone and date-time must be the same
// which isn't the case in tests where the time zone is randomised.
return description.equals(other.description)
&& Objects.equals(startTime.toInstant().getEpochSecond(), other.startTime.toInstant().getEpochSecond())
&& Objects.equals(endTime.toInstant().getEpochSecond(), other.endTime.toInstant().getEpochSecond())
&& calendarId.equals(other.calendarId);
}
@Override
public int hashCode() {
return Objects.hash(id, description, startTime, endTime, jobIds);
return Objects.hash(description, startTime, endTime, calendarId);
}
public static class Builder {
private String description;
private ZonedDateTime startTime;
private ZonedDateTime endTime;
private String calendarId;
public Builder description(String description) {
this.description = description;
return this;
}
public Builder startTime(ZonedDateTime startTime) {
this.startTime = startTime;
return this;
}
public Builder endTime(ZonedDateTime endTime) {
this.endTime = endTime;
return this;
}
public Builder calendarId(String calendarId) {
this.calendarId = calendarId;
return this;
}
public String getCalendarId() {
return calendarId;
}
public SpecialEvent build() {
if (description == null) {
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.FIELD_CANNOT_BE_NULL, DESCRIPTION.getPreferredName()));
}
if (startTime == null) {
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.FIELD_CANNOT_BE_NULL, START_TIME.getPreferredName()));
}
if (endTime == null) {
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.FIELD_CANNOT_BE_NULL, END_TIME.getPreferredName()));
}
if (calendarId == null) {
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.FIELD_CANNOT_BE_NULL, Calendar.ID.getPreferredName()));
}
if (startTime.isBefore(endTime) == false) {
throw ExceptionsHelper.badRequestException("Special event start time [" + startTime +
"] must come before end time [" + endTime + "]");
}
return new SpecialEvent(description, startTime, endTime, calendarId);
}
}
}

View File

@ -299,16 +299,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return createTime;
}
/**
* The Job creation time. This name is preferred when serialising to the
* data store.
*
* @return The date the job was created
*/
public Date getAtTimestamp() {
return createTime;
}
/**
* The time the job was finished or <code>null</code> if not finished.
*
@ -767,7 +757,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return this;
}
Builder setCreateTime(Date createTime) {
public Builder setCreateTime(Date createTime) {
this.createTime = createTime;
return this;
}

View File

@ -172,6 +172,8 @@ public final class Messages {
public static final String REST_NO_SUCH_MODEL_SNAPSHOT = "No model snapshot with id [{0}] exists for job [{1}]";
public static final String REST_START_AFTER_END = "Invalid time range: end time ''{0}'' is earlier than start time ''{1}''.";
public static final String FIELD_CANNOT_BE_NULL = "Field [{0}] cannot be null";
private Messages() {
}

View File

@ -360,11 +360,31 @@ public class JobProvider {
}
public void getAutodetectParams(Job job, Consumer<AutodetectParams> consumer, Consumer<Exception> errorHandler) {
AutodetectParams.Builder paramsBuilder = new AutodetectParams.Builder(job.getId());
String jobId = job.getId();
ActionListener<AutodetectParams.Builder> getSpecialEventsListener = ActionListener.wrap(
paramsBuilder -> {
SpecialEventsQueryBuilder specialEventsQuery = new SpecialEventsQueryBuilder();
Date lastestRecordTime = paramsBuilder.getDataCounts().getLatestRecordTimeStamp();
if (lastestRecordTime != null) {
specialEventsQuery.after(Long.toString(lastestRecordTime.getTime()));
}
specialEventsForJob(jobId, specialEventsQuery, ActionListener.wrap(
events -> {
paramsBuilder.setSpecialEvents(events.results());
consumer.accept(paramsBuilder.build());
},
errorHandler
));
},
errorHandler
);
AutodetectParams.Builder paramsBuilder = new AutodetectParams.Builder(job.getId());
String resultsIndex = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
String stateIndex = AnomalyDetectorsIndex.jobStateIndexName();
MultiSearchRequestBuilder msearch = client.prepareMultiSearch()
.add(createLatestDataCountsSearch(resultsIndex, jobId))
.add(createLatestModelSizeStatsSearch(resultsIndex))
@ -377,7 +397,6 @@ public class JobProvider {
msearch.add(createDocIdSearch(MlMetaIndex.INDEX_NAME, MlFilter.documentId(filterId)));
}
msearch.add(createSpecialEventSearch(jobId));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, msearch.request(),
ActionListener.<MultiSearchResponse>wrap(
response -> {
@ -412,7 +431,7 @@ public class JobProvider {
}
}
consumer.accept(paramsBuilder.build());
getSpecialEventsListener.onResponse(paramsBuilder);
},
errorHandler
), client::multiSearch);
@ -425,17 +444,6 @@ public class JobProvider {
.setRouting(id);
}
private SearchRequestBuilder createSpecialEventSearch(String jobId) {
QueryBuilder qb = new BoolQueryBuilder()
.filter(new TermsQueryBuilder(SpecialEvent.TYPE.getPreferredName(), SpecialEvent.SPECIAL_EVENT_TYPE))
.filter(new TermsQueryBuilder(SpecialEvent.JOB_IDS.getPreferredName(), jobId));
return client.prepareSearch(MlMetaIndex.INDEX_NAME)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(qb);
}
private void parseAutodetectParamSearchHit(String jobId, AutodetectParams.Builder paramsBuilder, SearchHit hit,
Consumer<Exception> errorHandler) {
String hitId = hit.getId();
@ -451,10 +459,8 @@ public class JobProvider {
paramsBuilder.setQuantiles(parseSearchHit(hit, Quantiles.PARSER, errorHandler));
} else if (hitId.startsWith(MlFilter.DOCUMENT_ID_PREFIX)) {
paramsBuilder.addFilter(parseSearchHit(hit, MlFilter.PARSER, errorHandler).build());
} else if (hitId.startsWith(SpecialEvent.DOCUMENT_ID_PREFIX)) {
paramsBuilder.addSpecialEvent(parseSearchHit(hit, SpecialEvent.PARSER, errorHandler));
} else {
errorHandler.accept(new IllegalStateException("Unexpected type [" + hit.getType() + "]"));
errorHandler.accept(new IllegalStateException("Unexpected Id [" + hitId + "]"));
}
}
@ -940,6 +946,7 @@ public class JobProvider {
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(searchFromTimeMs))
.filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE)))
.addAggregation(AggregationBuilders.extendedStats("es").field(ModelSizeStats.MODEL_BYTES_FIELD.getPreferredName()));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, search.request(),
ActionListener.<SearchResponse>wrap(
response -> {
@ -994,20 +1001,45 @@ public class JobProvider {
});
}
public void specialEvents(String jobId, Consumer<List<SpecialEvent>> handler, Consumer<Exception> errorHandler) {
SearchRequestBuilder request = createSpecialEventSearch(jobId);
public void specialEventsForJob(String jobId, SpecialEventsQueryBuilder queryBuilder, ActionListener<QueryPage<SpecialEvent>> handler) {
// Find all the calendars used by the job then the events for those calendars
ActionListener<QueryPage<Calendar>> calendarsListener = ActionListener.wrap(
calendars -> {
if (calendars.results().isEmpty()) {
handler.onResponse(new QueryPage<>(Collections.emptyList(), 0, SpecialEvent.RESULTS_FIELD));
return;
}
List<String> calendarIds = calendars.results().stream().map(Calendar::getId).collect(Collectors.toList());
queryBuilder.calendarIds(calendarIds);
specialEvents(queryBuilder, handler);
},
handler::onFailure
);
CalendarQueryBuilder query = new CalendarQueryBuilder().jobId(jobId);
calendars(query, calendarsListener);
}
public void specialEvents(SpecialEventsQueryBuilder query, ActionListener<QueryPage<SpecialEvent>> handler) {
SearchRequestBuilder request = client.prepareSearch(MlMetaIndex.INDEX_NAME)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(query.build());
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request.request(),
ActionListener.<SearchResponse>wrap(
response -> {
List<SpecialEvent> specialEvents = new ArrayList<>();
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
specialEvents.add(parseSearchHit(hit, SpecialEvent.PARSER, errorHandler));
specialEvents.add(parseSearchHit(hit, SpecialEvent.PARSER, handler::onFailure).build());
}
handler.accept(specialEvents);
handler.onResponse(new QueryPage<>(specialEvents, response.getHits().getTotalHits(),
SpecialEvent.RESULTS_FIELD));
},
errorHandler)
handler::onFailure)
, client::search);
}

View File

@ -0,0 +1,97 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import java.util.ArrayList;
import java.util.List;
/**
* Query builder for {@link SpecialEvent}s
* If <code>calendarIds</code> are not set then all calendars will match.
*/
public class SpecialEventsQueryBuilder {
public static final int DEFAULT_SIZE = 1000;
private int from = 0;
private int size = DEFAULT_SIZE;
private List<String> calendarIds;
private String after;
private String before;
public SpecialEventsQueryBuilder calendarIds(List<String> calendarIds) {
this.calendarIds = calendarIds;
return this;
}
public SpecialEventsQueryBuilder after(String after) {
this.after = after;
return this;
}
public SpecialEventsQueryBuilder before(String before) {
this.before = before;
return this;
}
public SpecialEventsQueryBuilder from(int from) {
this.from = from;
return this;
}
public SpecialEventsQueryBuilder size(int size) {
this.size = size;
return this;
}
public SearchSourceBuilder build() {
List<QueryBuilder> queries = new ArrayList<>();
if (after != null) {
RangeQueryBuilder afterQuery = QueryBuilders.rangeQuery(SpecialEvent.END_TIME.getPreferredName());
afterQuery.gt(after);
queries.add(afterQuery);
}
if (before != null) {
RangeQueryBuilder beforeQuery = QueryBuilders.rangeQuery(SpecialEvent.START_TIME.getPreferredName());
beforeQuery.lt(before);
queries.add(beforeQuery);
}
if (calendarIds != null && calendarIds.isEmpty() == false) {
queries.add(new TermsQueryBuilder(Calendar.ID.getPreferredName(), calendarIds));
}
QueryBuilder typeQuery = new TermsQueryBuilder(SpecialEvent.TYPE.getPreferredName(), SpecialEvent.SPECIAL_EVENT_TYPE);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.sort(SpecialEvent.START_TIME.getPreferredName());
searchSourceBuilder.from(from);
searchSourceBuilder.size(size);
if (queries.isEmpty()) {
searchSourceBuilder.query(typeQuery);
} else {
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
boolQueryBuilder.filter(typeQuery);
for (QueryBuilder query : queries) {
boolQueryBuilder.filter(query);
}
searchSourceBuilder.query(boolQueryBuilder);
}
return searchSourceBuilder;
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.OpenJobAction.JobTask;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
@ -35,6 +36,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.SpecialEventsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
@ -261,24 +263,24 @@ public class AutodetectProcessManager extends AbstractComponent {
return;
}
Consumer<List<SpecialEvent>> eventConsumer = specialEvents -> {
communicator.writeUpdateProcessMessage(updateParams, specialEvents, (aVoid, e) -> {
ActionListener<QueryPage<SpecialEvent>> eventsListener = ActionListener.wrap(
specialEvents -> {
communicator.writeUpdateProcessMessage(updateParams, specialEvents.results(), (aVoid, e) -> {
if (e == null) {
handler.accept(null);
} else {
handler.accept(e);
}
});
};
},
handler::accept);
if (updateParams.isUpdateSpecialEvents()) {
jobProvider.specialEvents(jobTask.getJobId(), eventConsumer, handler::accept);
SpecialEventsQueryBuilder query = new SpecialEventsQueryBuilder().after(Long.toString(new Date().getTime()));
jobProvider.specialEventsForJob(jobTask.getJobId(), query, eventsListener);
} else {
eventConsumer.accept(Collections.emptyList());
eventsListener.onResponse(new QueryPage<SpecialEvent>(Collections.emptyList(), 0, SpecialEvent.RESULTS_FIELD));
}
}
public void openJob(JobTask jobTask, Consumer<Exception> handler) {

View File

@ -117,6 +117,10 @@ public class AutodetectParams {
return this;
}
public DataCounts getDataCounts() {
return dataCounts;
}
public Builder setModelSizeStats(ModelSizeStats modelSizeStats) {
this.modelSizeStats = modelSizeStats;
return this;

View File

@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest.calendar;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.GetCalendarEventsAction;
import org.elasticsearch.xpack.ml.action.util.PageParams;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.job.config.Job;
import java.io.IOException;
public class RestGetCalendarEventsAction extends BaseRestHandler {
public RestGetCalendarEventsAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.GET,
MachineLearning.BASE_PATH + "calendars/{" + Calendar.ID.getPreferredName() + "}/events", this);
}
@Override
public String getName() {
return "xpack_ml_get_calendar_events_action";
}
@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String calendarId = restRequest.param(Calendar.ID.getPreferredName());
GetCalendarEventsAction.Request request;
if (restRequest.hasContentOrSourceParam()) {
try (XContentParser parser = restRequest.contentOrSourceParamParser()) {
request = GetCalendarEventsAction.Request.parseRequest(calendarId, parser);
}
} else {
request = new GetCalendarEventsAction.Request(calendarId);
request.setAfter(restRequest.param(GetCalendarEventsAction.Request.AFTER.getPreferredName(), null));
request.setBefore(restRequest.param(GetCalendarEventsAction.Request.BEFORE.getPreferredName(), null));
request.setJobId(restRequest.param(Job.ID.getPreferredName(), null));
if (restRequest.hasParam(PageParams.FROM.getPreferredName()) || restRequest.hasParam(PageParams.SIZE.getPreferredName())) {
request.setPageParams(new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM),
restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE)));
}
}
return channel -> client.execute(GetCalendarEventsAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.rest.calendar;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
@ -23,9 +24,14 @@ public class RestGetCalendarsAction extends BaseRestHandler {
public RestGetCalendarsAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.GET, MachineLearning.BASE_PATH + "calendars/{" + Calendar.ID.getPreferredName() + "}",
this);
controller.registerHandler(RestRequest.Method.GET, MachineLearning.BASE_PATH + "calendars/{" +
Calendar.ID.getPreferredName() + "}", this);
controller.registerHandler(RestRequest.Method.GET, MachineLearning.BASE_PATH + "calendars/", this);
// endpoints that support body parameters must also accept POST
controller.registerHandler(RestRequest.Method.POST, MachineLearning.BASE_PATH + "calendars/{" +
Calendar.ID.getPreferredName() + "}", this);
controller.registerHandler(RestRequest.Method.POST, MachineLearning.BASE_PATH + "calendars/", this);
}
@Override
@ -35,17 +41,25 @@ public class RestGetCalendarsAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
GetCalendarsAction.Request getRequest = new GetCalendarsAction.Request();
String calendarId = restRequest.param(Calendar.ID.getPreferredName());
if (!Strings.isNullOrEmpty(calendarId)) {
getRequest.setCalendarId(calendarId);
}
String calendarId = restRequest.param(Calendar.ID.getPreferredName());
GetCalendarsAction.Request request;
if (restRequest.hasContentOrSourceParam()) {
try (XContentParser parser = restRequest.contentOrSourceParamParser()) {
request = GetCalendarsAction.Request.parseRequest(calendarId, parser);
}
} else {
request = new GetCalendarsAction.Request();
if (!Strings.isNullOrEmpty(calendarId)) {
request.setCalendarId(calendarId);
}
if (restRequest.hasParam(PageParams.FROM.getPreferredName()) || restRequest.hasParam(PageParams.SIZE.getPreferredName())) {
getRequest.setPageParams(new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM),
request.setPageParams(new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM),
restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE)));
}
}
return channel -> client.execute(GetCalendarsAction.INSTANCE, getRequest, new RestStatusToXContentListener<>(channel));
return channel -> client.execute(GetCalendarsAction.INSTANCE, request, new RestStatusToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest.calendar;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.PostCalendarEventsAction;
import org.elasticsearch.xpack.ml.calendars.Calendar;
import java.io.IOException;
public class RestPostCalendarEventAction extends BaseRestHandler {
public RestPostCalendarEventAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.POST,
MachineLearning.BASE_PATH + "calendars/{" + Calendar.ID.getPreferredName() + "}/events", this);
}
@Override
public String getName() {
return "xpack_ml_post_calendar_event_action";
}
@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String calendarId = restRequest.param(Calendar.ID.getPreferredName());
PostCalendarEventsAction.Request request =
PostCalendarEventsAction.Request.parseRequest(calendarId, restRequest.requiredContent(), restRequest.getXContentType());
return channel -> client.execute(PostCalendarEventsAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -10,12 +10,20 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.SettingsException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.monitoring.exporter.Exporter.CLUSTER_ALERTS_BLACKLIST_SETTING;
/**
* {@code ClusterAlertsUtil} provides static methods to easily load the JSON resources that
@ -125,4 +133,30 @@ public class ClusterAlertsUtil {
}
}
/**
* Get any blacklisted cluster alerts by their ID.
*
* @param config The {@link Exporter}'s configuration, which is used for the {@link SettingsException}.
* @return Never {@code null}. Can be empty.
* @throws SettingsException if an unknown cluster alert ID exists in the blacklist.
*/
public static List<String> getClusterAlertsBlacklist(final Exporter.Config config) {
final List<String> blacklist = config.settings().getAsList(CLUSTER_ALERTS_BLACKLIST_SETTING, Collections.emptyList());
// validate the blacklist only contains recognized IDs
if (blacklist.isEmpty() == false) {
final List<String> watchIds = Arrays.asList(ClusterAlertsUtil.WATCH_IDS);
final Set<String> unknownIds = blacklist.stream().filter(id -> watchIds.contains(id) == false).collect(Collectors.toSet());
if (unknownIds.isEmpty() == false) {
throw new SettingsException(
"[" + Exporter.settingFQN(config, CLUSTER_ALERTS_BLACKLIST_SETTING) + "] contains unrecognized Cluster Alert IDs [" +
String.join(", ", unknownIds) + "]"
);
}
}
return blacklist;
}
}

View File

@ -26,6 +26,12 @@ public abstract class Exporter implements AutoCloseable {
* Every {@code Exporter} allows users to explicitly disable cluster alerts.
*/
public static final String CLUSTER_ALERTS_MANAGEMENT_SETTING = "cluster_alerts.management.enabled";
/**
* Every {@code Exporter} allows users to explicitly disable specific cluster alerts.
* <p>
* When cluster alerts management is enabled, this should delete anything blacklisted here in addition to not creating it.
*/
public static final String CLUSTER_ALERTS_BLACKLIST_SETTING = "cluster_alerts.management.blacklist";
/**
* Every {@code Exporter} allows users to use a different index time format.
*/
@ -75,7 +81,7 @@ public abstract class Exporter implements AutoCloseable {
return Exporters.EXPORTERS_SETTINGS.getKey() + config.name;
}
protected static String settingFQN(final Config config, final String setting) {
public static String settingFQN(final Config config, final String setting) {
return Exporters.EXPORTERS_SETTINGS.getKey() + config.name + "." + setting;
}

View File

@ -15,6 +15,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentHelper;
@ -49,8 +50,10 @@ public class ClusterAlertHttpResource extends PublishableHttpResource {
*/
private final Supplier<String> watchId;
/**
* Provides a fully formed Watch (e.g., no variables that need replaced).
* Provides a fully formed Watch (e.g., no variables that need replaced). If {@code null}, then we are always going to delete this
* Cluster Alert.
*/
@Nullable
private final Supplier<String> watch;
/**
@ -58,17 +61,18 @@ public class ClusterAlertHttpResource extends PublishableHttpResource {
*
* @param resourceOwnerName The user-recognizable name.
* @param watchId The name of the watch, which is lazily loaded.
* @param watch The watch provider.
* @param watch The watch provider. {@code null} indicates that we should always delete this Watch.
*/
public ClusterAlertHttpResource(final String resourceOwnerName,
final XPackLicenseState licenseState,
final Supplier<String> watchId, final Supplier<String> watch) {
final Supplier<String> watchId,
@Nullable final Supplier<String> watch) {
// Watcher does not support master_timeout
super(resourceOwnerName, null, CLUSTER_ALERT_VERSION_PARAMETERS);
this.licenseState = Objects.requireNonNull(licenseState);
this.watchId = Objects.requireNonNull(watchId);
this.watch = Objects.requireNonNull(watch);
this.watch = watch;
}
/**
@ -77,7 +81,7 @@ public class ClusterAlertHttpResource extends PublishableHttpResource {
@Override
protected CheckResponse doCheck(final RestClient client) {
// if we should be adding, then we need to check for existence
if (licenseState.isMonitoringClusterAlertsAllowed()) {
if (isWatchDefined() && licenseState.isMonitoringClusterAlertsAllowed()) {
final CheckedFunction<Response, Boolean, IOException> watchChecker =
(response) -> shouldReplaceClusterAlert(response, XContentType.JSON.xContent(), LAST_UPDATED_VERSION);
@ -105,6 +109,15 @@ public class ClusterAlertHttpResource extends PublishableHttpResource {
resourceOwnerName, "monitoring cluster");
}
/**
* Determine if the {@link #watch} is defined. If not, then we should always delete the watch.
*
* @return {@code true} if {@link #watch} is defined (non-{@code null}). Otherwise {@code false}.
*/
boolean isWatchDefined() {
return watch != null;
}
/**
* Create a {@link HttpEntity} for the {@link #watch}.
*

View File

@ -168,7 +168,7 @@ public class HttpExporter extends Exporter {
* @throws SettingsException if any setting is malformed
*/
public HttpExporter(final Config config, final SSLService sslService, final ThreadContext threadContext) {
this(config, sslService, threadContext, new NodeFailureListener());
this(config, sslService, threadContext, new NodeFailureListener(), createResources(config));
}
/**
@ -179,8 +179,9 @@ public class HttpExporter extends Exporter {
* @param listener The node failure listener used to notify an optional sniffer and resources
* @throws SettingsException if any setting is malformed
*/
HttpExporter(final Config config, final SSLService sslService, final ThreadContext threadContext, final NodeFailureListener listener) {
this(config, createRestClient(config, sslService, listener), threadContext, listener);
HttpExporter(final Config config, final SSLService sslService, final ThreadContext threadContext, final NodeFailureListener listener,
final HttpResource resource) {
this(config, createRestClient(config, sslService, listener), threadContext, listener, resource);
}
/**
@ -191,21 +192,9 @@ public class HttpExporter extends Exporter {
* @param listener The node failure listener used to notify an optional sniffer and resources
* @throws SettingsException if any setting is malformed
*/
HttpExporter(final Config config, final RestClient client, final ThreadContext threadContext, final NodeFailureListener listener) {
this(config, client, createSniffer(config, client, listener), threadContext, listener);
}
/**
* Create an {@link HttpExporter}.
*
* @param config The HTTP Exporter's configuration
* @param client The REST Client used to make all requests to the remote Elasticsearch cluster
* @param listener The node failure listener used to notify an optional sniffer and resources
* @throws SettingsException if any setting is malformed
*/
HttpExporter(final Config config, final RestClient client, @Nullable final Sniffer sniffer, final ThreadContext threadContext,
final NodeFailureListener listener) {
this(config, client, sniffer, threadContext, listener, createResources(config));
HttpExporter(final Config config, final RestClient client, final ThreadContext threadContext, final NodeFailureListener listener,
final HttpResource resource) {
this(config, client, createSniffer(config, client, listener), threadContext, listener, resource);
}
/**
@ -583,12 +572,14 @@ public class HttpExporter extends Exporter {
if (settings.getAsBoolean(CLUSTER_ALERTS_MANAGEMENT_SETTING, true)) {
final ClusterService clusterService = config.clusterService();
final List<HttpResource> watchResources = new ArrayList<>();
final List<String> blacklist = ClusterAlertsUtil.getClusterAlertsBlacklist(config);
// add a resource per watch
for (final String watchId : ClusterAlertsUtil.WATCH_IDS) {
final boolean blacklisted = blacklist.contains(watchId);
// lazily load the cluster state to fetch the cluster UUID once it's loaded
final Supplier<String> uniqueWatchId = () -> ClusterAlertsUtil.createUniqueWatchId(clusterService, watchId);
final Supplier<String> watch = () -> ClusterAlertsUtil.loadWatch(clusterService, watchId);
final Supplier<String> watch = blacklisted ? null : () -> ClusterAlertsUtil.loadWatch(clusterService, watchId);
watchResources.add(new ClusterAlertHttpResource(resourceOwnerName, config.licenseState(), uniqueWatchId, watch));
}

View File

@ -93,6 +93,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
private final CleanerService cleanerService;
private final boolean useIngest;
private final DateTimeFormatter dateTimeFormatter;
private final List<String> clusterAlertBlacklist;
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);
private final AtomicBoolean installingSomething = new AtomicBoolean(false);
@ -105,6 +106,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
this.clusterService = config.clusterService();
this.licenseState = config.licenseState();
this.useIngest = config.settings().getAsBoolean(USE_INGEST_PIPELINE_SETTING, true);
this.clusterAlertBlacklist = ClusterAlertsUtil.getClusterAlertsBlacklist(config);
this.cleanerService = cleanerService;
this.dateTimeFormatter = dateTimeFormatter(config);
clusterService.addListener(this);
@ -436,10 +438,11 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
for (final String watchId : ClusterAlertsUtil.WATCH_IDS) {
final String uniqueWatchId = ClusterAlertsUtil.createUniqueWatchId(clusterService, watchId);
final boolean addWatch = canAddWatches && clusterAlertBlacklist.contains(watchId) == false;
// we aren't sure if no watches exist yet, so add them
if (indexExists) {
if (canAddWatches) {
if (addWatch) {
logger.trace("checking monitoring watch [{}]", uniqueWatchId);
asyncActions.add(() -> watcher.getWatch(new GetWatchRequest(uniqueWatchId),
@ -451,7 +454,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
asyncActions.add(() -> watcher.deleteWatch(new DeleteWatchRequest(uniqueWatchId),
new ResponseActionListener<>("watch", uniqueWatchId, pendingResponses)));
}
} else if (canAddWatches) {
} else if (addWatch) {
asyncActions.add(() -> putWatch(watcher, watchId, uniqueWatchId, pendingResponses));
}
}

View File

@ -77,6 +77,10 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
data = mutableMap(data);
}
IndexRequest indexRequest = new IndexRequest();
if (action.refreshPolicy != null) {
indexRequest.setRefreshPolicy(action.refreshPolicy);
}
indexRequest.index(getField(actionId, ctx.id().watchId(), "index", data, INDEX_FIELD, action.index));
indexRequest.type(getField(actionId, ctx.id().watchId(), "type",data, TYPE_FIELD, action.docType));
indexRequest.id(getField(actionId, ctx.id().watchId(), "id",data, ID_FIELD, action.docId));
@ -88,7 +92,7 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
}
if (ctx.simulateAction(actionId)) {
return new IndexAction.Simulated(indexRequest.index(), indexRequest.type(), indexRequest.id(),
return new IndexAction.Simulated(indexRequest.index(), indexRequest.type(), indexRequest.id(), action.refreshPolicy,
new XContentSource(indexRequest.source(), XContentType.JSON));
}
@ -107,6 +111,10 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
}
BulkRequest bulkRequest = new BulkRequest();
if (action.refreshPolicy != null) {
bulkRequest.setRefreshPolicy(action.refreshPolicy);
}
for (Object item : list) {
if (!(item instanceof Map)) {
throw illegalState("could not execute action [{}] of watch [{}]. failed to index payload data. " +

View File

@ -6,6 +6,8 @@
package org.elasticsearch.xpack.watcher.actions.index;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
@ -31,16 +33,18 @@ public class IndexAction implements Action {
@Nullable final String executionTimeField;
@Nullable final TimeValue timeout;
@Nullable final DateTimeZone dynamicNameTimeZone;
@Nullable final RefreshPolicy refreshPolicy;
public IndexAction(@Nullable String index, @Nullable String docType, @Nullable String docId,
@Nullable String executionTimeField,
@Nullable TimeValue timeout, @Nullable DateTimeZone dynamicNameTimeZone) {
@Nullable TimeValue timeout, @Nullable DateTimeZone dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) {
this.index = index;
this.docType = docType;
this.docId = docId;
this.executionTimeField = executionTimeField;
this.timeout = timeout;
this.dynamicNameTimeZone = dynamicNameTimeZone;
this.refreshPolicy = refreshPolicy;
}
@Override
@ -68,6 +72,10 @@ public class IndexAction implements Action {
return dynamicNameTimeZone;
}
public RefreshPolicy getRefreshPolicy() {
return refreshPolicy;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -78,12 +86,13 @@ public class IndexAction implements Action {
return Objects.equals(index, that.index) && Objects.equals(docType, that.docType) && Objects.equals(docId, that.docId)
&& Objects.equals(executionTimeField, that.executionTimeField)
&& Objects.equals(timeout, that.timeout)
&& Objects.equals(dynamicNameTimeZone, that.dynamicNameTimeZone);
&& Objects.equals(dynamicNameTimeZone, that.dynamicNameTimeZone)
&& Objects.equals(refreshPolicy, that.refreshPolicy);
}
@Override
public int hashCode() {
return Objects.hash(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone);
return Objects.hash(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}
@Override
@ -107,6 +116,9 @@ public class IndexAction implements Action {
if (dynamicNameTimeZone != null) {
builder.field(Field.DYNAMIC_NAME_TIMEZONE.getPreferredName(), dynamicNameTimeZone);
}
if (refreshPolicy!= null) {
builder.field(Field.REFRESH.getPreferredName(), refreshPolicy.getValue());
}
return builder.endObject();
}
@ -117,6 +129,7 @@ public class IndexAction implements Action {
String executionTimeField = null;
TimeValue timeout = null;
DateTimeZone dynamicNameTimeZone = null;
RefreshPolicy refreshPolicy = null;
String currentFieldName = null;
XContentParser.Token token;
@ -148,7 +161,14 @@ public class IndexAction implements Action {
// Parser for human specified timeouts and 2.x compatibility
timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT_HUMAN.toString());
} else if (Field.DYNAMIC_NAME_TIMEZONE.match(currentFieldName)) {
if (token == XContentParser.Token.VALUE_STRING) {
dynamicNameTimeZone = DateTimeZone.forID(parser.text());
} else {
throw new ElasticsearchParseException("could not parse [{}] action for watch [{}]. failed to parse [{}]. must be " +
"a string value (e.g. 'UTC' or '+01:00').", TYPE, watchId, currentFieldName);
}
} else if (Field.REFRESH.match(currentFieldName)) {
refreshPolicy = RefreshPolicy.parse(parser.text());
} else {
throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. unexpected string field [{}]", TYPE,
watchId, actionId, currentFieldName);
@ -159,7 +179,7 @@ public class IndexAction implements Action {
}
}
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone);
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}
public static Builder builder(String index, String docType) {
@ -191,16 +211,18 @@ public class IndexAction implements Action {
private final String index;
private final String docType;
@Nullable
private final String docId;
@Nullable private final String docId;
@Nullable private final RefreshPolicy refreshPolicy;
private final XContentSource source;
protected Simulated(String index, String docType, @Nullable String docId, XContentSource source) {
protected Simulated(String index, String docType, @Nullable String docId, @Nullable RefreshPolicy refreshPolicy,
XContentSource source) {
super(TYPE, Status.SIMULATED);
this.index = index;
this.docType = docType;
this.docId = docId;
this.source = source;
this.refreshPolicy = refreshPolicy;
}
public String index() {
@ -230,6 +252,10 @@ public class IndexAction implements Action {
builder.field(Field.DOC_ID.getPreferredName(), docId);
}
if (refreshPolicy != null) {
builder.field(Field.REFRESH.getPreferredName(), refreshPolicy.getValue());
}
return builder.field(Field.SOURCE.getPreferredName(), source, params)
.endObject()
.endObject();
@ -244,6 +270,7 @@ public class IndexAction implements Action {
String executionTimeField;
TimeValue timeout;
DateTimeZone dynamicNameTimeZone;
RefreshPolicy refreshPolicy;
private Builder(String index, String docType) {
this.index = index;
@ -270,9 +297,14 @@ public class IndexAction implements Action {
return this;
}
public Builder setRefreshPolicy(RefreshPolicy refreshPolicy) {
this.refreshPolicy = refreshPolicy;
return this;
}
@Override
public IndexAction build() {
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone);
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}
}
@ -287,5 +319,6 @@ public class IndexAction implements Action {
ParseField TIMEOUT = new ParseField("timeout_in_millis");
ParseField TIMEOUT_HUMAN = new ParseField("timeout");
ParseField DYNAMIC_NAME_TIMEZONE = new ParseField("dynamic_name_timezone");
ParseField REFRESH = new ParseField("refresh");
}
}

View File

@ -3,7 +3,7 @@
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 1,
"auto_expand_replicas": "0-1",
"codec": "best_compression"
}
},

View File

@ -3,6 +3,7 @@
"order": 2147483647,
"settings": {
"index.number_of_shards": 1,
"index.auto_expand_replicas": "0-1",
"index.refresh_interval" : "-1",
"index.format": 6,
"index.priority": 900

View File

@ -5,10 +5,19 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.ml.action.ForecastJobAction.Request;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import java.util.Collections;
import java.util.Date;
import static org.hamcrest.Matchers.equalTo;
@ -61,4 +70,51 @@ public class ForecastJobActionRequestTests extends AbstractStreamableXContentTes
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new Request().setExpiresIn("-1s"));
assertThat(e.getMessage(), equalTo("[expires_in] must be non-negative: [-1]"));
}
public void testValidate_jobVersionCannonBeBefore61() {
Job.Builder jobBuilder = createTestJob("forecast-it-test-job-version");
jobBuilder.setJobVersion(Version.V_6_0_1);
ForecastJobAction.Request request = new ForecastJobAction.Request();
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> ForecastJobAction.TransportAction.validate(jobBuilder.build(), request));
assertEquals(
"Cannot run forecast because jobs created prior to version 6.1 are not supported",
e.getMessage());
}
public void testValidate_jobVersionCannonBeBefore61NoJobVersion() {
Job.Builder jobBuilder = createTestJob("forecast-it-test-job-version");
ForecastJobAction.Request request = new ForecastJobAction.Request();
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> ForecastJobAction.TransportAction.validate(jobBuilder.build(), request));
assertEquals(
"Cannot run forecast because jobs created prior to version 6.1 are not supported",
e.getMessage());
}
public void testValidate_DurationCannotBeLessThanBucketSpan() {
Job.Builder jobBuilder = createTestJob("forecast-it-test-job-version");
ForecastJobAction.Request request = new ForecastJobAction.Request();
request.setDuration(TimeValue.timeValueMinutes(1));
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> ForecastJobAction.TransportAction.validate(jobBuilder.build(new Date()), request));
assertEquals("[duration] must be greater or equal to the bucket span: [1m/1h]", e.getMessage());
}
private Job.Builder createTestJob(String jobId) {
Job.Builder jobBuilder = new Job.Builder(jobId);
jobBuilder.setCreateTime(new Date());
Detector.Builder detector = new Detector.Builder("mean", "value");
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
TimeValue bucketSpan = TimeValue.timeValueHours(1);
analysisConfig.setBucketSpan(bucketSpan);
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeFormat("epoch");
jobBuilder.setAnalysisConfig(analysisConfig);
jobBuilder.setDataDescription(dataDescription);
return jobBuilder;
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.ml.action.util.PageParams;
public class GetCalendarEventsActionRequestTests extends AbstractStreamableXContentTestCase<GetCalendarEventsAction.Request> {
@Override
protected GetCalendarEventsAction.Request createTestInstance() {
String id = randomAlphaOfLengthBetween(1, 20);
GetCalendarEventsAction.Request request = new GetCalendarEventsAction.Request(id);
if (randomBoolean()) {
request.setAfter(randomAlphaOfLengthBetween(1, 20));
}
if (randomBoolean()) {
request.setBefore(randomAlphaOfLengthBetween(1, 20));
}
if (randomBoolean()) {
request.setJobId(randomAlphaOfLength(8));
}
if (randomBoolean()) {
request.setPageParams(new PageParams(randomIntBetween(0, 10), randomIntBetween(1, 10)));
}
return request;
}
@Override
protected GetCalendarEventsAction.Request createBlankInstance() {
return new GetCalendarEventsAction.Request();
}
@Override
protected GetCalendarEventsAction.Request doParseInstance(XContentParser parser) {
return GetCalendarEventsAction.Request.parseRequest(null, parser);
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
public void testValidate() {
GetCalendarEventsAction.Request request = new GetCalendarEventsAction.Request("cal-name");
request.setJobId("foo");
ActionRequestValidationException validationException = request.validate();
assertNotNull(validationException);
assertEquals("Validation Failed: 1: If job_id is used calendar_id must be '_all';", validationException.getMessage());
request = new GetCalendarEventsAction.Request("_all");
request.setJobId("foo");
assertNull(request.validate());
}
}

View File

@ -5,15 +5,21 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
public class GetCalendarsActionRequestTests extends AbstractStreamableTestCase<GetCalendarsAction.Request> {
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.ml.action.util.PageParams;
public class GetCalendarsActionRequestTests extends AbstractStreamableXContentTestCase<GetCalendarsAction.Request> {
@Override
protected GetCalendarsAction.Request createTestInstance() {
GetCalendarsAction.Request request = new GetCalendarsAction.Request();
if (randomBoolean()) {
request.setCalendarId(randomAlphaOfLengthBetween(1, 20));
}
if (randomBoolean()) {
request.setPageParams(PageParams.defaultParams());
}
return request;
}
@ -22,4 +28,13 @@ public class GetCalendarsActionRequestTests extends AbstractStreamableTestCase<G
return new GetCalendarsAction.Request();
}
@Override
protected GetCalendarsAction.Request doParseInstance(XContentParser parser) {
return GetCalendarsAction.Request.parseRequest(null, parser);
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.calendars.SpecialEventTests;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class PostCalendarEventActionRequestTests extends AbstractStreamableTestCase<PostCalendarEventsAction.Request> {
@Override
protected PostCalendarEventsAction.Request createTestInstance() {
String id = randomAlphaOfLengthBetween(1, 20);
return createTestInstance(id);
}
private PostCalendarEventsAction.Request createTestInstance(String calendarId) {
int numEvents = randomIntBetween(1, 10);
List<SpecialEvent> events = new ArrayList<>();
for (int i=0; i<numEvents; i++) {
events.add(SpecialEventTests.createSpecialEvent(calendarId));
}
PostCalendarEventsAction.Request request = new PostCalendarEventsAction.Request(calendarId, events);
return request;
}
@Override
protected PostCalendarEventsAction.Request createBlankInstance() {
return new PostCalendarEventsAction.Request();
}
public void testParseRequest() throws IOException {
PostCalendarEventsAction.Request sourceRequest = createTestInstance();
StringBuilder requestString = new StringBuilder();
for (SpecialEvent event: sourceRequest.getSpecialEvents()) {
requestString.append(Strings.toString(event)).append("\r\n");
}
BytesArray data = new BytesArray(requestString.toString().getBytes(StandardCharsets.UTF_8), 0, requestString.length());
PostCalendarEventsAction.Request parsedRequest = PostCalendarEventsAction.Request.parseRequest(
sourceRequest.getCalendarId(), data, XContentType.JSON);
assertEquals(sourceRequest, parsedRequest);
}
public void testParseRequest_throwsIfCalendarIdsAreDifferent() throws IOException {
PostCalendarEventsAction.Request sourceRequest = createTestInstance("foo");
PostCalendarEventsAction.Request request = new PostCalendarEventsAction.Request("bar", sourceRequest.getSpecialEvents());
StringBuilder requestString = new StringBuilder();
for (SpecialEvent event: sourceRequest.getSpecialEvents()) {
requestString.append(Strings.toString(event)).append("\r\n");
}
BytesArray data = new BytesArray(requestString.toString().getBytes(StandardCharsets.UTF_8), 0, requestString.length());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> PostCalendarEventsAction.Request.parseRequest(request.getCalendarId(), data, XContentType.JSON));
assertEquals("Inconsistent calendar_id; 'foo' specified in the body differs from 'bar' specified as a URL argument",
e.getMessage());
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.calendars;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
@ -21,28 +22,22 @@ import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import static org.hamcrest.Matchers.containsString;
public class SpecialEventTests extends AbstractSerializingTestCase<SpecialEvent> {
public static SpecialEvent createSpecialEvent() {
int size = randomInt(10);
List<String> jobIds = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
jobIds.add(randomAlphaOfLengthBetween(1, 20));
}
return new SpecialEvent(randomAlphaOfLength(10), randomAlphaOfLength(10),
ZonedDateTime.ofInstant(Instant.ofEpochMilli(new DateTime(randomDateTimeZone()).getMillis()), ZoneOffset.UTC),
ZonedDateTime.ofInstant(Instant.ofEpochMilli(new DateTime(randomDateTimeZone()).getMillis()), ZoneOffset.UTC),
jobIds);
public static SpecialEvent createSpecialEvent(String calendarId) {
ZonedDateTime start = ZonedDateTime.ofInstant(Instant.ofEpochMilli(new DateTime(randomDateTimeZone()).getMillis()), ZoneOffset.UTC);
return new SpecialEvent(randomAlphaOfLength(10), start, start.plusSeconds(randomIntBetween(1, 10000)),
calendarId);
}
@Override
protected SpecialEvent createTestInstance() {
return createSpecialEvent();
return createSpecialEvent(randomAlphaOfLengthBetween(1, 20));
}
@Override
@ -52,7 +47,7 @@ public class SpecialEventTests extends AbstractSerializingTestCase<SpecialEvent>
@Override
protected SpecialEvent doParseInstance(XContentParser parser) throws IOException {
return SpecialEvent.PARSER.apply(parser, null);
return SpecialEvent.PARSER.apply(parser, null).build();
}
public void testToDetectionRule() {
@ -80,6 +75,36 @@ public class SpecialEventTests extends AbstractSerializingTestCase<SpecialEvent>
long conditionEndTime = Long.parseLong(conditions.get(1).getCondition().getValue());
assertEquals(0, conditionEndTime % bucketSpanSecs);
assertEquals(bucketSpanSecs * (bucketCount + 1), conditionEndTime);
long eventTime = event.getEndTime().toEpochSecond() - conditionStartTime;
long numbBucketsInEvent = (eventTime + bucketSpanSecs -1) / bucketSpanSecs;
assertEquals(bucketSpanSecs * (bucketCount + numbBucketsInEvent), conditionEndTime);
}
public void testBuild() {
SpecialEvent.Builder builder = new SpecialEvent.Builder();
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, builder::build);
assertEquals("Field [description] cannot be null", e.getMessage());
builder.description("foo");
e = expectThrows(ElasticsearchStatusException.class, builder::build);
assertEquals("Field [start_time] cannot be null", e.getMessage());
ZonedDateTime now = ZonedDateTime.now();
builder.startTime(now);
e = expectThrows(ElasticsearchStatusException.class, builder::build);
assertEquals("Field [end_time] cannot be null", e.getMessage());
builder.endTime(now.plusHours(1));
e = expectThrows(ElasticsearchStatusException.class, builder::build);
assertEquals("Field [calendar_id] cannot be null", e.getMessage());
builder.calendarId("foo");
builder.build();
builder = new SpecialEvent.Builder().description("f").calendarId("c");
builder.startTime(now);
builder.endTime(now.minusHours(2));
e = expectThrows(ElasticsearchStatusException.class, builder::build);
assertThat(e.getMessage(), containsString("must come before end time"));
}
}

View File

@ -153,7 +153,7 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase {
ElasticsearchException e = expectThrows(ElasticsearchException.class,() -> forecast(job.getId(),
TimeValue.timeValueMinutes(10), null));
assertThat(e.getMessage(),
equalTo("java.lang.IllegalArgumentException: [duration] must be greater or equal to the bucket span: [10m/1h]"));
equalTo("[duration] must be greater or equal to the bucket span: [10m/1h]"));
}
private static Map<String, Object> createRecord(long timestamp, double value) {

View File

@ -42,6 +42,7 @@ import org.elasticsearch.xpack.ml.job.persistence.CalendarQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.SpecialEventsQueryBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCountsTests;
@ -63,12 +64,10 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.isIn;
import static org.hamcrest.Matchers.not;
@ -170,10 +169,13 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
indexCalendars(calendars);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
final AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
jobProvider.removeJobFromCalendars("bar", ActionListener.wrap(
r -> latch.countDown(),
exceptionHolder::set));
e -> {
exceptionHolder.set(e);
latch.countDown();
}));
latch.await();
if (exceptionHolder.get() != null) {
@ -190,10 +192,12 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
assertThat(catFoo.getJobIds(), contains("cat", "foo"));
CountDownLatch latch2 = new CountDownLatch(1);
exceptionHolder = new AtomicReference<>();
jobProvider.removeJobFromCalendars("cat", ActionListener.wrap(
r -> latch2.countDown(),
exceptionHolder::set));
e -> {
exceptionHolder.set(e);
latch2.countDown();
}));
latch2.await();
if (exceptionHolder.get() != null) {
@ -220,10 +224,13 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
}
jobProvider.calendars(query, ActionListener.wrap(
r -> {
latch.countDown();
result.set(r);
latch.countDown();
},
exceptionHolder::set));
e -> {
exceptionHolder.set(e);
latch.countDown();
}));
latch.await();
if (exceptionHolder.get() != null) {
@ -238,7 +245,10 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
jobProvider.updateCalendar(calendarId, idsToAdd, idsToRemove,
r -> latch.countDown(),
exceptionHolder::set);
e -> {
exceptionHolder.set(e);
latch.countDown();
});
latch.await();
if (exceptionHolder.get() != null) {
@ -254,8 +264,14 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
AtomicReference<Calendar> calendarHolder = new AtomicReference<>();
jobProvider.calendar(calendarId, ActionListener.wrap(
c -> { latch.countDown(); calendarHolder.set(c); },
exceptionHolder::set)
c -> {
calendarHolder.set(c);
latch.countDown();
},
e -> {
exceptionHolder.set(e);
latch.countDown();
})
);
latch.await();
@ -267,40 +283,73 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
}
public void testSpecialEvents() throws Exception {
Job.Builder jobA = createJob("job_a");
Job.Builder jobB = createJob("job_b");
Job.Builder jobC = createJob("job_c");
String calendarAId = "maintenance_a";
List<Calendar> calendars = new ArrayList<>();
calendars.add(new Calendar(calendarAId, Collections.singletonList("job_a")));
ZonedDateTime now = ZonedDateTime.now();
List<SpecialEvent> events = new ArrayList<>();
events.add(new SpecialEvent("A_and_B_downtime", "downtime", createZonedDateTime(1000L), createZonedDateTime(2000L),
Arrays.asList("job_a", "job_b")));
events.add(new SpecialEvent("A_downtime", "downtime", createZonedDateTime(5000L), createZonedDateTime(10000L),
Collections.singletonList("job_a")));
events.add(buildSpecialEvent("downtime", now.plusDays(1), now.plusDays(2), calendarAId));
events.add(buildSpecialEvent("downtime_AA", now.plusDays(8), now.plusDays(9), calendarAId));
events.add(buildSpecialEvent("downtime_AAA", now.plusDays(15), now.plusDays(16), calendarAId));
String calendarABId = "maintenance_a_and_b";
calendars.add(new Calendar(calendarABId, Arrays.asList("job_a", "job_b")));
events.add(buildSpecialEvent("downtime_AB", now.plusDays(12), now.plusDays(13), calendarABId));
indexCalendars(calendars);
indexSpecialEvents(events);
Job.Builder job = createJob("job_b");
List<SpecialEvent> returnedEvents = getSpecialEvents(job.getId());
assertEquals(1, returnedEvents.size());
assertEquals(events.get(0), returnedEvents.get(0));
job = createJob("job_a");
returnedEvents = getSpecialEvents(job.getId());
assertEquals(2, returnedEvents.size());
SpecialEventsQueryBuilder query = new SpecialEventsQueryBuilder();
List<SpecialEvent> returnedEvents = getSpecialEventsForJob(jobA.getId(), query);
assertEquals(4, returnedEvents.size());
assertEquals(events.get(0), returnedEvents.get(0));
assertEquals(events.get(1), returnedEvents.get(1));
assertEquals(events.get(3), returnedEvents.get(2));
assertEquals(events.get(2), returnedEvents.get(3));
job = createJob("job_c");
returnedEvents = getSpecialEvents(job.getId());
returnedEvents = getSpecialEventsForJob(jobB.getId(), query);
assertEquals(1, returnedEvents.size());
assertEquals(events.get(3), returnedEvents.get(0));
returnedEvents = getSpecialEventsForJob(jobC.getId(), query);
assertEquals(0, returnedEvents.size());
// Test time filters
// Lands halfway through the second event which should be returned
query.after(Long.toString(now.plusDays(8).plusHours(1).toInstant().toEpochMilli()));
// Lands halfway through the 3rd event which should be returned
query.before(Long.toString(now.plusDays(12).plusHours(1).toInstant().toEpochMilli()));
returnedEvents = getSpecialEventsForJob(jobA.getId(), query);
assertEquals(2, returnedEvents.size());
assertEquals(events.get(1), returnedEvents.get(0));
assertEquals(events.get(3), returnedEvents.get(1));
}
private SpecialEvent buildSpecialEvent(String description, ZonedDateTime start, ZonedDateTime end, String calendarId) {
return new SpecialEvent.Builder().description(description).startTime(start).endTime(end).calendarId(calendarId).build();
}
public void testGetAutodetectParams() throws Exception {
String jobId = "test_get_autodetect_params";
Job.Builder job = createJob(jobId, Arrays.asList("fruit", "tea"));
String calendarId = "downtime";
Calendar calendar = new Calendar(calendarId, Collections.singletonList(jobId));
indexCalendars(Collections.singletonList(calendar));
// index the param docs
ZonedDateTime now = ZonedDateTime.now();
List<SpecialEvent> events = new ArrayList<>();
events.add(new SpecialEvent("A_downtime", "downtime", createZonedDateTime(5000L), createZonedDateTime(10000L),
Collections.singletonList(jobId)));
events.add(new SpecialEvent("A_downtime2", "downtime", createZonedDateTime(20000L), createZonedDateTime(21000L),
Collections.singletonList(jobId)));
// events in the past should be filtered out
events.add(buildSpecialEvent("In the past", now.minusDays(7), now.minusDays(6), calendarId));
events.add(buildSpecialEvent("A_downtime", now.plusDays(1), now.plusDays(2), calendarId));
events.add(buildSpecialEvent("A_downtime2", now.plusDays(8), now.plusDays(9), calendarId));
indexSpecialEvents(events);
List<MlFilter> filters = new ArrayList<>();
@ -335,9 +384,10 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
// special events
assertNotNull(params.specialEvents());
assertEquals(2, params.specialEvents().size());
assertEquals(3, params.specialEvents().size());
assertEquals(events.get(0), params.specialEvents().get(0));
assertEquals(events.get(1), params.specialEvents().get(1));
assertEquals(events.get(2), params.specialEvents().get(2));
// filters
assertNotNull(params.filters());
@ -382,24 +432,25 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
return searchResultHolder.get();
}
private List<SpecialEvent> getSpecialEvents(String jobId) throws Exception {
private List<SpecialEvent> getSpecialEventsForJob(String jobId, SpecialEventsQueryBuilder query) throws Exception {
AtomicReference<Exception> errorHolder = new AtomicReference<>();
AtomicReference<List<SpecialEvent>> searchResultHolder = new AtomicReference<>();
AtomicReference<QueryPage<SpecialEvent>> searchResultHolder = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
jobProvider.specialEvents(jobId, params -> {
jobProvider.specialEventsForJob(jobId, query, ActionListener.wrap(
params -> {
searchResultHolder.set(params);
latch.countDown();
}, e -> {
errorHolder.set(e);
latch.countDown();
});
}));
latch.await();
if (errorHolder.get() != null) {
throw errorHolder.get();
}
return searchResultHolder.get();
return searchResultHolder.get().results();
}
private Job.Builder createJob(String jobId) {
@ -445,7 +496,7 @@ public class JobProviderIT extends XPackSingleNodeTestCase {
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (SpecialEvent event : events) {
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, event.documentId());
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE);
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true"));
indexRequest.source(event.toXContent(builder, params));

View File

@ -91,7 +91,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
Collections.singletonList(new DetectionRule.Builder(conditions).build())));
UpdateParams updateParams = new UpdateParams(null, detectorUpdates, true);
List<SpecialEvent> events = Collections.singletonList(SpecialEventTests.createSpecialEvent());
List<SpecialEvent> events = Collections.singletonList(SpecialEventTests.createSpecialEvent(randomAlphaOfLength(10)));
communicator.writeUpdateProcessMessage(updateParams, events, ((aVoid, e) -> {}));

View File

@ -238,12 +238,14 @@ public class FieldConfigWriterTests extends ESTestCase {
AnalysisConfig.Builder builder = new AnalysisConfig.Builder(Arrays.asList(d));
analysisConfig = builder.build();
specialEvents.add(
new SpecialEvent("1", "The Ashes", ZonedDateTime.ofInstant(Instant.ofEpochMilli(1511395200000L), ZoneOffset.UTC),
ZonedDateTime.ofInstant(Instant.ofEpochMilli(1515369600000L), ZoneOffset.UTC), Collections.emptyList()));
specialEvents.add(
new SpecialEvent("2", "elasticon", ZonedDateTime.ofInstant(Instant.ofEpochMilli(1519603200000L), ZoneOffset.UTC),
ZonedDateTime.ofInstant(Instant.ofEpochMilli(1519862400000L), ZoneOffset.UTC), Collections.emptyList()));
specialEvents.add(new SpecialEvent.Builder().description("The Ashes")
.startTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1511395200000L), ZoneOffset.UTC))
.endTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1515369600000L), ZoneOffset.UTC))
.calendarId("calendar_id").build());
specialEvents.add(new SpecialEvent.Builder().description("elasticon")
.startTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1519603200000L), ZoneOffset.UTC))
.endTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(1519862400000L), ZoneOffset.UTC))
.calendarId("calendar_id").build());
writer = mock(OutputStreamWriter.class);
createFieldConfigWriter().write();

View File

@ -8,14 +8,19 @@ package org.elasticsearch.xpack.monitoring.exporter;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import org.junit.Before;
import static org.hamcrest.Matchers.containsString;
@ -56,7 +61,7 @@ public class ClusterAlertsUtilTests extends ESTestCase {
assertThat(uniqueWatchId, equalTo(clusterUuid + "_" + watchId));
}
public void testLoadWatch() throws IOException {
public void testLoadWatch() {
for (final String watchId : ClusterAlertsUtil.WATCH_IDS) {
final String watch = ClusterAlertsUtil.loadWatch(clusterService, watchId);
@ -74,4 +79,46 @@ public class ClusterAlertsUtilTests extends ESTestCase {
expectThrows(RuntimeException.class, () -> ClusterAlertsUtil.loadWatch(clusterService, "watch-does-not-exist"));
}
public void testGetClusterAlertsBlacklistThrowsForUnknownWatchId() {
final List<String> watchIds = Arrays.asList(ClusterAlertsUtil.WATCH_IDS);
final List<String> blacklist = randomSubsetOf(watchIds);
blacklist.add("fake1");
if (randomBoolean()) {
blacklist.add("fake2");
if (rarely()) {
blacklist.add("fake3");
}
}
final Set<String> unknownIds = blacklist.stream().filter(id -> watchIds.contains(id) == false).collect(Collectors.toSet());
final String unknownIdsString = String.join(", ", unknownIds);
final SettingsException exception =
expectThrows(SettingsException.class,
() -> ClusterAlertsUtil.getClusterAlertsBlacklist(createConfigWithBlacklist("_random", blacklist)));
assertThat(exception.getMessage(),
equalTo("[xpack.monitoring.exporters._random.cluster_alerts.management.blacklist] contains unrecognized Cluster " +
"Alert IDs [" + unknownIdsString + "]"));
}
public void testGetClusterAlertsBlacklist() {
final List<String> blacklist = randomSubsetOf(Arrays.asList(ClusterAlertsUtil.WATCH_IDS));
assertThat(blacklist, equalTo(ClusterAlertsUtil.getClusterAlertsBlacklist(createConfigWithBlacklist("any", blacklist))));
}
private Exporter.Config createConfigWithBlacklist(final String name, final List<String> blacklist) {
final Settings settings = Settings.builder()
.putList(Exporter.CLUSTER_ALERTS_BLACKLIST_SETTING, blacklist)
.build();
final ClusterService clusterService = mock(ClusterService.class);
final XPackLicenseState licenseState = mock(XPackLicenseState.class);
return new Exporter.Config(name, "fake", Settings.EMPTY, settings, clusterService, licenseState);
}
}

View File

@ -36,6 +36,13 @@ public class ClusterAlertHttpResourceTests extends AbstractPublishableHttpResour
private final ClusterAlertHttpResource resource = new ClusterAlertHttpResource(owner, licenseState, () -> watchId, () -> watchValue);
public void testIsWatchDefined() {
final ClusterAlertHttpResource noWatchResource = new ClusterAlertHttpResource(owner, licenseState, () -> watchId, null);
assertThat(noWatchResource.isWatchDefined(), is(false));
assertThat(resource.isWatchDefined(), is(true));
}
public void testWatchToHttpEntity() throws IOException {
final byte[] watchValueBytes = watchValue.getBytes(ContentType.APPLICATION_JSON.getCharset());
final byte[] actualBytes = new byte[watchValueBytes.length];
@ -91,6 +98,26 @@ public class ClusterAlertHttpResourceTests extends AbstractPublishableHttpResour
}
}
public void testDoCheckAsDeleteWatchExistsWhenNoWatchIsSpecified() throws IOException {
final ClusterAlertHttpResource noWatchResource = new ClusterAlertHttpResource(owner, licenseState, () -> watchId, null);
final boolean clusterAlertsAllowed = randomBoolean();
// should not matter
when(licenseState.isMonitoringClusterAlertsAllowed()).thenReturn(clusterAlertsAllowed);
assertCheckAsDeleteExists(noWatchResource, "/_xpack/watcher/watch", watchId);
}
public void testDoCheckWithExceptionAsDeleteWatchErrorWhenNoWatchIsSpecified() throws IOException {
final ClusterAlertHttpResource noWatchResource = new ClusterAlertHttpResource(owner, licenseState, () -> watchId, null);
final boolean clusterAlertsAllowed = randomBoolean();
// should not matter
when(licenseState.isMonitoringClusterAlertsAllowed()).thenReturn(clusterAlertsAllowed);
assertCheckAsDeleteWithException(noWatchResource, "/_xpack/watcher/watch", watchId);
}
public void testDoCheckAsDeleteWatchExists() throws IOException {
when(licenseState.isMonitoringClusterAlertsAllowed()).thenReturn(false);

View File

@ -19,7 +19,6 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment;
@ -75,6 +74,8 @@ import static org.hamcrest.Matchers.notNullValue;
numDataNodes = 1, numClientNodes = 0, transportClientRatio = 0.0, supportsDedicatedMasters = false)
public class HttpExporterIT extends MonitoringIntegTestCase {
private final List<String> clusterAlertBlacklist =
rarely() ? randomSubsetOf(Arrays.asList(ClusterAlertsUtil.WATCH_IDS)) : Collections.emptyList();
private final boolean templatesExistsAlready = randomBoolean();
private final boolean includeOldTemplates = randomBoolean();
private final boolean pipelineExistsAlready = randomBoolean();
@ -114,12 +115,16 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
.build();
}
public void testExport() throws Exception {
final Settings settings = Settings.builder()
protected Settings.Builder baseSettings() {
return Settings.builder()
.put("xpack.monitoring.exporters._http.type", "http")
.put("xpack.monitoring.exporters._http.host", getFormattedAddress(webServer))
.put("xpack.monitoring.exporters._http.index.template.create_legacy_templates", includeOldTemplates)
.build();
.putList("xpack.monitoring.exporters._http.cluster_alerts.management.blacklist", clusterAlertBlacklist)
.put("xpack.monitoring.exporters._http.index.template.create_legacy_templates", includeOldTemplates);
}
public void testExport() throws Exception {
final Settings settings = baseSettings().build();
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueSetupResponses(webServer,
@ -146,10 +151,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
headers.put("X-Found-Cluster", new String[] { headerValue });
headers.put("Array-Check", array);
Settings settings = Settings.builder()
.put("xpack.monitoring.exporters._http.type", "http")
.put("xpack.monitoring.exporters._http.host", getFormattedAddress(webServer))
.put("xpack.monitoring.exporters._http.index.template.create_legacy_templates", includeOldTemplates)
final Settings settings = baseSettings()
.put("xpack.monitoring.exporters._http.headers.X-Cloud-Cluster", headerValue)
.put("xpack.monitoring.exporters._http.headers.X-Found-Cluster", headerValue)
.putList("xpack.monitoring.exporters._http.headers.Array-Check", array)
@ -199,15 +201,9 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
basePath = "/" + basePath;
}
final Settings.Builder builder = Settings.builder()
.put("xpack.monitoring.exporters._http.type", "http")
.put("xpack.monitoring.exporters._http.host", getFormattedAddress(webServer))
final Settings.Builder builder = baseSettings()
.put("xpack.monitoring.exporters._http.proxy.base_path", basePath + (randomBoolean() ? "/" : ""));
if (includeOldTemplates == false) {
builder.put("xpack.monitoring.exporters._http.index.template.create_legacy_templates", false);
}
if (useHeaders) {
builder.put("xpack.monitoring.exporters._http.headers.X-Cloud-Cluster", headerValue)
.put("xpack.monitoring.exporters._http.headers.X-Found-Cluster", headerValue)
@ -231,11 +227,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
}
public void testHostChangeReChecksTemplate() throws Exception {
Settings settings = Settings.builder()
.put("xpack.monitoring.exporters._http.type", "http")
.put("xpack.monitoring.exporters._http.host", getFormattedAddress(webServer))
.put("xpack.monitoring.exporters._http.index.template.create_legacy_templates", includeOldTemplates)
.build();
final Settings settings = baseSettings().build();
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueSetupResponses(webServer,
@ -327,11 +319,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
}
public void testDynamicIndexFormatChange() throws Exception {
final Settings settings = Settings.builder()
.put("xpack.monitoring.exporters._http.type", "http")
.put("xpack.monitoring.exporters._http.host", getFormattedAddress(webServer))
.put("xpack.monitoring.exporters._http.index.template.create_legacy_templates", includeOldTemplates)
.build();
final Settings settings = baseSettings().build();
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueSetupResponses(webServer,
@ -473,7 +461,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
final boolean remoteClusterAllowsWatcher, final boolean currentLicenseAllowsWatcher,
final boolean alreadyExists,
@Nullable final Map<String, String[]> customHeaders,
@Nullable final String basePath) throws Exception {
@Nullable final String basePath) {
final String pathPrefix = basePathToAssertablePrefix(basePath);
MockRequest request;
@ -486,13 +474,15 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
assertHeaders(request, customHeaders);
if (remoteClusterAllowsWatcher) {
for (Tuple<String, String> watch : monitoringWatches()) {
for (final Tuple<String, String> watch : monitoringWatches()) {
final String uniqueWatchId = ClusterAlertsUtil.createUniqueWatchId(clusterService(), watch.v1());
request = webServer.takeRequest();
// GET / PUT if we are allowed to use it
if (currentLicenseAllowsWatcher) {
if (currentLicenseAllowsWatcher && clusterAlertBlacklist.contains(watch.v1()) == false) {
assertThat(request.getMethod(), equalTo("GET"));
assertThat(request.getUri().getPath(), equalTo(pathPrefix + "/_xpack/watcher/watch/" + watch.v1()));
assertThat(request.getUri().getPath(), equalTo(pathPrefix + "/_xpack/watcher/watch/" + uniqueWatchId));
assertThat(request.getUri().getQuery(), equalTo(resourceClusterAlertQueryString()));
assertHeaders(request, customHeaders);
@ -500,7 +490,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
request = webServer.takeRequest();
assertThat(request.getMethod(), equalTo("PUT"));
assertThat(request.getUri().getPath(), equalTo(pathPrefix + "/_xpack/watcher/watch/" + watch.v1()));
assertThat(request.getUri().getPath(), equalTo(pathPrefix + "/_xpack/watcher/watch/" + uniqueWatchId));
assertThat(request.getUri().getQuery(), equalTo(resourceClusterAlertQueryString()));
assertThat(request.getBody(), equalTo(watch.v2()));
assertHeaders(request, customHeaders);
@ -508,7 +498,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
// DELETE if we're not allowed to use it
} else {
assertThat(request.getMethod(), equalTo("DELETE"));
assertThat(request.getUri().getPath(), equalTo(pathPrefix + "/_xpack/watcher/watch/" + watch.v1()));
assertThat(request.getUri().getPath(), equalTo(pathPrefix + "/_xpack/watcher/watch/" + uniqueWatchId));
assertThat(request.getUri().getQuery(), equalTo(resourceClusterAlertQueryString()));
assertHeaders(request, customHeaders);
}
@ -775,7 +765,10 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
private void enqueueClusterAlertResponsesDoesNotExistYet(final MockWebServer webServer)
throws IOException {
for (final String watchId : monitoringWatchIds()) {
for (final String watchId : ClusterAlertsUtil.WATCH_IDS) {
if (clusterAlertBlacklist.contains(watchId)) {
enqueueDeleteClusterAlertResponse(webServer, watchId);
} else {
if (randomBoolean()) {
enqueueResponse(webServer, 404, "watch [" + watchId + "] does not exist");
} else if (randomBoolean()) {
@ -790,10 +783,13 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
enqueueResponse(webServer, 201, "[" + watchId + "] created");
}
}
}
private void enqueueClusterAlertResponsesExistsAlready(final MockWebServer webServer) throws IOException {
final int count = monitoringWatchIds().size();
for (int i = 0; i < count; ++i) {
for (final String watchId : ClusterAlertsUtil.WATCH_IDS) {
if (clusterAlertBlacklist.contains(watchId)) {
enqueueDeleteClusterAlertResponse(webServer, watchId);
} else {
final int existsVersion;
if (randomBoolean()) {
@ -807,28 +803,21 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
enqueueResponse(webServer, 200, "{\"metadata\":{\"xpack\":{\"version_created\":" + existsVersion + "}}}");
}
}
}
private void enqueueDeleteClusterAlertResponses(final MockWebServer webServer) throws IOException {
for (final String watchId : monitoringWatchIds()) {
for (final String watchId : ClusterAlertsUtil.WATCH_IDS) {
enqueueDeleteClusterAlertResponse(webServer, watchId);
}
}
private void enqueueDeleteClusterAlertResponse(final MockWebServer webServer, final String watchId) throws IOException {
if (randomBoolean()) {
enqueueResponse(webServer, 404, "watch [" + watchId + "] did not exist");
} else {
enqueueResponse(webServer, 200, "watch [" + watchId + "] deleted");
}
}
}
private void writeIndex(XContentBuilder response, String index, String alias) throws IOException {
response.startObject(index);
{
response.startObject("aliases");
{
response.startObject(alias).endObject();
}
response.endObject();
}
response.endObject();
}
private void enqueueResponse(int responseCode, String body) throws IOException {
enqueueResponse(webServer, responseCode, body);

View File

@ -29,6 +29,7 @@ import org.junit.Before;
import org.mockito.InOrder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -195,6 +196,34 @@ public class HttpExporterTests extends ESTestCase {
assertThat(exception.getMessage(), equalTo("[xpack.monitoring.exporters._http.host] invalid host: [" + invalidHost + "]"));
}
public void testExporterWithUnknownBlacklistedClusterAlerts() {
final SSLIOSessionStrategy sslStrategy = mock(SSLIOSessionStrategy.class);
when(sslService.sslIOSessionStrategy(any(Settings.class))).thenReturn(sslStrategy);
final List<String> blacklist = new ArrayList<>();
blacklist.add("does_not_exist");
if (randomBoolean()) {
// a valid ID
blacklist.add(randomFrom(ClusterAlertsUtil.WATCH_IDS));
Collections.shuffle(blacklist, random());
}
final Settings.Builder builder = Settings.builder()
.put("xpack.monitoring.exporters._http.type", HttpExporter.TYPE)
.put("xpack.monitoring.exporters._http.host", "http://localhost:9200")
.putList("xpack.monitoring.exporters._http.cluster_alerts.management.blacklist", blacklist);
final Config config = createConfig(builder.build());
final SettingsException exception =
expectThrows(SettingsException.class, () -> new HttpExporter(config, sslService, threadContext));
assertThat(exception.getMessage(),
equalTo("[xpack.monitoring.exporters._http.cluster_alerts.management.blacklist] contains unrecognized Cluster " +
"Alert IDs [does_not_exist]"));
}
public void testExporterWithHostOnly() throws Exception {
final SSLIOSessionStrategy sslStrategy = mock(SSLIOSessionStrategy.class);
when(sslService.sslIOSessionStrategy(any(Settings.class))).thenReturn(sslStrategy);

View File

@ -185,14 +185,7 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
final ClusterService clusterService = clusterService();
return Arrays.stream(ClusterAlertsUtil.WATCH_IDS)
.map(id -> new Tuple<>(ClusterAlertsUtil.createUniqueWatchId(clusterService, id),
ClusterAlertsUtil.loadWatch(clusterService, id)))
.collect(Collectors.toList());
}
protected List<String> monitoringWatchIds() {
return Arrays.stream(ClusterAlertsUtil.WATCH_IDS)
.map(id -> ClusterAlertsUtil.createUniqueWatchId(clusterService(), id))
.map(id -> new Tuple<>(id, ClusterAlertsUtil.loadWatch(clusterService, id)))
.collect(Collectors.toList());
}

View File

@ -7,6 +7,8 @@ package org.elasticsearch.xpack.watcher.actions.index;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
@ -43,6 +45,7 @@ import java.util.Map;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableSet;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import static org.elasticsearch.common.util.set.Sets.newHashSet;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
@ -57,6 +60,8 @@ import static org.mockito.Mockito.when;
public class IndexActionTests extends ESTestCase {
private RefreshPolicy refreshPolicy = randomBoolean() ? null : randomFrom(RefreshPolicy.values());
private final Client client = mock(Client.class);
@Before
@ -122,18 +127,28 @@ public class IndexActionTests extends ESTestCase {
.startObject()
.field("unknown", 1234)
.endObject());
// unknown refresh policy
expectFailure(IllegalArgumentException.class, jsonBuilder()
.startObject()
.field(IndexAction.Field.REFRESH.getPreferredName(), "unknown")
.endObject());
}
private void expectParseFailure(XContentBuilder builder) throws Exception {
expectFailure(ElasticsearchParseException.class, builder);
}
private void expectFailure(Class clazz, XContentBuilder builder) throws Exception {
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client);
XContentParser parser = createParser(builder);
parser.nextToken();
expectThrows(ElasticsearchParseException.class, () ->
expectThrows(clazz, () ->
actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser));
}
public void testUsingParameterIdWithBulkOrIdFieldThrowsIllegalState() {
final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null);
final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null, refreshPolicy);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
final Map<String, Object> docWithId = MapBuilder.<String, Object>newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap();
@ -183,7 +198,7 @@ public class IndexActionTests extends ESTestCase {
final IndexAction action = new IndexAction(configureIndexDynamically ? null : "my_index",
configureTypeDynamically ? null : "my_type",
configureIdDynamically ? null : "my_id",
null, null, null);
null, null, null, refreshPolicy);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
@ -204,7 +219,7 @@ public class IndexActionTests extends ESTestCase {
}
public void testThatIndexActionCanBeConfiguredWithDynamicIndexNameAndBulk() throws Exception {
final IndexAction action = new IndexAction(null, "my-type", null, null, null, null);
final IndexAction action = new IndexAction(null, "my-type", null, null, null, null, refreshPolicy);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
@ -237,7 +252,7 @@ public class IndexActionTests extends ESTestCase {
String fieldName = randomFrom("_index", "_type");
final IndexAction action = new IndexAction(fieldName.equals("_index") ? "my_index" : null,
fieldName.equals("_type") ? "my_type" : null,
null,null, null, null);
null,null, null, null, refreshPolicy);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
@ -257,7 +272,8 @@ public class IndexActionTests extends ESTestCase {
String docId = randomAlphaOfLength(5);
String timestampField = randomFrom("@timestamp", null);
IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null);
IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null,
refreshPolicy);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30));
DateTime executionTime = DateTime.now(UTC);
@ -295,6 +311,9 @@ public class IndexActionTests extends ESTestCase {
assertThat(indexRequest.id(), is(docId));
}
RefreshPolicy expectedRefreshPolicy = refreshPolicy == null ? RefreshPolicy.NONE: refreshPolicy;
assertThat(indexRequest.getRefreshPolicy(), is(expectedRefreshPolicy));
if (timestampField != null) {
assertThat(indexRequest.sourceAsMap().keySet(), is(hasSize(2)));
assertThat(indexRequest.sourceAsMap(), hasEntry(timestampField, executionTime.toString()));
@ -304,7 +323,7 @@ public class IndexActionTests extends ESTestCase {
}
public void testFailureResult() throws Exception {
IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null);
IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null, refreshPolicy);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
@ -335,6 +354,8 @@ public class IndexActionTests extends ESTestCase {
listener.onResponse(bulkResponse);
when(client.bulk(captor.capture())).thenReturn(listener);
Action.Result result = executable.execute("_id", ctx, payload);
RefreshPolicy expectedRefreshPolicy = refreshPolicy == null ? RefreshPolicy.NONE: refreshPolicy;
assertThat(captor.getValue().getRefreshPolicy(), is(expectedRefreshPolicy));
if (isPartialFailure) {
assertThat(result.status(), is(Status.PARTIAL_FAILURE));
@ -342,5 +363,4 @@ public class IndexActionTests extends ESTestCase {
assertThat(result.status(), is(Status.FAILURE));
}
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.watch;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
@ -585,7 +586,9 @@ public class WatchTests extends ESTestCase {
if (randomBoolean()) {
DateTimeZone timeZone = randomBoolean() ? DateTimeZone.UTC : null;
TimeValue timeout = randomBoolean() ? timeValueSeconds(between(1, 10000)) : null;
IndexAction action = new IndexAction("_index", "_type", randomBoolean() ? "123" : null, null, timeout, timeZone);
WriteRequest.RefreshPolicy refreshPolicy = randomBoolean() ? null : randomFrom(WriteRequest.RefreshPolicy.values());
IndexAction action = new IndexAction("_index", "_type", randomBoolean() ? "123" : null, null, timeout, timeZone,
refreshPolicy);
list.add(new ActionWrapper("_index_" + randomAlphaOfLength(8), randomThrottler(),
AlwaysConditionTests.randomCondition(scriptService), randomTransform(),
new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),

View File

@ -0,0 +1,41 @@
{
"xpack.ml.get_calendar_events": {
"methods": [ "GET" ],
"url": {
"path": "/_xpack/ml/calendars/{calendar_id}/events",
"paths": [
"/_xpack/ml/calendars/{calendar_id}/events"
],
"parts": {
"calendar_id": {
"type": "string",
"description": "The ID of the calendar containing the events",
"required": true
}
},
"params": {
"job_id": {
"type": "string",
"description": "Get events for the job. When this option is used calendar_id must be '_all'"
},
"after": {
"type": "string",
"description": "Get events after this time"
},
"before": {
"type": "date",
"description": "Get events before this time"
},
"from": {
"type": "int",
"description": "Skips a number of events"
},
"size": {
"type": "int",
"description": "Specifies a max number of events to get"
}
}
},
"body": null
}
}

View File

@ -1,6 +1,6 @@
{
"xpack.ml.get_calendars": {
"methods": [ "GET" ],
"methods": [ "GET", "POST" ],
"url": {
"path": "/_xpack/ml/calendars/{calendar_id}",
"paths": [

View File

@ -0,0 +1,20 @@
{
"xpack.ml.post_calendar_events": {
"methods": [ "POST" ],
"url": {
"path": "/_xpack/ml/calendars/{calendar_id}/events",
"paths": [ "/_xpack/ml/calendars/{calendar_id}/events" ],
"parts": {
"calendar_id": {
"type": "string",
"required": true,
"description": "The ID of the calendar to modify"
}
}
},
"body": {
"description" : "A list of special events",
"required" : true
}
}
}

View File

@ -73,6 +73,11 @@
xpack.ml.get_calendars: {}
- match: { count: 2 }
- do:
xpack.ml.get_calendars:
calendar_id: _all
- match: { count: 2 }
- do:
catch: missing
xpack.ml.get_calendars:
@ -177,7 +182,7 @@
- match: { calendars.0.job_ids: [] }
---
"Test update calendar":
"Test update calendar job ids":
- do:
xpack.ml.put_calendar:
@ -234,3 +239,172 @@
xpack.ml.delete_calendar_job:
calendar_id: "Wildlife"
job_id: "missing job"
---
"Test calendar events":
- do:
xpack.ml.put_calendar:
calendar_id: "events"
- do:
xpack.ml.post_calendar_events:
calendar_id: "events"
body: >
{ "description": "event 1", "start_time": "2017-12-01T00:00:00Z", "end_time": "2017-12-02T00:00:00Z", "calendar_id": "events" }
{ "description": "event 2", "start_time": "2017-12-05T00:00:00Z", "end_time": "2017-12-06T00:00:00Z", "calendar_id": "events" }
{ "description": "event 3", "start_time": "2017-12-12T00:00:00Z", "end_time": "2017-12-13T00:00:00Z", "calendar_id": "events" }
{ "description": "event 4", "start_time": "2017-12-12T00:00:00Z", "end_time": "2017-12-15T00:00:00Z", "calendar_id": "events" }
- do:
xpack.ml.get_calendar_events:
calendar_id: "events"
- length: { special_events: 4 }
- match: { special_events.0.description: "event 1" }
- match: { special_events.1.description: "event 2" }
- match: { special_events.2.description: "event 3" }
- match: { special_events.3.description: "event 4" }
- do:
xpack.ml.get_calendar_events:
calendar_id: "events"
from: 1
size: 2
- length: { special_events: 2 }
- match: { special_events.0.description: "event 2" }
- match: { special_events.1.description: "event 3" }
- do:
xpack.ml.get_calendar_events:
calendar_id: "events"
before: "2017-12-12T00:00:00Z"
- length: { special_events: 2 }
- match: { special_events.0.description: "event 1" }
- match: { special_events.1.description: "event 2" }
- do:
xpack.ml.get_calendar_events:
calendar_id: "events"
after: "2017-12-05T03:00:00Z"
- length: { special_events: 3 }
- match: { special_events.0.description: "event 2" }
- match: { special_events.1.description: "event 3" }
- match: { special_events.2.description: "event 4" }
- do:
xpack.ml.get_calendar_events:
calendar_id: "events"
after: "2017-12-02T00:00:00Z"
before: "2017-12-12T00:00:00Z"
- length: { special_events: 1 }
- match: { special_events.0.description: "event 2" }
---
"Test get all calendar events":
- do:
xpack.ml.put_calendar:
calendar_id: "dave-holidays"
- do:
xpack.ml.post_calendar_events:
calendar_id: "dave-holidays"
body: >
{ "description": "xmas", "start_time": "2017-12-25T00:00:00Z", "end_time": "2017-12-26T00:00:00Z" }
{ "description": "ny", "start_time": "2018-01-01T00:00:00Z", "end_time": "2018-01-02T00:00:00Z" }
- do:
xpack.ml.put_calendar:
calendar_id: "tom-holidays"
- do:
xpack.ml.post_calendar_events:
calendar_id: "tom-holidays"
body: >
{ "description": "xmas", "start_time": "2017-12-20T00:00:00Z", "end_time": "2017-12-26T00:00:00Z" }
{ "description": "other", "start_time": "2017-12-27T00:00:00Z", "end_time": "2018-01-02T00:00:00Z" }
- do:
xpack.ml.get_calendar_events:
calendar_id: "_all"
- length: { special_events: 4 }
---
"Test get calendar events for job":
- do:
xpack.ml.put_job:
job_id: cal-crud-job-with-events
body: >
{
"analysis_config" : {
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
}
}
- do:
xpack.ml.put_calendar:
calendar_id: "dave-holidays"
body: >
{
"job_ids": ["cal-crud-job-with-events"]
}
- do:
xpack.ml.post_calendar_events:
calendar_id: "dave-holidays"
body: >
{ "description": "xmas", "start_time": "2017-12-25T00:00:00Z", "end_time": "2017-12-26T00:00:00Z" }
{ "description": "ny", "start_time": "2018-01-01T00:00:00Z", "end_time": "2018-01-02T00:00:00Z" }
- do:
xpack.ml.put_calendar:
calendar_id: "tom-holidays"
body: >
{
"job_ids": ["cal-crud-job-with-events"]
}
- do:
xpack.ml.post_calendar_events:
calendar_id: "tom-holidays"
body: >
{ "description": "xmas", "start_time": "2017-12-20T00:00:00Z", "end_time": "2017-12-26T00:00:00Z" }
{ "description": "other", "start_time": "2018-01-15T00:00:00Z", "end_time": "2018-01-16T00:00:00Z" }
- do:
xpack.ml.put_calendar:
calendar_id: "not-used-by-job"
- do:
xpack.ml.post_calendar_events:
calendar_id: "not-used-by-job"
body: >
{ "description": "random", "start_time": "2018-01-20T00:00:00Z", "end_time": "2018-01-26T00:00:00Z" }
{ "description": "random2", "start_time": "2018-02-20T00:00:00Z", "end_time": "2018-02-26T00:00:00Z" }
- do:
catch: /action_request_validation_exception/
xpack.ml.get_calendar_events:
calendar_id: "dave-holiday"
job_id: cal-crud-job-with-events
- do:
xpack.ml.get_calendar_events:
calendar_id: _all
job_id: cal-crud-job-with-events
- match: { count: 4 }
- length: { special_events: 4 }
- do:
xpack.ml.get_calendar_events:
calendar_id: _all
after: "2018-01-01T00:00:00Z"
job_id: cal-crud-job-with-events
- match: { count: 2 }
- length: { special_events: 2 }
- match: { special_events.0.description: ny }
- match: { special_events.1.description: other }

View File

@ -131,6 +131,7 @@ teardown:
"index" : {
"index" : "my_test_index",
"doc_type" : "my-type",
"refresh" : "wait_for",
"doc_id": "my-id"
}
}
@ -156,3 +157,8 @@ teardown:
- is_true: watch_record.node
- is_false: watch_record.result.input.payload.foo
- is_true: watch_record.result.input.payload.spam
- do:
search:
index: my_test_index
- match: { hits.total : 1 }