From 2d6d871f5c70b11c23a5b99b5e3a0bf0cff8c6b0 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 8 Feb 2017 12:51:50 +0100 Subject: [PATCH] Raise a phase failure if fetch phase gets rejected --- .../search/AbstractSearchAsyncAction.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 0a7a4fe8612..607e2fbdafd 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -170,7 +170,7 @@ abstract class AbstractSearchAsyncAction if (xTotalOps == expectedTotalOps) { executePhase(initialPhaseName(), innerGetNextPhase(), null); } else if (xTotalOps > expectedTotalOps) { - raiseEarlyFailure(new IllegalStateException("unexpected higher total ops [" + xTotalOps + "] compared " + + raisePhaseFailure(new IllegalStateException("unexpected higher total ops [" + xTotalOps + "] compared " + "to expected [" + expectedTotalOps + "]")); } } @@ -188,7 +188,7 @@ abstract class AbstractSearchAsyncAction "Failed to execute [{}] while moving to second phase", request), e); } - raiseEarlyFailure(new ReduceSearchPhaseException(phaseName, "", e, buildShardFailures())); + raisePhaseFailure(new ReduceSearchPhaseException(phaseName, "", e, buildShardFailures())); } } @@ -220,7 +220,7 @@ abstract class AbstractSearchAsyncAction } // no successful ops, raise an exception - raiseEarlyFailure(new SearchPhaseExecutionException(initialPhaseName(), "all shards failed", e, shardSearchFailures)); + raisePhaseFailure(new SearchPhaseExecutionException(initialPhaseName(), "all shards failed", e, shardSearchFailures)); } else { executePhase(initialPhaseName(), innerGetNextPhase(), e); } @@ -299,7 +299,12 @@ abstract class AbstractSearchAsyncAction } } - private void raiseEarlyFailure(Exception e) { + /** + * This method should be called if a search phase failed to ensure all relevant search contexts and resources are released. + * this method will also notify the listener and sends back a failure to the user. + * @param e the exception explaining or causing the phase failure + */ + protected void raisePhaseFailure(Exception e) { for (AtomicArray.Entry entry : initialResults.asList()) { try { Transport.Connection connection = nodeIdToConnection.apply(entry.value.shardTarget().getNodeId()); @@ -445,11 +450,7 @@ abstract class AbstractSearchAsyncAction @Override public void onFailure(Exception e) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - super.onFailure(failure); + raisePhaseFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures())); } }); }