From 02622c1ef95e4e40f44fb8a4c622e15ef9c15534 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 13 Aug 2019 15:55:01 -0400 Subject: [PATCH] Fix issues with serializing BulkByScrollResponse (#45357) Currently there are two issues with serializing BulkByScrollResponse. First, when deserializing from XContent, indexing exceptions and search exceptions are switched. Additionally, search exceptions do no retain the appropriate RestStatus code, so you must evaluate the status code from the exception. However, the exception class is not always correctly retained when serialized. This commit adds tests in the failure case. Additionally, fixes the swapping of failure types and adds the rest status code to the search failure. --- .../org/elasticsearch/client/ReindexIT.java | 14 +++---- ...kIndexByScrollResponseContentListener.java | 5 +-- .../index/reindex/BulkByScrollResponse.java | 10 +++-- .../index/reindex/ScrollableHitSource.java | 17 ++++++++ .../reindex/BulkByScrollResponseTests.java | 39 ++++++++++++------- 5 files changed, 59 insertions(+), 26 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java index 256e38da858..1c33a7e183e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; @@ -40,7 +41,6 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; -import org.elasticsearch.index.reindex.ScrollableHitSource; import org.elasticsearch.index.reindex.UpdateByQueryAction; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.rest.RestStatus; @@ -179,10 +179,10 @@ public class ReindexIT extends ESRestHighLevelClientTestCase { final BulkByScrollResponse response = highLevelClient().reindex(reindexRequest, RequestOptions.DEFAULT); assertThat(response.getVersionConflicts(), equalTo(2L)); - assertThat(response.getBulkFailures(), empty()); - assertThat(response.getSearchFailures(), hasSize(2)); + assertThat(response.getSearchFailures(), empty()); + assertThat(response.getBulkFailures(), hasSize(2)); assertThat( - response.getSearchFailures().stream().map(ScrollableHitSource.SearchFailure::toString).collect(Collectors.toSet()), + response.getBulkFailures().stream().map(BulkItemResponse.Failure::getMessage).collect(Collectors.toSet()), everyItem(containsString("version conflict")) ); @@ -328,10 +328,10 @@ public class ReindexIT extends ESRestHighLevelClientTestCase { final BulkByScrollResponse response = highLevelClient().updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT); assertThat(response.getVersionConflicts(), equalTo(1L)); - assertThat(response.getBulkFailures(), empty()); - assertThat(response.getSearchFailures(), hasSize(1)); + assertThat(response.getSearchFailures(), empty()); + assertThat(response.getBulkFailures(), hasSize(1)); assertThat( - response.getSearchFailures().stream().map(ScrollableHitSource.SearchFailure::toString).collect(Collectors.toSet()), + response.getBulkFailures().stream().map(BulkItemResponse.Failure::getMessage).collect(Collectors.toSet()), everyItem(containsString("version conflict")) ); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java index 8e5dff170d4..d64bcf8662e 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java @@ -19,11 +19,10 @@ package org.elasticsearch.index.reindex; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.bulk.BulkItemResponse.Failure; -import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestResponse; @@ -67,7 +66,7 @@ public class BulkIndexByScrollResponseContentListener extends RestBuilderListene } } for (SearchFailure failure: response.getSearchFailures()) { - RestStatus failureStatus = ExceptionsHelper.status(failure.getReason()); + RestStatus failureStatus = failure.getStatus(); if (failureStatus.getStatus() > status.getStatus()) { status = failureStatus; } diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java index 53e34251f4a..59ac32c6661 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java @@ -241,10 +241,10 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContentFr } else if (token == Token.START_OBJECT) { switch (name) { case SearchFailure.REASON_FIELD: - bulkExc = ElasticsearchException.fromXContent(parser); + searchExc = ElasticsearchException.fromXContent(parser); break; case Failure.CAUSE_FIELD: - searchExc = ElasticsearchException.fromXContent(parser); + bulkExc = ElasticsearchException.fromXContent(parser); break; default: parser.skipChildren(); @@ -285,7 +285,11 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContentFr if (bulkExc != null) { return new Failure(index, type, id, bulkExc, RestStatus.fromCode(status)); } else if (searchExc != null) { - return new SearchFailure(searchExc, index, shardId, nodeId); + if (status == null) { + return new SearchFailure(searchExc, index, shardId, nodeId); + } else { + return new SearchFailure(searchExc, index, shardId, nodeId, RestStatus.fromCode(status)); + } } else { throw new ElasticsearchParseException("failed to parse failures array. At least one of {reason,cause} must be present"); } diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java b/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java index 269bed2ddc8..07d22ddb663 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java @@ -21,8 +21,10 @@ package org.elasticsearch.index.reindex; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -35,6 +37,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.threadpool.ThreadPool; @@ -356,6 +359,7 @@ public abstract class ScrollableHitSource { */ public static class SearchFailure implements Writeable, ToXContentObject { private final Throwable reason; + private final RestStatus status; @Nullable private final String index; @Nullable @@ -367,12 +371,19 @@ public abstract class ScrollableHitSource { public static final String SHARD_FIELD = "shard"; public static final String NODE_FIELD = "node"; public static final String REASON_FIELD = "reason"; + public static final String STATUS_FIELD = BulkItemResponse.Failure.STATUS_FIELD; public SearchFailure(Throwable reason, @Nullable String index, @Nullable Integer shardId, @Nullable String nodeId) { + this(reason, index, shardId, nodeId, ExceptionsHelper.status(reason)); + } + + public SearchFailure(Throwable reason, @Nullable String index, @Nullable Integer shardId, @Nullable String nodeId, + RestStatus status) { this.index = index; this.shardId = shardId; this.reason = requireNonNull(reason, "reason cannot be null"); this.nodeId = nodeId; + this.status = status; } /** @@ -390,6 +401,7 @@ public abstract class ScrollableHitSource { index = in.readOptionalString(); shardId = in.readOptionalVInt(); nodeId = in.readOptionalString(); + status = ExceptionsHelper.status(reason); } @Override @@ -408,6 +420,10 @@ public abstract class ScrollableHitSource { return shardId; } + public RestStatus getStatus() { + return this.status; + } + public Throwable getReason() { return reason; } @@ -429,6 +445,7 @@ public abstract class ScrollableHitSource { if (nodeId != null) { builder.field(NODE_FIELD, nodeId); } + builder.field(STATUS_FIELD, status.getStatus()); builder.field(REASON_FIELD); { builder.startObject(); diff --git a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java index 7822244b9ce..a1301fe03ea 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java @@ -20,14 +20,16 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.action.bulk.BulkItemResponse.Failure; +import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.test.AbstractXContentTestCase; import org.elasticsearch.index.reindex.BulkByScrollTask.Status; +import org.elasticsearch.test.AbstractXContentTestCase; import java.io.IOException; import java.util.HashMap; @@ -44,6 +46,7 @@ public class BulkByScrollResponseTests extends AbstractXContentTestCase