[Watcher] Allow Index Action to set _id (elastic/elasticsearch#4694)

This adds a "doc_id" parameter to the index action itself, which can accept a single ID value. This also allows the payload to set _id (or _doc._id) for each document being indexed in order to support this with bulk index actions.

If doc_id and _id are used together, then this blocks the action.

Original commit: elastic/x-pack-elasticsearch@f186ccceb8
This commit is contained in:
Chris Earle 2017-01-25 13:03:26 -05:00 committed by GitHub
parent 9d002430b5
commit 9fad3cf85c
6 changed files with 391 additions and 45 deletions

View File

@ -37,6 +37,8 @@ import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState;
public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
public static final String ID_FIELD = "_id";
private final WatcherClientProxy client;
private final TimeValue timeout;
@ -65,14 +67,30 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
}
}
String docId = action.docId;
// prevent double-setting id
if (data.containsKey(ID_FIELD)) {
if (docId != null) {
throw illegalState("could not execute action [{}] of watch [{}]. " +
"[ctx.payload.{}] or [ctx.payload._doc.{}] were set with [doc_id]. Only set [{}] or [doc_id]",
actionId, ctx.watch().id(), ID_FIELD, ID_FIELD, ID_FIELD);
}
data = mutableMap(data);
docId = data.remove(ID_FIELD).toString();
}
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(action.index);
indexRequest.type(action.docType);
indexRequest.id(docId);
data = addTimestampToDocument(data, ctx.executionTime());
indexRequest.source(jsonBuilder().prettyPrint().map(data));
if (ctx.simulateAction(actionId)) {
return new IndexAction.Simulated(indexRequest.index(), action.docType, new XContentSource(indexRequest.source(),
return new IndexAction.Simulated(indexRequest.index(), action.docType, docId, new XContentSource(indexRequest.source(),
XContentType.JSON));
}
@ -84,6 +102,10 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
}
Action.Result indexBulk(Iterable list, String actionId, WatchExecutionContext ctx) throws Exception {
if (action.docId != null) {
throw illegalState("could not execute action [{}] of watch [{}]. [doc_id] cannot be used with bulk [_doc] indexing");
}
BulkRequest bulkRequest = new BulkRequest();
for (Object item : list) {
if (!(item instanceof Map)) {
@ -94,6 +116,10 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(action.index);
indexRequest.type(action.docType);
if (doc.containsKey(ID_FIELD)) {
doc = mutableMap(doc);
indexRequest.id(doc.remove(ID_FIELD).toString());
}
doc = addTimestampToDocument(doc, ctx.executionTime());
indexRequest.source(jsonBuilder().prettyPrint().map(doc));
bulkRequest.add(indexRequest);
@ -118,14 +144,23 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
private Map<String, Object> addTimestampToDocument(Map<String, Object> data, DateTime executionTime) {
if (action.executionTimeField != null) {
if (!(data instanceof HashMap)) {
data = new HashMap<>(data); // ensuring mutability
}
data = mutableMap(data);
data.put(action.executionTimeField, WatcherDateTimeUtils.formatDate(executionTime));
}
return data;
}
/**
* Guarantees that the {@code data} is mutable for any code that needs to modify the {@linkplain Map} before using it (e.g., from
* singleton, immutable {@code Map}s).
*
* @param data The map to make mutable
* @return Always a {@linkplain HashMap}
*/
private Map<String, Object> mutableMap(Map<String, Object> data) {
return data instanceof HashMap ? data : new HashMap<>(data);
}
static void itemResponseToXContent(XContentBuilder builder, BulkItemResponse item) throws IOException {
if (item.isFailed()) {
builder.startObject()

View File

@ -27,14 +27,17 @@ public class IndexAction implements Action {
final String index;
final String docType;
@Nullable final String docId;
@Nullable final String executionTimeField;
@Nullable final TimeValue timeout;
@Nullable final DateTimeZone dynamicNameTimeZone;
public IndexAction(String index, String docType, @Nullable String executionTimeField,
public IndexAction(String index, String docType, @Nullable String docId,
@Nullable String executionTimeField,
@Nullable TimeValue timeout, @Nullable DateTimeZone dynamicNameTimeZone) {
this.index = index;
this.docType = docType;
this.docId = docId;
this.executionTimeField = executionTimeField;
this.timeout = timeout;
this.dynamicNameTimeZone = dynamicNameTimeZone;
@ -53,6 +56,10 @@ public class IndexAction implements Action {
return docType;
}
public String getDocId() {
return docId;
}
public String getExecutionTimeField() {
return executionTimeField;
}
@ -68,7 +75,7 @@ public class IndexAction implements Action {
IndexAction that = (IndexAction) o;
return Objects.equals(index, that.index) && Objects.equals(docType, that.docType)
return Objects.equals(index, that.index) && Objects.equals(docType, that.docType) && Objects.equals(docId, that.docId)
&& Objects.equals(executionTimeField, that.executionTimeField)
&& Objects.equals(timeout, that.timeout)
&& Objects.equals(dynamicNameTimeZone, that.dynamicNameTimeZone);
@ -76,7 +83,7 @@ public class IndexAction implements Action {
@Override
public int hashCode() {
return Objects.hash(index, docType, executionTimeField, timeout, dynamicNameTimeZone);
return Objects.hash(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone);
}
@Override
@ -84,6 +91,9 @@ public class IndexAction implements Action {
builder.startObject();
builder.field(Field.INDEX.getPreferredName(), index);
builder.field(Field.DOC_TYPE.getPreferredName(), docType);
if (docId != null) {
builder.field(Field.DOC_ID.getPreferredName(), docId);
}
if (executionTimeField != null) {
builder.field(Field.EXECUTION_TIME_FIELD.getPreferredName(), executionTimeField);
}
@ -99,6 +109,7 @@ public class IndexAction implements Action {
public static IndexAction parse(String watchId, String actionId, XContentParser parser) throws IOException {
String index = null;
String docType = null;
String docId = null;
String executionTimeField = null;
TimeValue timeout = null;
DateTimeZone dynamicNameTimeZone = null;
@ -125,6 +136,8 @@ public class IndexAction implements Action {
} else if (token == XContentParser.Token.VALUE_STRING) {
if (Field.DOC_TYPE.match(currentFieldName)) {
docType = parser.text();
} else if (Field.DOC_ID.match(currentFieldName)) {
docId = parser.text();
} else if (Field.EXECUTION_TIME_FIELD.match(currentFieldName)) {
executionTimeField = parser.text();
} else if (Field.TIMEOUT_HUMAN.match(currentFieldName)) {
@ -157,7 +170,7 @@ public class IndexAction implements Action {
actionId, Field.DOC_TYPE.getPreferredName());
}
return new IndexAction(index, docType, executionTimeField, timeout, dynamicNameTimeZone);
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone);
}
public static Builder builder(String index, String docType) {
@ -189,12 +202,15 @@ public class IndexAction implements Action {
private final String index;
private final String docType;
@Nullable
private final String docId;
private final XContentSource source;
protected Simulated(String index, String docType, XContentSource source) {
protected Simulated(String index, String docType, @Nullable String docId, XContentSource source) {
super(TYPE, Status.SIMULATED);
this.index = index;
this.docType = docType;
this.docId = docId;
this.source = source;
}
@ -206,19 +222,28 @@ public class IndexAction implements Action {
return docType;
}
public String docId() {
return docId;
}
public XContentSource source() {
return source;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject(type)
.startObject(Field.REQUEST.getPreferredName())
.field(Field.INDEX.getPreferredName(), index)
.field(Field.DOC_TYPE.getPreferredName(), docType)
.field(Field.SOURCE.getPreferredName(), source, params)
.endObject()
.endObject();
builder.startObject(type)
.startObject(Field.REQUEST.getPreferredName())
.field(Field.INDEX.getPreferredName(), index)
.field(Field.DOC_TYPE.getPreferredName(), docType);
if (docId != null) {
builder.field(Field.DOC_ID.getPreferredName(), docId);
}
return builder.field(Field.SOURCE.getPreferredName(), source, params)
.endObject()
.endObject();
}
}
@ -226,6 +251,7 @@ public class IndexAction implements Action {
final String index;
final String docType;
String docId;
String executionTimeField;
TimeValue timeout;
DateTimeZone dynamicNameTimeZone;
@ -235,30 +261,36 @@ public class IndexAction implements Action {
this.docType = docType;
}
public Builder setDocId(String docId) {
this.docId = docId;
return this;
}
public Builder setExecutionTimeField(String executionTimeField) {
this.executionTimeField = executionTimeField;
return this;
}
public Builder timeout(TimeValue writeTimeout) {
public Builder setTimeout(TimeValue writeTimeout) {
this.timeout = writeTimeout;
return this;
}
public Builder dynamicNameTimeZone(DateTimeZone dynamicNameTimeZone) {
public Builder setDynamicNameTimeZone(DateTimeZone dynamicNameTimeZone) {
this.dynamicNameTimeZone = dynamicNameTimeZone;
return this;
}
@Override
public IndexAction build() {
return new IndexAction(index, docType, executionTimeField, timeout, dynamicNameTimeZone);
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone);
}
}
interface Field extends Action.Field {
ParseField INDEX = new ParseField("index");
ParseField DOC_TYPE = new ParseField("doc_type");
ParseField DOC_ID = new ParseField("doc_id");
ParseField EXECUTION_TIME_FIELD = new ParseField("execution_time_field");
ParseField SOURCE = new ParseField("source");
ParseField RESPONSE = new ParseField("response");

View File

@ -10,6 +10,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -54,13 +55,24 @@ import static org.joda.time.DateTimeZone.UTC;
public class IndexActionTests extends ESIntegTestCase {
public void testIndexActionExecuteSingleDoc() throws Exception {
boolean customId = randomBoolean();
boolean docIdAsParam = customId && randomBoolean();
String docId = randomAsciiOfLength(5);
String timestampField = randomFrom("@timestamp", null);
boolean customTimestampField = timestampField != null;
IndexAction action = new IndexAction("test-index", "test-type", timestampField, null, null);
IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, WatcherClientProxy.of(client()), null);
DateTime executionTime = DateTime.now(UTC);
Payload payload = randomBoolean() ? new Payload.Simple("foo", "bar") : new Payload.Simple("_doc", singletonMap("foo", "bar"));
Payload payload;
if (customId && docIdAsParam == false) {
// intentionally immutable because the other side needs to cut out _id
payload = new Payload.Simple("_doc", MapBuilder.newMapBuilder().put("foo", "bar").put("_id", docId).immutableMap());
} else {
payload = randomBoolean() ? new Payload.Simple("foo", "bar") : new Payload.Simple("_doc", singletonMap("foo", "bar"));
}
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, payload);
Action.Result result = executable.execute("_id", ctx, ctx.payload());
@ -89,6 +101,10 @@ public class IndexActionTests extends ESIntegTestCase {
assertThat(searchResponse.getHits().totalHits(), equalTo(1L));
SearchHit hit = searchResponse.getHits().getAt(0);
if (customId) {
assertThat(hit.getId(), is(docId));
}
if (customTimestampField) {
assertThat(hit.getSource().size(), is(2));
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar"));
@ -112,18 +128,26 @@ public class IndexActionTests extends ESIntegTestCase {
assertAcked(prepareCreate("test-index")
.addMapping("test-type", "foo", "type=keyword"));
List<Map> idList = Arrays.asList(
MapBuilder.newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap(),
MapBuilder.newMapBuilder().put("foo", "bar1").put("_id", "1").map()
);
Object list = randomFrom(
new Map[] { singletonMap("foo", "bar"), singletonMap("foo", "bar1") },
Arrays.asList(singletonMap("foo", "bar"), singletonMap("foo", "bar1")),
unmodifiableSet(newHashSet(singletonMap("foo", "bar"), singletonMap("foo", "bar1")))
unmodifiableSet(newHashSet(singletonMap("foo", "bar"), singletonMap("foo", "bar1"))),
idList
);
IndexAction action = new IndexAction("test-index", "test-type", timestampField, null, null);
boolean customId = list == idList;
IndexAction action = new IndexAction("test-index", "test-type", null, timestampField, null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, WatcherClientProxy.of(client()), null);
DateTime executionTime = DateTime.now(UTC);
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, new Payload.Simple("_doc", list));
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("watch_id", executionTime, new Payload.Simple("_doc", list));
Action.Result result = executable.execute("_id", ctx, ctx.payload());
Action.Result result = executable.execute("watch_id", ctx, ctx.payload());
assertThat(result.status(), equalTo(Status.SUCCESS));
assertThat(result, instanceOf(IndexAction.Result.class));
@ -147,23 +171,20 @@ public class IndexActionTests extends ESIntegTestCase {
.get();
assertThat(searchResponse.getHits().totalHits(), equalTo(2L));
SearchHit hit = searchResponse.getHits().getAt(0);
if (customTimestampField) {
assertThat(hit.getSource().size(), is(2));
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar"));
assertThat(hit.getSource(), hasEntry(timestampField, (Object) WatcherDateTimeUtils.formatDate(executionTime)));
} else {
assertThat(hit.getSource().size(), is(1));
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar"));
}
hit = searchResponse.getHits().getAt(1);
if (customTimestampField) {
assertThat(hit.getSource().size(), is(2));
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar1"));
assertThat(hit.getSource(), hasEntry(timestampField, (Object) WatcherDateTimeUtils.formatDate(executionTime)));
} else {
assertThat(hit.getSource().size(), is(1));
assertThat(hit.getSource(), hasEntry("foo", (Object) "bar1"));
final int fields = customTimestampField ? 2 : 1;
for (int i = 0; i < 2; ++i) {
final SearchHit hit = searchResponse.getHits().getAt(i);
final String value = "bar" + (i != 0 ? i : "");
assertThat(hit.getSource().size(), is(fields));
if (customId) {
assertThat(hit.getId(), is(Integer.toString(i)));
}
if (customTimestampField) {
assertThat(hit.getSource(), hasEntry(timestampField, (Object) WatcherDateTimeUtils.formatDate(executionTime)));
}
assertThat(hit.getSource(), hasEntry("foo", (Object) value));
}
}
@ -234,7 +255,7 @@ public class IndexActionTests extends ESIntegTestCase {
client().prepareIndex("test-index", "test-type", "_id").setSource("foo", true)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
IndexAction action = new IndexAction("test-index", "test-type", "@timestamp", null, null);
IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, WatcherClientProxy.of(client()), null);
List<Map<String, Object>> docs = new ArrayList<>();
@ -254,4 +275,35 @@ public class IndexActionTests extends ESIntegTestCase {
assertThat(result.status(), is(Status.FAILURE));
}
}
public void testUsingParameterIdWithBulkOrIdFieldThrowsIllegalState() {
final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, WatcherClientProxy.of(client()), null);
final Map<String, Object> docWithId = MapBuilder.<String, Object>newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap();
final DateTime executionTime = DateTime.now(UTC);
// using doc_id with bulk fails regardless of using ID
expectThrows(IllegalStateException.class, () -> {
final List<Map> idList = Arrays.asList(docWithId, MapBuilder.newMapBuilder().put("foo", "bar1").put("_id", "1").map());
final Object list = randomFrom(
new Map[] { singletonMap("foo", "bar"), singletonMap("foo", "bar1") },
Arrays.asList(singletonMap("foo", "bar"), singletonMap("foo", "bar1")),
unmodifiableSet(newHashSet(singletonMap("foo", "bar"), singletonMap("foo", "bar1"))),
idList
);
final WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, new Payload.Simple("_doc", list));
executable.execute("_id", ctx, ctx.payload());
});
// using doc_id with _id
expectThrows(IllegalStateException.class, () -> {
final Payload payload = randomBoolean() ? new Payload.Simple("_doc", docWithId) : new Payload.Simple(docWithId);
final WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, payload);
executable.execute("_id", ctx, ctx.payload());
});
}
}

View File

@ -471,7 +471,7 @@ public class WatchTests extends ESTestCase {
if (randomBoolean()) {
DateTimeZone timeZone = randomBoolean() ? DateTimeZone.UTC : null;
TimeValue timeout = randomBoolean() ? timeValueSeconds(between(1, 10000)) : null;
IndexAction action = new IndexAction("_index", "_type", null, timeout, timeZone);
IndexAction action = new IndexAction("_index", "_type", randomBoolean() ? "123" : null, null, timeout, timeZone);
list.add(new ActionWrapper("_index_" + randomAsciiOfLength(8), randomThrottler(),
AlwaysConditionTests.randomCondition(scriptService), randomTransform(),
new ExecutableIndexAction(action, logger, client, null)));

View File

@ -55,6 +55,6 @@ teardown:
- do:
xpack.watcher.get_watch:
id: "my_watch1"
- match: { found : true}
- match: { found : true }
- match: { _id: "my_watch1" }
- match: { watch.actions.test_index.condition.compare: { "ctx.payload.value": { lt: 10 } } }

View File

@ -0,0 +1,227 @@
---
setup:
- do:
cluster.health:
wait_for_status: yellow
---
teardown:
- do:
xpack.watcher.delete_watch:
id: "my_watch"
ignore: 404
---
"Test put watch api with index action using doc_id":
- do:
xpack.watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
body: >
{
"trigger": {
"schedule": {
"hourly": {
"minute": [ 0, 5 ]
}
}
},
"input": {
"simple": {
"value": 15
}
},
"condition": {
"always": {}
},
"actions": {
"test_index": {
"index": {
"index": "test",
"doc_type": "test2",
"doc_id": "test_id1"
}
}
}
}
- match: { _id: "my_watch" }
- do:
xpack.watcher.get_watch:
id: "my_watch"
- match: { found : true }
- match: { _id: "my_watch" }
- match: { watch.input.simple.value: 15 }
- match: { watch.actions.test_index.index.doc_id: "test_id1" }
- do:
xpack.watcher.execute_watch:
id: "my_watch"
- match: { watch_record.state: "executed" }
- match: { watch_record.result.actions.0.index.response.id: "test_id1" }
---
"Test put watch api with index action using _id field":
- do:
xpack.watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
body: >
{
"trigger": {
"schedule": {
"hourly": {
"minute": [ 0, 5 ]
}
}
},
"input": {
"simple": {
"_id": "test_id2",
"value": 20
}
},
"condition": {
"always": {}
},
"actions": {
"test_index": {
"index": {
"index": "test",
"doc_type": "test2"
}
}
}
}
- match: { _id: "my_watch" }
- do:
xpack.watcher.get_watch:
id: "my_watch"
- match: { found : true }
- match: { _id: "my_watch" }
- match: { watch.input.simple._id: "test_id2" }
- match: { watch.input.simple.value: 20 }
- do:
xpack.watcher.execute_watch:
id: "my_watch"
- match: { watch_record.state: "executed" }
- match: { watch_record.result.actions.0.index.response.id: "test_id2" }
---
"Test put watch api with bulk index action using _id field":
- do:
xpack.watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
body: >
{
"trigger": {
"schedule": {
"hourly": {
"minute": [ 0, 5 ]
}
}
},
"input": {
"simple": {
"_doc": [
{
"_id": "test_id3",
"value": 30
},
{
"_id": "test_id4",
"value": 40
}
]
}
},
"condition": {
"always": {}
},
"actions": {
"test_index": {
"index": {
"index": "test",
"doc_type": "test2"
}
}
}
}
- match: { _id: "my_watch" }
- do:
xpack.watcher.get_watch:
id: "my_watch"
- match: { found : true }
- match: { _id: "my_watch" }
- match: { watch.input.simple._doc.0._id: "test_id3" }
- match: { watch.input.simple._doc.0.value: 30 }
- match: { watch.input.simple._doc.1._id: "test_id4" }
- match: { watch.input.simple._doc.1.value: 40 }
- do:
xpack.watcher.execute_watch:
id: "my_watch"
- match: { watch_record.state: "executed" }
- match: { watch_record.result.actions.0.index.response.0.id: "test_id3" }
- match: { watch_record.result.actions.0.index.response.1.id: "test_id4" }
---
"Test put watch api with bulk index action using _id field in one document":
- do:
xpack.watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
body: >
{
"trigger": {
"schedule": {
"hourly": {
"minute": [ 0, 5 ]
}
}
},
"input": {
"simple": {
"_doc": [
{
"value": 50
},
{
"_id": "test_id6",
"value": 60
}
]
}
},
"condition": {
"always": {}
},
"actions": {
"test_index": {
"index": {
"index": "test",
"doc_type": "test2"
}
}
}
}
- match: { _id: "my_watch" }
- do:
xpack.watcher.get_watch:
id: "my_watch"
- match: { found : true }
- match: { _id: "my_watch" }
- match: { watch.input.simple._doc.0.value: 50 }
- match: { watch.input.simple._doc.1._id: "test_id6" }
- match: { watch.input.simple._doc.1.value: 60 }
- do:
xpack.watcher.execute_watch:
id: "my_watch"
- match: { watch_record.state: "executed" }
- match: { watch_record.result.actions.0.index.response.1.id: "test_id6" }