diff --git a/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java b/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java new file mode 100644 index 00000000000..fb4d99acab9 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java @@ -0,0 +1,53 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action; + +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.BaseTransportResponseHandler; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponse; + +/** + * A simple base class for action response listeners, defaulting to using the SAME executor (as its + * very common on response handlers). + */ +public abstract class ActionListenerResponseHandler extends BaseTransportResponseHandler { + + private final ActionListener listener; + + public ActionListenerResponseHandler(ActionListener listener) { + this.listener = listener; + } + + @Override + public void handleResponse(Response response) { + listener.onResponse(response); + } + + @Override + public void handleException(TransportException e) { + listener.onFailure(e); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/TransportNodesHotThreadsAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/TransportNodesHotThreadsAction.java index 26a33975f26..489d1b7e31f 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/TransportNodesHotThreadsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/TransportNodesHotThreadsAction.java @@ -67,7 +67,7 @@ public class TransportNodesHotThreadsAction extends TransportNodesOperationActio } @Override - protected NodesHotThreadsRequest newRequest() { + protected NodesHotThreadsRequest newRequestInstance() { return new NodesHotThreadsRequest(); } diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java index e8fae492e2a..87da2e2738d 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java @@ -71,7 +71,7 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction() { + searchServiceTransportAction.sendClearAllScrollContexts(node, request, new ActionListener() { @Override - public void onResponse(Boolean freed) { - onFreedContext(freed); + public void onResponse(TransportResponse response) { + onFreedContext(true); } @Override @@ -126,10 +127,10 @@ public class TransportClearScrollAction extends HandledTransportAction() { + searchServiceTransportAction.sendFreeContext(node, target.v2(), request, new ActionListener() { @Override - public void onResponse(Boolean freed) { - onFreedContext(freed); + public void onResponse(SearchServiceTransportAction.SearchFreeContextResponse freed) { + onFreedContext(freed.isFreed()); } @Override diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java index 014de976ba4..ccc039c816b 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java @@ -28,7 +28,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.fetch.FetchSearchResultProvider; @@ -67,7 +66,7 @@ public class TransportSearchCountAction extends TransportSearchTypeAction { } @Override - protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener listener) { + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { searchService.sendExecuteQuery(node, request, listener); } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java index afdf07ac0f6..83e9aba54f0 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java @@ -30,7 +30,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.dfs.AggregatedDfs; @@ -75,7 +74,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc } @Override - protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener listener) { + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { searchService.sendExecuteDfs(node, request, listener); } @@ -93,9 +92,9 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc } void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, DiscoveryNode node, final QuerySearchRequest querySearchRequest) { - searchService.sendExecuteFetch(node, querySearchRequest, new SearchServiceListener() { + searchService.sendExecuteFetch(node, querySearchRequest, new ActionListener() { @Override - public void onResult(QueryFetchSearchResult result) { + public void onResponse(QueryFetchSearchResult result) { result.shardTarget(dfsResult.shardTarget()); queryFetchResults.set(shardIndex, result); if (counter.decrementAndGet() == 0) { diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java index 3af4368c51d..c1a361903e8 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java @@ -34,7 +34,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.dfs.AggregatedDfs; @@ -85,7 +84,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA } @Override - protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener listener) { + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { searchService.sendExecuteDfs(node, request, listener); } @@ -102,9 +101,9 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA } void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, DiscoveryNode node) { - searchService.sendExecuteQuery(node, querySearchRequest, new SearchServiceListener() { + searchService.sendExecuteQuery(node, querySearchRequest, new ActionListener() { @Override - public void onResult(QuerySearchResult result) { + public void onResponse(QuerySearchResult result) { result.shardTarget(dfsResult.shardTarget()); queryResults.set(shardIndex, result); if (counter.decrementAndGet() == 0) { @@ -165,9 +164,9 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA } void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { - searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener() { + searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener() { @Override - public void onResult(FetchSearchResult result) { + public void onResponse(FetchSearchResult result) { result.shardTarget(shardTarget); fetchResults.set(shardIndex, result); if (counter.decrementAndGet() == 0) { diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java index 13fa1116084..bb679321b1b 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.fetch.QueryFetchSearchResult; @@ -69,7 +68,7 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio } @Override - protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener listener) { + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { searchService.sendExecuteFetch(node, request, listener); } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index 2534362e698..175a770e9c6 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -33,7 +33,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.fetch.ShardFetchSearchRequest; @@ -79,7 +78,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi } @Override - protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener listener) { + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { searchService.sendExecuteQuery(node, request, listener); } @@ -107,9 +106,9 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi } void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { - searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener() { + searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener() { @Override - public void onResult(FetchSearchResult result) { + public void onResponse(FetchSearchResult result) { result.shardTarget(shardTarget); fetchResults.set(shardIndex, result); if (counter.decrementAndGet() == 0) { diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java index 31b0622678f..0474fb646ee 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.fetch.FetchSearchResultProvider; @@ -65,7 +64,7 @@ public class TransportSearchScanAction extends TransportSearchTypeAction { } @Override - protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener listener) { + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { searchService.sendExecuteScan(node, request, listener); } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java index 3d7490f0bed..1683a6bbf6d 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java @@ -30,10 +30,10 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.fetch.QueryFetchSearchResult; +import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult; import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.InternalSearchResponse; @@ -148,10 +148,10 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); - searchService.sendExecuteFetch(node, internalRequest, new SearchServiceListener() { + searchService.sendExecuteFetch(node, internalRequest, new ActionListener() { @Override - public void onResult(QueryFetchSearchResult result) { - queryFetchResults.set(shardIndex, result); + public void onResponse(ScrollQueryFetchSearchResult result) { + queryFetchResults.set(shardIndex, result.result()); if (counter.decrementAndGet() == 0) { finishHim(); } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java index da93ef37c2c..84d631e24c2 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java @@ -31,7 +31,6 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.fetch.FetchSearchResult; @@ -39,6 +38,7 @@ import org.elasticsearch.search.fetch.ShardFetchRequest; import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.query.ScrollQuerySearchResult; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -149,10 +149,10 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) { InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); - searchService.sendExecuteQuery(node, internalRequest, new SearchServiceListener() { + searchService.sendExecuteQuery(node, internalRequest, new ActionListener() { @Override - public void onResult(QuerySearchResult result) { - queryResults.set(shardIndex, result); + public void onResponse(ScrollQuerySearchResult result) { + queryResults.set(shardIndex, result.queryResult()); if (counter.decrementAndGet() == 0) { try { executeFetchPhase(); @@ -207,9 +207,9 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; ShardFetchRequest shardFetchRequest = new ShardFetchRequest(request, querySearchResult.id(), docIds, lastEmittedDoc); DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId()); - searchService.sendExecuteFetchScroll(node, shardFetchRequest, new SearchServiceListener() { + searchService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener() { @Override - public void onResult(FetchSearchResult result) { + public void onResponse(FetchSearchResult result) { result.shardTarget(querySearchResult.shardTarget()); fetchResults.set(entry.index, result); if (counter.decrementAndGet() == 0) { diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java index f3ebe2f0309..93042815e00 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java @@ -33,10 +33,10 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.fetch.QueryFetchSearchResult; +import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult; import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; @@ -156,10 +156,10 @@ public class TransportSearchScrollScanAction extends AbstractComponent { } void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { - searchService.sendExecuteScan(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener() { + searchService.sendExecuteScan(node, internalScrollSearchRequest(searchId, request), new ActionListener() { @Override - public void onResult(QueryFetchSearchResult result) { - queryFetchResults.set(shardIndex, result); + public void onResponse(ScrollQueryFetchSearchResult result) { + queryFetchResults.set(shardIndex, result.result()); if (counter.decrementAndGet() == 0) { finishHim(); } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java index 7473b7ac50b..189b035a918 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java @@ -48,7 +48,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.fetch.ShardFetchSearchRequest; @@ -160,9 +159,9 @@ public abstract class TransportSearchTypeAction extends TransportAction() { + sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime()), new ActionListener() { @Override - public void onResult(FirstResult result) { + public void onResponse(FirstResult result) { onFirstPhaseResult(shardIndex, shard, result, shardIt); } @@ -351,7 +350,7 @@ public abstract class TransportSearchTypeAction extends TransportAction listener); + protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener); protected final void processFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result) { firstResults.set(shardIndex, result); diff --git a/src/main/java/org/elasticsearch/action/suggest/TransportSuggestAction.java b/src/main/java/org/elasticsearch/action/suggest/TransportSuggestAction.java index fb139670aa3..410925727a7 100644 --- a/src/main/java/org/elasticsearch/action/suggest/TransportSuggestAction.java +++ b/src/main/java/org/elasticsearch/action/suggest/TransportSuggestAction.java @@ -79,7 +79,7 @@ public class TransportSuggestAction extends TransportBroadcastOperationAction{ + abstract class TransportHandler extends BaseTransportRequestHandler{ /** * Call to get an instance of type Request diff --git a/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java b/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java index d5b5b6f41ec..09d7bd55447 100644 --- a/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.ClusterService; @@ -45,7 +46,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray; * */ public abstract class TransportBroadcastOperationAction - extends TransportAction { + extends HandledTransportAction { protected final ThreadPool threadPool; protected final ClusterService clusterService; @@ -55,14 +56,13 @@ public abstract class TransportBroadcastOperationAction() { @Override - public void run() { - try { - onOperation(shard, shardIndex, shardOperation(shardRequest)); - } catch (Throwable e) { - onOperation(shard, shardIt, shardIndex, e); - } + public ShardResponse newInstance() { + return newShardResponse(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void handleResponse(ShardResponse response) { + onOperation(shard, shardIndex, response); + } + + @Override + public void handleException(TransportException e) { + onOperation(shard, shardIt, shardIndex, e); } }); - } else { - DiscoveryNode node = nodes.get(shard.currentNodeId()); - if (node == null) { - // no node connected, act as failure - onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId())); - } else { - transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler() { - @Override - public ShardResponse newInstance() { - return newShardResponse(); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public void handleResponse(ShardResponse response) { - onOperation(shard, shardIndex, response); - } - - @Override - public void handleException(TransportException e) { - onOperation(shard, shardIt, shardIndex, e); - } - }); - } } } catch (Throwable e) { onOperation(shard, shardIt, shardIndex, e); @@ -283,44 +268,6 @@ public abstract class TransportBroadcastOperationAction { - - @Override - public Request newInstance() { - return newRequest(); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public void messageReceived(Request request, final TransportChannel channel) throws Exception { - // we just send back a response, no need to fork a listener - request.listenerThreaded(false); - execute(request, new ActionListener() { - @Override - public void onResponse(Response response) { - try { - channel.sendResponse(response); - } catch (Throwable e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - logger.warn("Failed to send response", e1); - } - } - }); - } - } - class ShardTransportHandler extends BaseTransportRequestHandler { @Override diff --git a/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java b/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java index 59b3f75cb99..358f7d0860f 100644 --- a/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.NoSuchNodeException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; @@ -40,12 +41,10 @@ import java.util.concurrent.atomic.AtomicReferenceArray; /** * */ -public abstract class TransportNodesOperationAction extends TransportAction { +public abstract class TransportNodesOperationAction extends HandledTransportAction { protected final ClusterName clusterName; - protected final ClusterService clusterService; - protected final TransportService transportService; final String transportNodeAction; @@ -53,7 +52,7 @@ public abstract class TransportNodesOperationAction() { - @Override - public NodeResponse newInstance() { - return newNodeResponse(); - } + NodeRequest nodeRequest = newNodeRequest(nodeId, request); + transportService.sendRequest(node, transportNodeAction, nodeRequest, transportRequestOptions, new BaseTransportResponseHandler() { + @Override + public NodeResponse newInstance() { + return newNodeResponse(); + } - @Override - public void handleResponse(NodeResponse response) { - onOperation(idx, response); - } + @Override + public void handleResponse(NodeResponse response) { + onOperation(idx, response); + } - @Override - public void handleException(TransportException exp) { - onFailure(idx, node.id(), exp); - } + @Override + public void handleException(TransportException exp) { + onFailure(idx, node.id(), exp); + } - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - } + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); } } catch (Throwable t) { onFailure(idx, nodeId, t); @@ -223,49 +198,6 @@ public abstract class TransportNodesOperationAction { - - @Override - public Request newInstance() { - return newRequest(); - } - - @Override - public void messageReceived(final Request request, final TransportChannel channel) throws Exception { - request.listenerThreaded(false); - execute(request, new ActionListener() { - @Override - public void onResponse(Response response) { - TransportResponseOptions options = TransportResponseOptions.options().withCompress(transportCompress()); - try { - channel.sendResponse(response, options); - } catch (Throwable e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - logger.warn("Failed to send response", e); - } - } - }); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public String toString() { - return actionName; - } - } - private class NodeTransportHandler extends BaseTransportRequestHandler { @Override diff --git a/src/main/java/org/elasticsearch/action/termvectors/dfs/TransportDfsOnlyAction.java b/src/main/java/org/elasticsearch/action/termvectors/dfs/TransportDfsOnlyAction.java index 11a92f1d826..3e42da036f4 100644 --- a/src/main/java/org/elasticsearch/action/termvectors/dfs/TransportDfsOnlyAction.java +++ b/src/main/java/org/elasticsearch/action/termvectors/dfs/TransportDfsOnlyAction.java @@ -80,7 +80,7 @@ public class TransportDfsOnlyAction extends TransportBroadcastOperationAction { - - void onResult(T result); - - void onFailure(Throwable t); -} diff --git a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java index db90ef07dc9..13fe4bd15fb 100644 --- a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java +++ b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java @@ -19,14 +19,13 @@ package org.elasticsearch.search.action; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -46,7 +45,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.io.IOException; -import java.util.concurrent.Callable; /** * An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through @@ -69,55 +67,13 @@ public class SearchServiceTransportAction extends AbstractComponent { public static final String SCAN_ACTION_NAME = "indices:data/read/search[phase/scan]"; public static final String SCAN_SCROLL_ACTION_NAME = "indices:data/read/search[phase/scan/scroll]"; - static final class FreeContextResponseHandler implements TransportResponseHandler { - - private final ActionListener listener; - - FreeContextResponseHandler(final ActionListener listener) { - this.listener = listener; - } - - @Override - public SearchFreeContextResponse newInstance() { - return new SearchFreeContextResponse(); - } - - @Override - public void handleResponse(SearchFreeContextResponse response) { - listener.onResponse(response.freed); - } - - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - } - // - private final ThreadPool threadPool; private final TransportService transportService; - private final ClusterService clusterService; private final SearchService searchService; - private final FreeContextResponseHandler freeContextResponseHandler = new FreeContextResponseHandler(new ActionListener() { - @Override - public void onResponse(Boolean aBoolean) {} - - @Override - public void onFailure(Throwable exp) { - logger.warn("Failed to send release search context", exp); - } - }); @Inject - public SearchServiceTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, SearchService searchService) { + public SearchServiceTransportAction(Settings settings, TransportService transportService, SearchService searchService) { super(settings); - this.threadPool = threadPool; this.transportService = transportService; - this.clusterService = clusterService; this.searchService = searchService; transportService.registerHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextTransportHandler()); @@ -137,427 +93,138 @@ public class SearchServiceTransportAction extends AbstractComponent { } public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequest request) { - if (clusterService.state().nodes().localNodeId().equals(node.id())) { - searchService.freeContext(contextId); - } else { - transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), freeContextResponseHandler); - } + transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), new ActionListenerResponseHandler(new ActionListener() { + @Override + public void onResponse(SearchFreeContextResponse response) { + // no need to respond if it was freed or not + } + + @Override + public void onFailure(Throwable e) { + + } + }) { + @Override + public SearchFreeContextResponse newInstance() { + return new SearchFreeContextResponse(); + } + }); } - public void sendFreeContext(DiscoveryNode node, long contextId, ClearScrollRequest request, final ActionListener actionListener) { - if (clusterService.state().nodes().localNodeId().equals(node.id())) { - final boolean freed = searchService.freeContext(contextId); - actionListener.onResponse(freed); - } else { - //use the separate action for scroll when possible - transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(request, contextId), new FreeContextResponseHandler(actionListener)); - } + public void sendFreeContext(DiscoveryNode node, long contextId, ClearScrollRequest request, final ActionListener listener) { + transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(request, contextId), new ActionListenerResponseHandler(listener) { + @Override + public SearchFreeContextResponse newInstance() { + return new SearchFreeContextResponse(); + } + }); } - public void sendClearAllScrollContexts(DiscoveryNode node, ClearScrollRequest request, final ActionListener actionListener) { - if (clusterService.state().nodes().localNodeId().equals(node.id())) { - searchService.freeAllScrollContexts(); - actionListener.onResponse(true); - } else { - transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, new ClearScrollContextsRequest(request), new TransportResponseHandler() { - @Override - public TransportResponse newInstance() { - return TransportResponse.Empty.INSTANCE; - } - - @Override - public void handleResponse(TransportResponse response) { - actionListener.onResponse(true); - } - - @Override - public void handleException(TransportException exp) { - actionListener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - } + public void sendClearAllScrollContexts(DiscoveryNode node, ClearScrollRequest request, final ActionListener listener) { + transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, new ClearScrollContextsRequest(request), new ActionListenerResponseHandler(listener) { + @Override + public TransportResponse newInstance() { + return TransportResponse.Empty.INSTANCE; + } + }); } - public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request, final SearchServiceListener listener) { - if (clusterService.state().nodes().localNodeId().equals(node.id())) { - execute(new Callable() { - @Override - public DfsSearchResult call() throws Exception { - return searchService.executeDfsPhase(request); - } - }, listener); - } else { - transportService.sendRequest(node, DFS_ACTION_NAME, request, new BaseTransportResponseHandler() { - - @Override - public DfsSearchResult newInstance() { - return new DfsSearchResult(); - } - - @Override - public void handleResponse(DfsSearchResult response) { - listener.onResult(response); - } - - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - } + public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener listener) { + transportService.sendRequest(node, DFS_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { + @Override + public DfsSearchResult newInstance() { + return new DfsSearchResult(); + } + }); } - public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request, final SearchServiceListener listener) { - if (clusterService.state().nodes().localNodeId().equals(node.id())) { - execute(new Callable() { - @Override - public QuerySearchResultProvider call() throws Exception { - return searchService.executeQueryPhase(request); - } - }, listener); - } else { - transportService.sendRequest(node, QUERY_ACTION_NAME, request, new BaseTransportResponseHandler() { - - @Override - public QuerySearchResult newInstance() { - return new QuerySearchResult(); - } - - @Override - public void handleResponse(QuerySearchResultProvider response) { - listener.onResult(response); - } - - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - } + public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener listener) { + transportService.sendRequest(node, QUERY_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { + @Override + public QuerySearchResult newInstance() { + return new QuerySearchResult(); + } + }); } - public void sendExecuteQuery(DiscoveryNode node, final QuerySearchRequest request, final SearchServiceListener listener) { - if (clusterService.state().nodes().localNodeId().equals(node.id())) { - execute(new Callable() { - @Override - public QuerySearchResult call() throws Exception { - return searchService.executeQueryPhase(request); - } - }, listener); - } else { - transportService.sendRequest(node, QUERY_ID_ACTION_NAME, request, new BaseTransportResponseHandler() { - - @Override - public QuerySearchResult newInstance() { - return new QuerySearchResult(); - } - - @Override - public void handleResponse(QuerySearchResult response) { - listener.onResult(response); - } - - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - } + public void sendExecuteQuery(DiscoveryNode node, final QuerySearchRequest request, final ActionListener listener) { + transportService.sendRequest(node, QUERY_ID_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { + @Override + public QuerySearchResult newInstance() { + return new QuerySearchResult(); + } + }); } - public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, final SearchServiceListener listener) { - if (clusterService.state().nodes().localNodeId().equals(node.id())) { - execute(new Callable() { - @Override - public QuerySearchResult call() throws Exception { - return searchService.executeQueryPhase(request).queryResult(); - } - }, listener); - } else { - transportService.sendRequest(node, QUERY_SCROLL_ACTION_NAME, request, new BaseTransportResponseHandler() { - - @Override - public ScrollQuerySearchResult newInstance() { - return new ScrollQuerySearchResult(); - } - - @Override - public void handleResponse(ScrollQuerySearchResult response) { - listener.onResult(response.queryResult()); - } - - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - } + public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, final ActionListener listener) { + transportService.sendRequest(node, QUERY_SCROLL_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { + @Override + public ScrollQuerySearchResult newInstance() { + return new ScrollQuerySearchResult(); + } + }); } - public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request, final SearchServiceListener listener) { - if (clusterService.state().nodes().localNodeId().equals(node.id())) { - execute(new Callable() { - @Override - public QueryFetchSearchResult call() throws Exception { - return searchService.executeFetchPhase(request); - } - }, listener); - } else { - transportService.sendRequest(node, QUERY_FETCH_ACTION_NAME, request, new BaseTransportResponseHandler() { - - @Override - public QueryFetchSearchResult newInstance() { - return new QueryFetchSearchResult(); - } - - @Override - public void handleResponse(QueryFetchSearchResult response) { - listener.onResult(response); - } - - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - } + public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener listener) { + transportService.sendRequest(node, QUERY_FETCH_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { + @Override + public QueryFetchSearchResult newInstance() { + return new QueryFetchSearchResult(); + } + }); } - public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request, final SearchServiceListener listener) { - if (clusterService.state().nodes().localNodeId().equals(node.id())) { - execute(new Callable() { - @Override - public QueryFetchSearchResult call() throws Exception { - return searchService.executeFetchPhase(request); - } - }, listener); - } else { - transportService.sendRequest(node, QUERY_QUERY_FETCH_ACTION_NAME, request, new BaseTransportResponseHandler() { - - @Override - public QueryFetchSearchResult newInstance() { - return new QueryFetchSearchResult(); - } - - @Override - public void handleResponse(QueryFetchSearchResult response) { - listener.onResult(response); - } - - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - } + public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request, final ActionListener listener) { + transportService.sendRequest(node, QUERY_QUERY_FETCH_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { + @Override + public QueryFetchSearchResult newInstance() { + return new QueryFetchSearchResult(); + } + }); } - public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, final SearchServiceListener listener) { - if (clusterService.state().nodes().localNodeId().equals(node.id())) { - execute(new Callable() { - @Override - public QueryFetchSearchResult call() throws Exception { - return searchService.executeFetchPhase(request).result(); - } - }, listener); - } else { - transportService.sendRequest(node, QUERY_FETCH_SCROLL_ACTION_NAME, request, new BaseTransportResponseHandler() { - - @Override - public ScrollQueryFetchSearchResult newInstance() { - return new ScrollQueryFetchSearchResult(); - } - - @Override - public void handleResponse(ScrollQueryFetchSearchResult response) { - listener.onResult(response.result()); - } - - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - } + public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, final ActionListener listener) { + transportService.sendRequest(node, QUERY_FETCH_SCROLL_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { + @Override + public ScrollQueryFetchSearchResult newInstance() { + return new ScrollQueryFetchSearchResult(); + } + }); } - public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, final SearchServiceListener listener) { + public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, final ActionListener listener) { sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, listener); } - public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, final SearchServiceListener listener) { + public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, final ActionListener listener) { sendExecuteFetch(node, FETCH_ID_SCROLL_ACTION_NAME, request, listener); } - private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, final SearchServiceListener listener) { - if (clusterService.state().nodes().localNodeId().equals(node.id())) { - execute(new Callable() { - @Override - public FetchSearchResult call() throws Exception { - return searchService.executeFetchPhase(request); - } - }, listener); - } else { - transportService.sendRequest(node, action, request, new BaseTransportResponseHandler() { - - @Override - public FetchSearchResult newInstance() { - return new FetchSearchResult(); - } - - @Override - public void handleResponse(FetchSearchResult response) { - listener.onResult(response); - } - - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - } + private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, final ActionListener listener) { + transportService.sendRequest(node, action, request, new ActionListenerResponseHandler(listener) { + @Override + public FetchSearchResult newInstance() { + return new FetchSearchResult(); + } + }); } - public void sendExecuteScan(DiscoveryNode node, final ShardSearchTransportRequest request, final SearchServiceListener listener) { - if (clusterService.state().nodes().localNodeId().equals(node.id())) { - execute(new Callable() { - @Override - public QuerySearchResult call() throws Exception { - return searchService.executeScan(request); - } - }, listener); - } else { - transportService.sendRequest(node, SCAN_ACTION_NAME, request, new BaseTransportResponseHandler() { - - @Override - public QuerySearchResult newInstance() { - return new QuerySearchResult(); - } - - @Override - public void handleResponse(QuerySearchResult response) { - listener.onResult(response); - } - - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - } + public void sendExecuteScan(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener listener) { + transportService.sendRequest(node, SCAN_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { + @Override + public QuerySearchResult newInstance() { + return new QuerySearchResult(); + } + }); } - public void sendExecuteScan(DiscoveryNode node, final InternalScrollSearchRequest request, final SearchServiceListener listener) { - if (clusterService.state().nodes().localNodeId().equals(node.id())) { - execute(new Callable() { - @Override - public QueryFetchSearchResult call() throws Exception { - return searchService.executeScan(request).result(); - } - }, listener); - } else { - transportService.sendRequest(node, SCAN_SCROLL_ACTION_NAME, request, new BaseTransportResponseHandler() { - - @Override - public ScrollQueryFetchSearchResult newInstance() { - return new ScrollQueryFetchSearchResult(); - } - - @Override - public void handleResponse(ScrollQueryFetchSearchResult response) { - listener.onResult(response.result()); - } - - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - } - } - - private void execute(final Callable callable, final SearchServiceListener listener) { - try { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - // Listeners typically do counting on errors and successes, and the decision to move to second phase, etc. is based on - // these counts so we need to be careful here to never propagate exceptions thrown by onResult to onFailure - T result = null; - Throwable error = null; - try { - result = callable.call(); - } catch (Throwable t) { - error = t; - } finally { - if (result == null) { - assert error != null; - listener.onFailure(error); - } else { - assert error == null : error; - listener.onResult(result); - } - } - } - }); - } catch (Throwable t) { - listener.onFailure(t); - } + public void sendExecuteScan(DiscoveryNode node, final InternalScrollSearchRequest request, final ActionListener listener) { + transportService.sendRequest(node, SCAN_SCROLL_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { + @Override + public ScrollQueryFetchSearchResult newInstance() { + return new ScrollQueryFetchSearchResult(); + } + }); } static class ScrollFreeContextRequest extends TransportRequest { @@ -632,7 +299,7 @@ public class SearchServiceTransportAction extends AbstractComponent { } } - static class SearchFreeContextResponse extends TransportResponse { + public static class SearchFreeContextResponse extends TransportResponse { private boolean freed;