better code sharding?
This commit is contained in:
parent
180ceef134
commit
0eaaee160b
|
@ -22,8 +22,8 @@ package org.elasticsearch.action.bulk;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.DocWriteResponse;
|
|
||||||
import org.elasticsearch.action.DocWriteRequest.OpType;
|
import org.elasticsearch.action.DocWriteRequest.OpType;
|
||||||
|
import org.elasticsearch.action.DocWriteResponse;
|
||||||
import org.elasticsearch.action.delete.DeleteResponse;
|
import org.elasticsearch.action.delete.DeleteResponse;
|
||||||
import org.elasticsearch.action.index.IndexResponse;
|
import org.elasticsearch.action.index.IndexResponse;
|
||||||
import org.elasticsearch.action.update.UpdateResponse;
|
import org.elasticsearch.action.update.UpdateResponse;
|
||||||
|
|
|
@ -1,107 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to Elasticsearch under one or more contributor
|
|
||||||
* license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright
|
|
||||||
* ownership. Elasticsearch licenses this file to you under
|
|
||||||
* the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
* not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.elasticsearch.action.bulk;
|
|
||||||
|
|
||||||
import org.elasticsearch.action.ActionListener;
|
|
||||||
import org.elasticsearch.action.DocWriteRequest;
|
|
||||||
import org.elasticsearch.action.DocWriteResponse;
|
|
||||||
import org.elasticsearch.action.support.WriteRequest;
|
|
||||||
import org.elasticsearch.action.support.WriteResponse;
|
|
||||||
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
|
|
||||||
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
|
||||||
import org.elasticsearch.index.translog.Translog;
|
|
||||||
|
|
||||||
public class SingleWriteOperationUtility {
|
|
||||||
|
|
||||||
public static <T extends DocWriteResponse> ActionListener<BulkResponse> wrapBulkResponse(ActionListener<T> listener) {
|
|
||||||
return ActionListener.wrap(bulkItemResponses -> {
|
|
||||||
assert bulkItemResponses.getItems().length == 1 : "expected only one item in bulk request";
|
|
||||||
BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0];
|
|
||||||
if (bulkItemResponse.isFailed() == false) {
|
|
||||||
listener.onResponse(bulkItemResponse.getResponse());
|
|
||||||
} else {
|
|
||||||
listener.onFailure(bulkItemResponse.getFailure().getCause());
|
|
||||||
}
|
|
||||||
}, listener::onFailure);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static BulkRequest toSingleItemBulkRequest(ReplicatedWriteRequest request) {
|
|
||||||
BulkRequest bulkRequest = new BulkRequest();
|
|
||||||
bulkRequest.add(((DocWriteRequest) request));
|
|
||||||
bulkRequest.setRefreshPolicy(request.getRefreshPolicy());
|
|
||||||
bulkRequest.timeout(request.timeout());
|
|
||||||
bulkRequest.waitForActiveShards(request.waitForActiveShards());
|
|
||||||
request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
|
|
||||||
return bulkRequest;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static final class ResultHolder<Response extends ReplicationResponse & WriteResponse> {
|
|
||||||
public final Response response;
|
|
||||||
public final Translog.Location location;
|
|
||||||
public final Exception failure;
|
|
||||||
|
|
||||||
public ResultHolder(Response response, Translog.Location location, Exception failure) {
|
|
||||||
this.response = response;
|
|
||||||
this.location = location;
|
|
||||||
this.failure = failure;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <Response extends ReplicationResponse & WriteResponse> ResultHolder<Response>
|
|
||||||
executeSingleItemBulkRequestOnPrimary(ReplicatedWriteRequest request,
|
|
||||||
ThrowableFunction<BulkShardRequest, Tuple<BulkShardResponse,
|
|
||||||
Translog.Location>> executeShardBulkAction) throws Exception {
|
|
||||||
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
|
|
||||||
WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy();
|
|
||||||
request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
|
|
||||||
itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) request));
|
|
||||||
BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests);
|
|
||||||
Tuple<BulkShardResponse, Translog.Location> responseLocationTuple = executeShardBulkAction.apply(bulkShardRequest);
|
|
||||||
BulkShardResponse bulkShardResponse = responseLocationTuple.v1();
|
|
||||||
assert bulkShardResponse.getResponses().length == 1: "expected only one bulk shard response";
|
|
||||||
BulkItemResponse itemResponse = bulkShardResponse.getResponses()[0];
|
|
||||||
final Response response;
|
|
||||||
final Exception failure;
|
|
||||||
if (itemResponse.isFailed()) {
|
|
||||||
failure = itemResponse.getFailure().getCause();
|
|
||||||
response = null;
|
|
||||||
} else {
|
|
||||||
response = (Response) itemResponse.getResponse();
|
|
||||||
failure = null;
|
|
||||||
}
|
|
||||||
return new ResultHolder<>(response, responseLocationTuple.v2(), failure);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static ResultHolder executeSingleItemBulkRequestOnReplica(ReplicatedWriteRequest replicaRequest,
|
|
||||||
ThrowableFunction<BulkShardRequest,
|
|
||||||
Translog.Location> executeShardBulkAction) throws Exception {
|
|
||||||
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
|
|
||||||
WriteRequest.RefreshPolicy refreshPolicy = replicaRequest.getRefreshPolicy();
|
|
||||||
itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) replicaRequest));
|
|
||||||
BulkShardRequest bulkShardRequest = new BulkShardRequest(replicaRequest.shardId(), refreshPolicy, itemRequests);
|
|
||||||
Translog.Location location = executeShardBulkAction.apply(bulkShardRequest);
|
|
||||||
return new ResultHolder<>(null, location, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public interface ThrowableFunction<T, R> {
|
|
||||||
R apply(T t) throws Exception;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,131 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.action.bulk;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.DocWriteRequest;
|
||||||
|
import org.elasticsearch.action.DocWriteResponse;
|
||||||
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
|
import org.elasticsearch.action.support.WriteRequest;
|
||||||
|
import org.elasticsearch.action.support.WriteResponse;
|
||||||
|
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
|
||||||
|
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
||||||
|
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
||||||
|
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.index.shard.IndexShard;
|
||||||
|
import org.elasticsearch.indices.IndicesService;
|
||||||
|
import org.elasticsearch.tasks.Task;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
/** use transport bulk action directly */
|
||||||
|
@Deprecated
|
||||||
|
public abstract class TransportSingleItemBulkWriteAction<
|
||||||
|
Request extends ReplicatedWriteRequest<Request>,
|
||||||
|
Response extends ReplicationResponse & WriteResponse
|
||||||
|
> extends TransportWriteAction<Request, Request, Response> {
|
||||||
|
|
||||||
|
private final TransportBulkAction bulkAction;
|
||||||
|
private final TransportShardBulkAction shardBulkAction;
|
||||||
|
|
||||||
|
|
||||||
|
protected TransportSingleItemBulkWriteAction(Settings settings, String actionName, TransportService transportService,
|
||||||
|
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
|
||||||
|
ShardStateAction shardStateAction, ActionFilters actionFilters,
|
||||||
|
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
|
||||||
|
Supplier<Request> replicaRequest, String executor,
|
||||||
|
TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
|
||||||
|
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||||
|
indexNameExpressionResolver, request, replicaRequest, executor);
|
||||||
|
this.bulkAction = bulkAction;
|
||||||
|
this.shardBulkAction = shardBulkAction;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doExecute(Task task, final Request request, final ActionListener<Response> listener) {
|
||||||
|
bulkAction.execute(task, toSingleItemBulkRequest(request), wrapBulkResponse(listener));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(
|
||||||
|
Request request, final IndexShard primary) throws Exception {
|
||||||
|
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
|
||||||
|
WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy();
|
||||||
|
request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
|
||||||
|
itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) request));
|
||||||
|
BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests);
|
||||||
|
WritePrimaryResult<BulkShardRequest, BulkShardResponse> bulkResult =
|
||||||
|
shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary);
|
||||||
|
assert bulkResult.finalResponseIfSuccessful.getResponses().length == 1 : "expected only one bulk shard response";
|
||||||
|
BulkItemResponse itemResponse = bulkResult.finalResponseIfSuccessful.getResponses()[0];
|
||||||
|
final Response response;
|
||||||
|
final Exception failure;
|
||||||
|
if (itemResponse.isFailed()) {
|
||||||
|
failure = itemResponse.getFailure().getCause();
|
||||||
|
response = null;
|
||||||
|
} else {
|
||||||
|
response = (Response) itemResponse.getResponse();
|
||||||
|
failure = null;
|
||||||
|
}
|
||||||
|
return new WritePrimaryResult<>(request, response, bulkResult.location, failure, primary, logger);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected WriteReplicaResult<Request> shardOperationOnReplica(
|
||||||
|
Request replicaRequest, IndexShard replica) throws Exception {
|
||||||
|
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
|
||||||
|
WriteRequest.RefreshPolicy refreshPolicy = replicaRequest.getRefreshPolicy();
|
||||||
|
itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) replicaRequest));
|
||||||
|
BulkShardRequest bulkShardRequest = new BulkShardRequest(replicaRequest.shardId(), refreshPolicy, itemRequests);
|
||||||
|
WriteReplicaResult<BulkShardRequest> result = shardBulkAction.shardOperationOnReplica(bulkShardRequest, replica);
|
||||||
|
// nocommit - is the null failure ok?
|
||||||
|
return new WriteReplicaResult<>(replicaRequest, result.location, null, replica, logger);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private ActionListener<BulkResponse> wrapBulkResponse(ActionListener<Response> listener) {
|
||||||
|
return ActionListener.wrap(bulkItemResponses -> {
|
||||||
|
assert bulkItemResponses.getItems().length == 1 : "expected only one item in bulk request";
|
||||||
|
BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0];
|
||||||
|
if (bulkItemResponse.isFailed() == false) {
|
||||||
|
final DocWriteResponse response = bulkItemResponse.getResponse();
|
||||||
|
listener.onResponse((Response) response);
|
||||||
|
} else {
|
||||||
|
listener.onFailure(bulkItemResponse.getFailure().getCause());
|
||||||
|
}
|
||||||
|
}, listener::onFailure);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static BulkRequest toSingleItemBulkRequest(ReplicatedWriteRequest request) {
|
||||||
|
BulkRequest bulkRequest = new BulkRequest();
|
||||||
|
bulkRequest.add(((DocWriteRequest) request));
|
||||||
|
bulkRequest.setRefreshPolicy(request.getRefreshPolicy());
|
||||||
|
bulkRequest.timeout(request.timeout());
|
||||||
|
bulkRequest.waitForActiveShards(request.waitForActiveShards());
|
||||||
|
request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
|
||||||
|
return bulkRequest;
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,41 +19,26 @@
|
||||||
|
|
||||||
package org.elasticsearch.action.delete;
|
package org.elasticsearch.action.delete;
|
||||||
|
|
||||||
import org.elasticsearch.action.ActionListener;
|
|
||||||
import org.elasticsearch.action.bulk.BulkShardRequest;
|
|
||||||
import org.elasticsearch.action.bulk.BulkShardResponse;
|
|
||||||
import org.elasticsearch.action.bulk.TransportBulkAction;
|
import org.elasticsearch.action.bulk.TransportBulkAction;
|
||||||
import org.elasticsearch.action.bulk.TransportShardBulkAction;
|
import org.elasticsearch.action.bulk.TransportShardBulkAction;
|
||||||
|
import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction;
|
||||||
import org.elasticsearch.action.support.ActionFilters;
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
|
||||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.shard.IndexShard;
|
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
import org.elasticsearch.tasks.Task;
|
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.executeSingleItemBulkRequestOnPrimary;
|
|
||||||
import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.executeSingleItemBulkRequestOnReplica;
|
|
||||||
import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.toSingleItemBulkRequest;
|
|
||||||
import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.wrapBulkResponse;
|
|
||||||
import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.ResultHolder;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Performs the delete operation.
|
* Performs the delete operation.
|
||||||
*
|
*
|
||||||
* Deprecated use TransportBulkAction with a single item instead
|
* Deprecated use TransportBulkAction with a single item instead
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, DeleteRequest, DeleteResponse> {
|
public class TransportDeleteAction extends TransportSingleItemBulkWriteAction<DeleteRequest, DeleteResponse> {
|
||||||
|
|
||||||
private final TransportBulkAction bulkAction;
|
|
||||||
private final TransportShardBulkAction shardBulkAction;
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public TransportDeleteAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
public TransportDeleteAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||||
|
@ -61,41 +46,12 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
|
||||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||||
TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
|
TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
|
||||||
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
|
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
|
||||||
actionFilters, indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.INDEX);
|
actionFilters, indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.INDEX,
|
||||||
this.bulkAction = bulkAction;
|
bulkAction, shardBulkAction);
|
||||||
this.shardBulkAction = shardBulkAction;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void doExecute(Task task, final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
|
|
||||||
bulkAction.execute(task, toSingleItemBulkRequest(request), wrapBulkResponse(listener));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected DeleteResponse newResponseInstance() {
|
protected DeleteResponse newResponseInstance() {
|
||||||
return new DeleteResponse();
|
return new DeleteResponse();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected WritePrimaryResult<DeleteRequest, DeleteResponse> shardOperationOnPrimary(
|
|
||||||
DeleteRequest request, IndexShard primary) throws Exception {
|
|
||||||
ResultHolder<DeleteResponse> resultHolder = executeSingleItemBulkRequestOnPrimary(request,
|
|
||||||
bulkShardRequest -> {
|
|
||||||
WritePrimaryResult<BulkShardRequest, BulkShardResponse> result =
|
|
||||||
shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary);
|
|
||||||
return new Tuple<>(result.finalResponseIfSuccessful, result.location);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
return new WritePrimaryResult<>(request, resultHolder.response, resultHolder.location, resultHolder.failure, primary, logger);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected WriteReplicaResult<DeleteRequest> shardOperationOnReplica(
|
|
||||||
DeleteRequest request, IndexShard replica) throws Exception {
|
|
||||||
ResultHolder resultHolder = executeSingleItemBulkRequestOnReplica(request, bulkShardRequest -> {
|
|
||||||
WriteReplicaResult<BulkShardRequest> result = shardBulkAction.shardOperationOnReplica(bulkShardRequest, replica);
|
|
||||||
return result.location;
|
|
||||||
});
|
|
||||||
return new WriteReplicaResult<>(request, resultHolder.location, null, replica, logger);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,31 +19,19 @@
|
||||||
|
|
||||||
package org.elasticsearch.action.index;
|
package org.elasticsearch.action.index;
|
||||||
|
|
||||||
import org.elasticsearch.action.ActionListener;
|
|
||||||
import org.elasticsearch.action.bulk.BulkShardRequest;
|
|
||||||
import org.elasticsearch.action.bulk.BulkShardResponse;
|
|
||||||
import org.elasticsearch.action.bulk.SingleWriteOperationUtility.ResultHolder;
|
|
||||||
import org.elasticsearch.action.bulk.TransportBulkAction;
|
import org.elasticsearch.action.bulk.TransportBulkAction;
|
||||||
import org.elasticsearch.action.bulk.TransportShardBulkAction;
|
import org.elasticsearch.action.bulk.TransportShardBulkAction;
|
||||||
|
import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction;
|
||||||
import org.elasticsearch.action.support.ActionFilters;
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
|
||||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.shard.IndexShard;
|
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
import org.elasticsearch.tasks.Task;
|
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.executeSingleItemBulkRequestOnReplica;
|
|
||||||
import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.executeSingleItemBulkRequestOnPrimary;
|
|
||||||
import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.toSingleItemBulkRequest;
|
|
||||||
import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.wrapBulkResponse;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Performs the index operation.
|
* Performs the index operation.
|
||||||
*
|
*
|
||||||
|
@ -57,10 +45,7 @@ import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.wrapBulk
|
||||||
* Deprecated use TransportBulkAction with a single item instead
|
* Deprecated use TransportBulkAction with a single item instead
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public class TransportIndexAction extends TransportWriteAction<IndexRequest, IndexRequest, IndexResponse> {
|
public class TransportIndexAction extends TransportSingleItemBulkWriteAction<IndexRequest, IndexResponse> {
|
||||||
|
|
||||||
private final TransportBulkAction bulkAction;
|
|
||||||
private final TransportShardBulkAction shardBulkAction;
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||||
|
@ -69,43 +54,12 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
|
||||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||||
TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
|
TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
|
||||||
super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
|
super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
|
||||||
actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX);
|
actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX,
|
||||||
this.bulkAction = bulkAction;
|
bulkAction, shardBulkAction);
|
||||||
this.shardBulkAction = shardBulkAction;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void doExecute(Task task, final IndexRequest request, final ActionListener<IndexResponse> listener) {
|
|
||||||
bulkAction.execute(task, toSingleItemBulkRequest(request), wrapBulkResponse(listener));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected IndexResponse newResponseInstance() {
|
protected IndexResponse newResponseInstance() {
|
||||||
return new IndexResponse();
|
return new IndexResponse();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected WritePrimaryResult<IndexRequest, IndexResponse> shardOperationOnPrimary(
|
|
||||||
IndexRequest request, final IndexShard primary) throws Exception {
|
|
||||||
ResultHolder<IndexResponse> resultHolder = executeSingleItemBulkRequestOnPrimary(request,
|
|
||||||
bulkShardRequest -> {
|
|
||||||
WritePrimaryResult<BulkShardRequest, BulkShardResponse> result =
|
|
||||||
shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary);
|
|
||||||
return new Tuple<>(result.finalResponseIfSuccessful, result.location);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
return new WritePrimaryResult<>(request, resultHolder.response, resultHolder.location, resultHolder.failure, primary, logger);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected WriteReplicaResult<IndexRequest> shardOperationOnReplica(
|
|
||||||
IndexRequest request, IndexShard replica) throws Exception {
|
|
||||||
ResultHolder resultHolder = executeSingleItemBulkRequestOnReplica(request,
|
|
||||||
bulkShardRequest -> {
|
|
||||||
WriteReplicaResult<BulkShardRequest> result = shardBulkAction.shardOperationOnReplica(bulkShardRequest, replica);
|
|
||||||
return result.location;
|
|
||||||
}
|
|
||||||
);
|
|
||||||
return new WriteReplicaResult<>(request, resultHolder.location, null, replica, logger);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue