From fe50db2e8d57ce588fc80eca7413955a8ff63a76 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Fri, 7 Oct 2016 18:28:51 -0400 Subject: [PATCH] fix bug in update operation in shard bulk execution --- .../action/bulk/TransportShardBulkAction.java | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 38417058e6a..8660c4eac26 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -233,14 +233,22 @@ public class TransportShardBulkAction extends TransportWriteAction writeResult = TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction); BytesReference indexSourceAsBytes = indexRequest.source(); IndexResponse indexResponse = writeResult.getResponse(); - UpdateResponse writeUpdateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.getResult()); - if (updateRequest.fields() != null && updateRequest.fields().length > 0) { - Tuple> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true); - writeUpdateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); + UpdateResponse update = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.getResult()); + if (translate.getResponseResult() == DocWriteResponse.Result.CREATED) { + if ((updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) || + (updateRequest.fields() != null && updateRequest.fields().length > 0)) { + Tuple> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true); + update.setGetResult(updateHelper.extractGetResult(updateRequest, updateRequest.concreteIndex(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); + } else { + update.setGetResult(null); + } + } else { + assert translate.getResponseResult() == DocWriteResponse.Result.UPDATED; + update.setGetResult(updateHelper.extractGetResult(updateRequest, updateRequest.concreteIndex(), indexResponse.getVersion(), translate.updatedSourceAsMap(), translate.updateSourceContentType(), indexSourceAsBytes)); } // Replace the update request to the translated index request to execute on the replica. request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest); - return new WriteResult<>(writeUpdateResponse, writeResult.getLocation()); + return new WriteResult<>(update, writeResult.getLocation()); case DELETED: DeleteRequest deleteRequest = translate.action(); WriteResult deleteResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard);