fix bug in bulk replication for noop update operation

This commit is contained in:
Areek Zillur 2016-10-06 14:25:53 -04:00
parent b5079ce009
commit 42bc2d15be
2 changed files with 11 additions and 5 deletions

View File

@ -178,6 +178,7 @@ public abstract class DocumentWriteRequest<T extends ReplicatedWriteRequest<T>>
/** write a document write (index/delete/update) request*/
public static void writeDocumentRequest(StreamOutput out, DocumentWriteRequest request) throws IOException {
assert request != null : "request must not be null";
if (request instanceof IndexRequest) {
out.writeByte((byte) 0);
} else if (request instanceof DeleteRequest) {

View File

@ -168,7 +168,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
BulkShardRequest request,
long[] preVersions, VersionType[] preVersionTypes,
Translog.Location location, int requestIndex) {
DocumentWriteRequest<?> itemRequest = request.items()[requestIndex].request();
DocumentWriteRequest itemRequest = request.items()[requestIndex].request();
preVersions[requestIndex] = itemRequest.version();
preVersionTypes[requestIndex] = itemRequest.versionType();
DocumentWriteRequest.OpType opType = itemRequest.opType();
@ -196,9 +196,14 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
assert writeResult.getResponse().getResult() == DocWriteResponse.Result.NOOP
: "only noop operation can have null next operation";
}
assert !(writeResult.getReplicaRequest() instanceof BulkShardRequest) : "replication request must be a single document operation";
// update the bulk item request with replica request (update request are changed to index or delete requests for replication)
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(),
(DocumentWriteRequest<?>) writeResult.getReplicaRequest());
DocumentWriteRequest replicaRequest = (DocumentWriteRequest) writeResult.getReplicaRequest();
if (replicaRequest != null) {
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), replicaRequest);
} else {
request.items()[requestIndex].setIgnoreOnReplica();
}
// add the response
setResponse(request.items()[requestIndex], new BulkItemResponse(request.items()[requestIndex].id(), opType, writeResult.getResponse()));
} catch (Exception e) {
@ -206,14 +211,14 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
if (retryPrimaryException(e)) {
// restore updated versions...
for (int j = 0; j < requestIndex; j++) {
DocumentWriteRequest<?> documentWriteRequest = request.items()[j].request();
DocumentWriteRequest documentWriteRequest = request.items()[j].request();
documentWriteRequest.version(preVersions[j]);
documentWriteRequest.versionType(preVersionTypes[j]);
}
throw (ElasticsearchException) e;
}
BulkItemRequest item = request.items()[requestIndex];
DocumentWriteRequest<?> documentWriteRequest = item.request();
DocumentWriteRequest documentWriteRequest = item.request();
if (isConflictException(e)) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
request.shardId(), documentWriteRequest.opType().getLowercase(), request), e);