diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 1347aba6a79..b4a803e281a 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -144,16 +144,16 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation BulkItemResponse[] responses = new BulkItemResponse[request.items().length]; long[] preVersions = new long[request.items().length]; - for (int i = 0; i < request.items().length; i++) { - BulkItemRequest item = request.items()[i]; + for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) { + BulkItemRequest item = request.items()[requestIndex]; if (item.request() instanceof IndexRequest) { IndexRequest indexRequest = (IndexRequest) item.request(); try { WriteResult result = shardIndexOperation(request, indexRequest, clusterState, indexShard, true); // add the response IndexResponse indexResponse = result.response(); - responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse); - preVersions[i] = result.preVersion; + responses[requestIndex] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse); + preVersions[requestIndex] = result.preVersion; if (result.mappingToUpdate != null) { if (mappingsToUpdate == null) { mappingsToUpdate = Sets.newHashSet(); @@ -164,13 +164,13 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation if (ops == null) { ops = new Engine.IndexingOperation[request.items().length]; } - ops[i] = result.op; + ops[requestIndex] = result.op; } } catch (Exception e) { // rethrow the failure if we are going to retry on primary and let parent failure to handle it if (retryPrimaryException(e)) { // restore updated versions... 
- for (int j = 0; j < i; j++) { + for (int j = 0; j < requestIndex; j++) { applyVersion(request.items()[j], preVersions[j]); } throw (ElasticSearchException) e; @@ -180,22 +180,22 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } else { logger.debug("[{}][{}] failed to execute bulk item (index) {}", e, shardRequest.request.index(), shardRequest.shardId, indexRequest); } - responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), + responses[requestIndex] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(e))); // nullify the request so it won't execute on the replicas - request.items()[i] = null; + request.items()[requestIndex] = null; } } else if (item.request() instanceof DeleteRequest) { DeleteRequest deleteRequest = (DeleteRequest) item.request(); try { // add the response DeleteResponse deleteResponse = shardDeleteOperation(deleteRequest, indexShard).response(); - responses[i] = new BulkItemResponse(item.id(), "delete", deleteResponse); + responses[requestIndex] = new BulkItemResponse(item.id(), "delete", deleteResponse); } catch (Exception e) { // rethrow the failure if we are going to retry on primary and let parent failure to handle it if (retryPrimaryException(e)) { // restore updated versions... 
- for (int j = 0; j < i; j++) { + for (int j = 0; j < requestIndex; j++) { applyVersion(request.items()[j], preVersions[j]); } throw (ElasticSearchException) e; @@ -205,15 +205,15 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } else { logger.debug("[{}][{}] failed to execute bulk item (delete) {}", e, shardRequest.request.index(), shardRequest.shardId, deleteRequest); } - responses[i] = new BulkItemResponse(item.id(), "delete", + responses[requestIndex] = new BulkItemResponse(item.id(), "delete", new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(e))); // nullify the request so it won't execute on the replicas - request.items()[i] = null; + request.items()[requestIndex] = null; } } else if (item.request() instanceof UpdateRequest) { UpdateRequest updateRequest = (UpdateRequest) item.request(); - int retryCount = 0; - do { + // We need to do the requested retries plus the initial attempt. 
We don't do < 1+retry_on_conflict because retry_on_conflict may be Integer.MAX_VALUE + for (int updateAttemptsCount = 0; updateAttemptsCount <= updateRequest.retryOnConflict(); updateAttemptsCount++) { UpdateResult updateResult; try { updateResult = shardUpdateOperation(clusterState, request, updateRequest, indexShard); @@ -221,6 +221,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation updateResult = new UpdateResult(null, null, false, t, null); } if (updateResult.success()) { + switch (updateResult.result.operation()) { case UPSERT: case INDEX: @@ -234,8 +235,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation Tuple> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true); updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); } - responses[i] = new BulkItemResponse(item.id(), "update", updateResponse); - preVersions[i] = result.preVersion; + responses[requestIndex] = new BulkItemResponse(item.id(), "update", updateResponse); + preVersions[requestIndex] = result.preVersion; if (result.mappingToUpdate != null) { if (mappingsToUpdate == null) { mappingsToUpdate = Sets.newHashSet(); @@ -246,40 +247,50 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation if (ops == null) { ops = new Engine.IndexingOperation[request.items().length]; } - ops[i] = result.op; + ops[requestIndex] = result.op; } // Replace the update request to the translated index request to execute on the replica. 
- request.items()[i] = new BulkItemRequest(request.items()[i].id(), indexRequest); + request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest); break; case DELETE: DeleteResponse response = updateResult.writeResult.response(); DeleteRequest deleteRequest = updateResult.request(); updateResponse = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion(), false); updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null)); - responses[i] = new BulkItemResponse(item.id(), "update", updateResponse); + responses[requestIndex] = new BulkItemResponse(item.id(), "update", updateResponse); // Replace the update request to the translated delete request to execute on the replica. - request.items()[i] = new BulkItemRequest(request.items()[i].id(), deleteRequest); + request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest); break; case NONE: - responses[i] = new BulkItemResponse(item.id(), "update", updateResult.noopResult); - request.items()[i] = null; // No need to go to the replica + responses[requestIndex] = new BulkItemResponse(item.id(), "update", updateResult.noopResult); + request.items()[requestIndex] = null; // No need to go to the replica break; } // NOTE: Breaking out of the retry_on_conflict loop! 
break; } else if (updateResult.failure()) { Throwable t = updateResult.error; - if (!updateResult.retry) { + if (updateResult.retry) { + // updateAttemptsCount is 0 based and marks the current attempt; once it equals retryOnConflict we have exhausted all retries + if (updateAttemptsCount >= updateRequest.retryOnConflict()) { + // we can't try any more + responses[requestIndex] = new BulkItemResponse(item.id(), "update", + new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), ExceptionsHelper.detailedMessage(t))); + + request.items()[requestIndex] = null; // do not send to replicas + } + } + else { // rethrow the failure if we are going to retry on primary and let parent failure to handle it if (retryPrimaryException(t)) { // restore updated versions... - for (int j = 0; j < i; j++) { + for (int j = 0; j < requestIndex; j++) { applyVersion(request.items()[j], preVersions[j]); } throw (ElasticSearchException) t; } if (updateResult.result == null) { - responses[i] = new BulkItemResponse(item.id(), "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), ExceptionsHelper.detailedMessage(t))); + responses[requestIndex] = new BulkItemResponse(item.id(), "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), ExceptionsHelper.detailedMessage(t))); } else { switch (updateResult.result.operation()) { case UPSERT: @@ -290,7 +301,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } else { logger.debug("[{}][{}] failed to execute bulk item (index) {}", t, shardRequest.request.index(), shardRequest.shardId, indexRequest); } - responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), + responses[requestIndex] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), 
ExceptionsHelper.detailedMessage(t))); break; case DELETE: @@ -300,19 +311,23 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } else { logger.debug("[{}][{}] failed to execute bulk item (delete) {}", t, shardRequest.request.index(), shardRequest.shardId, deleteRequest); } - responses[i] = new BulkItemResponse(item.id(), "delete", + responses[requestIndex] = new BulkItemResponse(item.id(), "delete", new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(t))); break; } } // nullify the request so it won't execute on the replicas - request.items()[i] = null; + request.items()[requestIndex] = null; // NOTE: Breaking out of the retry_on_conflict loop! break; } + } - } while (++retryCount < updateRequest.retryOnConflict()); + } } + + assert responses[requestIndex] != null; // we must have set a response somewhere. + } if (mappingsToUpdate != null) { diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index 82168f71a11..22d87fff249 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -218,7 +218,7 @@ public abstract class TransportShardReplicationOperationAction response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, request)); performReplicas(response); - } catch (Exception e) { + } catch (Throwable e) { // shard has not been allocated yet, retry it here if (retryPrimaryException(e)) { primaryOperationStarted.set(false); @@ -691,7 +691,7 @@ public abstract class TransportShardReplicationOperationAction /** * Sets the number of retries of a version conflict occurs because the document 
was updated between - * getting it and updating it. Defaults to 1. + * getting it and updating it. Defaults to 0. */ public UpdateRequest retryOnConflict(int retryOnConflict) { this.retryOnConflict = retryOnConflict; diff --git a/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java b/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java index aab5f6f18b0..b14abaf5f11 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java @@ -118,7 +118,7 @@ public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder