refactor and cleanup transport request handling

This refactoring and cleanup is that each request handler ends up
implementing too many methods that can be provided when the request handler itself
is registered, including a prototype like class that can be used to instantiate
new request instances for streaming.
closes #10730
This commit is contained in:
Shay Banon 2015-04-21 10:20:34 +02:00
parent cb615ffecf
commit 8dbb79c96a
138 changed files with 518 additions and 2334 deletions

View File

@ -45,7 +45,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperati
@Inject
public TransportClusterHealthAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ClusterName clusterName, ActionFilters actionFilters) {
super(settings, ClusterHealthAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, ClusterHealthAction.NAME, transportService, clusterService, threadPool, actionFilters, ClusterHealthRequest.class);
this.clusterName = clusterName;
}
@ -60,11 +60,6 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperati
return null; // we want users to be able to call this even when there are global blocks, just to check the health (are there blocks?)
}
@Override
protected ClusterHealthRequest newRequest() {
return new ClusterHealthRequest();
}
@Override
protected ClusterHealthResponse newResponse() {
return new ClusterHealthResponse();

View File

@ -38,6 +38,11 @@ public class NodesHotThreadsRequest extends NodesOperationRequest<NodesHotThread
int snapshots = 10;
boolean ignoreIdleThreads = true;
// for serialization
NodesHotThreadsRequest() {
}
/**
* Get hot threads from nodes based on the nodes ids specified. If none are passed, hot
* threads for all nodes is used.

View File

@ -46,12 +46,8 @@ public class TransportNodesHotThreadsAction extends TransportNodesOperationActio
@Inject
public TransportNodesHotThreadsAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
super(settings, NodesHotThreadsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters);
}
@Override
protected String executor() {
return ThreadPool.Names.GENERIC;
super(settings, NodesHotThreadsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
NodesHotThreadsRequest.class, NodeRequest.class, ThreadPool.Names.GENERIC);
}
@Override
@ -66,16 +62,6 @@ public class TransportNodesHotThreadsAction extends TransportNodesOperationActio
return new NodesHotThreadsResponse(clusterName, nodes.toArray(new NodeHotThreads[nodes.size()]));
}
@Override
protected NodesHotThreadsRequest newRequestInstance() {
return new NodesHotThreadsRequest();
}
@Override
protected NodeRequest newNodeRequest() {
return new NodeRequest();
}
@Override
protected NodeRequest newNodeRequest(String nodeId, NodesHotThreadsRequest request) {
return new NodeRequest(nodeId, request);

View File

@ -49,15 +49,11 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction<Node
public TransportNodesInfoAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
NodeService nodeService, ActionFilters actionFilters) {
super(settings, NodesInfoAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters);
super(settings, NodesInfoAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
NodesInfoRequest.class, NodeInfoRequest.class, ThreadPool.Names.MANAGEMENT);
this.nodeService = nodeService;
}
@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected NodesInfoResponse newResponse(NodesInfoRequest nodesInfoRequest, AtomicReferenceArray responses) {
final List<NodeInfo> nodesInfos = new ArrayList<>();
@ -70,16 +66,6 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction<Node
return new NodesInfoResponse(clusterName, nodesInfos.toArray(new NodeInfo[nodesInfos.size()]));
}
@Override
protected NodesInfoRequest newRequestInstance() {
return new NodesInfoRequest();
}
@Override
protected NodeInfoRequest newNodeRequest() {
return new NodeInfoRequest();
}
@Override
protected NodeInfoRequest newNodeRequest(String nodeId, NodesInfoRequest request) {
return new NodeInfoRequest(nodeId, request);

View File

@ -26,7 +26,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
public final class TransportLivenessAction extends BaseTransportRequestHandler<LivenessRequest> {
public final class TransportLivenessAction implements TransportRequestHandler<LivenessRequest> {
private final ClusterService clusterService;
private final ClusterName clusterName;
@ -37,21 +37,11 @@ public final class TransportLivenessAction extends BaseTransportRequestHandler<L
ClusterService clusterService, TransportService transportService) {
this.clusterService = clusterService;
this.clusterName = clusterName;
transportService.registerHandler(NAME, this);
}
@Override
public LivenessRequest newInstance() {
return new LivenessRequest();
transportService.registerRequestHandler(NAME, LivenessRequest.class, ThreadPool.Names.SAME, this);
}
@Override
public void messageReceived(LivenessRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(new LivenessResponse(clusterName, clusterService.localNode()));
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}

View File

@ -59,13 +59,13 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
@Inject
public TransportNodesShutdownAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
Node node, ClusterName clusterName, ActionFilters actionFilters) {
super(settings, NodesShutdownAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, NodesShutdownAction.NAME, transportService, clusterService, threadPool, actionFilters, NodesShutdownRequest.class);
this.node = node;
this.clusterName = clusterName;
this.disabled = settings.getAsBoolean("action.disable_shutdown", this.settings.getAsBoolean("action.admin.cluster.node.shutdown.disabled", false));
this.delay = this.settings.getAsTime("action.admin.cluster.node.shutdown.delay", TimeValue.timeValueMillis(200));
this.transportService.registerHandler(SHUTDOWN_NODE_ACTION_NAME, new NodeShutdownRequestHandler());
this.transportService.registerRequestHandler(SHUTDOWN_NODE_ACTION_NAME, NodeShutdownRequest.class, ThreadPool.Names.SAME, new NodeShutdownRequestHandler());
}
@Override
@ -79,11 +79,6 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
@Override
protected NodesShutdownRequest newRequest() {
return new NodesShutdownRequest();
}
@Override
protected NodesShutdownResponse newResponse() {
return new NodesShutdownResponse();
@ -229,17 +224,7 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
listener.onResponse(new NodesShutdownResponse(clusterName, nodes.toArray(DiscoveryNode.class)));
}
private class NodeShutdownRequestHandler extends BaseTransportRequestHandler<NodeShutdownRequest> {
@Override
public NodeShutdownRequest newInstance() {
return new NodeShutdownRequest();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
private class NodeShutdownRequestHandler implements TransportRequestHandler<NodeShutdownRequest> {
@Override
public void messageReceived(final NodeShutdownRequest request, TransportChannel channel) throws Exception {

View File

@ -49,15 +49,11 @@ public class TransportNodesStatsAction extends TransportNodesOperationAction<Nod
public TransportNodesStatsAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
NodeService nodeService, ActionFilters actionFilters) {
super(settings, NodesStatsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters);
super(settings, NodesStatsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
NodesStatsRequest.class, NodeStatsRequest.class, ThreadPool.Names.MANAGEMENT);
this.nodeService = nodeService;
}
@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected NodesStatsResponse newResponse(NodesStatsRequest nodesInfoRequest, AtomicReferenceArray responses) {
final List<NodeStats> nodeStats = Lists.newArrayList();
@ -70,16 +66,6 @@ public class TransportNodesStatsAction extends TransportNodesOperationAction<Nod
return new NodesStatsResponse(clusterName, nodeStats.toArray(new NodeStats[nodeStats.size()]));
}
@Override
protected NodesStatsRequest newRequestInstance() {
return new NodesStatsRequest();
}
@Override
protected NodeStatsRequest newNodeRequest() {
return new NodeStatsRequest();
}
@Override
protected NodeStatsRequest newNodeRequest(String nodeId, NodesStatsRequest request) {
return new NodeStatsRequest(nodeId, request);

View File

@ -44,7 +44,7 @@ public class TransportDeleteRepositoryAction extends TransportMasterNodeOperatio
@Inject
public TransportDeleteRepositoryAction(Settings settings, TransportService transportService, ClusterService clusterService,
RepositoriesService repositoriesService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, DeleteRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, DeleteRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters, DeleteRepositoryRequest.class);
this.repositoriesService = repositoriesService;
}
@ -53,11 +53,6 @@ public class TransportDeleteRepositoryAction extends TransportMasterNodeOperatio
return ThreadPool.Names.SAME;
}
@Override
protected DeleteRepositoryRequest newRequest() {
return new DeleteRepositoryRequest();
}
@Override
protected DeleteRepositoryResponse newResponse() {
return new DeleteRepositoryResponse();

View File

@ -45,7 +45,7 @@ public class TransportGetRepositoriesAction extends TransportMasterNodeReadOpera
@Inject
public TransportGetRepositoriesAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, GetRepositoriesAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, GetRepositoriesAction.NAME, transportService, clusterService, threadPool, actionFilters, GetRepositoriesRequest.class);
}
@Override
@ -53,11 +53,6 @@ public class TransportGetRepositoriesAction extends TransportMasterNodeReadOpera
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected GetRepositoriesRequest newRequest() {
return new GetRepositoriesRequest();
}
@Override
protected GetRepositoriesResponse newResponse() {
return new GetRepositoriesResponse();

View File

@ -44,7 +44,7 @@ public class TransportPutRepositoryAction extends TransportMasterNodeOperationAc
@Inject
public TransportPutRepositoryAction(Settings settings, TransportService transportService, ClusterService clusterService,
RepositoriesService repositoriesService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, PutRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, PutRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters, PutRepositoryRequest.class);
this.repositoriesService = repositoriesService;
}
@ -53,11 +53,6 @@ public class TransportPutRepositoryAction extends TransportMasterNodeOperationAc
return ThreadPool.Names.SAME;
}
@Override
protected PutRepositoryRequest newRequest() {
return new PutRepositoryRequest();
}
@Override
protected PutRepositoryResponse newResponse() {
return new PutRepositoryResponse();

View File

@ -47,7 +47,7 @@ public class TransportVerifyRepositoryAction extends TransportMasterNodeOperatio
@Inject
public TransportVerifyRepositoryAction(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
RepositoriesService repositoriesService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, VerifyRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, VerifyRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters, VerifyRepositoryRequest.class);
this.repositoriesService = repositoriesService;
this.clusterName = clusterName;
}
@ -57,11 +57,6 @@ public class TransportVerifyRepositoryAction extends TransportMasterNodeOperatio
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected VerifyRepositoryRequest newRequest() {
return new VerifyRepositoryRequest();
}
@Override
protected VerifyRepositoryResponse newResponse() {
return new VerifyRepositoryResponse();

View File

@ -46,7 +46,7 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
@Inject
public TransportClusterRerouteAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
AllocationService allocationService, ActionFilters actionFilters) {
super(settings, ClusterRerouteAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, ClusterRerouteAction.NAME, transportService, clusterService, threadPool, actionFilters, ClusterRerouteRequest.class);
this.allocationService = allocationService;
}
@ -61,11 +61,6 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
@Override
protected ClusterRerouteRequest newRequest() {
return new ClusterRerouteRequest();
}
@Override
protected ClusterRerouteResponse newResponse() {
return new ClusterRerouteResponse();

View File

@ -59,7 +59,7 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
@Inject
public TransportClusterUpdateSettingsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
AllocationService allocationService, @ClusterDynamicSettings DynamicSettings dynamicSettings, ActionFilters actionFilters) {
super(settings, ClusterUpdateSettingsAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, ClusterUpdateSettingsAction.NAME, transportService, clusterService, threadPool, actionFilters, ClusterUpdateSettingsRequest.class);
this.allocationService = allocationService;
this.dynamicSettings = dynamicSettings;
}
@ -80,11 +80,6 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
}
@Override
protected ClusterUpdateSettingsRequest newRequest() {
return new ClusterUpdateSettingsRequest();
}
@Override
protected ClusterUpdateSettingsResponse newResponse() {
return new ClusterUpdateSettingsResponse();

View File

@ -47,7 +47,7 @@ public class TransportClusterSearchShardsAction extends TransportMasterNodeReadO
@Inject
public TransportClusterSearchShardsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, ClusterSearchShardsAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, ClusterSearchShardsAction.NAME, transportService, clusterService, threadPool, actionFilters, ClusterSearchShardsRequest.class);
}
@Override
@ -61,11 +61,6 @@ public class TransportClusterSearchShardsAction extends TransportMasterNodeReadO
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}
@Override
protected ClusterSearchShardsRequest newRequest() {
return new ClusterSearchShardsRequest();
}
@Override
protected ClusterSearchShardsResponse newResponse() {
return new ClusterSearchShardsResponse();

View File

@ -44,7 +44,7 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeOperationA
@Inject
public TransportCreateSnapshotAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, SnapshotsService snapshotsService, ActionFilters actionFilters) {
super(settings, CreateSnapshotAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, CreateSnapshotAction.NAME, transportService, clusterService, threadPool, actionFilters, CreateSnapshotRequest.class);
this.snapshotsService = snapshotsService;
}
@ -53,11 +53,6 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeOperationA
return ThreadPool.Names.SNAPSHOT;
}
@Override
protected CreateSnapshotRequest newRequest() {
return new CreateSnapshotRequest();
}
@Override
protected CreateSnapshotResponse newResponse() {
return new CreateSnapshotResponse();

View File

@ -43,7 +43,7 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeOperationA
@Inject
public TransportDeleteSnapshotAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, SnapshotsService snapshotsService, ActionFilters actionFilters) {
super(settings, DeleteSnapshotAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, DeleteSnapshotAction.NAME, transportService, clusterService, threadPool, actionFilters, DeleteSnapshotRequest.class);
this.snapshotsService = snapshotsService;
}
@ -52,11 +52,6 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeOperationA
return ThreadPool.Names.GENERIC;
}
@Override
protected DeleteSnapshotRequest newRequest() {
return new DeleteSnapshotRequest();
}
@Override
protected DeleteSnapshotResponse newResponse() {
return new DeleteSnapshotResponse();

View File

@ -46,7 +46,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeOperationAct
@Inject
public TransportGetSnapshotsAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, SnapshotsService snapshotsService, ActionFilters actionFilters) {
super(settings, GetSnapshotsAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, GetSnapshotsAction.NAME, transportService, clusterService, threadPool, actionFilters, GetSnapshotsRequest.class);
this.snapshotsService = snapshotsService;
}
@ -55,11 +55,6 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeOperationAct
return ThreadPool.Names.GENERIC;
}
@Override
protected GetSnapshotsRequest newRequest() {
return new GetSnapshotsRequest();
}
@Override
protected GetSnapshotsResponse newResponse() {
return new GetSnapshotsResponse();

View File

@ -44,7 +44,7 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeOperation
@Inject
public TransportRestoreSnapshotAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, RestoreService restoreService, ActionFilters actionFilters) {
super(settings, RestoreSnapshotAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, RestoreSnapshotAction.NAME, transportService, clusterService, threadPool, actionFilters, RestoreSnapshotRequest.class);
this.restoreService = restoreService;
}
@ -53,11 +53,6 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeOperation
return ThreadPool.Names.SNAPSHOT;
}
@Override
protected RestoreSnapshotRequest newRequest() {
return new RestoreSnapshotRequest();
}
@Override
protected RestoreSnapshotResponse newResponse() {
return new RestoreSnapshotResponse();

View File

@ -55,30 +55,16 @@ public class TransportNodesSnapshotsStatus extends TransportNodesOperationAction
@Inject
public TransportNodesSnapshotsStatus(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, SnapshotsService snapshotsService, ActionFilters actionFilters) {
super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters);
super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
Request.class, NodeRequest.class, ThreadPool.Names.GENERIC);
this.snapshotsService = snapshotsService;
}
@Override
protected String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
protected boolean transportCompress() {
return true; // compress since the metadata can become large
}
@Override
protected Request newRequestInstance() {
return new Request();
}
@Override
protected NodeRequest newNodeRequest() {
return new NodeRequest();
}
@Override
protected NodeRequest newNodeRequest(String nodeId, Request request) {
return new NodeRequest(nodeId, request);

View File

@ -58,7 +58,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeOperation
@Inject
public TransportSnapshotsStatusAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, SnapshotsService snapshotsService, TransportNodesSnapshotsStatus transportNodesSnapshotsStatus, ActionFilters actionFilters) {
super(settings, SnapshotsStatusAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, SnapshotsStatusAction.NAME, transportService, clusterService, threadPool, actionFilters, SnapshotsStatusRequest.class);
this.snapshotsService = snapshotsService;
this.transportNodesSnapshotsStatus = transportNodesSnapshotsStatus;
}
@ -73,11 +73,6 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeOperation
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
@Override
protected SnapshotsStatusRequest newRequest() {
return new SnapshotsStatusRequest();
}
@Override
protected SnapshotsStatusResponse newResponse() {
return new SnapshotsStatusResponse();

View File

@ -54,7 +54,7 @@ public class TransportClusterStateAction extends TransportMasterNodeReadOperatio
@Inject
public TransportClusterStateAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ClusterName clusterName, ActionFilters actionFilters) {
super(settings, ClusterStateAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, ClusterStateAction.NAME, transportService, clusterService, threadPool, actionFilters, ClusterStateRequest.class);
this.clusterName = clusterName;
}
@ -73,11 +73,6 @@ public class TransportClusterStateAction extends TransportMasterNodeReadOperatio
return null;
}
@Override
protected ClusterStateRequest newRequest() {
return new ClusterStateRequest();
}
@Override
protected ClusterStateResponse newResponse() {
return new ClusterStateResponse();

View File

@ -30,6 +30,9 @@ import java.io.IOException;
*/
public class ClusterStatsRequest extends NodesOperationRequest<ClusterStatsRequest> {
ClusterStatsRequest() {
}
/**
* Get stats from nodes based on the nodes ids specified. If none are passed, stats
* based on all nodes will be returned.

View File

@ -67,16 +67,12 @@ public class TransportClusterStatsAction extends TransportNodesOperationAction<C
public TransportClusterStatsAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
NodeService nodeService, IndicesService indicesService, ActionFilters actionFilters) {
super(settings, ClusterStatsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters);
super(settings, ClusterStatsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
ClusterStatsRequest.class, ClusterStatsNodeRequest.class, ThreadPool.Names.MANAGEMENT);
this.nodeService = nodeService;
this.indicesService = indicesService;
}
@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected ClusterStatsResponse newResponse(ClusterStatsRequest clusterStatsRequest, AtomicReferenceArray responses) {
final List<ClusterStatsNodeResponse> nodeStats = new ArrayList<>(responses.length());
@ -90,16 +86,6 @@ public class TransportClusterStatsAction extends TransportNodesOperationAction<C
clusterService.state().metaData().uuid(), nodeStats.toArray(new ClusterStatsNodeResponse[nodeStats.size()]));
}
@Override
protected ClusterStatsRequest newRequestInstance() {
return new ClusterStatsRequest();
}
@Override
protected ClusterStatsNodeRequest newNodeRequest() {
return new ClusterStatsNodeRequest();
}
@Override
protected ClusterStatsNodeRequest newNodeRequest(String nodeId, ClusterStatsRequest request) {
return new ClusterStatsNodeRequest(nodeId, request);

View File

@ -41,7 +41,7 @@ public class TransportPendingClusterTasksAction extends TransportMasterNodeReadO
@Inject
public TransportPendingClusterTasksAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, PendingClusterTasksAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, PendingClusterTasksAction.NAME, transportService, clusterService, threadPool, actionFilters, PendingClusterTasksRequest.class);
this.clusterService = clusterService;
}
@ -56,11 +56,6 @@ public class TransportPendingClusterTasksAction extends TransportMasterNodeReadO
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
@Override
protected PendingClusterTasksRequest newRequest() {
return new PendingClusterTasksRequest();
}
@Override
protected PendingClusterTasksResponse newResponse() {
return new PendingClusterTasksResponse();

View File

@ -50,7 +50,7 @@ public class TransportIndicesAliasesAction extends TransportMasterNodeOperationA
@Inject
public TransportIndicesAliasesAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataIndexAliasesService indexAliasesService, ActionFilters actionFilters) {
super(settings, IndicesAliasesAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, IndicesAliasesAction.NAME, transportService, clusterService, threadPool, actionFilters, IndicesAliasesRequest.class);
this.indexAliasesService = indexAliasesService;
}
@ -60,11 +60,6 @@ public class TransportIndicesAliasesAction extends TransportMasterNodeOperationA
return ThreadPool.Names.SAME;
}
@Override
protected IndicesAliasesRequest newRequest() {
return new IndicesAliasesRequest();
}
@Override
protected IndicesAliasesResponse newResponse() {
return new IndicesAliasesResponse();

View File

@ -38,7 +38,7 @@ public class TransportAliasesExistAction extends TransportMasterNodeReadOperatio
@Inject
public TransportAliasesExistAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, AliasesExistAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, AliasesExistAction.NAME, transportService, clusterService, threadPool, actionFilters, GetAliasesRequest.class);
}
@Override
@ -52,11 +52,6 @@ public class TransportAliasesExistAction extends TransportMasterNodeReadOperatio
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}
@Override
protected GetAliasesRequest newRequest() {
return new GetAliasesRequest();
}
@Override
protected AliasesExistResponse newResponse() {
return new AliasesExistResponse();

View File

@ -41,7 +41,7 @@ public class TransportGetAliasesAction extends TransportMasterNodeReadOperationA
@Inject
public TransportGetAliasesAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, GetAliasesAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, GetAliasesAction.NAME, transportService, clusterService, threadPool, actionFilters, GetAliasesRequest.class);
}
@Override
@ -55,11 +55,6 @@ public class TransportGetAliasesAction extends TransportMasterNodeReadOperationA
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}
@Override
protected GetAliasesRequest newRequest() {
return new GetAliasesRequest();
}
@Override
protected GetAliasesResponse newResponse() {
return new GetAliasesResponse();

View File

@ -47,8 +47,6 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.IndicesAnalysisService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -60,7 +58,6 @@ import java.util.List;
public class TransportAnalyzeAction extends TransportSingleCustomOperationAction<AnalyzeRequest, AnalyzeResponse> {
private final IndicesService indicesService;
private final IndicesAnalysisService indicesAnalysisService;
private static final Settings DEFAULT_SETTINGS = ImmutableSettings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
@ -68,20 +65,9 @@ public class TransportAnalyzeAction extends TransportSingleCustomOperationAction
@Inject
public TransportAnalyzeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, IndicesAnalysisService indicesAnalysisService, ActionFilters actionFilters) {
super(settings, AnalyzeAction.NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, AnalyzeAction.NAME, threadPool, clusterService, transportService, actionFilters, AnalyzeRequest.class, ThreadPool.Names.INDEX);
this.indicesService = indicesService;
this.indicesAnalysisService = indicesAnalysisService;
transportService.registerHandler(AnalyzeAction.NAME, new TransportHandler());
}
@Override
protected String executor() {
return ThreadPool.Names.INDEX;
}
@Override
protected AnalyzeRequest newRequest() {
return new AnalyzeRequest();
}
@Override
@ -260,44 +246,4 @@ public class TransportAnalyzeAction extends TransportSingleCustomOperationAction
return new AnalyzeResponse(tokens);
}
private class TransportHandler extends BaseTransportRequestHandler<AnalyzeRequest> {
@Override
public AnalyzeRequest newInstance() {
return newRequest();
}
@Override
public void messageReceived(AnalyzeRequest request, final TransportChannel channel) throws Exception {
// no need to have a threaded listener since we just send back a response
request.listenerThreaded(false);
// if we have a local operation, execute it on a thread since we don't spawn
request.operationThreaded(true);
execute(request, new ActionListener<AnalyzeResponse>() {
@Override
public void onResponse(AnalyzeResponse result) {
try {
channel.sendResponse(result);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send response for get", e1);
}
}
});
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
}

View File

@ -58,21 +58,12 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
public TransportClearIndicesCacheAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService,
IndicesQueryCache indicesQueryCache, ActionFilters actionFilters) {
super(settings, ClearIndicesCacheAction.NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, ClearIndicesCacheAction.NAME, threadPool, clusterService, transportService, actionFilters,
ClearIndicesCacheRequest.class, ShardClearIndicesCacheRequest.class, ThreadPool.Names.MANAGEMENT);
this.indicesService = indicesService;
this.indicesQueryCache = indicesQueryCache;
}
@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected ClearIndicesCacheRequest newRequestInstance() {
return new ClearIndicesCacheRequest();
}
@Override
protected ClearIndicesCacheResponse newResponse(ClearIndicesCacheRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
@ -95,11 +86,6 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
return new ClearIndicesCacheResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
}
@Override
protected ShardClearIndicesCacheRequest newShardRequest() {
return new ShardClearIndicesCacheRequest();
}
@Override
protected ShardClearIndicesCacheRequest newShardRequest(int numShards, ShardRouting shard, ClearIndicesCacheRequest request) {
return new ShardClearIndicesCacheRequest(shard.shardId(), request);

View File

@ -47,7 +47,7 @@ public class TransportCloseIndexAction extends TransportMasterNodeOperationActio
@Inject
public TransportCloseIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataIndexStateService indexStateService, NodeSettingsService nodeSettingsService, ActionFilters actionFilters) {
super(settings, CloseIndexAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, CloseIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, CloseIndexRequest.class);
this.indexStateService = indexStateService;
this.destructiveOperations = new DestructiveOperations(logger, settings, nodeSettingsService);
}
@ -58,11 +58,6 @@ public class TransportCloseIndexAction extends TransportMasterNodeOperationActio
return ThreadPool.Names.SAME;
}
@Override
protected CloseIndexRequest newRequest() {
return new CloseIndexRequest();
}
@Override
protected CloseIndexResponse newResponse() {
return new CloseIndexResponse();

View File

@ -45,7 +45,7 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi
@Inject
public TransportCreateIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataCreateIndexService createIndexService, ActionFilters actionFilters) {
super(settings, CreateIndexAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, CreateIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, CreateIndexRequest.class);
this.createIndexService = createIndexService;
}
@ -55,11 +55,6 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi
return ThreadPool.Names.SAME;
}
@Override
protected CreateIndexRequest newRequest() {
return new CreateIndexRequest();
}
@Override
protected CreateIndexResponse newResponse() {
return new CreateIndexResponse();

View File

@ -48,7 +48,7 @@ public class TransportDeleteIndexAction extends TransportMasterNodeOperationActi
public TransportDeleteIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataDeleteIndexService deleteIndexService,
NodeSettingsService nodeSettingsService, ActionFilters actionFilters) {
super(settings, DeleteIndexAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, DeleteIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, DeleteIndexRequest.class);
this.deleteIndexService = deleteIndexService;
this.destructiveOperations = new DestructiveOperations(logger, settings, nodeSettingsService);
}
@ -58,11 +58,6 @@ public class TransportDeleteIndexAction extends TransportMasterNodeOperationActi
return ThreadPool.Names.SAME;
}
@Override
protected DeleteIndexRequest newRequest() {
return new DeleteIndexRequest();
}
@Override
protected DeleteIndexResponse newResponse() {
return new DeleteIndexResponse();

View File

@ -37,6 +37,11 @@ public class IndicesExistsRequest extends MasterNodeReadOperationRequest<Indices
private String[] indices = Strings.EMPTY_ARRAY;
private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, true, true);
// for serialization
IndicesExistsRequest() {
}
public IndicesExistsRequest(String... indices) {
this.indices = indices;
}

View File

@ -42,7 +42,7 @@ public class TransportIndicesExistsAction extends TransportMasterNodeReadOperati
@Inject
public TransportIndicesExistsAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, IndicesExistsAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, IndicesExistsAction.NAME, transportService, clusterService, threadPool, actionFilters, IndicesExistsRequest.class);
}
@Override
@ -51,11 +51,6 @@ public class TransportIndicesExistsAction extends TransportMasterNodeReadOperati
return ThreadPool.Names.SAME;
}
@Override
protected IndicesExistsRequest newRequest() {
return new IndicesExistsRequest();
}
@Override
protected IndicesExistsResponse newResponse() {
return new IndicesExistsResponse();

View File

@ -41,7 +41,7 @@ public class TransportTypesExistsAction extends TransportMasterNodeReadOperation
@Inject
public TransportTypesExistsAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, TypesExistsAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, TypesExistsAction.NAME, transportService, clusterService, threadPool, actionFilters, TypesExistsRequest.class);
}
@Override
@ -50,11 +50,6 @@ public class TransportTypesExistsAction extends TransportMasterNodeReadOperation
return ThreadPool.Names.SAME;
}
@Override
protected TypesExistsRequest newRequest() {
return new TypesExistsRequest();
}
@Override
protected TypesExistsResponse newResponse() {
return new TypesExistsResponse();

View File

@ -52,20 +52,11 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
@Inject
public TransportFlushAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, IndicesService indicesService, ActionFilters actionFilters) {
super(settings, FlushAction.NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, FlushAction.NAME, threadPool, clusterService, transportService, actionFilters,
FlushRequest.class, ShardFlushRequest.class, ThreadPool.Names.FLUSH);
this.indicesService = indicesService;
}
@Override
protected String executor() {
return ThreadPool.Names.FLUSH;
}
@Override
protected FlushRequest newRequestInstance() {
return new FlushRequest();
}
@Override
protected FlushResponse newResponse(FlushRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
@ -88,11 +79,6 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
return new FlushResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
}
@Override
protected ShardFlushRequest newShardRequest() {
return new ShardFlushRequest();
}
@Override
protected ShardFlushRequest newShardRequest(int numShards, ShardRouting shard, FlushRequest request) {
return new ShardFlushRequest(shard.shardId(), request);

View File

@ -49,7 +49,7 @@ public class TransportGetIndexAction extends TransportClusterInfoAction<GetIndex
@Inject
public TransportGetIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, GetIndexAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, GetIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, GetIndexRequest.class);
}
@Override
@ -63,11 +63,6 @@ public class TransportGetIndexAction extends TransportClusterInfoAction<GetIndex
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}
@Override
protected GetIndexRequest newRequest() {
return new GetIndexRequest();
}
@Override
protected GetIndexResponse newResponse() {
return new GetIndexResponse();

View File

@ -30,7 +30,6 @@ import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
@ -46,7 +45,7 @@ public class TransportGetFieldMappingsAction extends HandledTransportAction<GetF
@Inject
public TransportGetFieldMappingsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, TransportGetFieldMappingsIndexAction shardAction, ActionFilters actionFilters) {
super(settings, GetFieldMappingsAction.NAME, threadPool, transportService, actionFilters);
super(settings, GetFieldMappingsAction.NAME, threadPool, transportService, actionFilters, GetFieldMappingsRequest.class);
this.clusterService = clusterService;
this.shardAction = shardAction;
}
@ -100,9 +99,4 @@ public class TransportGetFieldMappingsAction extends HandledTransportAction<GetF
}
return new GetFieldMappingsResponse(mergedResponses.immutableMap());
}
@Override
public GetFieldMappingsRequest newRequestInstance() {
return new GetFieldMappingsRequest();
}
}

View File

@ -53,7 +53,6 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
/**
* Transport action used to retrieve the mappings related to fields that belong to a specific index
@ -70,16 +69,11 @@ public class TransportGetFieldMappingsIndexAction extends TransportSingleCustomO
TransportService transportService,
IndicesService indicesService,
ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters, GetFieldMappingsIndexRequest.class, ThreadPool.Names.MANAGEMENT);
this.clusterService = clusterService;
this.indicesService = indicesService;
}
@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected boolean resolveIndex(GetFieldMappingsIndexRequest request) {
//internal action, index already resolved
@ -126,11 +120,6 @@ public class TransportGetFieldMappingsIndexAction extends TransportSingleCustomO
return new GetFieldMappingsResponse(ImmutableMap.of(shardId.getIndex(), typeMappings.immutableMap()));
}
@Override
protected GetFieldMappingsIndexRequest newRequest() {
return new GetFieldMappingsIndexRequest();
}
@Override
protected GetFieldMappingsResponse newResponse() {
return new GetFieldMappingsResponse();

View File

@ -40,7 +40,7 @@ public class TransportGetMappingsAction extends TransportClusterInfoAction<GetMa
@Inject
public TransportGetMappingsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, GetMappingsAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, GetMappingsAction.NAME, transportService, clusterService, threadPool, actionFilters, GetMappingsRequest.class);
}
@Override
@ -54,11 +54,6 @@ public class TransportGetMappingsAction extends TransportClusterInfoAction<GetMa
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}
@Override
protected GetMappingsRequest newRequest() {
return new GetMappingsRequest();
}
@Override
protected GetMappingsResponse newResponse() {
return new GetMappingsResponse();

View File

@ -44,7 +44,7 @@ public class TransportPutMappingAction extends TransportMasterNodeOperationActio
@Inject
public TransportPutMappingAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataMappingService metaDataMappingService, ActionFilters actionFilters) {
super(settings, PutMappingAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, PutMappingAction.NAME, transportService, clusterService, threadPool, actionFilters, PutMappingRequest.class);
this.metaDataMappingService = metaDataMappingService;
}
@ -54,11 +54,6 @@ public class TransportPutMappingAction extends TransportMasterNodeOperationActio
return ThreadPool.Names.SAME;
}
@Override
protected PutMappingRequest newRequest() {
return new PutMappingRequest();
}
@Override
protected PutMappingResponse newResponse() {
return new PutMappingResponse();

View File

@ -47,7 +47,7 @@ public class TransportOpenIndexAction extends TransportMasterNodeOperationAction
@Inject
public TransportOpenIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataIndexStateService indexStateService, NodeSettingsService nodeSettingsService, ActionFilters actionFilters) {
super(settings, OpenIndexAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, OpenIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, OpenIndexRequest.class);
this.indexStateService = indexStateService;
this.destructiveOperations = new DestructiveOperations(logger, settings, nodeSettingsService);
}
@ -58,11 +58,6 @@ public class TransportOpenIndexAction extends TransportMasterNodeOperationAction
return ThreadPool.Names.SAME;
}
@Override
protected OpenIndexRequest newRequest() {
return new OpenIndexRequest();
}
@Override
protected OpenIndexResponse newResponse() {
return new OpenIndexResponse();

View File

@ -53,20 +53,11 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
@Inject
public TransportOptimizeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService, ActionFilters actionFilters) {
super(settings, OptimizeAction.NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, OptimizeAction.NAME, threadPool, clusterService, transportService, actionFilters,
OptimizeRequest.class, ShardOptimizeRequest.class, ThreadPool.Names.OPTIMIZE);
this.indicesService = indicesService;
}
@Override
protected String executor() {
return ThreadPool.Names.OPTIMIZE;
}
@Override
protected OptimizeRequest newRequestInstance() {
return new OptimizeRequest();
}
@Override
protected OptimizeResponse newResponse(OptimizeRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
@ -89,11 +80,6 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
return new OptimizeResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
}
@Override
protected ShardOptimizeRequest newShardRequest() {
return new ShardOptimizeRequest();
}
@Override
protected ShardOptimizeRequest newShardRequest(int numShards, ShardRouting shard, OptimizeRequest request) {
return new ShardOptimizeRequest(shard.shardId(), request);

View File

@ -52,29 +52,18 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
* Transport action for shard recovery operation. This transport action does not actually
* perform shard recovery, it only reports on recoveries (both active and complete).
*/
public class TransportRecoveryAction extends
TransportBroadcastOperationAction<RecoveryRequest, RecoveryResponse, TransportRecoveryAction.ShardRecoveryRequest, ShardRecoveryResponse> {
public class TransportRecoveryAction extends TransportBroadcastOperationAction<RecoveryRequest, RecoveryResponse, TransportRecoveryAction.ShardRecoveryRequest, ShardRecoveryResponse> {
private final IndicesService indicesService;
@Inject
public TransportRecoveryAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService, ActionFilters actionFilters) {
super(settings, RecoveryAction.NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, RecoveryAction.NAME, threadPool, clusterService, transportService, actionFilters,
RecoveryRequest.class, ShardRecoveryRequest.class, ThreadPool.Names.MANAGEMENT);
this.indicesService = indicesService;
}
@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected RecoveryRequest newRequestInstance() {
return new RecoveryRequest();
}
@Override
protected RecoveryResponse newResponse(RecoveryRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
@ -120,14 +109,8 @@ public class TransportRecoveryAction extends
}
}
RecoveryResponse response = new RecoveryResponse(shardsResponses.length(), successfulShards,
return new RecoveryResponse(shardsResponses.length(), successfulShards,
failedShards, request.detailed(), shardResponses, shardFailures);
return response;
}
@Override
protected ShardRecoveryRequest newShardRequest() {
return new ShardRecoveryRequest();
}
@Override

View File

@ -53,20 +53,11 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
@Inject
public TransportRefreshAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService, ActionFilters actionFilters) {
super(settings, RefreshAction.NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, RefreshAction.NAME, threadPool, clusterService, transportService, actionFilters,
RefreshRequest.class, ShardRefreshRequest.class, ThreadPool.Names.REFRESH);
this.indicesService = indicesService;
}
@Override
protected String executor() {
return ThreadPool.Names.REFRESH;
}
@Override
protected RefreshRequest newRequestInstance() {
return new RefreshRequest();
}
@Override
protected RefreshResponse newResponse(RefreshRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
@ -89,11 +80,6 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
return new RefreshResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
}
@Override
protected ShardRefreshRequest newShardRequest() {
return new ShardRefreshRequest();
}
@Override
protected ShardRefreshRequest newShardRequest(int numShards, ShardRouting shard, RefreshRequest request) {
return new ShardRefreshRequest(shard.shardId(), request);

View File

@ -59,20 +59,11 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastOperationA
@Inject
public TransportIndicesSegmentsAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, ActionFilters actionFilters) {
super(settings, IndicesSegmentsAction.NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, IndicesSegmentsAction.NAME, threadPool, clusterService, transportService, actionFilters,
IndicesSegmentsRequest.class, TransportIndicesSegmentsAction.IndexShardSegmentRequest.class, ThreadPool.Names.MANAGEMENT);
this.indicesService = indicesService;
}
@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected IndicesSegmentsRequest newRequestInstance() {
return new IndicesSegmentsRequest();
}
/**
* Segments goes across *all* active shards.
*/
@ -115,11 +106,6 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastOperationA
return new IndicesSegmentResponse(shards.toArray(new ShardSegments[shards.size()]), clusterState, shardsResponses.length(), successfulShards, failedShards, shardFailures);
}
@Override
protected IndexShardSegmentRequest newShardRequest() {
return new IndexShardSegmentRequest();
}
@Override
protected IndexShardSegmentRequest newShardRequest(int numShards, ShardRouting shard, IndicesSegmentsRequest request) {
return new IndexShardSegmentRequest(shard.shardId(), request);

View File

@ -50,7 +50,7 @@ public class TransportGetSettingsAction extends TransportMasterNodeReadOperation
@Inject
public TransportGetSettingsAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, SettingsFilter settingsFilter, ActionFilters actionFilters) {
super(settings, GetSettingsAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, GetSettingsAction.NAME, transportService, clusterService, threadPool, actionFilters, GetSettingsRequest.class);
this.settingsFilter = settingsFilter;
}
@ -66,11 +66,6 @@ public class TransportGetSettingsAction extends TransportMasterNodeReadOperation
}
@Override
protected GetSettingsRequest newRequest() {
return new GetSettingsRequest();
}
@Override
protected GetSettingsResponse newResponse() {
return new GetSettingsResponse();

View File

@ -45,7 +45,7 @@ public class TransportUpdateSettingsAction extends TransportMasterNodeOperationA
@Inject
public TransportUpdateSettingsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
MetaDataUpdateSettingsService updateSettingsService, ActionFilters actionFilters) {
super(settings, UpdateSettingsAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, UpdateSettingsAction.NAME, transportService, clusterService, threadPool, actionFilters, UpdateSettingsRequest.class);
this.updateSettingsService = updateSettingsService;
}
@ -68,11 +68,6 @@ public class TransportUpdateSettingsAction extends TransportMasterNodeOperationA
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}
@Override
protected UpdateSettingsRequest newRequest() {
return new UpdateSettingsRequest();
}
@Override
protected UpdateSettingsResponse newResponse() {
return new UpdateSettingsResponse();

View File

@ -60,20 +60,11 @@ public class TransportIndicesStatsAction extends TransportBroadcastOperationActi
@Inject
public TransportIndicesStatsAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, ActionFilters actionFilters) {
super(settings, IndicesStatsAction.NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, IndicesStatsAction.NAME, threadPool, clusterService, transportService, actionFilters,
IndicesStatsRequest.class, IndexShardStatsRequest.class, ThreadPool.Names.MANAGEMENT);
this.indicesService = indicesService;
}
@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected IndicesStatsRequest newRequestInstance() {
return new IndicesStatsRequest();
}
/**
* Status goes across *all* shards.
*/
@ -117,11 +108,6 @@ public class TransportIndicesStatsAction extends TransportBroadcastOperationActi
return new IndicesStatsResponse(shards.toArray(new ShardStats[shards.size()]), clusterState, shardsResponses.length(), successfulShards, failedShards, shardFailures);
}
@Override
protected IndexShardStatsRequest newShardRequest() {
return new IndexShardStatsRequest();
}
@Override
protected IndexShardStatsRequest newShardRequest(int numShards, ShardRouting shard, IndicesStatsRequest request) {
return new IndexShardStatsRequest(shard.shardId(), request);

View File

@ -42,7 +42,7 @@ public class TransportDeleteIndexTemplateAction extends TransportMasterNodeOpera
@Inject
public TransportDeleteIndexTemplateAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataIndexTemplateService indexTemplateService, ActionFilters actionFilters) {
super(settings, DeleteIndexTemplateAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, DeleteIndexTemplateAction.NAME, transportService, clusterService, threadPool, actionFilters, DeleteIndexTemplateRequest.class);
this.indexTemplateService = indexTemplateService;
}
@ -52,11 +52,6 @@ public class TransportDeleteIndexTemplateAction extends TransportMasterNodeOpera
return ThreadPool.Names.SAME;
}
@Override
protected DeleteIndexTemplateRequest newRequest() {
return new DeleteIndexTemplateRequest();
}
@Override
protected DeleteIndexTemplateResponse newResponse() {
return new DeleteIndexTemplateResponse();

View File

@ -44,7 +44,7 @@ public class TransportGetIndexTemplatesAction extends TransportMasterNodeReadOpe
@Inject
public TransportGetIndexTemplatesAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, GetIndexTemplatesAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, GetIndexTemplatesAction.NAME, transportService, clusterService, threadPool, actionFilters, GetIndexTemplatesRequest.class);
}
@Override
@ -57,11 +57,6 @@ public class TransportGetIndexTemplatesAction extends TransportMasterNodeReadOpe
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
@Override
protected GetIndexTemplatesRequest newRequest() {
return new GetIndexTemplatesRequest();
}
@Override
protected GetIndexTemplatesResponse newResponse() {
return new GetIndexTemplatesResponse();

View File

@ -42,7 +42,7 @@ public class TransportPutIndexTemplateAction extends TransportMasterNodeOperatio
@Inject
public TransportPutIndexTemplateAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataIndexTemplateService indexTemplateService, ActionFilters actionFilters) {
super(settings, PutIndexTemplateAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, PutIndexTemplateAction.NAME, transportService, clusterService, threadPool, actionFilters, PutIndexTemplateRequest.class);
this.indexTemplateService = indexTemplateService;
}
@ -52,11 +52,6 @@ public class TransportPutIndexTemplateAction extends TransportMasterNodeOperatio
return ThreadPool.Names.SAME;
}
@Override
protected PutIndexTemplateRequest newRequest() {
return new PutIndexTemplateRequest();
}
@Override
protected PutIndexTemplateResponse newResponse() {
return new PutIndexTemplateResponse();

View File

@ -76,7 +76,8 @@ public class TransportValidateQueryAction extends TransportBroadcastOperationAct
@Inject
public TransportValidateQueryAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, IndicesService indicesService, ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, ActionFilters actionFilters) {
super(settings, ValidateQueryAction.NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, ValidateQueryAction.NAME, threadPool, clusterService, transportService, actionFilters,
ValidateQueryRequest.class, ShardValidateQueryRequest.class, ThreadPool.Names.SEARCH);
this.indicesService = indicesService;
this.scriptService = scriptService;
this.pageCacheRecycler = pageCacheRecycler;
@ -89,21 +90,6 @@ public class TransportValidateQueryAction extends TransportBroadcastOperationAct
super.doExecute(request, listener);
}
@Override
protected String executor() {
return ThreadPool.Names.SEARCH;
}
@Override
protected ValidateQueryRequest newRequestInstance() {
return new ValidateQueryRequest();
}
@Override
protected ShardValidateQueryRequest newShardRequest() {
return new ShardValidateQueryRequest();
}
@Override
protected ShardValidateQueryRequest newShardRequest(int numShards, ShardRouting shard, ValidateQueryRequest request) {
String[] filteringAliases = clusterService.state().metaData().filteringAliases(shard.index(), request.indices());

View File

@ -52,7 +52,7 @@ public class TransportDeleteWarmerAction extends TransportMasterNodeOperationAct
@Inject
public TransportDeleteWarmerAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, DeleteWarmerAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, DeleteWarmerAction.NAME, transportService, clusterService, threadPool, actionFilters, DeleteWarmerRequest.class);
}
@Override
@ -61,11 +61,6 @@ public class TransportDeleteWarmerAction extends TransportMasterNodeOperationAct
return ThreadPool.Names.SAME;
}
@Override
protected DeleteWarmerRequest newRequest() {
return new DeleteWarmerRequest();
}
@Override
protected DeleteWarmerResponse newResponse() {
return new DeleteWarmerResponse();

View File

@ -44,7 +44,7 @@ public class TransportGetWarmersAction extends TransportClusterInfoAction<GetWar
@Inject
public TransportGetWarmersAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, GetWarmersAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, GetWarmersAction.NAME, transportService, clusterService, threadPool, actionFilters, GetWarmersRequest.class);
}
@Override
@ -58,11 +58,6 @@ public class TransportGetWarmersAction extends TransportClusterInfoAction<GetWar
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}
@Override
protected GetWarmersRequest newRequest() {
return new GetWarmersRequest();
}
@Override
protected GetWarmersResponse newResponse() {
return new GetWarmersResponse();

View File

@ -58,7 +58,7 @@ public class TransportPutWarmerAction extends TransportMasterNodeOperationAction
@Inject
public TransportPutWarmerAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
TransportSearchAction searchAction, ActionFilters actionFilters) {
super(settings, PutWarmerAction.NAME, transportService, clusterService, threadPool, actionFilters);
super(settings, PutWarmerAction.NAME, transportService, clusterService, threadPool, actionFilters, PutWarmerRequest.class);
this.searchAction = searchAction;
}
@ -67,11 +67,6 @@ public class TransportPutWarmerAction extends TransportMasterNodeOperationAction
return ThreadPool.Names.SAME;
}
@Override
protected PutWarmerRequest newRequest() {
return new PutWarmerRequest();
}
@Override
protected PutWarmerResponse newResponse() {
return new PutWarmerResponse();

View File

@ -67,19 +67,15 @@ import java.util.concurrent.atomic.AtomicInteger;
public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {
private final AutoCreateIndex autoCreateIndex;
private final boolean allowIdGeneration;
private final ClusterService clusterService;
private final TransportShardBulkAction shardBulkAction;
private final TransportCreateIndexAction createIndexAction;
@Inject
public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService,
TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction, ActionFilters actionFilters) {
super(settings, BulkAction.NAME, threadPool, transportService, actionFilters);
super(settings, BulkAction.NAME, threadPool, transportService, actionFilters, BulkRequest.class);
this.clusterService = clusterService;
this.shardBulkAction = shardBulkAction;
this.createIndexAction = createIndexAction;
@ -88,11 +84,6 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
this.allowIdGeneration = this.settings.getAsBoolean("action.bulk.action.allow_id_generation", true);
}
@Override
public BulkRequest newRequestInstance(){
return new BulkRequest();
}
@Override
protected void doExecute(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
final long startTime = System.currentTimeMillis();

View File

@ -56,6 +56,7 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.river.RiverIndexName;
@ -83,17 +84,13 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters);
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
BulkShardRequest.class, BulkShardRequest.class, ThreadPool.Names.BULK);
this.mappingUpdatedAction = mappingUpdatedAction;
this.updateHelper = updateHelper;
this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
}
@Override
protected String executor() {
return ThreadPool.Names.BULK;
}
@Override
protected boolean checkWriteConsistency() {
return true;
@ -103,17 +100,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
protected TransportRequestOptions transportOptions() {
return BulkAction.INSTANCE.transportOptions(settings);
}
@Override
protected BulkShardRequest newRequestInstance() {
return new BulkShardRequest();
}
@Override
protected BulkShardRequest newReplicaRequestInstance() {
return newRequestInstance();
}
@Override
protected BulkShardResponse newResponseInstance() {
return new BulkShardResponse();
@ -546,10 +532,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
@Override
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws Exception {
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
final BulkShardRequest request = shardRequest.request;
protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request) throws Exception {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardId.id());
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
if (item == null || item.isIgnoreOnReplica()) {

View File

@ -65,18 +65,16 @@ import static org.elasticsearch.search.internal.SearchContext.DEFAULT_TERMINATE_
public class TransportCountAction extends TransportBroadcastOperationAction<CountRequest, CountResponse, ShardCountRequest, ShardCountResponse> {
private final IndicesService indicesService;
private final ScriptService scriptService;
private final PageCacheRecycler pageCacheRecycler;
private final BigArrays bigArrays;
@Inject
public TransportCountAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, ScriptService scriptService, PageCacheRecycler pageCacheRecycler,
BigArrays bigArrays, ActionFilters actionFilters) {
super(settings, CountAction.NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, CountAction.NAME, threadPool, clusterService, transportService, actionFilters,
CountRequest.class, ShardCountRequest.class, ThreadPool.Names.SEARCH);
this.indicesService = indicesService;
this.scriptService = scriptService;
this.pageCacheRecycler = pageCacheRecycler;
@ -89,21 +87,6 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
super.doExecute(request, listener);
}
@Override
protected String executor() {
return ThreadPool.Names.SEARCH;
}
@Override
protected CountRequest newRequestInstance() {
return new CountRequest();
}
@Override
protected ShardCountRequest newShardRequest() {
return new ShardCountRequest();
}
@Override
protected ShardCountRequest newShardRequest(int numShards, ShardRouting shard, CountRequest request) {
String[] filteringAliases = clusterService.state().metaData().filteringAliases(shard.index(), request.indices());

View File

@ -40,6 +40,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
@ -51,23 +52,18 @@ import org.elasticsearch.transport.TransportService;
public class TransportDeleteAction extends TransportShardReplicationOperationAction<DeleteRequest, DeleteRequest, DeleteResponse> {
private final AutoCreateIndex autoCreateIndex;
private final TransportCreateIndexAction createIndexAction;
@Inject
public TransportDeleteAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
TransportCreateIndexAction createIndexAction, ActionFilters actionFilters) {
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters);
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
DeleteRequest.class, DeleteRequest.class, ThreadPool.Names.INDEX);
this.createIndexAction = createIndexAction;
this.autoCreateIndex = new AutoCreateIndex(settings);
}
@Override
protected String executor() {
return ThreadPool.Names.INDEX;
}
@Override
protected void doExecute(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
@ -125,16 +121,6 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
return true;
}
@Override
protected DeleteRequest newRequestInstance() {
return new DeleteRequest();
}
@Override
protected DeleteRequest newReplicaRequestInstance() {
return newRequestInstance();
}
@Override
protected DeleteResponse newResponseInstance() {
return new DeleteResponse();
@ -165,9 +151,8 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
}
@Override
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
protected void shardOperationOnReplica(ShardId shardId, DeleteRequest request) {
IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).shardSafe(shardId.id());
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
indexShard.delete(delete);

View File

@ -48,7 +48,7 @@ public class TransportDeleteByQueryAction extends TransportIndicesReplicationOpe
public TransportDeleteByQueryAction(Settings settings, ClusterService clusterService, TransportService transportService,
ThreadPool threadPool, TransportIndexDeleteByQueryAction indexDeleteByQueryAction,
NodeSettingsService nodeSettingsService, ActionFilters actionFilters) {
super(settings, DeleteByQueryAction.NAME, transportService, clusterService, threadPool, indexDeleteByQueryAction, actionFilters);
super(settings, DeleteByQueryAction.NAME, transportService, clusterService, threadPool, indexDeleteByQueryAction, actionFilters, DeleteByQueryRequest.class);
this.destructiveOperations = new DestructiveOperations(logger, settings, nodeSettingsService);
}
@ -63,11 +63,6 @@ public class TransportDeleteByQueryAction extends TransportIndicesReplicationOpe
return clusterState.metaData().resolveSearchRouting(request.routing(), request.indices());
}
@Override
protected DeleteByQueryRequest newRequestInstance() {
return new DeleteByQueryRequest();
}
@Override
protected DeleteByQueryResponse newResponseInstance(DeleteByQueryRequest request, AtomicReferenceArray indexResponses) {
DeleteByQueryResponse response = new DeleteByQueryResponse();

View File

@ -36,6 +36,7 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.internal.DefaultSearchContext;
@ -62,7 +63,8 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ScriptService scriptService,
PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, ActionFilters actionFilters) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters);
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
ShardDeleteByQueryRequest.class, ShardDeleteByQueryRequest.class, ThreadPool.Names.INDEX);
this.scriptService = scriptService;
this.pageCacheRecycler = pageCacheRecycler;
this.bigArrays = bigArrays;
@ -73,21 +75,6 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
return true;
}
@Override
protected String executor() {
return ThreadPool.Names.INDEX;
}
@Override
protected ShardDeleteByQueryRequest newRequestInstance() {
return new ShardDeleteByQueryRequest();
}
@Override
protected ShardDeleteByQueryRequest newReplicaRequestInstance() {
return newRequestInstance();
}
@Override
protected ShardDeleteByQueryResponse newResponseInstance() {
return new ShardDeleteByQueryResponse();
@ -121,10 +108,9 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
@Override
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
ShardDeleteByQueryRequest request = shardRequest.request;
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
protected void shardOperationOnReplica(ShardId shardId, ShardDeleteByQueryRequest request) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardId.id());
SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchLocalRequest(request.types(), request.nowInMillis()), null,
indexShard.acquireSearcher(DELETE_BY_QUERY_API, true), indexService, indexShard, scriptService,

View File

@ -65,18 +65,16 @@ import static org.elasticsearch.action.exists.ExistsRequest.DEFAULT_MIN_SCORE;
public class TransportExistsAction extends TransportBroadcastOperationAction<ExistsRequest, ExistsResponse, ShardExistsRequest, ShardExistsResponse> {
private final IndicesService indicesService;
private final ScriptService scriptService;
private final PageCacheRecycler pageCacheRecycler;
private final BigArrays bigArrays;
@Inject
public TransportExistsAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, ScriptService scriptService,
PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, ActionFilters actionFilters) {
super(settings, ExistsAction.NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, ExistsAction.NAME, threadPool, clusterService, transportService, actionFilters,
ExistsRequest.class, ShardExistsRequest.class, ThreadPool.Names.SEARCH);
this.indicesService = indicesService;
this.scriptService = scriptService;
this.pageCacheRecycler = pageCacheRecycler;
@ -89,21 +87,6 @@ public class TransportExistsAction extends TransportBroadcastOperationAction<Exi
new ExistsAsyncBroadcastAction(request, listener).start();
}
@Override
protected String executor() {
return ThreadPool.Names.SEARCH;
}
@Override
protected ExistsRequest newRequestInstance() {
return new ExistsRequest();
}
@Override
protected ShardExistsRequest newShardRequest() {
return new ShardExistsRequest();
}
@Override
protected ShardExistsRequest newShardRequest(int numShards, ShardRouting shard, ExistsRequest request) {
String[] filteringAliases = clusterService.state().metaData().filteringAliases(shard.index(), request.indices());

View File

@ -71,7 +71,8 @@ public class TransportExplainAction extends TransportShardSingleOperationAction<
TransportService transportService, IndicesService indicesService,
ScriptService scriptService, PageCacheRecycler pageCacheRecycler,
BigArrays bigArrays, ActionFilters actionFilters) {
super(settings, ExplainAction.NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, ExplainAction.NAME, threadPool, clusterService, transportService, actionFilters,
ExplainRequest.class, ThreadPool.Names.GET);
this.indicesService = indicesService;
this.scriptService = scriptService;
this.pageCacheRecycler = pageCacheRecycler;
@ -84,11 +85,6 @@ public class TransportExplainAction extends TransportShardSingleOperationAction<
super.doExecute(request, listener);
}
@Override
protected String executor() {
return ThreadPool.Names.GET; // Or use Names.SEARCH?
}
@Override
protected boolean resolveIndex() {
return true;
@ -147,11 +143,6 @@ public class TransportExplainAction extends TransportShardSingleOperationAction<
}
}
@Override
protected ExplainRequest newRequest() {
return new ExplainRequest();
}
@Override
protected ExplainResponse newResponse() {
return new ExplainResponse();

View File

@ -59,20 +59,10 @@ public class TransportFieldStatsTransportAction extends TransportBroadcastOperat
@Inject
public TransportFieldStatsTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndicesService indicesService) {
super(settings, FieldStatsAction.NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, FieldStatsAction.NAME, threadPool, clusterService, transportService, actionFilters, FieldStatsRequest.class, FieldStatsShardRequest.class, ThreadPool.Names.MANAGEMENT);
this.indicesService = indicesService;
}
@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected FieldStatsRequest newRequestInstance() {
return new FieldStatsRequest();
}
@Override
protected FieldStatsResponse newResponse(FieldStatsRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
@ -125,11 +115,6 @@ public class TransportFieldStatsTransportAction extends TransportBroadcastOperat
return new FieldStatsResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures, indicesMergedFieldStats);
}
@Override
protected FieldStatsShardRequest newShardRequest() {
return new FieldStatsShardRequest();
}
@Override
protected FieldStatsShardRequest newShardRequest(int numShards, ShardRouting shard, FieldStatsRequest request) {
return new FieldStatsShardRequest(shard.shardId(), request);

View File

@ -49,17 +49,13 @@ public class TransportGetAction extends TransportShardSingleOperationAction<GetR
@Inject
public TransportGetAction(Settings settings, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, GetAction.NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, GetAction.NAME, threadPool, clusterService, transportService, actionFilters,
GetRequest.class, ThreadPool.Names.GET);
this.indicesService = indicesService;
this.realtime = settings.getAsBoolean("action.get.realtime", true);
}
@Override
protected String executor() {
return ThreadPool.Names.GET;
}
@Override
protected boolean resolveIndex() {
return true;
@ -106,11 +102,6 @@ public class TransportGetAction extends TransportShardSingleOperationAction<GetR
return new GetResponse(result);
}
@Override
protected GetRequest newRequest() {
return new GetRequest();
}
@Override
protected GetResponse newResponse() {
return new GetResponse();

View File

@ -45,16 +45,11 @@ public class TransportMultiGetAction extends HandledTransportAction<MultiGetRequ
@Inject
public TransportMultiGetAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, TransportShardMultiGetAction shardAction, ActionFilters actionFilters) {
super(settings, MultiGetAction.NAME, threadPool, transportService, actionFilters);
super(settings, MultiGetAction.NAME, threadPool, transportService, actionFilters, MultiGetRequest.class);
this.clusterService = clusterService;
this.shardAction = shardAction;
}
@Override
public MultiGetRequest newRequestInstance(){
return new MultiGetRequest();
}
@Override
protected void doExecute(final MultiGetRequest request, final ActionListener<MultiGetResponse> listener) {
ClusterState clusterState = clusterService.state();

View File

@ -48,7 +48,8 @@ public class TransportShardMultiGetAction extends TransportShardSingleOperationA
@Inject
public TransportShardMultiGetAction(Settings settings, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters,
MultiGetShardRequest.class, ThreadPool.Names.GET);
this.indicesService = indicesService;
this.realtime = settings.getAsBoolean("action.get.realtime", true);
@ -59,16 +60,6 @@ public class TransportShardMultiGetAction extends TransportShardSingleOperationA
return true;
}
@Override
protected String executor() {
return ThreadPool.Names.GET;
}
@Override
protected MultiGetShardRequest newRequest() {
return new MultiGetShardRequest();
}
@Override
protected MultiGetShardResponse newResponse() {
return new MultiGetShardResponse();

View File

@ -47,6 +47,7 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.river.RiverIndexName;
@ -68,18 +69,16 @@ import java.io.IOException;
public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
private final AutoCreateIndex autoCreateIndex;
private final boolean allowIdGeneration;
private final TransportCreateIndexAction createIndexAction;
private final MappingUpdatedAction mappingUpdatedAction;
@Inject
public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
TransportCreateIndexAction createIndexAction, MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters) {
super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters);
super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
IndexRequest.class, IndexRequest.class, ThreadPool.Names.INDEX);
this.createIndexAction = createIndexAction;
this.mappingUpdatedAction = mappingUpdatedAction;
this.autoCreateIndex = new AutoCreateIndex(settings);
@ -145,26 +144,11 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
return true;
}
@Override
protected IndexRequest newRequestInstance() {
return new IndexRequest();
}
@Override
protected IndexRequest newReplicaRequestInstance() {
return newRequestInstance();
}
@Override
protected IndexResponse newResponseInstance() {
return new IndexResponse();
}
@Override
protected String executor() {
return ThreadPool.Names.INDEX;
}
@Override
protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
return clusterService.operationRouting()
@ -260,10 +244,9 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
@Override
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
IndexRequest request = shardRequest.request;
protected void shardOperationOnReplica(ShardId shardId, IndexRequest request) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardId.id());
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
if (request.opType() == IndexRequest.OpType.INDEX) {

View File

@ -40,15 +40,10 @@ public class TransportDeleteIndexedScriptAction extends HandledTransportAction<D
@Inject
public TransportDeleteIndexedScriptAction(Settings settings, ThreadPool threadPool, ScriptService scriptService,
TransportService transportService, ActionFilters actionFilters) {
super(settings, DeleteIndexedScriptAction.NAME, threadPool, transportService, actionFilters);
super(settings, DeleteIndexedScriptAction.NAME, threadPool, transportService, actionFilters, DeleteIndexedScriptRequest.class);
this.scriptService = scriptService;
}
@Override
public DeleteIndexedScriptRequest newRequestInstance(){
return new DeleteIndexedScriptRequest();
}
@Override
protected void doExecute(final DeleteIndexedScriptRequest request, final ActionListener<DeleteIndexedScriptResponse> listener) {
scriptService.deleteScriptFromIndex(request, new DelegatingActionListener<DeleteResponse, DeleteIndexedScriptResponse>(listener) {

View File

@ -39,15 +39,10 @@ public class TransportGetIndexedScriptAction extends HandledTransportAction<GetI
@Inject
public TransportGetIndexedScriptAction(Settings settings, ThreadPool threadPool, ScriptService scriptService,
TransportService transportService, ActionFilters actionFilters) {
super(settings, GetIndexedScriptAction.NAME, threadPool,transportService, actionFilters);
super(settings, GetIndexedScriptAction.NAME, threadPool,transportService, actionFilters, GetIndexedScriptRequest.class);
this.scriptService = scriptService;
}
@Override
public GetIndexedScriptRequest newRequestInstance(){
return new GetIndexedScriptRequest();
}
@Override
public void doExecute(GetIndexedScriptRequest request, final ActionListener<GetIndexedScriptResponse> listener){
// forward the handling to the script service we are running on a network thread here...

View File

@ -40,15 +40,10 @@ public class TransportPutIndexedScriptAction extends HandledTransportAction<PutI
@Inject
public TransportPutIndexedScriptAction(Settings settings, ThreadPool threadPool,
ScriptService scriptService, TransportService transportService, ActionFilters actionFilters) {
super(settings, PutIndexedScriptAction.NAME, threadPool, transportService, actionFilters);
super(settings, PutIndexedScriptAction.NAME, threadPool, transportService, actionFilters, PutIndexedScriptRequest.class);
this.scriptService = scriptService;
}
@Override
public PutIndexedScriptRequest newRequestInstance(){
return new PutIndexedScriptRequest();
}
@Override
protected void doExecute(final PutIndexedScriptRequest request, final ActionListener<PutIndexedScriptResponse> listener) {
scriptService.putScriptToIndex(request, new DelegatingActionListener<IndexResponse,PutIndexedScriptResponse>(listener) {

View File

@ -69,19 +69,15 @@ import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
public class TransportMoreLikeThisAction extends HandledTransportAction<MoreLikeThisRequest, SearchResponse> {
private final TransportSearchAction searchAction;
private final TransportGetAction getAction;
private final IndicesService indicesService;
private final ClusterService clusterService;
private final TransportService transportService;
@Inject
public TransportMoreLikeThisAction(Settings settings, ThreadPool threadPool, TransportSearchAction searchAction, TransportGetAction getAction,
ClusterService clusterService, IndicesService indicesService, TransportService transportService, ActionFilters actionFilters) {
super(settings, MoreLikeThisAction.NAME, threadPool, transportService, actionFilters);
super(settings, MoreLikeThisAction.NAME, threadPool, transportService, actionFilters, MoreLikeThisRequest.class);
this.searchAction = searchAction;
this.getAction = getAction;
this.indicesService = indicesService;
@ -89,11 +85,6 @@ public class TransportMoreLikeThisAction extends HandledTransportAction<MoreLike
this.transportService = transportService;
}
@Override
public MoreLikeThisRequest newRequestInstance(){
return new MoreLikeThisRequest();
}
@Override
protected void doExecute(final MoreLikeThisRequest request, final ActionListener<SearchResponse> listener) {
// update to actual index name

View File

@ -60,18 +60,13 @@ public class TransportMultiPercolateAction extends HandledTransportAction<MultiP
public TransportMultiPercolateAction(Settings settings, ThreadPool threadPool, TransportShardMultiPercolateAction shardMultiPercolateAction,
ClusterService clusterService, TransportService transportService, PercolatorService percolatorService,
TransportMultiGetAction multiGetAction, ActionFilters actionFilters) {
super(settings, MultiPercolateAction.NAME, threadPool, transportService, actionFilters);
super(settings, MultiPercolateAction.NAME, threadPool, transportService, actionFilters, MultiPercolateRequest.class);
this.shardMultiPercolateAction = shardMultiPercolateAction;
this.clusterService = clusterService;
this.percolatorService = percolatorService;
this.multiGetAction = multiGetAction;
}
@Override
public MultiPercolateRequest newRequestInstance() {
return new MultiPercolateRequest();
}
@Override
protected void doExecute(final MultiPercolateRequest request, final ActionListener<MultiPercolateResponse> listener) {
final ClusterState clusterState = clusterService.state();

View File

@ -62,7 +62,8 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
public TransportPercolateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, PercolatorService percolatorService,
TransportGetAction getAction, ActionFilters actionFilters) {
super(settings, PercolateAction.NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, PercolateAction.NAME, threadPool, clusterService, transportService, actionFilters,
PercolateRequest.class, PercolateShardRequest.class, ThreadPool.Names.PERCOLATE);
this.percolatorService = percolatorService;
this.getAction = getAction;
}
@ -95,16 +96,6 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
}
}
@Override
protected String executor() {
return ThreadPool.Names.PERCOLATE;
}
@Override
protected PercolateRequest newRequestInstance() {
return new PercolateRequest();
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, PercolateRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
@ -165,11 +156,6 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
}
}
@Override
protected PercolateShardRequest newShardRequest() {
return new PercolateShardRequest();
}
@Override
protected PercolateShardRequest newShardRequest(int numShards, ShardRouting shard, PercolateRequest request) {
return new PercolateShardRequest(shard.shardId(), numShards, request);

View File

@ -57,7 +57,8 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper
@Inject
public TransportShardMultiPercolateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, PercolatorService percolatorService, ActionFilters actionFilters) {
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters,
Request.class, ThreadPool.Names.PERCOLATE);
this.percolatorService = percolatorService;
}
@ -66,16 +67,6 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper
return true;
}
@Override
protected String executor() {
return ThreadPool.Names.PERCOLATE;
}
@Override
protected Request newRequest() {
return new Request();
}
@Override
protected Response newResponse() {
return new Response();

View File

@ -22,7 +22,6 @@ package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -33,8 +32,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
@ -54,7 +51,7 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
@Inject
public TransportClearScrollAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService, SearchServiceTransportAction searchServiceTransportAction, ActionFilters actionFilters) {
super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters);
super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters, ClearScrollRequest.class);
this.clusterService = clusterService;
this.searchServiceTransportAction = searchServiceTransportAction;
}
@ -64,11 +61,6 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
new Async(request, listener, clusterService.state()).run();
}
@Override
public ClearScrollRequest newRequestInstance() {
return new ClearScrollRequest();
}
private class Async {
final DiscoveryNodes nodes;

View File

@ -39,12 +39,11 @@ import java.util.concurrent.atomic.AtomicInteger;
public class TransportMultiSearchAction extends HandledTransportAction<MultiSearchRequest, MultiSearchResponse> {
private final ClusterService clusterService;
private final TransportSearchAction searchAction;
@Inject
public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, TransportSearchAction searchAction, ActionFilters actionFilters) {
super(settings, MultiSearchAction.NAME, threadPool, transportService, actionFilters);
super(settings, MultiSearchAction.NAME, threadPool, transportService, actionFilters, MultiSearchRequest.class);
this.clusterService = clusterService;
this.searchAction = searchAction;
}
@ -82,9 +81,4 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
});
}
}
@Override
public MultiSearchRequest newRequestInstance() {
return new MultiSearchRequest();
}
}

View File

@ -62,7 +62,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
TransportSearchScanAction scanAction,
TransportSearchCountAction countAction,
ActionFilters actionFilters) {
super(settings, SearchAction.NAME, threadPool, transportService, actionFilters);
super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, SearchRequest.class);
this.clusterService = clusterService;
this.dfsQueryThenFetchAction = dfsQueryThenFetchAction;
this.queryThenFetchAction = queryThenFetchAction;
@ -109,9 +109,4 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
throw new ElasticsearchIllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
}
}
@Override
public SearchRequest newRequestInstance() {
return new SearchRequest();
}
}

View File

@ -27,12 +27,9 @@ import org.elasticsearch.action.search.type.TransportSearchScrollQueryThenFetchA
import org.elasticsearch.action.search.type.TransportSearchScrollScanAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import static org.elasticsearch.action.search.type.ParsedScrollId.*;
@ -44,9 +41,7 @@ import static org.elasticsearch.action.search.type.TransportSearchHelper.parseSc
public class TransportSearchScrollAction extends HandledTransportAction<SearchScrollRequest, SearchResponse> {
private final TransportSearchScrollQueryThenFetchAction queryThenFetchAction;
private final TransportSearchScrollQueryAndFetchAction queryAndFetchAction;
private final TransportSearchScrollScanAction scanAction;
@Inject
@ -54,7 +49,7 @@ public class TransportSearchScrollAction extends HandledTransportAction<SearchSc
TransportSearchScrollQueryThenFetchAction queryThenFetchAction,
TransportSearchScrollQueryAndFetchAction queryAndFetchAction,
TransportSearchScrollScanAction scanAction, ActionFilters actionFilters) {
super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters);
super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, SearchScrollRequest.class);
this.queryThenFetchAction = queryThenFetchAction;
this.queryAndFetchAction = queryAndFetchAction;
this.scanAction = scanAction;
@ -77,9 +72,4 @@ public class TransportSearchScrollAction extends HandledTransportAction<SearchSc
listener.onFailure(e);
}
}
@Override
public SearchScrollRequest newRequestInstance() {
return new SearchScrollRequest();
}
}

View File

@ -62,32 +62,17 @@ import static com.google.common.collect.Lists.newArrayList;
public class TransportSuggestAction extends TransportBroadcastOperationAction<SuggestRequest, SuggestResponse, ShardSuggestRequest, ShardSuggestResponse> {
private final IndicesService indicesService;
private final SuggestPhase suggestPhase;
@Inject
public TransportSuggestAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, SuggestPhase suggestPhase, ActionFilters actionFilters) {
super(settings, SuggestAction.NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, SuggestAction.NAME, threadPool, clusterService, transportService, actionFilters,
SuggestRequest.class, ShardSuggestRequest.class, ThreadPool.Names.SUGGEST);
this.indicesService = indicesService;
this.suggestPhase = suggestPhase;
}
@Override
protected String executor() {
return ThreadPool.Names.SUGGEST;
}
@Override
protected SuggestRequest newRequestInstance() {
return new SuggestRequest();
}
@Override
protected ShardSuggestRequest newShardRequest() {
return new ShardSuggestRequest();
}
@Override
protected ShardSuggestRequest newShardRequest(int numShards, ShardRouting shard, SuggestRequest request) {
return new ShardSuggestRequest(shard.shardId(), request);

View File

@ -23,8 +23,8 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportService;
/**
@ -32,30 +32,12 @@ import org.elasticsearch.transport.TransportService;
*/
public abstract class HandledTransportAction<Request extends ActionRequest, Response extends ActionResponse> extends TransportAction<Request,Response>{
/**
* Sub classes implement this call to get new instance of a Request object
* @return Request
*/
protected abstract Request newRequestInstance();
protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters){
protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, Class<Request> request) {
super(settings, actionName, threadPool, actionFilters);
transportService.registerHandler(actionName, new TransportHandler() {
@Override
public Request newInstance(){
return newRequestInstance();
}
});
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler());
}
abstract class TransportHandler extends BaseTransportRequestHandler<Request>{
/**
* Call to get an instance of type Request
* @return Request
*/
@Override
public abstract Request newInstance();
class TransportHandler implements TransportRequestHandler<Request> {
@Override
public final void messageReceived(final Request request, final TransportChannel channel) throws Exception {
@ -82,12 +64,6 @@ public abstract class HandledTransportAction<Request extends ActionRequest, Resp
}
});
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
@ -53,17 +52,16 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
protected final TransportService transportService;
final String transportShardAction;
final String executor;
protected TransportBroadcastOperationAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
super(settings, actionName, threadPool, transportService, actionFilters);
protected TransportBroadcastOperationAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters,
Class<Request> request, Class<ShardRequest> shardRequest, String shardExecutor) {
super(settings, actionName, threadPool, transportService, actionFilters, request);
this.clusterService = clusterService;
this.transportService = transportService;
this.threadPool = threadPool;
this.transportShardAction = actionName + "[s]";
this.executor = executor();
transportService.registerHandler(transportShardAction, new ShardTransportHandler());
transportService.registerRequestHandler(transportShardAction, shardRequest, shardExecutor, new ShardTransportHandler());
}
@Override
@ -71,12 +69,8 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
new AsyncBroadcastAction(request, listener).start();
}
protected abstract String executor();
protected abstract Response newResponse(Request request, AtomicReferenceArray shardsResponses, ClusterState clusterState);
protected abstract ShardRequest newShardRequest();
protected abstract ShardRequest newShardRequest(int numShards, ShardRouting shard, Request request);
protected abstract ShardResponse newShardResponse();
@ -268,17 +262,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
}
}
class ShardTransportHandler extends BaseTransportRequestHandler<ShardRequest> {
@Override
public ShardRequest newInstance() {
return newShardRequest();
}
@Override
public String executor() {
return executor;
}
class ShardTransportHandler implements TransportRequestHandler<ShardRequest> {
@Override
public void messageReceived(final ShardRequest request, final TransportChannel channel) throws Exception {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
@ -42,28 +43,23 @@ import org.elasticsearch.transport.*;
/**
* A base class for operations that needs to be performed on the master node.
*/
public abstract class TransportMasterNodeOperationAction<Request extends MasterNodeOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
public abstract class TransportMasterNodeOperationAction<Request extends MasterNodeOperationRequest, Response extends ActionResponse> extends HandledTransportAction<Request, Response> {
protected final TransportService transportService;
protected final ClusterService clusterService;
final String executor;
protected TransportMasterNodeOperationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, actionName, threadPool, actionFilters);
protected TransportMasterNodeOperationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters,
Class<Request> request) {
super(settings, actionName, threadPool, transportService, actionFilters, request);
this.transportService = transportService;
this.clusterService = clusterService;
this.executor = executor();
transportService.registerHandler(actionName, new TransportHandler());
}
protected abstract String executor();
protected abstract Request newRequest();
protected abstract Response newResponse();
protected abstract void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception;
@ -225,42 +221,4 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
});
}
}
private class TransportHandler extends BaseTransportRequestHandler<Request> {
@Override
public Request newInstance() {
return newRequest();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
// we just send back a response, no need to fork a listener
request.listenerThreaded(false);
execute(request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
channel.sendResponse(response);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send response", e1);
}
}
});
}
}
}

View File

@ -36,8 +36,8 @@ public abstract class TransportMasterNodeReadOperationAction<Request extends Mas
private Boolean forceLocal;
protected TransportMasterNodeReadOperationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, actionName, transportService, clusterService, threadPool, actionFilters);
protected TransportMasterNodeReadOperationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, Class<Request> request) {
super(settings, actionName, transportService, clusterService, threadPool, actionFilters,request);
this.forceLocal = settings.getAsBoolean(FORCE_LOCAL_SETTING, null);
}

View File

@ -33,8 +33,8 @@ import org.elasticsearch.transport.TransportService;
*/
public abstract class TransportClusterInfoAction<Request extends ClusterInfoRequest, Response extends ActionResponse> extends TransportMasterNodeReadOperationAction<Request, Response> {
public TransportClusterInfoAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, actionName, transportService, clusterService, threadPool, actionFilters);
public TransportClusterInfoAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, Class<Request> request) {
super(settings, actionName, transportService, clusterService, threadPool, actionFilters, request);
}
@Override

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.NoSuchNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
@ -48,19 +47,18 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
protected final TransportService transportService;
final String transportNodeAction;
final String executor;
protected TransportNodesOperationAction(Settings settings, String actionName, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
super(settings, actionName, threadPool, transportService, actionFilters);
ClusterService clusterService, TransportService transportService, ActionFilters actionFilters,
Class<Request> request, Class<NodeRequest> nodeRequest, String nodeExecutor) {
super(settings, actionName, threadPool, transportService, actionFilters, request);
this.clusterName = clusterName;
this.clusterService = clusterService;
this.transportService = transportService;
this.transportNodeAction = actionName + "[n]";
this.executor = executor();
transportService.registerHandler(transportNodeAction, new NodeTransportHandler());
transportService.registerRequestHandler(transportNodeAction, nodeRequest, nodeExecutor, new NodeTransportHandler());
}
@Override
@ -72,12 +70,8 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
return false;
}
protected abstract String executor();
protected abstract Response newResponse(Request request, AtomicReferenceArray nodesResponses);
protected abstract NodeRequest newNodeRequest();
protected abstract NodeRequest newNodeRequest(String nodeId, Request request);
protected abstract NodeResponse newNodeResponse();
@ -198,26 +192,11 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
}
}
private class NodeTransportHandler extends BaseTransportRequestHandler<NodeRequest> {
@Override
public NodeRequest newInstance() {
return newNodeRequest();
}
class NodeTransportHandler implements TransportRequestHandler<NodeRequest> {
@Override
public void messageReceived(final NodeRequest request, final TransportChannel channel) throws Exception {
channel.sendResponse(nodeOperation(request));
}
@Override
public String toString() {
return transportNodeAction;
}
@Override
public String executor() {
return executor;
}
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@ -40,8 +41,9 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES);
protected TimeValue timeout = DEFAULT_TIMEOUT;
ShardId internalShardId;
protected TimeValue timeout = DEFAULT_TIMEOUT;
protected String index;
private boolean threadedOperation = true;
@ -171,6 +173,9 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.readBoolean()) {
internalShardId = ShardId.readShardId(in);
}
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
timeout = TimeValue.readTimeValue(in);
index = in.readString();
@ -181,6 +186,7 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStreamable(internalShardId);
out.writeByte(consistencyLevel.id());
timeout.writeTo(out);
out.writeString(index);

View File

@ -24,14 +24,13 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import java.util.Map;
@ -43,19 +42,18 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
*/
public abstract class TransportIndicesReplicationOperationAction<Request extends IndicesReplicationOperationRequest, Response extends ActionResponse, IndexRequest extends IndexReplicationOperationRequest, IndexResponse extends ActionResponse,
ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionWriteResponse>
extends TransportAction<Request, Response> {
extends HandledTransportAction<Request, Response> {
protected final ClusterService clusterService;
protected final TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction;
protected TransportIndicesReplicationOperationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction, ActionFilters actionFilters) {
super(settings, actionName, threadPool, actionFilters);
TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction, ActionFilters actionFilters,
Class<Request> request) {
super(settings, actionName, threadPool, transportService, actionFilters, request);
this.clusterService = clusterService;
this.indexAction = indexAction;
transportService.registerHandler(actionName, new TransportHandler());
}
@ -116,8 +114,6 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
}
}
protected abstract Request newRequestInstance();
protected abstract Response newResponseInstance(Request request, AtomicReferenceArray indexResponses);
protected abstract IndexRequest newIndexRequestInstance(Request request, String index, Set<String> routing, long startTimeInMillis);
@ -127,42 +123,4 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request);
protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] concreteIndices);
private class TransportHandler extends BaseTransportRequestHandler<Request> {
@Override
public Request newInstance() {
return newRequestInstance();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
// no need for a threaded listener, since we just send a response
request.listenerThreaded(false);
execute(request, new ActionListener<Response>() {
@Override
public void onResponse(Response result) {
try {
channel.sendResponse(result);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send error response for action [" + actionName + "] and request [" + request + "]", e1);
}
}
});
}
}
}

View File

@ -78,7 +78,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected TransportShardReplicationOperationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters) {
ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters,
Class<Request> request, Class<ReplicaRequest> replicaRequest, String executor) {
super(settings, actionName, threadPool, actionFilters);
this.transportService = transportService;
this.clusterService = clusterService;
@ -86,11 +87,12 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
this.shardStateAction = shardStateAction;
this.transportReplicaAction = actionName + "[r]";
this.executor = executor();
this.executor = executor;
this.checkWriteConsistency = checkWriteConsistency();
transportService.registerHandler(actionName, new OperationTransportHandler());
transportService.registerHandler(transportReplicaAction, new ReplicaOperationTransportHandler());
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(transportReplicaAction, replicaRequest, executor, true, new ReplicaOperationTransportHandler());
this.transportOptions = transportOptions();
@ -102,21 +104,15 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
new PrimaryPhase(request, listener).run();
}
protected abstract Request newRequestInstance();
protected abstract ReplicaRequest newReplicaRequestInstance();
protected abstract Response newResponseInstance();
protected abstract String executor();
/**
* @return A tuple containing not null values, as first value the result of the primary operation and as second value
* the request to be executed on the replica shards.
*/
protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable;
protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws Exception;
protected abstract void shardOperationOnReplica(ShardId shardId, ReplicaRequest shardRequest) throws Exception;
protected abstract ShardIterator shards(ClusterState clusterState, InternalRequest request) throws ElasticsearchException;
@ -175,18 +171,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
return false;
}
class OperationTransportHandler extends BaseTransportRequestHandler<Request> {
@Override
public Request newInstance() {
return newRequestInstance();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
class OperationTransportHandler implements TransportRequestHandler<Request> {
@Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
// no need to have a threaded listener since we just send back a response
@ -215,30 +200,13 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ReplicaOperationRequest> {
class ReplicaOperationTransportHandler implements TransportRequestHandler<ReplicaRequest> {
@Override
public ReplicaOperationRequest newInstance() {
return new ReplicaOperationRequest();
}
@Override
public String executor() {
return executor;
}
// we must never reject on because of thread pool capacity on replicas
@Override
public boolean isForceExecution() {
return true;
}
@Override
public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
public void messageReceived(final ReplicaRequest request, final TransportChannel channel) throws Exception {
try {
shardOperationOnReplica(request);
shardOperationOnReplica(request.internalShardId, request);
} catch (Throwable t) {
failReplicaIfNeeded(request.shardId.getIndex(), request.shardId.id(), t);
failReplicaIfNeeded(request.internalShardId.getIndex(), request.internalShardId.id(), t);
throw t;
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
@ -255,46 +223,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
protected class ReplicaOperationRequest extends TransportRequest implements IndicesRequest {
public ShardId shardId;
public ReplicaRequest request;
ReplicaOperationRequest() {
}
ReplicaOperationRequest(ShardId shardId, ReplicaRequest request) {
super(request);
this.shardId = shardId;
this.request = request;
}
@Override
public String[] indices() {
return request.indices();
}
@Override
public IndicesOptions indicesOptions() {
return request.indicesOptions();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
request = newReplicaRequestInstance();
request.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
request.writeTo(out);
}
}
/**
* Responsible for performing all operations up to the point we start starting sending requests to replica shards.
* Including forwarding the request to another node if the primary is not assigned locally.
@ -804,11 +732,11 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
return;
}
final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), replicaRequest);
replicaRequest.internalShardId = shardIt.shardId();
if (!nodeId.equals(observer.observedState().nodes().localNodeId())) {
final DiscoveryNode node = observer.observedState().nodes().get(nodeId);
transportService.sendRequest(node, transportReplicaAction, shardRequest,
transportService.sendRequest(node, transportReplicaAction, replicaRequest,
transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty vResponse) {
@ -834,7 +762,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
@Override
protected void doRun() {
try {
shardOperationOnReplica(shardRequest);
shardOperationOnReplica(shard.shardId(), replicaRequest);
onReplicaSuccess();
} catch (Throwable e) {
onReplicaFailure(nodeId, e);
@ -859,7 +787,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
} else {
try {
shardOperationOnReplica(shardRequest);
shardOperationOnReplica(shard.shardId(), replicaRequest);
onReplicaSuccess();
} catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e);

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
@ -33,6 +34,8 @@ import java.io.IOException;
*/
public abstract class SingleCustomOperationRequest<T extends SingleCustomOperationRequest> extends ActionRequest<T> implements IndicesRequest {
ShardId internalShardId;
private boolean threadedOperation = true;
private boolean preferLocal = true;
private String index;
@ -113,6 +116,9 @@ public abstract class SingleCustomOperationRequest<T extends SingleCustomOperati
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.readBoolean()) {
internalShardId = ShardId.readShardId(in);
}
preferLocal = in.readBoolean();
readIndex(in);
}
@ -124,6 +130,7 @@ public abstract class SingleCustomOperationRequest<T extends SingleCustomOperati
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStreamable(internalShardId);
out.writeBoolean(preferLocal);
writeIndex(out);
}

View File

@ -19,14 +19,11 @@
package org.elasticsearch.action.support.single.custom;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -35,37 +32,33 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
/**
* Transport action used to send a read request to one of the shards that belong to an index.
* Supports retrying another shard in case of failure.
*/
public abstract class TransportSingleCustomOperationAction<Request extends SingleCustomOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
public abstract class TransportSingleCustomOperationAction<Request extends SingleCustomOperationRequest, Response extends ActionResponse> extends HandledTransportAction<Request, Response> {
protected final ClusterService clusterService;
protected final TransportService transportService;
final String transportShardAction;
final String executor;
protected TransportSingleCustomOperationAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
super(settings, actionName, threadPool, actionFilters);
protected TransportSingleCustomOperationAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters,
Class<Request> request, String executor) {
super(settings, actionName, threadPool, transportService, actionFilters, request);
this.clusterService = clusterService;
this.transportService = transportService;
this.transportShardAction = actionName + "[s]";
this.executor = executor();
this.executor = executor;
transportService.registerHandler(transportShardAction, new ShardTransportHandler());
transportService.registerRequestHandler(transportShardAction, request, executor, new ShardTransportHandler());
}
@Override
@ -73,8 +66,6 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
new AsyncSingleAction(request, listener).start();
}
protected abstract String executor();
/**
* Can return null to execute on this local node.
*/
@ -86,8 +77,6 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
*/
protected abstract Response shardOperation(Request request, ShardId shardId) throws ElasticsearchException;
protected abstract Request newRequest();
protected abstract Response newResponse();
protected ClusterBlockException checkGlobalBlock(ClusterState state) {
@ -154,7 +143,7 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
// just execute it on the local node
if (internalRequest.request().operationThreaded()) {
internalRequest.request().beforeLocalFork();
threadPool.executor(executor()).execute(new Runnable() {
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
try {
@ -187,7 +176,7 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
foundLocal = true;
if (internalRequest.request().operationThreaded()) {
internalRequest.request().beforeLocalFork();
threadPool.executor(executor()).execute(new Runnable() {
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
try {
@ -264,7 +253,8 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
}
} else {
DiscoveryNode node = nodes.get(shard.currentNodeId());
transportService.sendRequest(node, transportShardAction, new ShardSingleOperationRequest(internalRequest.request(), shard.shardId()), new BaseTransportResponseHandler<Response>() {
internalRequest.request().internalShardId = shard.shardId();
transportService.sendRequest(node, transportShardAction, internalRequest.request(), new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return newResponse();
@ -290,73 +280,15 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
}
}
private class ShardTransportHandler extends BaseTransportRequestHandler<ShardSingleOperationRequest> {
private class ShardTransportHandler implements TransportRequestHandler<Request> {
@Override
public ShardSingleOperationRequest newInstance() {
return new ShardSingleOperationRequest();
}
@Override
public String executor() {
return executor;
}
@Override
public void messageReceived(final ShardSingleOperationRequest request, final TransportChannel channel) throws Exception {
Response response = shardOperation(request.request(), request.shardId());
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
Response response = shardOperation(request, request.internalShardId);
channel.sendResponse(response);
}
}
protected class ShardSingleOperationRequest extends TransportRequest implements IndicesRequest {
private Request request;
private ShardId shardId;
ShardSingleOperationRequest() {
}
public ShardSingleOperationRequest(Request request, ShardId shardId) {
super(request);
this.request = request;
this.shardId = shardId;
}
public Request request() {
return request;
}
@Override
public String[] indices() {
return request.indices();
}
@Override
public IndicesOptions indicesOptions() {
return request.indicesOptions();
}
public ShardId shardId() {
return shardId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
request = newRequest();
request.readFrom(in);
shardId = ShardId.readShardId(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
shardId.writeTo(out);
}
}
/**
* Internal request class that gets built on each node. Holds the original request plus additional info.
*/

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
@ -47,22 +48,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
*
*/
public abstract class TransportInstanceSingleOperationAction<Request extends InstanceShardOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
public abstract class TransportInstanceSingleOperationAction<Request extends InstanceShardOperationRequest, Response extends ActionResponse> extends HandledTransportAction<Request, Response> {
protected final ClusterService clusterService;
protected final TransportService transportService;
final String executor;
protected TransportInstanceSingleOperationAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
super(settings, actionName, threadPool, actionFilters);
protected TransportInstanceSingleOperationAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, Class<Request> request) {
super(settings, actionName, threadPool, transportService, actionFilters, request);
this.clusterService = clusterService;
this.transportService = transportService;
this.executor = executor();
transportService.registerHandler(actionName, new TransportHandler());
}
@Override
@ -74,8 +71,6 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
protected abstract void shardOperation(InternalRequest request, ActionListener<Response> listener) throws ElasticsearchException;
protected abstract Request newRequest();
protected abstract Response newResponse();
protected ClusterBlockException checkGlobalBlock(ClusterState state) {
@ -279,44 +274,6 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
}
}
class TransportHandler extends BaseTransportRequestHandler<Request> {
@Override
public Request newInstance() {
return newRequest();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void messageReceived(Request request, final TransportChannel channel) throws Exception {
// no need to have a threaded listener since we just send back a response
request.listenerThreaded(false);
execute(request, new ActionListener<Response>() {
@Override
public void onResponse(Response result) {
try {
channel.sendResponse(result);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send response for get", e1);
}
}
});
}
}
/**
* Internal request class that gets built on each node. Holds the original request plus additional info.
*/

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
@ -34,10 +35,10 @@ import java.io.IOException;
*/
public abstract class SingleShardOperationRequest<T extends SingleShardOperationRequest> extends ActionRequest<T> implements IndicesRequest {
ShardId internalShardId;
protected String index;
public static final IndicesOptions INDICES_OPTIONS = IndicesOptions.strictSingleIndexNoExpandForbidClosed();
private boolean threadedOperation = true;
protected SingleShardOperationRequest() {
@ -107,6 +108,9 @@ public abstract class SingleShardOperationRequest<T extends SingleShardOperation
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.readBoolean()) {
internalShardId = ShardId.readShardId(in);
}
index = in.readString();
// no need to pass threading over the network, they are always false when coming throw a thread pool
}
@ -114,6 +118,7 @@ public abstract class SingleShardOperationRequest<T extends SingleShardOperation
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStreamable(internalShardId);
out.writeString(index);
}

View File

@ -20,13 +20,10 @@
package org.elasticsearch.action.support.single.shard;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterService;
@ -38,15 +35,11 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
/**
@ -61,18 +54,19 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
final String transportShardAction;
final String executor;
protected TransportShardSingleOperationAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
protected TransportShardSingleOperationAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters,
Class<Request> request, String executor) {
super(settings, actionName, threadPool, actionFilters);
this.clusterService = clusterService;
this.transportService = transportService;
this.transportShardAction = actionName + "[s]";
this.executor = executor();
this.executor = executor;
if (!isSubAction()) {
transportService.registerHandler(actionName, new TransportHandler());
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler());
}
transportService.registerHandler(transportShardAction, new ShardTransportHandler());
transportService.registerRequestHandler(transportShardAction, request, executor, new ShardTransportHandler());
}
/**
@ -89,12 +83,8 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
new AsyncSingleAction(request, listener).start();
}
protected abstract String executor();
protected abstract Response shardOperation(Request request, ShardId shardId) throws ElasticsearchException;
protected abstract Request newRequest();
protected abstract Response newResponse();
protected abstract boolean resolveIndex();
@ -210,7 +200,8 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
if (node == null) {
onFailure(shardRouting, new NoShardAvailableActionException(shardIt.shardId()));
} else {
transportService.sendRequest(node, transportShardAction, new ShardSingleOperationRequest(internalRequest.request(), shardRouting.shardId()), new BaseTransportResponseHandler<Response>() {
internalRequest.request().internalShardId = shardRouting.shardId();
transportService.sendRequest(node, transportShardAction, internalRequest.request(), new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
@ -237,17 +228,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
}
}
private class TransportHandler extends BaseTransportRequestHandler<Request> {
@Override
public Request newInstance() {
return newRequest();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
private class TransportHandler implements TransportRequestHandler<Request> {
@Override
public void messageReceived(Request request, final TransportChannel channel) throws Exception {
@ -277,77 +258,18 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
}
}
private class ShardTransportHandler extends BaseTransportRequestHandler<ShardSingleOperationRequest> {
private class ShardTransportHandler implements TransportRequestHandler<Request> {
@Override
public ShardSingleOperationRequest newInstance() {
return new ShardSingleOperationRequest();
}
@Override
public String executor() {
return executor;
}
@Override
public void messageReceived(final ShardSingleOperationRequest request, final TransportChannel channel) throws Exception {
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("executing [{}] on shard [{}]", request.request(), request.shardId());
logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
}
Response response = shardOperation(request.request(), request.shardId());
Response response = shardOperation(request, request.internalShardId);
channel.sendResponse(response);
}
}
class ShardSingleOperationRequest extends TransportRequest implements IndicesRequest {
private Request request;
private ShardId shardId;
ShardSingleOperationRequest() {
}
ShardSingleOperationRequest(Request request, ShardId shardId) {
super(request);
this.request = request;
this.shardId = shardId;
}
public Request request() {
return request;
}
public ShardId shardId() {
return shardId;
}
@Override
public String[] indices() {
return request.indices();
}
@Override
public IndicesOptions indicesOptions() {
return request.indicesOptions();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
request = newRequest();
request.readFrom(in);
shardId = ShardId.readShardId(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
shardId.writeTo(out);
}
}
/**
* Internal request class that gets built on each node. Holds the original request plus additional info.
*/

View File

@ -46,7 +46,7 @@ public class TransportMultiTermVectorsAction extends HandledTransportAction<Mult
@Inject
public TransportMultiTermVectorsAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, TransportShardMultiTermsVectorAction shardAction, ActionFilters actionFilters) {
super(settings, MultiTermVectorsAction.NAME, threadPool, transportService, actionFilters);
super(settings, MultiTermVectorsAction.NAME, threadPool, transportService, actionFilters, MultiTermVectorsRequest.class);
this.clusterService = clusterService;
this.shardAction = shardAction;
}
@ -127,9 +127,4 @@ public class TransportMultiTermVectorsAction extends HandledTransportAction<Mult
});
}
}
@Override
public MultiTermVectorsRequest newRequestInstance() {
return new MultiTermVectorsRequest();
}
}

View File

@ -45,7 +45,8 @@ public class TransportShardMultiTermsVectorAction extends TransportShardSingleOp
@Inject
public TransportShardMultiTermsVectorAction(Settings settings, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters,
MultiTermVectorsShardRequest.class, ThreadPool.Names.GET);
this.indicesService = indicesService;
}
@ -54,16 +55,6 @@ public class TransportShardMultiTermsVectorAction extends TransportShardSingleOp
return true;
}
@Override
protected String executor() {
return ThreadPool.Names.GET;
}
@Override
protected MultiTermVectorsShardRequest newRequest() {
return new MultiTermVectorsShardRequest();
}
@Override
protected MultiTermVectorsShardResponse newResponse() {
return new MultiTermVectorsShardResponse();

View File

@ -52,16 +52,11 @@ public class TransportTermVectorsAction extends TransportShardSingleOperationAct
@Inject
public TransportTermVectorsAction(Settings settings, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, TermVectorsAction.NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, TermVectorsAction.NAME, threadPool, clusterService, transportService, actionFilters,
TermVectorsRequest.class, ThreadPool.Names.GET);
this.indicesService = indicesService;
}
@Override
protected String executor() {
// TODO: Is this the right pool to execute this on?
return ThreadPool.Names.GET;
}
@Override
protected ShardIterator shards(ClusterState state, InternalRequest request) {
return clusterService.operationRouting().getShards(state, request.concreteIndex(), request.request().type(), request.request().id(),
@ -92,11 +87,6 @@ public class TransportTermVectorsAction extends TransportShardSingleOperationAct
return response;
}
@Override
protected TermVectorsRequest newRequest() {
return new TermVectorsRequest();
}
@Override
protected TermVectorsResponse newResponse() {
return new TermVectorsResponse();

View File

@ -63,7 +63,8 @@ public class TransportDfsOnlyAction extends TransportBroadcastOperationAction<Df
@Inject
public TransportDfsOnlyAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, SearchService searchService, SearchPhaseController searchPhaseController) {
super(settings, NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, NAME, threadPool, clusterService, transportService, actionFilters,
DfsOnlyRequest.class, ShardDfsOnlyRequest.class, ThreadPool.Names.SEARCH);
this.searchService = searchService;
this.searchPhaseController = searchPhaseController;
}
@ -74,21 +75,6 @@ public class TransportDfsOnlyAction extends TransportBroadcastOperationAction<Df
super.doExecute(request, listener);
}
@Override
protected String executor() {
return ThreadPool.Names.SEARCH;
}
@Override
protected DfsOnlyRequest newRequestInstance() {
return new DfsOnlyRequest();
}
@Override
protected ShardDfsOnlyRequest newShardRequest() {
return new ShardDfsOnlyRequest();
}
@Override
protected ShardDfsOnlyRequest newShardRequest(int numShards, ShardRouting shard, DfsOnlyRequest request) {
String[] filteringAliases = clusterService.state().metaData().filteringAliases(shard.index(), request.indices());

View File

@ -76,7 +76,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
TransportIndexAction indexAction, TransportDeleteAction deleteAction, TransportCreateIndexAction createIndexAction,
UpdateHelper updateHelper, ActionFilters actionFilters, IndicesService indicesService) {
super(settings, UpdateAction.NAME, threadPool, clusterService, transportService, actionFilters);
super(settings, UpdateAction.NAME, threadPool, clusterService, transportService, actionFilters, UpdateRequest.class);
this.indexAction = indexAction;
this.deleteAction = deleteAction;
this.createIndexAction = createIndexAction;
@ -90,11 +90,6 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
return ThreadPool.Names.INDEX;
}
@Override
protected UpdateRequest newRequest() {
return new UpdateRequest();
}
@Override
protected UpdateResponse newResponse() {
return new UpdateResponse();

Some files were not shown because too many files have changed in this diff Show More