diff --git a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java index a6ce0ee2867..890c9f406e4 100644 --- a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java +++ b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java @@ -45,6 +45,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.io.IOException; +import java.util.concurrent.Callable; /** * An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through @@ -67,6 +68,26 @@ public class SearchServiceTransportAction extends AbstractComponent { } } + private static void execute(Callable callable, SearchServiceListener listener) { + // Listeners typically do counting on errors and successes, and the decision to move to second phase, etc. is based on + // these counts so we need to be careful here to never propagate exceptions thrown by onResult to onFailure + T result = null; + Throwable error = null; + try { + result = callable.call(); + } catch (Throwable t) { + error = t; + } finally { + if (result == null) { + assert error != null; + listener.onFailure(error); + } else { + assert error == null : error; + listener.onResult(result); + } + } + } + private final TransportService transportService; private final ClusterService clusterService; @@ -164,12 +185,12 @@ public class SearchServiceTransportAction extends AbstractComponent { public void sendExecuteDfs(DiscoveryNode node, final ShardSearchRequest request, final SearchServiceListener listener) { if (clusterService.state().nodes().localNodeId().equals(node.id())) { - try { - DfsSearchResult result = searchService.executeDfsPhase(request); - listener.onResult(result); - } catch (Throwable e) { - listener.onFailure(e); - } + execute(new Callable() { + @Override + public DfsSearchResult call() throws Exception { + return searchService.executeDfsPhase(request); + } + }, listener); } else { transportService.sendRequest(node, SearchDfsTransportHandler.ACTION, request, new BaseTransportResponseHandler() { @@ -198,12 +219,12 @@ public class SearchServiceTransportAction extends AbstractComponent { public void sendExecuteQuery(DiscoveryNode node, final ShardSearchRequest request, final SearchServiceListener listener) { if (clusterService.state().nodes().localNodeId().equals(node.id())) { - try { - QuerySearchResult result = searchService.executeQueryPhase(request); - listener.onResult(result); - } catch (Throwable e) { - listener.onFailure(e); - } + execute(new Callable() { + @Override + public QuerySearchResult call() throws Exception { + return searchService.executeQueryPhase(request); + } + }, listener); } else { transportService.sendRequest(node, SearchQueryTransportHandler.ACTION, request, new BaseTransportResponseHandler() { @@ -232,12 +253,12 @@ public class SearchServiceTransportAction extends AbstractComponent { public void sendExecuteQuery(DiscoveryNode node, final QuerySearchRequest request, final SearchServiceListener listener) { if (clusterService.state().nodes().localNodeId().equals(node.id())) { - try { - QuerySearchResult result = searchService.executeQueryPhase(request); - listener.onResult(result); - } catch (Throwable e) { - listener.onFailure(e); - } + execute(new Callable() { + @Override + public QuerySearchResult call() throws Exception { + return searchService.executeQueryPhase(request); + } + }, listener); } else { transportService.sendRequest(node, SearchQueryByIdTransportHandler.ACTION, request, new BaseTransportResponseHandler() { @@ -266,12 +287,12 @@ public class SearchServiceTransportAction extends AbstractComponent { public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, final SearchServiceListener listener) { if (clusterService.state().nodes().localNodeId().equals(node.id())) { - try { - ScrollQuerySearchResult result = searchService.executeQueryPhase(request); - listener.onResult(result.queryResult()); - } catch (Throwable e) { - listener.onFailure(e); - } + execute(new Callable() { + @Override + public QuerySearchResult call() throws Exception { + return searchService.executeQueryPhase(request).queryResult(); + } + }, listener); } else { transportService.sendRequest(node, SearchQueryScrollTransportHandler.ACTION, request, new BaseTransportResponseHandler() { @@ -300,12 +321,12 @@ public class SearchServiceTransportAction extends AbstractComponent { public void sendExecuteFetch(DiscoveryNode node, final ShardSearchRequest request, final SearchServiceListener listener) { if (clusterService.state().nodes().localNodeId().equals(node.id())) { - try { - QueryFetchSearchResult result = searchService.executeFetchPhase(request); - listener.onResult(result); - } catch (Throwable e) { - listener.onFailure(e); - } + execute(new Callable() { + @Override + public QueryFetchSearchResult call() throws Exception { + return searchService.executeFetchPhase(request); + } + }, listener); } else { transportService.sendRequest(node, SearchQueryFetchTransportHandler.ACTION, request, new BaseTransportResponseHandler() { @@ -334,12 +355,12 @@ public class SearchServiceTransportAction extends AbstractComponent { public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request, final SearchServiceListener listener) { if (clusterService.state().nodes().localNodeId().equals(node.id())) { - try { - QueryFetchSearchResult result = searchService.executeFetchPhase(request); - listener.onResult(result); - } catch (Throwable e) { - listener.onFailure(e); - } + execute(new Callable() { + @Override + public QueryFetchSearchResult call() throws Exception { + return searchService.executeFetchPhase(request); + } + }, listener); } else { transportService.sendRequest(node, SearchQueryQueryFetchTransportHandler.ACTION, request, new BaseTransportResponseHandler() { @@ -368,12 +389,12 @@ public class SearchServiceTransportAction extends AbstractComponent { public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, final SearchServiceListener listener) { if (clusterService.state().nodes().localNodeId().equals(node.id())) { - try { - ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request); - listener.onResult(result.result()); - } catch (Throwable e) { - listener.onFailure(e); - } + execute(new Callable() { + @Override + public QueryFetchSearchResult call() throws Exception { + return searchService.executeFetchPhase(request).result(); + } + }, listener); } else { transportService.sendRequest(node, SearchQueryFetchScrollTransportHandler.ACTION, request, new BaseTransportResponseHandler() { @@ -402,12 +423,12 @@ public class SearchServiceTransportAction extends AbstractComponent { public void sendExecuteFetch(DiscoveryNode node, final FetchSearchRequest request, final SearchServiceListener listener) { if (clusterService.state().nodes().localNodeId().equals(node.id())) { - try { - FetchSearchResult result = searchService.executeFetchPhase(request); - listener.onResult(result); - } catch (Throwable e) { - listener.onFailure(e); - } + execute(new Callable() { + @Override + public FetchSearchResult call() throws Exception { + return searchService.executeFetchPhase(request); + } + }, listener); } else { transportService.sendRequest(node, SearchFetchByIdTransportHandler.ACTION, request, new BaseTransportResponseHandler() { @@ -436,12 +457,12 @@ public class SearchServiceTransportAction extends AbstractComponent { public void sendExecuteScan(DiscoveryNode node, final ShardSearchRequest request, final SearchServiceListener listener) { if (clusterService.state().nodes().localNodeId().equals(node.id())) { - try { - QuerySearchResult result = searchService.executeScan(request); - listener.onResult(result); - } catch (Throwable e) { - listener.onFailure(e); - } + execute(new Callable() { + @Override + public QuerySearchResult call() throws Exception { + return searchService.executeScan(request); + } + }, listener); } else { transportService.sendRequest(node, SearchScanTransportHandler.ACTION, request, new BaseTransportResponseHandler() { @@ -470,12 +491,12 @@ public class SearchServiceTransportAction extends AbstractComponent { public void sendExecuteScan(DiscoveryNode node, final InternalScrollSearchRequest request, final SearchServiceListener listener) { if (clusterService.state().nodes().localNodeId().equals(node.id())) { - try { - ScrollQueryFetchSearchResult result = searchService.executeScan(request); - listener.onResult(result.result()); - } catch (Throwable e) { - listener.onFailure(e); - } + execute(new Callable() { + @Override + public QueryFetchSearchResult call() throws Exception { + return searchService.executeScan(request).result(); + } + }, listener); } else { transportService.sendRequest(node, SearchScanScrollTransportHandler.ACTION, request, new BaseTransportResponseHandler() {