Watcher: Parse index action results to prevent exceptions (elastic/elasticsearch#4424)

When the index action is used to do some bulk indexing, the single
items of the response were not checked to have been indexed successful.
This could lead to NPEs due to an index response being null when the index
operation had failed. The action was still logged as a success though.

This commit only returns SUCCESS for the action, if all items were indexed
successfully. If all items failed, the result will be FAILED as well. Lastly
the result status PARTIAL_FAILURE is used if there were successful and unsuccessful
index operations.

Additionally some minor cleanups happened, like changing equals/hashcode.

Closes elastic/elasticsearch#4416

Original commit: elastic/x-pack-elasticsearch@692687e1af
This commit is contained in:
Alexander Reelsen 2016-12-20 16:53:12 +01:00 committed by GitHub
parent 17c3164979
commit 6f7a065605
3 changed files with 116 additions and 82 deletions

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.watcher.actions.Action; import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.watcher.actions.Action.Result.Status;
import org.elasticsearch.xpack.watcher.actions.ExecutableAction; import org.elasticsearch.xpack.watcher.actions.ExecutableAction;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.ArrayObjectIterator; import org.elasticsearch.xpack.watcher.support.ArrayObjectIterator;
@ -29,6 +30,7 @@ import org.joda.time.DateTime;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.stream.Stream;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState; import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState;
@ -70,14 +72,15 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
indexRequest.source(jsonBuilder().prettyPrint().map(data)); indexRequest.source(jsonBuilder().prettyPrint().map(data));
if (ctx.simulateAction(actionId)) { if (ctx.simulateAction(actionId)) {
return new IndexAction.Result.Simulated(indexRequest.index(), action.docType, new XContentSource(indexRequest.source(), return new IndexAction.Simulated(indexRequest.index(), action.docType, new XContentSource(indexRequest.source(),
XContentType.JSON)); XContentType.JSON));
} }
IndexResponse response = client.index(indexRequest, timeout); IndexResponse response = client.index(indexRequest, timeout);
XContentBuilder jsonBuilder = jsonBuilder(); XContentBuilder jsonBuilder = jsonBuilder();
indexResponseToXContent(jsonBuilder, response); indexResponseToXContent(jsonBuilder, response);
return new IndexAction.Result.Success(new XContentSource(jsonBuilder));
return new IndexAction.Result(Status.SUCCESS, new XContentSource(jsonBuilder));
} }
Action.Result indexBulk(Iterable list, String actionId, WatchExecutionContext ctx) throws Exception { Action.Result indexBulk(Iterable list, String actionId, WatchExecutionContext ctx) throws Exception {
@ -98,11 +101,19 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
BulkResponse bulkResponse = client.bulk(bulkRequest, action.timeout); BulkResponse bulkResponse = client.bulk(bulkRequest, action.timeout);
XContentBuilder jsonBuilder = jsonBuilder().startArray(); XContentBuilder jsonBuilder = jsonBuilder().startArray();
for (BulkItemResponse item : bulkResponse) { for (BulkItemResponse item : bulkResponse) {
IndexResponse response = item.getResponse(); itemResponseToXContent(jsonBuilder, item);
indexResponseToXContent(jsonBuilder, response);
} }
jsonBuilder.endArray(); jsonBuilder.endArray();
return new IndexAction.Result.Success(new XContentSource(jsonBuilder.bytes(), XContentType.JSON));
// different error states, depending on how successful the bulk operation was
long failures = Stream.of(bulkResponse.getItems()).filter(BulkItemResponse::isFailed).count();
if (failures == 0) {
return new IndexAction.Result(Status.SUCCESS, new XContentSource(jsonBuilder.bytes(), XContentType.JSON));
} else if (failures == bulkResponse.getItems().length) {
return new IndexAction.Result(Status.FAILURE, new XContentSource(jsonBuilder.bytes(), XContentType.JSON));
} else {
return new IndexAction.Result(Status.PARTIAL_FAILURE, new XContentSource(jsonBuilder.bytes(), XContentType.JSON));
}
} }
private Map<String, Object> addTimestampToDocument(Map<String, Object> data, DateTime executionTime) { private Map<String, Object> addTimestampToDocument(Map<String, Object> data, DateTime executionTime) {
@ -115,6 +126,20 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
return data; return data;
} }
static void itemResponseToXContent(XContentBuilder builder, BulkItemResponse item) throws IOException {
if (item.isFailed()) {
builder.startObject()
.field("failed", item.isFailed())
.field("message", item.getFailureMessage())
.field("id", item.getId())
.field("type", item.getType())
.field("index", item.getIndex())
.endObject();
} else {
indexResponseToXContent(builder, item.getResponse());
}
}
static void indexResponseToXContent(XContentBuilder builder, IndexResponse response) throws IOException { static void indexResponseToXContent(XContentBuilder builder, IndexResponse response) throws IOException {
builder.startObject() builder.startObject()
.field("created", response.getResult() == DocWriteResponse.Result.CREATED) .field("created", response.getResult() == DocWriteResponse.Result.CREATED)

View File

@ -18,6 +18,7 @@ import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
import java.io.IOException; import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
@ -68,22 +69,15 @@ public class IndexAction implements Action {
IndexAction that = (IndexAction) o; IndexAction that = (IndexAction) o;
if (!index.equals(that.index)) return false; return Objects.equals(index, that.index) && Objects.equals(docType, that.docType)
if (!docType.equals(that.docType)) return false; && Objects.equals(executionTimeField, that.executionTimeField)
if (executionTimeField != null ? !executionTimeField.equals(that.executionTimeField) : that.executionTimeField != null) && Objects.equals(timeout, that.timeout)
return false; && Objects.equals(dynamicNameTimeZone, that.dynamicNameTimeZone);
if (timeout != null ? !timeout.equals(that.timeout) : that.timeout != null) return false;
return !(dynamicNameTimeZone != null ? !dynamicNameTimeZone.equals(that.dynamicNameTimeZone) : that.dynamicNameTimeZone != null);
} }
@Override @Override
public int hashCode() { public int hashCode() {
int result = index.hashCode(); return Objects.hash(index, docType, executionTimeField, timeout, dynamicNameTimeZone);
result = 31 * result + docType.hashCode();
result = 31 * result + (executionTimeField != null ? executionTimeField.hashCode() : 0);
result = 31 * result + (timeout != null ? timeout.hashCode() : 0);
result = 31 * result + (dynamicNameTimeZone != null ? dynamicNameTimeZone.hashCode() : 0);
return result;
} }
@Override @Override
@ -171,64 +165,61 @@ public class IndexAction implements Action {
return new Builder(index, docType); return new Builder(index, docType);
} }
public interface Result { public static class Result extends Action.Result {
class Success extends Action.Result implements Result { private final XContentSource response;
private final XContentSource response; public Result(Status status, XContentSource response) {
super(TYPE, status);
public Success(XContentSource response) { this.response = response;
super(TYPE, Status.SUCCESS);
this.response = response;
}
public XContentSource response() {
return response;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject(type)
.field(Field.RESPONSE.getPreferredName(), response, params)
.endObject();
}
} }
class Simulated extends Action.Result implements Result { public XContentSource response() {
return response;
}
private final String index; @Override
private final String docType; public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
private final XContentSource source; return builder.startObject(type)
.field(Field.RESPONSE.getPreferredName(), response, params)
.endObject();
}
}
protected Simulated(String index, String docType, XContentSource source) { static class Simulated extends Action.Result {
super(TYPE, Status.SIMULATED);
this.index = index;
this.docType = docType;
this.source = source;
}
public String index() { private final String index;
return index; private final String docType;
} private final XContentSource source;
public String docType() { protected Simulated(String index, String docType, XContentSource source) {
return docType; super(TYPE, Status.SIMULATED);
} this.index = index;
this.docType = docType;
this.source = source;
}
public XContentSource source() { public String index() {
return source; return index;
} }
@Override public String docType() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { return docType;
return builder.startObject(type) }
.startObject(Field.REQUEST.getPreferredName())
.field(Field.INDEX.getPreferredName(), index) public XContentSource source() {
.field(Field.DOC_TYPE.getPreferredName(), docType) return source;
.field(Field.SOURCE.getPreferredName(), source, params) }
.endObject()
.endObject(); @Override
} public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject(type)
.startObject(Field.REQUEST.getPreferredName())
.field(Field.INDEX.getPreferredName(), index)
.field(Field.DOC_TYPE.getPreferredName(), docType)
.field(Field.SOURCE.getPreferredName(), source, params)
.endObject()
.endObject();
} }
} }

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.actions.index;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -28,7 +29,10 @@ import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
import org.elasticsearch.xpack.watcher.watch.Payload; import org.elasticsearch.xpack.watcher.watch.Payload;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
@ -48,19 +52,6 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.joda.time.DateTimeZone.UTC; import static org.joda.time.DateTimeZone.UTC;
public class IndexActionTests extends ESIntegTestCase { public class IndexActionTests extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.build();
}
@Override
protected Settings transportClientSettings() {
return Settings.builder()
.put(super.transportClientSettings())
.build();
}
public void testIndexActionExecuteSingleDoc() throws Exception { public void testIndexActionExecuteSingleDoc() throws Exception {
String timestampField = randomFrom("@timestamp", null); String timestampField = randomFrom("@timestamp", null);
@ -75,8 +66,8 @@ public class IndexActionTests extends ESIntegTestCase {
Action.Result result = executable.execute("_id", ctx, ctx.payload()); Action.Result result = executable.execute("_id", ctx, ctx.payload());
assertThat(result.status(), equalTo(Status.SUCCESS)); assertThat(result.status(), equalTo(Status.SUCCESS));
assertThat(result, instanceOf(IndexAction.Result.Success.class)); assertThat(result, instanceOf(IndexAction.Result.class));
IndexAction.Result.Success successResult = (IndexAction.Result.Success) result; IndexAction.Result successResult = (IndexAction.Result) result;
XContentSource response = successResult.response(); XContentSource response = successResult.response();
assertThat(response.getValue("created"), equalTo((Object)Boolean.TRUE)); assertThat(response.getValue("created"), equalTo((Object)Boolean.TRUE));
assertThat(response.getValue("version"), equalTo((Object) 1)); assertThat(response.getValue("version"), equalTo((Object) 1));
@ -135,8 +126,8 @@ public class IndexActionTests extends ESIntegTestCase {
Action.Result result = executable.execute("_id", ctx, ctx.payload()); Action.Result result = executable.execute("_id", ctx, ctx.payload());
assertThat(result.status(), equalTo(Status.SUCCESS)); assertThat(result.status(), equalTo(Status.SUCCESS));
assertThat(result, instanceOf(IndexAction.Result.Success.class)); assertThat(result, instanceOf(IndexAction.Result.class));
IndexAction.Result.Success successResult = (IndexAction.Result.Success) result; IndexAction.Result successResult = (IndexAction.Result) result;
XContentSource response = successResult.response(); XContentSource response = successResult.response();
assertThat(successResult.toString(), response.getValue("0.created"), equalTo((Object)Boolean.TRUE)); assertThat(successResult.toString(), response.getValue("0.created"), equalTo((Object)Boolean.TRUE));
assertThat(successResult.toString(), response.getValue("0.version"), equalTo((Object) 1)); assertThat(successResult.toString(), response.getValue("0.version"), equalTo((Object) 1));
@ -236,4 +227,31 @@ public class IndexActionTests extends ESIntegTestCase {
assertThat(useIndex && useType, equalTo(false)); assertThat(useIndex && useType, equalTo(false));
} }
} }
// https://github.com/elastic/x-pack/issues/4416
public void testIndexingWithWrongMappingReturnsFailureResult() throws Exception {
// index a document to set the mapping of the foo field to a boolean
client().prepareIndex("test-index", "test-type", "_id").setSource("foo", true)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
IndexAction action = new IndexAction("test-index", "test-type", "@timestamp", null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, WatcherClientProxy.of(client()), null);
List<Map<String, Object>> docs = new ArrayList<>();
boolean addSuccessfulIndexedDoc = randomBoolean();
if (addSuccessfulIndexedDoc) {
docs.add(Collections.singletonMap("foo", randomBoolean()));
}
docs.add(Collections.singletonMap("foo", Collections.singletonMap("foo", "bar")));
Payload payload = new Payload.Simple(Collections.singletonMap("_doc", docs));
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", DateTime.now(UTC), payload);
Action.Result result = executable.execute("_id", ctx, payload);
if (addSuccessfulIndexedDoc) {
assertThat(result.status(), is(Status.PARTIAL_FAILURE));
} else {
assertThat(result.status(), is(Status.FAILURE));
}
}
} }