Remove TransportSingleItemBulkWriteAction as replication action (#40424)

The implementation of TransportIndexAction and TransportDeleteAction as
TransportReplicationAction existed for interoperability with older 5.x nodes, as these older nodes
coordinated single index / deletes as replication requests. This BWC layer is no longer needed in 7.x,
where these single actions are now mapped to bulk requests. Completely removing the deprecated
transport actions is not possible yet if we want to keep BWC with a 6.x transport client. The best
way here is to wait for the transport client to go away and then just remove the actions.
This commit is contained in:
Yannick Welsch 2019-03-27 12:05:53 +01:00
parent 856789f791
commit b4b17e16e0
5 changed files with 12 additions and 109 deletions

View File

@ -23,19 +23,12 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -45,68 +38,21 @@ import java.util.function.Supplier;
public abstract class TransportSingleItemBulkWriteAction< public abstract class TransportSingleItemBulkWriteAction<
Request extends ReplicatedWriteRequest<Request>, Request extends ReplicatedWriteRequest<Request>,
Response extends ReplicationResponse & WriteResponse Response extends ReplicationResponse & WriteResponse
> extends TransportWriteAction<Request, Request, Response> { > extends HandledTransportAction<Request, Response> {
private final TransportBulkAction bulkAction; private final TransportBulkAction bulkAction;
private final TransportShardBulkAction shardBulkAction;
protected TransportSingleItemBulkWriteAction(String actionName, TransportService transportService, ActionFilters actionFilters,
protected TransportSingleItemBulkWriteAction(Settings settings, String actionName, TransportService transportService, Supplier<Request> request, TransportBulkAction bulkAction) {
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, super(actionName, transportService, actionFilters, request);
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<Request> replicaRequest, String executor,
TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor);
this.bulkAction = bulkAction; this.bulkAction = bulkAction;
this.shardBulkAction = shardBulkAction;
} }
@Override @Override
protected void doExecute(Task task, final Request request, final ActionListener<Response> listener) { protected void doExecute(Task task, final Request request, final ActionListener<Response> listener) {
bulkAction.execute(task, toSingleItemBulkRequest(request), wrapBulkResponse(listener)); bulkAction.execute(task, toSingleItemBulkRequest(request), wrapBulkResponse(listener));
} }
@Override
protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(
Request request, final IndexShard primary) throws Exception {
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy();
request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest<?>) request));
BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests);
WritePrimaryResult<BulkShardRequest, BulkShardResponse> bulkResult =
shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary);
assert bulkResult.finalResponseIfSuccessful.getResponses().length == 1 : "expected only one bulk shard response";
BulkItemResponse itemResponse = bulkResult.finalResponseIfSuccessful.getResponses()[0];
final Response response;
final Exception failure;
if (itemResponse.isFailed()) {
failure = itemResponse.getFailure().getCause();
response = null;
} else {
response = (Response) itemResponse.getResponse();
failure = null;
}
return new WritePrimaryResult<>(request, response, bulkResult.location, failure, primary, logger);
}
@Override
protected WriteReplicaResult<Request> shardOperationOnReplica(
Request replicaRequest, IndexShard replica) throws Exception {
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
WriteRequest.RefreshPolicy refreshPolicy = replicaRequest.getRefreshPolicy();
itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest<?>) replicaRequest));
BulkShardRequest bulkShardRequest = new BulkShardRequest(replicaRequest.shardId(), refreshPolicy, itemRequests);
WriteReplicaResult<BulkShardRequest> result = shardBulkAction.shardOperationOnReplica(bulkShardRequest, replica);
// a replica operation can never throw a document-level failure,
// as the same document has been already indexed successfully in the primary
return new WriteReplicaResult<>(replicaRequest, result.location, null, replica, logger);
}
public static <Response extends ReplicationResponse & WriteResponse> public static <Response extends ReplicationResponse & WriteResponse>
ActionListener<BulkResponse> wrapBulkResponse(ActionListener<Response> listener) { ActionListener<BulkResponse> wrapBulkResponse(ActionListener<Response> listener) {
return ActionListener.wrap(bulkItemResponses -> { return ActionListener.wrap(bulkItemResponses -> {

View File

@ -20,16 +20,9 @@
package org.elasticsearch.action.delete; package org.elasticsearch.action.delete;
import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction; import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
/** /**
@ -41,17 +34,7 @@ import org.elasticsearch.transport.TransportService;
public class TransportDeleteAction extends TransportSingleItemBulkWriteAction<DeleteRequest, DeleteResponse> { public class TransportDeleteAction extends TransportSingleItemBulkWriteAction<DeleteRequest, DeleteResponse> {
@Inject @Inject
public TransportDeleteAction(Settings settings, TransportService transportService, ClusterService clusterService, public TransportDeleteAction(TransportService transportService, ActionFilters actionFilters, TransportBulkAction bulkAction) {
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, super(DeleteAction.NAME, transportService, actionFilters, DeleteRequest::new, bulkAction);
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.WRITE,
bulkAction, shardBulkAction);
}
@Override
protected DeleteResponse newResponseInstance() {
return new DeleteResponse();
} }
} }

View File

@ -20,16 +20,9 @@
package org.elasticsearch.action.index; package org.elasticsearch.action.index;
import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction; import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
/** /**
@ -48,18 +41,7 @@ import org.elasticsearch.transport.TransportService;
public class TransportIndexAction extends TransportSingleItemBulkWriteAction<IndexRequest, IndexResponse> { public class TransportIndexAction extends TransportSingleItemBulkWriteAction<IndexRequest, IndexResponse> {
@Inject @Inject
public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService, public TransportIndexAction(ActionFilters actionFilters, TransportService transportService, TransportBulkAction bulkAction) {
IndicesService indicesService, super(IndexAction.NAME, transportService, actionFilters, IndexRequest::new, bulkAction);
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.WRITE,
bulkAction, shardBulkAction);
}
@Override
protected IndexResponse newResponseInstance() {
return new IndexResponse();
} }
} }

View File

@ -167,6 +167,7 @@ public abstract class TransportReplicationAction<
@Override @Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) { protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
assert request.shardId() != null : "request shardId must be set";
new ReroutePhase((ReplicationTask) task, request, listener).run(); new ReroutePhase((ReplicationTask) task, request, listener).run();
} }
@ -780,7 +781,6 @@ public abstract class TransportReplicationAction<
// resolve all derived request fields, so we can route and apply it // resolve all derived request fields, so we can route and apply it
resolveRequest(indexMetaData, request); resolveRequest(indexMetaData, request);
assert request.shardId() != null : "request shardId must be set in resolveRequest";
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
"request waitForActiveShards must be set in resolveRequest"; "request waitForActiveShards must be set in resolveRequest";

View File

@ -50,7 +50,6 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.junit.Before; import org.junit.Before;
@ -156,15 +155,8 @@ public class TransportBulkActionIngestTests extends ESTestCase {
class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction<IndexRequest, IndexResponse> { class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction<IndexRequest, IndexResponse> {
TestSingleItemBulkWriteAction(TestTransportBulkAction bulkAction) { TestSingleItemBulkWriteAction(TestTransportBulkAction bulkAction) {
super(SETTINGS, IndexAction.NAME, TransportBulkActionIngestTests.this.transportService, super(IndexAction.NAME, TransportBulkActionIngestTests.this.transportService,
TransportBulkActionIngestTests.this.clusterService, new ActionFilters(Collections.emptySet()), IndexRequest::new, bulkAction);
null, null, null, new ActionFilters(Collections.emptySet()), null,
IndexRequest::new, IndexRequest::new, ThreadPool.Names.WRITE, bulkAction, null);
}
@Override
protected IndexResponse newResponseInstance() {
return new IndexResponse();
} }
} }