Refactor TransportShardBulkAction.executeUpdateRequest and add tests

This splits `executeUpdateRequest` into separate parts and adds some unit tests
for the behavior in it. The actual behavior has not been changed.
This commit is contained in:
Lee Hinman 2017-05-19 14:50:20 -06:00
parent cadd31b3a8
commit aa3134c093
5 changed files with 349 additions and 120 deletions

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.bulk;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
/**
* A struct-like holder for a bulk items reponse, result, and the resulting
@ -39,4 +40,9 @@ class BulkItemResultHolder {
this.operationResult = operationResult;
this.replicaRequest = replicaRequest;
}
public boolean isVersionConflict() {
return operationResult == null ? false :
operationResult.getFailure() instanceof VersionConflictEngineException;
}
}

View File

@ -43,6 +43,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
@ -265,131 +266,150 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException;
}
/**
* Creates a new bulk item result from the given requests and result of performing the update operation on the shard.
*/
static BulkItemResultHolder processUpdateResponse(final UpdateRequest updateRequest, final String concreteIndex,
final Engine.Result result, final UpdateHelper.Result translate,
final IndexShard primary, final int bulkReqId) throws Exception {
assert result.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "failed result should not have a sequence number";
Engine.Operation.TYPE opType = result.getOperationType();
final UpdateResponse updateResponse;
final BulkItemRequest replicaRequest;
// enrich update response and set translated update (index/delete) request for replica execution in bulk items
if (opType == Engine.Operation.TYPE.INDEX) {
assert result instanceof Engine.IndexResult : result.getClass();
final IndexRequest updateIndexRequest = translate.action();
final IndexResponse indexResponse = new IndexResponse(primary.shardId(), updateIndexRequest.type(), updateIndexRequest.id(),
result.getSeqNo(), primary.getPrimaryTerm(), result.getVersion(), ((Engine.IndexResult) result).isCreated());
updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(),
indexResponse.getId(), indexResponse.getSeqNo(), indexResponse.getPrimaryTerm(), indexResponse.getVersion(),
indexResponse.getResult());
if ((updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) ||
(updateRequest.fields() != null && updateRequest.fields().length > 0)) {
final BytesReference indexSourceAsBytes = updateIndexRequest.source();
final Tuple<XContentType, Map<String, Object>> sourceAndContent =
XContentHelper.convertToMap(indexSourceAsBytes, true, updateIndexRequest.getContentType());
updateResponse.setGetResult(UpdateHelper.extractGetResult(updateRequest, concreteIndex,
indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
}
// set translated request as replica request
replicaRequest = new BulkItemRequest(bulkReqId, updateIndexRequest);
} else if (opType == Engine.Operation.TYPE.DELETE) {
assert result instanceof Engine.DeleteResult : result.getClass();
final DeleteRequest updateDeleteRequest = translate.action();
final DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(), updateDeleteRequest.type(), updateDeleteRequest.id(),
result.getSeqNo(), primary.getPrimaryTerm(), result.getVersion(), ((Engine.DeleteResult) result).isFound());
updateResponse = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(),
deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(), deleteResponse.getPrimaryTerm(),
deleteResponse.getVersion(), deleteResponse.getResult());
final GetResult getResult = UpdateHelper.extractGetResult(updateRequest, concreteIndex, deleteResponse.getVersion(),
translate.updatedSourceAsMap(), translate.updateSourceContentType(), null);
updateResponse.setGetResult(getResult);
// set translated request as replica request
replicaRequest = new BulkItemRequest(bulkReqId, updateDeleteRequest);
} else {
throw new IllegalArgumentException("unknown operation type: " + opType);
}
return new BulkItemResultHolder(updateResponse, result, replicaRequest);
}
/**
* Executes update request once, delegating to a index or delete operation after translation.
* NOOP updates are indicated by returning a <code>null</code> operation in {@link BulkItemResultHolder}
*/
static BulkItemResultHolder executeUpdateRequestOnce(UpdateRequest updateRequest, IndexShard primary,
IndexMetaData metaData, String concreteIndex,
UpdateHelper updateHelper, LongSupplier nowInMillis,
BulkItemRequest primaryItemRequest, int bulkReqId,
final MappingUpdatePerformer mappingUpdater) throws Exception {
final UpdateHelper.Result translate;
// translate update request
try {
translate = updateHelper.prepare(updateRequest, primary, nowInMillis);
} catch (Exception failure) {
// we may fail translating a update to index or delete operation
// we use index result to communicate failure while translating update request
final Engine.Result result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
return new BulkItemResultHolder(null, result, primaryItemRequest);
}
final Engine.Result result;
// execute translated update request
switch (translate.getResponseResult()) {
case CREATED:
case UPDATED:
IndexRequest indexRequest = translate.action();
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
indexRequest.process(mappingMd, concreteIndex);
result = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater);
break;
case DELETED:
DeleteRequest deleteRequest = translate.action();
result = executeDeleteRequestOnPrimary(deleteRequest, primary, mappingUpdater);
break;
case NOOP:
primary.noopUpdate(updateRequest.type());
result = null;
break;
default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult());
}
if (result == null) {
// this is a noop operation
final UpdateResponse updateResponse = translate.action();
return new BulkItemResultHolder(updateResponse, result, primaryItemRequest);
} else if (result.hasFailure()) {
// There was a result, and the result was a failure
return new BulkItemResultHolder(null, result, primaryItemRequest);
} else {
// It was successful, we need to construct the response and return it
return processUpdateResponse(updateRequest, concreteIndex, result, translate, primary, bulkReqId);
}
}
/**
* Executes update request, delegating to a index or delete operation after translation,
* handles retries on version conflict and constructs update response
* NOTE: reassigns bulk item request at <code>requestIndex</code> for replicas to
* execute translated update request (NOOP update is an exception). NOOP updates are
* indicated by returning a <code>null</code> operation in {@link BulkItemResultHolder}
* */
* NOOP updates are indicated by returning a <code>null</code> operation
* in {@link BulkItemResultHolder}
*/
private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateRequest, IndexShard primary,
IndexMetaData metaData, BulkShardRequest request,
int requestIndex, UpdateHelper updateHelper,
LongSupplier nowInMillis,
final MappingUpdatePerformer mappingUpdater) throws Exception {
Engine.Result result = null;
UpdateResponse updateResponse = null;
BulkItemRequest replicaRequest = request.items()[requestIndex];
int maxAttempts = updateRequest.retryOnConflict();
for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) {
final UpdateHelper.Result translate;
// translate update request
try {
translate = updateHelper.prepare(updateRequest, primary, nowInMillis);
} catch (Exception failure) {
// we may fail translating a update to index or delete operation
// we use index result to communicate failure while translating update request
result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
break; // out of retry loop
}
// execute translated update request
switch (translate.getResponseResult()) {
case CREATED:
case UPDATED:
IndexRequest indexRequest = translate.action();
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
indexRequest.process(mappingMd, request.index());
result = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater);
break;
case DELETED:
DeleteRequest deleteRequest = translate.action();
result = executeDeleteRequestOnPrimary(deleteRequest, primary, mappingUpdater);
break;
case NOOP:
primary.noopUpdate(updateRequest.type());
break;
default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult());
}
if (result == null) {
// this is a noop operation
updateResponse = translate.action();
break; // out of retry loop
} else if (result.hasFailure() == false) {
// enrich update response and
// set translated update (index/delete) request for replica execution in bulk items
switch (result.getOperationType()) {
case INDEX:
assert result instanceof Engine.IndexResult : result.getClass();
IndexRequest updateIndexRequest = translate.action();
final IndexResponse indexResponse = new IndexResponse(
primary.shardId(),
updateIndexRequest.type(),
updateIndexRequest.id(),
result.getSeqNo(),
primary.getPrimaryTerm(),
result.getVersion(),
((Engine.IndexResult) result).isCreated());
BytesReference indexSourceAsBytes = updateIndexRequest.source();
updateResponse = new UpdateResponse(
indexResponse.getShardInfo(),
indexResponse.getShardId(),
indexResponse.getType(),
indexResponse.getId(),
indexResponse.getSeqNo(),
indexResponse.getPrimaryTerm(),
indexResponse.getVersion(),
indexResponse.getResult());
if ((updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) ||
(updateRequest.fields() != null && updateRequest.fields().length > 0)) {
Tuple<XContentType, Map<String, Object>> sourceAndContent =
XContentHelper.convertToMap(indexSourceAsBytes, true, updateIndexRequest.getContentType());
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(),
indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
}
// set translated request as replica request
replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateIndexRequest);
break;
case DELETE:
assert result instanceof Engine.DeleteResult : result.getClass();
DeleteRequest updateDeleteRequest = translate.action();
DeleteResponse deleteResponse = new DeleteResponse(
primary.shardId(),
updateDeleteRequest.type(),
updateDeleteRequest.id(),
result.getSeqNo(),
primary.getPrimaryTerm(),
result.getVersion(),
((Engine.DeleteResult) result).isFound());
updateResponse = new UpdateResponse(
deleteResponse.getShardInfo(),
deleteResponse.getShardId(),
deleteResponse.getType(),
deleteResponse.getId(),
deleteResponse.getSeqNo(),
deleteResponse.getPrimaryTerm(),
deleteResponse.getVersion(),
deleteResponse.getResult());
final GetResult getResult = updateHelper.extractGetResult(
updateRequest,
request.index(),
deleteResponse.getVersion(),
translate.updatedSourceAsMap(),
translate.updateSourceContentType(),
null);
updateResponse.setGetResult(getResult);
// set translated request as replica request
replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest);
break;
}
assert result.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO;
// successful operation
break; // out of retry loop
} else if (result.getFailure() instanceof VersionConflictEngineException == false) {
// not a version conflict exception
break; // out of retry loop
BulkItemRequest primaryItemRequest = request.items()[requestIndex];
assert primaryItemRequest.request() == updateRequest
: "expected bulk item request to contain the original update request, got: " +
primaryItemRequest.request() + " and " + updateRequest;
BulkItemResultHolder holder = null;
// There must be at least one attempt
int maxAttempts = Math.max(1, updateRequest.retryOnConflict());
for (int attemptCount = 0; attemptCount < maxAttempts; attemptCount++) {
holder = executeUpdateRequestOnce(updateRequest, primary, metaData, request.index(), updateHelper,
nowInMillis, primaryItemRequest, request.items()[requestIndex].id(), mappingUpdater);
// It was either a successful request, or it was a non-conflict failure
if (holder.isVersionConflict() == false) {
return holder;
}
}
return new BulkItemResultHolder(updateResponse, result, replicaRequest);
// We ran out of tries and haven't returned a valid bulk item response, so return the last one generated
return holder;
}
/** Modes for executing item request on replica depending on corresponding primary execution result */

View File

@ -184,7 +184,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
(request.fields() != null && request.fields().length > 0)) {
Tuple<XContentType, Map<String, Object>> sourceAndContent =
XContentHelper.convertToMap(upsertSourceBytes, true, upsertRequest.getContentType());
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes));
update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes));
} else {
update.setGetResult(null);
}
@ -201,7 +201,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
bulkAction.execute(toSingleItemBulkRequest(indexRequest), wrapBulkResponse(
ActionListener.<IndexResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult());
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount)))
@ -212,7 +212,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
bulkAction.execute(toSingleItemBulkRequest(deleteRequest), wrapBulkResponse(
ActionListener.<DeleteResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult());
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));
update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount)))

View File

@ -314,8 +314,9 @@ public class UpdateHelper extends AbstractComponent {
* Applies {@link UpdateRequest#fetchSource()} to the _source of the updated document to be returned in a update response.
* For BWC this function also extracts the {@link UpdateRequest#fields()} from the updated document to be returned in a update response
*/
public GetResult extractGetResult(final UpdateRequest request, String concreteIndex, long version, final Map<String, Object> source,
XContentType sourceContentType, @Nullable final BytesReference sourceAsBytes) {
public static GetResult extractGetResult(final UpdateRequest request, String concreteIndex, long version,
final Map<String, Object> source, XContentType sourceContentType,
@Nullable final BytesReference sourceAsBytes) {
if ((request.fields() == null || request.fields().length == 0) &&
(request.fetchSource() == null || request.fetchSource().fetchSource() == false)) {
return null;

View File

@ -35,8 +35,12 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
@ -50,12 +54,17 @@ import org.elasticsearch.rest.RestStatus;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;
import static org.elasticsearch.action.bulk.TransportShardBulkAction.replicaItemExecutionMode;
import static org.junit.Assert.assertNotNull;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyLong;
@ -648,6 +657,199 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
closeShards(shard);
}
public void testProcessUpdateResponse() throws Exception {
IndexMetaData metaData = indexMetaData();
IndexShard shard = newStartedShard(false);
UpdateRequest updateRequest = new UpdateRequest("index", "type", "id");
BulkItemRequest request = new BulkItemRequest(0, updateRequest);
Exception err = new VersionConflictEngineException(shardId, "type", "id",
"I'm conflicted <(;_;)>");
Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0);
Engine.DeleteResult deleteResult = new Engine.DeleteResult(1, 1, true);
DocWriteResponse.Result docWriteResult = DocWriteResponse.Result.CREATED;
DocWriteResponse.Result deleteWriteResult = DocWriteResponse.Result.DELETED;
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
DeleteRequest deleteRequest = new DeleteRequest("index", "type", "id");
UpdateHelper.Result translate = new UpdateHelper.Result(indexRequest, docWriteResult,
new HashMap<String, Object>(), XContentType.JSON);
UpdateHelper.Result translateDelete = new UpdateHelper.Result(deleteRequest, deleteWriteResult,
new HashMap<String, Object>(), XContentType.JSON);
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
itemRequests[0] = request;
BulkShardRequest bulkShardRequest = new BulkShardRequest(shard.shardId(), RefreshPolicy.NONE, itemRequests);
BulkItemResultHolder holder = TransportShardBulkAction.processUpdateResponse(updateRequest,
"index", indexResult, translate, shard, 7);
assertTrue(holder.isVersionConflict());
assertThat(holder.response, instanceOf(UpdateResponse.class));
UpdateResponse updateResp = (UpdateResponse) holder.response;
assertThat(updateResp.getGetResult(), equalTo(null));
assertThat(holder.operationResult, equalTo(indexResult));
BulkItemRequest replicaBulkRequest = holder.replicaRequest;
assertThat(replicaBulkRequest.id(), equalTo(7));
DocWriteRequest replicaRequest = replicaBulkRequest.request();
assertThat(replicaRequest, instanceOf(IndexRequest.class));
assertThat(replicaRequest, equalTo(indexRequest));
BulkItemResultHolder deleteHolder = TransportShardBulkAction.processUpdateResponse(updateRequest,
"index", deleteResult, translateDelete, shard, 8);
assertFalse(deleteHolder.isVersionConflict());
assertThat(deleteHolder.response, instanceOf(UpdateResponse.class));
UpdateResponse delUpdateResp = (UpdateResponse) deleteHolder.response;
assertThat(delUpdateResp.getGetResult(), equalTo(null));
assertThat(deleteHolder.operationResult, equalTo(deleteResult));
BulkItemRequest delReplicaBulkRequest = deleteHolder.replicaRequest;
assertThat(delReplicaBulkRequest.id(), equalTo(8));
DocWriteRequest delReplicaRequest = delReplicaBulkRequest.request();
assertThat(delReplicaRequest, instanceOf(DeleteRequest.class));
assertThat(delReplicaRequest, equalTo(deleteRequest));
closeShards(shard);
}
public void testExecuteUpdateRequestOnce() throws Exception {
IndexMetaData metaData = indexMetaData();
IndexShard shard = newStartedShard(true);
Map<String, Object> source = new HashMap<>();
source.put("foo", "bar");
BulkItemRequest[] items = new BulkItemRequest[1];
boolean create = randomBoolean();
DocWriteRequest writeRequest = new IndexRequest("index", "type", "id")
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar")
.create(create);
BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest);
items[0] = primaryRequest;
BulkShardRequest bulkShardRequest =
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
Translog.Location location = new Translog.Location(0, 0, 0);
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
indexRequest.source(source);
DocWriteResponse.Result docWriteResult = DocWriteResponse.Result.CREATED;
UpdateHelper.Result translate = new UpdateHelper.Result(indexRequest, docWriteResult,
new HashMap<String, Object>(), XContentType.JSON);
UpdateHelper updateHelper = new MockUpdateHelper(translate);
UpdateRequest updateRequest = new UpdateRequest("index", "type", "id");
updateRequest.upsert(source);
BulkItemResultHolder holder = TransportShardBulkAction.executeUpdateRequestOnce(updateRequest, shard, metaData,
"index", updateHelper, threadPool::absoluteTimeInMillis, primaryRequest, 0, new NoopMappingUpdatePerformer());
assertFalse(holder.isVersionConflict());
assertNotNull(holder.response);
assertNotNull(holder.operationResult);
assertNotNull(holder.replicaRequest);
assertThat(holder.response, instanceOf(UpdateResponse.class));
UpdateResponse updateResp = (UpdateResponse) holder.response;
assertThat(updateResp.getGetResult(), equalTo(null));
BulkItemRequest replicaBulkRequest = holder.replicaRequest;
assertThat(replicaBulkRequest.id(), equalTo(0));
DocWriteRequest replicaRequest = replicaBulkRequest.request();
assertThat(replicaRequest, instanceOf(IndexRequest.class));
assertThat(replicaRequest, equalTo(indexRequest));
// Assert that the document actually made it there
assertDocCount(shard, 1);
closeShards(shard);
}
public void testExecuteUpdateRequestOnceWithFailure() throws Exception {
IndexMetaData metaData = indexMetaData();
IndexShard shard = newStartedShard(true);
Map<String, Object> source = new HashMap<>();
source.put("foo", "bar");
BulkItemRequest[] items = new BulkItemRequest[1];
boolean create = randomBoolean();
DocWriteRequest writeRequest = new IndexRequest("index", "type", "id")
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar")
.create(create);
BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest);
items[0] = primaryRequest;
BulkShardRequest bulkShardRequest =
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
Translog.Location location = new Translog.Location(0, 0, 0);
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
indexRequest.source(source);
DocWriteResponse.Result docWriteResult = DocWriteResponse.Result.CREATED;
Exception prepareFailure = new IllegalArgumentException("I failed to do something!");
UpdateHelper updateHelper = new FailingUpdateHelper(prepareFailure);
UpdateRequest updateRequest = new UpdateRequest("index", "type", "id");
updateRequest.upsert(source);
BulkItemResultHolder holder = TransportShardBulkAction.executeUpdateRequestOnce(updateRequest, shard, metaData,
"index", updateHelper, threadPool::absoluteTimeInMillis, primaryRequest, 0, new NoopMappingUpdatePerformer());
assertFalse(holder.isVersionConflict());
assertNull(holder.response);
assertNotNull(holder.operationResult);
assertNotNull(holder.replicaRequest);
Engine.IndexResult opResult = (Engine.IndexResult) holder.operationResult;
assertTrue(opResult.hasFailure());
assertFalse(opResult.isCreated());
Exception e = opResult.getFailure();
assertThat(e.getMessage(), containsString("I failed to do something!"));
BulkItemRequest replicaBulkRequest = holder.replicaRequest;
assertThat(replicaBulkRequest.id(), equalTo(0));
assertThat(replicaBulkRequest.request(), instanceOf(IndexRequest.class));
IndexRequest replicaRequest = (IndexRequest) replicaBulkRequest.request();
assertThat(replicaRequest.index(), equalTo("index"));
assertThat(replicaRequest.type(), equalTo("type"));
assertThat(replicaRequest.id(), equalTo("id"));
assertThat(replicaRequest.sourceAsMap(), equalTo(source));
// Assert that the document did not make it there, since it should have failed
assertDocCount(shard, 0);
closeShards(shard);
}
/**
* Fake UpdateHelper that always returns whatever result you give it
*/
private static class MockUpdateHelper extends UpdateHelper {
private final UpdateHelper.Result result;
MockUpdateHelper(UpdateHelper.Result result) {
super(Settings.EMPTY, null);
this.result = result;
}
@Override
public UpdateHelper.Result prepare(UpdateRequest u, IndexShard s, LongSupplier n) {
logger.info("--> preparing update for {} - {}", s, u);
return result;
}
}
/**
* An update helper that always fails to prepare the update
*/
private static class FailingUpdateHelper extends UpdateHelper {
private final Exception e;
FailingUpdateHelper(Exception failure) {
super(Settings.EMPTY, null);
this.e = failure;
}
@Override
public UpdateHelper.Result prepare(UpdateRequest u, IndexShard s, LongSupplier n) {
logger.info("--> preparing failing update for {} - {}", s, u);
throw new ElasticsearchException(e);
}
}
/**
* Fake IndexResult that has a settable translog location
*/