diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java index 58ccd9fd418..021f72a1d10 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java @@ -115,15 +115,19 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); if (node.id().equals(nodes.localNodeId())) { final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); - } - }); - } else { - executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); + try { + if (localAsync) { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { + @Override + public void run() { + executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); + } + }); + } else { + executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); + } + } catch (Throwable t) { + onSecondPhaseFailure(t, querySearchRequest, entry.index, dfsResult, counter); } } } @@ -144,18 +148,22 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc @Override public void onFailure(Throwable t) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); - } - AsyncAction.this.addShardFailure(shardIndex, dfsResult.shardTarget(), t); - successulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } + onSecondPhaseFailure(t, querySearchRequest, shardIndex, dfsResult, counter); } }); } + void onSecondPhaseFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult, AtomicInteger counter) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); + } + this.addShardFailure(shardIndex, dfsResult.shardTarget(), t); + successulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + void finishHim() { try { innerFinishHim(); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java index fc2d525fbca..706f3d09058 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java @@ -124,15 +124,19 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); if (node.id().equals(nodes.localNodeId())) { final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); - } - }); - } else { - executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); + try { + if (localAsync) { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { + @Override + public void run() { + executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); + } + }); + } else { + executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); + } + } catch (Throwable t) { + onQueryFailure(t, querySearchRequest, entry.index, dfsResult, counter); } } } @@ -153,18 +157,22 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA @Override public void onFailure(Throwable t) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); - } - AsyncAction.this.addShardFailure(shardIndex, dfsResult.shardTarget(), t); - successulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - executeFetchPhase(); - } + onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter); } }); } + void onQueryFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult, AtomicInteger counter) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); + } + this.addShardFailure(shardIndex, dfsResult.shardTarget(), t); + successulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + executeFetchPhase(); + } + } + void executeFetchPhase() { try { innerExecuteFetchPhase(); @@ -217,15 +225,19 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); if (node.id().equals(nodes.localNodeId())) { final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value); - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - }); - } else { - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); + try { + if (localAsync) { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { + @Override + public void run() { + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); + } + }); + } else { + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); + } + } catch (Throwable t) { + onFetchFailure(t, fetchSearchRequest, entry.index, queryResult.shardTarget(), counter); } } } @@ -246,18 +258,22 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA @Override public void onFailure(Throwable t) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); - } - AsyncAction.this.addShardFailure(shardIndex, shardTarget, t); - successulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } + onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); } }); } + void onFetchFailure(Throwable t, FetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, AtomicInteger counter) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); + } + this.addShardFailure(shardIndex, shardTarget, t); + successulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + void finishHim() { try { innerFinishHim(); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index 58187fbeceb..a3ace10d496 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -126,15 +126,19 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); if (node.id().equals(nodes.localNodeId())) { final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value); - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - }); - } else { - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); + try { + if (localAsync) { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { + @Override + public void run() { + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); + } + }); + } else { + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); + } + } catch (Throwable t) { + onFetchFailure(t, fetchSearchRequest, entry.index, queryResult.shardTarget(), counter); } } } @@ -155,18 +159,22 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi @Override public void onFailure(Throwable t) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); - } - AsyncAction.this.addShardFailure(shardIndex, shardTarget, t); - successulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } + onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); } }); } + void onFetchFailure(Throwable t, FetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, AtomicInteger counter) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); + } + this.addShardFailure(shardIndex, shardTarget, t); + successulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + void finishHim() { try { innerFinishHim(); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java index 75380a65eb0..2da79f230f2 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java @@ -170,15 +170,19 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent final int shardIndex = i; final DiscoveryNode node = nodes.get(target.v1()); if (node != null && nodes.localNodeId().equals(node.id())) { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executePhase(shardIndex, node, target.v2()); - } - }); - } else { - executePhase(shardIndex, node, target.v2()); + try { + if (localAsync) { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { + @Override + public void run() { + executePhase(shardIndex, node, target.v2()); + } + }); + } else { + executePhase(shardIndex, node, target.v2()); + } + } catch (Throwable t) { + onPhaseFailure(t, target.v2(), shardIndex); } } } @@ -200,7 +204,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent } } - private void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { + void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { searchService.sendExecuteFetch(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener() { @Override public void onResult(QueryFetchSearchResult result) { @@ -212,18 +216,22 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent @Override public void onFailure(Throwable t) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute query phase", t, searchId); - } - addShardFailure(shardIndex, new ShardSearchFailure(t)); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } + onPhaseFailure(t, searchId, shardIndex); } }); } + private void onPhaseFailure(Throwable t, long searchId, int shardIndex) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute query phase", t, searchId); + } + addShardFailure(shardIndex, new ShardSearchFailure(t)); + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + private void finishHim() { try { innerFinishHim(); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java index 52eac436169..b398fab16fc 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java @@ -176,15 +176,19 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent final int shardIndex = i; final DiscoveryNode node = nodes.get(target.v1()); if (node != null && nodes.localNodeId().equals(node.id())) { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeQueryPhase(shardIndex, counter, node, target.v2()); - } - }); - } else { - executeQueryPhase(shardIndex, counter, node, target.v2()); + try { + if (localAsync) { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { + @Override + public void run() { + executeQueryPhase(shardIndex, counter, node, target.v2()); + } + }); + } else { + executeQueryPhase(shardIndex, counter, node, target.v2()); + } + } catch (Throwable t) { + onQueryPhaseFailure(shardIndex, counter, target.v2(), t); } } } @@ -204,18 +208,22 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent @Override public void onFailure(Throwable t) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute query phase", t, searchId); - } - addShardFailure(shardIndex, new ShardSearchFailure(t)); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - executeFetchPhase(); - } + onQueryPhaseFailure(shardIndex, counter, searchId, t); } }); } + void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, final long searchId, Throwable t) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute query phase", t, searchId); + } + addShardFailure(shardIndex, new ShardSearchFailure(t)); + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + executeFetchPhase(); + } + } + private void executeFetchPhase() { sortedShardList = searchPhaseController.sortDocs(queryResults); AtomicArray docIdsToLoad = new AtomicArray(queryResults.length()); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java index dcd50309944..d2017ab5992 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java @@ -172,15 +172,19 @@ public class TransportSearchScrollScanAction extends AbstractComponent { final int shardIndex = i; final DiscoveryNode node = nodes.get(target.v1()); if (node != null && nodes.localNodeId().equals(node.id())) { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executePhase(shardIndex, node, target.v2()); - } - }); - } else { - executePhase(shardIndex, node, target.v2()); + try { + if (localAsync) { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { + @Override + public void run() { + executePhase(shardIndex, node, target.v2()); + } + }); + } else { + executePhase(shardIndex, node, target.v2()); + } + } catch (Throwable t) { + onPhaseFailure(t, target.v2(), shardIndex); } } } @@ -202,7 +206,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent { } } - private void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { + void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { searchService.sendExecuteScan(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener() { @Override public void onResult(QueryFetchSearchResult result) { @@ -214,18 +218,22 @@ public class TransportSearchScrollScanAction extends AbstractComponent { @Override public void onFailure(Throwable t) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute query phase", t, searchId); - } - addShardFailure(shardIndex, new ShardSearchFailure(t)); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } + onPhaseFailure(t, searchId, shardIndex); } }); } + void onPhaseFailure(Throwable t, long searchId, int shardIndex) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute query phase", t, searchId); + } + addShardFailure(shardIndex, new ShardSearchFailure(t)); + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + private void finishHim() { try { innerFinishHim(); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java index 816c841c7d2..81e246df63b 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java @@ -179,12 +179,16 @@ public abstract class TransportSearchTypeAction extends TransportAction() { + @Override + public ShardResponse newInstance() { + return newShardResponse(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void handleResponse(ShardResponse response) { + onOperation(shard, shardIndex, response); + } + + @Override + public void handleException(TransportException e) { onOperation(shard, shardIt, shardIndex, e); } - } - }); - } else { - try { - onOperation(shard, shardIndex, shardOperation(shardRequest)); - } catch (Throwable e) { - onOperation(shard, shardIt, shardIndex, e); + }); } } - } else { - DiscoveryNode node = nodes.get(shard.currentNodeId()); - if (node == null) { - // no node connected, act as failure - onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId())); - } else { - transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler() { - @Override - public ShardResponse newInstance() { - return newShardResponse(); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public void handleResponse(ShardResponse response) { - onOperation(shard, shardIndex, response); - } - - @Override - public void handleException(TransportException e) { - onOperation(shard, shardIt, shardIndex, e); - } - }); - } + } catch (Throwable e) { + onOperation(shard, shardIt, shardIndex, e); } } } diff --git a/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java b/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java index 4a454c92d8e..e8ab6eff584 100644 --- a/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java @@ -137,16 +137,20 @@ public abstract class TransportMasterNodeOperationAction() { + try { + if (nodeId.equals("_local") || nodeId.equals(clusterState.nodes().localNodeId())) { + threadPool.executor(executor()).execute(new Runnable() { @Override - public NodeResponse newInstance() { - return newNodeResponse(); - } - - @Override - public void handleResponse(NodeResponse response) { - onOperation(response); - } - - @Override - public void handleException(TransportException exp) { - onFailure(node.id(), exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; + public void run() { + try { + onOperation(nodeOperation(newNodeRequest(clusterState.nodes().localNodeId(), request))); + } catch (Throwable e) { + onFailure(clusterState.nodes().localNodeId(), e); + } } }); + } else if (nodeId.equals("_master")) { + threadPool.executor(executor()).execute(new Runnable() { + @Override + public void run() { + try { + onOperation(nodeOperation(newNodeRequest(clusterState.nodes().masterNodeId(), request))); + } catch (Throwable e) { + onFailure(clusterState.nodes().masterNodeId(), e); + } + } + }); + } else { + if (node == null) { + onFailure(nodeId, new NoSuchNodeException(nodeId)); + } else { + NodeRequest nodeRequest = newNodeRequest(nodeId, request); + transportService.sendRequest(node, transportNodeAction, nodeRequest, transportRequestOptions, new BaseTransportResponseHandler() { + @Override + public NodeResponse newInstance() { + return newNodeResponse(); + } + + @Override + public void handleResponse(NodeResponse response) { + onOperation(response); + } + + @Override + public void handleException(TransportException exp) { + onFailure(node.id(), exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + } } + } catch (Throwable t) { + onFailure(nodeId, t); } } } diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index 7c985352c6d..4bbd270bd3f 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -411,16 +411,20 @@ public abstract class TransportShardReplicationOperationAction() { diff --git a/src/main/java/org/elasticsearch/action/support/single/shard/TransportShardSingleOperationAction.java b/src/main/java/org/elasticsearch/action/support/single/shard/TransportShardSingleOperationAction.java index f135efba0ee..5b7e68146b2 100644 --- a/src/main/java/org/elasticsearch/action/support/single/shard/TransportShardSingleOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/single/shard/TransportShardSingleOperationAction.java @@ -151,26 +151,26 @@ public abstract class TransportShardSingleOperationAction