diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResultHolder.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResultHolder.java index e844f8d6506..3e7ee41b914 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResultHolder.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResultHolder.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.bulk; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.common.Nullable; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.VersionConflictEngineException; /** * A struct-like holder for a bulk items reponse, result, and the resulting @@ -39,4 +40,9 @@ class BulkItemResultHolder { this.operationResult = operationResult; this.replicaRequest = replicaRequest; } + + public boolean isVersionConflict() { + return operationResult == null ? false : + operationResult.getFailure() instanceof VersionConflictEngineException; + } } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 7a2c5eb0222..25c8635a35e 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -43,6 +43,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; @@ -265,131 +266,150 @@ public class TransportShardBulkAction extends TransportWriteAction 0)) { + final BytesReference indexSourceAsBytes = updateIndexRequest.source(); + final Tuple> sourceAndContent = + XContentHelper.convertToMap(indexSourceAsBytes, true, updateIndexRequest.getContentType()); + updateResponse.setGetResult(UpdateHelper.extractGetResult(updateRequest, concreteIndex, + indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); + } + // set translated request as replica request + replicaRequest = new BulkItemRequest(bulkReqId, updateIndexRequest); + + } else if (opType == Engine.Operation.TYPE.DELETE) { + assert result instanceof Engine.DeleteResult : result.getClass(); + final DeleteRequest updateDeleteRequest = translate.action(); + + final DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(), updateDeleteRequest.type(), updateDeleteRequest.id(), + result.getSeqNo(), primary.getPrimaryTerm(), result.getVersion(), ((Engine.DeleteResult) result).isFound()); + + updateResponse = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(), + deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(), deleteResponse.getPrimaryTerm(), + deleteResponse.getVersion(), deleteResponse.getResult()); + + final GetResult getResult = UpdateHelper.extractGetResult(updateRequest, concreteIndex, deleteResponse.getVersion(), + translate.updatedSourceAsMap(), translate.updateSourceContentType(), null); + + updateResponse.setGetResult(getResult); + // set translated request as replica request + replicaRequest = new BulkItemRequest(bulkReqId, updateDeleteRequest); + + } else { + throw new IllegalArgumentException("unknown operation type: " + opType); + } + + return new BulkItemResultHolder(updateResponse, result, replicaRequest); + } + + /** + * Executes update request once, delegating to a index or delete operation after translation. + * NOOP updates are indicated by returning a null operation in {@link BulkItemResultHolder} + */ + static BulkItemResultHolder executeUpdateRequestOnce(UpdateRequest updateRequest, IndexShard primary, + IndexMetaData metaData, String concreteIndex, + UpdateHelper updateHelper, LongSupplier nowInMillis, + BulkItemRequest primaryItemRequest, int bulkReqId, + final MappingUpdatePerformer mappingUpdater) throws Exception { + final UpdateHelper.Result translate; + // translate update request + try { + translate = updateHelper.prepare(updateRequest, primary, nowInMillis); + } catch (Exception failure) { + // we may fail translating a update to index or delete operation + // we use index result to communicate failure while translating update request + final Engine.Result result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO); + return new BulkItemResultHolder(null, result, primaryItemRequest); + } + + final Engine.Result result; + // execute translated update request + switch (translate.getResponseResult()) { + case CREATED: + case UPDATED: + IndexRequest indexRequest = translate.action(); + MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type()); + indexRequest.process(mappingMd, concreteIndex); + result = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater); + break; + case DELETED: + DeleteRequest deleteRequest = translate.action(); + result = executeDeleteRequestOnPrimary(deleteRequest, primary, mappingUpdater); + break; + case NOOP: + primary.noopUpdate(updateRequest.type()); + result = null; + break; + default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult()); + } + + if (result == null) { + // this is a noop operation + final UpdateResponse updateResponse = translate.action(); + return new BulkItemResultHolder(updateResponse, result, primaryItemRequest); + } else if (result.hasFailure()) { + // There was a result, and the result was a failure + return new BulkItemResultHolder(null, result, primaryItemRequest); + } else { + // It was successful, we need to construct the response and return it + return processUpdateResponse(updateRequest, concreteIndex, result, translate, primary, bulkReqId); + } + } + /** * Executes update request, delegating to a index or delete operation after translation, * handles retries on version conflict and constructs update response - * NOTE: reassigns bulk item request at requestIndex for replicas to - * execute translated update request (NOOP update is an exception). NOOP updates are - * indicated by returning a null operation in {@link BulkItemResultHolder} - * */ + * NOOP updates are indicated by returning a null operation + * in {@link BulkItemResultHolder} + */ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateRequest, IndexShard primary, IndexMetaData metaData, BulkShardRequest request, int requestIndex, UpdateHelper updateHelper, LongSupplier nowInMillis, final MappingUpdatePerformer mappingUpdater) throws Exception { - Engine.Result result = null; - UpdateResponse updateResponse = null; - BulkItemRequest replicaRequest = request.items()[requestIndex]; - int maxAttempts = updateRequest.retryOnConflict(); - for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) { - final UpdateHelper.Result translate; - // translate update request - try { - translate = updateHelper.prepare(updateRequest, primary, nowInMillis); - } catch (Exception failure) { - // we may fail translating a update to index or delete operation - // we use index result to communicate failure while translating update request - result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO); - break; // out of retry loop - } - // execute translated update request - switch (translate.getResponseResult()) { - case CREATED: - case UPDATED: - IndexRequest indexRequest = translate.action(); - MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type()); - indexRequest.process(mappingMd, request.index()); - result = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater); - break; - case DELETED: - DeleteRequest deleteRequest = translate.action(); - result = executeDeleteRequestOnPrimary(deleteRequest, primary, mappingUpdater); - break; - case NOOP: - primary.noopUpdate(updateRequest.type()); - break; - default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult()); - } - if (result == null) { - // this is a noop operation - updateResponse = translate.action(); - break; // out of retry loop - } else if (result.hasFailure() == false) { - // enrich update response and - // set translated update (index/delete) request for replica execution in bulk items - switch (result.getOperationType()) { - case INDEX: - assert result instanceof Engine.IndexResult : result.getClass(); - IndexRequest updateIndexRequest = translate.action(); - final IndexResponse indexResponse = new IndexResponse( - primary.shardId(), - updateIndexRequest.type(), - updateIndexRequest.id(), - result.getSeqNo(), - primary.getPrimaryTerm(), - result.getVersion(), - ((Engine.IndexResult) result).isCreated()); - BytesReference indexSourceAsBytes = updateIndexRequest.source(); - updateResponse = new UpdateResponse( - indexResponse.getShardInfo(), - indexResponse.getShardId(), - indexResponse.getType(), - indexResponse.getId(), - indexResponse.getSeqNo(), - indexResponse.getPrimaryTerm(), - indexResponse.getVersion(), - indexResponse.getResult()); - if ((updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) || - (updateRequest.fields() != null && updateRequest.fields().length > 0)) { - Tuple> sourceAndContent = - XContentHelper.convertToMap(indexSourceAsBytes, true, updateIndexRequest.getContentType()); - updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), - indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); - } - // set translated request as replica request - replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateIndexRequest); - break; - case DELETE: - assert result instanceof Engine.DeleteResult : result.getClass(); - DeleteRequest updateDeleteRequest = translate.action(); - DeleteResponse deleteResponse = new DeleteResponse( - primary.shardId(), - updateDeleteRequest.type(), - updateDeleteRequest.id(), - result.getSeqNo(), - primary.getPrimaryTerm(), - result.getVersion(), - ((Engine.DeleteResult) result).isFound()); - updateResponse = new UpdateResponse( - deleteResponse.getShardInfo(), - deleteResponse.getShardId(), - deleteResponse.getType(), - deleteResponse.getId(), - deleteResponse.getSeqNo(), - deleteResponse.getPrimaryTerm(), - deleteResponse.getVersion(), - deleteResponse.getResult()); - final GetResult getResult = updateHelper.extractGetResult( - updateRequest, - request.index(), - deleteResponse.getVersion(), - translate.updatedSourceAsMap(), - translate.updateSourceContentType(), - null); - updateResponse.setGetResult(getResult); - // set translated request as replica request - replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest); - break; - } - assert result.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO; - // successful operation - break; // out of retry loop - } else if (result.getFailure() instanceof VersionConflictEngineException == false) { - // not a version conflict exception - break; // out of retry loop + BulkItemRequest primaryItemRequest = request.items()[requestIndex]; + assert primaryItemRequest.request() == updateRequest + : "expected bulk item request to contain the original update request, got: " + + primaryItemRequest.request() + " and " + updateRequest; + + BulkItemResultHolder holder = null; + // There must be at least one attempt + int maxAttempts = Math.max(1, updateRequest.retryOnConflict()); + for (int attemptCount = 0; attemptCount < maxAttempts; attemptCount++) { + + holder = executeUpdateRequestOnce(updateRequest, primary, metaData, request.index(), updateHelper, + nowInMillis, primaryItemRequest, request.items()[requestIndex].id(), mappingUpdater); + + // It was either a successful request, or it was a non-conflict failure + if (holder.isVersionConflict() == false) { + return holder; } } - return new BulkItemResultHolder(updateResponse, result, replicaRequest); + // We ran out of tries and haven't returned a valid bulk item response, so return the last one generated + return holder; } /** Modes for executing item request on replica depending on corresponding primary execution result */ diff --git a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index 189803f818f..a9d0e305f14 100644 --- a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -184,7 +184,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio (request.fields() != null && request.fields().length > 0)) { Tuple> sourceAndContent = XContentHelper.convertToMap(upsertSourceBytes, true, upsertRequest.getContentType()); - update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes)); + update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes)); } else { update.setGetResult(null); } @@ -201,7 +201,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio bulkAction.execute(toSingleItemBulkRequest(indexRequest), wrapBulkResponse( ActionListener.wrap(response -> { UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult()); - update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes)); + update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes)); update.setForcedRefresh(response.forcedRefresh()); listener.onResponse(update); }, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount))) @@ -212,7 +212,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio bulkAction.execute(toSingleItemBulkRequest(deleteRequest), wrapBulkResponse( ActionListener.wrap(response -> { UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult()); - update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null)); + update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null)); update.setForcedRefresh(response.forcedRefresh()); listener.onResponse(update); }, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount))) diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 6d3098c5caa..fb508612f51 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -314,8 +314,9 @@ public class UpdateHelper extends AbstractComponent { * Applies {@link UpdateRequest#fetchSource()} to the _source of the updated document to be returned in a update response. * For BWC this function also extracts the {@link UpdateRequest#fields()} from the updated document to be returned in a update response */ - public GetResult extractGetResult(final UpdateRequest request, String concreteIndex, long version, final Map source, - XContentType sourceContentType, @Nullable final BytesReference sourceAsBytes) { + public static GetResult extractGetResult(final UpdateRequest request, String concreteIndex, long version, + final Map source, XContentType sourceContentType, + @Nullable final BytesReference sourceAsBytes) { if ((request.fields() == null || request.fields().length == 0) && (request.fetchSource() == null || request.fetchSource().fetchSource() == false)) { return null; diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index aa7f613a176..89496054a13 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -35,8 +35,12 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; @@ -50,12 +54,17 @@ import org.elasticsearch.rest.RestStatus; import org.mockito.ArgumentCaptor; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.LongSupplier; import static org.elasticsearch.action.bulk.TransportShardBulkAction.replicaItemExecutionMode; +import static org.junit.Assert.assertNotNull; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.anyLong; @@ -648,6 +657,199 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { closeShards(shard); } + public void testProcessUpdateResponse() throws Exception { + IndexMetaData metaData = indexMetaData(); + IndexShard shard = newStartedShard(false); + + UpdateRequest updateRequest = new UpdateRequest("index", "type", "id"); + BulkItemRequest request = new BulkItemRequest(0, updateRequest); + Exception err = new VersionConflictEngineException(shardId, "type", "id", + "I'm conflicted <(;_;)>"); + Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0); + Engine.DeleteResult deleteResult = new Engine.DeleteResult(1, 1, true); + DocWriteResponse.Result docWriteResult = DocWriteResponse.Result.CREATED; + DocWriteResponse.Result deleteWriteResult = DocWriteResponse.Result.DELETED; + IndexRequest indexRequest = new IndexRequest("index", "type", "id"); + DeleteRequest deleteRequest = new DeleteRequest("index", "type", "id"); + UpdateHelper.Result translate = new UpdateHelper.Result(indexRequest, docWriteResult, + new HashMap(), XContentType.JSON); + UpdateHelper.Result translateDelete = new UpdateHelper.Result(deleteRequest, deleteWriteResult, + new HashMap(), XContentType.JSON); + + BulkItemRequest[] itemRequests = new BulkItemRequest[1]; + itemRequests[0] = request; + BulkShardRequest bulkShardRequest = new BulkShardRequest(shard.shardId(), RefreshPolicy.NONE, itemRequests); + + BulkItemResultHolder holder = TransportShardBulkAction.processUpdateResponse(updateRequest, + "index", indexResult, translate, shard, 7); + + assertTrue(holder.isVersionConflict()); + assertThat(holder.response, instanceOf(UpdateResponse.class)); + UpdateResponse updateResp = (UpdateResponse) holder.response; + assertThat(updateResp.getGetResult(), equalTo(null)); + assertThat(holder.operationResult, equalTo(indexResult)); + BulkItemRequest replicaBulkRequest = holder.replicaRequest; + assertThat(replicaBulkRequest.id(), equalTo(7)); + DocWriteRequest replicaRequest = replicaBulkRequest.request(); + assertThat(replicaRequest, instanceOf(IndexRequest.class)); + assertThat(replicaRequest, equalTo(indexRequest)); + + BulkItemResultHolder deleteHolder = TransportShardBulkAction.processUpdateResponse(updateRequest, + "index", deleteResult, translateDelete, shard, 8); + + assertFalse(deleteHolder.isVersionConflict()); + assertThat(deleteHolder.response, instanceOf(UpdateResponse.class)); + UpdateResponse delUpdateResp = (UpdateResponse) deleteHolder.response; + assertThat(delUpdateResp.getGetResult(), equalTo(null)); + assertThat(deleteHolder.operationResult, equalTo(deleteResult)); + BulkItemRequest delReplicaBulkRequest = deleteHolder.replicaRequest; + assertThat(delReplicaBulkRequest.id(), equalTo(8)); + DocWriteRequest delReplicaRequest = delReplicaBulkRequest.request(); + assertThat(delReplicaRequest, instanceOf(DeleteRequest.class)); + assertThat(delReplicaRequest, equalTo(deleteRequest)); + + closeShards(shard); + } + + public void testExecuteUpdateRequestOnce() throws Exception { + IndexMetaData metaData = indexMetaData(); + IndexShard shard = newStartedShard(true); + + Map source = new HashMap<>(); + source.put("foo", "bar"); + BulkItemRequest[] items = new BulkItemRequest[1]; + boolean create = randomBoolean(); + DocWriteRequest writeRequest = new IndexRequest("index", "type", "id") + .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar") + .create(create); + BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); + items[0] = primaryRequest; + BulkShardRequest bulkShardRequest = + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + + Translog.Location location = new Translog.Location(0, 0, 0); + IndexRequest indexRequest = new IndexRequest("index", "type", "id"); + indexRequest.source(source); + + DocWriteResponse.Result docWriteResult = DocWriteResponse.Result.CREATED; + UpdateHelper.Result translate = new UpdateHelper.Result(indexRequest, docWriteResult, + new HashMap(), XContentType.JSON); + UpdateHelper updateHelper = new MockUpdateHelper(translate); + UpdateRequest updateRequest = new UpdateRequest("index", "type", "id"); + updateRequest.upsert(source); + + BulkItemResultHolder holder = TransportShardBulkAction.executeUpdateRequestOnce(updateRequest, shard, metaData, + "index", updateHelper, threadPool::absoluteTimeInMillis, primaryRequest, 0, new NoopMappingUpdatePerformer()); + + assertFalse(holder.isVersionConflict()); + assertNotNull(holder.response); + assertNotNull(holder.operationResult); + assertNotNull(holder.replicaRequest); + + assertThat(holder.response, instanceOf(UpdateResponse.class)); + UpdateResponse updateResp = (UpdateResponse) holder.response; + assertThat(updateResp.getGetResult(), equalTo(null)); + BulkItemRequest replicaBulkRequest = holder.replicaRequest; + assertThat(replicaBulkRequest.id(), equalTo(0)); + DocWriteRequest replicaRequest = replicaBulkRequest.request(); + assertThat(replicaRequest, instanceOf(IndexRequest.class)); + assertThat(replicaRequest, equalTo(indexRequest)); + + // Assert that the document actually made it there + assertDocCount(shard, 1); + closeShards(shard); + } + + public void testExecuteUpdateRequestOnceWithFailure() throws Exception { + IndexMetaData metaData = indexMetaData(); + IndexShard shard = newStartedShard(true); + + Map source = new HashMap<>(); + source.put("foo", "bar"); + BulkItemRequest[] items = new BulkItemRequest[1]; + boolean create = randomBoolean(); + DocWriteRequest writeRequest = new IndexRequest("index", "type", "id") + .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar") + .create(create); + BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); + items[0] = primaryRequest; + BulkShardRequest bulkShardRequest = + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + + Translog.Location location = new Translog.Location(0, 0, 0); + IndexRequest indexRequest = new IndexRequest("index", "type", "id"); + indexRequest.source(source); + + DocWriteResponse.Result docWriteResult = DocWriteResponse.Result.CREATED; + Exception prepareFailure = new IllegalArgumentException("I failed to do something!"); + UpdateHelper updateHelper = new FailingUpdateHelper(prepareFailure); + UpdateRequest updateRequest = new UpdateRequest("index", "type", "id"); + updateRequest.upsert(source); + + BulkItemResultHolder holder = TransportShardBulkAction.executeUpdateRequestOnce(updateRequest, shard, metaData, + "index", updateHelper, threadPool::absoluteTimeInMillis, primaryRequest, 0, new NoopMappingUpdatePerformer()); + + assertFalse(holder.isVersionConflict()); + assertNull(holder.response); + assertNotNull(holder.operationResult); + assertNotNull(holder.replicaRequest); + + Engine.IndexResult opResult = (Engine.IndexResult) holder.operationResult; + assertTrue(opResult.hasFailure()); + assertFalse(opResult.isCreated()); + Exception e = opResult.getFailure(); + assertThat(e.getMessage(), containsString("I failed to do something!")); + + BulkItemRequest replicaBulkRequest = holder.replicaRequest; + assertThat(replicaBulkRequest.id(), equalTo(0)); + assertThat(replicaBulkRequest.request(), instanceOf(IndexRequest.class)); + IndexRequest replicaRequest = (IndexRequest) replicaBulkRequest.request(); + assertThat(replicaRequest.index(), equalTo("index")); + assertThat(replicaRequest.type(), equalTo("type")); + assertThat(replicaRequest.id(), equalTo("id")); + assertThat(replicaRequest.sourceAsMap(), equalTo(source)); + + // Assert that the document did not make it there, since it should have failed + assertDocCount(shard, 0); + closeShards(shard); + } + + /** + * Fake UpdateHelper that always returns whatever result you give it + */ + private static class MockUpdateHelper extends UpdateHelper { + private final UpdateHelper.Result result; + + MockUpdateHelper(UpdateHelper.Result result) { + super(Settings.EMPTY, null); + this.result = result; + } + + @Override + public UpdateHelper.Result prepare(UpdateRequest u, IndexShard s, LongSupplier n) { + logger.info("--> preparing update for {} - {}", s, u); + return result; + } + } + + /** + * An update helper that always fails to prepare the update + */ + private static class FailingUpdateHelper extends UpdateHelper { + private final Exception e; + + FailingUpdateHelper(Exception failure) { + super(Settings.EMPTY, null); + this.e = failure; + } + + @Override + public UpdateHelper.Result prepare(UpdateRequest u, IndexShard s, LongSupplier n) { + logger.info("--> preparing failing update for {} - {}", s, u); + throw new ElasticsearchException(e); + } + } + /** * Fake IndexResult that has a settable translog location */