Fix issues with serializing BulkByScrollResponse (#45357)
Currently there are two issues with serializing BulkByScrollResponse. First, when deserializing from XContent, indexing exceptions and search exceptions are switched. Additionally, search exceptions do no retain the appropriate RestStatus code, so you must evaluate the status code from the exception. However, the exception class is not always correctly retained when serialized. This commit adds tests in the failure case. Additionally, fixes the swapping of failure types and adds the rest status code to the search failure.
This commit is contained in:
parent
d96977202d
commit
02622c1ef9
|
@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.get.GetRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
|
@ -40,7 +41,6 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
|||
import org.elasticsearch.index.reindex.DeleteByQueryAction;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
||||
import org.elasticsearch.index.reindex.ReindexRequest;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource;
|
||||
import org.elasticsearch.index.reindex.UpdateByQueryAction;
|
||||
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
@ -179,10 +179,10 @@ public class ReindexIT extends ESRestHighLevelClientTestCase {
|
|||
final BulkByScrollResponse response = highLevelClient().reindex(reindexRequest, RequestOptions.DEFAULT);
|
||||
|
||||
assertThat(response.getVersionConflicts(), equalTo(2L));
|
||||
assertThat(response.getBulkFailures(), empty());
|
||||
assertThat(response.getSearchFailures(), hasSize(2));
|
||||
assertThat(response.getSearchFailures(), empty());
|
||||
assertThat(response.getBulkFailures(), hasSize(2));
|
||||
assertThat(
|
||||
response.getSearchFailures().stream().map(ScrollableHitSource.SearchFailure::toString).collect(Collectors.toSet()),
|
||||
response.getBulkFailures().stream().map(BulkItemResponse.Failure::getMessage).collect(Collectors.toSet()),
|
||||
everyItem(containsString("version conflict"))
|
||||
);
|
||||
|
||||
|
@ -328,10 +328,10 @@ public class ReindexIT extends ESRestHighLevelClientTestCase {
|
|||
final BulkByScrollResponse response = highLevelClient().updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
|
||||
|
||||
assertThat(response.getVersionConflicts(), equalTo(1L));
|
||||
assertThat(response.getBulkFailures(), empty());
|
||||
assertThat(response.getSearchFailures(), hasSize(1));
|
||||
assertThat(response.getSearchFailures(), empty());
|
||||
assertThat(response.getBulkFailures(), hasSize(1));
|
||||
assertThat(
|
||||
response.getSearchFailures().stream().map(ScrollableHitSource.SearchFailure::toString).collect(Collectors.toSet()),
|
||||
response.getBulkFailures().stream().map(BulkItemResponse.Failure::getMessage).collect(Collectors.toSet()),
|
||||
everyItem(containsString("version conflict"))
|
||||
);
|
||||
|
||||
|
|
|
@ -19,11 +19,10 @@
|
|||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
|
||||
import org.elasticsearch.rest.BytesRestResponse;
|
||||
import org.elasticsearch.rest.RestChannel;
|
||||
import org.elasticsearch.rest.RestResponse;
|
||||
|
@ -67,7 +66,7 @@ public class BulkIndexByScrollResponseContentListener extends RestBuilderListene
|
|||
}
|
||||
}
|
||||
for (SearchFailure failure: response.getSearchFailures()) {
|
||||
RestStatus failureStatus = ExceptionsHelper.status(failure.getReason());
|
||||
RestStatus failureStatus = failure.getStatus();
|
||||
if (failureStatus.getStatus() > status.getStatus()) {
|
||||
status = failureStatus;
|
||||
}
|
||||
|
|
|
@ -241,10 +241,10 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContentFr
|
|||
} else if (token == Token.START_OBJECT) {
|
||||
switch (name) {
|
||||
case SearchFailure.REASON_FIELD:
|
||||
bulkExc = ElasticsearchException.fromXContent(parser);
|
||||
searchExc = ElasticsearchException.fromXContent(parser);
|
||||
break;
|
||||
case Failure.CAUSE_FIELD:
|
||||
searchExc = ElasticsearchException.fromXContent(parser);
|
||||
bulkExc = ElasticsearchException.fromXContent(parser);
|
||||
break;
|
||||
default:
|
||||
parser.skipChildren();
|
||||
|
@ -285,7 +285,11 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContentFr
|
|||
if (bulkExc != null) {
|
||||
return new Failure(index, type, id, bulkExc, RestStatus.fromCode(status));
|
||||
} else if (searchExc != null) {
|
||||
return new SearchFailure(searchExc, index, shardId, nodeId);
|
||||
if (status == null) {
|
||||
return new SearchFailure(searchExc, index, shardId, nodeId);
|
||||
} else {
|
||||
return new SearchFailure(searchExc, index, shardId, nodeId, RestStatus.fromCode(status));
|
||||
}
|
||||
} else {
|
||||
throw new ElasticsearchParseException("failed to parse failures array. At least one of {reason,cause} must be present");
|
||||
}
|
||||
|
|
|
@ -21,8 +21,10 @@ package org.elasticsearch.index.reindex;
|
|||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.BackoffPolicy;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.search.ShardSearchFailure;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
|
@ -35,6 +37,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
|
|||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
|
@ -356,6 +359,7 @@ public abstract class ScrollableHitSource {
|
|||
*/
|
||||
public static class SearchFailure implements Writeable, ToXContentObject {
|
||||
private final Throwable reason;
|
||||
private final RestStatus status;
|
||||
@Nullable
|
||||
private final String index;
|
||||
@Nullable
|
||||
|
@ -367,12 +371,19 @@ public abstract class ScrollableHitSource {
|
|||
public static final String SHARD_FIELD = "shard";
|
||||
public static final String NODE_FIELD = "node";
|
||||
public static final String REASON_FIELD = "reason";
|
||||
public static final String STATUS_FIELD = BulkItemResponse.Failure.STATUS_FIELD;
|
||||
|
||||
public SearchFailure(Throwable reason, @Nullable String index, @Nullable Integer shardId, @Nullable String nodeId) {
|
||||
this(reason, index, shardId, nodeId, ExceptionsHelper.status(reason));
|
||||
}
|
||||
|
||||
public SearchFailure(Throwable reason, @Nullable String index, @Nullable Integer shardId, @Nullable String nodeId,
|
||||
RestStatus status) {
|
||||
this.index = index;
|
||||
this.shardId = shardId;
|
||||
this.reason = requireNonNull(reason, "reason cannot be null");
|
||||
this.nodeId = nodeId;
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -390,6 +401,7 @@ public abstract class ScrollableHitSource {
|
|||
index = in.readOptionalString();
|
||||
shardId = in.readOptionalVInt();
|
||||
nodeId = in.readOptionalString();
|
||||
status = ExceptionsHelper.status(reason);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -408,6 +420,10 @@ public abstract class ScrollableHitSource {
|
|||
return shardId;
|
||||
}
|
||||
|
||||
public RestStatus getStatus() {
|
||||
return this.status;
|
||||
}
|
||||
|
||||
public Throwable getReason() {
|
||||
return reason;
|
||||
}
|
||||
|
@ -429,6 +445,7 @@ public abstract class ScrollableHitSource {
|
|||
if (nodeId != null) {
|
||||
builder.field(NODE_FIELD, nodeId);
|
||||
}
|
||||
builder.field(STATUS_FIELD, status.getStatus());
|
||||
builder.field(REASON_FIELD);
|
||||
{
|
||||
builder.startObject();
|
||||
|
|
|
@ -20,14 +20,16 @@
|
|||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractXContentTestCase;
|
||||
import org.elasticsearch.index.reindex.BulkByScrollTask.Status;
|
||||
import org.elasticsearch.test.AbstractXContentTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
|
@ -44,6 +46,7 @@ public class BulkByScrollResponseTests extends AbstractXContentTestCase<BulkBySc
|
|||
|
||||
private boolean includeUpdated;
|
||||
private boolean includeCreated;
|
||||
private boolean testExceptions = randomBoolean();
|
||||
|
||||
public void testRountTrip() throws IOException {
|
||||
BulkByScrollResponse response = new BulkByScrollResponse(timeValueMillis(randomNonNegativeLong()),
|
||||
|
@ -76,7 +79,9 @@ public class BulkByScrollResponseTests extends AbstractXContentTestCase<BulkBySc
|
|||
shardId = randomInt();
|
||||
nodeId = usually() ? randomAlphaOfLength(5) : null;
|
||||
}
|
||||
return singletonList(new ScrollableHitSource.SearchFailure(new ElasticsearchException("foo"), index, shardId, nodeId));
|
||||
ElasticsearchException exception = randomFrom(new ResourceNotFoundException("bar"), new ElasticsearchException("foo"),
|
||||
new NoNodeAvailableException("baz"));
|
||||
return singletonList(new ScrollableHitSource.SearchFailure(exception, index, shardId, nodeId));
|
||||
}
|
||||
|
||||
private void assertResponseEquals(BulkByScrollResponse expected, BulkByScrollResponse actual) {
|
||||
|
@ -101,14 +106,14 @@ public class BulkByScrollResponseTests extends AbstractXContentTestCase<BulkBySc
|
|||
assertEquals(expectedFailure.getNodeId(), actualFailure.getNodeId());
|
||||
assertEquals(expectedFailure.getReason().getClass(), actualFailure.getReason().getClass());
|
||||
assertEquals(expectedFailure.getReason().getMessage(), actualFailure.getReason().getMessage());
|
||||
assertEquals(expectedFailure.getStatus(), actualFailure.getStatus());
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertEqualBulkResponse(BulkByScrollResponse expected, BulkByScrollResponse actual,
|
||||
boolean includeUpdated, boolean includeCreated) {
|
||||
public static void assertEqualBulkResponse(BulkByScrollResponse expected, BulkByScrollResponse actual, boolean includeUpdated,
|
||||
boolean includeCreated) {
|
||||
assertEquals(expected.getTook(), actual.getTook());
|
||||
BulkByScrollTaskStatusTests
|
||||
.assertEqualStatus(expected.getStatus(), actual.getStatus(), includeUpdated, includeCreated);
|
||||
BulkByScrollTaskStatusTests.assertEqualStatus(expected.getStatus(), actual.getStatus(), includeUpdated, includeCreated);
|
||||
assertEquals(expected.getBulkFailures().size(), actual.getBulkFailures().size());
|
||||
for (int i = 0; i < expected.getBulkFailures().size(); i++) {
|
||||
Failure expectedFailure = expected.getBulkFailures().get(i);
|
||||
|
@ -126,7 +131,8 @@ public class BulkByScrollResponseTests extends AbstractXContentTestCase<BulkBySc
|
|||
assertEquals(expectedFailure.getIndex(), actualFailure.getIndex());
|
||||
assertEquals(expectedFailure.getShardId(), actualFailure.getShardId());
|
||||
assertEquals(expectedFailure.getNodeId(), actualFailure.getNodeId());
|
||||
assertThat(expectedFailure.getReason().getMessage(), containsString(actualFailure.getReason().getMessage()));
|
||||
assertEquals(expectedFailure.getStatus(), actualFailure.getStatus());
|
||||
assertThat(actualFailure.getReason().getMessage(), containsString(expectedFailure.getReason().getMessage()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -137,12 +143,13 @@ public class BulkByScrollResponseTests extends AbstractXContentTestCase<BulkBySc
|
|||
|
||||
@Override
|
||||
protected BulkByScrollResponse createTestInstance() {
|
||||
// failures are tested separately, so we can test XContent equivalence at least when we have no failures
|
||||
return
|
||||
new BulkByScrollResponse(
|
||||
timeValueMillis(randomNonNegativeLong()), BulkByScrollTaskStatusTests.randomStatusWithoutException(),
|
||||
emptyList(), emptyList(), randomBoolean()
|
||||
);
|
||||
if (testExceptions) {
|
||||
return new BulkByScrollResponse(timeValueMillis(randomNonNegativeLong()), BulkByScrollTaskStatusTests.randomStatus(),
|
||||
randomIndexingFailures(), randomSearchFailures(), randomBoolean());
|
||||
} else {
|
||||
return new BulkByScrollResponse(timeValueMillis(randomNonNegativeLong()),
|
||||
BulkByScrollTaskStatusTests.randomStatusWithoutException(), emptyList(), emptyList(), randomBoolean());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -150,6 +157,12 @@ public class BulkByScrollResponseTests extends AbstractXContentTestCase<BulkBySc
|
|||
return BulkByScrollResponse.fromXContent(parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean assertToXContentEquivalence() {
|
||||
// XContentEquivalence fails in the exception case, due to how exceptions are serialized.
|
||||
return testExceptions == false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean supportsUnknownFields() {
|
||||
return true;
|
||||
|
|
Loading…
Reference in New Issue