Fix double delete on replica copy when executing bulk request
This commit is contained in:
parent
cc993de996
commit
14908f8726
|
@ -267,33 +267,27 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
if (item == null || item.isIgnoreOnReplica()) {
|
||||
continue;
|
||||
}
|
||||
if (item.request() instanceof IndexRequest) {
|
||||
IndexRequest indexRequest = (IndexRequest) item.request();
|
||||
try {
|
||||
Engine.Index operation = TransportIndexAction.executeIndexRequestOnReplica(indexRequest, indexShard);
|
||||
location = locationToSync(location, operation.getTranslogLocation());
|
||||
} catch (Exception e) {
|
||||
// if its not an ignore replica failure, we need to make sure to bubble up the failure
|
||||
// so we will fail the shard
|
||||
if (!ignoreReplicaException(e)) {
|
||||
throw e;
|
||||
}
|
||||
DocumentRequest<?> documentRequest = item.request();
|
||||
final Engine.Operation operation;
|
||||
try {
|
||||
switch (documentRequest.opType()) {
|
||||
case CREATE:
|
||||
case INDEX:
|
||||
operation = TransportIndexAction.executeIndexRequestOnReplica(((IndexRequest) documentRequest), indexShard);
|
||||
break;
|
||||
case DELETE:
|
||||
operation = TransportDeleteAction.executeDeleteRequestOnReplica(((DeleteRequest) documentRequest), indexShard);
|
||||
break;
|
||||
default: throw new IllegalStateException("Unexpected request operation type on replica: "
|
||||
+ documentRequest.opType().getLowercase());
|
||||
}
|
||||
} else if (item.request() instanceof DeleteRequest) {
|
||||
DeleteRequest deleteRequest = (DeleteRequest) item.request();
|
||||
try {
|
||||
Engine.Delete delete = TransportDeleteAction.executeDeleteRequestOnReplica(deleteRequest, indexShard);
|
||||
indexShard.delete(delete);
|
||||
location = locationToSync(location, delete.getTranslogLocation());
|
||||
} catch (Exception e) {
|
||||
// if its not an ignore replica failure, we need to make sure to bubble up the failure
|
||||
// so we will fail the shard
|
||||
if (!ignoreReplicaException(e)) {
|
||||
throw e;
|
||||
}
|
||||
location = locationToSync(location, operation.getTranslogLocation());
|
||||
} catch (Exception e) {
|
||||
// if its not an ignore replica failure, we need to make sure to bubble up the failure
|
||||
// so we will fail the shard
|
||||
if (!ignoreReplicaException(e)) {
|
||||
throw e;
|
||||
}
|
||||
} else {
|
||||
throw new IllegalStateException("Unexpected index operation: " + item.request());
|
||||
}
|
||||
}
|
||||
return location;
|
||||
|
|
Loading…
Reference in New Issue