From 42bc2d15bedb0f3b457bbfe06247311b9667b7c9 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Thu, 6 Oct 2016 14:25:53 -0400 Subject: [PATCH] fix bug in bulk replication for noop update operation --- .../action/DocumentWriteRequest.java | 1 + .../action/bulk/TransportShardBulkAction.java | 15 ++++++++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/DocumentWriteRequest.java b/core/src/main/java/org/elasticsearch/action/DocumentWriteRequest.java index 66ea6401bcc..490aa7fd326 100644 --- a/core/src/main/java/org/elasticsearch/action/DocumentWriteRequest.java +++ b/core/src/main/java/org/elasticsearch/action/DocumentWriteRequest.java @@ -178,6 +178,7 @@ public abstract class DocumentWriteRequest> /** write a document write (index/delete/update) request*/ public static void writeDocumentRequest(StreamOutput out, DocumentWriteRequest request) throws IOException { + assert request != null : "request must not be null"; if (request instanceof IndexRequest) { out.writeByte((byte) 0); } else if (request instanceof DeleteRequest) { diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 9a58817c188..b25483268fa 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -168,7 +168,7 @@ public class TransportShardBulkAction extends TransportWriteAction itemRequest = request.items()[requestIndex].request(); + DocumentWriteRequest itemRequest = request.items()[requestIndex].request(); preVersions[requestIndex] = itemRequest.version(); preVersionTypes[requestIndex] = itemRequest.versionType(); DocumentWriteRequest.OpType opType = itemRequest.opType(); @@ -196,9 +196,14 @@ public class TransportShardBulkAction extends TransportWriteAction) writeResult.getReplicaRequest()); + DocumentWriteRequest replicaRequest = (DocumentWriteRequest) writeResult.getReplicaRequest(); + if (replicaRequest != null) { + request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), replicaRequest); + } else { + request.items()[requestIndex].setIgnoreOnReplica(); + } // add the response setResponse(request.items()[requestIndex], new BulkItemResponse(request.items()[requestIndex].id(), opType, writeResult.getResponse())); } catch (Exception e) { @@ -206,14 +211,14 @@ public class TransportShardBulkAction extends TransportWriteAction documentWriteRequest = request.items()[j].request(); + DocumentWriteRequest documentWriteRequest = request.items()[j].request(); documentWriteRequest.version(preVersions[j]); documentWriteRequest.versionType(preVersionTypes[j]); } throw (ElasticsearchException) e; } BulkItemRequest item = request.items()[requestIndex]; - DocumentWriteRequest documentWriteRequest = item.request(); + DocumentWriteRequest documentWriteRequest = item.request(); if (isConflictException(e)) { logger.trace((Supplier) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", request.shardId(), documentWriteRequest.opType().getLowercase(), request), e);