Handle transient write failure in transport replication action

This commit is contained in:
Areek Zillur 2016-10-13 22:42:27 -04:00
parent 415fdee828
commit 71dc4178b9
9 changed files with 345 additions and 208 deletions

View File

@ -21,13 +21,10 @@ package org.elasticsearch.action.bulk;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.TransportIndexAction;
@ -50,11 +47,9 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.Translog.Location;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
@ -62,6 +57,8 @@ import org.elasticsearch.transport.TransportService;
import java.util.Map;
import static org.elasticsearch.action.delete.TransportDeleteAction.*;
import static org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary;
import static org.elasticsearch.action.support.replication.ReplicationOperation.ignoreReplicaException;
import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException;
@ -102,7 +99,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
@Override
protected WriteResult<BulkShardResponse> onPrimaryShard(BulkShardRequest request, IndexShard primary) throws Exception {
protected PrimaryOperationResult<BulkShardResponse> onPrimaryShard(BulkShardRequest request, IndexShard primary) throws Exception {
final IndexMetaData metaData = primary.indexSettings().getIndexMetaData();
long[] preVersions = new long[request.items().length];
@ -118,30 +115,77 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
responses[i] = items[i].getPrimaryResponse();
}
BulkShardResponse response = new BulkShardResponse(request.shardId(), responses);
return new WriteResult<>(response, location);
return new PrimaryOperationResult<>(response, location);
}
/** Executes bulk item requests and handles request execution exceptions */
private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexShard indexShard,
private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexShard primary,
BulkShardRequest request,
long[] preVersions, VersionType[] preVersionTypes,
Translog.Location location, int requestIndex) {
Translog.Location location, int requestIndex) throws Exception {
preVersions[requestIndex] = request.items()[requestIndex].request().version();
preVersionTypes[requestIndex] = request.items()[requestIndex].request().versionType();
DocWriteRequest.OpType opType = request.items()[requestIndex].request().opType();
try {
WriteResult<? extends DocWriteResponse> writeResult = innerExecuteBulkItemRequest(metaData, indexShard,
request, requestIndex);
if (writeResult.getLocation() != null) {
location = locationToSync(location, writeResult.getLocation());
} else {
assert writeResult.getResponse().getResult() == DocWriteResponse.Result.NOOP
: "only noop operation can have null next operation";
DocWriteRequest itemRequest = request.items()[requestIndex].request();
final PrimaryOperationResult<? extends DocWriteResponse> primaryOperationResult;
switch (itemRequest.opType()) {
case CREATE:
case INDEX:
primaryOperationResult = executeIndexRequestOnPrimary(((IndexRequest) itemRequest), primary, mappingUpdatedAction);
break;
case UPDATE:
int maxAttempts = ((UpdateRequest) itemRequest).retryOnConflict();
PrimaryOperationResult<? extends DocWriteResponse> shardUpdateOperation = null;
for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) {
shardUpdateOperation = shardUpdateOperation(metaData, primary, request, requestIndex, ((UpdateRequest) itemRequest));
if (shardUpdateOperation.success()
|| shardUpdateOperation.getFailure() instanceof VersionConflictEngineException == false) {
break;
}
}
if (shardUpdateOperation == null) {
throw new IllegalStateException("version conflict exception should bubble up on last attempt");
}
primaryOperationResult = shardUpdateOperation;
break;
case DELETE:
primaryOperationResult = executeDeleteRequestOnPrimary(((DeleteRequest) itemRequest), primary);
break;
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
}
if (primaryOperationResult.success()) {
if (primaryOperationResult.getLocation() != null) {
location = locationToSync(location, primaryOperationResult.getLocation());
} else {
assert primaryOperationResult.getResponse().getResult() == DocWriteResponse.Result.NOOP
: "only noop operation can have null next operation";
}
// update the bulk item request because update request execution can mutate the bulk item request
BulkItemRequest item = request.items()[requestIndex];
// add the response
setResponse(item, new BulkItemResponse(item.id(), opType, primaryOperationResult.getResponse()));
} else {
BulkItemRequest item = request.items()[requestIndex];
DocWriteRequest docWriteRequest = item.request();
Exception failure = primaryOperationResult.getFailure();
if (isConflictException(failure)) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
request.shardId(), docWriteRequest.opType().getLowercase(), request), failure);
} else {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
request.shardId(), docWriteRequest.opType().getLowercase(), request), failure);
}
// if its a conflict failure, and we already executed the request on a primary (and we execute it
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
// then just use the response we got from the successful execution
if (item.getPrimaryResponse() != null && isConflictException(failure)) {
setResponse(item, item.getPrimaryResponse());
} else {
setResponse(item, new BulkItemResponse(item.id(), docWriteRequest.opType(),
new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), failure)));
}
}
// update the bulk item request because update request execution can mutate the bulk item request
BulkItemRequest item = request.items()[requestIndex];
// add the response
setResponse(item, new BulkItemResponse(item.id(), opType, writeResult.getResponse()));
} catch (Exception e) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
@ -151,59 +195,16 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
docWriteRequest.version(preVersions[j]);
docWriteRequest.versionType(preVersionTypes[j]);
}
throw (ElasticsearchException) e;
}
BulkItemRequest item = request.items()[requestIndex];
DocWriteRequest docWriteRequest = item.request();
if (isConflictException(e)) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
request.shardId(), docWriteRequest.opType().getLowercase(), request), e);
} else {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
request.shardId(), docWriteRequest.opType().getLowercase(), request), e);
}
// if its a conflict failure, and we already executed the request on a primary (and we execute it
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
// then just use the response we got from the successful execution
if (item.getPrimaryResponse() != null && isConflictException(e)) {
setResponse(item, item.getPrimaryResponse());
} else {
setResponse(item, new BulkItemResponse(item.id(), docWriteRequest.opType(),
new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), e)));
throw e;
}
// TODO: maybe this assert is too strict, we can still get environment failures while executing write operations
assert false : "unexpected exception: " + e.getMessage() + " class:" + e.getClass().getSimpleName();
}
assert request.items()[requestIndex].getPrimaryResponse() != null;
assert preVersionTypes[requestIndex] != null;
return location;
}
private WriteResult<? extends DocWriteResponse> innerExecuteBulkItemRequest(IndexMetaData metaData, IndexShard indexShard,
BulkShardRequest request, int requestIndex) throws Exception {
DocWriteRequest itemRequest = request.items()[requestIndex].request();
switch (itemRequest.opType()) {
case CREATE:
case INDEX:
return TransportIndexAction.executeIndexRequestOnPrimary(((IndexRequest) itemRequest), indexShard, mappingUpdatedAction);
case UPDATE:
int maxAttempts = ((UpdateRequest) itemRequest).retryOnConflict();
for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) {
try {
return shardUpdateOperation(metaData, indexShard, request, requestIndex, ((UpdateRequest) itemRequest));
} catch (Exception e) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
if (attemptCount == maxAttempts // bubble up exception when we run out of attempts
|| (cause instanceof VersionConflictEngineException) == false) { // or when exception is not a version conflict
throw e;
}
}
}
throw new IllegalStateException("version conflict exception should bubble up on last attempt");
case DELETE:
return TransportDeleteAction.executeDeleteRequestOnPrimary(((DeleteRequest) itemRequest), indexShard);
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
}
}
private void setResponse(BulkItemRequest request, BulkItemResponse response) {
request.setPrimaryResponse(response);
if (response.isFailed()) {
@ -218,50 +219,62 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
* Executes update request, doing a get and translating update to a index or delete operation
* NOTE: all operations except NOOP, reassigns the bulk item request
*/
private WriteResult<? extends DocWriteResponse> shardUpdateOperation(IndexMetaData metaData, IndexShard indexShard,
private PrimaryOperationResult<? extends DocWriteResponse> shardUpdateOperation(IndexMetaData metaData, IndexShard primary,
BulkShardRequest request,
int requestIndex, UpdateRequest updateRequest)
throws Exception {
// Todo: capture read version conflicts, missing documents and malformed script errors in the write result due to get request
UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard, threadPool::estimatedTimeInMillis);
final UpdateHelper.Result translate;
try {
translate = updateHelper.prepare(updateRequest, primary, threadPool::estimatedTimeInMillis);
} catch (Exception e) {
return new PrimaryOperationResult<>(e);
}
switch (translate.getResponseResult()) {
case CREATED:
case UPDATED:
IndexRequest indexRequest = translate.action();
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
indexRequest.process(mappingMd, allowIdGeneration, request.index());
WriteResult<IndexResponse> writeResult = TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction);
BytesReference indexSourceAsBytes = indexRequest.source();
IndexResponse indexResponse = writeResult.getResponse();
UpdateResponse update = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.getResult());
if ((updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) ||
(updateRequest.fields() != null && updateRequest.fields().length > 0)) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
update.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
PrimaryOperationResult<IndexResponse> writeResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
if (writeResult.success()) {
BytesReference indexSourceAsBytes = indexRequest.source();
IndexResponse indexResponse = writeResult.getResponse();
UpdateResponse update = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.getResult());
if ((updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) ||
(updateRequest.fields() != null && updateRequest.fields().length > 0)) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
update.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
}
// Replace the update request to the translated index request to execute on the replica.
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest);
return new PrimaryOperationResult<>(update, writeResult.getLocation());
} else {
return writeResult;
}
// Replace the update request to the translated index request to execute on the replica.
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest);
return new WriteResult<>(update, writeResult.getLocation());
case DELETED:
DeleteRequest deleteRequest = translate.action();
WriteResult<DeleteResponse> deleteResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard);
DeleteResponse response = deleteResult.getResponse();
UpdateResponse deleteUpdateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult());
deleteUpdateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), translate.updatedSourceAsMap(), translate.updateSourceContentType(), null));
// Replace the update request to the translated delete request to execute on the replica.
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
return new WriteResult<>(deleteUpdateResponse, deleteResult.getLocation());
PrimaryOperationResult<DeleteResponse> deleteResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
if (deleteResult.success()) {
DeleteResponse response = deleteResult.getResponse();
UpdateResponse deleteUpdateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult());
deleteUpdateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), translate.updatedSourceAsMap(), translate.updateSourceContentType(), null));
// Replace the update request to the translated delete request to execute on the replica.
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
return new PrimaryOperationResult<>(deleteUpdateResponse, deleteResult.getLocation());
} else {
return deleteResult;
}
case NOOP:
BulkItemRequest item = request.items()[requestIndex];
indexShard.noopUpdate(updateRequest.type());
primary.noopUpdate(updateRequest.type());
item.setIgnoreOnReplica(); // no need to go to the replica
return new WriteResult<>(translate.action(), null);
return new PrimaryOperationResult<>(translate.action(), null);
default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult());
}
}
@Override
protected Location onReplicaShard(BulkShardRequest request, IndexShard indexShard) {
protected ReplicaOperationResult onReplicaShard(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
@ -269,20 +282,28 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
continue;
}
DocWriteRequest docWriteRequest = item.request();
final Engine.Operation operation;
final ReplicaOperationResult replicaResult;
try {
switch (docWriteRequest.opType()) {
case CREATE:
case INDEX:
operation = TransportIndexAction.executeIndexRequestOnReplica(((IndexRequest) docWriteRequest), indexShard);
replicaResult = TransportIndexAction.executeIndexRequestOnReplica(((IndexRequest) docWriteRequest), replica);
break;
case DELETE:
operation = TransportDeleteAction.executeDeleteRequestOnReplica(((DeleteRequest) docWriteRequest), indexShard);
replicaResult = executeDeleteRequestOnReplica(((DeleteRequest) docWriteRequest), replica);
break;
default: throw new IllegalStateException("Unexpected request operation type on replica: "
+ docWriteRequest.opType().getLowercase());
}
location = locationToSync(location, operation.getTranslogLocation());
if (replicaResult.success()) {
location = locationToSync(location, replicaResult.getLocation());
} else {
// check if any transient write operation failures should be bubbled up
Exception failure = replicaResult.getFailure();
if (!ignoreReplicaException(failure)) {
throw failure;
}
}
} catch (Exception e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure
// so we will fail the shard
@ -291,7 +312,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
}
}
return location;
return new ReplicaOperationResult(location);
}
private Translog.Location locationToSync(Translog.Location current, Translog.Location next) {

View File

@ -39,7 +39,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog.Location;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
@ -119,30 +118,36 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
}
@Override
protected WriteResult<DeleteResponse> onPrimaryShard(DeleteRequest request, IndexShard indexShard) {
return executeDeleteRequestOnPrimary(request, indexShard);
protected PrimaryOperationResult<DeleteResponse> onPrimaryShard(DeleteRequest request, IndexShard primary) {
return executeDeleteRequestOnPrimary(request, primary);
}
@Override
protected Location onReplicaShard(DeleteRequest request, IndexShard indexShard) {
return executeDeleteRequestOnReplica(request, indexShard).getTranslogLocation();
protected ReplicaOperationResult onReplicaShard(DeleteRequest request, IndexShard replica) {
return executeDeleteRequestOnReplica(request, replica);
}
public static WriteResult<DeleteResponse> executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard indexShard) {
Engine.Delete delete = indexShard.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
indexShard.delete(delete);
// update the request with the version so it will go to the replicas
request.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
request.version(delete.version());
public static PrimaryOperationResult<DeleteResponse> executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) {
Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
primary.delete(delete);
if (delete.hasFailure()) {
return new PrimaryOperationResult<>(delete.getFailure());
} else {
// update the request with the version so it will go to the replicas
request.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
request.version(delete.version());
assert request.versionType().validateVersionForWrites(request.version());
DeleteResponse response = new DeleteResponse(indexShard.shardId(), request.type(), request.id(), delete.version(), delete.found());
return new WriteResult<>(response, delete.getTranslogLocation());
assert request.versionType().validateVersionForWrites(request.version());
DeleteResponse response = new DeleteResponse(primary.shardId(), request.type(), request.id(), delete.version(), delete.found());
return new PrimaryOperationResult<>(response, delete.getTranslogLocation());
}
}
public static Engine.Delete executeDeleteRequestOnReplica(DeleteRequest request, IndexShard indexShard) {
Engine.Delete delete = indexShard.prepareDeleteOnReplica(request.type(), request.id(), request.version(), request.versionType());
indexShard.delete(delete);
return delete;
public static ReplicaOperationResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(), request.version(), request.versionType());
replica.delete(delete);
return delete.hasFailure()
? new ReplicaOperationResult(delete.getFailure())
: new ReplicaOperationResult(delete.getTranslogLocation());
}
}

View File

@ -39,11 +39,11 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog.Location;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
@ -140,48 +140,68 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
}
@Override
protected WriteResult<IndexResponse> onPrimaryShard(IndexRequest request, IndexShard indexShard) throws Exception {
return executeIndexRequestOnPrimary(request, indexShard, mappingUpdatedAction);
protected PrimaryOperationResult<IndexResponse> onPrimaryShard(IndexRequest request, IndexShard primary) throws Exception {
return executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction);
}
@Override
protected Location onReplicaShard(IndexRequest request, IndexShard indexShard) {
return executeIndexRequestOnReplica(request, indexShard).getTranslogLocation();
protected ReplicaOperationResult onReplicaShard(IndexRequest request, IndexShard replica) {
return executeIndexRequestOnReplica(request, replica);
}
/**
* Execute the given {@link IndexRequest} on a replica shard, throwing a
* {@link RetryOnReplicaException} if the operation needs to be re-tried.
*/
public static Engine.Index executeIndexRequestOnReplica(IndexRequest request, IndexShard indexShard) {
public static ReplicaOperationResult executeIndexRequestOnReplica(IndexRequest request, IndexShard indexShard) {
final ShardId shardId = indexShard.shardId();
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
final Engine.Index operation = indexShard.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
final Engine.Index operation;
try {
operation = indexShard.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
} catch (MapperParsingException | IllegalArgumentException e) {
return new ReplicaOperationResult(e);
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.index(operation);
return operation;
if (operation.hasFailure()) {
return new ReplicaOperationResult(operation.getFailure());
} else {
return new ReplicaOperationResult(operation.getTranslogLocation());
}
}
/** Utility method to prepare an index operation on primary shards */
public static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard indexShard) {
static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard indexShard) {
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), request.id(), request.source())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
return indexShard.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
}
public static WriteResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard,
public static PrimaryOperationResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard,
MappingUpdatedAction mappingUpdatedAction) throws Exception {
Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard);
Engine.Index operation;
try {
operation = prepareIndexOperationOnPrimary(request, indexShard);
} catch (MapperParsingException | IllegalArgumentException e) {
return new PrimaryOperationResult<>(e);
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = indexShard.shardId();
if (update != null) {
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
operation = prepareIndexOperationOnPrimary(request, indexShard);
try {
// can throw timeout exception when updating mappings or ISE for attempting to update default mappings
// which are bubbled up
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
operation = prepareIndexOperationOnPrimary(request, indexShard);
} catch (MapperParsingException | IllegalArgumentException e) {
return new PrimaryOperationResult<>(e);
}
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new ReplicationOperation.RetryOnPrimaryException(shardId,
@ -189,16 +209,19 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
}
}
indexShard.index(operation);
if (operation.hasFailure()) {
return new PrimaryOperationResult<>(operation.getFailure());
} else {
// update the version on request so it will happen on the replicas
final long version = operation.version();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
// update the version on request so it will happen on the replicas
final long version = operation.version();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersionForWrites(request.version());
assert request.versionType().validateVersionForWrites(request.version());
IndexResponse response = new IndexResponse(shardId, request.type(), request.id(), request.version(), operation.isCreated());
return new WriteResult<>(response, operation.getTranslogLocation());
IndexResponse response = new IndexResponse(shardId, request.type(), request.id(), request.version(), operation.isCreated());
return new PrimaryOperationResult<>(response, operation.getTranslogLocation());
}
}
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.engine.VersionConflictEngineException;
@ -112,22 +113,24 @@ public class ReplicationOperation<
pendingActions.incrementAndGet();
primaryResult = primary.perform(request);
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
if (replicaRequest != null) {
assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
}
// we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
// to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
ClusterState clusterState = clusterStateSupplier.get();
final List<ShardRouting> shards = getShards(primaryId, clusterState);
Set<String> inSyncAllocationIds = getInSyncAllocationIds(primaryId, clusterState);
markUnavailableShardsAsStale(replicaRequest, inSyncAllocationIds, shards);
performOnReplicas(replicaRequest, shards);
}
// we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
// to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
ClusterState clusterState = clusterStateSupplier.get();
final List<ShardRouting> shards = getShards(primaryId, clusterState);
Set<String> inSyncAllocationIds = getInSyncAllocationIds(primaryId, clusterState);
markUnavailableShardsAsStale(replicaRequest, inSyncAllocationIds, shards);
performOnReplicas(replicaRequest, shards);
successfulShards.incrementAndGet();
decPendingAndFinishIfNeeded();
}
@ -419,7 +422,11 @@ public class ReplicationOperation<
public interface PrimaryResult<R extends ReplicationRequest<R>> {
R replicaRequest();
/**
* @return null if no operation needs to be sent to a replica
* (for example when the operation failed on the primary due to a parsing exception)
*/
@Nullable R replicaRequest();
void setShardInfo(ReplicationResponse.ShardInfo shardInfo);
}

View File

@ -178,7 +178,7 @@ public abstract class TransportReplicationAction<
* @param shardRequest the request to the replica shard
* @param replica the replica shard to perform the operation on
*/
protected abstract ReplicaResult shardOperationOnReplica(ReplicaRequest shardRequest, IndexShard replica);
protected abstract ReplicaResult shardOperationOnReplica(ReplicaRequest shardRequest, IndexShard replica) throws Exception;
/**
* Cluster level block to check before request execution
@ -206,8 +206,13 @@ public abstract class TransportReplicationAction<
}
protected boolean retryPrimaryException(final Throwable e) {
return e.getClass() == ReplicationOperation.RetryOnPrimaryException.class
|| TransportActions.isShardNotAvailableException(e);
boolean retry = e.getClass() == ReplicationOperation.RetryOnPrimaryException.class
|| TransportActions.isShardNotAvailableException(e);
if (retry) {
assert e instanceof ElasticsearchException
: "expected all retry on primary exception to be ElasticsearchException instances, found: " + e.getClass();
}
return retry;
}
class OperationTransportHandler implements TransportRequestHandler<Request> {
@ -310,17 +315,10 @@ public abstract class TransportReplicationAction<
final IndexMetaData indexMetaData = clusterService.state().getMetaData().index(request.shardId().getIndex());
final boolean executeOnReplicas = (indexMetaData == null) || shouldExecuteReplication(indexMetaData.getSettings());
final ActionListener<Response> listener = createResponseListener(primaryShardReference);
createReplicatedOperation(request, new ActionListener<PrimaryResult>() {
@Override
public void onResponse(PrimaryResult result) {
result.respond(listener);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}, primaryShardReference, executeOnReplicas).execute();
createReplicatedOperation(request,
ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
primaryShardReference, executeOnReplicas)
.execute();
}
} catch (Exception e) {
Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller
@ -376,11 +374,19 @@ public abstract class TransportReplicationAction<
protected class PrimaryResult implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
final ReplicaRequest replicaRequest;
final Response finalResponse;
final Response finalResponseIfSuccessful;
final Exception finalFailure;
public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponse) {
public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponseIfSuccessful) {
this.replicaRequest = replicaRequest;
this.finalResponse = finalResponse;
this.finalResponseIfSuccessful = finalResponseIfSuccessful;
this.finalFailure = null;
}
public PrimaryResult(Exception finalFailure) {
this.replicaRequest = null;
this.finalResponseIfSuccessful = null;
this.finalFailure = finalFailure;
}
@Override
@ -390,22 +396,37 @@ public abstract class TransportReplicationAction<
@Override
public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) {
finalResponse.setShardInfo(shardInfo);
if (finalResponseIfSuccessful != null) {
finalResponseIfSuccessful.setShardInfo(shardInfo);
}
}
public void respond(ActionListener<Response> listener) {
listener.onResponse(finalResponse);
if (finalResponseIfSuccessful != null) {
listener.onResponse(finalResponseIfSuccessful);
} else {
listener.onFailure(finalFailure);
}
}
}
protected class ReplicaResult {
/**
* Public constructor so subclasses can call it.
*/
public ReplicaResult() {}
final Exception finalFailure;
public ReplicaResult() {
this.finalFailure = null;
}
public ReplicaResult(Exception finalFailure) {
this.finalFailure = finalFailure;
}
public void respond(ActionListener<TransportResponse.Empty> listener) {
listener.onResponse(TransportResponse.Empty.INSTANCE);
if (finalFailure == null) {
listener.onResponse(TransportResponse.Empty.INSTANCE);
} else {
listener.onFailure(finalFailure);
}
}
}
@ -902,7 +923,9 @@ public abstract class TransportReplicationAction<
@Override
public PrimaryResult perform(Request request) throws Exception {
PrimaryResult result = shardOperationOnPrimary(request, indexShard);
result.replicaRequest().primaryTerm(indexShard.getPrimaryTerm());
if (result.replicaRequest() != null) {
result.replicaRequest().primaryTerm(indexShard.getPrimaryTerm());
}
return result;
}

View File

@ -60,47 +60,94 @@ public abstract class TransportWriteAction<
}
/**
* Called on the primary with a reference to the {@linkplain IndexShard} to modify.
* Called on the primary with a reference to the primary {@linkplain IndexShard} to modify.
*/
protected abstract WriteResult<Response> onPrimaryShard(Request request, IndexShard indexShard) throws Exception;
protected abstract PrimaryOperationResult<Response> onPrimaryShard(Request request, IndexShard primary) throws Exception;
/**
* Called once per replica with a reference to the {@linkplain IndexShard} to modify.
* Called once per replica with a reference to the replica {@linkplain IndexShard} to modify.
*
* @return the translog location of the {@linkplain IndexShard} after the write was completed or null if no write occurred
* @return the result of the replication operation containing either the translog location of the {@linkplain IndexShard}
* after the write was completed or a failure if the operation failed
*/
protected abstract Translog.Location onReplicaShard(ReplicaRequest request, IndexShard indexShard);
protected abstract ReplicaOperationResult onReplicaShard(ReplicaRequest request, IndexShard replica) throws Exception;
@Override
protected final WritePrimaryResult shardOperationOnPrimary(Request request, IndexShard primary) throws Exception {
WriteResult<Response> result = onPrimaryShard(request, primary);
return new WritePrimaryResult(((ReplicaRequest) request), result.getResponse(), result.getLocation(), primary);
final PrimaryOperationResult<Response> result = onPrimaryShard(request, primary);
return result.success()
? new WritePrimaryResult((ReplicaRequest) request, result.getResponse(), result.getLocation(), primary)
: new WritePrimaryResult(result.getFailure());
}
@Override
protected final WriteReplicaResult shardOperationOnReplica(ReplicaRequest request, IndexShard replica) {
Translog.Location location = onReplicaShard(request, replica);
return new WriteReplicaResult(replica, request, location);
protected final WriteReplicaResult shardOperationOnReplica(ReplicaRequest request, IndexShard replica) throws Exception {
final ReplicaOperationResult result = onReplicaShard(request, replica);
return result.success()
? new WriteReplicaResult(request, result.getLocation(), replica)
: new WriteReplicaResult(result.getFailure());
}
abstract static class OperationWriteResult {
private final Translog.Location location;
private final Exception failure;
protected OperationWriteResult(@Nullable Location location) {
this.location = location;
this.failure = null;
}
protected OperationWriteResult(Exception failure) {
this.location = null;
this.failure = failure;
}
public Translog.Location getLocation() {
return location;
}
public Exception getFailure() {
return failure;
}
public boolean success() {
return failure == null;
}
}
/**
* Simple result from a write action. Write actions have static method to return these so they can integrate with bulk.
* Simple result from a primary write action (includes response).
* Write actions have static method to return these so they can integrate with bulk.
*/
public static class WriteResult<Response extends ReplicationResponse> {
public static class PrimaryOperationResult<Response extends ReplicationResponse> extends OperationWriteResult {
private final Response response;
private final Translog.Location location;
public WriteResult(Response response, @Nullable Location location) {
public PrimaryOperationResult(Response response, @Nullable Location location) {
super(location);
this.response = response;
this.location = location;
}
public PrimaryOperationResult(Exception failure) {
super(failure);
this.response = null;
}
public Response getResponse() {
return response;
}
}
public Translog.Location getLocation() {
return location;
/**
* Simple result from a replica write action. Write actions have static method to return these so they can integrate with bulk.
*/
public static class ReplicaOperationResult extends OperationWriteResult {
public ReplicaOperationResult(@Nullable Location location) {
super(location);
}
public ReplicaOperationResult(Exception failure) {
super(failure);
}
}
@ -112,14 +159,18 @@ public abstract class TransportWriteAction<
ActionListener<Response> listener = null;
public WritePrimaryResult(ReplicaRequest request, Response finalResponse,
@Nullable Translog.Location location,
IndexShard indexShard) {
@Nullable Location location, IndexShard primary) {
super(request, finalResponse);
/*
* We call this before replication because this might wait for a refresh and that can take a while. This way we wait for the
* refresh in parallel on the primary and on the replica.
*/
new AsyncAfterWriteAction(indexShard, request, location, this, logger).run();
new AsyncAfterWriteAction(primary, request, location, this, logger).run();
}
public WritePrimaryResult(Exception failure) {
super(failure);
this.finishedAsyncActions = true;
}
@Override
@ -148,7 +199,9 @@ public abstract class TransportWriteAction<
@Override
public synchronized void onSuccess(boolean forcedRefresh) {
finalResponse.setForcedRefresh(forcedRefresh);
if (finalResponseIfSuccessful != null) {
finalResponseIfSuccessful.setForcedRefresh(forcedRefresh);
}
finishedAsyncActions = true;
respondIfPossible(null);
}
@ -161,8 +214,13 @@ public abstract class TransportWriteAction<
boolean finishedAsyncActions;
private ActionListener<TransportResponse.Empty> listener;
public WriteReplicaResult(IndexShard indexShard, ReplicatedWriteRequest<?> request, Translog.Location location) {
new AsyncAfterWriteAction(indexShard, request, location, this, logger).run();
public WriteReplicaResult(ReplicaRequest request, Location location, IndexShard replica) {
new AsyncAfterWriteAction(replica, request, location, this, logger).run();
}
public WriteReplicaResult(Exception finalFailure) {
super(finalFailure);
this.finishedAsyncActions = true;
}
@Override

View File

@ -1124,7 +1124,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private void verifyPrimary() {
if (shardRouting.primary() == false) {
throw new IllegalStateException("shard is not a primary " + shardRouting);
// TODO throw a more appropriate exception
throw new ShardNotFoundException(shardRouting.shardId(), "shard is not a primary anymore");
}
}

View File

@ -28,7 +28,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.Translog.Location;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -137,13 +136,13 @@ public class TransportWriteActionTests extends ESTestCase {
}
@Override
protected WriteResult<TestResponse> onPrimaryShard(TestRequest request, IndexShard indexShard) throws Exception {
return new WriteResult<>(new TestResponse(), location);
protected PrimaryOperationResult<TestResponse> onPrimaryShard(TestRequest request, IndexShard primary) throws Exception {
return new PrimaryOperationResult<>(new TestResponse(), location);
}
@Override
protected Location onReplicaShard(TestRequest request, IndexShard indexShard) {
return location;
protected ReplicaOperationResult onReplicaShard(TestRequest request, IndexShard replica) {
return new ReplicaOperationResult(location);
}
@Override

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
@ -40,7 +39,6 @@ import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
@ -65,6 +63,8 @@ import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary;
import static org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnReplica;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
@ -365,7 +365,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
@Override
protected PrimaryResult performOnPrimary(IndexShard primary, IndexRequest request) throws Exception {
TransportWriteAction.WriteResult<IndexResponse> result = TransportIndexAction.executeIndexRequestOnPrimary(request, primary,
TransportWriteAction.PrimaryOperationResult<IndexResponse> result = executeIndexRequestOnPrimary(request, primary,
null);
request.primaryTerm(primary.getPrimaryTerm());
TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.getLocation(), logger);
@ -374,8 +374,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
@Override
protected void performOnReplica(IndexRequest request, IndexShard replica) {
Engine.Index index = TransportIndexAction.executeIndexRequestOnReplica(request, replica);
TransportWriteActionTestHelper.performPostWriteActions(replica, request, index.getTranslogLocation(), logger);
TransportWriteAction.ReplicaOperationResult index = executeIndexRequestOnReplica(request, replica);
TransportWriteActionTestHelper.performPostWriteActions(replica, request, index.getLocation(), logger);
}
}
}