fix bug in update operation in shard bulk execution

This commit is contained in:
Areek Zillur 2016-10-07 18:28:51 -04:00
parent 9d48248a66
commit fe50db2e8d
1 changed files with 13 additions and 5 deletions

View File

@ -233,14 +233,22 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
WriteResult<IndexResponse> writeResult = TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction);
BytesReference indexSourceAsBytes = indexRequest.source();
IndexResponse indexResponse = writeResult.getResponse();
UpdateResponse writeUpdateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.getResult());
if (updateRequest.fields() != null && updateRequest.fields().length > 0) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
writeUpdateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
UpdateResponse update = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.getResult());
if (translate.getResponseResult() == DocWriteResponse.Result.CREATED) {
if ((updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) ||
(updateRequest.fields() != null && updateRequest.fields().length > 0)) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
update.setGetResult(updateHelper.extractGetResult(updateRequest, updateRequest.concreteIndex(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
} else {
update.setGetResult(null);
}
} else {
assert translate.getResponseResult() == DocWriteResponse.Result.UPDATED;
update.setGetResult(updateHelper.extractGetResult(updateRequest, updateRequest.concreteIndex(), indexResponse.getVersion(), translate.updatedSourceAsMap(), translate.updateSourceContentType(), indexSourceAsBytes));
}
// Replace the update request to the translated index request to execute on the replica.
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest);
return new WriteResult<>(writeUpdateResponse, writeResult.getLocation());
return new WriteResult<>(update, writeResult.getLocation());
case DELETED:
DeleteRequest deleteRequest = translate.action();
WriteResult<DeleteResponse> deleteResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard);