Cleanup local code transport execution

Now that we handle automatically the local execution within the transport service, we can remove parts of the code that handle it in actions.
closes #10582
This commit is contained in:
Shay Banon 2015-04-13 14:16:23 -07:00
parent dc1742785d
commit 45fa5dcad9
38 changed files with 290 additions and 712 deletions

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
/**
* A simple base class for action response listeners, defaulting to using the SAME executor (as its
* very common on response handlers).
*/
public abstract class ActionListenerResponseHandler<Response extends TransportResponse> extends BaseTransportResponseHandler<Response> {
private final ActionListener<Response> listener;
public ActionListenerResponseHandler(ActionListener<Response> listener) {
this.listener = listener;
}
@Override
public void handleResponse(Response response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException e) {
listener.onFailure(e);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}

View File

@ -67,7 +67,7 @@ public class TransportNodesHotThreadsAction extends TransportNodesOperationActio
} }
@Override @Override
protected NodesHotThreadsRequest newRequest() { protected NodesHotThreadsRequest newRequestInstance() {
return new NodesHotThreadsRequest(); return new NodesHotThreadsRequest();
} }

View File

@ -71,7 +71,7 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction<Node
} }
@Override @Override
protected NodesInfoRequest newRequest() { protected NodesInfoRequest newRequestInstance() {
return new NodesInfoRequest(); return new NodesInfoRequest();
} }

View File

@ -71,7 +71,7 @@ public class TransportNodesStatsAction extends TransportNodesOperationAction<Nod
} }
@Override @Override
protected NodesStatsRequest newRequest() { protected NodesStatsRequest newRequestInstance() {
return new NodesStatsRequest(); return new NodesStatsRequest();
} }

View File

@ -70,7 +70,7 @@ public class TransportNodesSnapshotsStatus extends TransportNodesOperationAction
} }
@Override @Override
protected Request newRequest() { protected Request newRequestInstance() {
return new Request(); return new Request();
} }

View File

@ -91,7 +91,7 @@ public class TransportClusterStatsAction extends TransportNodesOperationAction<C
} }
@Override @Override
protected ClusterStatsRequest newRequest() { protected ClusterStatsRequest newRequestInstance() {
return new ClusterStatsRequest(); return new ClusterStatsRequest();
} }

View File

@ -69,7 +69,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
} }
@Override @Override
protected ClearIndicesCacheRequest newRequest() { protected ClearIndicesCacheRequest newRequestInstance() {
return new ClearIndicesCacheRequest(); return new ClearIndicesCacheRequest();
} }

View File

@ -62,7 +62,7 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
} }
@Override @Override
protected FlushRequest newRequest() { protected FlushRequest newRequestInstance() {
return new FlushRequest(); return new FlushRequest();
} }

View File

@ -63,7 +63,7 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
} }
@Override @Override
protected OptimizeRequest newRequest() { protected OptimizeRequest newRequestInstance() {
return new OptimizeRequest(); return new OptimizeRequest();
} }

View File

@ -71,7 +71,7 @@ public class TransportRecoveryAction extends
} }
@Override @Override
protected RecoveryRequest newRequest() { protected RecoveryRequest newRequestInstance() {
return new RecoveryRequest(); return new RecoveryRequest();
} }

View File

@ -63,7 +63,7 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
} }
@Override @Override
protected RefreshRequest newRequest() { protected RefreshRequest newRequestInstance() {
return new RefreshRequest(); return new RefreshRequest();
} }

View File

@ -33,6 +33,8 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -41,6 +43,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicReferenceArray;
@ -66,7 +69,7 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastOperationA
} }
@Override @Override
protected IndicesSegmentsRequest newRequest() { protected IndicesSegmentsRequest newRequestInstance() {
return new IndicesSegmentsRequest(); return new IndicesSegmentsRequest();
} }
@ -135,7 +138,7 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastOperationA
} }
static class IndexShardSegmentRequest extends BroadcastShardOperationRequest { static class IndexShardSegmentRequest extends BroadcastShardOperationRequest {
final boolean verbose; boolean verbose;
IndexShardSegmentRequest() { IndexShardSegmentRequest() {
verbose = false; verbose = false;
@ -145,5 +148,17 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastOperationA
super(shardId, request); super(shardId, request);
verbose = request.verbose(); verbose = request.verbose();
} }
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(verbose);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
verbose = in.readBoolean();
}
} }
} }

View File

@ -70,7 +70,7 @@ public class TransportIndicesStatsAction extends TransportBroadcastOperationActi
} }
@Override @Override
protected IndicesStatsRequest newRequest() { protected IndicesStatsRequest newRequestInstance() {
return new IndicesStatsRequest(); return new IndicesStatsRequest();
} }

View File

@ -96,7 +96,7 @@ public class TransportValidateQueryAction extends TransportBroadcastOperationAct
} }
@Override @Override
protected ValidateQueryRequest newRequest() { protected ValidateQueryRequest newRequestInstance() {
return new ValidateQueryRequest(); return new ValidateQueryRequest();
} }

View File

@ -95,7 +95,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
} }
@Override @Override
protected CountRequest newRequest() { protected CountRequest newRequestInstance() {
return new CountRequest(); return new CountRequest();
} }

View File

@ -95,7 +95,7 @@ public class TransportExistsAction extends TransportBroadcastOperationAction<Exi
} }
@Override @Override
protected ExistsRequest newRequest() { protected ExistsRequest newRequestInstance() {
return new ExistsRequest(); return new ExistsRequest();
} }

View File

@ -101,7 +101,7 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
} }
@Override @Override
protected PercolateRequest newRequest() { protected PercolateRequest newRequestInstance() {
return new PercolateRequest(); return new PercolateRequest();
} }

View File

@ -35,6 +35,7 @@ import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler; import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.util.ArrayList; import java.util.ArrayList;
@ -105,10 +106,10 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
if (contexts.isEmpty()) { if (contexts.isEmpty()) {
for (final DiscoveryNode node : nodes) { for (final DiscoveryNode node : nodes) {
searchServiceTransportAction.sendClearAllScrollContexts(node, request, new ActionListener<Boolean>() { searchServiceTransportAction.sendClearAllScrollContexts(node, request, new ActionListener<TransportResponse>() {
@Override @Override
public void onResponse(Boolean freed) { public void onResponse(TransportResponse response) {
onFreedContext(freed); onFreedContext(true);
} }
@Override @Override
@ -126,10 +127,10 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
continue; continue;
} }
searchServiceTransportAction.sendFreeContext(node, target.v2(), request, new ActionListener<Boolean>() { searchServiceTransportAction.sendFreeContext(node, target.v2(), request, new ActionListener<SearchServiceTransportAction.SearchFreeContextResponse>() {
@Override @Override
public void onResponse(Boolean freed) { public void onResponse(SearchServiceTransportAction.SearchFreeContextResponse freed) {
onFreedContext(freed); onFreedContext(freed.isFreed());
} }
@Override @Override

View File

@ -28,7 +28,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.FetchSearchResultProvider; import org.elasticsearch.search.fetch.FetchSearchResultProvider;
@ -67,7 +66,7 @@ public class TransportSearchCountAction extends TransportSearchTypeAction {
} }
@Override @Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener<QuerySearchResultProvider> listener) { protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener<QuerySearchResultProvider> listener) {
searchService.sendExecuteQuery(node, request, listener); searchService.sendExecuteQuery(node, request, listener);
} }

View File

@ -30,7 +30,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.AggregatedDfs;
@ -75,7 +74,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
} }
@Override @Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener<DfsSearchResult> listener) { protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener<DfsSearchResult> listener) {
searchService.sendExecuteDfs(node, request, listener); searchService.sendExecuteDfs(node, request, listener);
} }
@ -93,9 +92,9 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
} }
void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, DiscoveryNode node, final QuerySearchRequest querySearchRequest) { void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, DiscoveryNode node, final QuerySearchRequest querySearchRequest) {
searchService.sendExecuteFetch(node, querySearchRequest, new SearchServiceListener<QueryFetchSearchResult>() { searchService.sendExecuteFetch(node, querySearchRequest, new ActionListener<QueryFetchSearchResult>() {
@Override @Override
public void onResult(QueryFetchSearchResult result) { public void onResponse(QueryFetchSearchResult result) {
result.shardTarget(dfsResult.shardTarget()); result.shardTarget(dfsResult.shardTarget());
queryFetchResults.set(shardIndex, result); queryFetchResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {

View File

@ -34,7 +34,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.AggregatedDfs;
@ -85,7 +84,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
} }
@Override @Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener<DfsSearchResult> listener) { protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener<DfsSearchResult> listener) {
searchService.sendExecuteDfs(node, request, listener); searchService.sendExecuteDfs(node, request, listener);
} }
@ -102,9 +101,9 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
} }
void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, DiscoveryNode node) { void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, DiscoveryNode node) {
searchService.sendExecuteQuery(node, querySearchRequest, new SearchServiceListener<QuerySearchResult>() { searchService.sendExecuteQuery(node, querySearchRequest, new ActionListener<QuerySearchResult>() {
@Override @Override
public void onResult(QuerySearchResult result) { public void onResponse(QuerySearchResult result) {
result.shardTarget(dfsResult.shardTarget()); result.shardTarget(dfsResult.shardTarget());
queryResults.set(shardIndex, result); queryResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
@ -165,9 +164,9 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
} }
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() { searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
@Override @Override
public void onResult(FetchSearchResult result) { public void onResponse(FetchSearchResult result) {
result.shardTarget(shardTarget); result.shardTarget(shardTarget);
fetchResults.set(shardIndex, result); fetchResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {

View File

@ -29,7 +29,6 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.fetch.QueryFetchSearchResult;
@ -69,7 +68,7 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio
} }
@Override @Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener<QueryFetchSearchResult> listener) { protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener<QueryFetchSearchResult> listener) {
searchService.sendExecuteFetch(node, request, listener); searchService.sendExecuteFetch(node, request, listener);
} }

View File

@ -33,7 +33,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
@ -79,7 +78,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
} }
@Override @Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener<QuerySearchResultProvider> listener) { protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener<QuerySearchResultProvider> listener) {
searchService.sendExecuteQuery(node, request, listener); searchService.sendExecuteQuery(node, request, listener);
} }
@ -107,9 +106,9 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
} }
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() { searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
@Override @Override
public void onResult(FetchSearchResult result) { public void onResponse(FetchSearchResult result) {
result.shardTarget(shardTarget); result.shardTarget(shardTarget);
fetchResults.set(shardIndex, result); fetchResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {

View File

@ -29,7 +29,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.FetchSearchResultProvider; import org.elasticsearch.search.fetch.FetchSearchResultProvider;
@ -65,7 +64,7 @@ public class TransportSearchScanAction extends TransportSearchTypeAction {
} }
@Override @Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener<QuerySearchResult> listener) { protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener<QuerySearchResult> listener) {
searchService.sendExecuteScan(node, request, listener); searchService.sendExecuteScan(node, request, listener);
} }

View File

@ -30,10 +30,10 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
@ -148,10 +148,10 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) {
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
searchService.sendExecuteFetch(node, internalRequest, new SearchServiceListener<QueryFetchSearchResult>() { searchService.sendExecuteFetch(node, internalRequest, new ActionListener<ScrollQueryFetchSearchResult>() {
@Override @Override
public void onResult(QueryFetchSearchResult result) { public void onResponse(ScrollQueryFetchSearchResult result) {
queryFetchResults.set(shardIndex, result); queryFetchResults.set(shardIndex, result.result());
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
finishHim(); finishHim();
} }

View File

@ -31,7 +31,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.FetchSearchResult;
@ -39,6 +38,7 @@ import org.elasticsearch.search.fetch.ShardFetchRequest;
import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -149,10 +149,10 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) { private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) {
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
searchService.sendExecuteQuery(node, internalRequest, new SearchServiceListener<QuerySearchResult>() { searchService.sendExecuteQuery(node, internalRequest, new ActionListener<ScrollQuerySearchResult>() {
@Override @Override
public void onResult(QuerySearchResult result) { public void onResponse(ScrollQuerySearchResult result) {
queryResults.set(shardIndex, result); queryResults.set(shardIndex, result.queryResult());
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
try { try {
executeFetchPhase(); executeFetchPhase();
@ -207,9 +207,9 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
ShardFetchRequest shardFetchRequest = new ShardFetchRequest(request, querySearchResult.id(), docIds, lastEmittedDoc); ShardFetchRequest shardFetchRequest = new ShardFetchRequest(request, querySearchResult.id(), docIds, lastEmittedDoc);
DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId()); DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId());
searchService.sendExecuteFetchScroll(node, shardFetchRequest, new SearchServiceListener<FetchSearchResult>() { searchService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener<FetchSearchResult>() {
@Override @Override
public void onResult(FetchSearchResult result) { public void onResponse(FetchSearchResult result) {
result.shardTarget(querySearchResult.shardTarget()); result.shardTarget(querySearchResult.shardTarget());
fetchResults.set(entry.index, result); fetchResults.set(entry.index, result);
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {

View File

@ -33,10 +33,10 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
@ -156,10 +156,10 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
} }
void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) {
searchService.sendExecuteScan(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QueryFetchSearchResult>() { searchService.sendExecuteScan(node, internalScrollSearchRequest(searchId, request), new ActionListener<ScrollQueryFetchSearchResult>() {
@Override @Override
public void onResult(QueryFetchSearchResult result) { public void onResponse(ScrollQueryFetchSearchResult result) {
queryFetchResults.set(shardIndex, result); queryFetchResults.set(shardIndex, result.result());
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
finishHim(); finishHim();
} }

View File

@ -48,7 +48,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
@ -160,9 +159,9 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
} else { } else {
String[] filteringAliases = clusterState.metaData().filteringAliases(shard.index(), request.indices()); String[] filteringAliases = clusterState.metaData().filteringAliases(shard.index(), request.indices());
sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime()), new SearchServiceListener<FirstResult>() { sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime()), new ActionListener<FirstResult>() {
@Override @Override
public void onResult(FirstResult result) { public void onResponse(FirstResult result) {
onFirstPhaseResult(shardIndex, shard, result, shardIt); onFirstPhaseResult(shardIndex, shard, result, shardIt);
} }
@ -351,7 +350,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
} }
} }
protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener<FirstResult> listener); protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener<FirstResult> listener);
protected final void processFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result) { protected final void processFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result) {
firstResults.set(shardIndex, result); firstResults.set(shardIndex, result);

View File

@ -79,7 +79,7 @@ public class TransportSuggestAction extends TransportBroadcastOperationAction<Su
} }
@Override @Override
protected SuggestRequest newRequest() { protected SuggestRequest newRequestInstance() {
return new SuggestRequest(); return new SuggestRequest();
} }

View File

@ -36,7 +36,7 @@ public abstract class HandledTransportAction<Request extends ActionRequest, Resp
* Sub classes implement this call to get new instance of a Request object * Sub classes implement this call to get new instance of a Request object
* @return Request * @return Request
*/ */
public abstract Request newRequestInstance(); protected abstract Request newRequestInstance();
protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters){ protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters){
super(settings, actionName, threadPool, actionFilters); super(settings, actionName, threadPool, actionFilters);
@ -48,7 +48,7 @@ public abstract class HandledTransportAction<Request extends ActionRequest, Resp
}); });
} }
private abstract class TransportHandler extends BaseTransportRequestHandler<Request>{ abstract class TransportHandler extends BaseTransportRequestHandler<Request>{
/** /**
* Call to get an instance of type Request * Call to get an instance of type Request

View File

@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
@ -45,7 +46,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
* *
*/ */
public abstract class TransportBroadcastOperationAction<Request extends BroadcastOperationRequest, Response extends BroadcastOperationResponse, ShardRequest extends BroadcastShardOperationRequest, ShardResponse extends BroadcastShardOperationResponse> public abstract class TransportBroadcastOperationAction<Request extends BroadcastOperationRequest, Response extends BroadcastOperationResponse, ShardRequest extends BroadcastShardOperationRequest, ShardResponse extends BroadcastShardOperationResponse>
extends TransportAction<Request, Response> { extends HandledTransportAction<Request, Response> {
protected final ThreadPool threadPool; protected final ThreadPool threadPool;
protected final ClusterService clusterService; protected final ClusterService clusterService;
@ -55,14 +56,13 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
final String executor; final String executor;
protected TransportBroadcastOperationAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) { protected TransportBroadcastOperationAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
super(settings, actionName, threadPool, actionFilters); super(settings, actionName, threadPool, transportService, actionFilters);
this.clusterService = clusterService; this.clusterService = clusterService;
this.transportService = transportService; this.transportService = transportService;
this.threadPool = threadPool; this.threadPool = threadPool;
this.transportShardAction = actionName + "[s]"; this.transportShardAction = actionName + "[s]";
this.executor = executor(); this.executor = executor();
transportService.registerHandler(actionName, new TransportHandler());
transportService.registerHandler(transportShardAction, new ShardTransportHandler()); transportService.registerHandler(transportShardAction, new ShardTransportHandler());
} }
@ -73,8 +73,6 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
protected abstract String executor(); protected abstract String executor();
protected abstract Request newRequest();
protected abstract Response newResponse(Request request, AtomicReferenceArray shardsResponses, ClusterState clusterState); protected abstract Response newResponse(Request request, AtomicReferenceArray shardsResponses, ClusterState clusterState);
protected abstract ShardRequest newShardRequest(); protected abstract ShardRequest newShardRequest();
@ -162,18 +160,6 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
} else { } else {
try { try {
final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request); final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request);
if (shard.currentNodeId().equals(nodes.localNodeId())) {
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
try {
onOperation(shard, shardIndex, shardOperation(shardRequest));
} catch (Throwable e) {
onOperation(shard, shardIt, shardIndex, e);
}
}
});
} else {
DiscoveryNode node = nodes.get(shard.currentNodeId()); DiscoveryNode node = nodes.get(shard.currentNodeId());
if (node == null) { if (node == null) {
// no node connected, act as failure // no node connected, act as failure
@ -201,7 +187,6 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
} }
}); });
} }
}
} catch (Throwable e) { } catch (Throwable e) {
onOperation(shard, shardIt, shardIndex, e); onOperation(shard, shardIt, shardIndex, e);
} }
@ -283,44 +268,6 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
} }
} }
class TransportHandler extends BaseTransportRequestHandler<Request> {
@Override
public Request newInstance() {
return newRequest();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void messageReceived(Request request, final TransportChannel channel) throws Exception {
// we just send back a response, no need to fork a listener
request.listenerThreaded(false);
execute(request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
channel.sendResponse(response);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send response", e1);
}
}
});
}
}
class ShardTransportHandler extends BaseTransportRequestHandler<ShardRequest> { class ShardTransportHandler extends BaseTransportRequestHandler<ShardRequest> {
@Override @Override

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.NoSuchNodeException; import org.elasticsearch.action.NoSuchNodeException;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
@ -40,12 +41,10 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
/** /**
* *
*/ */
public abstract class TransportNodesOperationAction<Request extends NodesOperationRequest, Response extends NodesOperationResponse, NodeRequest extends NodeOperationRequest, NodeResponse extends NodeOperationResponse> extends TransportAction<Request, Response> { public abstract class TransportNodesOperationAction<Request extends NodesOperationRequest, Response extends NodesOperationResponse, NodeRequest extends NodeOperationRequest, NodeResponse extends NodeOperationResponse> extends HandledTransportAction<Request, Response> {
protected final ClusterName clusterName; protected final ClusterName clusterName;
protected final ClusterService clusterService; protected final ClusterService clusterService;
protected final TransportService transportService; protected final TransportService transportService;
final String transportNodeAction; final String transportNodeAction;
@ -53,7 +52,7 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
protected TransportNodesOperationAction(Settings settings, String actionName, ClusterName clusterName, ThreadPool threadPool, protected TransportNodesOperationAction(Settings settings, String actionName, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) { ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
super(settings, actionName, threadPool, actionFilters); super(settings, actionName, threadPool, transportService, actionFilters);
this.clusterName = clusterName; this.clusterName = clusterName;
this.clusterService = clusterService; this.clusterService = clusterService;
this.transportService = transportService; this.transportService = transportService;
@ -61,7 +60,6 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
this.transportNodeAction = actionName + "[n]"; this.transportNodeAction = actionName + "[n]";
this.executor = executor(); this.executor = executor();
transportService.registerHandler(actionName, new TransportHandler());
transportService.registerHandler(transportNodeAction, new NodeTransportHandler()); transportService.registerHandler(transportNodeAction, new NodeTransportHandler());
} }
@ -76,8 +74,6 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
protected abstract String executor(); protected abstract String executor();
protected abstract Request newRequest();
protected abstract Response newResponse(Request request, AtomicReferenceArray nodesResponses); protected abstract Response newResponse(Request request, AtomicReferenceArray nodesResponses);
protected abstract NodeRequest newNodeRequest(); protected abstract NodeRequest newNodeRequest();
@ -133,32 +129,12 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
final int idx = i; final int idx = i;
final DiscoveryNode node = clusterState.nodes().nodes().get(nodeId); final DiscoveryNode node = clusterState.nodes().nodes().get(nodeId);
try { try {
if (nodeId.equals("_local") || nodeId.equals(clusterState.nodes().localNodeId())) {
threadPool.executor(executor()).execute(new Runnable() {
@Override
public void run() {
try {
onOperation(idx, nodeOperation(newNodeRequest(clusterState.nodes().localNodeId(), request)));
} catch (Throwable e) {
onFailure(idx, clusterState.nodes().localNodeId(), e);
}
}
});
} else if (nodeId.equals("_master")) {
threadPool.executor(executor()).execute(new Runnable() {
@Override
public void run() {
try {
onOperation(idx, nodeOperation(newNodeRequest(clusterState.nodes().masterNodeId(), request)));
} catch (Throwable e) {
onFailure(idx, clusterState.nodes().masterNodeId(), e);
}
}
});
} else {
if (node == null) { if (node == null) {
onFailure(idx, nodeId, new NoSuchNodeException(nodeId)); onFailure(idx, nodeId, new NoSuchNodeException(nodeId));
} else if (!clusterService.localNode().shouldConnectTo(node)) { } else if (!clusterService.localNode().shouldConnectTo(node) && !clusterService.localNode().equals(node)) {
// the check "!clusterService.localNode().equals(node)" is to maintain backward comp. where before
// we allowed to connect from "local" client node to itself, certain tests rely on it, if we remove it, we need to fix
// those (and they randomize the client node usage, so tricky to find when)
onFailure(idx, nodeId, new NodeShouldNotConnectException(clusterService.localNode(), node)); onFailure(idx, nodeId, new NodeShouldNotConnectException(clusterService.localNode(), node));
} else { } else {
NodeRequest nodeRequest = newNodeRequest(nodeId, request); NodeRequest nodeRequest = newNodeRequest(nodeId, request);
@ -184,7 +160,6 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
} }
}); });
} }
}
} catch (Throwable t) { } catch (Throwable t) {
onFailure(idx, nodeId, t); onFailure(idx, nodeId, t);
} }
@ -223,49 +198,6 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
} }
} }
private class TransportHandler extends BaseTransportRequestHandler<Request> {
@Override
public Request newInstance() {
return newRequest();
}
@Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
request.listenerThreaded(false);
execute(request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
TransportResponseOptions options = TransportResponseOptions.options().withCompress(transportCompress());
try {
channel.sendResponse(response, options);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send response", e);
}
}
});
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public String toString() {
return actionName;
}
}
private class NodeTransportHandler extends BaseTransportRequestHandler<NodeRequest> { private class NodeTransportHandler extends BaseTransportRequestHandler<NodeRequest> {
@Override @Override

View File

@ -80,7 +80,7 @@ public class TransportDfsOnlyAction extends TransportBroadcastOperationAction<Df
} }
@Override @Override
protected DfsOnlyRequest newRequest() { protected DfsOnlyRequest newRequestInstance() {
return new DfsOnlyRequest(); return new DfsOnlyRequest();
} }

View File

@ -76,7 +76,7 @@ public class TransportNodesListGatewayMetaState extends TransportNodesOperationA
} }
@Override @Override
protected Request newRequest() { protected Request newRequestInstance() {
return new Request(); return new Request();
} }

View File

@ -74,7 +74,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
} }
@Override @Override
protected Request newRequest() { protected Request newRequestInstance() {
return new Request(); return new Request();
} }

View File

@ -86,7 +86,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
} }
@Override @Override
protected Request newRequest() { protected Request newRequestInstance() {
return new Request(); return new Request();
} }

View File

@ -1,30 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.action;
/**
*
*/
public interface SearchServiceListener<T> {
void onResult(T result);
void onFailure(Throwable t);
}

View File

@ -19,14 +19,13 @@
package org.elasticsearch.search.action; package org.elasticsearch.search.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -46,7 +45,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Callable;
/** /**
* An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through * An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
@ -69,55 +67,13 @@ public class SearchServiceTransportAction extends AbstractComponent {
public static final String SCAN_ACTION_NAME = "indices:data/read/search[phase/scan]"; public static final String SCAN_ACTION_NAME = "indices:data/read/search[phase/scan]";
public static final String SCAN_SCROLL_ACTION_NAME = "indices:data/read/search[phase/scan/scroll]"; public static final String SCAN_SCROLL_ACTION_NAME = "indices:data/read/search[phase/scan/scroll]";
static final class FreeContextResponseHandler implements TransportResponseHandler<SearchFreeContextResponse> {
private final ActionListener<Boolean> listener;
FreeContextResponseHandler(final ActionListener<Boolean> listener) {
this.listener = listener;
}
@Override
public SearchFreeContextResponse newInstance() {
return new SearchFreeContextResponse();
}
@Override
public void handleResponse(SearchFreeContextResponse response) {
listener.onResponse(response.freed);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
//
private final ThreadPool threadPool;
private final TransportService transportService; private final TransportService transportService;
private final ClusterService clusterService;
private final SearchService searchService; private final SearchService searchService;
private final FreeContextResponseHandler freeContextResponseHandler = new FreeContextResponseHandler(new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {}
@Override
public void onFailure(Throwable exp) {
logger.warn("Failed to send release search context", exp);
}
});
@Inject @Inject
public SearchServiceTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, SearchService searchService) { public SearchServiceTransportAction(Settings settings, TransportService transportService, SearchService searchService) {
super(settings); super(settings);
this.threadPool = threadPool;
this.transportService = transportService; this.transportService = transportService;
this.clusterService = clusterService;
this.searchService = searchService; this.searchService = searchService;
transportService.registerHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextTransportHandler()); transportService.registerHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextTransportHandler());
@ -137,428 +93,139 @@ public class SearchServiceTransportAction extends AbstractComponent {
} }
public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequest request) { public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequest request) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) { transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), new ActionListenerResponseHandler<SearchFreeContextResponse>(new ActionListener<SearchFreeContextResponse>() {
searchService.freeContext(contextId); @Override
} else { public void onResponse(SearchFreeContextResponse response) {
transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), freeContextResponseHandler); // no need to respond if it was freed or not
}
} }
public void sendFreeContext(DiscoveryNode node, long contextId, ClearScrollRequest request, final ActionListener<Boolean> actionListener) { @Override
if (clusterService.state().nodes().localNodeId().equals(node.id())) { public void onFailure(Throwable e) {
final boolean freed = searchService.freeContext(contextId);
actionListener.onResponse(freed);
} else {
//use the separate action for scroll when possible
transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(request, contextId), new FreeContextResponseHandler(actionListener));
} }
}) {
@Override
public SearchFreeContextResponse newInstance() {
return new SearchFreeContextResponse();
}
});
} }
public void sendClearAllScrollContexts(DiscoveryNode node, ClearScrollRequest request, final ActionListener<Boolean> actionListener) { public void sendFreeContext(DiscoveryNode node, long contextId, ClearScrollRequest request, final ActionListener<SearchFreeContextResponse> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) { transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(request, contextId), new ActionListenerResponseHandler<SearchFreeContextResponse>(listener) {
searchService.freeAllScrollContexts(); @Override
actionListener.onResponse(true); public SearchFreeContextResponse newInstance() {
} else { return new SearchFreeContextResponse();
transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, new ClearScrollContextsRequest(request), new TransportResponseHandler<TransportResponse>() { }
});
}
public void sendClearAllScrollContexts(DiscoveryNode node, ClearScrollRequest request, final ActionListener<TransportResponse> listener) {
transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, new ClearScrollContextsRequest(request), new ActionListenerResponseHandler<TransportResponse>(listener) {
@Override @Override
public TransportResponse newInstance() { public TransportResponse newInstance() {
return TransportResponse.Empty.INSTANCE; return TransportResponse.Empty.INSTANCE;
} }
@Override
public void handleResponse(TransportResponse response) {
actionListener.onResponse(true);
}
@Override
public void handleException(TransportException exp) {
actionListener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}); });
} }
}
public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request, final SearchServiceListener<DfsSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
execute(new Callable<DfsSearchResult>() {
@Override
public DfsSearchResult call() throws Exception {
return searchService.executeDfsPhase(request);
}
}, listener);
} else {
transportService.sendRequest(node, DFS_ACTION_NAME, request, new BaseTransportResponseHandler<DfsSearchResult>() {
public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener<DfsSearchResult> listener) {
transportService.sendRequest(node, DFS_ACTION_NAME, request, new ActionListenerResponseHandler<DfsSearchResult>(listener) {
@Override @Override
public DfsSearchResult newInstance() { public DfsSearchResult newInstance() {
return new DfsSearchResult(); return new DfsSearchResult();
} }
@Override
public void handleResponse(DfsSearchResult response) {
listener.onResult(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}); });
} }
}
public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request, final SearchServiceListener<QuerySearchResultProvider> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
execute(new Callable<QuerySearchResultProvider>() {
@Override
public QuerySearchResultProvider call() throws Exception {
return searchService.executeQueryPhase(request);
}
}, listener);
} else {
transportService.sendRequest(node, QUERY_ACTION_NAME, request, new BaseTransportResponseHandler<QuerySearchResultProvider>() {
public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener<QuerySearchResultProvider> listener) {
transportService.sendRequest(node, QUERY_ACTION_NAME, request, new ActionListenerResponseHandler<QuerySearchResultProvider>(listener) {
@Override @Override
public QuerySearchResult newInstance() { public QuerySearchResult newInstance() {
return new QuerySearchResult(); return new QuerySearchResult();
} }
@Override
public void handleResponse(QuerySearchResultProvider response) {
listener.onResult(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}); });
} }
}
public void sendExecuteQuery(DiscoveryNode node, final QuerySearchRequest request, final SearchServiceListener<QuerySearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
execute(new Callable<QuerySearchResult>() {
@Override
public QuerySearchResult call() throws Exception {
return searchService.executeQueryPhase(request);
}
}, listener);
} else {
transportService.sendRequest(node, QUERY_ID_ACTION_NAME, request, new BaseTransportResponseHandler<QuerySearchResult>() {
public void sendExecuteQuery(DiscoveryNode node, final QuerySearchRequest request, final ActionListener<QuerySearchResult> listener) {
transportService.sendRequest(node, QUERY_ID_ACTION_NAME, request, new ActionListenerResponseHandler<QuerySearchResult>(listener) {
@Override @Override
public QuerySearchResult newInstance() { public QuerySearchResult newInstance() {
return new QuerySearchResult(); return new QuerySearchResult();
} }
@Override
public void handleResponse(QuerySearchResult response) {
listener.onResult(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}); });
} }
}
public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, final SearchServiceListener<QuerySearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
execute(new Callable<QuerySearchResult>() {
@Override
public QuerySearchResult call() throws Exception {
return searchService.executeQueryPhase(request).queryResult();
}
}, listener);
} else {
transportService.sendRequest(node, QUERY_SCROLL_ACTION_NAME, request, new BaseTransportResponseHandler<ScrollQuerySearchResult>() {
public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, final ActionListener<ScrollQuerySearchResult> listener) {
transportService.sendRequest(node, QUERY_SCROLL_ACTION_NAME, request, new ActionListenerResponseHandler<ScrollQuerySearchResult>(listener) {
@Override @Override
public ScrollQuerySearchResult newInstance() { public ScrollQuerySearchResult newInstance() {
return new ScrollQuerySearchResult(); return new ScrollQuerySearchResult();
} }
@Override
public void handleResponse(ScrollQuerySearchResult response) {
listener.onResult(response.queryResult());
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}); });
} }
}
public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request, final SearchServiceListener<QueryFetchSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
execute(new Callable<QueryFetchSearchResult>() {
@Override
public QueryFetchSearchResult call() throws Exception {
return searchService.executeFetchPhase(request);
}
}, listener);
} else {
transportService.sendRequest(node, QUERY_FETCH_ACTION_NAME, request, new BaseTransportResponseHandler<QueryFetchSearchResult>() {
public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener<QueryFetchSearchResult> listener) {
transportService.sendRequest(node, QUERY_FETCH_ACTION_NAME, request, new ActionListenerResponseHandler<QueryFetchSearchResult>(listener) {
@Override @Override
public QueryFetchSearchResult newInstance() { public QueryFetchSearchResult newInstance() {
return new QueryFetchSearchResult(); return new QueryFetchSearchResult();
} }
@Override
public void handleResponse(QueryFetchSearchResult response) {
listener.onResult(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}); });
} }
}
public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request, final SearchServiceListener<QueryFetchSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
execute(new Callable<QueryFetchSearchResult>() {
@Override
public QueryFetchSearchResult call() throws Exception {
return searchService.executeFetchPhase(request);
}
}, listener);
} else {
transportService.sendRequest(node, QUERY_QUERY_FETCH_ACTION_NAME, request, new BaseTransportResponseHandler<QueryFetchSearchResult>() {
public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request, final ActionListener<QueryFetchSearchResult> listener) {
transportService.sendRequest(node, QUERY_QUERY_FETCH_ACTION_NAME, request, new ActionListenerResponseHandler<QueryFetchSearchResult>(listener) {
@Override @Override
public QueryFetchSearchResult newInstance() { public QueryFetchSearchResult newInstance() {
return new QueryFetchSearchResult(); return new QueryFetchSearchResult();
} }
@Override
public void handleResponse(QueryFetchSearchResult response) {
listener.onResult(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}); });
} }
}
public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, final SearchServiceListener<QueryFetchSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
execute(new Callable<QueryFetchSearchResult>() {
@Override
public QueryFetchSearchResult call() throws Exception {
return searchService.executeFetchPhase(request).result();
}
}, listener);
} else {
transportService.sendRequest(node, QUERY_FETCH_SCROLL_ACTION_NAME, request, new BaseTransportResponseHandler<ScrollQueryFetchSearchResult>() {
public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, final ActionListener<ScrollQueryFetchSearchResult> listener) {
transportService.sendRequest(node, QUERY_FETCH_SCROLL_ACTION_NAME, request, new ActionListenerResponseHandler<ScrollQueryFetchSearchResult>(listener) {
@Override @Override
public ScrollQueryFetchSearchResult newInstance() { public ScrollQueryFetchSearchResult newInstance() {
return new ScrollQueryFetchSearchResult(); return new ScrollQueryFetchSearchResult();
} }
@Override
public void handleResponse(ScrollQueryFetchSearchResult response) {
listener.onResult(response.result());
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}); });
} }
}
public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, final SearchServiceListener<FetchSearchResult> listener) { public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, final ActionListener<FetchSearchResult> listener) {
sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, listener); sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, listener);
} }
public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, final SearchServiceListener<FetchSearchResult> listener) { public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, final ActionListener<FetchSearchResult> listener) {
sendExecuteFetch(node, FETCH_ID_SCROLL_ACTION_NAME, request, listener); sendExecuteFetch(node, FETCH_ID_SCROLL_ACTION_NAME, request, listener);
} }
private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, final SearchServiceListener<FetchSearchResult> listener) { private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, final ActionListener<FetchSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) { transportService.sendRequest(node, action, request, new ActionListenerResponseHandler<FetchSearchResult>(listener) {
execute(new Callable<FetchSearchResult>() {
@Override
public FetchSearchResult call() throws Exception {
return searchService.executeFetchPhase(request);
}
}, listener);
} else {
transportService.sendRequest(node, action, request, new BaseTransportResponseHandler<FetchSearchResult>() {
@Override @Override
public FetchSearchResult newInstance() { public FetchSearchResult newInstance() {
return new FetchSearchResult(); return new FetchSearchResult();
} }
@Override
public void handleResponse(FetchSearchResult response) {
listener.onResult(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}); });
} }
}
public void sendExecuteScan(DiscoveryNode node, final ShardSearchTransportRequest request, final SearchServiceListener<QuerySearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
execute(new Callable<QuerySearchResult>() {
@Override
public QuerySearchResult call() throws Exception {
return searchService.executeScan(request);
}
}, listener);
} else {
transportService.sendRequest(node, SCAN_ACTION_NAME, request, new BaseTransportResponseHandler<QuerySearchResult>() {
public void sendExecuteScan(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener<QuerySearchResult> listener) {
transportService.sendRequest(node, SCAN_ACTION_NAME, request, new ActionListenerResponseHandler<QuerySearchResult>(listener) {
@Override @Override
public QuerySearchResult newInstance() { public QuerySearchResult newInstance() {
return new QuerySearchResult(); return new QuerySearchResult();
} }
@Override
public void handleResponse(QuerySearchResult response) {
listener.onResult(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}); });
} }
}
public void sendExecuteScan(DiscoveryNode node, final InternalScrollSearchRequest request, final SearchServiceListener<QueryFetchSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
execute(new Callable<QueryFetchSearchResult>() {
@Override
public QueryFetchSearchResult call() throws Exception {
return searchService.executeScan(request).result();
}
}, listener);
} else {
transportService.sendRequest(node, SCAN_SCROLL_ACTION_NAME, request, new BaseTransportResponseHandler<ScrollQueryFetchSearchResult>() {
public void sendExecuteScan(DiscoveryNode node, final InternalScrollSearchRequest request, final ActionListener<ScrollQueryFetchSearchResult> listener) {
transportService.sendRequest(node, SCAN_SCROLL_ACTION_NAME, request, new ActionListenerResponseHandler<ScrollQueryFetchSearchResult>(listener) {
@Override @Override
public ScrollQueryFetchSearchResult newInstance() { public ScrollQueryFetchSearchResult newInstance() {
return new ScrollQueryFetchSearchResult(); return new ScrollQueryFetchSearchResult();
} }
@Override
public void handleResponse(ScrollQueryFetchSearchResult response) {
listener.onResult(response.result());
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}); });
} }
}
private <T> void execute(final Callable<? extends T> callable, final SearchServiceListener<T> listener) {
try {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
// Listeners typically do counting on errors and successes, and the decision to move to second phase, etc. is based on
// these counts so we need to be careful here to never propagate exceptions thrown by onResult to onFailure
T result = null;
Throwable error = null;
try {
result = callable.call();
} catch (Throwable t) {
error = t;
} finally {
if (result == null) {
assert error != null;
listener.onFailure(error);
} else {
assert error == null : error;
listener.onResult(result);
}
}
}
});
} catch (Throwable t) {
listener.onFailure(t);
}
}
static class ScrollFreeContextRequest extends TransportRequest { static class ScrollFreeContextRequest extends TransportRequest {
private long id; private long id;
@ -632,7 +299,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
} }
} }
static class SearchFreeContextResponse extends TransportResponse { public static class SearchFreeContextResponse extends TransportResponse {
private boolean freed; private boolean freed;