Refactor TransportShardBulkAction to better support retries (#31821)

Processing a bulk request goes item by item. Sometimes during processing, we need to stop execution and wait for a new mapping update to be processed by the node. This is currently achieved by throwing a `RetryOnPrimaryException`, which is caught higher up. When the exception is caught, we wait for the next cluster state to arrive and process the request again. Sadly, this is a problem: all operations that had already completed before the mapping change was required are applied again and get new sequence numbers. This in turn means that the previously issued sequence numbers are never replicated to the replicas. For example, if the first two items of a bulk request were indexed with seq# 0 and 1 before a later item required a mapping update, the retry re-indexes them under fresh sequence numbers, and seq# 0 and 1 never reach the replicas. That causes the local checkpoint of those shards to be stuck, and with it all of the seq#-based infrastructure.

This commit refactors how we deal with retries, with the goal of removing `RetryOnPrimaryException` and `RetryOnReplicaException` (not done yet). It does so by introducing a `BulkPrimaryExecutionContext` class that captures the execution state and allows execution to continue from where it stopped. The class also formalizes the steps each item has to go through (a condensed sketch of the resulting driver loop follows the list):
1) A translation phase for updates
2) An execution phase (always an index or delete operation)
3) Waiting for a mapping update to come in, if needed
4) A retry, if needed (for updates and for cases where the mappings are still not available after the put-mapping call returns)
5) A finalization phase which allows translating the index/delete result of an update back into an update result
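
In outline, the new primary-side driver loop looks like this (a condensed sketch of the code in this commit, simplified from the new private performOnPrimary overload in TransportShardBulkAction):

while (context.hasMoreOperationsToExecute()) {
    executeBulkItemRequest(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate);
    assert context.isInitial(); // the item either completed and advanced, or was reset for a retry
}
return new WritePrimaryResult<>(context.getBulkShardRequest(), context.buildShardResponse(),
    context.getLocationToSync(), null, context.getPrimary(), logger);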
Boaz Leskes authored 2018-08-10 10:15:01 +02:00, committed by GitHub
parent af8c23eb40
commit f58ed21720
9 changed files with 1125 additions and 760 deletions

server/src/main/java/org/elasticsearch/action/bulk/BulkItemResultHolder.java (deleted)

@@ -1,48 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
/**
* A struct-like holder for a bulk item's response, result, and the resulting
* replica operation to be executed.
*/
class BulkItemResultHolder {
public final @Nullable DocWriteResponse response;
public final @Nullable Engine.Result operationResult;
public final BulkItemRequest replicaRequest;
BulkItemResultHolder(@Nullable DocWriteResponse response,
@Nullable Engine.Result operationResult,
BulkItemRequest replicaRequest) {
this.response = response;
this.operationResult = operationResult;
this.replicaRequest = replicaRequest;
}
public boolean isVersionConflict() {
return operationResult == null ? false :
operationResult.getFailure() instanceof VersionConflictEngineException;
}
}

server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java (new file)

@@ -0,0 +1,345 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import java.util.Arrays;
/**
* This is a utility class that holds the per-request state needed to perform bulk operations on the primary.
* More specifically, it maintains an index to the currently executing bulk item, which allows execution
* to stop and wait for external events such as mapping updates.
*/
class BulkPrimaryExecutionContext {
enum ItemProcessingState {
/** Item execution is ready to start, no operations have been performed yet */
INITIAL,
/**
* The incoming request has been translated to a request that can be executed on the shard.
* This is used to convert update requests to fully specified index or delete requests.
*/
TRANSLATED,
/**
* The request cannot execute with the current mapping and should wait for a new mapping
* to arrive from the master. A mapping request for the needed changes has already been
* submitted.
*/
WAIT_FOR_MAPPING_UPDATE,
/**
* The request should be executed again, but there is no need to wait for an external event.
* This is needed to support retry on conflicts during updates.
*/
IMMEDIATE_RETRY,
/** The request has been executed on the primary shard (successfully or not) */
EXECUTED,
/**
* No further handling of the current request is needed. The result has been converted to a user response
* and execution can continue to the next item (if available).
*/
COMPLETED
}
private final BulkShardRequest request;
private final IndexShard primary;
private Translog.Location locationToSync = null;
private int currentIndex = -1;
private ItemProcessingState currentItemState;
private DocWriteRequest requestToExecute;
private BulkItemResponse executionResult;
private int retryCounter;
BulkPrimaryExecutionContext(BulkShardRequest request, IndexShard primary) {
this.request = request;
this.primary = primary;
advance();
}
private int findNextNonAborted(int startIndex) {
final int length = request.items().length;
while (startIndex < length && isAborted(request.items()[startIndex].getPrimaryResponse())) {
startIndex++;
}
return startIndex;
}
private static boolean isAborted(BulkItemResponse response) {
return response != null && response.isFailed() && response.getFailure().isAborted();
}
/** move to the next item to execute */
private void advance() {
assert currentItemState == ItemProcessingState.COMPLETED || currentIndex == -1 :
"moving to next but current item wasn't completed (state: " + currentItemState + ")";
currentItemState = ItemProcessingState.INITIAL;
currentIndex = findNextNonAborted(currentIndex + 1);
retryCounter = 0;
requestToExecute = null;
executionResult = null;
assert assertInvariants(ItemProcessingState.INITIAL);
}
/** gets the current, untranslated item request */
public DocWriteRequest<?> getCurrent() {
return getCurrentItem().request();
}
public BulkShardRequest getBulkShardRequest() {
return request;
}
/** returns the result of the request that has been executed on the shard */
public BulkItemResponse getExecutionResult() {
assert assertInvariants(ItemProcessingState.EXECUTED);
return executionResult;
}
/** returns the number of times the current operation has been retried */
public int getRetryCounter() {
return retryCounter;
}
/** returns true if the current request has been executed on the primary */
public boolean isOperationExecuted() {
return currentItemState == ItemProcessingState.EXECUTED;
}
/** returns true if the request needs to wait for a mapping update to arrive from the master */
public boolean requiresWaitingForMappingUpdate() {
return currentItemState == ItemProcessingState.WAIT_FOR_MAPPING_UPDATE;
}
/** returns true if the current request should be retried without waiting for an external event */
public boolean requiresImmediateRetry() {
return currentItemState == ItemProcessingState.IMMEDIATE_RETRY;
}
/**
* returns true if the current request has been completed and its result translated to a user
* facing response
*/
public boolean isCompleted() {
return currentItemState == ItemProcessingState.COMPLETED;
}
/**
* returns true if the current request is in INITIAL state
*/
public boolean isInitial() {
return currentItemState == ItemProcessingState.INITIAL;
}
/**
* returns true if {@link #advance()} has moved the current item beyond the
* end of the {@link BulkShardRequest#items()} array.
*/
public boolean hasMoreOperationsToExecute() {
return currentIndex < request.items().length;
}
/** returns the name of the index the current request used */
public String getConcreteIndex() {
return getCurrentItem().index();
}
/** returns any primary response that was set by a previous primary */
public BulkItemResponse getPreviousPrimaryResponse() {
return getCurrentItem().getPrimaryResponse();
}
/** returns a translog location that needs to be synced in order to persist all operations executed so far */
public Translog.Location getLocationToSync() {
assert hasMoreOperationsToExecute() == false;
// we always get to the end of the list by using advance, which in turn sets the state to INITIAL
assert assertInvariants(ItemProcessingState.INITIAL);
return locationToSync;
}
private BulkItemRequest getCurrentItem() {
return request.items()[currentIndex];
}
/** returns the primary shard */
public IndexShard getPrimary() {
return primary;
}
/**
* sets the request that should actually be executed on the primary. This can be different from the request
* received from the user (specifically, an update request is translated to an indexing or delete request).
*/
public void setRequestToExecute(DocWriteRequest writeRequest) {
assert assertInvariants(ItemProcessingState.INITIAL);
requestToExecute = writeRequest;
currentItemState = ItemProcessingState.TRANSLATED;
assert assertInvariants(ItemProcessingState.TRANSLATED);
}
/** returns the request that should be executed on the shard. */
public <T extends DocWriteRequest<T>> T getRequestToExecute() {
assert assertInvariants(ItemProcessingState.TRANSLATED);
return (T) requestToExecute;
}
/** indicates that the current operation cannot be completed and needs to wait for a new mapping from the master */
public void markAsRequiringMappingUpdate() {
assert assertInvariants(ItemProcessingState.TRANSLATED);
currentItemState = ItemProcessingState.WAIT_FOR_MAPPING_UPDATE;
requestToExecute = null;
assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE);
}
/** resets the current item state, preparing for a new execution */
public void resetForExecutionForRetry() {
assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE, ItemProcessingState.EXECUTED);
currentItemState = ItemProcessingState.INITIAL;
requestToExecute = null;
executionResult = null;
assertInvariants(ItemProcessingState.INITIAL);
}
/** completes the operation without doing anything on the primary */
public void markOperationAsNoOp(DocWriteResponse response) {
assertInvariants(ItemProcessingState.INITIAL);
executionResult = new BulkItemResponse(getCurrentItem().id(), getCurrentItem().request().opType(), response);
currentItemState = ItemProcessingState.EXECUTED;
assertInvariants(ItemProcessingState.EXECUTED);
}
/** indicates that the operation needs to be failed as the required mapping didn't arrive in time */
public void failOnMappingUpdate(Exception cause) {
assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE);
currentItemState = ItemProcessingState.EXECUTED;
final DocWriteRequest docWriteRequest = getCurrentItem().request();
executionResult = new BulkItemResponse(getCurrentItem().id(), docWriteRequest.opType(),
// Make sure to use getCurrentItem().index() here, if you use docWriteRequest.index() it will use the
// concrete index instead of an alias if used!
new BulkItemResponse.Failure(getCurrentItem().index(), docWriteRequest.type(), docWriteRequest.id(), cause));
markAsCompleted(executionResult);
}
/** the current operation has been executed on the primary with the specified result */
public void markOperationAsExecuted(Engine.Result result) {
assertInvariants(ItemProcessingState.TRANSLATED);
final BulkItemRequest current = getCurrentItem();
DocWriteRequest docWriteRequest = getRequestToExecute();
switch (result.getResultType()) {
case SUCCESS:
final DocWriteResponse response;
if (result.getOperationType() == Engine.Operation.TYPE.INDEX) {
Engine.IndexResult indexResult = (Engine.IndexResult) result;
response = new IndexResponse(primary.shardId(), requestToExecute.type(), requestToExecute.id(),
result.getSeqNo(), result.getTerm(), indexResult.getVersion(), indexResult.isCreated());
} else if (result.getOperationType() == Engine.Operation.TYPE.DELETE) {
Engine.DeleteResult deleteResult = (Engine.DeleteResult) result;
response = new DeleteResponse(primary.shardId(), requestToExecute.type(), requestToExecute.id(),
deleteResult.getSeqNo(), result.getTerm(), deleteResult.getVersion(), deleteResult.isFound());
} else {
throw new AssertionError("unknown result type :" + result.getResultType());
}
executionResult = new BulkItemResponse(current.id(), current.request().opType(), response);
// set a blank ShardInfo so we can safely send it to the replicas. We won't use it in the real response though.
executionResult.getResponse().setShardInfo(new ReplicationResponse.ShardInfo());
locationToSync = TransportWriteAction.locationToSync(locationToSync, result.getTranslogLocation());
break;
case FAILURE:
executionResult = new BulkItemResponse(current.id(), docWriteRequest.opType(),
// Make sure to use request.index() here, if you
// use docWriteRequest.index() it will use the
// concrete index instead of an alias if used!
new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(),
result.getFailure(), result.getSeqNo()));
break;
default:
throw new AssertionError("unknown result type for " + getCurrentItem() + ": " + result.getResultType());
}
currentItemState = ItemProcessingState.EXECUTED;
}
/** finishes the execution of the current request, with the response that should be returned to the user */
public void markAsCompleted(BulkItemResponse translatedResponse) {
assertInvariants(ItemProcessingState.EXECUTED);
assert executionResult == null || translatedResponse.getItemId() == executionResult.getItemId();
assert translatedResponse.getItemId() == getCurrentItem().id();
if (translatedResponse.isFailed() == false && requestToExecute != getCurrent()) {
request.items()[currentIndex] = new BulkItemRequest(request.items()[currentIndex].id(), requestToExecute);
}
getCurrentItem().setPrimaryResponse(translatedResponse);
currentItemState = ItemProcessingState.COMPLETED;
advance();
}
/** builds the bulk shard response to return to the user */
public BulkShardResponse buildShardResponse() {
assert hasMoreOperationsToExecute() == false;
return new BulkShardResponse(request.shardId(),
Arrays.stream(request.items()).map(BulkItemRequest::getPrimaryResponse).toArray(BulkItemResponse[]::new));
}
private boolean assertInvariants(ItemProcessingState... expectedCurrentState) {
assert Arrays.asList(expectedCurrentState).contains(currentItemState):
"expected current state [" + currentItemState + "] to be one of " + Arrays.toString(expectedCurrentState);
assert currentIndex >= 0 : currentIndex;
assert retryCounter >= 0 : retryCounter;
switch (currentItemState) {
case INITIAL:
assert requestToExecute == null : requestToExecute;
assert executionResult == null : executionResult;
break;
case TRANSLATED:
assert requestToExecute != null;
assert executionResult == null : executionResult;
break;
case WAIT_FOR_MAPPING_UPDATE:
assert requestToExecute == null;
assert executionResult == null : executionResult;
break;
case IMMEDIATE_RETRY:
assert requestToExecute != null;
assert executionResult == null : executionResult;
break;
case EXECUTED:
// requestToExecute can be null if the update ended up as NOOP
assert executionResult != null;
break;
case COMPLETED:
assert requestToExecute != null;
assert executionResult != null;
assert getCurrentItem().getPrimaryResponse() != null;
break;
}
return true;
}
}
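
For reference, the happy path through the state machine for a single pre-translated item looks as follows. This is a minimal sketch mirroring the unit tests at the end of this commit (result stands in for the Engine.Result returned by the shard); the mapping-update, no-op, and retry paths branch off as described in the ItemProcessingState javadocs:

BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, primary);
while (context.hasMoreOperationsToExecute()) {
    context.setRequestToExecute(context.getCurrent());     // INITIAL -> TRANSLATED
    context.markOperationAsExecuted(result);               // TRANSLATED -> EXECUTED
    context.markAsCompleted(context.getExecutionResult()); // EXECUTED -> COMPLETED, then advance()
}
BulkShardResponse response = context.buildShardResponse();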

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

@@ -29,30 +29,34 @@ import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.replication.ReplicationOperation;
-import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.replication.TransportReplicationAction;
 import org.elasticsearch.action.support.replication.TransportWriteAction;
 import org.elasticsearch.action.update.UpdateHelper;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.action.update.UpdateResponse;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateObserver;
 import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.CheckedRunnable;
 import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.logging.ESLoggerFactory;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.index.get.GetResult;
+import org.elasticsearch.index.mapper.MapperException;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -60,12 +64,14 @@ import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
@@ -108,174 +114,167 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
     }
 
     @Override
-    public WritePrimaryResult<BulkShardRequest, BulkShardResponse> shardOperationOnPrimary(
-            BulkShardRequest request, IndexShard primary) throws Exception {
-        return performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, new ConcreteMappingUpdatePerformer());
+    protected WritePrimaryResult<BulkShardRequest, BulkShardResponse> shardOperationOnPrimary(BulkShardRequest request, IndexShard primary)
+        throws Exception {
+        ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
+        CheckedRunnable<Exception> waitForMappingUpdate = () -> {
+            PlainActionFuture<Void> waitingFuture = new PlainActionFuture<>();
+            observer.waitForNextChange(new ClusterStateObserver.Listener() {
+                @Override
+                public void onNewClusterState(ClusterState state) {
+                    waitingFuture.onResponse(null);
+                }
+
+                @Override
+                public void onClusterServiceClose() {
+                    waitingFuture.onFailure(new NodeClosedException(clusterService.localNode()));
+                }
+
+                @Override
+                public void onTimeout(TimeValue timeout) {
+                    waitingFuture.onFailure(
+                        new MapperException("timed out while waiting for a dynamic mapping update"));
+                }
+            });
+            waitingFuture.get();
+        };
+        return performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis,
+            new ConcreteMappingUpdatePerformer(), waitForMappingUpdate);
     }
 
     public static WritePrimaryResult<BulkShardRequest, BulkShardResponse> performOnPrimary(
         BulkShardRequest request,
         IndexShard primary,
         UpdateHelper updateHelper,
         LongSupplier nowInMillisSupplier,
-        MappingUpdatePerformer mappingUpdater) throws Exception {
-        final IndexMetaData metaData = primary.indexSettings().getIndexMetaData();
-        Translog.Location location = null;
-        for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
-            if (isAborted(request.items()[requestIndex].getPrimaryResponse()) == false) {
-                location = executeBulkItemRequest(metaData, primary, request, location, requestIndex,
-                    updateHelper, nowInMillisSupplier, mappingUpdater);
-            }
-        }
-        BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
-        BulkItemRequest[] items = request.items();
-        for (int i = 0; i < items.length; i++) {
-            responses[i] = items[i].getPrimaryResponse();
-        }
-        BulkShardResponse response = new BulkShardResponse(request.shardId(), responses);
-        return new WritePrimaryResult<>(request, response, location, null, primary, logger);
-    }
-
-    private static BulkItemResultHolder executeIndexRequest(final IndexRequest indexRequest,
-                                                            final BulkItemRequest bulkItemRequest,
-                                                            final IndexShard primary,
-                                                            final MappingUpdatePerformer mappingUpdater) throws Exception {
-        Engine.IndexResult indexResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater);
-        switch (indexResult.getResultType()) {
-            case SUCCESS:
-                IndexResponse response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
-                    indexResult.getSeqNo(), indexResult.getTerm(), indexResult.getVersion(), indexResult.isCreated());
-                return new BulkItemResultHolder(response, indexResult, bulkItemRequest);
-            case FAILURE:
-                return new BulkItemResultHolder(null, indexResult, bulkItemRequest);
-            default:
-                throw new AssertionError("unknown result type for " + indexRequest + ": " + indexResult.getResultType());
-        }
-    }
-
-    private static BulkItemResultHolder executeDeleteRequest(final DeleteRequest deleteRequest,
-                                                             final BulkItemRequest bulkItemRequest,
-                                                             final IndexShard primary,
-                                                             final MappingUpdatePerformer mappingUpdater) throws Exception {
-        Engine.DeleteResult deleteResult = executeDeleteRequestOnPrimary(deleteRequest, primary, mappingUpdater);
-        switch (deleteResult.getResultType()) {
-            case SUCCESS:
-                DeleteResponse response = new DeleteResponse(primary.shardId(), deleteRequest.type(), deleteRequest.id(),
-                    deleteResult.getSeqNo(), deleteResult.getTerm(), deleteResult.getVersion(), deleteResult.isFound());
-                return new BulkItemResultHolder(response, deleteResult, bulkItemRequest);
-            case FAILURE:
-                return new BulkItemResultHolder(null, deleteResult, bulkItemRequest);
-            case MAPPING_UPDATE_REQUIRED:
-                throw new AssertionError("delete operation leaked a mapping update " + deleteRequest);
-            default:
-                throw new AssertionError("unknown result type for " + deleteRequest + ": " + deleteResult.getResultType());
-        }
-    }
-
-    static Translog.Location calculateTranslogLocation(final Translog.Location originalLocation,
-                                                       final BulkItemResultHolder bulkItemResult) {
-        final Engine.Result operationResult = bulkItemResult.operationResult;
-        if (operationResult != null && operationResult.getResultType() == Engine.Result.Type.SUCCESS) {
-            return locationToSync(originalLocation, operationResult.getTranslogLocation());
-        } else {
-            return originalLocation;
-        }
-    }
-
-    // Visible for unit testing
-    /**
-     * Creates a BulkItemResponse for the primary operation and returns it. If no bulk response is
-     * needed (because one already exists and the operation failed), then return null.
-     */
-    static BulkItemResponse createPrimaryResponse(BulkItemResultHolder bulkItemResult,
-                                                  final DocWriteRequest.OpType opType,
-                                                  BulkShardRequest request) {
-        final Engine.Result operationResult = bulkItemResult.operationResult;
-        final DocWriteResponse response = bulkItemResult.response;
-        final BulkItemRequest replicaRequest = bulkItemResult.replicaRequest;
-        if (operationResult == null) { // in case of noop update operation
-            assert response.getResult() == DocWriteResponse.Result.NOOP : "only noop updates can have a null operation";
-            return new BulkItemResponse(replicaRequest.id(), opType, response);
-        } else if (operationResult.getResultType() == Engine.Result.Type.SUCCESS) {
-            BulkItemResponse primaryResponse = new BulkItemResponse(replicaRequest.id(), opType, response);
-            // set a blank ShardInfo so we can safely send it to the replicas. We won't use it in the real response though.
-            primaryResponse.getResponse().setShardInfo(new ShardInfo());
-            return primaryResponse;
-        } else if (operationResult.getResultType() == Engine.Result.Type.FAILURE) {
-            DocWriteRequest<?> docWriteRequest = replicaRequest.request();
-            Exception failure = operationResult.getFailure();
-            if (isConflictException(failure)) {
-                logger.trace(() -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
-                    request.shardId(), docWriteRequest.opType().getLowercase(), request), failure);
-            } else {
-                logger.debug(() -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
-                    request.shardId(), docWriteRequest.opType().getLowercase(), request), failure);
-            }
-            // if it's a conflict failure, and we already executed the request on a primary (and we execute it
-            // again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
-            // then just use the response we got from the failed execution
-            if (replicaRequest.getPrimaryResponse() == null || isConflictException(failure) == false) {
-                return new BulkItemResponse(replicaRequest.id(), docWriteRequest.opType(),
-                    // Make sure to use request.index() here, if you
-                    // use docWriteRequest.index() it will use the
-                    // concrete index instead of an alias if used!
-                    new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(),
-                        failure, operationResult.getSeqNo()));
-            } else {
-                assert replicaRequest.getPrimaryResponse() != null : "replica request must have a primary response";
-                return null;
-            }
-        } else {
-            throw new AssertionError("unknown result type for " + request + ": " + operationResult.getResultType());
-        }
+        MappingUpdatePerformer mappingUpdater,
+        CheckedRunnable<Exception> waitForMappingUpdate) throws Exception {
+        BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary);
+        return performOnPrimary(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate);
     }
 
+    private static WritePrimaryResult<BulkShardRequest, BulkShardResponse> performOnPrimary(
+        BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
+        MappingUpdatePerformer mappingUpdater, CheckedRunnable<Exception> waitForMappingUpdate) throws Exception {
+
+        while (context.hasMoreOperationsToExecute()) {
+            executeBulkItemRequest(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate);
+            assert context.isInitial(); // either completed and moved to next or reset
+        }
+        return new WritePrimaryResult<>(context.getBulkShardRequest(), context.buildShardResponse(), context.getLocationToSync(),
+            null, context.getPrimary(), logger);
+    }
+
     /** Executes bulk item requests and handles request execution exceptions */
-    static Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexShard primary,
-                                                    BulkShardRequest request, Translog.Location location,
-                                                    int requestIndex, UpdateHelper updateHelper,
-                                                    LongSupplier nowInMillisSupplier,
-                                                    final MappingUpdatePerformer mappingUpdater) throws Exception {
-        final DocWriteRequest<?> itemRequest = request.items()[requestIndex].request();
-        final DocWriteRequest.OpType opType = itemRequest.opType();
-        final BulkItemResultHolder responseHolder;
-        switch (itemRequest.opType()) {
-            case CREATE:
-            case INDEX:
-                responseHolder = executeIndexRequest((IndexRequest) itemRequest,
-                    request.items()[requestIndex], primary, mappingUpdater);
-                break;
-            case UPDATE:
-                responseHolder = executeUpdateRequest((UpdateRequest) itemRequest, primary, metaData, request,
-                    requestIndex, updateHelper, nowInMillisSupplier, mappingUpdater);
-                break;
-            case DELETE:
-                responseHolder = executeDeleteRequest((DeleteRequest) itemRequest, request.items()[requestIndex], primary, mappingUpdater);
-                break;
-            default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
-        }
-
-        final BulkItemRequest replicaRequest = responseHolder.replicaRequest;
-        // update the bulk item request because update request execution can mutate the bulk item request
-        request.items()[requestIndex] = replicaRequest;
-
-        // Retrieve the primary response, and update the replica request with the primary's response
-        BulkItemResponse primaryResponse = createPrimaryResponse(responseHolder, opType, request);
-        if (primaryResponse != null) {
-            replicaRequest.setPrimaryResponse(primaryResponse);
-        }
-
-        // Update the translog with the new location, if needed
-        return calculateTranslogLocation(location, responseHolder);
-    }
-
-    private static boolean isAborted(BulkItemResponse response) {
-        return response != null && response.isFailed() && response.getFailure().isAborted();
+    static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
+                                       MappingUpdatePerformer mappingUpdater, CheckedRunnable<Exception> waitForMappingUpdate)
+        throws Exception {
+        final DocWriteRequest.OpType opType = context.getCurrent().opType();
+
+        final UpdateHelper.Result updateResult;
+        if (opType == DocWriteRequest.OpType.UPDATE) {
+            final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
+            try {
+                updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier);
+            } catch (Exception failure) {
+                // we may fail translating a update to index or delete operation
+                // we use index result to communicate failure while translating update request
+                final Engine.Result result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbers.UNASSIGNED_SEQ_NO);
+                context.setRequestToExecute(updateRequest);
+                context.markOperationAsExecuted(result);
+                context.markAsCompleted(context.getExecutionResult());
+                return;
+            }
+            // execute translated update request
+            switch (updateResult.getResponseResult()) {
+                case CREATED:
+                case UPDATED:
+                    IndexRequest indexRequest = updateResult.action();
+                    IndexMetaData metaData = context.getPrimary().indexSettings().getIndexMetaData();
+                    MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
+                    indexRequest.process(metaData.getCreationVersion(), mappingMd, updateRequest.concreteIndex());
+                    context.setRequestToExecute(indexRequest);
+                    break;
+                case DELETED:
+                    context.setRequestToExecute(updateResult.action());
+                    break;
+                case NOOP:
+                    context.markOperationAsNoOp(updateResult.action());
+                    context.markAsCompleted(context.getExecutionResult());
+                    return;
+                default:
+                    throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult());
+            }
+        } else {
+            context.setRequestToExecute(context.getCurrent());
+            updateResult = null;
+        }
+
+        assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state
+
+        if (context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE) {
+            executeDeleteRequestOnPrimary(context, mappingUpdater);
+        } else {
+            executeIndexRequestOnPrimary(context, mappingUpdater);
+        }
+
+        if (context.requiresWaitingForMappingUpdate()) {
+            try {
+                waitForMappingUpdate.run();
+                context.resetForExecutionForRetry();
+            } catch (Exception e) {
+                context.failOnMappingUpdate(e);
+            }
+            return;
+        }
+
+        assert context.isOperationExecuted();
+
+        if (opType == DocWriteRequest.OpType.UPDATE &&
+            context.getExecutionResult().isFailed() &&
+            isConflictException(context.getExecutionResult().getFailure().getCause())) {
+            final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
+            if (context.getRetryCounter() < updateRequest.retryOnConflict()) {
+                context.resetForExecutionForRetry();
+                return;
+            }
+        }
+
+        finalizePrimaryOperationOnCompletion(context, opType, updateResult);
+    }
+
+    private static void finalizePrimaryOperationOnCompletion(BulkPrimaryExecutionContext context, DocWriteRequest.OpType opType,
+                                                             UpdateHelper.Result updateResult) {
+        final BulkItemResponse executionResult = context.getExecutionResult();
+        if (opType == DocWriteRequest.OpType.UPDATE) {
+            final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
+            context.markAsCompleted(
+                processUpdateResponse(updateRequest, context.getConcreteIndex(), executionResult, updateResult));
+        } else if (executionResult.isFailed()) {
+            final Exception failure = executionResult.getFailure().getCause();
+            final DocWriteRequest docWriteRequest = context.getCurrent();
+            if (TransportShardBulkAction.isConflictException(failure)) {
+                logger.trace(() -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
+                    context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure);
+            } else {
+                logger.debug(() -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
+                    context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure);
+            }
+
+            final BulkItemResponse primaryResponse;
+            // if it's a conflict failure, and we already executed the request on a primary (and we execute it
+            // again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
+            // then just use the response we got from the failed execution
+            if (TransportShardBulkAction.isConflictException(failure) && context.getPreviousPrimaryResponse() != null) {
+                primaryResponse = context.getPreviousPrimaryResponse();
+            } else {
+                primaryResponse = executionResult;
+            }
+            context.markAsCompleted(primaryResponse);
+        } else {
+            context.markAsCompleted(executionResult);
+        }
+        assert context.isInitial();
     }
 
     private static boolean isConflictException(final Exception e) {
@@ -285,150 +284,50 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
     /**
     * Creates a new bulk item result from the given requests and result of performing the update operation on the shard.
    */
-    static BulkItemResultHolder processUpdateResponse(final UpdateRequest updateRequest, final String concreteIndex,
-                                                      final Engine.Result result, final UpdateHelper.Result translate,
-                                                      final IndexShard primary, final int bulkReqId) {
-        assert result.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "failed result should not have a sequence number";
-
-        Engine.Operation.TYPE opType = result.getOperationType();
-
-        final UpdateResponse updateResponse;
-        final BulkItemRequest replicaRequest;
-
-        // enrich update response and set translated update (index/delete) request for replica execution in bulk items
-        if (opType == Engine.Operation.TYPE.INDEX) {
-            assert result instanceof Engine.IndexResult : result.getClass();
-            final IndexRequest updateIndexRequest = translate.action();
-            final IndexResponse indexResponse = new IndexResponse(primary.shardId(), updateIndexRequest.type(), updateIndexRequest.id(),
-                result.getSeqNo(), result.getTerm(), result.getVersion(), ((Engine.IndexResult) result).isCreated());
-            updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(),
-                indexResponse.getId(), indexResponse.getSeqNo(), indexResponse.getPrimaryTerm(), indexResponse.getVersion(),
-                indexResponse.getResult());
-
-            if (updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) {
-                final BytesReference indexSourceAsBytes = updateIndexRequest.source();
-                final Tuple<XContentType, Map<String, Object>> sourceAndContent =
-                    XContentHelper.convertToMap(indexSourceAsBytes, true, updateIndexRequest.getContentType());
-                updateResponse.setGetResult(UpdateHelper.extractGetResult(updateRequest, concreteIndex,
-                    indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
-            }
-            // set translated request as replica request
-            replicaRequest = new BulkItemRequest(bulkReqId, updateIndexRequest);
-
-        } else if (opType == Engine.Operation.TYPE.DELETE) {
-            assert result instanceof Engine.DeleteResult : result.getClass();
-            final DeleteRequest updateDeleteRequest = translate.action();
-
-            final DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(), updateDeleteRequest.type(), updateDeleteRequest.id(),
-                result.getSeqNo(), result.getTerm(), result.getVersion(), ((Engine.DeleteResult) result).isFound());
-
-            updateResponse = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(),
-                deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(), deleteResponse.getPrimaryTerm(),
-                deleteResponse.getVersion(), deleteResponse.getResult());
-
-            final GetResult getResult = UpdateHelper.extractGetResult(updateRequest, concreteIndex, deleteResponse.getVersion(),
-                translate.updatedSourceAsMap(), translate.updateSourceContentType(), null);
-
-            updateResponse.setGetResult(getResult);
-            // set translated request as replica request
-            replicaRequest = new BulkItemRequest(bulkReqId, updateDeleteRequest);
-        } else {
-            throw new IllegalArgumentException("unknown operation type: " + opType);
-        }
-
-        return new BulkItemResultHolder(updateResponse, result, replicaRequest);
-    }
-
-    /**
-     * Executes update request once, delegating to a index or delete operation after translation.
-     * NOOP updates are indicated by returning a <code>null</code> operation in {@link BulkItemResultHolder}
-     */
-    static BulkItemResultHolder executeUpdateRequestOnce(UpdateRequest updateRequest, IndexShard primary,
-                                                         IndexMetaData metaData, String concreteIndex,
-                                                         UpdateHelper updateHelper, LongSupplier nowInMillis,
-                                                         BulkItemRequest primaryItemRequest, int bulkReqId,
-                                                         final MappingUpdatePerformer mappingUpdater) throws Exception {
-        final UpdateHelper.Result translate;
-        // translate update request
-        try {
-            translate = updateHelper.prepare(updateRequest, primary, nowInMillis);
-        } catch (Exception failure) {
-            // we may fail translating a update to index or delete operation
-            // we use index result to communicate failure while translating update request
-            final Engine.Result result = primary.getFailedIndexResult(failure, updateRequest.version());
-            return new BulkItemResultHolder(null, result, primaryItemRequest);
-        }
-
-        final Engine.Result result;
-        // execute translated update request
-        switch (translate.getResponseResult()) {
-            case CREATED:
-            case UPDATED:
-                IndexRequest indexRequest = translate.action();
-                MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
-                indexRequest.process(metaData.getCreationVersion(), mappingMd, concreteIndex);
-                result = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater);
-                break;
-            case DELETED:
-                DeleteRequest deleteRequest = translate.action();
-                result = executeDeleteRequestOnPrimary(deleteRequest, primary, mappingUpdater);
-                break;
-            case NOOP:
-                primary.noopUpdate(updateRequest.type());
-                result = null;
-                break;
-            default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult());
-        }
-
-        if (result == null) {
-            // this is a noop operation
-            final UpdateResponse updateResponse = translate.action();
-            return new BulkItemResultHolder(updateResponse, result, primaryItemRequest);
-        } else if (result.getResultType() == Engine.Result.Type.FAILURE) {
-            // There was a result, and the result was a failure
-            return new BulkItemResultHolder(null, result, primaryItemRequest);
-        } else if (result.getResultType() == Engine.Result.Type.SUCCESS) {
-            // It was successful, we need to construct the response and return it
-            return processUpdateResponse(updateRequest, concreteIndex, result, translate, primary, bulkReqId);
-        } else {
-            throw new AssertionError("unknown result type for " + updateRequest + ": " + result.getResultType());
-        }
-    }
-
-    /**
-     * Executes update request, delegating to a index or delete operation after translation,
-     * handles retries on version conflict and constructs update response
-     * NOOP updates are indicated by returning a <code>null</code> operation
-     * in {@link BulkItemResultHolder}
-     */
-    private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateRequest, IndexShard primary,
-                                                             IndexMetaData metaData, BulkShardRequest request,
-                                                             int requestIndex, UpdateHelper updateHelper,
-                                                             LongSupplier nowInMillis,
-                                                             final MappingUpdatePerformer mappingUpdater) throws Exception {
-        BulkItemRequest primaryItemRequest = request.items()[requestIndex];
-        assert primaryItemRequest.request() == updateRequest
-            : "expected bulk item request to contain the original update request, got: " +
-            primaryItemRequest.request() + " and " + updateRequest;
-
-        BulkItemResultHolder holder = null;
-        // There must be at least one attempt
-        int maxAttempts = Math.max(1, updateRequest.retryOnConflict());
-        for (int attemptCount = 0; attemptCount < maxAttempts; attemptCount++) {
-
-            holder = executeUpdateRequestOnce(updateRequest, primary, metaData, request.index(), updateHelper,
-                nowInMillis, primaryItemRequest, request.items()[requestIndex].id(), mappingUpdater);
-
-            // It was either a successful request, or it was a non-conflict failure
-            if (holder.isVersionConflict() == false) {
-                return holder;
-            }
-        }
-
-        // We ran out of tries and haven't returned a valid bulk item response, so return the last one generated
-        return holder;
+    static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest, final String concreteIndex,
+                                                  BulkItemResponse operationResponse,
+                                                  final UpdateHelper.Result translate) {
+        final BulkItemResponse response;
+        DocWriteResponse.Result translatedResult = translate.getResponseResult();
+        if (operationResponse.isFailed()) {
+            response = new BulkItemResponse(operationResponse.getItemId(), DocWriteRequest.OpType.UPDATE, operationResponse.getFailure());
+        } else {
+            final UpdateResponse updateResponse;
+            if (translatedResult == DocWriteResponse.Result.CREATED || translatedResult == DocWriteResponse.Result.UPDATED) {
+                final IndexRequest updateIndexRequest = translate.action();
+                final IndexResponse indexResponse = operationResponse.getResponse();
+                updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(),
+                    indexResponse.getType(), indexResponse.getId(), indexResponse.getSeqNo(), indexResponse.getPrimaryTerm(),
+                    indexResponse.getVersion(), indexResponse.getResult());
+
+                if (updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) {
+                    final BytesReference indexSourceAsBytes = updateIndexRequest.source();
+                    final Tuple<XContentType, Map<String, Object>> sourceAndContent =
+                        XContentHelper.convertToMap(indexSourceAsBytes, true, updateIndexRequest.getContentType());
+                    updateResponse.setGetResult(UpdateHelper.extractGetResult(updateRequest, concreteIndex,
+                        indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
+                }
+            } else if (translatedResult == DocWriteResponse.Result.DELETED) {
+                final DeleteResponse deleteResponse = operationResponse.getResponse();
+                updateResponse = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(),
+                    deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(), deleteResponse.getPrimaryTerm(),
+                    deleteResponse.getVersion(), deleteResponse.getResult());
+
+                final GetResult getResult = UpdateHelper.extractGetResult(updateRequest, concreteIndex, deleteResponse.getVersion(),
+                    translate.updatedSourceAsMap(), translate.updateSourceContentType(), null);
+
+                updateResponse.setGetResult(getResult);
+            } else {
+                throw new IllegalArgumentException("unknown operation type: " + translatedResult);
+            }
+            response = new BulkItemResponse(operationResponse.getItemId(), DocWriteRequest.OpType.UPDATE, updateResponse);
+        }
+        return response;
     }
 
     /** Modes for executing item request on replica depending on corresponding primary execution result */
     public enum ReplicaItemExecutionMode {
@@ -451,6 +350,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
     /**
      * Determines whether a bulk item request should be executed on the replica.
+     *
      * @return {@link ReplicaItemExecutionMode#NORMAL} upon normal primary execution with no failures
      *         {@link ReplicaItemExecutionMode#FAILURE} upon primary execution failure after sequence no generation
      *         {@link ReplicaItemExecutionMode#NOOP} upon primary execution failure before sequence no generation or
@@ -461,8 +361,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request [" + request.request() + "]";
         if (primaryResponse.isFailed()) {
             return primaryResponse.getFailure().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
-                    ? ReplicaItemExecutionMode.FAILURE // we have a seq no generated with the failure, replicate as no-op
-                    : ReplicaItemExecutionMode.NOOP; // no seq no generated, ignore replication
+                ? ReplicaItemExecutionMode.FAILURE // we have a seq no generated with the failure, replicate as no-op
+                : ReplicaItemExecutionMode.NOOP; // no seq no generated, ignore replication
         } else {
             // TODO: once we know for sure that every operation that has been processed on the primary is assigned a seq#
             // (i.e., all nodes on the cluster are on v6.0.0 or higher) we can use the existence of a seq# to indicate whether
@@ -470,8 +370,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
             // ReplicaItemExecutionMode enum and have a simple boolean check for seq != UNASSIGNED_SEQ_NO which will work for
            // both failures and indexing operations.
             return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP
-                    ? ReplicaItemExecutionMode.NORMAL // execution successful on primary
-                    : ReplicaItemExecutionMode.NOOP; // ignore replication
+                ? ReplicaItemExecutionMode.NORMAL // execution successful on primary
+                : ReplicaItemExecutionMode.NOOP; // ignore replication
         }
     }
@@ -527,7 +427,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
                 break;
             case DELETE:
                 DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest;
-                result = replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
-                        deleteRequest.type(), deleteRequest.id());
+                result = replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
+                    deleteRequest.type(), deleteRequest.id());
                 break;
             default:
@@ -550,56 +450,62 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
     }
 
     /** Executes index operation on primary shard after updates mapping if dynamic mappings are found */
-    static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
-                                                           MappingUpdatePerformer mappingUpdater) throws Exception {
+    private static void executeIndexRequestOnPrimary(BulkPrimaryExecutionContext context,
+                                                     MappingUpdatePerformer mappingUpdater) throws Exception {
+        final IndexRequest request = context.getRequestToExecute();
+        final IndexShard primary = context.getPrimary();
         final SourceToParse sourceToParse =
             SourceToParse.source(request.index(), request.type(), request.id(), request.source(), request.getContentType())
                 .routing(request.routing());
-        return executeOnPrimaryWhileHandlingMappingUpdates(primary.shardId(), request.type(),
+        executeOnPrimaryWhileHandlingMappingUpdates(context,
             () ->
                 primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse,
                     request.getAutoGeneratedTimestamp(), request.isRetry()),
             e -> primary.getFailedIndexResult(e, request.version()),
-            mappingUpdater);
+            context::markOperationAsExecuted,
+            mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type()));
     }
 
-    private static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary,
-                                                                     MappingUpdatePerformer mappingUpdater) throws Exception {
-        return executeOnPrimaryWhileHandlingMappingUpdates(primary.shardId(), request.type(),
+    private static void executeDeleteRequestOnPrimary(BulkPrimaryExecutionContext context,
+                                                      MappingUpdatePerformer mappingUpdater) throws Exception {
+        final DeleteRequest request = context.getRequestToExecute();
+        final IndexShard primary = context.getPrimary();
+        executeOnPrimaryWhileHandlingMappingUpdates(context,
             () -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType()),
             e -> primary.getFailedDeleteResult(e, request.version()),
-            mappingUpdater);
+            context::markOperationAsExecuted,
+            mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type()));
     }
 
-    private static <T extends Engine.Result> T executeOnPrimaryWhileHandlingMappingUpdates(ShardId shardId, String type,
-                                                                                           CheckedSupplier<T, IOException> toExecute,
-                                                                                           Function<Exception, T> onError,
-                                                                                           MappingUpdatePerformer mappingUpdater)
+    private static <T extends Engine.Result> void executeOnPrimaryWhileHandlingMappingUpdates(
+        BulkPrimaryExecutionContext context, CheckedSupplier<T, IOException> toExecute,
+        Function<Exception, T> exceptionToResult, Consumer<T> onComplete, Consumer<Mapping> mappingUpdater)
         throws IOException {
         T result = toExecute.get();
         if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
             // try to update the mappings and try again.
             try {
-                mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), shardId, type);
+                mappingUpdater.accept(result.getRequiredMappingUpdate());
             } catch (Exception e) {
                 // failure to update the mapping should translate to a failure of specific requests. Other requests
                 // still need to be executed and replicated.
-                return onError.apply(e);
+                onComplete.accept(exceptionToResult.apply(e));
+                return;
             }
-            // TODO - we can fall back to a wait for cluster state update but I'm keeping the logic the same for now
             result = toExecute.get();
             if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
                 // double mapping update. We assume that the successful mapping update wasn't yet processed on the node
                 // and retry the entire request again.
-                throw new ReplicationOperation.RetryOnPrimaryException(shardId,
-                    "Dynamic mappings are not available on the node that holds the primary yet");
+                context.markAsRequiringMappingUpdate();
+            } else {
+                onComplete.accept(result);
             }
+        } else {
+            onComplete.accept(result);
         }
-        assert result.getFailure() instanceof ReplicationOperation.RetryOnPrimaryException == false :
-            "IndexShard shouldn't use RetryOnPrimaryException. got " + result.getFailure();
-        return result;
     }
 
     class ConcreteMappingUpdatePerformer implements MappingUpdatePerformer {
server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

@@ -83,7 +83,7 @@ public abstract class TransportWriteAction<
         return location;
     }
 
-    protected static Location locationToSync(Location current, Location next) {
+    public static Location locationToSync(Location current, Location next) {
         /* here we are moving forward in the translog with each operation. Under the hood this might
          * cross translog files which is ok since from the user perspective the translog is like a
          * tape where only the highest location needs to be fsynced in order to sync all previous
server/src/test/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContextTests.java (new file)

@@ -0,0 +1,158 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.TransportShardBulkActionTests.FakeDeleteResult;
import org.elasticsearch.action.bulk.TransportShardBulkActionTests.FakeIndexResult;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class BulkPrimaryExecutionContextTests extends ESTestCase {

public void testAbortedSkipped() {
BulkShardRequest shardRequest = generateRandomRequest();
ArrayList<DocWriteRequest<?>> nonAbortedRequests = new ArrayList<>();
for (BulkItemRequest request : shardRequest.items()) {
if (randomBoolean()) {
request.abort("index", new ElasticsearchException("bla"));
} else {
nonAbortedRequests.add(request.request());
}
}
ArrayList<DocWriteRequest<?>> visitedRequests = new ArrayList<>();
for (BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, null);
context.hasMoreOperationsToExecute();
) {
visitedRequests.add(context.getCurrent());
context.setRequestToExecute(context.getCurrent());
// using failure results means we don't have to match the result type to the op type
context.markOperationAsExecuted(new Engine.IndexResult(new ElasticsearchException("bla"), 1, 1));
context.markAsCompleted(context.getExecutionResult());
}
assertThat(visitedRequests, equalTo(nonAbortedRequests));
}

private BulkShardRequest generateRandomRequest() {
BulkItemRequest[] items = new BulkItemRequest[randomInt(20)];
for (int i = 0; i < items.length; i++) {
final DocWriteRequest request;
switch (randomFrom(DocWriteRequest.OpType.values())) {
case INDEX:
request = new IndexRequest("index", "_doc", "id_" + i);
break;
case CREATE:
request = new IndexRequest("index", "_doc", "id_" + i).create(true);
break;
case UPDATE:
request = new UpdateRequest("index", "_doc", "id_" + i);
break;
case DELETE:
request = new DeleteRequest("index", "_doc", "id_" + i);
break;
default:
throw new AssertionError("unknown type");
}
items[i] = new BulkItemRequest(i, request);
}
return new BulkShardRequest(new ShardId("index", "_na_", 0),
randomFrom(WriteRequest.RefreshPolicy.values()), items);
}

public void testTranslogLocation() {
BulkShardRequest shardRequest = generateRandomRequest();
Translog.Location expectedLocation = null;
final IndexShard primary = mock(IndexShard.class);
when(primary.shardId()).thenReturn(shardRequest.shardId());
long translogGen = 0;
long translogOffset = 0;
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, primary);
while (context.hasMoreOperationsToExecute()) {
final Engine.Result result;
final DocWriteRequest<?> current = context.getCurrent();
final boolean failure = rarely();
if (frequently()) {
translogGen += randomIntBetween(1, 4);
translogOffset = 0;
} else {
translogOffset += randomIntBetween(200, 400);
}
Translog.Location location = new Translog.Location(translogGen, translogOffset, randomInt(200));
switch (current.opType()) {
case INDEX:
case CREATE:
context.setRequestToExecute(current);
if (failure) {
result = new Engine.IndexResult(new ElasticsearchException("bla"), 1, 1);
} else {
result = new FakeIndexResult(1, 1, randomLongBetween(0, 200), randomBoolean(), location);
}
break;
case UPDATE:
context.setRequestToExecute(new IndexRequest(current.index(), current.type(), current.id()));
if (failure) {
result = new Engine.IndexResult(new ElasticsearchException("bla"), 1, 1, 1);
} else {
result = new FakeIndexResult(1, 1, randomLongBetween(0, 200), randomBoolean(), location);
}
break;
case DELETE:
context.setRequestToExecute(current);
if (failure) {
result = new Engine.DeleteResult(new ElasticsearchException("bla"), 1, 1);
} else {
result = new FakeDeleteResult(1, 1, randomLongBetween(0, 200), randomBoolean(), location);
}
break;
default:
throw new AssertionError("unknown type:" + current.opType());
}
if (failure == false) {
expectedLocation = location;
}
context.markOperationAsExecuted(result);
context.markAsCompleted(context.getExecutionResult());
}
assertThat(context.getLocationToSync(), equalTo(expectedLocation));
}
}
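
Both tests above drive the same per-item loop that production callers are expected to use. A condensed, hypothetical sketch of that loop, using only methods that appear in this change; executeOnEngine is a stand-in, and update translation and mapping-update retries are omitted:

// Sketch only: the per-item state machine the context formalizes.
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary);
while (context.hasMoreOperationsToExecute()) {
    DocWriteRequest<?> docWriteRequest = context.getCurrent();
    context.setRequestToExecute(docWriteRequest);            // translation phase (identity for index/delete)
    Engine.Result result = executeOnEngine(docWriteRequest); // execution phase (hypothetical helper)
    context.markOperationAsExecuted(result);                 // records seq#, version and translog location
    context.markAsCompleted(context.getExecutionResult());   // finalization phase: builds the item response
}
Translog.Location locationToSync = context.getLocationToSync(); // highest location to fsync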

View File

@@ -405,16 +405,20 @@ public class BulkWithUpdatesIT extends ESIntegTestCase {
         assertThat("expected no failures but got: " + response.buildFailureMessage(), response.hasFailures(), equalTo(false));
         assertThat(response.getItems().length, equalTo(numDocs));
         for (int i = 0; i < numDocs; i++) {
-            assertThat(response.getItems()[i].getItemId(), equalTo(i));
-            assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i)));
-            assertThat(response.getItems()[i].getIndex(), equalTo("test"));
-            assertThat(response.getItems()[i].getType(), equalTo("type1"));
-            assertThat(response.getItems()[i].getOpType(), equalTo(OpType.UPDATE));
+            final BulkItemResponse itemResponse = response.getItems()[i];
+            assertThat(itemResponse.getFailure(), nullValue());
+            assertThat(itemResponse.isFailed(), equalTo(false));
+            assertThat(itemResponse.getItemId(), equalTo(i));
+            assertThat(itemResponse.getId(), equalTo(Integer.toString(i)));
+            assertThat(itemResponse.getIndex(), equalTo("test"));
+            assertThat(itemResponse.getType(), equalTo("type1"));
+            assertThat(itemResponse.getOpType(), equalTo(OpType.UPDATE));
             for (int j = 0; j < 5; j++) {
                 GetResponse getResponse = client().prepareGet("test", "type1", Integer.toString(i)).get();
                 assertThat(getResponse.isExists(), equalTo(false));
             }
         }
+        assertThat(response.hasFailures(), equalTo(false));
     }

     public void testBulkIndexingWhileInitializing() throws Exception {

View File

@@ -91,6 +91,7 @@ public class ShardInfoIT extends ESIntegTestCase {
         BulkResponse bulkResponse = bulkRequestBuilder.get();
         for (BulkItemResponse item : bulkResponse) {
+            assertThat(item.getFailure(), nullValue());
             assertThat(item.isFailed(), equalTo(false));
             assertShardInfo(item.getResponse());
         }

View File

@@ -629,7 +629,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
         final TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse> result;
         try (Releasable ignored = permitAcquiredFuture.actionGet()) {
             MappingUpdatePerformer noopMappingUpdater = (update, shardId, type) -> { };
-            result = TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, noopMappingUpdater);
+            result = TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, noopMappingUpdater,
+                null);
         }
         TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.location, logger);
         return result;