Add configurable op_type for index watcher action (#64590) (#64647)

This commit is contained in:
Dan Hermann 2020-11-05 08:21:19 -06:00 committed by GitHub
parent 15f2cbd84c
commit 38ee2da564
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 103 additions and 22 deletions

View File

@ -26,8 +26,8 @@ The following snippet shows a simple `index` action definition:
<1> The id of the action
<2> An optional <<condition,condition>> to restrict action execution
<3> An optional <<transform,transform>> to transform the payload and prepare the data that should be indexed
<4> The elasticsearch index to store the data to
<5> An optional `_id` for the document, if it should always be the same document.
<4> The index, alias, or data stream to which the data will be written
<5> An optional `_id` for the document
[[index-action-attributes]]
@ -37,11 +37,15 @@ The following snippet shows a simple `index` action definition:
|======
|Name |Required | Default | Description
| `index` | yes | - | The Elasticsearch index to index into.
| `index` | yes | - | The index, alias, or data stream to index into.
| `doc_id` | no | - | The optional `_id` of the document.
| `op_type` | no | `index` | The <<docs-index-api-op_type,op_type>> for the index operation.
Must be one of either `index` or `create`. Must be `create` if
`index` is a data stream.
| `execution_time_field` | no | - | The field that will store/index the watch execution
time.

View File

@ -84,6 +84,9 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
indexRequest.index(getField(actionId, ctx.id().watchId(), "index", data, INDEX_FIELD, action.index));
indexRequest.type(getField(actionId, ctx.id().watchId(), "type",data, TYPE_FIELD, action.docType));
indexRequest.id(getField(actionId, ctx.id().watchId(), "id",data, ID_FIELD, action.docId));
if (action.opType != null) {
indexRequest.opType(action.opType);
}
data = addTimestampToDocument(data, ctx.executionTime());
BytesReference bytesReference;
@ -130,6 +133,9 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
indexRequest.index(getField(actionId, ctx.id().watchId(), "index", doc, INDEX_FIELD, action.index));
indexRequest.type(getField(actionId, ctx.id().watchId(), "type",doc, TYPE_FIELD, action.docType));
indexRequest.id(getField(actionId, ctx.id().watchId(), "id",doc, ID_FIELD, action.docId));
if (action.opType != null) {
indexRequest.opType(action.opType);
}
doc = addTimestampToDocument(doc, ctx.executionTime());
try (XContentBuilder builder = jsonBuilder()) {

View File

@ -6,9 +6,11 @@
package org.elasticsearch.xpack.watcher.actions.index;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.List;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.time.DateUtils;
import org.elasticsearch.common.unit.TimeValue;
@ -31,6 +33,7 @@ public class IndexAction implements Action {
@Nullable @Deprecated final String docType;
@Nullable final String index;
@Nullable final String docId;
@Nullable final DocWriteRequest.OpType opType;
@Nullable final String executionTimeField;
@Nullable final TimeValue timeout;
@Nullable final ZoneId dynamicNameTimeZone;
@ -42,18 +45,20 @@ public class IndexAction implements Action {
public IndexAction(@Nullable String index, @Nullable String docId,
@Nullable String executionTimeField,
@Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) {
this(index, null, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
this(index, null, docId, null, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}
/**
* Document types are deprecated, use constructor without docType
*/
@Deprecated
public IndexAction(@Nullable String index, @Nullable String docType, @Nullable String docId,
@Nullable String executionTimeField,
@Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) {
public IndexAction(@Nullable String index, @Nullable String docType, @Nullable String docId, @Nullable DocWriteRequest.OpType opType,
@Nullable String executionTimeField, @Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone,
@Nullable RefreshPolicy refreshPolicy) {
this.index = index;
this.docType = docType;
this.docId = docId;
this.opType = opType;
this.executionTimeField = executionTimeField;
this.timeout = timeout;
this.dynamicNameTimeZone = dynamicNameTimeZone;
@ -77,6 +82,10 @@ public class IndexAction implements Action {
return docId;
}
public DocWriteRequest.OpType getOpType() {
return opType;
}
public String getExecutionTimeField() {
return executionTimeField;
}
@ -96,7 +105,10 @@ public class IndexAction implements Action {
IndexAction that = (IndexAction) o;
return Objects.equals(index, that.index) && Objects.equals(docType, that.docType) && Objects.equals(docId, that.docId)
return Objects.equals(index, that.index)
&& Objects.equals(docType, that.docType)
&& Objects.equals(docId, that.docId)
&& Objects.equals(opType, that.opType)
&& Objects.equals(executionTimeField, that.executionTimeField)
&& Objects.equals(timeout, that.timeout)
&& Objects.equals(dynamicNameTimeZone, that.dynamicNameTimeZone)
@ -105,7 +117,7 @@ public class IndexAction implements Action {
@Override
public int hashCode() {
return Objects.hash(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
return Objects.hash(index, docType, docId, opType, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}
@Override
@ -120,6 +132,9 @@ public class IndexAction implements Action {
if (docId != null) {
builder.field(Field.DOC_ID.getPreferredName(), docId);
}
if (opType != null) {
builder.field(Field.OP_TYPE.getPreferredName(), opType);
}
if (executionTimeField != null) {
builder.field(Field.EXECUTION_TIME_FIELD.getPreferredName(), executionTimeField);
}
@ -139,6 +154,7 @@ public class IndexAction implements Action {
String index = null;
String docType = null;
String docId = null;
DocWriteRequest.OpType opType = null;
String executionTimeField = null;
TimeValue timeout = null;
ZoneId dynamicNameTimeZone = null;
@ -169,6 +185,17 @@ public class IndexAction implements Action {
docType = parser.text();
} else if (Field.DOC_ID.match(currentFieldName, parser.getDeprecationHandler())) {
docId = parser.text();
} else if (Field.OP_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
try {
opType = DocWriteRequest.OpType.fromString(parser.text());
if (List.of(DocWriteRequest.OpType.CREATE, DocWriteRequest.OpType.INDEX).contains(opType) == false) {
throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. op_type value for field [{}] " +
"must be [index] or [create]", TYPE, watchId, actionId, currentFieldName);
}
} catch (IllegalArgumentException e) {
throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. failed to parse op_type value for " +
"field [{}]", TYPE, watchId, actionId, currentFieldName);
}
} else if (Field.EXECUTION_TIME_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
executionTimeField = parser.text();
} else if (Field.TIMEOUT_HUMAN.match(currentFieldName, parser.getDeprecationHandler())) {
@ -193,7 +220,7 @@ public class IndexAction implements Action {
}
}
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
return new IndexAction(index, docType, docId, opType, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}
/**
@ -289,6 +316,7 @@ public class IndexAction implements Action {
final String index;
final String docType;
String docId;
DocWriteRequest.OpType opType;
String executionTimeField;
TimeValue timeout;
ZoneId dynamicNameTimeZone;
@ -313,6 +341,11 @@ public class IndexAction implements Action {
return this;
}
public Builder setOpType(DocWriteRequest.OpType opType) {
this.opType = opType;
return this;
}
public Builder setExecutionTimeField(String executionTimeField) {
this.executionTimeField = executionTimeField;
return this;
@ -335,7 +368,7 @@ public class IndexAction implements Action {
@Override
public IndexAction build() {
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
return new IndexAction(index, docType, docId, opType, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}
}
@ -343,6 +376,7 @@ public class IndexAction implements Action {
ParseField INDEX = new ParseField("index");
ParseField DOC_TYPE = new ParseField("doc_type");
ParseField DOC_ID = new ParseField("doc_id");
ParseField OP_TYPE = new ParseField("op_type");
ParseField EXECUTION_TIME_FIELD = new ParseField("execution_time_field");
ParseField SOURCE = new ParseField("source");
ParseField RESPONSE = new ParseField("response");

View File

@ -49,6 +49,7 @@ import static java.util.Collections.unmodifiableSet;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import static org.elasticsearch.common.util.set.Sets.newHashSet;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
@ -87,6 +88,10 @@ public class IndexActionTests extends ESTestCase {
if (writeTimeout != null) {
builder.field(IndexAction.Field.TIMEOUT.getPreferredName(), writeTimeout.millis());
}
DocWriteRequest.OpType opType = randomBoolean() ? DocWriteRequest.OpType.fromId(randomFrom(new Byte[] { 0, 1 })) : null;
if (opType != null) {
builder.field(IndexAction.Field.OP_TYPE.getPreferredName(), opType.getLowercase());
}
builder.endObject();
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client);
XContentParser parser = createParser(builder);
@ -100,6 +105,9 @@ public class IndexActionTests extends ESTestCase {
if (timestampField != null) {
assertThat(executable.action().executionTimeField, equalTo(timestampField));
}
if (opType != null) {
assertThat(executable.action().opType, equalTo(opType));
}
assertThat(executable.action().timeout, equalTo(writeTimeout));
}
@ -146,20 +154,47 @@ public class IndexActionTests extends ESTestCase {
.endObject());
}
public void testOpTypeThatCannotBeParsed() throws Exception {
expectParseFailure(jsonBuilder()
.startObject()
.field(IndexAction.Field.OP_TYPE.getPreferredName(), randomAlphaOfLength(10))
.endObject(),
"failed to parse op_type value for field [op_type]");
}
public void testUnsupportedOpType() throws Exception {
expectParseFailure(jsonBuilder()
.startObject()
.field(IndexAction.Field.OP_TYPE.getPreferredName(),
randomFrom(DocWriteRequest.OpType.UPDATE.name(), DocWriteRequest.OpType.DELETE.name()))
.endObject(),
"op_type value for field [op_type] must be [index] or [create]");
}
private void expectParseFailure(XContentBuilder builder, String expectedMessage) throws Exception {
expectFailure(ElasticsearchParseException.class, builder, expectedMessage);
}
private void expectParseFailure(XContentBuilder builder) throws Exception {
expectFailure(ElasticsearchParseException.class, builder);
}
private void expectFailure(Class clazz, XContentBuilder builder) throws Exception {
expectFailure(clazz, builder, null);
}
private void expectFailure(Class clazz, XContentBuilder builder, String expectedMessage) throws Exception {
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client);
XContentParser parser = createParser(builder);
parser.nextToken();
expectThrows(clazz, () ->
actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser));
Throwable t = expectThrows(clazz, () -> actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser));
if (expectedMessage != null) {
assertThat(t.getMessage(), containsString(expectedMessage));
}
}
public void testUsingParameterIdWithBulkOrIdFieldThrowsIllegalState() {
final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null, refreshPolicy);
final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null, null, refreshPolicy);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
final Map<String, Object> docWithId = MapBuilder.<String, Object>newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap();
@ -209,7 +244,7 @@ public class IndexActionTests extends ESTestCase {
final IndexAction action = new IndexAction(configureIndexDynamically ? null : "my_index",
configureTypeDynamically ? null : "my_type",
configureIdDynamically ? null : "my_id",
null, null, null, refreshPolicy);
null, null, null, null, refreshPolicy);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
@ -230,7 +265,7 @@ public class IndexActionTests extends ESTestCase {
}
public void testThatIndexActionCanBeConfiguredWithDynamicIndexNameAndBulk() throws Exception {
final IndexAction action = new IndexAction(null, "my-type", null, null, null, null, refreshPolicy);
final IndexAction action = new IndexAction(null, "my-type", null, null, null, null, null, refreshPolicy);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
@ -263,7 +298,7 @@ public class IndexActionTests extends ESTestCase {
String fieldName = randomFrom("_index", "_type");
final IndexAction action = new IndexAction(fieldName.equals("_index") ? "my_index" : null,
fieldName.equals("_type") ? "my_type" : null,
null,null, null, null, refreshPolicy);
null, null, null, null, null, refreshPolicy);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
@ -283,7 +318,7 @@ public class IndexActionTests extends ESTestCase {
String docId = randomAlphaOfLength(5);
String timestampField = randomFrom("@timestamp", null);
IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null,
IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, null, timestampField, null, null,
refreshPolicy);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30));
@ -334,7 +369,7 @@ public class IndexActionTests extends ESTestCase {
}
public void testFailureResult() throws Exception {
IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null, refreshPolicy);
IndexAction action = new IndexAction("test-index", "test-type", null, null, "@timestamp", null, null, refreshPolicy);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.watch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
@ -585,15 +586,16 @@ public class WatchTests extends ESTestCase {
randomFrom(DataAttachment.JSON, DataAttachment.YAML), EmailAttachments.EMPTY_ATTACHMENTS);
list.add(new ActionWrapper("_email_" + randomAlphaOfLength(8), randomThrottler(),
AlwaysConditionTests.randomCondition(scriptService), randomTransform(),
new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer,
new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer,
Collections.emptyMap()), null, null));
}
if (randomBoolean()) {
ZoneOffset timeZone = randomBoolean() ? ZoneOffset.UTC : null;
TimeValue timeout = randomBoolean() ? timeValueSeconds(between(1, 10000)) : null;
WriteRequest.RefreshPolicy refreshPolicy = randomBoolean() ? null : randomFrom(WriteRequest.RefreshPolicy.values());
IndexAction action = new IndexAction("_index", null, randomBoolean() ? "123" : null, null, timeout, timeZone,
refreshPolicy);
IndexAction action = new IndexAction("_index", null, randomBoolean() ? "123" : null,
randomBoolean() ? DocWriteRequest.OpType.fromId(randomFrom(new Byte[] { 0, 1 })) : null, null, timeout, timeZone,
refreshPolicy);
list.add(new ActionWrapper("_index_" + randomAlphaOfLength(8), randomThrottler(),
AlwaysConditionTests.randomCondition(scriptService), randomTransform(),
new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),