From b4b17e16e04117064b557a4930e819e6d19551e0 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 27 Mar 2019 12:05:53 +0100 Subject: [PATCH] Remove TransportSingleItemBulkWriteAction as replication action (#40424) The implementation of TransportIndexAction and TransportDeleteAction as TransportReplicationAction existed for interoperability with older 5.x nodes, as these older nodes coordinated single index / deletes as replication requests. This BWC layer is no longer needed in 7.x, where these single actions are now mapped to bulk requests. Completely removing the deprecated transport actions is not possible yet if we want to keep BWC with a 6.x transport client. The best way here is to wait for the transport client to go away and then just remove the actions. --- .../TransportSingleItemBulkWriteAction.java | 64 ++----------------- .../action/delete/TransportDeleteAction.java | 21 +----- .../action/index/TransportIndexAction.java | 22 +------ .../TransportReplicationAction.java | 2 +- .../bulk/TransportBulkActionIngestTests.java | 12 +--- 5 files changed, 12 insertions(+), 109 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java index 892daae4bb2..cc97b6237e3 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java @@ -23,19 +23,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; -import org.elasticsearch.action.support.replication.TransportWriteAction; -import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.tasks.Task; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.util.function.Supplier; @@ -45,68 +38,21 @@ import java.util.function.Supplier; public abstract class TransportSingleItemBulkWriteAction< Request extends ReplicatedWriteRequest, Response extends ReplicationResponse & WriteResponse - > extends TransportWriteAction { + > extends HandledTransportAction { private final TransportBulkAction bulkAction; - private final TransportShardBulkAction shardBulkAction; - - protected TransportSingleItemBulkWriteAction(Settings settings, String actionName, TransportService transportService, - ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, - ShardStateAction shardStateAction, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, - Supplier replicaRequest, String executor, - TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) { - super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, request, replicaRequest, executor); + protected TransportSingleItemBulkWriteAction(String actionName, TransportService transportService, ActionFilters actionFilters, + Supplier request, TransportBulkAction bulkAction) { + super(actionName, transportService, actionFilters, request); this.bulkAction = bulkAction; - this.shardBulkAction = shardBulkAction; } - @Override protected void doExecute(Task task, final Request request, final ActionListener listener) { bulkAction.execute(task, toSingleItemBulkRequest(request), wrapBulkResponse(listener)); } - @Override - protected WritePrimaryResult shardOperationOnPrimary( - Request request, final IndexShard primary) throws Exception { - BulkItemRequest[] itemRequests = new BulkItemRequest[1]; - WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy(); - request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); - itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) request)); - BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests); - WritePrimaryResult bulkResult = - shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary); - assert bulkResult.finalResponseIfSuccessful.getResponses().length == 1 : "expected only one bulk shard response"; - BulkItemResponse itemResponse = bulkResult.finalResponseIfSuccessful.getResponses()[0]; - final Response response; - final Exception failure; - if (itemResponse.isFailed()) { - failure = itemResponse.getFailure().getCause(); - response = null; - } else { - response = (Response) itemResponse.getResponse(); - failure = null; - } - return new WritePrimaryResult<>(request, response, bulkResult.location, failure, primary, logger); - } - - @Override - protected WriteReplicaResult shardOperationOnReplica( - Request replicaRequest, IndexShard replica) throws Exception { - BulkItemRequest[] itemRequests = new BulkItemRequest[1]; - WriteRequest.RefreshPolicy refreshPolicy = replicaRequest.getRefreshPolicy(); - itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) replicaRequest)); - BulkShardRequest bulkShardRequest = new BulkShardRequest(replicaRequest.shardId(), refreshPolicy, itemRequests); - WriteReplicaResult result = shardBulkAction.shardOperationOnReplica(bulkShardRequest, replica); - // a replica operation can never throw a document-level failure, - // as the same document has been already indexed successfully in the primary - return new WriteReplicaResult<>(replicaRequest, result.location, null, replica, logger); - } - - public static ActionListener wrapBulkResponse(ActionListener listener) { return ActionListener.wrap(bulkItemResponses -> { diff --git a/server/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/server/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 32c599a9f58..5b85f2f9085 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/server/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -20,16 +20,9 @@ package org.elasticsearch.action.delete; import org.elasticsearch.action.bulk.TransportBulkAction; -import org.elasticsearch.action.bulk.TransportShardBulkAction; import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; /** @@ -41,17 +34,7 @@ import org.elasticsearch.transport.TransportService; public class TransportDeleteAction extends TransportSingleItemBulkWriteAction { @Inject - public TransportDeleteAction(Settings settings, TransportService transportService, ClusterService clusterService, - IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) { - super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, - actionFilters, indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.WRITE, - bulkAction, shardBulkAction); - } - - @Override - protected DeleteResponse newResponseInstance() { - return new DeleteResponse(); + public TransportDeleteAction(TransportService transportService, ActionFilters actionFilters, TransportBulkAction bulkAction) { + super(DeleteAction.NAME, transportService, actionFilters, DeleteRequest::new, bulkAction); } } diff --git a/server/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/server/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 8480c7be3bb..b8e3b9b89b3 100644 --- a/server/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -20,16 +20,9 @@ package org.elasticsearch.action.index; import org.elasticsearch.action.bulk.TransportBulkAction; -import org.elasticsearch.action.bulk.TransportShardBulkAction; import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; /** @@ -48,18 +41,7 @@ import org.elasticsearch.transport.TransportService; public class TransportIndexAction extends TransportSingleItemBulkWriteAction { @Inject - public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService, - IndicesService indicesService, - ThreadPool threadPool, ShardStateAction shardStateAction, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) { - super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, - actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.WRITE, - bulkAction, shardBulkAction); - } - - @Override - protected IndexResponse newResponseInstance() { - return new IndexResponse(); + public TransportIndexAction(ActionFilters actionFilters, TransportService transportService, TransportBulkAction bulkAction) { + super(IndexAction.NAME, transportService, actionFilters, IndexRequest::new, bulkAction); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index cc0c69418d7..f0ba0a520bd 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -167,6 +167,7 @@ public abstract class TransportReplicationAction< @Override protected void doExecute(Task task, Request request, ActionListener listener) { + assert request.shardId() != null : "request shardId must be set"; new ReroutePhase((ReplicationTask) task, request, listener).run(); } @@ -780,7 +781,6 @@ public abstract class TransportReplicationAction< // resolve all derived request fields, so we can route and apply it resolveRequest(indexMetaData, request); - assert request.shardId() != null : "request shardId must be set in resolveRequest"; assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest"; diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index a13e8af919b..b3ecc590767 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -50,7 +50,6 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.junit.Before; @@ -156,15 +155,8 @@ public class TransportBulkActionIngestTests extends ESTestCase { class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction { TestSingleItemBulkWriteAction(TestTransportBulkAction bulkAction) { - super(SETTINGS, IndexAction.NAME, TransportBulkActionIngestTests.this.transportService, - TransportBulkActionIngestTests.this.clusterService, - null, null, null, new ActionFilters(Collections.emptySet()), null, - IndexRequest::new, IndexRequest::new, ThreadPool.Names.WRITE, bulkAction, null); - } - - @Override - protected IndexResponse newResponseInstance() { - return new IndexResponse(); + super(IndexAction.NAME, TransportBulkActionIngestTests.this.transportService, + new ActionFilters(Collections.emptySet()), IndexRequest::new, bulkAction); } }