From 78f7c0e27af217f248efc3c1b0b5d84e5796e6ba Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 15 Dec 2017 05:56:01 -0800 Subject: [PATCH 1/5] Fix license messaging for Logstash functionality (elastic/x-pack-elasticsearch#3268) * Fix license messaging for Logstash functionality With a Basic license, users are still able to perform CRUD operations on the `.logstash` index, therefore manage their Logstash pipelines. However, Logstash itself will not pick up any changes from this index and act on them. With an expired license Logstash functionality continues to operate as normal. * Fixing messages after feedback * Removing extraneous tabs at end of line * Fixing typo Original commit: elastic/x-pack-elasticsearch@bc069cf00ff865056ee53fd3e7c79b325c68dcee --- .../java/org/elasticsearch/license/XPackLicenseState.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/license/XPackLicenseState.java b/plugin/src/main/java/org/elasticsearch/license/XPackLicenseState.java index e5c33c782eb..4a51aace3ac 100644 --- a/plugin/src/main/java/org/elasticsearch/license/XPackLicenseState.java +++ b/plugin/src/main/java/org/elasticsearch/license/XPackLicenseState.java @@ -49,7 +49,7 @@ public class XPackLicenseState { "Machine learning APIs are disabled" }); messages.put(XPackPlugin.LOGSTASH, new String[] { - "Logstash specific APIs are disabled. You can continue to manage and poll stored configurations" + "Logstash will continue to poll centrally-managed pipelines" }); messages.put(XPackPlugin.DEPRECATION, new String[] { "Deprecation APIs are disabled" @@ -201,8 +201,7 @@ public class XPackLicenseState { case STANDARD: case GOLD: case PLATINUM: - return new String[] { "Logstash specific APIs will be disabled, but you can continue to manage " + - "and poll stored configurations" }; + return new String[] { "Logstash will no longer poll for centrally-managed pipelines" }; } break; } From 758433a0fa36e8d9eb7387e49b13b046d70bc4cd Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Fri, 15 Dec 2017 15:23:57 +0100 Subject: [PATCH 2/5] Monitoring: Ensure all monitoring watches filter by timestamp (elastic/x-pack-elasticsearch#3238) Only the Logstash and Kibana version mismatch watches contain a time filter, the others are only sorting by timestamp. In combination with searching in all `.monitoring-es-*` indices, this is IMO pretty resource intensive, as we cannot exit early on any search request. This commit adds time based filters to remaining three watches, using the same range than the other two. Original commit: elastic/x-pack-elasticsearch@3eb6bf0de2b9fb36564717e22a9f406457c0bddc --- .../monitoring/watches/elasticsearch_cluster_status.json | 7 +++++++ .../monitoring/watches/elasticsearch_version_mismatch.json | 7 +++++++ .../monitoring/watches/xpack_license_expiration.json | 7 +++++++ 3 files changed, 21 insertions(+) diff --git a/plugin/src/main/resources/monitoring/watches/elasticsearch_cluster_status.json b/plugin/src/main/resources/monitoring/watches/elasticsearch_cluster_status.json index e5639c0bb1d..c0a13ea63a6 100644 --- a/plugin/src/main/resources/monitoring/watches/elasticsearch_cluster_status.json +++ b/plugin/src/main/resources/monitoring/watches/elasticsearch_cluster_status.json @@ -61,6 +61,13 @@ } ] } + }, + { + "range": { + "timestamp": { + "gte": "now-2m" + } + } } ] } diff --git a/plugin/src/main/resources/monitoring/watches/elasticsearch_version_mismatch.json b/plugin/src/main/resources/monitoring/watches/elasticsearch_version_mismatch.json index 04328cdddb3..7e18c981f0f 100644 --- a/plugin/src/main/resources/monitoring/watches/elasticsearch_version_mismatch.json +++ b/plugin/src/main/resources/monitoring/watches/elasticsearch_version_mismatch.json @@ -54,6 +54,13 @@ } ] } + }, + { + "range": { + "timestamp": { + "gte": "now-2m" + } + } } ] } diff --git a/plugin/src/main/resources/monitoring/watches/xpack_license_expiration.json b/plugin/src/main/resources/monitoring/watches/xpack_license_expiration.json index 2fa2a06249c..a05198a15eb 100644 --- a/plugin/src/main/resources/monitoring/watches/xpack_license_expiration.json +++ b/plugin/src/main/resources/monitoring/watches/xpack_license_expiration.json @@ -51,6 +51,13 @@ "term": { "type": "cluster_stats" } + }, + { + "range": { + "timestamp": { + "gte": "now-2m" + } + } } ] } From b81c90d6fc5b686196a95e0ce0c4f213d7e62a29 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 15 Dec 2017 14:41:10 +0000 Subject: [PATCH 3/5] [DOCS] Explain ML datafeed run-as integration/limitations (elastic/x-pack-elasticsearch#3311) Docs for elastic/x-pack-elasticsearch#3254 Original commit: elastic/x-pack-elasticsearch@eec3c7cccee810373e7a6a1b34fabd8bdebe90c5 --- docs/en/ml/limitations.asciidoc | 14 ++++++++++++++ docs/en/rest-api/ml/preview-datafeed.asciidoc | 11 +++++++++++ docs/en/rest-api/ml/put-datafeed.asciidoc | 7 +++++++ docs/en/rest-api/ml/start-datafeed.asciidoc | 7 +++++++ docs/en/rest-api/ml/update-datafeed.asciidoc | 7 +++++++ 5 files changed, 46 insertions(+) diff --git a/docs/en/ml/limitations.asciidoc b/docs/en/ml/limitations.asciidoc index 60fc7cafa5f..9a68ce026a2 100644 --- a/docs/en/ml/limitations.asciidoc +++ b/docs/en/ml/limitations.asciidoc @@ -157,3 +157,17 @@ poorer precision worthwhile. If you want to view or change the aggregations that are used in your job, refer to the `aggregations` property in your {dfeed}. For more information, see {ref}/ml-datafeed-resource.html[Datafeed Resources]. + +[float] +=== Security Integration + +When {security} is enabled, a {dfeed} stores the roles of the user who created +or updated the {dfeed} **at that time**. This means that if those roles are +updated then the {dfeed} will subsequently run with the new permissions associated +with the roles. However, if the user's roles are adjusted after creating or +updating the {dfeed} then the {dfeed} will continue to run with the permissions +associated with the original roles. + +A way to update the roles stored within the {dfeed} without changing any other +settings is to submit an empty JSON document (`{}`) to the +{ref}/ml-update-datafeed.html[update {dfeed} API]. diff --git a/docs/en/rest-api/ml/preview-datafeed.asciidoc b/docs/en/rest-api/ml/preview-datafeed.asciidoc index f3012170631..dfb402efa92 100644 --- a/docs/en/rest-api/ml/preview-datafeed.asciidoc +++ b/docs/en/rest-api/ml/preview-datafeed.asciidoc @@ -34,6 +34,17 @@ privileges to use this API. For more information, see //<>. +==== Security Integration + +When {security} is enabled, the {dfeed} query will be previewed using the +credentials of the user calling the preview {dfeed} API. When the {dfeed} +is started it will run the query using the roles of the last user to +create or update it. If the two sets of roles differ then the preview may +not accurately reflect what the {dfeed} will return when started. To avoid +such problems, the same user that creates/updates the {dfeed} should preview +it to ensure it is returning the expected data. + + ==== Examples The following example obtains a preview of the `datafeed-farequote` {dfeed}: diff --git a/docs/en/rest-api/ml/put-datafeed.asciidoc b/docs/en/rest-api/ml/put-datafeed.asciidoc index 9ef0948a88b..2e367273690 100644 --- a/docs/en/rest-api/ml/put-datafeed.asciidoc +++ b/docs/en/rest-api/ml/put-datafeed.asciidoc @@ -87,6 +87,13 @@ For more information, see {xpack-ref}/security-privileges.html[Security Privileges]. //<>. + +==== Security Integration + +When {security} is enabled, your {dfeed} will remember which roles the user who +created it had at the time of creation, and run the query using those same roles. + + ==== Examples The following example creates the `datafeed-total-requests` {dfeed}: diff --git a/docs/en/rest-api/ml/start-datafeed.asciidoc b/docs/en/rest-api/ml/start-datafeed.asciidoc index 147186fbfc1..801984deac0 100644 --- a/docs/en/rest-api/ml/start-datafeed.asciidoc +++ b/docs/en/rest-api/ml/start-datafeed.asciidoc @@ -81,6 +81,13 @@ For more information, see //<>. +==== Security Integration + +When {security} is enabled, your {dfeed} will remember which roles the last +user to create or update it had at the time of creation/update, and run the query +using those same roles. + + ==== Examples The following example starts the `datafeed-it-ops-kpi` {dfeed}: diff --git a/docs/en/rest-api/ml/update-datafeed.asciidoc b/docs/en/rest-api/ml/update-datafeed.asciidoc index a17b7892534..560f3c82ca2 100644 --- a/docs/en/rest-api/ml/update-datafeed.asciidoc +++ b/docs/en/rest-api/ml/update-datafeed.asciidoc @@ -82,6 +82,13 @@ For more information, see {xpack-ref}/security-privileges.html[Security Privileges]. //<>. + +==== Security Integration + +When {security} is enabled, your {dfeed} will remember which roles the user who +updated it had at the time of update, and run the query using those same roles. + + ==== Examples The following example updates the query for the `datafeed-it-ops-kpi` {dfeed} From f518501df4138d03a9ef0c98c63d05baa5045378 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Fri, 15 Dec 2017 16:00:21 +0100 Subject: [PATCH 4/5] Tests: Ensure that watcher is started in HipchatServiceTests One of those tests requires watcher to be started, so a proper assertBusy() block has been added to this tests. relates elastic/x-pack-elasticsearch#3324 Original commit: elastic/x-pack-elasticsearch@324830316f0943c91017a45a41c980871700a435 --- .../xpack/watcher/test/integration/HipChatServiceTests.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HipChatServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HipChatServiceTests.java index 6222c59abe6..51bdc8f6692 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HipChatServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HipChatServiceTests.java @@ -15,6 +15,8 @@ import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackSettings; import org.elasticsearch.xpack.XPackSingleNodeTestCase; +import org.elasticsearch.xpack.watcher.WatcherService; +import org.elasticsearch.xpack.watcher.WatcherState; import org.elasticsearch.xpack.watcher.actions.hipchat.HipChatAction; import org.elasticsearch.xpack.watcher.client.WatcherClient; import org.elasticsearch.xpack.watcher.condition.AlwaysCondition; @@ -127,6 +129,8 @@ public class HipChatServiceTests extends XPackSingleNodeTestCase { } public void testWatchWithHipChatAction() throws Exception { + assertBusy(() -> assertThat(getInstanceFromNode(WatcherService.class).state(), is(WatcherState.STARTED))); + HipChatAccount.Profile profile = randomFrom(HipChatAccount.Profile.values()); HipChatMessage.Color color = randomFrom(HipChatMessage.Color.values()); String account; From 5f8a0711f5734d3d841a5ccf3b90a403254d8429 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Fri, 15 Dec 2017 16:59:29 +0100 Subject: [PATCH 5/5] Watcher: Set index and type dynamically in index action (elastic/x-pack-elasticsearch#3264) The index action allowed to set the id of a document dynamically, however this was not allowed for the index or the type. If a user wants to execute a search, modify the found documents and index them back, then this would only work across a single index and a single type. This change allows the watch writer to just take a search result, read index and type out of that and configure this as part of the index action. On top of that the integration tests have been changed to become fast running unit tests. Original commit: elastic/x-pack-elasticsearch@640b085dd4e0f4d8e4903d391c583878a7932825 --- docs/en/watcher/actions/index.asciidoc | 4 +- .../actions/index/ExecutableIndexAction.java | 56 ++- .../watcher/actions/index/IndexAction.java | 31 +- .../actions/index/IndexActionTests.java | 444 ++++++++++-------- 4 files changed, 289 insertions(+), 246 deletions(-) diff --git a/docs/en/watcher/actions/index.asciidoc b/docs/en/watcher/actions/index.asciidoc index 2585d50e090..f003da74b99 100644 --- a/docs/en/watcher/actions/index.asciidoc +++ b/docs/en/watcher/actions/index.asciidoc @@ -71,5 +71,5 @@ When a `_doc` field exists, if the field holds an object, it is extracted and in as a single document. If the field holds an array of objects, each object is treated as a document and the index action indexes all of them in a bulk. -An `_id` value can be added per document to dynamically set the ID of the indexed -document. +An `_index`, `_type` or `_id` value can be added per document to dynamically set the ID +of the indexed document. diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java index 4e05e9cab6c..ebeec8b9641 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java @@ -38,6 +38,8 @@ import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState; public class ExecutableIndexAction extends ExecutableAction { + private static final String INDEX_FIELD = "_index"; + private static final String TYPE_FIELD = "_type"; private static final String ID_FIELD = "_id"; private final Client client; @@ -71,24 +73,13 @@ public class ExecutableIndexAction extends ExecutableAction { } } - String docId = action.docId; - - // prevent double-setting id - if (data.containsKey(ID_FIELD)) { - if (docId != null) { - throw illegalState("could not execute action [{}] of watch [{}]. " + - "[ctx.payload.{}] or [ctx.payload._doc.{}] were set with [doc_id]. Only set [{}] or [doc_id]", - actionId, ctx.watch().id(), ID_FIELD, ID_FIELD, ID_FIELD); - } - + if (data.containsKey(INDEX_FIELD) || data.containsKey(TYPE_FIELD) || data.containsKey(ID_FIELD)) { data = mutableMap(data); - docId = data.remove(ID_FIELD).toString(); } - IndexRequest indexRequest = new IndexRequest(); - indexRequest.index(action.index); - indexRequest.type(action.docType); - indexRequest.id(docId); + indexRequest.index(getField(actionId, ctx.id().watchId(), "index", data, INDEX_FIELD, action.index)); + indexRequest.type(getField(actionId, ctx.id().watchId(), "type",data, TYPE_FIELD, action.docType)); + indexRequest.id(getField(actionId, ctx.id().watchId(), "id",data, ID_FIELD, action.docId)); data = addTimestampToDocument(data, ctx.executionTime()); BytesReference bytesReference; @@ -97,8 +88,8 @@ public class ExecutableIndexAction extends ExecutableAction { } if (ctx.simulateAction(actionId)) { - return new IndexAction.Simulated(indexRequest.index(), action.docType, docId, new XContentSource(indexRequest.source(), - XContentType.JSON)); + return new IndexAction.Simulated(indexRequest.index(), indexRequest.type(), indexRequest.id(), + new XContentSource(indexRequest.source(), XContentType.JSON)); } IndexResponse response = WatcherClientHelper.execute(ctx.watch(), client, @@ -121,14 +112,17 @@ public class ExecutableIndexAction extends ExecutableAction { throw illegalState("could not execute action [{}] of watch [{}]. failed to index payload data. " + "[_data] field must either hold a Map or an List/Array of Maps", actionId, ctx.watch().id()); } + Map doc = (Map) item; - IndexRequest indexRequest = new IndexRequest(); - indexRequest.index(action.index); - indexRequest.type(action.docType); - if (doc.containsKey(ID_FIELD)) { + if (doc.containsKey(INDEX_FIELD) || doc.containsKey(TYPE_FIELD) || doc.containsKey(ID_FIELD)) { doc = mutableMap(doc); - indexRequest.id(doc.remove(ID_FIELD).toString()); } + + IndexRequest indexRequest = new IndexRequest(); + indexRequest.index(getField(actionId, ctx.id().watchId(), "index", doc, INDEX_FIELD, action.index)); + indexRequest.type(getField(actionId, ctx.id().watchId(), "type",doc, TYPE_FIELD, action.docType)); + indexRequest.id(getField(actionId, ctx.id().watchId(), "id",doc, ID_FIELD, action.docId)); + doc = addTimestampToDocument(doc, ctx.executionTime()); try (XContentBuilder builder = jsonBuilder()) { indexRequest.source(builder.prettyPrint().map(doc)); @@ -163,6 +157,24 @@ public class ExecutableIndexAction extends ExecutableAction { return data; } + /** + * Extracts the specified field out of data map, or alternative falls back to the action value + */ + private String getField(String actionId, String watchId, String name, Map data, String fieldName, String defaultValue) { + Object obj = data.remove(fieldName); + if (obj != null) { + if (defaultValue != null) { + throw illegalState("could not execute action [{}] of watch [{}]. " + + "[ctx.payload.{}] or [ctx.payload._doc.{}] were set together with action [{}] field. Only set one of them", + actionId, watchId, fieldName, fieldName, name); + } else { + return obj.toString(); + } + } + + return defaultValue; + } + /** * Guarantees that the {@code data} is mutable for any code that needs to modify the {@linkplain Map} before using it (e.g., from * singleton, immutable {@code Map}s). diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java index 1ffa9ccc5df..f838c88dad3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java @@ -25,14 +25,14 @@ public class IndexAction implements Action { public static final String TYPE = "index"; - final String index; - final String docType; + @Nullable final String docType; + @Nullable final String index; @Nullable final String docId; @Nullable final String executionTimeField; @Nullable final TimeValue timeout; @Nullable final DateTimeZone dynamicNameTimeZone; - public IndexAction(String index, String docType, @Nullable String docId, + public IndexAction(@Nullable String index, @Nullable String docType, @Nullable String docId, @Nullable String executionTimeField, @Nullable TimeValue timeout, @Nullable DateTimeZone dynamicNameTimeZone) { this.index = index; @@ -89,8 +89,12 @@ public class IndexAction implements Action { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field(Field.INDEX.getPreferredName(), index); - builder.field(Field.DOC_TYPE.getPreferredName(), docType); + if (index != null) { + builder.field(Field.INDEX.getPreferredName(), index); + } + if (docType != null) { + builder.field(Field.DOC_TYPE.getPreferredName(), docType); + } if (docId != null) { builder.field(Field.DOC_ID.getPreferredName(), docId); } @@ -144,12 +148,7 @@ public class IndexAction implements Action { // Parser for human specified timeouts and 2.x compatibility timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT_HUMAN.toString()); } else if (Field.DYNAMIC_NAME_TIMEZONE.match(currentFieldName)) { - if (token == XContentParser.Token.VALUE_STRING) { - dynamicNameTimeZone = DateTimeZone.forID(parser.text()); - } else { - throw new ElasticsearchParseException("could not parse [{}] action for watch [{}]. failed to parse [{}]. must be " + - "a string value (e.g. 'UTC' or '+01:00').", TYPE, watchId, currentFieldName); - } + dynamicNameTimeZone = DateTimeZone.forID(parser.text()); } else { throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. unexpected string field [{}]", TYPE, watchId, actionId, currentFieldName); @@ -160,16 +159,6 @@ public class IndexAction implements Action { } } - if (index == null) { - throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. missing required [{}] field", TYPE, watchId, - actionId, Field.INDEX.getPreferredName()); - } - - if (docType == null) { - throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. missing required [{}] field", TYPE, watchId, - actionId, Field.DOC_TYPE.getPreferredName()); - } - return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java index bbcf979a19b..f158d44997d 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java @@ -5,27 +5,35 @@ */ package org.elasticsearch.xpack.watcher.actions.index; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.aggregations.bucket.terms.Terms; -import org.elasticsearch.search.sort.SortOrder; -import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.watcher.actions.Action; import org.elasticsearch.xpack.watcher.actions.Action.Result.Status; import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext; -import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils; import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource; import org.elasticsearch.xpack.watcher.test.WatcherTestUtils; import org.elasticsearch.xpack.watcher.watch.Payload; import org.joda.time.DateTime; +import org.junit.Before; +import org.mockito.ArgumentCaptor; import java.util.ArrayList; import java.util.Arrays; @@ -37,161 +45,36 @@ import static java.util.Collections.singletonMap; import static java.util.Collections.unmodifiableSet; import static org.elasticsearch.common.util.set.Sets.newHashSet; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; -import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; -import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.startsWith; import static org.joda.time.DateTimeZone.UTC; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; -public class IndexActionTests extends ESIntegTestCase { +public class IndexActionTests extends ESTestCase { - public void testIndexActionExecuteSingleDoc() throws Exception { - boolean customId = randomBoolean(); - boolean docIdAsParam = customId && randomBoolean(); - String docId = randomAlphaOfLength(5); - String timestampField = randomFrom("@timestamp", null); - boolean customTimestampField = timestampField != null; + private final Client client = mock(Client.class); - IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null); - ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(), TimeValue.timeValueSeconds(30), - TimeValue.timeValueSeconds(30)); - DateTime executionTime = DateTime.now(UTC); - Payload payload; - - if (customId && docIdAsParam == false) { - // intentionally immutable because the other side needs to cut out _id - payload = new Payload.Simple("_doc", MapBuilder.newMapBuilder().put("foo", "bar").put("_id", docId).immutableMap()); - } else { - payload = randomBoolean() ? new Payload.Simple("foo", "bar") : new Payload.Simple("_doc", singletonMap("foo", "bar")); - } - - WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, payload); - - Action.Result result = executable.execute("_id", ctx, ctx.payload()); - - assertThat(result.status(), equalTo(Status.SUCCESS)); - assertThat(result, instanceOf(IndexAction.Result.class)); - IndexAction.Result successResult = (IndexAction.Result) result; - XContentSource response = successResult.response(); - assertThat(response.getValue("created"), equalTo((Object)Boolean.TRUE)); - assertThat(response.getValue("version"), equalTo((Object) 1)); - assertThat(response.getValue("type").toString(), equalTo("test-type")); - assertThat(response.getValue("index").toString(), equalTo("test-index")); - - refresh(); //Manually refresh to make sure data is available - - SearchRequestBuilder searchRequestbuilder = client().prepareSearch("test-index") - .setTypes("test-type") - .setSource(searchSource().query(matchAllQuery())); - - if (customTimestampField) { - searchRequestbuilder.addAggregation(terms("timestamps").field(timestampField)); - } - - SearchResponse searchResponse = searchRequestbuilder.get(); - - assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L)); - SearchHit hit = searchResponse.getHits().getAt(0); - - if (customId) { - assertThat(hit.getId(), is(docId)); - } - - if (customTimestampField) { - assertThat(hit.getSourceAsMap().size(), is(2)); - assertThat(hit.getSourceAsMap(), hasEntry("foo", (Object) "bar")); - assertThat(hit.getSourceAsMap(), hasEntry(timestampField, (Object) WatcherDateTimeUtils.formatDate(executionTime))); - - Terms terms = searchResponse.getAggregations().get("timestamps"); - assertThat(terms, notNullValue()); - assertThat(terms.getBuckets(), hasSize(1)); - assertThat(terms.getBuckets().get(0).getKeyAsNumber().longValue(), is(executionTime.getMillis())); - assertThat(terms.getBuckets().get(0).getDocCount(), is(1L)); - } else { - assertThat(hit.getSourceAsMap().size(), is(1)); - assertThat(hit.getSourceAsMap(), hasEntry("foo", (Object) "bar")); - } - } - - public void testIndexActionExecuteMultiDoc() throws Exception { - String timestampField = randomFrom("@timestamp", null); - boolean customTimestampField = "@timestamp".equals(timestampField); - - assertAcked(prepareCreate("test-index") - .addMapping("test-type", "foo", "type=keyword")); - - List idList = Arrays.asList( - MapBuilder.newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap(), - MapBuilder.newMapBuilder().put("foo", "bar1").put("_id", "1").map() - ); - - Object list = randomFrom( - new Map[] { singletonMap("foo", "bar"), singletonMap("foo", "bar1") }, - Arrays.asList(singletonMap("foo", "bar"), singletonMap("foo", "bar1")), - unmodifiableSet(newHashSet(singletonMap("foo", "bar"), singletonMap("foo", "bar1"))), - idList - ); - - boolean customId = list == idList; - - IndexAction action = new IndexAction("test-index", "test-type", null, timestampField, null, null); - ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(), TimeValue.timeValueSeconds(30), - TimeValue.timeValueSeconds(30)); - DateTime executionTime = DateTime.now(UTC); - WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("watch_id", executionTime, new Payload.Simple("_doc", list)); - - Action.Result result = executable.execute("watch_id", ctx, ctx.payload()); - - assertThat(result.status(), equalTo(Status.SUCCESS)); - assertThat(result, instanceOf(IndexAction.Result.class)); - IndexAction.Result successResult = (IndexAction.Result) result; - XContentSource response = successResult.response(); - assertThat(successResult.toString(), response.getValue("0.created"), equalTo((Object)Boolean.TRUE)); - assertThat(successResult.toString(), response.getValue("0.version"), equalTo((Object) 1)); - assertThat(successResult.toString(), response.getValue("0.type").toString(), equalTo("test-type")); - assertThat(successResult.toString(), response.getValue("0.index").toString(), equalTo("test-index")); - assertThat(successResult.toString(), response.getValue("1.created"), equalTo((Object)Boolean.TRUE)); - assertThat(successResult.toString(), response.getValue("1.version"), equalTo((Object) 1)); - assertThat(successResult.toString(), response.getValue("1.type").toString(), equalTo("test-type")); - assertThat(successResult.toString(), response.getValue("1.index").toString(), equalTo("test-index")); - - refresh(); //Manually refresh to make sure data is available - - SearchResponse searchResponse = client().prepareSearch("test-index") - .setTypes("test-type") - .setSource(searchSource().sort("foo", SortOrder.ASC) - .query(matchAllQuery())) - .get(); - - assertThat(searchResponse.getHits().getTotalHits(), equalTo(2L)); - final int fields = customTimestampField ? 2 : 1; - for (int i = 0; i < 2; ++i) { - final SearchHit hit = searchResponse.getHits().getAt(i); - final String value = "bar" + (i != 0 ? i : ""); - - assertThat(hit.getSourceAsMap().size(), is(fields)); - - if (customId) { - assertThat(hit.getId(), is(Integer.toString(i))); - } - if (customTimestampField) { - assertThat(hit.getSourceAsMap(), hasEntry(timestampField, (Object) WatcherDateTimeUtils.formatDate(executionTime))); - } - assertThat(hit.getSourceAsMap(), hasEntry("foo", (Object) value)); - } + @Before + public void setupClient() { + ThreadPool threadPool = mock(ThreadPool.class); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + when(threadPool.getThreadContext()).thenReturn(threadContext); + when(client.threadPool()).thenReturn(threadPool); } public void testParser() throws Exception { String timestampField = randomBoolean() ? "@timestamp" : null; XContentBuilder builder = jsonBuilder(); builder.startObject(); - builder.field(IndexAction.Field.INDEX.getPreferredName(), "test-index"); + boolean includeIndex = randomBoolean(); + if (includeIndex) { + builder.field(IndexAction.Field.INDEX.getPreferredName(), "test-index"); + } builder.field(IndexAction.Field.DOC_TYPE.getPreferredName(), "test-type"); if (timestampField != null) { builder.field(IndexAction.Field.EXECUTION_TIME_FIELD.getPreferredName(), timestampField); @@ -201,14 +84,16 @@ public class IndexActionTests extends ESIntegTestCase { builder.field(IndexAction.Field.TIMEOUT.getPreferredName(), writeTimeout.millis()); } builder.endObject(); - IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client()); + IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client); XContentParser parser = createParser(builder); parser.nextToken(); ExecutableIndexAction executable = actionParser.parseExecutable(randomAlphaOfLength(5), randomAlphaOfLength(3), parser); assertThat(executable.action().docType, equalTo("test-type")); - assertThat(executable.action().index, equalTo("test-index")); + if (includeIndex) { + assertThat(executable.action().index, equalTo("test-index")); + } if (timestampField != null) { assertThat(executable.action().executionTimeField, equalTo(timestampField)); } @@ -216,63 +101,40 @@ public class IndexActionTests extends ESIntegTestCase { } public void testParserFailure() throws Exception { - XContentBuilder builder = jsonBuilder(); - boolean useIndex = randomBoolean(); - boolean useType = randomBoolean(); - builder.startObject(); - { - if (useIndex) { - builder.field(IndexAction.Field.INDEX.getPreferredName(), "test-index"); - } - if (useType) { - builder.field(IndexAction.Field.DOC_TYPE.getPreferredName(), "test-type"); - } - } - builder.endObject(); - IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client()); - XContentParser parser = createParser(builder); - parser.nextToken(); - try { - actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser); - if (!(useIndex && useType)) { - fail(); - } - } catch (ElasticsearchParseException iae) { - assertThat(useIndex && useType, equalTo(false)); - } + // wrong type for field + expectParseFailure(jsonBuilder() + .startObject() + .field(IndexAction.Field.DOC_TYPE.getPreferredName(), 1234) + .endObject()); + + expectParseFailure(jsonBuilder() + .startObject() + .field(IndexAction.Field.TIMEOUT.getPreferredName(), "1234") + .endObject()); + + // unknown field + expectParseFailure(jsonBuilder() + .startObject() + .field("unknown", "whatever") + .endObject()); + + expectParseFailure(jsonBuilder() + .startObject() + .field("unknown", 1234) + .endObject()); } - // https://github.com/elastic/x-pack/issues/4416 - public void testIndexingWithWrongMappingReturnsFailureResult() throws Exception { - // index a document to set the mapping of the foo field to a boolean - client().prepareIndex("test-index", "test-type", "_id").setSource("foo", true) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - - IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null); - ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(), - TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); - - List> docs = new ArrayList<>(); - boolean addSuccessfulIndexedDoc = randomBoolean(); - if (addSuccessfulIndexedDoc) { - docs.add(Collections.singletonMap("foo", randomBoolean())); - } - docs.add(Collections.singletonMap("foo", Collections.singletonMap("foo", "bar"))); - Payload payload = new Payload.Simple(Collections.singletonMap("_doc", docs)); - - WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", DateTime.now(UTC), payload); - - Action.Result result = executable.execute("_id", ctx, payload); - if (addSuccessfulIndexedDoc) { - assertThat(result.status(), is(Status.PARTIAL_FAILURE)); - } else { - assertThat(result.status(), is(Status.FAILURE)); - } + private void expectParseFailure(XContentBuilder builder) throws Exception { + IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client); + XContentParser parser = createParser(builder); + parser.nextToken(); + expectThrows(ElasticsearchParseException.class, () -> + actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser)); } public void testUsingParameterIdWithBulkOrIdFieldThrowsIllegalState() { final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null); - final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(), + final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); final Map docWithId = MapBuilder.newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap(); final DateTime executionTime = DateTime.now(UTC); @@ -301,4 +163,184 @@ public class IndexActionTests extends ESIntegTestCase { executable.execute("_id", ctx, ctx.payload()); }); } + + public void testThatIndexTypeIdDynamically() throws Exception { + boolean configureIndexDynamically = randomBoolean(); + boolean configureTypeDynamically = randomBoolean(); + boolean configureIdDynamically = (configureTypeDynamically == false && configureIndexDynamically == false) || randomBoolean(); + + MapBuilder builder = MapBuilder.newMapBuilder().put("foo", "bar"); + if (configureIdDynamically) { + builder.put("_id", "my_dynamic_id"); + } + if (configureTypeDynamically) { + builder.put("_type", "my_dynamic_type"); + } + if (configureIndexDynamically) { + builder.put("_index", "my_dynamic_index"); + } + + final IndexAction action = new IndexAction(configureIndexDynamically ? null : "my_index", + configureTypeDynamically ? null : "my_type", + configureIdDynamically ? null : "my_id", + null, null, null); + final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, + TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); + + final WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", new Payload.Simple(builder.immutableMap())); + + ArgumentCaptor captor = ArgumentCaptor.forClass(IndexRequest.class); + PlainActionFuture listener = PlainActionFuture.newFuture(); + listener.onResponse(new IndexResponse(new ShardId(new Index("foo", "bar"), 0), "whatever", "whatever", 1, 1, 1, true)); + when(client.index(captor.capture())).thenReturn(listener); + Action.Result result = executable.execute("_id", ctx, ctx.payload()); + + assertThat(result.status(), is(Status.SUCCESS)); + assertThat(captor.getAllValues(), hasSize(1)); + + assertThat(captor.getValue().index(), is(configureIndexDynamically ? "my_dynamic_index" : "my_index")); + assertThat(captor.getValue().type(), is(configureTypeDynamically ? "my_dynamic_type" : "my_type")); + assertThat(captor.getValue().id(), is(configureIdDynamically ? "my_dynamic_id" : "my_id")); + } + + public void testThatIndexActionCanBeConfiguredWithDynamicIndexNameAndBulk() throws Exception { + final IndexAction action = new IndexAction(null, "my-type", null, null, null, null); + final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, + TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); + + final Map docWithIndex = MapBuilder.newMapBuilder().put("foo", "bar") + .put("_index", "my-index").immutableMap(); + final Map docWithOtherIndex = MapBuilder.newMapBuilder().put("foo", "bar") + .put("_index", "my-other-index").immutableMap(); + final WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", + new Payload.Simple("_doc", Arrays.asList(docWithIndex, docWithOtherIndex))); + + ArgumentCaptor captor = ArgumentCaptor.forClass(BulkRequest.class); + PlainActionFuture listener = PlainActionFuture.newFuture(); + IndexResponse indexResponse = new IndexResponse(new ShardId(new Index("foo", "bar"), 0), "whatever", "whatever", 1, 1, 1, true); + BulkItemResponse response = new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, indexResponse); + BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]{response}, 1); + listener.onResponse(bulkResponse); + when(client.bulk(captor.capture())).thenReturn(listener); + Action.Result result = executable.execute("_id", ctx, ctx.payload()); + + assertThat(result.status(), is(Status.SUCCESS)); + assertThat(captor.getAllValues(), hasSize(1)); + assertThat(captor.getValue().requests(), hasSize(2)); + assertThat(captor.getValue().requests().get(0).type(), is("my-type")); + assertThat(captor.getValue().requests().get(0).index(), is("my-index")); + assertThat(captor.getValue().requests().get(1).type(), is("my-type")); + assertThat(captor.getValue().requests().get(1).index(), is("my-other-index")); + } + + public void testConfigureIndexInMapAndAction() { + String fieldName = randomFrom("_index", "_type"); + final IndexAction action = new IndexAction(fieldName.equals("_index") ? "my_index" : null, + fieldName.equals("_type") ? "my_type" : null, + null,null, null, null); + final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, + TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); + + final Map docWithIndex = MapBuilder.newMapBuilder().put("foo", "bar") + .put(fieldName, "my-value").immutableMap(); + final WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", + new Payload.Simple("_doc", Collections.singletonList(docWithIndex))); + + IllegalStateException e = expectThrows(IllegalStateException.class, () -> executable.execute("_id", ctx, ctx.payload())); + assertThat(e.getMessage(), startsWith("could not execute action [_id] of watch [_id]. [ctx.payload." + + fieldName + "] or [ctx.payload._doc." + fieldName + "]")); + } + + public void testIndexActionExecuteSingleDoc() throws Exception { + boolean customId = randomBoolean(); + boolean docIdAsParam = customId && randomBoolean(); + String docId = randomAlphaOfLength(5); + String timestampField = randomFrom("@timestamp", null); + + IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null); + ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(30)); + DateTime executionTime = DateTime.now(UTC); + Payload payload; + + if (customId && docIdAsParam == false) { + // intentionally immutable because the other side needs to cut out _id + payload = new Payload.Simple("_doc", MapBuilder.newMapBuilder().put("foo", "bar").put("_id", docId).immutableMap()); + } else { + payload = randomBoolean() ? new Payload.Simple("foo", "bar") : new Payload.Simple("_doc", singletonMap("foo", "bar")); + } + + WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, payload); + + ArgumentCaptor captor = ArgumentCaptor.forClass(IndexRequest.class); + PlainActionFuture listener = PlainActionFuture.newFuture(); + listener.onResponse(new IndexResponse(new ShardId(new Index("test-index", "uuid"), 0), "test-type", docId, 1, 1, 1, true)); + when(client.index(captor.capture())).thenReturn(listener); + + Action.Result result = executable.execute("_id", ctx, ctx.payload()); + + assertThat(result.status(), equalTo(Status.SUCCESS)); + assertThat(result, instanceOf(IndexAction.Result.class)); + IndexAction.Result successResult = (IndexAction.Result) result; + XContentSource response = successResult.response(); + assertThat(response.getValue("created"), equalTo((Object)Boolean.TRUE)); + assertThat(response.getValue("version"), equalTo((Object) 1)); + assertThat(response.getValue("type").toString(), equalTo("test-type")); + assertThat(response.getValue("index").toString(), equalTo("test-index")); + + assertThat(captor.getAllValues(), hasSize(1)); + IndexRequest indexRequest = captor.getValue(); + assertThat(indexRequest.sourceAsMap(), is(hasEntry("foo", "bar"))); + if (customId) { + assertThat(indexRequest.id(), is(docId)); + } + + if (timestampField != null) { + assertThat(indexRequest.sourceAsMap().keySet(), is(hasSize(2))); + assertThat(indexRequest.sourceAsMap(), hasEntry(timestampField, executionTime.toString())); + } else { + assertThat(indexRequest.sourceAsMap().keySet(), is(hasSize(1))); + } + } + + public void testFailureResult() throws Exception { + IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null); + ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, + TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); + + // should the result resemble a failure or a partial failure + boolean isPartialFailure = randomBoolean(); + + List> docs = new ArrayList<>(); + docs.add(Collections.singletonMap("foo", Collections.singletonMap("foo", "bar"))); + docs.add(Collections.singletonMap("foo", Collections.singletonMap("foo", "bar"))); + Payload payload = new Payload.Simple(Collections.singletonMap("_doc", docs)); + + WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", DateTime.now(UTC), payload); + + ArgumentCaptor captor = ArgumentCaptor.forClass(BulkRequest.class); + PlainActionFuture listener = PlainActionFuture.newFuture(); + BulkItemResponse.Failure failure = new BulkItemResponse.Failure("test-index", "test-type", "anything", + new ElasticsearchException("anything")); + BulkItemResponse firstResponse = new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, failure); + BulkItemResponse secondResponse; + if (isPartialFailure) { + ShardId shardId = new ShardId(new Index("foo", "bar"), 0); + IndexResponse indexResponse = new IndexResponse(shardId, "whatever", "whatever", 1, 1, 1, true); + secondResponse = new BulkItemResponse(1, DocWriteRequest.OpType.INDEX, indexResponse); + } else { + secondResponse = new BulkItemResponse(1, DocWriteRequest.OpType.INDEX, failure); + } + BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]{firstResponse, secondResponse}, 1); + listener.onResponse(bulkResponse); + when(client.bulk(captor.capture())).thenReturn(listener); + Action.Result result = executable.execute("_id", ctx, payload); + + if (isPartialFailure) { + assertThat(result.status(), is(Status.PARTIAL_FAILURE)); + } else { + assertThat(result.status(), is(Status.FAILURE)); + } + } + }