From 8b7b2f3cdf632ca41f9164e03b00aa6baea1090e Mon Sep 17 00:00:00 2001 From: Britta Weber Date: Mon, 4 May 2015 10:52:20 +0200 Subject: [PATCH] remove replica response from TransportShardReplicationOperation. not needed anymore --- .../action/bulk/TransportShardBulkAction.java | 11 +- .../action/delete/TransportDeleteAction.java | 11 +- .../TransportDeleteByQueryAction.java | 3 +- .../TransportIndexDeleteByQueryAction.java | 3 +- .../TransportShardDeleteByQueryAction.java | 11 +- .../action/index/TransportIndexAction.java | 15 +-- .../TransportBroadcastOperationAction.java | 7 +- ...nsportIndexReplicationOperationAction.java | 7 +- ...portIndicesReplicationOperationAction.java | 7 +- ...nsportShardReplicationOperationAction.java | 115 ++++++------------ 10 files changed, 56 insertions(+), 134 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 16455b0d1be..245d7d16033 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -60,7 +60,6 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.river.RiverIndexName; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import java.util.Map; @@ -68,7 +67,7 @@ import java.util.Map; /** * Performs the index operation. */ -public class TransportShardBulkAction extends TransportShardReplicationOperationAction { +public class TransportShardBulkAction extends TransportShardReplicationOperationAction { private final static String OP_TYPE_UPDATE = "update"; private final static String OP_TYPE_DELETE = "delete"; @@ -119,11 +118,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation return new BulkShardResponse(); } - @Override - protected TransportResponse.Empty newReplicaResponseInstance() { - return TransportResponse.Empty.INSTANCE; - } - @Override protected boolean resolveIndex() { return false; @@ -534,7 +528,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation @Override - protected TransportResponse.Empty shardOperationOnReplica(ReplicaOperationRequest shardRequest) { + protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) { IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id()); final BulkShardRequest request = shardRequest.request; for (int i = 0; i < request.items().length; i++) { @@ -586,7 +580,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation // ignore } } - return newReplicaResponseInstance(); } private void applyVersion(BulkItemRequest item, long version, VersionType versionType) { diff --git a/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index efdea2cb3b5..86ffac8bcce 100644 --- a/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -43,13 +43,12 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; /** * Performs the delete operation. */ -public class TransportDeleteAction extends TransportShardReplicationOperationAction { +public class TransportDeleteAction extends TransportShardReplicationOperationAction { private final AutoCreateIndex autoCreateIndex; @@ -141,11 +140,6 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct return new DeleteResponse(); } - @Override - protected TransportResponse.Empty newReplicaResponseInstance() { - return TransportResponse.Empty.INSTANCE; - } - @Override protected Tuple shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { DeleteRequest request = shardRequest.request; @@ -171,7 +165,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct } @Override - protected TransportResponse.Empty shardOperationOnReplica(ReplicaOperationRequest shardRequest) { + protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) { DeleteRequest request = shardRequest.request; IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id()); Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.REPLICA); @@ -185,7 +179,6 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct // ignore } } - return newReplicaResponseInstance(); } @Override diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java b/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java index 2571eecef5f..0800a639a81 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java @@ -32,7 +32,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import java.util.Map; @@ -41,7 +40,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray; /** */ -public class TransportDeleteByQueryAction extends TransportIndicesReplicationOperationAction { +public class TransportDeleteByQueryAction extends TransportIndicesReplicationOperationAction { private final DestructiveOperations destructiveOperations; diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/TransportIndexDeleteByQueryAction.java b/src/main/java/org/elasticsearch/action/deletebyquery/TransportIndexDeleteByQueryAction.java index a76a0ee9e23..607459e7798 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/TransportIndexDeleteByQueryAction.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/TransportIndexDeleteByQueryAction.java @@ -30,14 +30,13 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponse; import java.util.List; /** * Internal transport action that broadcasts a delete by query request to all of the shards that belong to an index. */ -public class TransportIndexDeleteByQueryAction extends TransportIndexReplicationOperationAction { +public class TransportIndexDeleteByQueryAction extends TransportIndexReplicationOperationAction { private static final String ACTION_NAME = DeleteByQueryAction.NAME + "[index]"; diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java b/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java index aeb852e967e..6e364302e83 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java @@ -42,13 +42,12 @@ import org.elasticsearch.search.internal.DefaultSearchContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchLocalRequest; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; /** * */ -public class TransportShardDeleteByQueryAction extends TransportShardReplicationOperationAction { +public class TransportShardDeleteByQueryAction extends TransportShardReplicationOperationAction { public final static String DELETE_BY_QUERY_API = "delete_by_query"; @@ -94,11 +93,6 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication return new ShardDeleteByQueryResponse(); } - @Override - protected TransportResponse.Empty newReplicaResponseInstance() { - return TransportResponse.Empty.INSTANCE; - } - @Override protected boolean resolveIndex() { return false; @@ -127,7 +121,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication @Override - protected TransportResponse.Empty shardOperationOnReplica(ReplicaOperationRequest shardRequest) { + protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) { ShardDeleteByQueryRequest request = shardRequest.request; IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()); IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id()); @@ -144,7 +138,6 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication SearchContext.removeCurrent(); } } - return newReplicaResponseInstance(); } @Override diff --git a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index e165253f0b0..79ea496c317 100644 --- a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -49,7 +49,6 @@ import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.river.RiverIndexName; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; /** @@ -62,7 +61,7 @@ import org.elasticsearch.transport.TransportService; *
  • allowIdGeneration: If the id is set not, should it be generated. Defaults to true. * */ -public class TransportIndexAction extends TransportShardReplicationOperationAction { +public class TransportIndexAction extends TransportShardReplicationOperationAction { private final AutoCreateIndex autoCreateIndex; @@ -157,11 +156,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi return new IndexResponse(); } - @Override - protected TransportResponse.Empty newReplicaResponseInstance() { - return TransportResponse.Empty.INSTANCE; - } - @Override protected String executor() { return ThreadPool.Names.INDEX; @@ -245,7 +239,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi } @Override - protected TransportResponse.Empty shardOperationOnReplica(ReplicaOperationRequest shardRequest) { + protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) { IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id()); IndexRequest request = shardRequest.request; SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id()) @@ -265,10 +259,5 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi // ignore } } - return newReplicaResponseInstance(); - } - - public String getReplicaActionName() { - return IndexAction.NAME + "[r]"; } } diff --git a/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java b/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java index 81676226f90..09d7bd55447 100644 --- a/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -59,16 +60,12 @@ public abstract class TransportBroadcastOperationAction listener) { new AsyncBroadcastAction(request, listener).start(); diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java index c4dddc7e361..53c4984bfc8 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java @@ -36,7 +36,6 @@ import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponse; import java.util.ArrayList; import java.util.Arrays; @@ -49,15 +48,15 @@ import java.util.concurrent.atomic.AtomicReferenceArray; * It relies on a shard sub-action that gets sent over the transport and executed on each of the shard. * The index provided with the request is expected to be a concrete index, properly resolved by the callers (parent actions). */ -public abstract class TransportIndexReplicationOperationAction +public abstract class TransportIndexReplicationOperationAction extends TransportAction { protected final ClusterService clusterService; - protected final TransportShardReplicationOperationAction shardAction; + protected final TransportShardReplicationOperationAction shardAction; protected TransportIndexReplicationOperationAction(Settings settings, String actionName, ClusterService clusterService, - ThreadPool threadPool, TransportShardReplicationOperationAction shardAction, ActionFilters actionFilters) { + ThreadPool threadPool, TransportShardReplicationOperationAction shardAction, ActionFilters actionFilters) { super(settings, actionName, threadPool, actionFilters); this.clusterService = clusterService; this.shardAction = shardAction; diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java index c53b9bbb65d..e2a811202c1 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java @@ -32,7 +32,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BaseTransportRequestHandler; import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import java.util.Map; @@ -43,15 +42,15 @@ import java.util.concurrent.atomic.AtomicReferenceArray; /** */ public abstract class TransportIndicesReplicationOperationAction + ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionWriteResponse> extends TransportAction { protected final ClusterService clusterService; - protected final TransportIndexReplicationOperationAction indexAction; + protected final TransportIndexReplicationOperationAction indexAction; protected TransportIndicesReplicationOperationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - TransportIndexReplicationOperationAction indexAction, ActionFilters actionFilters) { + TransportIndexReplicationOperationAction indexAction, ActionFilters actionFilters) { super(settings, actionName, threadPool, actionFilters); this.clusterService = clusterService; this.indexAction = indexAction; diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index 244d624cb15..df99d045177 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -59,14 +59,12 @@ import org.elasticsearch.transport.*; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReferenceArray; /** */ -public abstract class TransportShardReplicationOperationAction extends TransportAction { +public abstract class TransportShardReplicationOperationAction extends TransportAction { protected final TransportService transportService; protected final ClusterService clusterService; @@ -88,22 +86,18 @@ public abstract class TransportShardReplicationOperationAction listener) { new AsyncShardOperationAction(request, listener).start(); @@ -115,17 +109,15 @@ public abstract class TransportShardReplicationOperationAction shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable; + protected abstract Tuple shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable; - protected abstract ReplicaResponse shardOperationOnReplica(ReplicaOperationRequest shardRequest); + protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest); protected abstract ShardIterator shards(ClusterState clusterState, InternalRequest request) throws ElasticsearchException; @@ -224,7 +216,7 @@ public abstract class TransportShardReplicationOperationAction { + class ReplicaOperationTransportHandler extends BaseTransportRequestHandler { @Override public ReplicaOperationRequest newInstance() { @@ -244,14 +236,13 @@ public abstract class TransportShardReplicationOperationAction() { - @Override - public ReplicaResponse newInstance() { - return newReplicaResponseInstance(); - } + transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + @Override + public void handleResponse(TransportResponse.Empty vResponse) { + state.onReplicaSuccess(); + } - @Override - public void handleResponse(ReplicaResponse vResponse) { - state.onReplicaSuccess(vResponse); - } + @Override + public void handleException(TransportException exp) { + state.onReplicaFailure(nodeId, exp); + logger.trace("[{}] Transport failure during replica request [{}] ", exp, node, internalRequest.request()); + if (!ignoreReplicaException(exp)) { + logger.warn("Failed to perform " + actionName + " on remote replica " + node + shardIt.shardId(), exp); + shardStateAction.shardFailed(shard, indexMetaData.getUUID(), + "Failed to perform [" + actionName + "] on replica, message [" + ExceptionsHelper.detailedMessage(exp) + "]"); + } + } - @Override - public void handleException(TransportException exp) { - state.onReplicaFailure(nodeId, exp); - logger.trace("[{}] Transport failure during replica request [{}] ", exp, node, internalRequest.request()); - if (!ignoreReplicaException(exp)) { - logger.warn("Failed to perform " + actionName + " on remote replica " + node + shardIt.shardId(), exp); - shardStateAction.shardFailed(shard, indexMetaData.getUUID(), - "Failed to perform [" + actionName + "] on replica, message [" + ExceptionsHelper.detailedMessage(exp) + "]"); - } - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - }); + }); } else { if (internalRequest.request().operationThreaded()) { try { @@ -687,8 +655,8 @@ public abstract class TransportShardReplicationOperationAction 2) { // only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica, quorum is 1 (which is what it is initialized to) requiredNumber = (shardRoutingTable.getSize() / 2) + 1; @@ -770,7 +738,7 @@ public abstract class TransportShardReplicationOperationAction shardReplicaFailures = ConcurrentCollections.newConcurrentMap(); - // nocommit the Broadcast operations use AtomicReferencArray, Boaz wants to figure out why, this here is just a hack - private final CopyOnWriteArrayList replicaResponses = new CopyOnWriteArrayList<>(); private final AtomicInteger pending; private final int numberOfShardInstances; @@ -834,9 +800,8 @@ public abstract class TransportShardReplicationOperationAction replicaResponses) { - return finalResponse; - } - /** * Internal request class that gets built on each node. Holds the original request plus additional info. */