Merge branch 'master' into feature/sql_2

Original commit: elastic/x-pack-elasticsearch@2067b14cf8
This commit is contained in:
Nik Everett 2017-12-18 12:15:04 -05:00
commit 4680e1e166
49 changed files with 546 additions and 300 deletions

View File

@ -0,0 +1,29 @@
[float]
[[ml-analyzing]]
=== Analyzing the Past and Present
The {xpackml} features automate the analysis of time-series data by creating
accurate baselines of normal behavior in the data and identifying anomalous
patterns in that data. You can submit your data for analysis in batches or
continuously in real-time {dfeeds}.
Using proprietary {ml} algorithms, the following circumstances are detected,
scored, and linked with statistically significant influencers in the data:
* Anomalies related to temporal deviations in values, counts, or frequencies
* Statistical rarity
* Unusual behaviors for a member of a population
Automated periodicity detection and quick adaptation to changing data ensure
that you dont need to specify algorithms, models, or other data science-related
configurations in order to get the benefits of {ml}.
You can view the {ml} results in {kib} where, for example, charts illustrate the
actual data values, the bounds for the expected values, and the anomalies that
occur outside these bounds.
[role="screenshot"]
image::images/ml-gs-job-analysis.jpg["Example screenshot from the Machine Learning Single Metric Viewer in Kibana"]
For a more detailed walk-through of {xpackml} features, see
<<ml-getting-started>>.

Binary file not shown.

After

Width:  |  Height:  |  Size: 92 KiB

View File

@ -3,28 +3,18 @@
[partintro]
--
The {xpackml} features automate the analysis of time-series data by creating
accurate baselines of normal behaviors in the data and identifying anomalous
patterns in that data.
Using proprietary {ml} algorithms, the following circumstances are detected,
scored, and linked with statistically significant influencers in the data:
* Anomalies related to temporal deviations in values, counts, or frequencies
* Statistical rarity
* Unusual behaviors for a member of a population
Automated periodicity detection and quick adaptation to changing data ensure
that you dont need to specify algorithms, models, or other data science-related
configurations in order to get the benefits of {ml}.
[float]
[[ml-intro]]
== Integration with the Elastic Stack
Machine learning is tightly integrated with the Elastic Stack. Data is pulled
from {es} for analysis and anomaly results are displayed in {kib} dashboards.
* <<ml-overview>>
* <<ml-getting-started>>
* <<ml-configuring>>
* <<stopping-ml>>
* <<ml-troubleshooting, Troubleshooting Machine Learning>>
* <<ml-api-quickref>>
* <<ml-functions>>
--
include::overview.asciidoc[]

View File

@ -157,3 +157,17 @@ poorer precision worthwhile. If you want to view or change the aggregations
that are used in your job, refer to the `aggregations` property in your {dfeed}.
For more information, see {ref}/ml-datafeed-resource.html[Datafeed Resources].
[float]
=== Security Integration
When {security} is enabled, a {dfeed} stores the roles of the user who created
or updated the {dfeed} **at that time**. This means that if those roles are
updated then the {dfeed} will subsequently run with the new permissions associated
with the roles. However, if the user's roles are adjusted after creating or
updating the {dfeed} then the {dfeed} will continue to run with the permissions
associated with the original roles.
A way to update the roles stored within the {dfeed} without changing any other
settings is to submit an empty JSON document (`{}`) to the
{ref}/ml-update-datafeed.html[update {dfeed} API].

View File

@ -1,6 +1,14 @@
[[ml-concepts]]
[[ml-overview]]
== Overview
include::analyzing.asciidoc[]
[[ml-concepts]]
=== Basic Machine Learning Terms
++++
<titleabbrev>Basic Terms</titleabbrev>
++++
There are a few concepts that are core to {ml} in {xpack}. Understanding these
concepts from the outset will tremendously help ease the learning process.

View File

@ -1,5 +1,8 @@
[[ml-troubleshooting]]
== {xpackml} Troubleshooting
++++
<titleabbrev>{xpackml}</titleabbrev>
++++
Use the information in this section to troubleshoot common problems and find
answers for frequently asked questions.
@ -52,7 +55,7 @@ object mapping [field_name] with an object mapping [field_name]`.
This issue typically occurs when two or more jobs store their results in the
same index and the results contain fields with the same name but different
data types or different `fields` settings.
data types or different `fields` settings.
By default, {ml} results are stored in the `.ml-anomalies-shared` index in {es}.
To resolve this issue, click *Advanced > Use dedicated index* when you create

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-close-job]]
=== Close Jobs
=== Close Jobs API
++++
<titleabbrev>Close Jobs</titleabbrev>
++++
The close job API enables you to close one or more jobs.
A job can be opened and closed multiple times throughout its lifecycle.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-delete-datafeed]]
=== Delete {dfeeds-cap}
=== Delete {dfeeds-cap} API
++++
<titleabbrev>Delete {dfeeds-cap}</titleabbrev>
++++
The delete {dfeed} API enables you to delete an existing {dfeed}.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-delete-job]]
=== Delete Jobs
=== Delete Jobs API
++++
<titleabbrev>Delete Jobs</titleabbrev>
++++
The delete job API enables you to delete an existing anomaly detection job.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-delete-snapshot]]
=== Delete Model Snapshots
=== Delete Model Snapshots API
++++
<titleabbrev>Delete Model Snapshots</titleabbrev>
++++
The delete model snapshot API enables you to delete an existing model snapshot.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-flush-job]]
=== Flush Jobs
=== Flush Jobs API
++++
<titleabbrev>Flush Jobs</titleabbrev>
++++
The flush job API forces any buffered data to be processed by the job.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-forecast]]
=== Forecast Jobs
=== Forecast Jobs API
++++
<titleabbrev>Forecast Jobs</titleabbrev>
++++
The forecast jobs API uses historical behavior to predict the future behavior of
a time series.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-get-bucket]]
=== Get Buckets
=== Get Buckets API
++++
<titleabbrev>Get Buckets</titleabbrev>
++++
The get bucket API enables you to retrieve job results for one or more buckets.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-get-category]]
=== Get Categories
=== Get Categories API
++++
<titleabbrev>Get Categories</titleabbrev>
++++
The get categories API enables you to retrieve job results for one or more
categories.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-get-datafeed-stats]]
=== Get {dfeed-cap} Statistics
=== Get {dfeed-cap} Statistics API
++++
<titleabbrev>Get {dfeed-cap} Statistics</titleabbrev>
++++
The get {dfeed} statistics API enables you to retrieve usage information for
{dfeeds}.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-get-datafeed]]
=== Get {dfeeds-cap}
=== Get {dfeeds-cap} API
++++
<titleabbrev>Get {dfeeds-cap}</titleabbrev>
++++
The get {dfeeds} API enables you to retrieve configuration information for
{dfeeds}.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-get-influencer]]
=== Get Influencers
=== Get Influencers API
++++
<titleabbrev>Get Influencers</titleabbrev>
++++
The get influencers API enables you to retrieve job results for one or more
influencers.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-get-job-stats]]
=== Get Job Statistics
=== Get Job Statistics API
++++
<titleabbrev>Get Job Statistics</titleabbrev>
++++
The get jobs API enables you to retrieve usage information for jobs.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-get-job]]
=== Get Jobs
=== Get Jobs API
++++
<titleabbrev>Get Jobs</titleabbrev>
++++
The get jobs API enables you to retrieve configuration information for jobs.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-get-overall-buckets]]
=== Get Overall Buckets
=== Get Overall Buckets API
++++
<titleabbrev>Get Overall Buckets</titleabbrev>
++++
This API enables you to retrieve overall bucket results
that summarize the bucket results of multiple jobs.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-get-record]]
=== Get Records
=== Get Records API
++++
<titleabbrev>Get Records</titleabbrev>
++++
The get records API enables you to retrieve anomaly records for a job.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-get-snapshot]]
=== Get Model Snapshots
=== Get Model Snapshots API
++++
<titleabbrev>Get Model Snapshots</titleabbrev>
++++
The get model snapshots API enables you to retrieve information about model snapshots.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-open-job]]
=== Open Jobs
=== Open Jobs API
++++
<titleabbrev>Open Jobs</titleabbrev>
++++
A job must be opened in order for it to be ready to receive and analyze data.
A job can be opened and closed multiple times throughout its lifecycle.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-post-data]]
=== Post Data to Jobs
=== Post Data to Jobs API
++++
<titleabbrev>Post Data to Jobs</titleabbrev>
++++
The post data API enables you to send data to an anomaly detection job for analysis.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-preview-datafeed]]
=== Preview {dfeeds-cap}
=== Preview {dfeeds-cap} API
++++
<titleabbrev>Preview {dfeeds-cap}</titleabbrev>
++++
The preview {dfeed} API enables you to preview a {dfeed}.
@ -31,6 +34,17 @@ privileges to use this API. For more information, see
//<<privileges-list-cluster>>.
==== Security Integration
When {security} is enabled, the {dfeed} query will be previewed using the
credentials of the user calling the preview {dfeed} API. When the {dfeed}
is started it will run the query using the roles of the last user to
create or update it. If the two sets of roles differ then the preview may
not accurately reflect what the {dfeed} will return when started. To avoid
such problems, the same user that creates/updates the {dfeed} should preview
it to ensure it is returning the expected data.
==== Examples
The following example obtains a preview of the `datafeed-farequote` {dfeed}:

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-put-datafeed]]
=== Create {dfeeds-cap}
=== Create {dfeeds-cap} API
++++
<titleabbrev>Create {dfeeds-cap}</titleabbrev>
++++
The create {dfeed} API enables you to instantiate a {dfeed}.
@ -84,6 +87,13 @@ For more information, see
{xpack-ref}/security-privileges.html[Security Privileges].
//<<privileges-list-cluster>>.
==== Security Integration
When {security} is enabled, your {dfeed} will remember which roles the user who
created it had at the time of creation, and run the query using those same roles.
==== Examples
The following example creates the `datafeed-total-requests` {dfeed}:

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-put-job]]
=== Create Jobs
=== Create Jobs API
++++
<titleabbrev>Create Jobs</titleabbrev>
++++
The create job API enables you to instantiate a job.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-revert-snapshot]]
=== Revert Model Snapshots
=== Revert Model Snapshots API
++++
<titleabbrev>Revert Model Snapshots</titleabbrev>
++++
The revert model snapshot API enables you to revert to a specific snapshot.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-start-datafeed]]
=== Start {dfeeds-cap}
=== Start {dfeeds-cap} API
++++
<titleabbrev>Start {dfeeds-cap}</titleabbrev>
++++
A {dfeed} must be started in order to retrieve data from {es}.
A {dfeed} can be started and stopped multiple times throughout its lifecycle.
@ -78,6 +81,13 @@ For more information, see
//<<privileges-list-cluster>>.
==== Security Integration
When {security} is enabled, your {dfeed} will remember which roles the last
user to create or update it had at the time of creation/update, and run the query
using those same roles.
==== Examples
The following example starts the `datafeed-it-ops-kpi` {dfeed}:

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-stop-datafeed]]
=== Stop {dfeeds-cap}
=== Stop {dfeeds-cap} API
++++
<titleabbrev>Stop {dfeeds-cap}</titleabbrev>
++++
The stop {dfeeds} API enables you to stop one or more {dfeeds}.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-update-datafeed]]
=== Update {dfeeds-cap}
=== Update {dfeeds-cap} API
++++
<titleabbrev>Update {dfeeds-cap}</titleabbrev>
++++
The update {dfeed} API enables you to update certain properties of a {dfeed}.
@ -79,6 +82,13 @@ For more information, see
{xpack-ref}/security-privileges.html[Security Privileges].
//<<privileges-list-cluster>>.
==== Security Integration
When {security} is enabled, your {dfeed} will remember which roles the user who
updated it had at the time of update, and run the query using those same roles.
==== Examples
The following example updates the query for the `datafeed-it-ops-kpi` {dfeed}

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-update-job]]
=== Update Jobs
=== Update Jobs API
++++
<titleabbrev>Update Jobs</titleabbrev>
++++
The update job API enables you to update certain properties of a job.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-update-snapshot]]
=== Update Model Snapshots
=== Update Model Snapshots API
++++
<titleabbrev>Update Model Snapshots</titleabbrev>
++++
The update model snapshot API enables you to update certain properties of a snapshot.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-valid-detector]]
=== Validate Detectors
=== Validate Detectors API
++++
<titleabbrev>Validate Detectors </titleabbrev>
++++
The validate detectors API validates detector configuration information.

View File

@ -1,6 +1,9 @@
[role="xpack"]
[[ml-valid-job]]
=== Validate Jobs
=== Validate Jobs API
++++
<titleabbrev>Validate Jobs</titleabbrev>
++++
The validate jobs API validates job configuration information.

View File

@ -1,5 +1,8 @@
[[security-troubleshooting]]
== {security} Troubleshooting
++++
<titleabbrev>{security}</titleabbrev>
++++
Use the information in this section to troubleshoot common problems and find
answers for frequently asked questions.

View File

@ -56,3 +56,9 @@ this node. If you try to create a job with a `model_memory_limit` property value
that is greater than this setting value, an error occurs. Existing jobs are not
affected when you update this setting. For more information about the
`model_memory_limit` property, see <<ml-apilimits>>.
`xpack.ml.node_concurrent_job_allocations`::
The maximum number of jobs that can concurrently be in the `opening` state on
each node. Typically, jobs spend a small amount of time in this state before
they move to `open` state. Jobs that must restore large models when they are
opening spend more time in the `opening` state. Defaults to `2`.

View File

@ -71,5 +71,5 @@ When a `_doc` field exists, if the field holds an object, it is extracted and in
as a single document. If the field holds an array of objects, each object is treated as
a document and the index action indexes all of them in a bulk.
An `_id` value can be added per document to dynamically set the ID of the indexed
document.
An `_index`, `_type` or `_id` value can be added per document to dynamically set the ID
of the indexed document.

View File

@ -1,5 +1,8 @@
[[watcher-troubleshooting]]
== {watcher} Troubleshooting
== {xpack} {watcher} Troubleshooting
++++
<titleabbrev>{xpack} {watcher}</titleabbrev>
++++
[float]
=== Dynamic Mapping Error When Trying to Add a Watch

View File

@ -49,7 +49,7 @@ public class XPackLicenseState {
"Machine learning APIs are disabled"
});
messages.put(XPackPlugin.LOGSTASH, new String[] {
"Logstash specific APIs are disabled. You can continue to manage and poll stored configurations"
"Logstash will continue to poll centrally-managed pipelines"
});
messages.put(XPackPlugin.DEPRECATION, new String[] {
"Deprecation APIs are disabled"
@ -205,8 +205,7 @@ public class XPackLicenseState {
case STANDARD:
case GOLD:
case PLATINUM:
return new String[] { "Logstash specific APIs will be disabled, but you can continue to manage " +
"and poll stored configurations" };
return new String[] { "Logstash will no longer poll for centrally-managed pipelines" };
}
break;
}

View File

@ -38,6 +38,8 @@ import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState;
public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
private static final String INDEX_FIELD = "_index";
private static final String TYPE_FIELD = "_type";
private static final String ID_FIELD = "_id";
private final Client client;
@ -71,24 +73,13 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
}
}
String docId = action.docId;
// prevent double-setting id
if (data.containsKey(ID_FIELD)) {
if (docId != null) {
throw illegalState("could not execute action [{}] of watch [{}]. " +
"[ctx.payload.{}] or [ctx.payload._doc.{}] were set with [doc_id]. Only set [{}] or [doc_id]",
actionId, ctx.watch().id(), ID_FIELD, ID_FIELD, ID_FIELD);
}
if (data.containsKey(INDEX_FIELD) || data.containsKey(TYPE_FIELD) || data.containsKey(ID_FIELD)) {
data = mutableMap(data);
docId = data.remove(ID_FIELD).toString();
}
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(action.index);
indexRequest.type(action.docType);
indexRequest.id(docId);
indexRequest.index(getField(actionId, ctx.id().watchId(), "index", data, INDEX_FIELD, action.index));
indexRequest.type(getField(actionId, ctx.id().watchId(), "type",data, TYPE_FIELD, action.docType));
indexRequest.id(getField(actionId, ctx.id().watchId(), "id",data, ID_FIELD, action.docId));
data = addTimestampToDocument(data, ctx.executionTime());
BytesReference bytesReference;
@ -97,8 +88,8 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
}
if (ctx.simulateAction(actionId)) {
return new IndexAction.Simulated(indexRequest.index(), action.docType, docId, new XContentSource(indexRequest.source(),
XContentType.JSON));
return new IndexAction.Simulated(indexRequest.index(), indexRequest.type(), indexRequest.id(),
new XContentSource(indexRequest.source(), XContentType.JSON));
}
IndexResponse response = WatcherClientHelper.execute(ctx.watch(), client,
@ -121,14 +112,17 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
throw illegalState("could not execute action [{}] of watch [{}]. failed to index payload data. " +
"[_data] field must either hold a Map or an List/Array of Maps", actionId, ctx.watch().id());
}
Map<String, Object> doc = (Map<String, Object>) item;
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(action.index);
indexRequest.type(action.docType);
if (doc.containsKey(ID_FIELD)) {
if (doc.containsKey(INDEX_FIELD) || doc.containsKey(TYPE_FIELD) || doc.containsKey(ID_FIELD)) {
doc = mutableMap(doc);
indexRequest.id(doc.remove(ID_FIELD).toString());
}
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(getField(actionId, ctx.id().watchId(), "index", doc, INDEX_FIELD, action.index));
indexRequest.type(getField(actionId, ctx.id().watchId(), "type",doc, TYPE_FIELD, action.docType));
indexRequest.id(getField(actionId, ctx.id().watchId(), "id",doc, ID_FIELD, action.docId));
doc = addTimestampToDocument(doc, ctx.executionTime());
try (XContentBuilder builder = jsonBuilder()) {
indexRequest.source(builder.prettyPrint().map(doc));
@ -163,6 +157,24 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
return data;
}
/**
* Extracts the specified field out of data map, or alternative falls back to the action value
*/
private String getField(String actionId, String watchId, String name, Map<String, Object> data, String fieldName, String defaultValue) {
Object obj = data.remove(fieldName);
if (obj != null) {
if (defaultValue != null) {
throw illegalState("could not execute action [{}] of watch [{}]. " +
"[ctx.payload.{}] or [ctx.payload._doc.{}] were set together with action [{}] field. Only set one of them",
actionId, watchId, fieldName, fieldName, name);
} else {
return obj.toString();
}
}
return defaultValue;
}
/**
* Guarantees that the {@code data} is mutable for any code that needs to modify the {@linkplain Map} before using it (e.g., from
* singleton, immutable {@code Map}s).

View File

@ -25,14 +25,14 @@ public class IndexAction implements Action {
public static final String TYPE = "index";
final String index;
final String docType;
@Nullable final String docType;
@Nullable final String index;
@Nullable final String docId;
@Nullable final String executionTimeField;
@Nullable final TimeValue timeout;
@Nullable final DateTimeZone dynamicNameTimeZone;
public IndexAction(String index, String docType, @Nullable String docId,
public IndexAction(@Nullable String index, @Nullable String docType, @Nullable String docId,
@Nullable String executionTimeField,
@Nullable TimeValue timeout, @Nullable DateTimeZone dynamicNameTimeZone) {
this.index = index;
@ -89,8 +89,12 @@ public class IndexAction implements Action {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Field.INDEX.getPreferredName(), index);
builder.field(Field.DOC_TYPE.getPreferredName(), docType);
if (index != null) {
builder.field(Field.INDEX.getPreferredName(), index);
}
if (docType != null) {
builder.field(Field.DOC_TYPE.getPreferredName(), docType);
}
if (docId != null) {
builder.field(Field.DOC_ID.getPreferredName(), docId);
}
@ -144,12 +148,7 @@ public class IndexAction implements Action {
// Parser for human specified timeouts and 2.x compatibility
timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT_HUMAN.toString());
} else if (Field.DYNAMIC_NAME_TIMEZONE.match(currentFieldName)) {
if (token == XContentParser.Token.VALUE_STRING) {
dynamicNameTimeZone = DateTimeZone.forID(parser.text());
} else {
throw new ElasticsearchParseException("could not parse [{}] action for watch [{}]. failed to parse [{}]. must be " +
"a string value (e.g. 'UTC' or '+01:00').", TYPE, watchId, currentFieldName);
}
dynamicNameTimeZone = DateTimeZone.forID(parser.text());
} else {
throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. unexpected string field [{}]", TYPE,
watchId, actionId, currentFieldName);
@ -160,16 +159,6 @@ public class IndexAction implements Action {
}
}
if (index == null) {
throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. missing required [{}] field", TYPE, watchId,
actionId, Field.INDEX.getPreferredName());
}
if (docType == null) {
throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. missing required [{}] field", TYPE, watchId,
actionId, Field.DOC_TYPE.getPreferredName());
}
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone);
}

View File

@ -61,6 +61,13 @@
}
]
}
},
{
"range": {
"timestamp": {
"gte": "now-2m"
}
}
}
]
}

View File

@ -54,6 +54,13 @@
}
]
}
},
{
"range": {
"timestamp": {
"gte": "now-2m"
}
}
}
]
}

View File

@ -51,6 +51,13 @@
"term": {
"type": "cluster_stats"
}
},
{
"range": {
"timestamp": {
"gte": "now-2m"
}
}
}
]
}

View File

@ -4,6 +4,8 @@
"settings": {
"xpack.watcher.template.version": "${xpack.watcher.template.version}",
"index.number_of_shards": 1,
"index.number_of_replicas": 0,
"index.auto_expand_replicas": "0-1",
"index.format": 6
},
"mappings": {

View File

@ -3,6 +3,8 @@
"order": 2147483647,
"settings": {
"index.number_of_shards": 1,
"index.number_of_replicas": 0,
"index.auto_expand_replicas": "0-1",
"index.format": 6,
"index.priority": 800
},

View File

@ -5,27 +5,35 @@
*/
package org.elasticsearch.xpack.watcher.actions.index;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.watcher.actions.Action.Result.Status;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.joda.time.DateTime;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import java.util.ArrayList;
import java.util.Arrays;
@ -37,161 +45,36 @@ import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableSet;
import static org.elasticsearch.common.util.set.Sets.newHashSet;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.joda.time.DateTimeZone.UTC;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class IndexActionTests extends ESIntegTestCase {
public class IndexActionTests extends ESTestCase {
public void testIndexActionExecuteSingleDoc() throws Exception {
boolean customId = randomBoolean();
boolean docIdAsParam = customId && randomBoolean();
String docId = randomAlphaOfLength(5);
String timestampField = randomFrom("@timestamp", null);
boolean customTimestampField = timestampField != null;
private final Client client = mock(Client.class);
IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(), TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30));
DateTime executionTime = DateTime.now(UTC);
Payload payload;
if (customId && docIdAsParam == false) {
// intentionally immutable because the other side needs to cut out _id
payload = new Payload.Simple("_doc", MapBuilder.newMapBuilder().put("foo", "bar").put("_id", docId).immutableMap());
} else {
payload = randomBoolean() ? new Payload.Simple("foo", "bar") : new Payload.Simple("_doc", singletonMap("foo", "bar"));
}
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, payload);
Action.Result result = executable.execute("_id", ctx, ctx.payload());
assertThat(result.status(), equalTo(Status.SUCCESS));
assertThat(result, instanceOf(IndexAction.Result.class));
IndexAction.Result successResult = (IndexAction.Result) result;
XContentSource response = successResult.response();
assertThat(response.getValue("created"), equalTo((Object)Boolean.TRUE));
assertThat(response.getValue("version"), equalTo((Object) 1));
assertThat(response.getValue("type").toString(), equalTo("test-type"));
assertThat(response.getValue("index").toString(), equalTo("test-index"));
refresh(); //Manually refresh to make sure data is available
SearchRequestBuilder searchRequestbuilder = client().prepareSearch("test-index")
.setTypes("test-type")
.setSource(searchSource().query(matchAllQuery()));
if (customTimestampField) {
searchRequestbuilder.addAggregation(terms("timestamps").field(timestampField));
}
SearchResponse searchResponse = searchRequestbuilder.get();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
SearchHit hit = searchResponse.getHits().getAt(0);
if (customId) {
assertThat(hit.getId(), is(docId));
}
if (customTimestampField) {
assertThat(hit.getSourceAsMap().size(), is(2));
assertThat(hit.getSourceAsMap(), hasEntry("foo", (Object) "bar"));
assertThat(hit.getSourceAsMap(), hasEntry(timestampField, (Object) WatcherDateTimeUtils.formatDate(executionTime)));
Terms terms = searchResponse.getAggregations().get("timestamps");
assertThat(terms, notNullValue());
assertThat(terms.getBuckets(), hasSize(1));
assertThat(terms.getBuckets().get(0).getKeyAsNumber().longValue(), is(executionTime.getMillis()));
assertThat(terms.getBuckets().get(0).getDocCount(), is(1L));
} else {
assertThat(hit.getSourceAsMap().size(), is(1));
assertThat(hit.getSourceAsMap(), hasEntry("foo", (Object) "bar"));
}
}
public void testIndexActionExecuteMultiDoc() throws Exception {
String timestampField = randomFrom("@timestamp", null);
boolean customTimestampField = "@timestamp".equals(timestampField);
assertAcked(prepareCreate("test-index")
.addMapping("test-type", "foo", "type=keyword"));
List<Map> idList = Arrays.asList(
MapBuilder.newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap(),
MapBuilder.newMapBuilder().put("foo", "bar1").put("_id", "1").map()
);
Object list = randomFrom(
new Map[] { singletonMap("foo", "bar"), singletonMap("foo", "bar1") },
Arrays.asList(singletonMap("foo", "bar"), singletonMap("foo", "bar1")),
unmodifiableSet(newHashSet(singletonMap("foo", "bar"), singletonMap("foo", "bar1"))),
idList
);
boolean customId = list == idList;
IndexAction action = new IndexAction("test-index", "test-type", null, timestampField, null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(), TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30));
DateTime executionTime = DateTime.now(UTC);
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("watch_id", executionTime, new Payload.Simple("_doc", list));
Action.Result result = executable.execute("watch_id", ctx, ctx.payload());
assertThat(result.status(), equalTo(Status.SUCCESS));
assertThat(result, instanceOf(IndexAction.Result.class));
IndexAction.Result successResult = (IndexAction.Result) result;
XContentSource response = successResult.response();
assertThat(successResult.toString(), response.getValue("0.created"), equalTo((Object)Boolean.TRUE));
assertThat(successResult.toString(), response.getValue("0.version"), equalTo((Object) 1));
assertThat(successResult.toString(), response.getValue("0.type").toString(), equalTo("test-type"));
assertThat(successResult.toString(), response.getValue("0.index").toString(), equalTo("test-index"));
assertThat(successResult.toString(), response.getValue("1.created"), equalTo((Object)Boolean.TRUE));
assertThat(successResult.toString(), response.getValue("1.version"), equalTo((Object) 1));
assertThat(successResult.toString(), response.getValue("1.type").toString(), equalTo("test-type"));
assertThat(successResult.toString(), response.getValue("1.index").toString(), equalTo("test-index"));
refresh(); //Manually refresh to make sure data is available
SearchResponse searchResponse = client().prepareSearch("test-index")
.setTypes("test-type")
.setSource(searchSource().sort("foo", SortOrder.ASC)
.query(matchAllQuery()))
.get();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(2L));
final int fields = customTimestampField ? 2 : 1;
for (int i = 0; i < 2; ++i) {
final SearchHit hit = searchResponse.getHits().getAt(i);
final String value = "bar" + (i != 0 ? i : "");
assertThat(hit.getSourceAsMap().size(), is(fields));
if (customId) {
assertThat(hit.getId(), is(Integer.toString(i)));
}
if (customTimestampField) {
assertThat(hit.getSourceAsMap(), hasEntry(timestampField, (Object) WatcherDateTimeUtils.formatDate(executionTime)));
}
assertThat(hit.getSourceAsMap(), hasEntry("foo", (Object) value));
}
@Before
public void setupClient() {
ThreadPool threadPool = mock(ThreadPool.class);
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
when(threadPool.getThreadContext()).thenReturn(threadContext);
when(client.threadPool()).thenReturn(threadPool);
}
public void testParser() throws Exception {
String timestampField = randomBoolean() ? "@timestamp" : null;
XContentBuilder builder = jsonBuilder();
builder.startObject();
builder.field(IndexAction.Field.INDEX.getPreferredName(), "test-index");
boolean includeIndex = randomBoolean();
if (includeIndex) {
builder.field(IndexAction.Field.INDEX.getPreferredName(), "test-index");
}
builder.field(IndexAction.Field.DOC_TYPE.getPreferredName(), "test-type");
if (timestampField != null) {
builder.field(IndexAction.Field.EXECUTION_TIME_FIELD.getPreferredName(), timestampField);
@ -201,14 +84,16 @@ public class IndexActionTests extends ESIntegTestCase {
builder.field(IndexAction.Field.TIMEOUT.getPreferredName(), writeTimeout.millis());
}
builder.endObject();
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client());
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client);
XContentParser parser = createParser(builder);
parser.nextToken();
ExecutableIndexAction executable = actionParser.parseExecutable(randomAlphaOfLength(5), randomAlphaOfLength(3), parser);
assertThat(executable.action().docType, equalTo("test-type"));
assertThat(executable.action().index, equalTo("test-index"));
if (includeIndex) {
assertThat(executable.action().index, equalTo("test-index"));
}
if (timestampField != null) {
assertThat(executable.action().executionTimeField, equalTo(timestampField));
}
@ -216,63 +101,40 @@ public class IndexActionTests extends ESIntegTestCase {
}
public void testParserFailure() throws Exception {
XContentBuilder builder = jsonBuilder();
boolean useIndex = randomBoolean();
boolean useType = randomBoolean();
builder.startObject();
{
if (useIndex) {
builder.field(IndexAction.Field.INDEX.getPreferredName(), "test-index");
}
if (useType) {
builder.field(IndexAction.Field.DOC_TYPE.getPreferredName(), "test-type");
}
}
builder.endObject();
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client());
XContentParser parser = createParser(builder);
parser.nextToken();
try {
actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser);
if (!(useIndex && useType)) {
fail();
}
} catch (ElasticsearchParseException iae) {
assertThat(useIndex && useType, equalTo(false));
}
// wrong type for field
expectParseFailure(jsonBuilder()
.startObject()
.field(IndexAction.Field.DOC_TYPE.getPreferredName(), 1234)
.endObject());
expectParseFailure(jsonBuilder()
.startObject()
.field(IndexAction.Field.TIMEOUT.getPreferredName(), "1234")
.endObject());
// unknown field
expectParseFailure(jsonBuilder()
.startObject()
.field("unknown", "whatever")
.endObject());
expectParseFailure(jsonBuilder()
.startObject()
.field("unknown", 1234)
.endObject());
}
// https://github.com/elastic/x-pack/issues/4416
public void testIndexingWithWrongMappingReturnsFailureResult() throws Exception {
// index a document to set the mapping of the foo field to a boolean
client().prepareIndex("test-index", "test-type", "_id").setSource("foo", true)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(),
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
List<Map<String, Object>> docs = new ArrayList<>();
boolean addSuccessfulIndexedDoc = randomBoolean();
if (addSuccessfulIndexedDoc) {
docs.add(Collections.singletonMap("foo", randomBoolean()));
}
docs.add(Collections.singletonMap("foo", Collections.singletonMap("foo", "bar")));
Payload payload = new Payload.Simple(Collections.singletonMap("_doc", docs));
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", DateTime.now(UTC), payload);
Action.Result result = executable.execute("_id", ctx, payload);
if (addSuccessfulIndexedDoc) {
assertThat(result.status(), is(Status.PARTIAL_FAILURE));
} else {
assertThat(result.status(), is(Status.FAILURE));
}
private void expectParseFailure(XContentBuilder builder) throws Exception {
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client);
XContentParser parser = createParser(builder);
parser.nextToken();
expectThrows(ElasticsearchParseException.class, () ->
actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser));
}
public void testUsingParameterIdWithBulkOrIdFieldThrowsIllegalState() {
final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(),
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
final Map<String, Object> docWithId = MapBuilder.<String, Object>newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap();
final DateTime executionTime = DateTime.now(UTC);
@ -301,4 +163,184 @@ public class IndexActionTests extends ESIntegTestCase {
executable.execute("_id", ctx, ctx.payload());
});
}
public void testThatIndexTypeIdDynamically() throws Exception {
boolean configureIndexDynamically = randomBoolean();
boolean configureTypeDynamically = randomBoolean();
boolean configureIdDynamically = (configureTypeDynamically == false && configureIndexDynamically == false) || randomBoolean();
MapBuilder<String, Object> builder = MapBuilder.<String, Object>newMapBuilder().put("foo", "bar");
if (configureIdDynamically) {
builder.put("_id", "my_dynamic_id");
}
if (configureTypeDynamically) {
builder.put("_type", "my_dynamic_type");
}
if (configureIndexDynamically) {
builder.put("_index", "my_dynamic_index");
}
final IndexAction action = new IndexAction(configureIndexDynamically ? null : "my_index",
configureTypeDynamically ? null : "my_type",
configureIdDynamically ? null : "my_id",
null, null, null);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
final WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", new Payload.Simple(builder.immutableMap()));
ArgumentCaptor<IndexRequest> captor = ArgumentCaptor.forClass(IndexRequest.class);
PlainActionFuture<IndexResponse> listener = PlainActionFuture.newFuture();
listener.onResponse(new IndexResponse(new ShardId(new Index("foo", "bar"), 0), "whatever", "whatever", 1, 1, 1, true));
when(client.index(captor.capture())).thenReturn(listener);
Action.Result result = executable.execute("_id", ctx, ctx.payload());
assertThat(result.status(), is(Status.SUCCESS));
assertThat(captor.getAllValues(), hasSize(1));
assertThat(captor.getValue().index(), is(configureIndexDynamically ? "my_dynamic_index" : "my_index"));
assertThat(captor.getValue().type(), is(configureTypeDynamically ? "my_dynamic_type" : "my_type"));
assertThat(captor.getValue().id(), is(configureIdDynamically ? "my_dynamic_id" : "my_id"));
}
public void testThatIndexActionCanBeConfiguredWithDynamicIndexNameAndBulk() throws Exception {
final IndexAction action = new IndexAction(null, "my-type", null, null, null, null);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
final Map<String, Object> docWithIndex = MapBuilder.<String, Object>newMapBuilder().put("foo", "bar")
.put("_index", "my-index").immutableMap();
final Map<String, Object> docWithOtherIndex = MapBuilder.<String, Object>newMapBuilder().put("foo", "bar")
.put("_index", "my-other-index").immutableMap();
final WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id",
new Payload.Simple("_doc", Arrays.asList(docWithIndex, docWithOtherIndex)));
ArgumentCaptor<BulkRequest> captor = ArgumentCaptor.forClass(BulkRequest.class);
PlainActionFuture<BulkResponse> listener = PlainActionFuture.newFuture();
IndexResponse indexResponse = new IndexResponse(new ShardId(new Index("foo", "bar"), 0), "whatever", "whatever", 1, 1, 1, true);
BulkItemResponse response = new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, indexResponse);
BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]{response}, 1);
listener.onResponse(bulkResponse);
when(client.bulk(captor.capture())).thenReturn(listener);
Action.Result result = executable.execute("_id", ctx, ctx.payload());
assertThat(result.status(), is(Status.SUCCESS));
assertThat(captor.getAllValues(), hasSize(1));
assertThat(captor.getValue().requests(), hasSize(2));
assertThat(captor.getValue().requests().get(0).type(), is("my-type"));
assertThat(captor.getValue().requests().get(0).index(), is("my-index"));
assertThat(captor.getValue().requests().get(1).type(), is("my-type"));
assertThat(captor.getValue().requests().get(1).index(), is("my-other-index"));
}
public void testConfigureIndexInMapAndAction() {
String fieldName = randomFrom("_index", "_type");
final IndexAction action = new IndexAction(fieldName.equals("_index") ? "my_index" : null,
fieldName.equals("_type") ? "my_type" : null,
null,null, null, null);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
final Map<String, Object> docWithIndex = MapBuilder.<String, Object>newMapBuilder().put("foo", "bar")
.put(fieldName, "my-value").immutableMap();
final WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id",
new Payload.Simple("_doc", Collections.singletonList(docWithIndex)));
IllegalStateException e = expectThrows(IllegalStateException.class, () -> executable.execute("_id", ctx, ctx.payload()));
assertThat(e.getMessage(), startsWith("could not execute action [_id] of watch [_id]. [ctx.payload." +
fieldName + "] or [ctx.payload._doc." + fieldName + "]"));
}
public void testIndexActionExecuteSingleDoc() throws Exception {
boolean customId = randomBoolean();
boolean docIdAsParam = customId && randomBoolean();
String docId = randomAlphaOfLength(5);
String timestampField = randomFrom("@timestamp", null);
IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30));
DateTime executionTime = DateTime.now(UTC);
Payload payload;
if (customId && docIdAsParam == false) {
// intentionally immutable because the other side needs to cut out _id
payload = new Payload.Simple("_doc", MapBuilder.newMapBuilder().put("foo", "bar").put("_id", docId).immutableMap());
} else {
payload = randomBoolean() ? new Payload.Simple("foo", "bar") : new Payload.Simple("_doc", singletonMap("foo", "bar"));
}
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, payload);
ArgumentCaptor<IndexRequest> captor = ArgumentCaptor.forClass(IndexRequest.class);
PlainActionFuture<IndexResponse> listener = PlainActionFuture.newFuture();
listener.onResponse(new IndexResponse(new ShardId(new Index("test-index", "uuid"), 0), "test-type", docId, 1, 1, 1, true));
when(client.index(captor.capture())).thenReturn(listener);
Action.Result result = executable.execute("_id", ctx, ctx.payload());
assertThat(result.status(), equalTo(Status.SUCCESS));
assertThat(result, instanceOf(IndexAction.Result.class));
IndexAction.Result successResult = (IndexAction.Result) result;
XContentSource response = successResult.response();
assertThat(response.getValue("created"), equalTo((Object)Boolean.TRUE));
assertThat(response.getValue("version"), equalTo((Object) 1));
assertThat(response.getValue("type").toString(), equalTo("test-type"));
assertThat(response.getValue("index").toString(), equalTo("test-index"));
assertThat(captor.getAllValues(), hasSize(1));
IndexRequest indexRequest = captor.getValue();
assertThat(indexRequest.sourceAsMap(), is(hasEntry("foo", "bar")));
if (customId) {
assertThat(indexRequest.id(), is(docId));
}
if (timestampField != null) {
assertThat(indexRequest.sourceAsMap().keySet(), is(hasSize(2)));
assertThat(indexRequest.sourceAsMap(), hasEntry(timestampField, executionTime.toString()));
} else {
assertThat(indexRequest.sourceAsMap().keySet(), is(hasSize(1)));
}
}
public void testFailureResult() throws Exception {
IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
// should the result resemble a failure or a partial failure
boolean isPartialFailure = randomBoolean();
List<Map<String, Object>> docs = new ArrayList<>();
docs.add(Collections.singletonMap("foo", Collections.singletonMap("foo", "bar")));
docs.add(Collections.singletonMap("foo", Collections.singletonMap("foo", "bar")));
Payload payload = new Payload.Simple(Collections.singletonMap("_doc", docs));
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", DateTime.now(UTC), payload);
ArgumentCaptor<BulkRequest> captor = ArgumentCaptor.forClass(BulkRequest.class);
PlainActionFuture<BulkResponse> listener = PlainActionFuture.newFuture();
BulkItemResponse.Failure failure = new BulkItemResponse.Failure("test-index", "test-type", "anything",
new ElasticsearchException("anything"));
BulkItemResponse firstResponse = new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, failure);
BulkItemResponse secondResponse;
if (isPartialFailure) {
ShardId shardId = new ShardId(new Index("foo", "bar"), 0);
IndexResponse indexResponse = new IndexResponse(shardId, "whatever", "whatever", 1, 1, 1, true);
secondResponse = new BulkItemResponse(1, DocWriteRequest.OpType.INDEX, indexResponse);
} else {
secondResponse = new BulkItemResponse(1, DocWriteRequest.OpType.INDEX, failure);
}
BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]{firstResponse, secondResponse}, 1);
listener.onResponse(bulkResponse);
when(client.bulk(captor.capture())).thenReturn(listener);
Action.Result result = executable.execute("_id", ctx, payload);
if (isPartialFailure) {
assertThat(result.status(), is(Status.PARTIAL_FAILURE));
} else {
assertThat(result.status(), is(Status.FAILURE));
}
}
}

View File

@ -15,6 +15,8 @@ import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.XPackSingleNodeTestCase;
import org.elasticsearch.xpack.watcher.WatcherService;
import org.elasticsearch.xpack.watcher.WatcherState;
import org.elasticsearch.xpack.watcher.actions.hipchat.HipChatAction;
import org.elasticsearch.xpack.watcher.client.WatcherClient;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
@ -127,6 +129,8 @@ public class HipChatServiceTests extends XPackSingleNodeTestCase {
}
public void testWatchWithHipChatAction() throws Exception {
assertBusy(() -> assertThat(getInstanceFromNode(WatcherService.class).state(), is(WatcherState.STARTED)));
HipChatAccount.Profile profile = randomFrom(HipChatAccount.Profile.values());
HipChatMessage.Color color = randomFrom(HipChatMessage.Color.values());
String account;