diff --git a/docs/en/ml/limitations.asciidoc b/docs/en/ml/limitations.asciidoc index 60fc7cafa5f..9a68ce026a2 100644 --- a/docs/en/ml/limitations.asciidoc +++ b/docs/en/ml/limitations.asciidoc @@ -157,3 +157,17 @@ poorer precision worthwhile. If you want to view or change the aggregations that are used in your job, refer to the `aggregations` property in your {dfeed}. For more information, see {ref}/ml-datafeed-resource.html[Datafeed Resources]. + +[float] +=== Security Integration + +When {security} is enabled, a {dfeed} stores the roles of the user who created +or updated the {dfeed} **at that time**. This means that if those roles are +updated then the {dfeed} will subsequently run with the new permissions associated +with the roles. However, if the user's roles are adjusted after creating or +updating the {dfeed} then the {dfeed} will continue to run with the permissions +associated with the original roles. + +A way to update the roles stored within the {dfeed} without changing any other +settings is to submit an empty JSON document (`{}`) to the +{ref}/ml-update-datafeed.html[update {dfeed} API]. diff --git a/docs/en/rest-api/ml/preview-datafeed.asciidoc b/docs/en/rest-api/ml/preview-datafeed.asciidoc index f3012170631..dfb402efa92 100644 --- a/docs/en/rest-api/ml/preview-datafeed.asciidoc +++ b/docs/en/rest-api/ml/preview-datafeed.asciidoc @@ -34,6 +34,17 @@ privileges to use this API. For more information, see //<>. +==== Security Integration + +When {security} is enabled, the {dfeed} query will be previewed using the +credentials of the user calling the preview {dfeed} API. When the {dfeed} +is started it will run the query using the roles of the last user to +create or update it. If the two sets of roles differ then the preview may +not accurately reflect what the {dfeed} will return when started. To avoid +such problems, the same user that creates/updates the {dfeed} should preview +it to ensure it is returning the expected data. + + ==== Examples The following example obtains a preview of the `datafeed-farequote` {dfeed}: diff --git a/docs/en/rest-api/ml/put-datafeed.asciidoc b/docs/en/rest-api/ml/put-datafeed.asciidoc index 9ef0948a88b..2e367273690 100644 --- a/docs/en/rest-api/ml/put-datafeed.asciidoc +++ b/docs/en/rest-api/ml/put-datafeed.asciidoc @@ -87,6 +87,13 @@ For more information, see {xpack-ref}/security-privileges.html[Security Privileges]. //<>. + +==== Security Integration + +When {security} is enabled, your {dfeed} will remember which roles the user who +created it had at the time of creation, and run the query using those same roles. + + ==== Examples The following example creates the `datafeed-total-requests` {dfeed}: diff --git a/docs/en/rest-api/ml/start-datafeed.asciidoc b/docs/en/rest-api/ml/start-datafeed.asciidoc index 147186fbfc1..801984deac0 100644 --- a/docs/en/rest-api/ml/start-datafeed.asciidoc +++ b/docs/en/rest-api/ml/start-datafeed.asciidoc @@ -81,6 +81,13 @@ For more information, see //<>. +==== Security Integration + +When {security} is enabled, your {dfeed} will remember which roles the last +user to create or update it had at the time of creation/update, and run the query +using those same roles. + + ==== Examples The following example starts the `datafeed-it-ops-kpi` {dfeed}: diff --git a/docs/en/rest-api/ml/update-datafeed.asciidoc b/docs/en/rest-api/ml/update-datafeed.asciidoc index a17b7892534..560f3c82ca2 100644 --- a/docs/en/rest-api/ml/update-datafeed.asciidoc +++ b/docs/en/rest-api/ml/update-datafeed.asciidoc @@ -82,6 +82,13 @@ For more information, see {xpack-ref}/security-privileges.html[Security Privileges]. //<>. + +==== Security Integration + +When {security} is enabled, your {dfeed} will remember which roles the user who +updated it had at the time of update, and run the query using those same roles. + + ==== Examples The following example updates the query for the `datafeed-it-ops-kpi` {dfeed} diff --git a/docs/en/watcher/actions/index.asciidoc b/docs/en/watcher/actions/index.asciidoc index 2585d50e090..f003da74b99 100644 --- a/docs/en/watcher/actions/index.asciidoc +++ b/docs/en/watcher/actions/index.asciidoc @@ -71,5 +71,5 @@ When a `_doc` field exists, if the field holds an object, it is extracted and in as a single document. If the field holds an array of objects, each object is treated as a document and the index action indexes all of them in a bulk. -An `_id` value can be added per document to dynamically set the ID of the indexed -document. +An `_index`, `_type` or `_id` value can be added per document to dynamically set the ID +of the indexed document. diff --git a/plugin/src/main/java/org/elasticsearch/license/XPackLicenseState.java b/plugin/src/main/java/org/elasticsearch/license/XPackLicenseState.java index e5c33c782eb..4a51aace3ac 100644 --- a/plugin/src/main/java/org/elasticsearch/license/XPackLicenseState.java +++ b/plugin/src/main/java/org/elasticsearch/license/XPackLicenseState.java @@ -49,7 +49,7 @@ public class XPackLicenseState { "Machine learning APIs are disabled" }); messages.put(XPackPlugin.LOGSTASH, new String[] { - "Logstash specific APIs are disabled. You can continue to manage and poll stored configurations" + "Logstash will continue to poll centrally-managed pipelines" }); messages.put(XPackPlugin.DEPRECATION, new String[] { "Deprecation APIs are disabled" @@ -201,8 +201,7 @@ public class XPackLicenseState { case STANDARD: case GOLD: case PLATINUM: - return new String[] { "Logstash specific APIs will be disabled, but you can continue to manage " + - "and poll stored configurations" }; + return new String[] { "Logstash will no longer poll for centrally-managed pipelines" }; } break; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java index 4e05e9cab6c..ebeec8b9641 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java @@ -38,6 +38,8 @@ import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState; public class ExecutableIndexAction extends ExecutableAction { + private static final String INDEX_FIELD = "_index"; + private static final String TYPE_FIELD = "_type"; private static final String ID_FIELD = "_id"; private final Client client; @@ -71,24 +73,13 @@ public class ExecutableIndexAction extends ExecutableAction { } } - String docId = action.docId; - - // prevent double-setting id - if (data.containsKey(ID_FIELD)) { - if (docId != null) { - throw illegalState("could not execute action [{}] of watch [{}]. " + - "[ctx.payload.{}] or [ctx.payload._doc.{}] were set with [doc_id]. Only set [{}] or [doc_id]", - actionId, ctx.watch().id(), ID_FIELD, ID_FIELD, ID_FIELD); - } - + if (data.containsKey(INDEX_FIELD) || data.containsKey(TYPE_FIELD) || data.containsKey(ID_FIELD)) { data = mutableMap(data); - docId = data.remove(ID_FIELD).toString(); } - IndexRequest indexRequest = new IndexRequest(); - indexRequest.index(action.index); - indexRequest.type(action.docType); - indexRequest.id(docId); + indexRequest.index(getField(actionId, ctx.id().watchId(), "index", data, INDEX_FIELD, action.index)); + indexRequest.type(getField(actionId, ctx.id().watchId(), "type",data, TYPE_FIELD, action.docType)); + indexRequest.id(getField(actionId, ctx.id().watchId(), "id",data, ID_FIELD, action.docId)); data = addTimestampToDocument(data, ctx.executionTime()); BytesReference bytesReference; @@ -97,8 +88,8 @@ public class ExecutableIndexAction extends ExecutableAction { } if (ctx.simulateAction(actionId)) { - return new IndexAction.Simulated(indexRequest.index(), action.docType, docId, new XContentSource(indexRequest.source(), - XContentType.JSON)); + return new IndexAction.Simulated(indexRequest.index(), indexRequest.type(), indexRequest.id(), + new XContentSource(indexRequest.source(), XContentType.JSON)); } IndexResponse response = WatcherClientHelper.execute(ctx.watch(), client, @@ -121,14 +112,17 @@ public class ExecutableIndexAction extends ExecutableAction { throw illegalState("could not execute action [{}] of watch [{}]. failed to index payload data. " + "[_data] field must either hold a Map or an List/Array of Maps", actionId, ctx.watch().id()); } + Map doc = (Map) item; - IndexRequest indexRequest = new IndexRequest(); - indexRequest.index(action.index); - indexRequest.type(action.docType); - if (doc.containsKey(ID_FIELD)) { + if (doc.containsKey(INDEX_FIELD) || doc.containsKey(TYPE_FIELD) || doc.containsKey(ID_FIELD)) { doc = mutableMap(doc); - indexRequest.id(doc.remove(ID_FIELD).toString()); } + + IndexRequest indexRequest = new IndexRequest(); + indexRequest.index(getField(actionId, ctx.id().watchId(), "index", doc, INDEX_FIELD, action.index)); + indexRequest.type(getField(actionId, ctx.id().watchId(), "type",doc, TYPE_FIELD, action.docType)); + indexRequest.id(getField(actionId, ctx.id().watchId(), "id",doc, ID_FIELD, action.docId)); + doc = addTimestampToDocument(doc, ctx.executionTime()); try (XContentBuilder builder = jsonBuilder()) { indexRequest.source(builder.prettyPrint().map(doc)); @@ -163,6 +157,24 @@ public class ExecutableIndexAction extends ExecutableAction { return data; } + /** + * Extracts the specified field out of data map, or alternative falls back to the action value + */ + private String getField(String actionId, String watchId, String name, Map data, String fieldName, String defaultValue) { + Object obj = data.remove(fieldName); + if (obj != null) { + if (defaultValue != null) { + throw illegalState("could not execute action [{}] of watch [{}]. " + + "[ctx.payload.{}] or [ctx.payload._doc.{}] were set together with action [{}] field. Only set one of them", + actionId, watchId, fieldName, fieldName, name); + } else { + return obj.toString(); + } + } + + return defaultValue; + } + /** * Guarantees that the {@code data} is mutable for any code that needs to modify the {@linkplain Map} before using it (e.g., from * singleton, immutable {@code Map}s). diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java index 1ffa9ccc5df..f838c88dad3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java @@ -25,14 +25,14 @@ public class IndexAction implements Action { public static final String TYPE = "index"; - final String index; - final String docType; + @Nullable final String docType; + @Nullable final String index; @Nullable final String docId; @Nullable final String executionTimeField; @Nullable final TimeValue timeout; @Nullable final DateTimeZone dynamicNameTimeZone; - public IndexAction(String index, String docType, @Nullable String docId, + public IndexAction(@Nullable String index, @Nullable String docType, @Nullable String docId, @Nullable String executionTimeField, @Nullable TimeValue timeout, @Nullable DateTimeZone dynamicNameTimeZone) { this.index = index; @@ -89,8 +89,12 @@ public class IndexAction implements Action { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field(Field.INDEX.getPreferredName(), index); - builder.field(Field.DOC_TYPE.getPreferredName(), docType); + if (index != null) { + builder.field(Field.INDEX.getPreferredName(), index); + } + if (docType != null) { + builder.field(Field.DOC_TYPE.getPreferredName(), docType); + } if (docId != null) { builder.field(Field.DOC_ID.getPreferredName(), docId); } @@ -144,12 +148,7 @@ public class IndexAction implements Action { // Parser for human specified timeouts and 2.x compatibility timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT_HUMAN.toString()); } else if (Field.DYNAMIC_NAME_TIMEZONE.match(currentFieldName)) { - if (token == XContentParser.Token.VALUE_STRING) { - dynamicNameTimeZone = DateTimeZone.forID(parser.text()); - } else { - throw new ElasticsearchParseException("could not parse [{}] action for watch [{}]. failed to parse [{}]. must be " + - "a string value (e.g. 'UTC' or '+01:00').", TYPE, watchId, currentFieldName); - } + dynamicNameTimeZone = DateTimeZone.forID(parser.text()); } else { throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. unexpected string field [{}]", TYPE, watchId, actionId, currentFieldName); @@ -160,16 +159,6 @@ public class IndexAction implements Action { } } - if (index == null) { - throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. missing required [{}] field", TYPE, watchId, - actionId, Field.INDEX.getPreferredName()); - } - - if (docType == null) { - throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. missing required [{}] field", TYPE, watchId, - actionId, Field.DOC_TYPE.getPreferredName()); - } - return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone); } diff --git a/plugin/src/main/resources/monitoring/watches/elasticsearch_cluster_status.json b/plugin/src/main/resources/monitoring/watches/elasticsearch_cluster_status.json index e5639c0bb1d..c0a13ea63a6 100644 --- a/plugin/src/main/resources/monitoring/watches/elasticsearch_cluster_status.json +++ b/plugin/src/main/resources/monitoring/watches/elasticsearch_cluster_status.json @@ -61,6 +61,13 @@ } ] } + }, + { + "range": { + "timestamp": { + "gte": "now-2m" + } + } } ] } diff --git a/plugin/src/main/resources/monitoring/watches/elasticsearch_version_mismatch.json b/plugin/src/main/resources/monitoring/watches/elasticsearch_version_mismatch.json index 04328cdddb3..7e18c981f0f 100644 --- a/plugin/src/main/resources/monitoring/watches/elasticsearch_version_mismatch.json +++ b/plugin/src/main/resources/monitoring/watches/elasticsearch_version_mismatch.json @@ -54,6 +54,13 @@ } ] } + }, + { + "range": { + "timestamp": { + "gte": "now-2m" + } + } } ] } diff --git a/plugin/src/main/resources/monitoring/watches/xpack_license_expiration.json b/plugin/src/main/resources/monitoring/watches/xpack_license_expiration.json index 2fa2a06249c..a05198a15eb 100644 --- a/plugin/src/main/resources/monitoring/watches/xpack_license_expiration.json +++ b/plugin/src/main/resources/monitoring/watches/xpack_license_expiration.json @@ -51,6 +51,13 @@ "term": { "type": "cluster_stats" } + }, + { + "range": { + "timestamp": { + "gte": "now-2m" + } + } } ] } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java index bbcf979a19b..f158d44997d 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java @@ -5,27 +5,35 @@ */ package org.elasticsearch.xpack.watcher.actions.index; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.aggregations.bucket.terms.Terms; -import org.elasticsearch.search.sort.SortOrder; -import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.watcher.actions.Action; import org.elasticsearch.xpack.watcher.actions.Action.Result.Status; import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext; -import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils; import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource; import org.elasticsearch.xpack.watcher.test.WatcherTestUtils; import org.elasticsearch.xpack.watcher.watch.Payload; import org.joda.time.DateTime; +import org.junit.Before; +import org.mockito.ArgumentCaptor; import java.util.ArrayList; import java.util.Arrays; @@ -37,161 +45,36 @@ import static java.util.Collections.singletonMap; import static java.util.Collections.unmodifiableSet; import static org.elasticsearch.common.util.set.Sets.newHashSet; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; -import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; -import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.startsWith; import static org.joda.time.DateTimeZone.UTC; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; -public class IndexActionTests extends ESIntegTestCase { +public class IndexActionTests extends ESTestCase { - public void testIndexActionExecuteSingleDoc() throws Exception { - boolean customId = randomBoolean(); - boolean docIdAsParam = customId && randomBoolean(); - String docId = randomAlphaOfLength(5); - String timestampField = randomFrom("@timestamp", null); - boolean customTimestampField = timestampField != null; + private final Client client = mock(Client.class); - IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null); - ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(), TimeValue.timeValueSeconds(30), - TimeValue.timeValueSeconds(30)); - DateTime executionTime = DateTime.now(UTC); - Payload payload; - - if (customId && docIdAsParam == false) { - // intentionally immutable because the other side needs to cut out _id - payload = new Payload.Simple("_doc", MapBuilder.newMapBuilder().put("foo", "bar").put("_id", docId).immutableMap()); - } else { - payload = randomBoolean() ? new Payload.Simple("foo", "bar") : new Payload.Simple("_doc", singletonMap("foo", "bar")); - } - - WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, payload); - - Action.Result result = executable.execute("_id", ctx, ctx.payload()); - - assertThat(result.status(), equalTo(Status.SUCCESS)); - assertThat(result, instanceOf(IndexAction.Result.class)); - IndexAction.Result successResult = (IndexAction.Result) result; - XContentSource response = successResult.response(); - assertThat(response.getValue("created"), equalTo((Object)Boolean.TRUE)); - assertThat(response.getValue("version"), equalTo((Object) 1)); - assertThat(response.getValue("type").toString(), equalTo("test-type")); - assertThat(response.getValue("index").toString(), equalTo("test-index")); - - refresh(); //Manually refresh to make sure data is available - - SearchRequestBuilder searchRequestbuilder = client().prepareSearch("test-index") - .setTypes("test-type") - .setSource(searchSource().query(matchAllQuery())); - - if (customTimestampField) { - searchRequestbuilder.addAggregation(terms("timestamps").field(timestampField)); - } - - SearchResponse searchResponse = searchRequestbuilder.get(); - - assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L)); - SearchHit hit = searchResponse.getHits().getAt(0); - - if (customId) { - assertThat(hit.getId(), is(docId)); - } - - if (customTimestampField) { - assertThat(hit.getSourceAsMap().size(), is(2)); - assertThat(hit.getSourceAsMap(), hasEntry("foo", (Object) "bar")); - assertThat(hit.getSourceAsMap(), hasEntry(timestampField, (Object) WatcherDateTimeUtils.formatDate(executionTime))); - - Terms terms = searchResponse.getAggregations().get("timestamps"); - assertThat(terms, notNullValue()); - assertThat(terms.getBuckets(), hasSize(1)); - assertThat(terms.getBuckets().get(0).getKeyAsNumber().longValue(), is(executionTime.getMillis())); - assertThat(terms.getBuckets().get(0).getDocCount(), is(1L)); - } else { - assertThat(hit.getSourceAsMap().size(), is(1)); - assertThat(hit.getSourceAsMap(), hasEntry("foo", (Object) "bar")); - } - } - - public void testIndexActionExecuteMultiDoc() throws Exception { - String timestampField = randomFrom("@timestamp", null); - boolean customTimestampField = "@timestamp".equals(timestampField); - - assertAcked(prepareCreate("test-index") - .addMapping("test-type", "foo", "type=keyword")); - - List idList = Arrays.asList( - MapBuilder.newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap(), - MapBuilder.newMapBuilder().put("foo", "bar1").put("_id", "1").map() - ); - - Object list = randomFrom( - new Map[] { singletonMap("foo", "bar"), singletonMap("foo", "bar1") }, - Arrays.asList(singletonMap("foo", "bar"), singletonMap("foo", "bar1")), - unmodifiableSet(newHashSet(singletonMap("foo", "bar"), singletonMap("foo", "bar1"))), - idList - ); - - boolean customId = list == idList; - - IndexAction action = new IndexAction("test-index", "test-type", null, timestampField, null, null); - ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(), TimeValue.timeValueSeconds(30), - TimeValue.timeValueSeconds(30)); - DateTime executionTime = DateTime.now(UTC); - WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("watch_id", executionTime, new Payload.Simple("_doc", list)); - - Action.Result result = executable.execute("watch_id", ctx, ctx.payload()); - - assertThat(result.status(), equalTo(Status.SUCCESS)); - assertThat(result, instanceOf(IndexAction.Result.class)); - IndexAction.Result successResult = (IndexAction.Result) result; - XContentSource response = successResult.response(); - assertThat(successResult.toString(), response.getValue("0.created"), equalTo((Object)Boolean.TRUE)); - assertThat(successResult.toString(), response.getValue("0.version"), equalTo((Object) 1)); - assertThat(successResult.toString(), response.getValue("0.type").toString(), equalTo("test-type")); - assertThat(successResult.toString(), response.getValue("0.index").toString(), equalTo("test-index")); - assertThat(successResult.toString(), response.getValue("1.created"), equalTo((Object)Boolean.TRUE)); - assertThat(successResult.toString(), response.getValue("1.version"), equalTo((Object) 1)); - assertThat(successResult.toString(), response.getValue("1.type").toString(), equalTo("test-type")); - assertThat(successResult.toString(), response.getValue("1.index").toString(), equalTo("test-index")); - - refresh(); //Manually refresh to make sure data is available - - SearchResponse searchResponse = client().prepareSearch("test-index") - .setTypes("test-type") - .setSource(searchSource().sort("foo", SortOrder.ASC) - .query(matchAllQuery())) - .get(); - - assertThat(searchResponse.getHits().getTotalHits(), equalTo(2L)); - final int fields = customTimestampField ? 2 : 1; - for (int i = 0; i < 2; ++i) { - final SearchHit hit = searchResponse.getHits().getAt(i); - final String value = "bar" + (i != 0 ? i : ""); - - assertThat(hit.getSourceAsMap().size(), is(fields)); - - if (customId) { - assertThat(hit.getId(), is(Integer.toString(i))); - } - if (customTimestampField) { - assertThat(hit.getSourceAsMap(), hasEntry(timestampField, (Object) WatcherDateTimeUtils.formatDate(executionTime))); - } - assertThat(hit.getSourceAsMap(), hasEntry("foo", (Object) value)); - } + @Before + public void setupClient() { + ThreadPool threadPool = mock(ThreadPool.class); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + when(threadPool.getThreadContext()).thenReturn(threadContext); + when(client.threadPool()).thenReturn(threadPool); } public void testParser() throws Exception { String timestampField = randomBoolean() ? "@timestamp" : null; XContentBuilder builder = jsonBuilder(); builder.startObject(); - builder.field(IndexAction.Field.INDEX.getPreferredName(), "test-index"); + boolean includeIndex = randomBoolean(); + if (includeIndex) { + builder.field(IndexAction.Field.INDEX.getPreferredName(), "test-index"); + } builder.field(IndexAction.Field.DOC_TYPE.getPreferredName(), "test-type"); if (timestampField != null) { builder.field(IndexAction.Field.EXECUTION_TIME_FIELD.getPreferredName(), timestampField); @@ -201,14 +84,16 @@ public class IndexActionTests extends ESIntegTestCase { builder.field(IndexAction.Field.TIMEOUT.getPreferredName(), writeTimeout.millis()); } builder.endObject(); - IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client()); + IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client); XContentParser parser = createParser(builder); parser.nextToken(); ExecutableIndexAction executable = actionParser.parseExecutable(randomAlphaOfLength(5), randomAlphaOfLength(3), parser); assertThat(executable.action().docType, equalTo("test-type")); - assertThat(executable.action().index, equalTo("test-index")); + if (includeIndex) { + assertThat(executable.action().index, equalTo("test-index")); + } if (timestampField != null) { assertThat(executable.action().executionTimeField, equalTo(timestampField)); } @@ -216,63 +101,40 @@ public class IndexActionTests extends ESIntegTestCase { } public void testParserFailure() throws Exception { - XContentBuilder builder = jsonBuilder(); - boolean useIndex = randomBoolean(); - boolean useType = randomBoolean(); - builder.startObject(); - { - if (useIndex) { - builder.field(IndexAction.Field.INDEX.getPreferredName(), "test-index"); - } - if (useType) { - builder.field(IndexAction.Field.DOC_TYPE.getPreferredName(), "test-type"); - } - } - builder.endObject(); - IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client()); - XContentParser parser = createParser(builder); - parser.nextToken(); - try { - actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser); - if (!(useIndex && useType)) { - fail(); - } - } catch (ElasticsearchParseException iae) { - assertThat(useIndex && useType, equalTo(false)); - } + // wrong type for field + expectParseFailure(jsonBuilder() + .startObject() + .field(IndexAction.Field.DOC_TYPE.getPreferredName(), 1234) + .endObject()); + + expectParseFailure(jsonBuilder() + .startObject() + .field(IndexAction.Field.TIMEOUT.getPreferredName(), "1234") + .endObject()); + + // unknown field + expectParseFailure(jsonBuilder() + .startObject() + .field("unknown", "whatever") + .endObject()); + + expectParseFailure(jsonBuilder() + .startObject() + .field("unknown", 1234) + .endObject()); } - // https://github.com/elastic/x-pack/issues/4416 - public void testIndexingWithWrongMappingReturnsFailureResult() throws Exception { - // index a document to set the mapping of the foo field to a boolean - client().prepareIndex("test-index", "test-type", "_id").setSource("foo", true) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - - IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null); - ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(), - TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); - - List> docs = new ArrayList<>(); - boolean addSuccessfulIndexedDoc = randomBoolean(); - if (addSuccessfulIndexedDoc) { - docs.add(Collections.singletonMap("foo", randomBoolean())); - } - docs.add(Collections.singletonMap("foo", Collections.singletonMap("foo", "bar"))); - Payload payload = new Payload.Simple(Collections.singletonMap("_doc", docs)); - - WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", DateTime.now(UTC), payload); - - Action.Result result = executable.execute("_id", ctx, payload); - if (addSuccessfulIndexedDoc) { - assertThat(result.status(), is(Status.PARTIAL_FAILURE)); - } else { - assertThat(result.status(), is(Status.FAILURE)); - } + private void expectParseFailure(XContentBuilder builder) throws Exception { + IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client); + XContentParser parser = createParser(builder); + parser.nextToken(); + expectThrows(ElasticsearchParseException.class, () -> + actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser)); } public void testUsingParameterIdWithBulkOrIdFieldThrowsIllegalState() { final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null); - final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(), + final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); final Map docWithId = MapBuilder.newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap(); final DateTime executionTime = DateTime.now(UTC); @@ -301,4 +163,184 @@ public class IndexActionTests extends ESIntegTestCase { executable.execute("_id", ctx, ctx.payload()); }); } + + public void testThatIndexTypeIdDynamically() throws Exception { + boolean configureIndexDynamically = randomBoolean(); + boolean configureTypeDynamically = randomBoolean(); + boolean configureIdDynamically = (configureTypeDynamically == false && configureIndexDynamically == false) || randomBoolean(); + + MapBuilder builder = MapBuilder.newMapBuilder().put("foo", "bar"); + if (configureIdDynamically) { + builder.put("_id", "my_dynamic_id"); + } + if (configureTypeDynamically) { + builder.put("_type", "my_dynamic_type"); + } + if (configureIndexDynamically) { + builder.put("_index", "my_dynamic_index"); + } + + final IndexAction action = new IndexAction(configureIndexDynamically ? null : "my_index", + configureTypeDynamically ? null : "my_type", + configureIdDynamically ? null : "my_id", + null, null, null); + final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, + TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); + + final WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", new Payload.Simple(builder.immutableMap())); + + ArgumentCaptor captor = ArgumentCaptor.forClass(IndexRequest.class); + PlainActionFuture listener = PlainActionFuture.newFuture(); + listener.onResponse(new IndexResponse(new ShardId(new Index("foo", "bar"), 0), "whatever", "whatever", 1, 1, 1, true)); + when(client.index(captor.capture())).thenReturn(listener); + Action.Result result = executable.execute("_id", ctx, ctx.payload()); + + assertThat(result.status(), is(Status.SUCCESS)); + assertThat(captor.getAllValues(), hasSize(1)); + + assertThat(captor.getValue().index(), is(configureIndexDynamically ? "my_dynamic_index" : "my_index")); + assertThat(captor.getValue().type(), is(configureTypeDynamically ? "my_dynamic_type" : "my_type")); + assertThat(captor.getValue().id(), is(configureIdDynamically ? "my_dynamic_id" : "my_id")); + } + + public void testThatIndexActionCanBeConfiguredWithDynamicIndexNameAndBulk() throws Exception { + final IndexAction action = new IndexAction(null, "my-type", null, null, null, null); + final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, + TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); + + final Map docWithIndex = MapBuilder.newMapBuilder().put("foo", "bar") + .put("_index", "my-index").immutableMap(); + final Map docWithOtherIndex = MapBuilder.newMapBuilder().put("foo", "bar") + .put("_index", "my-other-index").immutableMap(); + final WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", + new Payload.Simple("_doc", Arrays.asList(docWithIndex, docWithOtherIndex))); + + ArgumentCaptor captor = ArgumentCaptor.forClass(BulkRequest.class); + PlainActionFuture listener = PlainActionFuture.newFuture(); + IndexResponse indexResponse = new IndexResponse(new ShardId(new Index("foo", "bar"), 0), "whatever", "whatever", 1, 1, 1, true); + BulkItemResponse response = new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, indexResponse); + BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]{response}, 1); + listener.onResponse(bulkResponse); + when(client.bulk(captor.capture())).thenReturn(listener); + Action.Result result = executable.execute("_id", ctx, ctx.payload()); + + assertThat(result.status(), is(Status.SUCCESS)); + assertThat(captor.getAllValues(), hasSize(1)); + assertThat(captor.getValue().requests(), hasSize(2)); + assertThat(captor.getValue().requests().get(0).type(), is("my-type")); + assertThat(captor.getValue().requests().get(0).index(), is("my-index")); + assertThat(captor.getValue().requests().get(1).type(), is("my-type")); + assertThat(captor.getValue().requests().get(1).index(), is("my-other-index")); + } + + public void testConfigureIndexInMapAndAction() { + String fieldName = randomFrom("_index", "_type"); + final IndexAction action = new IndexAction(fieldName.equals("_index") ? "my_index" : null, + fieldName.equals("_type") ? "my_type" : null, + null,null, null, null); + final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, + TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); + + final Map docWithIndex = MapBuilder.newMapBuilder().put("foo", "bar") + .put(fieldName, "my-value").immutableMap(); + final WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", + new Payload.Simple("_doc", Collections.singletonList(docWithIndex))); + + IllegalStateException e = expectThrows(IllegalStateException.class, () -> executable.execute("_id", ctx, ctx.payload())); + assertThat(e.getMessage(), startsWith("could not execute action [_id] of watch [_id]. [ctx.payload." + + fieldName + "] or [ctx.payload._doc." + fieldName + "]")); + } + + public void testIndexActionExecuteSingleDoc() throws Exception { + boolean customId = randomBoolean(); + boolean docIdAsParam = customId && randomBoolean(); + String docId = randomAlphaOfLength(5); + String timestampField = randomFrom("@timestamp", null); + + IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null); + ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(30)); + DateTime executionTime = DateTime.now(UTC); + Payload payload; + + if (customId && docIdAsParam == false) { + // intentionally immutable because the other side needs to cut out _id + payload = new Payload.Simple("_doc", MapBuilder.newMapBuilder().put("foo", "bar").put("_id", docId).immutableMap()); + } else { + payload = randomBoolean() ? new Payload.Simple("foo", "bar") : new Payload.Simple("_doc", singletonMap("foo", "bar")); + } + + WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, payload); + + ArgumentCaptor captor = ArgumentCaptor.forClass(IndexRequest.class); + PlainActionFuture listener = PlainActionFuture.newFuture(); + listener.onResponse(new IndexResponse(new ShardId(new Index("test-index", "uuid"), 0), "test-type", docId, 1, 1, 1, true)); + when(client.index(captor.capture())).thenReturn(listener); + + Action.Result result = executable.execute("_id", ctx, ctx.payload()); + + assertThat(result.status(), equalTo(Status.SUCCESS)); + assertThat(result, instanceOf(IndexAction.Result.class)); + IndexAction.Result successResult = (IndexAction.Result) result; + XContentSource response = successResult.response(); + assertThat(response.getValue("created"), equalTo((Object)Boolean.TRUE)); + assertThat(response.getValue("version"), equalTo((Object) 1)); + assertThat(response.getValue("type").toString(), equalTo("test-type")); + assertThat(response.getValue("index").toString(), equalTo("test-index")); + + assertThat(captor.getAllValues(), hasSize(1)); + IndexRequest indexRequest = captor.getValue(); + assertThat(indexRequest.sourceAsMap(), is(hasEntry("foo", "bar"))); + if (customId) { + assertThat(indexRequest.id(), is(docId)); + } + + if (timestampField != null) { + assertThat(indexRequest.sourceAsMap().keySet(), is(hasSize(2))); + assertThat(indexRequest.sourceAsMap(), hasEntry(timestampField, executionTime.toString())); + } else { + assertThat(indexRequest.sourceAsMap().keySet(), is(hasSize(1))); + } + } + + public void testFailureResult() throws Exception { + IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null); + ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, + TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); + + // should the result resemble a failure or a partial failure + boolean isPartialFailure = randomBoolean(); + + List> docs = new ArrayList<>(); + docs.add(Collections.singletonMap("foo", Collections.singletonMap("foo", "bar"))); + docs.add(Collections.singletonMap("foo", Collections.singletonMap("foo", "bar"))); + Payload payload = new Payload.Simple(Collections.singletonMap("_doc", docs)); + + WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", DateTime.now(UTC), payload); + + ArgumentCaptor captor = ArgumentCaptor.forClass(BulkRequest.class); + PlainActionFuture listener = PlainActionFuture.newFuture(); + BulkItemResponse.Failure failure = new BulkItemResponse.Failure("test-index", "test-type", "anything", + new ElasticsearchException("anything")); + BulkItemResponse firstResponse = new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, failure); + BulkItemResponse secondResponse; + if (isPartialFailure) { + ShardId shardId = new ShardId(new Index("foo", "bar"), 0); + IndexResponse indexResponse = new IndexResponse(shardId, "whatever", "whatever", 1, 1, 1, true); + secondResponse = new BulkItemResponse(1, DocWriteRequest.OpType.INDEX, indexResponse); + } else { + secondResponse = new BulkItemResponse(1, DocWriteRequest.OpType.INDEX, failure); + } + BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]{firstResponse, secondResponse}, 1); + listener.onResponse(bulkResponse); + when(client.bulk(captor.capture())).thenReturn(listener); + Action.Result result = executable.execute("_id", ctx, payload); + + if (isPartialFailure) { + assertThat(result.status(), is(Status.PARTIAL_FAILURE)); + } else { + assertThat(result.status(), is(Status.FAILURE)); + } + } + } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HipChatServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HipChatServiceTests.java index 6222c59abe6..51bdc8f6692 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HipChatServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HipChatServiceTests.java @@ -15,6 +15,8 @@ import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackSettings; import org.elasticsearch.xpack.XPackSingleNodeTestCase; +import org.elasticsearch.xpack.watcher.WatcherService; +import org.elasticsearch.xpack.watcher.WatcherState; import org.elasticsearch.xpack.watcher.actions.hipchat.HipChatAction; import org.elasticsearch.xpack.watcher.client.WatcherClient; import org.elasticsearch.xpack.watcher.condition.AlwaysCondition; @@ -127,6 +129,8 @@ public class HipChatServiceTests extends XPackSingleNodeTestCase { } public void testWatchWithHipChatAction() throws Exception { + assertBusy(() -> assertThat(getInstanceFromNode(WatcherService.class).state(), is(WatcherState.STARTED))); + HipChatAccount.Profile profile = randomFrom(HipChatAccount.Profile.values()); HipChatMessage.Color color = randomFrom(HipChatMessage.Color.values()); String account;