diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java index 0a85cebcefc..adb355d22dc 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -22,8 +22,8 @@ package org.elasticsearch.action.bulk; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; -import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.DocWriteRequest.OpType; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateResponse; diff --git a/core/src/main/java/org/elasticsearch/action/bulk/SingleWriteOperationUtility.java b/core/src/main/java/org/elasticsearch/action/bulk/SingleWriteOperationUtility.java deleted file mode 100644 index 8473cb38f9e..00000000000 --- a/core/src/main/java/org/elasticsearch/action/bulk/SingleWriteOperationUtility.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.bulk; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.action.support.WriteResponse; -import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; -import org.elasticsearch.action.support.replication.ReplicationResponse; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.index.translog.Translog; - -public class SingleWriteOperationUtility { - - public static ActionListener wrapBulkResponse(ActionListener listener) { - return ActionListener.wrap(bulkItemResponses -> { - assert bulkItemResponses.getItems().length == 1 : "expected only one item in bulk request"; - BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0]; - if (bulkItemResponse.isFailed() == false) { - listener.onResponse(bulkItemResponse.getResponse()); - } else { - listener.onFailure(bulkItemResponse.getFailure().getCause()); - } - }, listener::onFailure); - } - - public static BulkRequest toSingleItemBulkRequest(ReplicatedWriteRequest request) { - BulkRequest bulkRequest = new BulkRequest(); - bulkRequest.add(((DocWriteRequest) request)); - bulkRequest.setRefreshPolicy(request.getRefreshPolicy()); - bulkRequest.timeout(request.timeout()); - bulkRequest.waitForActiveShards(request.waitForActiveShards()); - request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); - return bulkRequest; - } - - public static final class ResultHolder { - public final Response response; - public final Translog.Location location; - public final Exception failure; - - public ResultHolder(Response response, Translog.Location location, Exception failure) { - this.response = response; - this.location = location; - this.failure = failure; - } - } - - public static ResultHolder - executeSingleItemBulkRequestOnPrimary(ReplicatedWriteRequest request, - ThrowableFunction> executeShardBulkAction) throws Exception { - BulkItemRequest[] itemRequests = new BulkItemRequest[1]; - WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy(); - request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); - itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) request)); - BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests); - Tuple responseLocationTuple = executeShardBulkAction.apply(bulkShardRequest); - BulkShardResponse bulkShardResponse = responseLocationTuple.v1(); - assert bulkShardResponse.getResponses().length == 1: "expected only one bulk shard response"; - BulkItemResponse itemResponse = bulkShardResponse.getResponses()[0]; - final Response response; - final Exception failure; - if (itemResponse.isFailed()) { - failure = itemResponse.getFailure().getCause(); - response = null; - } else { - response = (Response) itemResponse.getResponse(); - failure = null; - } - return new ResultHolder<>(response, responseLocationTuple.v2(), failure); - } - - public static ResultHolder executeSingleItemBulkRequestOnReplica(ReplicatedWriteRequest replicaRequest, - ThrowableFunction executeShardBulkAction) throws Exception { - BulkItemRequest[] itemRequests = new BulkItemRequest[1]; - WriteRequest.RefreshPolicy refreshPolicy = replicaRequest.getRefreshPolicy(); - itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) replicaRequest)); - BulkShardRequest bulkShardRequest = new BulkShardRequest(replicaRequest.shardId(), refreshPolicy, itemRequests); - Translog.Location location = executeShardBulkAction.apply(bulkShardRequest); - return new ResultHolder<>(null, location, null); - } - - public interface ThrowableFunction { - R apply(T t) throws Exception; - } -} diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java new file mode 100644 index 00000000000..39a39045f0d --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java @@ -0,0 +1,131 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.WriteResponse; +import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.function.Supplier; + +/** use transport bulk action directly */ +@Deprecated +public abstract class TransportSingleItemBulkWriteAction< + Request extends ReplicatedWriteRequest, + Response extends ReplicationResponse & WriteResponse + > extends TransportWriteAction { + + private final TransportBulkAction bulkAction; + private final TransportShardBulkAction shardBulkAction; + + + protected TransportSingleItemBulkWriteAction(Settings settings, String actionName, TransportService transportService, + ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, + ShardStateAction shardStateAction, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, + Supplier replicaRequest, String executor, + TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) { + super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, + indexNameExpressionResolver, request, replicaRequest, executor); + this.bulkAction = bulkAction; + this.shardBulkAction = shardBulkAction; + } + + + @Override + protected void doExecute(Task task, final Request request, final ActionListener listener) { + bulkAction.execute(task, toSingleItemBulkRequest(request), wrapBulkResponse(listener)); + } + + @Override + protected WritePrimaryResult shardOperationOnPrimary( + Request request, final IndexShard primary) throws Exception { + BulkItemRequest[] itemRequests = new BulkItemRequest[1]; + WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy(); + request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); + itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) request)); + BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests); + WritePrimaryResult bulkResult = + shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary); + assert bulkResult.finalResponseIfSuccessful.getResponses().length == 1 : "expected only one bulk shard response"; + BulkItemResponse itemResponse = bulkResult.finalResponseIfSuccessful.getResponses()[0]; + final Response response; + final Exception failure; + if (itemResponse.isFailed()) { + failure = itemResponse.getFailure().getCause(); + response = null; + } else { + response = (Response) itemResponse.getResponse(); + failure = null; + } + return new WritePrimaryResult<>(request, response, bulkResult.location, failure, primary, logger); + } + + @Override + protected WriteReplicaResult shardOperationOnReplica( + Request replicaRequest, IndexShard replica) throws Exception { + BulkItemRequest[] itemRequests = new BulkItemRequest[1]; + WriteRequest.RefreshPolicy refreshPolicy = replicaRequest.getRefreshPolicy(); + itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) replicaRequest)); + BulkShardRequest bulkShardRequest = new BulkShardRequest(replicaRequest.shardId(), refreshPolicy, itemRequests); + WriteReplicaResult result = shardBulkAction.shardOperationOnReplica(bulkShardRequest, replica); + // nocommit - is the null failure ok? + return new WriteReplicaResult<>(replicaRequest, result.location, null, replica, logger); + } + + + private ActionListener wrapBulkResponse(ActionListener listener) { + return ActionListener.wrap(bulkItemResponses -> { + assert bulkItemResponses.getItems().length == 1 : "expected only one item in bulk request"; + BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0]; + if (bulkItemResponse.isFailed() == false) { + final DocWriteResponse response = bulkItemResponse.getResponse(); + listener.onResponse((Response) response); + } else { + listener.onFailure(bulkItemResponse.getFailure().getCause()); + } + }, listener::onFailure); + } + + public static BulkRequest toSingleItemBulkRequest(ReplicatedWriteRequest request) { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(((DocWriteRequest) request)); + bulkRequest.setRefreshPolicy(request.getRefreshPolicy()); + bulkRequest.timeout(request.timeout()); + bulkRequest.waitForActiveShards(request.waitForActiveShards()); + request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); + return bulkRequest; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 5a8cbbc5bcc..3aaf4a472fa 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -19,41 +19,26 @@ package org.elasticsearch.action.delete; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.BulkShardRequest; -import org.elasticsearch.action.bulk.BulkShardResponse; import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.bulk.TransportShardBulkAction; +import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.executeSingleItemBulkRequestOnPrimary; -import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.executeSingleItemBulkRequestOnReplica; -import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.toSingleItemBulkRequest; -import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.wrapBulkResponse; -import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.ResultHolder; - /** * Performs the delete operation. * * Deprecated use TransportBulkAction with a single item instead */ @Deprecated -public class TransportDeleteAction extends TransportWriteAction { - - private final TransportBulkAction bulkAction; - private final TransportShardBulkAction shardBulkAction; +public class TransportDeleteAction extends TransportSingleItemBulkWriteAction { @Inject public TransportDeleteAction(Settings settings, TransportService transportService, ClusterService clusterService, @@ -61,41 +46,12 @@ public class TransportDeleteAction extends TransportWriteAction listener) { - bulkAction.execute(task, toSingleItemBulkRequest(request), wrapBulkResponse(listener)); + actionFilters, indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.INDEX, + bulkAction, shardBulkAction); } @Override protected DeleteResponse newResponseInstance() { return new DeleteResponse(); } - - @Override - protected WritePrimaryResult shardOperationOnPrimary( - DeleteRequest request, IndexShard primary) throws Exception { - ResultHolder resultHolder = executeSingleItemBulkRequestOnPrimary(request, - bulkShardRequest -> { - WritePrimaryResult result = - shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary); - return new Tuple<>(result.finalResponseIfSuccessful, result.location); - } - ); - return new WritePrimaryResult<>(request, resultHolder.response, resultHolder.location, resultHolder.failure, primary, logger); - } - - @Override - protected WriteReplicaResult shardOperationOnReplica( - DeleteRequest request, IndexShard replica) throws Exception { - ResultHolder resultHolder = executeSingleItemBulkRequestOnReplica(request, bulkShardRequest -> { - WriteReplicaResult result = shardBulkAction.shardOperationOnReplica(bulkShardRequest, replica); - return result.location; - }); - return new WriteReplicaResult<>(request, resultHolder.location, null, replica, logger); - } } diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 3b3c29b5a08..88a210c7180 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -19,31 +19,19 @@ package org.elasticsearch.action.index; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.BulkShardRequest; -import org.elasticsearch.action.bulk.BulkShardResponse; -import org.elasticsearch.action.bulk.SingleWriteOperationUtility.ResultHolder; import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.bulk.TransportShardBulkAction; +import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.executeSingleItemBulkRequestOnReplica; -import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.executeSingleItemBulkRequestOnPrimary; -import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.toSingleItemBulkRequest; -import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.wrapBulkResponse; - /** * Performs the index operation. * @@ -57,10 +45,7 @@ import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.wrapBulk * Deprecated use TransportBulkAction with a single item instead */ @Deprecated -public class TransportIndexAction extends TransportWriteAction { - - private final TransportBulkAction bulkAction; - private final TransportShardBulkAction shardBulkAction; +public class TransportIndexAction extends TransportSingleItemBulkWriteAction { @Inject public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService, @@ -69,43 +54,12 @@ public class TransportIndexAction extends TransportWriteAction listener) { - bulkAction.execute(task, toSingleItemBulkRequest(request), wrapBulkResponse(listener)); + actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX, + bulkAction, shardBulkAction); } @Override protected IndexResponse newResponseInstance() { return new IndexResponse(); } - - @Override - protected WritePrimaryResult shardOperationOnPrimary( - IndexRequest request, final IndexShard primary) throws Exception { - ResultHolder resultHolder = executeSingleItemBulkRequestOnPrimary(request, - bulkShardRequest -> { - WritePrimaryResult result = - shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary); - return new Tuple<>(result.finalResponseIfSuccessful, result.location); - } - ); - return new WritePrimaryResult<>(request, resultHolder.response, resultHolder.location, resultHolder.failure, primary, logger); - } - - @Override - protected WriteReplicaResult shardOperationOnReplica( - IndexRequest request, IndexShard replica) throws Exception { - ResultHolder resultHolder = executeSingleItemBulkRequestOnReplica(request, - bulkShardRequest -> { - WriteReplicaResult result = shardBulkAction.shardOperationOnReplica(bulkShardRequest, replica); - return result.location; - } - ); - return new WriteReplicaResult<>(request, resultHolder.location, null, replica, logger); - } }