make sure bulk item failure on primary shards will not execute on a replica shard

This commit is contained in:
Shay Banon 2011-11-20 15:09:13 +02:00
parent bb8ff3814e
commit 90af54dad5
2 changed files with 16 additions and 2 deletions

View File

@ -72,7 +72,12 @@ public class BulkShardRequest extends ShardReplicationOperationRequest {
out.writeVInt(shardId);
out.writeVInt(items.length);
for (BulkItemRequest item : items) {
item.writeTo(out);
if (item != null) {
out.writeBoolean(true);
item.writeTo(out);
} else {
out.writeBoolean(false);
}
}
out.writeBoolean(refresh);
}
@ -82,7 +87,9 @@ public class BulkShardRequest extends ShardReplicationOperationRequest {
shardId = in.readVInt();
items = new BulkItemRequest[in.readVInt()];
for (int i = 0; i < items.length; i++) {
items[i] = BulkItemRequest.readBulkItem(in);
if (in.readBoolean()) {
items[i] = BulkItemRequest.readBulkItem(in);
}
}
refresh = in.readBoolean();
}

View File

@ -170,6 +170,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
logger.debug("[{}][{}] failed to bulk item (index) {}", e, shardRequest.request.index(), shardRequest.shardId, indexRequest);
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(),
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(e)));
// nullify the request so it won't execute on the replicas
request.items()[i] = null;
}
} else if (item.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) item.request();
@ -190,6 +192,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
logger.debug("[{}][{}] failed to bulk item (delete) {}", e, shardRequest.request.index(), shardRequest.shardId, deleteRequest);
responses[i] = new BulkItemResponse(item.id(), "delete",
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(e)));
// nullify the request so it won't execute on the replicas
request.items()[i] = null;
}
}
}
@ -242,6 +246,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
final BulkShardRequest request = shardRequest.request;
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
if (item == null) {
continue;
}
if (item.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) item.request();
try {