Ensure write operation execution does not have side-effects (#21430)

Currently, `executeIndexRequestOnPrimary` and `executeDeleteRequestOnPrimary`
methods used to prepare and execute write operations, modifies the provided
request, updating the version and versionType. This commit makes it the
callers responsibility to update request version and versionType and avoids
mutating the provided request in the execute methods.
This commit is contained in:
Areek Zillur 2016-11-09 15:59:45 -05:00 committed by GitHub
parent 2674e415c8
commit 5bac39dbec
4 changed files with 69 additions and 31 deletions

View File

@ -142,10 +142,18 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
case INDEX:
final IndexRequest indexRequest = (IndexRequest) itemRequest;
Engine.IndexResult indexResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
operationResult = indexResult;
response = indexResult.hasFailure() ? null
: new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
if (indexResult.hasFailure()) {
response = null;
} else {
// update the version on request so it will happen on the replicas
final long version = indexResult.getVersion();
indexRequest.version(version);
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
indexResult.getVersion(), indexResult.isCreated());
}
operationResult = indexResult;
replicaRequest = request.items()[requestIndex];
break;
case UPDATE:
@ -158,10 +166,17 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
case DELETE:
final DeleteRequest deleteRequest = (DeleteRequest) itemRequest;
Engine.DeleteResult deleteResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
operationResult = deleteResult;
response = deleteResult.hasFailure() ? null :
new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(),
if (deleteResult.hasFailure()) {
response = null;
} else {
// update the request with the version so it will go to the replicas
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.version(deleteResult.getVersion());
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
response = new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(),
deleteResult.getVersion(), deleteResult.isFound());
}
operationResult = deleteResult;
replicaRequest = request.items()[requestIndex];
break;
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
@ -261,9 +276,23 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
indexRequest.process(mappingMd, allowIdGeneration, request.index());
updateOperationResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
if (updateOperationResult.hasFailure() == false) {
// update the version on request so it will happen on the replicas
final long version = updateOperationResult.getVersion();
indexRequest.version(version);
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
}
break;
case DELETED:
updateOperationResult = executeDeleteRequestOnPrimary(translate.action(), primary);
DeleteRequest deleteRequest = translate.action();
updateOperationResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
if (updateOperationResult.hasFailure() == false) {
// update the request with the version so it will go to the replicas
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.version(updateOperationResult.getVersion());
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
}
break;
case NOOP:
primary.noopUpdate(updateRequest.type());

View File

@ -124,8 +124,16 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
@Override
protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, IndexShard primary) throws Exception {
final Engine.DeleteResult result = executeDeleteRequestOnPrimary(request, primary);
final DeleteResponse response = result.hasFailure() ? null :
new DeleteResponse(primary.shardId(), request.type(), request.id(), result.getVersion(), result.isFound());
final DeleteResponse response;
if (result.hasFailure() == false) {
// update the request with the version so it will go to the replicas
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
request.version(result.getVersion());
assert request.versionType().validateVersionForWrites(request.version());
response = new DeleteResponse(primary.shardId(), request.type(), request.id(), result.getVersion(), result.isFound());
} else {
response = null;
}
return new WritePrimaryResult(request, response, result.getTranslogLocation(), result.getFailure(), primary);
}
@ -136,19 +144,12 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
}
public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) {
Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
Engine.DeleteResult result = primary.delete(delete);
if (result.hasFailure() == false) {
// update the request with the version so it will go to the replicas
request.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
request.version(result.getVersion());
assert request.versionType().validateVersionForWrites(request.version());
}
return result;
final Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
return primary.delete(delete);
}
public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(), request.version(), request.versionType());
final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(), request.version(), request.versionType());
return replica.delete(delete);
}
}

View File

@ -142,9 +142,18 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
@Override
protected WritePrimaryResult shardOperationOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction);
final IndexResponse response = indexResult.hasFailure() ? null :
new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getVersion(),
indexResult.isCreated());
final IndexResponse response;
if (indexResult.hasFailure() == false) {
// update the version on request so it will happen on the replicas
final long version = indexResult.getVersion();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersionForWrites(request.version());
response = new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getVersion(),
indexResult.isCreated());
} else {
response = null;
}
return new WritePrimaryResult(request, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary);
}
@ -213,15 +222,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
"Dynamic mappings are not available on the node that holds the primary yet");
}
}
Engine.IndexResult result = primary.index(operation);
if (result.hasFailure() == false) {
// update the version on request so it will happen on the replicas
final long version = result.getVersion();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersionForWrites(request.version());
}
return result;
return primary.index(operation);
}
}

View File

@ -367,6 +367,13 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
protected PrimaryResult performOnPrimary(IndexShard primary, IndexRequest request) throws Exception {
final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary,
null);
if (indexResult.hasFailure() == false) {
// update the version on request so it will happen on the replicas
final long version = indexResult.getVersion();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersionForWrites(request.version());
}
request.primaryTerm(primary.getPrimaryTerm());
TransportWriteActionTestHelper.performPostWriteActions(primary, request, indexResult.getTranslogLocation(), logger);
IndexResponse response = new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getVersion(),