diff --git a/docs/en/watcher/actions/index.asciidoc b/docs/en/watcher/actions/index.asciidoc index f003da74b99..bfbb63cd54b 100644 --- a/docs/en/watcher/actions/index.asciidoc +++ b/docs/en/watcher/actions/index.asciidoc @@ -51,6 +51,9 @@ The following snippet shows a simple `index` action definition: the index action times out and fails. This setting overrides the default timeouts. +| `refresh` | no | - | Optional setting of the <> + for the write request + |====== [[anatomy-actions-index-multi-doc-support]] diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java index ebeec8b9641..252b3485e03 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java @@ -77,6 +77,10 @@ public class ExecutableIndexAction extends ExecutableAction { data = mutableMap(data); } IndexRequest indexRequest = new IndexRequest(); + if (action.refreshPolicy != null) { + indexRequest.setRefreshPolicy(action.refreshPolicy); + } + indexRequest.index(getField(actionId, ctx.id().watchId(), "index", data, INDEX_FIELD, action.index)); indexRequest.type(getField(actionId, ctx.id().watchId(), "type",data, TYPE_FIELD, action.docType)); indexRequest.id(getField(actionId, ctx.id().watchId(), "id",data, ID_FIELD, action.docId)); @@ -88,7 +92,7 @@ public class ExecutableIndexAction extends ExecutableAction { } if (ctx.simulateAction(actionId)) { - return new IndexAction.Simulated(indexRequest.index(), indexRequest.type(), indexRequest.id(), + return new IndexAction.Simulated(indexRequest.index(), indexRequest.type(), indexRequest.id(), action.refreshPolicy, new XContentSource(indexRequest.source(), XContentType.JSON)); } @@ -107,6 +111,10 @@ public class ExecutableIndexAction extends ExecutableAction { } BulkRequest bulkRequest = new BulkRequest(); + if (action.refreshPolicy != null) { + bulkRequest.setRefreshPolicy(action.refreshPolicy); + } + for (Object item : list) { if (!(item instanceof Map)) { throw illegalState("could not execute action [{}] of watch [{}]. failed to index payload data. " + diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java index f838c88dad3..464bf2afe08 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.watcher.actions.index; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.unit.TimeValue; @@ -31,16 +33,18 @@ public class IndexAction implements Action { @Nullable final String executionTimeField; @Nullable final TimeValue timeout; @Nullable final DateTimeZone dynamicNameTimeZone; + @Nullable final RefreshPolicy refreshPolicy; public IndexAction(@Nullable String index, @Nullable String docType, @Nullable String docId, @Nullable String executionTimeField, - @Nullable TimeValue timeout, @Nullable DateTimeZone dynamicNameTimeZone) { + @Nullable TimeValue timeout, @Nullable DateTimeZone dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) { this.index = index; this.docType = docType; this.docId = docId; this.executionTimeField = executionTimeField; this.timeout = timeout; this.dynamicNameTimeZone = dynamicNameTimeZone; + this.refreshPolicy = refreshPolicy; } @Override @@ -68,6 +72,10 @@ public class IndexAction implements Action { return dynamicNameTimeZone; } + public RefreshPolicy getRefreshPolicy() { + return refreshPolicy; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -78,12 +86,13 @@ public class IndexAction implements Action { return Objects.equals(index, that.index) && Objects.equals(docType, that.docType) && Objects.equals(docId, that.docId) && Objects.equals(executionTimeField, that.executionTimeField) && Objects.equals(timeout, that.timeout) - && Objects.equals(dynamicNameTimeZone, that.dynamicNameTimeZone); + && Objects.equals(dynamicNameTimeZone, that.dynamicNameTimeZone) + && Objects.equals(refreshPolicy, that.refreshPolicy); } @Override public int hashCode() { - return Objects.hash(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone); + return Objects.hash(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy); } @Override @@ -107,6 +116,9 @@ public class IndexAction implements Action { if (dynamicNameTimeZone != null) { builder.field(Field.DYNAMIC_NAME_TIMEZONE.getPreferredName(), dynamicNameTimeZone); } + if (refreshPolicy!= null) { + builder.field(Field.REFRESH.getPreferredName(), refreshPolicy.getValue()); + } return builder.endObject(); } @@ -117,6 +129,7 @@ public class IndexAction implements Action { String executionTimeField = null; TimeValue timeout = null; DateTimeZone dynamicNameTimeZone = null; + RefreshPolicy refreshPolicy = null; String currentFieldName = null; XContentParser.Token token; @@ -148,7 +161,14 @@ public class IndexAction implements Action { // Parser for human specified timeouts and 2.x compatibility timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT_HUMAN.toString()); } else if (Field.DYNAMIC_NAME_TIMEZONE.match(currentFieldName)) { - dynamicNameTimeZone = DateTimeZone.forID(parser.text()); + if (token == XContentParser.Token.VALUE_STRING) { + dynamicNameTimeZone = DateTimeZone.forID(parser.text()); + } else { + throw new ElasticsearchParseException("could not parse [{}] action for watch [{}]. failed to parse [{}]. must be " + + "a string value (e.g. 'UTC' or '+01:00').", TYPE, watchId, currentFieldName); + } + } else if (Field.REFRESH.match(currentFieldName)) { + refreshPolicy = RefreshPolicy.parse(parser.text()); } else { throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. unexpected string field [{}]", TYPE, watchId, actionId, currentFieldName); @@ -159,7 +179,7 @@ public class IndexAction implements Action { } } - return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone); + return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy); } public static Builder builder(String index, String docType) { @@ -191,16 +211,18 @@ public class IndexAction implements Action { private final String index; private final String docType; - @Nullable - private final String docId; + @Nullable private final String docId; + @Nullable private final RefreshPolicy refreshPolicy; private final XContentSource source; - protected Simulated(String index, String docType, @Nullable String docId, XContentSource source) { + protected Simulated(String index, String docType, @Nullable String docId, @Nullable RefreshPolicy refreshPolicy, + XContentSource source) { super(TYPE, Status.SIMULATED); this.index = index; this.docType = docType; this.docId = docId; this.source = source; + this.refreshPolicy = refreshPolicy; } public String index() { @@ -230,6 +252,10 @@ public class IndexAction implements Action { builder.field(Field.DOC_ID.getPreferredName(), docId); } + if (refreshPolicy != null) { + builder.field(Field.REFRESH.getPreferredName(), refreshPolicy.getValue()); + } + return builder.field(Field.SOURCE.getPreferredName(), source, params) .endObject() .endObject(); @@ -244,6 +270,7 @@ public class IndexAction implements Action { String executionTimeField; TimeValue timeout; DateTimeZone dynamicNameTimeZone; + RefreshPolicy refreshPolicy; private Builder(String index, String docType) { this.index = index; @@ -270,9 +297,14 @@ public class IndexAction implements Action { return this; } + public Builder setRefreshPolicy(RefreshPolicy refreshPolicy) { + this.refreshPolicy = refreshPolicy; + return this; + } + @Override public IndexAction build() { - return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone); + return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy); } } @@ -287,5 +319,6 @@ public class IndexAction implements Action { ParseField TIMEOUT = new ParseField("timeout_in_millis"); ParseField TIMEOUT_HUMAN = new ParseField("timeout"); ParseField DYNAMIC_NAME_TIMEZONE = new ParseField("dynamic_name_timezone"); + ParseField REFRESH = new ParseField("refresh"); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java index f158d44997d..05d4354ba4a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.watcher.actions.index; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; @@ -43,6 +45,7 @@ import java.util.Map; import static java.util.Collections.singletonMap; import static java.util.Collections.unmodifiableSet; +import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import static org.elasticsearch.common.util.set.Sets.newHashSet; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; @@ -57,6 +60,8 @@ import static org.mockito.Mockito.when; public class IndexActionTests extends ESTestCase { + private RefreshPolicy refreshPolicy = randomBoolean() ? null : randomFrom(RefreshPolicy.values()); + private final Client client = mock(Client.class); @Before @@ -122,18 +127,28 @@ public class IndexActionTests extends ESTestCase { .startObject() .field("unknown", 1234) .endObject()); + + // unknown refresh policy + expectFailure(IllegalArgumentException.class, jsonBuilder() + .startObject() + .field(IndexAction.Field.REFRESH.getPreferredName(), "unknown") + .endObject()); } private void expectParseFailure(XContentBuilder builder) throws Exception { + expectFailure(ElasticsearchParseException.class, builder); + } + + private void expectFailure(Class clazz, XContentBuilder builder) throws Exception { IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client); XContentParser parser = createParser(builder); parser.nextToken(); - expectThrows(ElasticsearchParseException.class, () -> + expectThrows(clazz, () -> actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser)); } public void testUsingParameterIdWithBulkOrIdFieldThrowsIllegalState() { - final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null); + final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null, refreshPolicy); final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); final Map docWithId = MapBuilder.newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap(); @@ -183,7 +198,7 @@ public class IndexActionTests extends ESTestCase { final IndexAction action = new IndexAction(configureIndexDynamically ? null : "my_index", configureTypeDynamically ? null : "my_type", configureIdDynamically ? null : "my_id", - null, null, null); + null, null, null, refreshPolicy); final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); @@ -204,7 +219,7 @@ public class IndexActionTests extends ESTestCase { } public void testThatIndexActionCanBeConfiguredWithDynamicIndexNameAndBulk() throws Exception { - final IndexAction action = new IndexAction(null, "my-type", null, null, null, null); + final IndexAction action = new IndexAction(null, "my-type", null, null, null, null, refreshPolicy); final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); @@ -237,7 +252,7 @@ public class IndexActionTests extends ESTestCase { String fieldName = randomFrom("_index", "_type"); final IndexAction action = new IndexAction(fieldName.equals("_index") ? "my_index" : null, fieldName.equals("_type") ? "my_type" : null, - null,null, null, null); + null,null, null, null, refreshPolicy); final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); @@ -257,7 +272,8 @@ public class IndexActionTests extends ESTestCase { String docId = randomAlphaOfLength(5); String timestampField = randomFrom("@timestamp", null); - IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null); + IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null, + refreshPolicy); ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); DateTime executionTime = DateTime.now(UTC); @@ -295,6 +311,9 @@ public class IndexActionTests extends ESTestCase { assertThat(indexRequest.id(), is(docId)); } + RefreshPolicy expectedRefreshPolicy = refreshPolicy == null ? RefreshPolicy.NONE: refreshPolicy; + assertThat(indexRequest.getRefreshPolicy(), is(expectedRefreshPolicy)); + if (timestampField != null) { assertThat(indexRequest.sourceAsMap().keySet(), is(hasSize(2))); assertThat(indexRequest.sourceAsMap(), hasEntry(timestampField, executionTime.toString())); @@ -304,7 +323,7 @@ public class IndexActionTests extends ESTestCase { } public void testFailureResult() throws Exception { - IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null); + IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null, refreshPolicy); ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); @@ -335,6 +354,8 @@ public class IndexActionTests extends ESTestCase { listener.onResponse(bulkResponse); when(client.bulk(captor.capture())).thenReturn(listener); Action.Result result = executable.execute("_id", ctx, payload); + RefreshPolicy expectedRefreshPolicy = refreshPolicy == null ? RefreshPolicy.NONE: refreshPolicy; + assertThat(captor.getValue().getRefreshPolicy(), is(expectedRefreshPolicy)); if (isPartialFailure) { assertThat(result.status(), is(Status.PARTIAL_FAILURE)); @@ -342,5 +363,4 @@ public class IndexActionTests extends ESTestCase { assertThat(result.status(), is(Status.FAILURE)); } } - } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java index 60ca7e38f2e..55c751d7f54 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.watch; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.bytes.BytesReference; @@ -585,7 +586,9 @@ public class WatchTests extends ESTestCase { if (randomBoolean()) { DateTimeZone timeZone = randomBoolean() ? DateTimeZone.UTC : null; TimeValue timeout = randomBoolean() ? timeValueSeconds(between(1, 10000)) : null; - IndexAction action = new IndexAction("_index", "_type", randomBoolean() ? "123" : null, null, timeout, timeZone); + WriteRequest.RefreshPolicy refreshPolicy = randomBoolean() ? null : randomFrom(WriteRequest.RefreshPolicy.values()); + IndexAction action = new IndexAction("_index", "_type", randomBoolean() ? "123" : null, null, timeout, timeZone, + refreshPolicy); list.add(new ActionWrapper("_index_" + randomAlphaOfLength(8), randomThrottler(), AlwaysConditionTests.randomCondition(scriptService), randomTransform(), new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), diff --git a/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/10_basic.yml b/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/10_basic.yml index 3ff6f0757a0..9ae698ced70 100644 --- a/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/10_basic.yml +++ b/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/10_basic.yml @@ -131,6 +131,7 @@ teardown: "index" : { "index" : "my_test_index", "doc_type" : "my-type", + "refresh" : "wait_for", "doc_id": "my-id" } } @@ -156,3 +157,8 @@ teardown: - is_true: watch_record.node - is_false: watch_record.result.input.payload.foo - is_true: watch_record.result.input.payload.spam + + - do: + search: + index: my_test_index + - match: { hits.total : 1 }