diff --git a/x-pack/docs/en/watcher/actions/index.asciidoc b/x-pack/docs/en/watcher/actions/index.asciidoc index d6114259c56..e967cfa0af7 100644 --- a/x-pack/docs/en/watcher/actions/index.asciidoc +++ b/x-pack/docs/en/watcher/actions/index.asciidoc @@ -26,8 +26,8 @@ The following snippet shows a simple `index` action definition: <1> The id of the action <2> An optional <> to restrict action execution <3> An optional <> to transform the payload and prepare the data that should be indexed -<4> The elasticsearch index to store the data to -<5> An optional `_id` for the document, if it should always be the same document. +<4> The index, alias, or data stream to which the data will be written +<5> An optional `_id` for the document [[index-action-attributes]] @@ -37,11 +37,15 @@ The following snippet shows a simple `index` action definition: |====== |Name |Required | Default | Description -| `index` | yes | - | The Elasticsearch index to index into. +| `index` | yes | - | The index, alias, or data stream to index into. | `doc_id` | no | - | The optional `_id` of the document. +| `op_type` | no | `index` | The <> for the index operation. + Must be one of either `index` or `create`. Must be `create` if + `index` is a data stream. + | `execution_time_field` | no | - | The field that will store/index the watch execution time. diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java index 82ac6f8d6d7..44e722bbeb4 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java @@ -84,6 +84,9 @@ public class ExecutableIndexAction extends ExecutableAction { indexRequest.index(getField(actionId, ctx.id().watchId(), "index", data, INDEX_FIELD, action.index)); indexRequest.type(getField(actionId, ctx.id().watchId(), "type",data, TYPE_FIELD, action.docType)); indexRequest.id(getField(actionId, ctx.id().watchId(), "id",data, ID_FIELD, action.docId)); + if (action.opType != null) { + indexRequest.opType(action.opType); + } data = addTimestampToDocument(data, ctx.executionTime()); BytesReference bytesReference; @@ -130,6 +133,9 @@ public class ExecutableIndexAction extends ExecutableAction { indexRequest.index(getField(actionId, ctx.id().watchId(), "index", doc, INDEX_FIELD, action.index)); indexRequest.type(getField(actionId, ctx.id().watchId(), "type",doc, TYPE_FIELD, action.docType)); indexRequest.id(getField(actionId, ctx.id().watchId(), "id",doc, ID_FIELD, action.docId)); + if (action.opType != null) { + indexRequest.opType(action.opType); + } doc = addTimestampToDocument(doc, ctx.executionTime()); try (XContentBuilder builder = jsonBuilder()) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java index 5f6b6792e3f..296cf9c7e27 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java @@ -6,9 +6,11 @@ package org.elasticsearch.xpack.watcher.actions.index; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.collect.List; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.time.DateUtils; import org.elasticsearch.common.unit.TimeValue; @@ -31,6 +33,7 @@ public class IndexAction implements Action { @Nullable @Deprecated final String docType; @Nullable final String index; @Nullable final String docId; + @Nullable final DocWriteRequest.OpType opType; @Nullable final String executionTimeField; @Nullable final TimeValue timeout; @Nullable final ZoneId dynamicNameTimeZone; @@ -42,18 +45,20 @@ public class IndexAction implements Action { public IndexAction(@Nullable String index, @Nullable String docId, @Nullable String executionTimeField, @Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) { - this(index, null, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy); + this(index, null, docId, null, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy); } + /** * Document types are deprecated, use constructor without docType */ @Deprecated - public IndexAction(@Nullable String index, @Nullable String docType, @Nullable String docId, - @Nullable String executionTimeField, - @Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) { + public IndexAction(@Nullable String index, @Nullable String docType, @Nullable String docId, @Nullable DocWriteRequest.OpType opType, + @Nullable String executionTimeField, @Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone, + @Nullable RefreshPolicy refreshPolicy) { this.index = index; this.docType = docType; this.docId = docId; + this.opType = opType; this.executionTimeField = executionTimeField; this.timeout = timeout; this.dynamicNameTimeZone = dynamicNameTimeZone; @@ -77,6 +82,10 @@ public class IndexAction implements Action { return docId; } + public DocWriteRequest.OpType getOpType() { + return opType; + } + public String getExecutionTimeField() { return executionTimeField; } @@ -96,7 +105,10 @@ public class IndexAction implements Action { IndexAction that = (IndexAction) o; - return Objects.equals(index, that.index) && Objects.equals(docType, that.docType) && Objects.equals(docId, that.docId) + return Objects.equals(index, that.index) + && Objects.equals(docType, that.docType) + && Objects.equals(docId, that.docId) + && Objects.equals(opType, that.opType) && Objects.equals(executionTimeField, that.executionTimeField) && Objects.equals(timeout, that.timeout) && Objects.equals(dynamicNameTimeZone, that.dynamicNameTimeZone) @@ -105,7 +117,7 @@ public class IndexAction implements Action { @Override public int hashCode() { - return Objects.hash(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy); + return Objects.hash(index, docType, docId, opType, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy); } @Override @@ -120,6 +132,9 @@ public class IndexAction implements Action { if (docId != null) { builder.field(Field.DOC_ID.getPreferredName(), docId); } + if (opType != null) { + builder.field(Field.OP_TYPE.getPreferredName(), opType); + } if (executionTimeField != null) { builder.field(Field.EXECUTION_TIME_FIELD.getPreferredName(), executionTimeField); } @@ -139,6 +154,7 @@ public class IndexAction implements Action { String index = null; String docType = null; String docId = null; + DocWriteRequest.OpType opType = null; String executionTimeField = null; TimeValue timeout = null; ZoneId dynamicNameTimeZone = null; @@ -169,6 +185,17 @@ public class IndexAction implements Action { docType = parser.text(); } else if (Field.DOC_ID.match(currentFieldName, parser.getDeprecationHandler())) { docId = parser.text(); + } else if (Field.OP_TYPE.match(currentFieldName, parser.getDeprecationHandler())) { + try { + opType = DocWriteRequest.OpType.fromString(parser.text()); + if (List.of(DocWriteRequest.OpType.CREATE, DocWriteRequest.OpType.INDEX).contains(opType) == false) { + throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. op_type value for field [{}] " + + "must be [index] or [create]", TYPE, watchId, actionId, currentFieldName); + } + } catch (IllegalArgumentException e) { + throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. failed to parse op_type value for " + + "field [{}]", TYPE, watchId, actionId, currentFieldName); + } } else if (Field.EXECUTION_TIME_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { executionTimeField = parser.text(); } else if (Field.TIMEOUT_HUMAN.match(currentFieldName, parser.getDeprecationHandler())) { @@ -193,7 +220,7 @@ public class IndexAction implements Action { } } - return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy); + return new IndexAction(index, docType, docId, opType, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy); } /** @@ -289,6 +316,7 @@ public class IndexAction implements Action { final String index; final String docType; String docId; + DocWriteRequest.OpType opType; String executionTimeField; TimeValue timeout; ZoneId dynamicNameTimeZone; @@ -313,6 +341,11 @@ public class IndexAction implements Action { return this; } + public Builder setOpType(DocWriteRequest.OpType opType) { + this.opType = opType; + return this; + } + public Builder setExecutionTimeField(String executionTimeField) { this.executionTimeField = executionTimeField; return this; @@ -335,7 +368,7 @@ public class IndexAction implements Action { @Override public IndexAction build() { - return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy); + return new IndexAction(index, docType, docId, opType, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy); } } @@ -343,6 +376,7 @@ public class IndexAction implements Action { ParseField INDEX = new ParseField("index"); ParseField DOC_TYPE = new ParseField("doc_type"); ParseField DOC_ID = new ParseField("doc_id"); + ParseField OP_TYPE = new ParseField("op_type"); ParseField EXECUTION_TIME_FIELD = new ParseField("execution_time_field"); ParseField SOURCE = new ParseField("source"); ParseField RESPONSE = new ParseField("response"); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java index 0ecc3cd412f..c7c275187f5 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java @@ -49,6 +49,7 @@ import static java.util.Collections.unmodifiableSet; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import static org.elasticsearch.common.util.set.Sets.newHashSet; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasSize; @@ -87,6 +88,10 @@ public class IndexActionTests extends ESTestCase { if (writeTimeout != null) { builder.field(IndexAction.Field.TIMEOUT.getPreferredName(), writeTimeout.millis()); } + DocWriteRequest.OpType opType = randomBoolean() ? DocWriteRequest.OpType.fromId(randomFrom(new Byte[] { 0, 1 })) : null; + if (opType != null) { + builder.field(IndexAction.Field.OP_TYPE.getPreferredName(), opType.getLowercase()); + } builder.endObject(); IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client); XContentParser parser = createParser(builder); @@ -100,6 +105,9 @@ public class IndexActionTests extends ESTestCase { if (timestampField != null) { assertThat(executable.action().executionTimeField, equalTo(timestampField)); } + if (opType != null) { + assertThat(executable.action().opType, equalTo(opType)); + } assertThat(executable.action().timeout, equalTo(writeTimeout)); } @@ -146,20 +154,47 @@ public class IndexActionTests extends ESTestCase { .endObject()); } + public void testOpTypeThatCannotBeParsed() throws Exception { + expectParseFailure(jsonBuilder() + .startObject() + .field(IndexAction.Field.OP_TYPE.getPreferredName(), randomAlphaOfLength(10)) + .endObject(), + "failed to parse op_type value for field [op_type]"); + } + + public void testUnsupportedOpType() throws Exception { + expectParseFailure(jsonBuilder() + .startObject() + .field(IndexAction.Field.OP_TYPE.getPreferredName(), + randomFrom(DocWriteRequest.OpType.UPDATE.name(), DocWriteRequest.OpType.DELETE.name())) + .endObject(), + "op_type value for field [op_type] must be [index] or [create]"); + } + + private void expectParseFailure(XContentBuilder builder, String expectedMessage) throws Exception { + expectFailure(ElasticsearchParseException.class, builder, expectedMessage); + } + private void expectParseFailure(XContentBuilder builder) throws Exception { expectFailure(ElasticsearchParseException.class, builder); } private void expectFailure(Class clazz, XContentBuilder builder) throws Exception { + expectFailure(clazz, builder, null); + } + + private void expectFailure(Class clazz, XContentBuilder builder, String expectedMessage) throws Exception { IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client); XContentParser parser = createParser(builder); parser.nextToken(); - expectThrows(clazz, () -> - actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser)); + Throwable t = expectThrows(clazz, () -> actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser)); + if (expectedMessage != null) { + assertThat(t.getMessage(), containsString(expectedMessage)); + } } public void testUsingParameterIdWithBulkOrIdFieldThrowsIllegalState() { - final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null, refreshPolicy); + final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null, null, refreshPolicy); final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); final Map docWithId = MapBuilder.newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap(); @@ -209,7 +244,7 @@ public class IndexActionTests extends ESTestCase { final IndexAction action = new IndexAction(configureIndexDynamically ? null : "my_index", configureTypeDynamically ? null : "my_type", configureIdDynamically ? null : "my_id", - null, null, null, refreshPolicy); + null, null, null, null, refreshPolicy); final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); @@ -230,7 +265,7 @@ public class IndexActionTests extends ESTestCase { } public void testThatIndexActionCanBeConfiguredWithDynamicIndexNameAndBulk() throws Exception { - final IndexAction action = new IndexAction(null, "my-type", null, null, null, null, refreshPolicy); + final IndexAction action = new IndexAction(null, "my-type", null, null, null, null, null, refreshPolicy); final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); @@ -263,7 +298,7 @@ public class IndexActionTests extends ESTestCase { String fieldName = randomFrom("_index", "_type"); final IndexAction action = new IndexAction(fieldName.equals("_index") ? "my_index" : null, fieldName.equals("_type") ? "my_type" : null, - null,null, null, null, refreshPolicy); + null, null, null, null, null, refreshPolicy); final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); @@ -283,7 +318,7 @@ public class IndexActionTests extends ESTestCase { String docId = randomAlphaOfLength(5); String timestampField = randomFrom("@timestamp", null); - IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null, + IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, null, timestampField, null, null, refreshPolicy); ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); @@ -334,7 +369,7 @@ public class IndexActionTests extends ESTestCase { } public void testFailureResult() throws Exception { - IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null, refreshPolicy); + IndexAction action = new IndexAction("test-index", "test-type", null, null, "@timestamp", null, null, refreshPolicy); ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30)); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java index d1b6c6a322a..a9ebfd890d9 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.watch; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; @@ -585,15 +586,16 @@ public class WatchTests extends ESTestCase { randomFrom(DataAttachment.JSON, DataAttachment.YAML), EmailAttachments.EMPTY_ATTACHMENTS); list.add(new ActionWrapper("_email_" + randomAlphaOfLength(8), randomThrottler(), AlwaysConditionTests.randomCondition(scriptService), randomTransform(), - new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer, + new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer, Collections.emptyMap()), null, null)); } if (randomBoolean()) { ZoneOffset timeZone = randomBoolean() ? ZoneOffset.UTC : null; TimeValue timeout = randomBoolean() ? timeValueSeconds(between(1, 10000)) : null; WriteRequest.RefreshPolicy refreshPolicy = randomBoolean() ? null : randomFrom(WriteRequest.RefreshPolicy.values()); - IndexAction action = new IndexAction("_index", null, randomBoolean() ? "123" : null, null, timeout, timeZone, - refreshPolicy); + IndexAction action = new IndexAction("_index", null, randomBoolean() ? "123" : null, + randomBoolean() ? DocWriteRequest.OpType.fromId(randomFrom(new Byte[] { 0, 1 })) : null, null, timeout, timeZone, + refreshPolicy); list.add(new ActionWrapper("_index_" + randomAlphaOfLength(8), randomThrottler(), AlwaysConditionTests.randomCondition(scriptService), randomTransform(), new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),