cleanup and improve documentation for TWA

This commit is contained in:
Areek Zillur 2016-10-21 14:48:50 -04:00
parent 4396348e9e
commit 7c11a2b732
5 changed files with 23 additions and 27 deletions

View File

@ -101,7 +101,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
@Override
protected WritePrimaryResult onPrimaryShard(BulkShardRequest request, IndexShard primary) throws Exception {
protected WritePrimaryResult shardOperationOnPrimary(BulkShardRequest request, IndexShard primary) throws Exception {
final IndexMetaData metaData = primary.indexSettings().getIndexMetaData();
long[] preVersions = new long[request.items().length];
@ -321,7 +321,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
@Override
protected WriteReplicaResult onReplicaShard(BulkShardRequest request, IndexShard replica) throws Exception {
protected WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];

View File

@ -122,7 +122,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
}
@Override
protected WritePrimaryResult onPrimaryShard(DeleteRequest request, IndexShard primary) {
protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, IndexShard primary) throws Exception {
final Engine.Delete operation = executeDeleteRequestOnPrimary(request, primary);
final DeleteResponse response = operation.hasFailure() ? null :
new DeleteResponse(primary.shardId(), request.type(), request.id(), operation.version(), operation.found());
@ -130,7 +130,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
}
@Override
protected WriteReplicaResult onReplicaShard(DeleteRequest request, IndexShard replica) {
protected WriteReplicaResult shardOperationOnReplica(DeleteRequest request, IndexShard replica) throws Exception {
final Engine.Operation operation = executeDeleteRequestOnReplica(request, replica);
return new WriteReplicaResult(request, operation.getTranslogLocation(), operation.getFailure(), replica);
}

View File

@ -139,16 +139,16 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
}
@Override
protected WritePrimaryResult onPrimaryShard(IndexRequest request, IndexShard primary) throws Exception {
protected WritePrimaryResult shardOperationOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
final Engine.Operation operation = executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction);
final IndexResponse response = operation.hasFailure() ? null :
new IndexResponse(primary.shardId(), request.type(), request.id(), operation.version(),
((Engine.Index) operation).isCreated());
((Engine.Index) operation).isCreated());
return new WritePrimaryResult(request, response, operation.getTranslogLocation(), operation.getFailure(), primary);
}
@Override
protected WriteReplicaResult onReplicaShard(IndexRequest request, IndexShard replica) {
protected WriteReplicaResult shardOperationOnReplica(IndexRequest request, IndexShard replica) throws Exception {
final Engine.Operation operation = executeIndexRequestOnReplica(request, replica);
return new WriteReplicaResult(request, operation.getTranslogLocation(), operation.getFailure(), replica);
}

View File

@ -44,6 +44,7 @@ import java.util.function.Supplier;
/**
* Base class for transport actions that modify data in some shard like index, delete, and shardBulk.
* Allows performing async actions (e.g. refresh) after performing write operations on primary and replica shards
*/
public abstract class TransportWriteAction<
Request extends ReplicatedWriteRequest<Request>,
@ -61,26 +62,21 @@ public abstract class TransportWriteAction<
/**
* Called on the primary with a reference to the primary {@linkplain IndexShard} to modify.
*
* @return the result of the operation on primary, including current translog location and operation response and failure
* async refresh is performed on the <code>primary</code> shard according to the <code>Request</code> refresh policy
*/
protected abstract WritePrimaryResult onPrimaryShard(Request request, IndexShard primary) throws Exception;
@Override
protected abstract WritePrimaryResult shardOperationOnPrimary(Request request, IndexShard primary) throws Exception;
/**
* Called once per replica with a reference to the replica {@linkplain IndexShard} to modify.
*
* @return the result of the replication operation containing either the translog location of the {@linkplain IndexShard}
* after the write was completed or a failure if the operation failed
* @return the result of the operation on replica, including current translog location and operation response and failure
* async refresh is performed on the <code>replica</code> shard according to the <code>ReplicaRequest</code> refresh policy
*/
protected abstract WriteReplicaResult onReplicaShard(ReplicaRequest request, IndexShard replica) throws Exception;
@Override
protected final WritePrimaryResult shardOperationOnPrimary(Request request, IndexShard primary) throws Exception {
return onPrimaryShard(request, primary);
}
@Override
protected final WriteReplicaResult shardOperationOnReplica(ReplicaRequest request, IndexShard replica) throws Exception {
return onReplicaShard(request, replica);
}
protected abstract WriteReplicaResult shardOperationOnReplica(ReplicaRequest request, IndexShard replica) throws Exception;
/**
* Result of taking the action on the primary.

View File

@ -136,19 +136,19 @@ public class TransportWriteActionTests extends ESTestCase {
}
@Override
protected WritePrimaryResult onPrimaryShard(TestRequest request, IndexShard primary) throws Exception {
protected TestResponse newResponseInstance() {
return new TestResponse();
}
@Override
protected WritePrimaryResult shardOperationOnPrimary(TestRequest request, IndexShard primary) throws Exception {
return new WritePrimaryResult(request, new TestResponse(), location, null, primary);
}
@Override
protected WriteReplicaResult onReplicaShard(TestRequest request, IndexShard replica) {
protected WriteReplicaResult shardOperationOnReplica(TestRequest request, IndexShard replica) throws Exception {
return new WriteReplicaResult(request, location, null, replica);
}
@Override
protected TestResponse newResponseInstance() {
return new TestResponse();
}
}
private static class TestRequest extends ReplicatedWriteRequest<TestRequest> {