Watcher: Add refresh parameter to index action (elastic/x-pack-elasticsearch#3350)

This allows to configure the refresh parameter to the index action,
which is either applied to the resulting index or bulk request.

Relates elastic/apm-dev#76

Original commit: elastic/x-pack-elasticsearch@aedb6adc66
This commit is contained in:
Alexander Reelsen 2017-12-21 10:18:16 +01:00 committed by GitHub
parent bf74c77fef
commit 8c99a4dc78
6 changed files with 92 additions and 19 deletions

View File

@ -51,6 +51,9 @@ The following snippet shows a simple `index` action definition:
the index action times out and fails. This setting
overrides the default timeouts.
| `refresh` | no | - | Optional setting of the <<docs-refresh, refresh policy>>
for the write request
|======
[[anatomy-actions-index-multi-doc-support]]

View File

@ -77,6 +77,10 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
data = mutableMap(data);
}
IndexRequest indexRequest = new IndexRequest();
if (action.refreshPolicy != null) {
indexRequest.setRefreshPolicy(action.refreshPolicy);
}
indexRequest.index(getField(actionId, ctx.id().watchId(), "index", data, INDEX_FIELD, action.index));
indexRequest.type(getField(actionId, ctx.id().watchId(), "type",data, TYPE_FIELD, action.docType));
indexRequest.id(getField(actionId, ctx.id().watchId(), "id",data, ID_FIELD, action.docId));
@ -88,7 +92,7 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
}
if (ctx.simulateAction(actionId)) {
return new IndexAction.Simulated(indexRequest.index(), indexRequest.type(), indexRequest.id(),
return new IndexAction.Simulated(indexRequest.index(), indexRequest.type(), indexRequest.id(), action.refreshPolicy,
new XContentSource(indexRequest.source(), XContentType.JSON));
}
@ -107,6 +111,10 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
}
BulkRequest bulkRequest = new BulkRequest();
if (action.refreshPolicy != null) {
bulkRequest.setRefreshPolicy(action.refreshPolicy);
}
for (Object item : list) {
if (!(item instanceof Map)) {
throw illegalState("could not execute action [{}] of watch [{}]. failed to index payload data. " +

View File

@ -6,6 +6,8 @@
package org.elasticsearch.xpack.watcher.actions.index;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
@ -31,16 +33,18 @@ public class IndexAction implements Action {
@Nullable final String executionTimeField;
@Nullable final TimeValue timeout;
@Nullable final DateTimeZone dynamicNameTimeZone;
@Nullable final RefreshPolicy refreshPolicy;
public IndexAction(@Nullable String index, @Nullable String docType, @Nullable String docId,
@Nullable String executionTimeField,
@Nullable TimeValue timeout, @Nullable DateTimeZone dynamicNameTimeZone) {
@Nullable TimeValue timeout, @Nullable DateTimeZone dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) {
this.index = index;
this.docType = docType;
this.docId = docId;
this.executionTimeField = executionTimeField;
this.timeout = timeout;
this.dynamicNameTimeZone = dynamicNameTimeZone;
this.refreshPolicy = refreshPolicy;
}
@Override
@ -68,6 +72,10 @@ public class IndexAction implements Action {
return dynamicNameTimeZone;
}
public RefreshPolicy getRefreshPolicy() {
return refreshPolicy;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -78,12 +86,13 @@ public class IndexAction implements Action {
return Objects.equals(index, that.index) && Objects.equals(docType, that.docType) && Objects.equals(docId, that.docId)
&& Objects.equals(executionTimeField, that.executionTimeField)
&& Objects.equals(timeout, that.timeout)
&& Objects.equals(dynamicNameTimeZone, that.dynamicNameTimeZone);
&& Objects.equals(dynamicNameTimeZone, that.dynamicNameTimeZone)
&& Objects.equals(refreshPolicy, that.refreshPolicy);
}
@Override
public int hashCode() {
return Objects.hash(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone);
return Objects.hash(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}
@Override
@ -107,6 +116,9 @@ public class IndexAction implements Action {
if (dynamicNameTimeZone != null) {
builder.field(Field.DYNAMIC_NAME_TIMEZONE.getPreferredName(), dynamicNameTimeZone);
}
if (refreshPolicy!= null) {
builder.field(Field.REFRESH.getPreferredName(), refreshPolicy.getValue());
}
return builder.endObject();
}
@ -117,6 +129,7 @@ public class IndexAction implements Action {
String executionTimeField = null;
TimeValue timeout = null;
DateTimeZone dynamicNameTimeZone = null;
RefreshPolicy refreshPolicy = null;
String currentFieldName = null;
XContentParser.Token token;
@ -148,7 +161,14 @@ public class IndexAction implements Action {
// Parser for human specified timeouts and 2.x compatibility
timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT_HUMAN.toString());
} else if (Field.DYNAMIC_NAME_TIMEZONE.match(currentFieldName)) {
dynamicNameTimeZone = DateTimeZone.forID(parser.text());
if (token == XContentParser.Token.VALUE_STRING) {
dynamicNameTimeZone = DateTimeZone.forID(parser.text());
} else {
throw new ElasticsearchParseException("could not parse [{}] action for watch [{}]. failed to parse [{}]. must be " +
"a string value (e.g. 'UTC' or '+01:00').", TYPE, watchId, currentFieldName);
}
} else if (Field.REFRESH.match(currentFieldName)) {
refreshPolicy = RefreshPolicy.parse(parser.text());
} else {
throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. unexpected string field [{}]", TYPE,
watchId, actionId, currentFieldName);
@ -159,7 +179,7 @@ public class IndexAction implements Action {
}
}
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone);
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}
public static Builder builder(String index, String docType) {
@ -191,16 +211,18 @@ public class IndexAction implements Action {
private final String index;
private final String docType;
@Nullable
private final String docId;
@Nullable private final String docId;
@Nullable private final RefreshPolicy refreshPolicy;
private final XContentSource source;
protected Simulated(String index, String docType, @Nullable String docId, XContentSource source) {
protected Simulated(String index, String docType, @Nullable String docId, @Nullable RefreshPolicy refreshPolicy,
XContentSource source) {
super(TYPE, Status.SIMULATED);
this.index = index;
this.docType = docType;
this.docId = docId;
this.source = source;
this.refreshPolicy = refreshPolicy;
}
public String index() {
@ -230,6 +252,10 @@ public class IndexAction implements Action {
builder.field(Field.DOC_ID.getPreferredName(), docId);
}
if (refreshPolicy != null) {
builder.field(Field.REFRESH.getPreferredName(), refreshPolicy.getValue());
}
return builder.field(Field.SOURCE.getPreferredName(), source, params)
.endObject()
.endObject();
@ -244,6 +270,7 @@ public class IndexAction implements Action {
String executionTimeField;
TimeValue timeout;
DateTimeZone dynamicNameTimeZone;
RefreshPolicy refreshPolicy;
private Builder(String index, String docType) {
this.index = index;
@ -270,9 +297,14 @@ public class IndexAction implements Action {
return this;
}
public Builder setRefreshPolicy(RefreshPolicy refreshPolicy) {
this.refreshPolicy = refreshPolicy;
return this;
}
@Override
public IndexAction build() {
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone);
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}
}
@ -287,5 +319,6 @@ public class IndexAction implements Action {
ParseField TIMEOUT = new ParseField("timeout_in_millis");
ParseField TIMEOUT_HUMAN = new ParseField("timeout");
ParseField DYNAMIC_NAME_TIMEZONE = new ParseField("dynamic_name_timezone");
ParseField REFRESH = new ParseField("refresh");
}
}

View File

@ -7,6 +7,8 @@ package org.elasticsearch.xpack.watcher.actions.index;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
@ -43,6 +45,7 @@ import java.util.Map;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableSet;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import static org.elasticsearch.common.util.set.Sets.newHashSet;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
@ -57,6 +60,8 @@ import static org.mockito.Mockito.when;
public class IndexActionTests extends ESTestCase {
private RefreshPolicy refreshPolicy = randomBoolean() ? null : randomFrom(RefreshPolicy.values());
private final Client client = mock(Client.class);
@Before
@ -122,18 +127,28 @@ public class IndexActionTests extends ESTestCase {
.startObject()
.field("unknown", 1234)
.endObject());
// unknown refresh policy
expectFailure(IllegalArgumentException.class, jsonBuilder()
.startObject()
.field(IndexAction.Field.REFRESH.getPreferredName(), "unknown")
.endObject());
}
private void expectParseFailure(XContentBuilder builder) throws Exception {
expectFailure(ElasticsearchParseException.class, builder);
}
private void expectFailure(Class clazz, XContentBuilder builder) throws Exception {
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client);
XContentParser parser = createParser(builder);
parser.nextToken();
expectThrows(ElasticsearchParseException.class, () ->
expectThrows(clazz, () ->
actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser));
}
public void testUsingParameterIdWithBulkOrIdFieldThrowsIllegalState() {
final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null);
final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null, refreshPolicy);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
final Map<String, Object> docWithId = MapBuilder.<String, Object>newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap();
@ -183,7 +198,7 @@ public class IndexActionTests extends ESTestCase {
final IndexAction action = new IndexAction(configureIndexDynamically ? null : "my_index",
configureTypeDynamically ? null : "my_type",
configureIdDynamically ? null : "my_id",
null, null, null);
null, null, null, refreshPolicy);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
@ -204,7 +219,7 @@ public class IndexActionTests extends ESTestCase {
}
public void testThatIndexActionCanBeConfiguredWithDynamicIndexNameAndBulk() throws Exception {
final IndexAction action = new IndexAction(null, "my-type", null, null, null, null);
final IndexAction action = new IndexAction(null, "my-type", null, null, null, null, refreshPolicy);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
@ -237,7 +252,7 @@ public class IndexActionTests extends ESTestCase {
String fieldName = randomFrom("_index", "_type");
final IndexAction action = new IndexAction(fieldName.equals("_index") ? "my_index" : null,
fieldName.equals("_type") ? "my_type" : null,
null,null, null, null);
null,null, null, null, refreshPolicy);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
@ -257,7 +272,8 @@ public class IndexActionTests extends ESTestCase {
String docId = randomAlphaOfLength(5);
String timestampField = randomFrom("@timestamp", null);
IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null);
IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null,
refreshPolicy);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30));
DateTime executionTime = DateTime.now(UTC);
@ -295,6 +311,9 @@ public class IndexActionTests extends ESTestCase {
assertThat(indexRequest.id(), is(docId));
}
RefreshPolicy expectedRefreshPolicy = refreshPolicy == null ? RefreshPolicy.NONE: refreshPolicy;
assertThat(indexRequest.getRefreshPolicy(), is(expectedRefreshPolicy));
if (timestampField != null) {
assertThat(indexRequest.sourceAsMap().keySet(), is(hasSize(2)));
assertThat(indexRequest.sourceAsMap(), hasEntry(timestampField, executionTime.toString()));
@ -304,7 +323,7 @@ public class IndexActionTests extends ESTestCase {
}
public void testFailureResult() throws Exception {
IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null);
IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null, refreshPolicy);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
@ -335,6 +354,8 @@ public class IndexActionTests extends ESTestCase {
listener.onResponse(bulkResponse);
when(client.bulk(captor.capture())).thenReturn(listener);
Action.Result result = executable.execute("_id", ctx, payload);
RefreshPolicy expectedRefreshPolicy = refreshPolicy == null ? RefreshPolicy.NONE: refreshPolicy;
assertThat(captor.getValue().getRefreshPolicy(), is(expectedRefreshPolicy));
if (isPartialFailure) {
assertThat(result.status(), is(Status.PARTIAL_FAILURE));
@ -342,5 +363,4 @@ public class IndexActionTests extends ESTestCase {
assertThat(result.status(), is(Status.FAILURE));
}
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.watch;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
@ -585,7 +586,9 @@ public class WatchTests extends ESTestCase {
if (randomBoolean()) {
DateTimeZone timeZone = randomBoolean() ? DateTimeZone.UTC : null;
TimeValue timeout = randomBoolean() ? timeValueSeconds(between(1, 10000)) : null;
IndexAction action = new IndexAction("_index", "_type", randomBoolean() ? "123" : null, null, timeout, timeZone);
WriteRequest.RefreshPolicy refreshPolicy = randomBoolean() ? null : randomFrom(WriteRequest.RefreshPolicy.values());
IndexAction action = new IndexAction("_index", "_type", randomBoolean() ? "123" : null, null, timeout, timeZone,
refreshPolicy);
list.add(new ActionWrapper("_index_" + randomAlphaOfLength(8), randomThrottler(),
AlwaysConditionTests.randomCondition(scriptService), randomTransform(),
new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),

View File

@ -131,6 +131,7 @@ teardown:
"index" : {
"index" : "my_test_index",
"doc_type" : "my-type",
"refresh" : "wait_for",
"doc_id": "my-id"
}
}
@ -156,3 +157,8 @@ teardown:
- is_true: watch_record.node
- is_false: watch_record.result.input.payload.foo
- is_true: watch_record.result.input.payload.spam
- do:
search:
index: my_test_index
- match: { hits.total : 1 }