From 1b1f484c2840d42b2da3a6fdd8c773d1c98f7508 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Thu, 13 Oct 2016 14:37:08 -0400 Subject: [PATCH] Distinguish primary and replica request types in TransportWriteAction --- .../action/bulk/TransportShardBulkAction.java | 4 ++-- .../action/delete/TransportDeleteAction.java | 4 ++-- .../action/index/TransportIndexAction.java | 4 ++-- .../support/replication/TransportWriteAction.java | 15 ++++++++------- .../replication/TransportWriteActionTests.java | 4 ++-- 5 files changed, 16 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index c7cfd6330cb..15e6a5efa31 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -66,7 +66,7 @@ import static org.elasticsearch.action.support.replication.ReplicationOperation. import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException; /** Performs shard-level bulk (index, delete or update) operations */ -public class TransportShardBulkAction extends TransportWriteAction { +public class TransportShardBulkAction extends TransportWriteAction { public static final String ACTION_NAME = BulkAction.NAME + "[s]"; @@ -80,7 +80,7 @@ public class TransportShardBulkAction extends TransportWriteAction { +public class TransportDeleteAction extends TransportWriteAction { private final AutoCreateIndex autoCreateIndex; private final TransportCreateIndexAction createIndexAction; @@ -61,7 +61,7 @@ public class TransportDeleteAction extends TransportWriteActionallowIdGeneration: If the id is set not, should it be generated. Defaults to true. * */ -public class TransportIndexAction extends TransportWriteAction { +public class TransportIndexAction extends TransportWriteAction { private final AutoCreateIndex autoCreateIndex; private final boolean allowIdGeneration; @@ -76,7 +76,7 @@ public class TransportIndexAction extends TransportWriteAction, + ReplicaRequest extends ReplicatedWriteRequest, Response extends ReplicationResponse & WriteResponse - > extends TransportReplicationAction { + > extends TransportReplicationAction { protected TransportWriteAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, - String executor) { + Supplier replicaRequest, String executor) { super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, request, request, executor); + indexNameExpressionResolver, request, replicaRequest, executor); } /** @@ -68,16 +69,16 @@ public abstract class TransportWriteAction< * * @return the translog location of the {@linkplain IndexShard} after the write was completed or null if no write occurred */ - protected abstract Translog.Location onReplicaShard(Request request, IndexShard indexShard); + protected abstract Translog.Location onReplicaShard(ReplicaRequest request, IndexShard indexShard); @Override protected final WritePrimaryResult shardOperationOnPrimary(Request request, IndexShard primary) throws Exception { WriteResult result = onPrimaryShard(request, primary); - return new WritePrimaryResult(request, result.getResponse(), result.getLocation(), primary); + return new WritePrimaryResult(((ReplicaRequest) request), result.getResponse(), result.getLocation(), primary); } @Override - protected final WriteReplicaResult shardOperationOnReplica(Request request, IndexShard replica) { + protected final WriteReplicaResult shardOperationOnReplica(ReplicaRequest request, IndexShard replica) { Translog.Location location = onReplicaShard(request, replica); return new WriteReplicaResult(replica, request, location); } @@ -110,7 +111,7 @@ public abstract class TransportWriteAction< boolean finishedAsyncActions; ActionListener listener = null; - public WritePrimaryResult(Request request, Response finalResponse, + public WritePrimaryResult(ReplicaRequest request, Response finalResponse, @Nullable Translog.Location location, IndexShard indexShard) { super(request, finalResponse); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 14afe4dee9b..2a1471fa746 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -128,12 +128,12 @@ public class TransportWriteActionTests extends ESTestCase { resultChecker.accept(listener.response, forcedRefresh); } - private class TestAction extends TransportWriteAction { + private class TestAction extends TransportWriteAction { protected TestAction() { super(Settings.EMPTY, "test", new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null), null, null, null, null, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new, - ThreadPool.Names.SAME); + TestRequest::new, ThreadPool.Names.SAME); } @Override