diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java index 256e38da858..1c33a7e183e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; @@ -40,7 +41,6 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; -import org.elasticsearch.index.reindex.ScrollableHitSource; import org.elasticsearch.index.reindex.UpdateByQueryAction; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.rest.RestStatus; @@ -179,10 +179,10 @@ public class ReindexIT extends ESRestHighLevelClientTestCase { final BulkByScrollResponse response = highLevelClient().reindex(reindexRequest, RequestOptions.DEFAULT); assertThat(response.getVersionConflicts(), equalTo(2L)); - assertThat(response.getBulkFailures(), empty()); - assertThat(response.getSearchFailures(), hasSize(2)); + assertThat(response.getSearchFailures(), empty()); + assertThat(response.getBulkFailures(), hasSize(2)); assertThat( - response.getSearchFailures().stream().map(ScrollableHitSource.SearchFailure::toString).collect(Collectors.toSet()), + response.getBulkFailures().stream().map(BulkItemResponse.Failure::getMessage).collect(Collectors.toSet()), everyItem(containsString("version conflict")) ); @@ -328,10 +328,10 @@ public class ReindexIT extends ESRestHighLevelClientTestCase { final BulkByScrollResponse response = highLevelClient().updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT); assertThat(response.getVersionConflicts(), equalTo(1L)); - assertThat(response.getBulkFailures(), empty()); - assertThat(response.getSearchFailures(), hasSize(1)); + assertThat(response.getSearchFailures(), empty()); + assertThat(response.getBulkFailures(), hasSize(1)); assertThat( - response.getSearchFailures().stream().map(ScrollableHitSource.SearchFailure::toString).collect(Collectors.toSet()), + response.getBulkFailures().stream().map(BulkItemResponse.Failure::getMessage).collect(Collectors.toSet()), everyItem(containsString("version conflict")) ); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java index 8e5dff170d4..d64bcf8662e 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java @@ -19,11 +19,10 @@ package org.elasticsearch.index.reindex; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.bulk.BulkItemResponse.Failure; -import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestResponse; @@ -67,7 +66,7 @@ public class BulkIndexByScrollResponseContentListener extends RestBuilderListene } } for (SearchFailure failure: response.getSearchFailures()) { - RestStatus failureStatus = ExceptionsHelper.status(failure.getReason()); + RestStatus failureStatus = failure.getStatus(); if (failureStatus.getStatus() > status.getStatus()) { status = failureStatus; } diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java index 53e34251f4a..59ac32c6661 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java @@ -241,10 +241,10 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContentFr } else if (token == Token.START_OBJECT) { switch (name) { case SearchFailure.REASON_FIELD: - bulkExc = ElasticsearchException.fromXContent(parser); + searchExc = ElasticsearchException.fromXContent(parser); break; case Failure.CAUSE_FIELD: - searchExc = ElasticsearchException.fromXContent(parser); + bulkExc = ElasticsearchException.fromXContent(parser); break; default: parser.skipChildren(); @@ -285,7 +285,11 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContentFr if (bulkExc != null) { return new Failure(index, type, id, bulkExc, RestStatus.fromCode(status)); } else if (searchExc != null) { - return new SearchFailure(searchExc, index, shardId, nodeId); + if (status == null) { + return new SearchFailure(searchExc, index, shardId, nodeId); + } else { + return new SearchFailure(searchExc, index, shardId, nodeId, RestStatus.fromCode(status)); + } } else { throw new ElasticsearchParseException("failed to parse failures array. At least one of {reason,cause} must be present"); } diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java b/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java index 269bed2ddc8..07d22ddb663 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java @@ -21,8 +21,10 @@ package org.elasticsearch.index.reindex; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -35,6 +37,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.threadpool.ThreadPool; @@ -356,6 +359,7 @@ public abstract class ScrollableHitSource { */ public static class SearchFailure implements Writeable, ToXContentObject { private final Throwable reason; + private final RestStatus status; @Nullable private final String index; @Nullable @@ -367,12 +371,19 @@ public abstract class ScrollableHitSource { public static final String SHARD_FIELD = "shard"; public static final String NODE_FIELD = "node"; public static final String REASON_FIELD = "reason"; + public static final String STATUS_FIELD = BulkItemResponse.Failure.STATUS_FIELD; public SearchFailure(Throwable reason, @Nullable String index, @Nullable Integer shardId, @Nullable String nodeId) { + this(reason, index, shardId, nodeId, ExceptionsHelper.status(reason)); + } + + public SearchFailure(Throwable reason, @Nullable String index, @Nullable Integer shardId, @Nullable String nodeId, + RestStatus status) { this.index = index; this.shardId = shardId; this.reason = requireNonNull(reason, "reason cannot be null"); this.nodeId = nodeId; + this.status = status; } /** @@ -390,6 +401,7 @@ public abstract class ScrollableHitSource { index = in.readOptionalString(); shardId = in.readOptionalVInt(); nodeId = in.readOptionalString(); + status = ExceptionsHelper.status(reason); } @Override @@ -408,6 +420,10 @@ public abstract class ScrollableHitSource { return shardId; } + public RestStatus getStatus() { + return this.status; + } + public Throwable getReason() { return reason; } @@ -429,6 +445,7 @@ public abstract class ScrollableHitSource { if (nodeId != null) { builder.field(NODE_FIELD, nodeId); } + builder.field(STATUS_FIELD, status.getStatus()); builder.field(REASON_FIELD); { builder.startObject(); diff --git a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java index 7822244b9ce..a1301fe03ea 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java @@ -20,14 +20,16 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.action.bulk.BulkItemResponse.Failure; +import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.test.AbstractXContentTestCase; import org.elasticsearch.index.reindex.BulkByScrollTask.Status; +import org.elasticsearch.test.AbstractXContentTestCase; import java.io.IOException; import java.util.HashMap; @@ -44,6 +46,7 @@ public class BulkByScrollResponseTests extends AbstractXContentTestCase