Revert "fix bug in bulk replication for noop update operation"

This reverts commit 42bc2d15be.
This commit is contained in:
Areek Zillur 2016-10-07 17:49:57 -04:00
parent 5bbdcd6416
commit 68c82cd113
2 changed files with 5 additions and 11 deletions

View File

@ -178,7 +178,6 @@ public abstract class DocumentWriteRequest<T extends ReplicatedWriteRequest<T>>
/** write a document write (index/delete/update) request*/ /** write a document write (index/delete/update) request*/
public static void writeDocumentRequest(StreamOutput out, DocumentWriteRequest request) throws IOException { public static void writeDocumentRequest(StreamOutput out, DocumentWriteRequest request) throws IOException {
assert request != null : "request must not be null";
if (request instanceof IndexRequest) { if (request instanceof IndexRequest) {
out.writeByte((byte) 0); out.writeByte((byte) 0);
} else if (request instanceof DeleteRequest) { } else if (request instanceof DeleteRequest) {

View File

@ -168,7 +168,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
BulkShardRequest request, BulkShardRequest request,
long[] preVersions, VersionType[] preVersionTypes, long[] preVersions, VersionType[] preVersionTypes,
Translog.Location location, int requestIndex) { Translog.Location location, int requestIndex) {
DocumentWriteRequest itemRequest = request.items()[requestIndex].request(); DocumentWriteRequest<?> itemRequest = request.items()[requestIndex].request();
preVersions[requestIndex] = itemRequest.version(); preVersions[requestIndex] = itemRequest.version();
preVersionTypes[requestIndex] = itemRequest.versionType(); preVersionTypes[requestIndex] = itemRequest.versionType();
DocumentWriteRequest.OpType opType = itemRequest.opType(); DocumentWriteRequest.OpType opType = itemRequest.opType();
@ -196,14 +196,9 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
assert writeResult.getResponse().getResult() == DocWriteResponse.Result.NOOP assert writeResult.getResponse().getResult() == DocWriteResponse.Result.NOOP
: "only noop operation can have null next operation"; : "only noop operation can have null next operation";
} }
assert !(writeResult.getReplicaRequest() instanceof BulkShardRequest) : "replication request must be a single document operation";
// update the bulk item request with replica request (update request are changed to index or delete requests for replication) // update the bulk item request with replica request (update request are changed to index or delete requests for replication)
DocumentWriteRequest replicaRequest = (DocumentWriteRequest) writeResult.getReplicaRequest(); request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(),
if (replicaRequest != null) { (DocumentWriteRequest<?>) writeResult.getReplicaRequest());
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), replicaRequest);
} else {
request.items()[requestIndex].setIgnoreOnReplica();
}
// add the response // add the response
setResponse(request.items()[requestIndex], new BulkItemResponse(request.items()[requestIndex].id(), opType, writeResult.getResponse())); setResponse(request.items()[requestIndex], new BulkItemResponse(request.items()[requestIndex].id(), opType, writeResult.getResponse()));
} catch (Exception e) { } catch (Exception e) {
@ -211,14 +206,14 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
if (retryPrimaryException(e)) { if (retryPrimaryException(e)) {
// restore updated versions... // restore updated versions...
for (int j = 0; j < requestIndex; j++) { for (int j = 0; j < requestIndex; j++) {
DocumentWriteRequest documentWriteRequest = request.items()[j].request(); DocumentWriteRequest<?> documentWriteRequest = request.items()[j].request();
documentWriteRequest.version(preVersions[j]); documentWriteRequest.version(preVersions[j]);
documentWriteRequest.versionType(preVersionTypes[j]); documentWriteRequest.versionType(preVersionTypes[j]);
} }
throw (ElasticsearchException) e; throw (ElasticsearchException) e;
} }
BulkItemRequest item = request.items()[requestIndex]; BulkItemRequest item = request.items()[requestIndex];
DocumentWriteRequest documentWriteRequest = item.request(); DocumentWriteRequest<?> documentWriteRequest = item.request();
if (isConflictException(e)) { if (isConflictException(e)) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
request.shardId(), documentWriteRequest.opType().getLowercase(), request), e); request.shardId(), documentWriteRequest.opType().getLowercase(), request), e);