Simplify TransportWriteAction request handling

This commit reduces classes to handle write
operation results in TransportWriteAction, this
comes at the cost of handling write operations in
TransportShardBulkAction.
Now parsing, mapping failures (which happen before
executing engine write operation) are communicated
via a failure operation type while transient operation
failures are set on the index/delete operations.
This commit is contained in:
Areek Zillur 2016-10-20 20:53:39 -04:00
parent e195f7dd19
commit 63c0728292
14 changed files with 279 additions and 285 deletions

View File

@ -54,14 +54,14 @@ public class TransportShardFlushAction extends TransportReplicationAction<ShardF
protected PrimaryResult shardOperationOnPrimary(ShardFlushRequest shardRequest, IndexShard primary) {
primary.flush(shardRequest.getRequest());
logger.trace("{} flush request executed on primary", primary.shardId());
return new PrimaryResult(shardRequest, new ReplicationResponse());
return new PrimaryResult(shardRequest, new ReplicationResponse(), null);
}
@Override
protected ReplicaResult shardOperationOnReplica(ShardFlushRequest request, IndexShard replica) {
replica.flush(request.getRequest());
logger.trace("{} flush request executed on replica", replica.shardId());
return new ReplicaResult();
return new ReplicaResult(null);
}
@Override

View File

@ -57,14 +57,14 @@ public class TransportShardRefreshAction
protected PrimaryResult shardOperationOnPrimary(BasicReplicationRequest shardRequest, IndexShard primary) {
primary.refresh("api");
logger.trace("{} refresh request executed on primary", primary.shardId());
return new PrimaryResult(shardRequest, new ReplicationResponse());
return new PrimaryResult(shardRequest, new ReplicationResponse(), null);
}
@Override
protected ReplicaResult shardOperationOnReplica(BasicReplicationRequest request, IndexShard replica) {
replica.refresh("api");
logger.trace("{} refresh request executed on replica", replica.shardId());
return new ReplicaResult();
return new ReplicaResult(null);
}
@Override

View File

@ -46,6 +46,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
@ -100,7 +101,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
@Override
protected PrimaryOperationResult<BulkShardResponse> onPrimaryShard(BulkShardRequest request, IndexShard primary) throws Exception {
protected WritePrimaryResult onPrimaryShard(BulkShardRequest request, IndexShard primary) throws Exception {
final IndexMetaData metaData = primary.indexSettings().getIndexMetaData();
long[] preVersions = new long[request.items().length];
@ -116,7 +117,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
responses[i] = items[i].getPrimaryResponse();
}
BulkShardResponse response = new BulkShardResponse(request.shardId(), responses);
return new PrimaryOperationResult<>(response, location);
return new WritePrimaryResult(request, response, location, null, primary);
}
/** Executes bulk item requests and handles request execution exceptions */
@ -128,48 +129,117 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
preVersionTypes[requestIndex] = request.items()[requestIndex].request().versionType();
DocWriteRequest.OpType opType = request.items()[requestIndex].request().opType();
try {
// execute item request
DocWriteRequest itemRequest = request.items()[requestIndex].request();
final PrimaryOperationResult<? extends DocWriteResponse> primaryOperationResult;
final Engine.Operation operation;
final DocWriteResponse response;
switch (itemRequest.opType()) {
case CREATE:
case INDEX:
primaryOperationResult = executeIndexRequestOnPrimary(((IndexRequest) itemRequest), primary, mappingUpdatedAction);
final IndexRequest indexRequest = (IndexRequest) itemRequest;
operation = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
response = operation.hasFailure() ? null
: new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
operation.version(), ((Engine.Index) operation).isCreated());
break;
case UPDATE:
int maxAttempts = ((UpdateRequest) itemRequest).retryOnConflict();
PrimaryOperationResult<? extends DocWriteResponse> shardUpdateOperation = null;
Engine.Operation updateOperation = null;
UpdateResponse updateResponse = null;
UpdateRequest updateRequest = (UpdateRequest) itemRequest;
int maxAttempts = updateRequest.retryOnConflict();
for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) {
shardUpdateOperation = shardUpdateOperation(metaData, primary, request, requestIndex, ((UpdateRequest) itemRequest));
if (shardUpdateOperation.success()
|| shardUpdateOperation.getFailure() instanceof VersionConflictEngineException == false) {
break;
try {
// translate and execute operation
UpdateHelper.Result translate = updateHelper.prepare(updateRequest, primary, threadPool::estimatedTimeInMillis);
updateOperation = shardUpdateOperation(metaData, primary, request, updateRequest, translate);
if (updateOperation == null) {
// this is a noop operation
updateResponse = translate.action();
} else {
if (updateOperation.hasFailure() == false) {
// enrich update response
switch (updateOperation.operationType()) {
case INDEX:
IndexRequest updateIndexRequest = translate.action();
final IndexResponse indexResponse = new IndexResponse(primary.shardId(),
updateIndexRequest.type(), updateIndexRequest.id(),
updateOperation.version(), ((Engine.Index) updateOperation).isCreated());
BytesReference indexSourceAsBytes = updateIndexRequest.source();
updateResponse = new UpdateResponse(indexResponse.getShardInfo(),
indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(),
indexResponse.getVersion(), indexResponse.getResult());
if ((updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) ||
(updateRequest.fields() != null && updateRequest.fields().length > 0)) {
Tuple<XContentType, Map<String, Object>> sourceAndContent =
XContentHelper.convertToMap(indexSourceAsBytes, true);
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(),
indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
}
// Replace the update request to the translated index request to execute on the replica.
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), updateIndexRequest);
break;
case DELETE:
DeleteRequest updateDeleteRequest = translate.action();
DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(),
updateDeleteRequest.type(), updateDeleteRequest.id(),
updateOperation.version(), ((Engine.Delete) updateOperation).found());
updateResponse = new UpdateResponse(deleteResponse.getShardInfo(),
deleteResponse.getShardId(), deleteResponse.getType(), deleteResponse.getId(),
deleteResponse.getVersion(), deleteResponse.getResult());
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest,
request.index(), deleteResponse.getVersion(), translate.updatedSourceAsMap(),
translate.updateSourceContentType(), null));
// Replace the update request to the translated delete request to execute on the replica.
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest);
break;
case FAILURE:
break;
}
} else {
// version conflict exception, retry
if (updateOperation.getFailure() instanceof VersionConflictEngineException) {
continue;
}
}
}
break; // out of retry loop
} catch (Exception failure) {
// set to a failure operation
updateOperation = new Engine.Failure(updateRequest.type(), updateRequest.id(), updateRequest.version(),
updateRequest.versionType(), Engine.Operation.Origin.PRIMARY, System.nanoTime(), failure);
break; // out of retry loop
}
}
if (shardUpdateOperation == null) {
throw new IllegalStateException("version conflict exception should bubble up on last attempt");
}
primaryOperationResult = shardUpdateOperation;
operation = updateOperation;
response = updateResponse;
break;
case DELETE:
primaryOperationResult = executeDeleteRequestOnPrimary(((DeleteRequest) itemRequest), primary);
final DeleteRequest deleteRequest = (DeleteRequest) itemRequest;
operation = executeDeleteRequestOnPrimary(deleteRequest, primary);
response = operation.hasFailure() ? null :
new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(),
operation.version(), ((Engine.Delete) operation).found());
break;
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
}
if (primaryOperationResult.success()) {
if (primaryOperationResult.getLocation() != null) {
location = locationToSync(location, primaryOperationResult.getLocation());
// set item response and handle failures
if (operation == null // in case of a noop update operation
|| operation.hasFailure() == false) {
if (operation == null) {
assert response != null;
assert response.getResult() == DocWriteResponse.Result.NOOP
: "only noop operation can have null operation";
} else {
assert primaryOperationResult.getResponse().getResult() == DocWriteResponse.Result.NOOP
: "only noop operation can have null next operation";
location = locationToSync(location, operation.getTranslogLocation());
}
// update the bulk item request because update request execution can mutate the bulk item request
BulkItemRequest item = request.items()[requestIndex];
// add the response
setResponse(item, new BulkItemResponse(item.id(), opType, primaryOperationResult.getResponse()));
item.setPrimaryResponse(new BulkItemResponse(item.id(), opType, response));
} else {
BulkItemRequest item = request.items()[requestIndex];
DocWriteRequest docWriteRequest = item.request();
Exception failure = primaryOperationResult.getFailure();
Exception failure = operation.getFailure();
if (isConflictException(failure)) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
request.shardId(), docWriteRequest.opType().getLowercase(), request), failure);
@ -180,10 +250,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
// if its a conflict failure, and we already executed the request on a primary (and we execute it
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
// then just use the response we got from the successful execution
if (item.getPrimaryResponse() != null && isConflictException(failure)) {
setResponse(item, item.getPrimaryResponse());
} else {
setResponse(item, new BulkItemResponse(item.id(), docWriteRequest.opType(),
if (item.getPrimaryResponse() == null || isConflictException(failure) == false) {
item.setPrimaryResponse(new BulkItemResponse(item.id(), docWriteRequest.opType(),
new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), failure)));
}
}
@ -201,81 +269,41 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
// TODO: maybe this assert is too strict, we can still get environment failures while executing write operations
assert false : "unexpected exception: " + e.getMessage() + " class:" + e.getClass().getSimpleName();
}
assert request.items()[requestIndex].getPrimaryResponse() != null;
BulkItemRequest itemRequest = request.items()[requestIndex];
assert itemRequest.getPrimaryResponse() != null;
assert preVersionTypes[requestIndex] != null;
if (itemRequest.getPrimaryResponse().isFailed()
|| itemRequest.getPrimaryResponse().getResponse().getResult() == DocWriteResponse.Result.NOOP) {
itemRequest.setIgnoreOnReplica();
} else {
// Set the ShardInfo to 0 so we can safely send it to the replicas. We won't use it in the real response though.
itemRequest.getPrimaryResponse().getResponse().setShardInfo(new ShardInfo());
}
return location;
}
private void setResponse(BulkItemRequest request, BulkItemResponse response) {
request.setPrimaryResponse(response);
if (response.isFailed()) {
request.setIgnoreOnReplica();
} else {
// Set the ShardInfo to 0 so we can safely send it to the replicas. We won't use it in the real response though.
response.getResponse().setShardInfo(new ShardInfo());
}
}
/**
* Executes update request, doing a get and translating update to a index or delete operation
* NOTE: all operations except NOOP, reassigns the bulk item request
*/
private PrimaryOperationResult<? extends DocWriteResponse> shardUpdateOperation(IndexMetaData metaData, IndexShard primary,
BulkShardRequest request,
int requestIndex, UpdateRequest updateRequest)
/** Executes update request, delegating to a index or delete operation after translation */
private Engine.Operation shardUpdateOperation(IndexMetaData metaData, IndexShard primary, BulkShardRequest request,
UpdateRequest updateRequest, UpdateHelper.Result translate)
throws Exception {
final UpdateHelper.Result translate;
try {
translate = updateHelper.prepare(updateRequest, primary, threadPool::estimatedTimeInMillis);
} catch (Exception e) {
return new PrimaryOperationResult<>(e);
}
switch (translate.getResponseResult()) {
case CREATED:
case UPDATED:
IndexRequest indexRequest = translate.action();
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
indexRequest.process(mappingMd, allowIdGeneration, request.index());
PrimaryOperationResult<IndexResponse> writeResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
if (writeResult.success()) {
BytesReference indexSourceAsBytes = indexRequest.source();
IndexResponse indexResponse = writeResult.getResponse();
UpdateResponse update = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.getResult());
if ((updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) ||
(updateRequest.fields() != null && updateRequest.fields().length > 0)) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
update.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
}
// Replace the update request to the translated index request to execute on the replica.
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest);
return new PrimaryOperationResult<>(update, writeResult.getLocation());
} else {
return writeResult;
}
return executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
case DELETED:
DeleteRequest deleteRequest = translate.action();
PrimaryOperationResult<DeleteResponse> deleteResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
if (deleteResult.success()) {
DeleteResponse response = deleteResult.getResponse();
UpdateResponse deleteUpdateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult());
deleteUpdateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), translate.updatedSourceAsMap(), translate.updateSourceContentType(), null));
// Replace the update request to the translated delete request to execute on the replica.
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
return new PrimaryOperationResult<>(deleteUpdateResponse, deleteResult.getLocation());
} else {
return deleteResult;
}
return executeDeleteRequestOnPrimary(translate.action(), primary);
case NOOP:
BulkItemRequest item = request.items()[requestIndex];
primary.noopUpdate(updateRequest.type());
item.setIgnoreOnReplica(); // no need to go to the replica
return new PrimaryOperationResult<>(translate.action(), null);
return null;
default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult());
}
}
@Override
protected ReplicaOperationResult onReplicaShard(BulkShardRequest request, IndexShard replica) throws Exception {
protected WriteReplicaResult onReplicaShard(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
@ -283,27 +311,27 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
continue;
}
DocWriteRequest docWriteRequest = item.request();
final ReplicaOperationResult replicaResult;
final Engine.Operation operation;
try {
switch (docWriteRequest.opType()) {
case CREATE:
case INDEX:
replicaResult = executeIndexRequestOnReplica(((IndexRequest) docWriteRequest), replica);
operation = executeIndexRequestOnReplica(((IndexRequest) docWriteRequest), replica);
break;
case DELETE:
replicaResult = executeDeleteRequestOnReplica(((DeleteRequest) docWriteRequest), replica);
operation = executeDeleteRequestOnReplica(((DeleteRequest) docWriteRequest), replica);
break;
default: throw new IllegalStateException("Unexpected request operation type on replica: "
+ docWriteRequest.opType().getLowercase());
}
if (replicaResult.success()) {
location = locationToSync(location, replicaResult.getLocation());
} else {
if (operation.hasFailure()) {
// check if any transient write operation failures should be bubbled up
Exception failure = replicaResult.getFailure();
Exception failure = operation.getFailure();
if (!ignoreReplicaException(failure)) {
throw failure;
}
} else {
location = locationToSync(location, operation.getTranslogLocation());
}
} catch (Exception e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure
@ -313,7 +341,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
}
}
return new ReplicaOperationResult(location);
return new WriteReplicaResult(request, location, null, replica);
}
private Translog.Location locationToSync(Translog.Location current, Translog.Location next) {

View File

@ -69,7 +69,11 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
protected void doExecute(Task task, final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
ClusterState state = clusterService.state();
if (autoCreateIndex.shouldAutoCreate(request.index(), state)) {
createIndexAction.execute(task, new CreateIndexRequest().index(request.index()).cause("auto(delete api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
CreateIndexRequest createIndexRequest = new CreateIndexRequest()
.index(request.index())
.cause("auto(delete api)")
.masterNodeTimeout(request.timeout());
createIndexAction.execute(task, createIndexRequest, new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
innerExecute(task, request, listener);
@ -118,36 +122,34 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
}
@Override
protected PrimaryOperationResult<DeleteResponse> onPrimaryShard(DeleteRequest request, IndexShard primary) {
return executeDeleteRequestOnPrimary(request, primary);
protected WritePrimaryResult onPrimaryShard(DeleteRequest request, IndexShard primary) {
final Engine.Delete operation = executeDeleteRequestOnPrimary(request, primary);
final DeleteResponse response = operation.hasFailure() ? null :
new DeleteResponse(primary.shardId(), request.type(), request.id(), operation.version(), operation.found());
return new WritePrimaryResult(request, response, operation.getTranslogLocation(), operation.getFailure(), primary);
}
@Override
protected ReplicaOperationResult onReplicaShard(DeleteRequest request, IndexShard replica) {
return executeDeleteRequestOnReplica(request, replica);
protected WriteReplicaResult onReplicaShard(DeleteRequest request, IndexShard replica) {
final Engine.Operation operation = executeDeleteRequestOnReplica(request, replica);
return new WriteReplicaResult(request, operation.getTranslogLocation(), operation.getFailure(), replica);
}
public static PrimaryOperationResult<DeleteResponse> executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) {
public static Engine.Delete executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) {
Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
primary.execute(delete);
if (delete.hasFailure()) {
return new PrimaryOperationResult<>(delete.getFailure());
} else {
if (delete.hasFailure() == false) {
// update the request with the version so it will go to the replicas
request.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
request.version(delete.version());
assert request.versionType().validateVersionForWrites(request.version());
DeleteResponse response = new DeleteResponse(primary.shardId(), request.type(), request.id(), delete.version(), delete.found());
return new PrimaryOperationResult<>(response, delete.getTranslogLocation());
}
return delete;
}
public static ReplicaOperationResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
public static Engine.Delete executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(), request.version(), request.versionType());
replica.execute(delete);
return delete.hasFailure()
? new ReplicaOperationResult(delete.getFailure())
: new ReplicaOperationResult(delete.getTranslogLocation());
return delete;
}
}

View File

@ -39,7 +39,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
@ -140,88 +139,77 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
}
@Override
protected PrimaryOperationResult<IndexResponse> onPrimaryShard(IndexRequest request, IndexShard primary) throws Exception {
return executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction);
protected WritePrimaryResult onPrimaryShard(IndexRequest request, IndexShard primary) throws Exception {
final Engine.Operation operation = executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction);
final IndexResponse response = operation.hasFailure() ? null :
new IndexResponse(primary.shardId(), request.type(), request.id(), operation.version(),
((Engine.Index) operation).isCreated());
return new WritePrimaryResult(request, response, operation.getTranslogLocation(), operation.getFailure(), primary);
}
@Override
protected ReplicaOperationResult onReplicaShard(IndexRequest request, IndexShard replica) {
return executeIndexRequestOnReplica(request, replica);
protected WriteReplicaResult onReplicaShard(IndexRequest request, IndexShard replica) {
final Engine.Operation operation = executeIndexRequestOnReplica(request, replica);
return new WriteReplicaResult(request, operation.getTranslogLocation(), operation.getFailure(), replica);
}
/**
* Execute the given {@link IndexRequest} on a replica shard, throwing a
* {@link RetryOnReplicaException} if the operation needs to be re-tried.
*/
public static ReplicaOperationResult executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) {
public static Engine.Operation executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) {
final ShardId shardId = replica.shardId();
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
final Engine.Index operation;
try {
operation = replica.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
} catch (MapperParsingException | IllegalArgumentException e) {
return new ReplicaOperationResult(e);
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
replica.execute(operation);
if (operation.hasFailure()) {
return new ReplicaOperationResult(operation.getFailure());
} else {
return new ReplicaOperationResult(operation.getTranslogLocation());
final Engine.Operation operation;
operation = replica.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
if (operation.hasFailure() == false) {
Mapping update = ((Engine.Index) operation).parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
replica.execute(operation);
}
return operation;
}
/** Utility method to prepare an index operation on primary shards */
static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) {
static Engine.Operation prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) {
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), request.id(), request.source())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
return primary.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
}
public static PrimaryOperationResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
public static Engine.Operation executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
MappingUpdatedAction mappingUpdatedAction) throws Exception {
Engine.Index operation;
try {
operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) {
return new PrimaryOperationResult<>(e);
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = primary.shardId();
if (update != null) {
try {
Engine.Operation operation = prepareIndexOperationOnPrimary(request, primary);
if (operation.hasFailure() == false) {
Mapping update = ((Engine.Index) operation).parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = primary.shardId();
if (update != null) {
// can throw timeout exception when updating mappings or ISE for attempting to update default mappings
// which are bubbled up
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) {
return new PrimaryOperationResult<>(e);
}
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new ReplicationOperation.RetryOnPrimaryException(shardId,
"Dynamic mappings are not available on the node that holds the primary yet");
if (operation.hasFailure() == false) {
update = ((Engine.Index) operation).parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new ReplicationOperation.RetryOnPrimaryException(shardId,
"Dynamic mappings are not available on the node that holds the primary yet");
}
}
}
}
primary.execute(operation);
if (operation.hasFailure()) {
return new PrimaryOperationResult<>(operation.getFailure());
} else {
if (operation.hasFailure() == false) {
primary.execute(operation);
// update the version on request so it will happen on the replicas
final long version = operation.version();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersionForWrites(request.version());
IndexResponse response = new IndexResponse(shardId, request.type(), request.id(), request.version(), operation.isCreated());
return new PrimaryOperationResult<>(response, operation.getTranslogLocation());
}
return operation;
}
}

View File

@ -377,15 +377,9 @@ public abstract class TransportReplicationAction<
final Response finalResponseIfSuccessful;
final Exception finalFailure;
public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponseIfSuccessful) {
public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponseIfSuccessful, Exception finalFailure) {
this.replicaRequest = replicaRequest;
this.finalResponseIfSuccessful = finalResponseIfSuccessful;
this.finalFailure = null;
}
public PrimaryResult(Exception finalFailure) {
this.replicaRequest = null;
this.finalResponseIfSuccessful = null;
this.finalFailure = finalFailure;
}
@ -413,10 +407,6 @@ public abstract class TransportReplicationAction<
protected class ReplicaResult {
final Exception finalFailure;
public ReplicaResult() {
this.finalFailure = null;
}
public ReplicaResult(Exception finalFailure) {
this.finalFailure = finalFailure;
}

View File

@ -62,7 +62,7 @@ public abstract class TransportWriteAction<
/**
* Called on the primary with a reference to the primary {@linkplain IndexShard} to modify.
*/
protected abstract PrimaryOperationResult<Response> onPrimaryShard(Request request, IndexShard primary) throws Exception;
protected abstract WritePrimaryResult onPrimaryShard(Request request, IndexShard primary) throws Exception;
/**
* Called once per replica with a reference to the replica {@linkplain IndexShard} to modify.
@ -70,107 +70,39 @@ public abstract class TransportWriteAction<
* @return the result of the replication operation containing either the translog location of the {@linkplain IndexShard}
* after the write was completed or a failure if the operation failed
*/
protected abstract ReplicaOperationResult onReplicaShard(ReplicaRequest request, IndexShard replica) throws Exception;
protected abstract WriteReplicaResult onReplicaShard(ReplicaRequest request, IndexShard replica) throws Exception;
@Override
protected final WritePrimaryResult shardOperationOnPrimary(Request request, IndexShard primary) throws Exception {
final PrimaryOperationResult<Response> result = onPrimaryShard(request, primary);
return result.success()
? new WritePrimaryResult((ReplicaRequest) request, result.getResponse(), result.getLocation(), primary)
: new WritePrimaryResult(result.getFailure());
return onPrimaryShard(request, primary);
}
@Override
protected final WriteReplicaResult shardOperationOnReplica(ReplicaRequest request, IndexShard replica) throws Exception {
final ReplicaOperationResult result = onReplicaShard(request, replica);
return result.success()
? new WriteReplicaResult(request, result.getLocation(), replica)
: new WriteReplicaResult(result.getFailure());
}
abstract static class OperationWriteResult {
private final Translog.Location location;
private final Exception failure;
protected OperationWriteResult(@Nullable Location location) {
this.location = location;
this.failure = null;
}
protected OperationWriteResult(Exception failure) {
this.location = null;
this.failure = failure;
}
public Translog.Location getLocation() {
return location;
}
public Exception getFailure() {
return failure;
}
public boolean success() {
return failure == null;
}
}
/**
* Simple result from a primary write action (includes response).
* Write actions have static method to return these so they can integrate with bulk.
*/
public static class PrimaryOperationResult<Response extends ReplicationResponse> extends OperationWriteResult {
private final Response response;
public PrimaryOperationResult(Response response, @Nullable Location location) {
super(location);
this.response = response;
}
public PrimaryOperationResult(Exception failure) {
super(failure);
this.response = null;
}
public Response getResponse() {
return response;
}
}
/**
* Simple result from a replica write action. Write actions have static method to return these so they can integrate with bulk.
*/
public static class ReplicaOperationResult extends OperationWriteResult {
public ReplicaOperationResult(@Nullable Location location) {
super(location);
}
public ReplicaOperationResult(Exception failure) {
super(failure);
}
return onReplicaShard(request, replica);
}
/**
* Result of taking the action on the primary.
*/
class WritePrimaryResult extends PrimaryResult implements RespondingWriteResult {
protected class WritePrimaryResult extends PrimaryResult implements RespondingWriteResult {
boolean finishedAsyncActions;
ActionListener<Response> listener = null;
public WritePrimaryResult(ReplicaRequest request, Response finalResponse,
@Nullable Location location, IndexShard primary) {
super(request, finalResponse);
/*
* We call this before replication because this might wait for a refresh and that can take a while. This way we wait for the
* refresh in parallel on the primary and on the replica.
*/
new AsyncAfterWriteAction(primary, request, location, this, logger).run();
}
public WritePrimaryResult(Exception failure) {
super(failure);
this.finishedAsyncActions = true;
public WritePrimaryResult(ReplicaRequest request, @Nullable Response finalResponse,
@Nullable Location location, @Nullable Exception operationFailure,
IndexShard primary) {
super(request, finalResponse, operationFailure);
assert operationFailure != null ^ finalResponse != null;
if (operationFailure != null) {
this.finishedAsyncActions = true;
} else {
/*
* We call this before replication because this might wait for a refresh and that can take a while.
* This way we wait for the refresh in parallel on the primary and on the replica.
*/
new AsyncAfterWriteAction(primary, request, location, this, logger).run();
}
}
@Override
@ -210,17 +142,18 @@ public abstract class TransportWriteAction<
/**
* Result of taking the action on the replica.
*/
class WriteReplicaResult extends ReplicaResult implements RespondingWriteResult {
protected class WriteReplicaResult extends ReplicaResult implements RespondingWriteResult {
boolean finishedAsyncActions;
private ActionListener<TransportResponse.Empty> listener;
public WriteReplicaResult(ReplicaRequest request, Location location, IndexShard replica) {
new AsyncAfterWriteAction(replica, request, location, this, logger).run();
}
public WriteReplicaResult(Exception finalFailure) {
super(finalFailure);
this.finishedAsyncActions = true;
public WriteReplicaResult(ReplicaRequest request, @Nullable Location location,
@Nullable Exception operationFailure, IndexShard replica) {
super(operationFailure);
if (operationFailure != null) {
this.finishedAsyncActions = true;
} else {
new AsyncAfterWriteAction(replica, request, location, this, logger).run();
}
}
@Override

View File

@ -771,7 +771,7 @@ public abstract class Engine implements Closeable {
/** type of operation (index, delete), subclasses use static types */
public enum TYPE {
INDEX, DELETE;
INDEX, DELETE, FAILURE;
private final String lowercase;
@ -1051,7 +1051,50 @@ public abstract class Engine implements Closeable {
protected int estimatedSizeInBytes() {
return (uid().field().length() + uid().text().length()) * 2 + 20;
}
}
public static class Failure extends Operation {
private final String type;
private final String id;
public Failure(String type, String id, long version, VersionType versionType, Origin origin,
long startTime, Exception failure) {
super(null, version, versionType, origin, startTime);
this.type = type;
this.id = id;
setFailure(failure);
}
@Override
public Term uid() {
throw new UnsupportedOperationException("failure operation doesn't have uid");
}
@Override
protected int estimatedSizeInBytes() {
return 0;
}
@Override
public String type() {
return type;
}
@Override
protected String id() {
return id;
}
@Override
public TYPE operationType() {
return TYPE.FAILURE;
}
@Override
public String toString() {
return "failure [{" + type() + "}][{" + id()+ "}]";
}
}
public static class Get {

View File

@ -93,6 +93,7 @@ import org.elasticsearch.index.get.ShardGetService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentMapperForType;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
@ -499,24 +500,30 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return previousState;
}
public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType, long autoGeneratedIdTimestamp,
public Engine.Operation prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType, long autoGeneratedIdTimestamp,
boolean isRetry) {
try {
verifyPrimary();
return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.PRIMARY,
autoGeneratedIdTimestamp, isRetry);
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.Failure(source.type(), source.id(), version, versionType, Engine.Operation.Origin.PRIMARY,
System.nanoTime(), e);
} catch (Exception e) {
verifyNotClosed(e);
throw e;
}
}
public Engine.Index prepareIndexOnReplica(SourceToParse source, long version, VersionType versionType, long autoGeneratedIdTimestamp,
public Engine.Operation prepareIndexOnReplica(SourceToParse source, long version, VersionType versionType, long autoGeneratedIdTimestamp,
boolean isRetry) {
try {
verifyReplicationTarget();
return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.REPLICA, autoGeneratedIdTimestamp,
isRetry);
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.Failure(source.type(), source.id(), version, versionType, Engine.Operation.Origin.PRIMARY,
System.nanoTime(), e);
} catch (Exception e) {
verifyNotClosed(e);
throw e;

View File

@ -718,7 +718,7 @@ public class TransportReplicationActionTests extends ESTestCase {
if (throwException) {
throw new ElasticsearchException("simulated");
}
return new ReplicaResult();
return new ReplicaResult(null);
}
};
final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
@ -837,7 +837,7 @@ public class TransportReplicationActionTests extends ESTestCase {
if (throwException.get()) {
throw new RetryOnReplicaException(shardId, "simulation");
}
return new ReplicaResult();
return new ReplicaResult(null);
}
};
final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
@ -961,13 +961,13 @@ public class TransportReplicationActionTests extends ESTestCase {
protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) throws Exception {
boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true);
assert executedBefore == false : "request has already been executed on the primary";
return new PrimaryResult(shardRequest, new Response());
return new PrimaryResult(shardRequest, new Response(), null);
}
@Override
protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) {
request.processedOnReplicas.incrementAndGet();
return new ReplicaResult();
return new ReplicaResult(null);
}
@Override
@ -1053,7 +1053,7 @@ public class TransportReplicationActionTests extends ESTestCase {
@Override
public void execute() throws Exception {
this.resultListener.onResponse(action.new PrimaryResult(null, new Response()));
this.resultListener.onResponse(action.new PrimaryResult(null, new Response(), null));
}
}

View File

@ -136,13 +136,13 @@ public class TransportWriteActionTests extends ESTestCase {
}
@Override
protected PrimaryOperationResult<TestResponse> onPrimaryShard(TestRequest request, IndexShard primary) throws Exception {
return new PrimaryOperationResult<>(new TestResponse(), location);
protected WritePrimaryResult onPrimaryShard(TestRequest request, IndexShard primary) throws Exception {
return new WritePrimaryResult(request, new TestResponse(), location, null, primary);
}
@Override
protected ReplicaOperationResult onReplicaShard(TestRequest request, IndexShard replica) {
return new ReplicaOperationResult(location);
protected WriteReplicaResult onReplicaShard(TestRequest request, IndexShard replica) {
return new WriteReplicaResult(request, location, null, replica);
}
@Override

View File

@ -39,6 +39,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
@ -365,17 +366,19 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
@Override
protected PrimaryResult performOnPrimary(IndexShard primary, IndexRequest request) throws Exception {
TransportWriteAction.PrimaryOperationResult<IndexResponse> result = executeIndexRequestOnPrimary(request, primary,
null);
final Engine.Operation operation = executeIndexRequestOnPrimary(request, primary,
null);
request.primaryTerm(primary.getPrimaryTerm());
TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.getLocation(), logger);
return new PrimaryResult(request, result.getResponse());
TransportWriteActionTestHelper.performPostWriteActions(primary, request, operation.getTranslogLocation(), logger);
IndexResponse response = new IndexResponse(primary.shardId(), request.type(), request.id(), operation.version(),
((Engine.Index) operation).isCreated());
return new PrimaryResult(request, response);
}
@Override
protected void performOnReplica(IndexRequest request, IndexShard replica) {
TransportWriteAction.ReplicaOperationResult index = executeIndexRequestOnReplica(request, replica);
TransportWriteActionTestHelper.performPostWriteActions(replica, request, index.getLocation(), logger);
final Engine.Operation operation = executeIndexRequestOnReplica(request, replica);
TransportWriteActionTestHelper.performPostWriteActions(replica, request, operation.getTranslogLocation(), logger);
}
}
}

View File

@ -493,7 +493,7 @@ public class IndexStatsIT extends ESIntegTestCase {
assertThat(stats.getIndex("test2").getPrimaries().getIndexing().getTotal().getIndexFailedCount(), equalTo(1L));
assertThat(stats.getPrimaries().getIndexing().getTypeStats().get("type1").getIndexFailedCount(), equalTo(1L));
assertThat(stats.getPrimaries().getIndexing().getTypeStats().get("type2").getIndexFailedCount(), equalTo(1L));
assertThat(stats.getTotal().getIndexing().getTotal().getIndexFailedCount(), equalTo(3L));
assertThat(stats.getPrimaries().getIndexing().getTotal().getIndexFailedCount(), equalTo(3L));
}
public void testMergeStats() {

View File

@ -441,7 +441,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
}
protected Engine.Index indexDoc(IndexShard shard, String type, String id, String source) {
final Engine.Index index;
final Engine.Operation index;
if (shard.routingEntry().primary()) {
index = shard.prepareIndexOnPrimary(
SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source)),
@ -452,7 +452,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
}
shard.execute(index);
return index;
return ((Engine.Index) index);
}
protected Engine.Delete deleteDoc(IndexShard shard, String type, String id) {