Raise a phase failure if fetch phase gets rejected

This commit is contained in:
Simon Willnauer 2017-02-08 12:51:50 +01:00
parent 0161edae10
commit 2d6d871f5c
1 changed files with 10 additions and 9 deletions

View File

@ -170,7 +170,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
if (xTotalOps == expectedTotalOps) {
executePhase(initialPhaseName(), innerGetNextPhase(), null);
} else if (xTotalOps > expectedTotalOps) {
raiseEarlyFailure(new IllegalStateException("unexpected higher total ops [" + xTotalOps + "] compared " +
raisePhaseFailure(new IllegalStateException("unexpected higher total ops [" + xTotalOps + "] compared " +
"to expected [" + expectedTotalOps + "]"));
}
}
@ -188,7 +188,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
"Failed to execute [{}] while moving to second phase", request),
e);
}
raiseEarlyFailure(new ReduceSearchPhaseException(phaseName, "", e, buildShardFailures()));
raisePhaseFailure(new ReduceSearchPhaseException(phaseName, "", e, buildShardFailures()));
}
}
@ -220,7 +220,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
}
// no successful ops, raise an exception
raiseEarlyFailure(new SearchPhaseExecutionException(initialPhaseName(), "all shards failed", e, shardSearchFailures));
raisePhaseFailure(new SearchPhaseExecutionException(initialPhaseName(), "all shards failed", e, shardSearchFailures));
} else {
executePhase(initialPhaseName(), innerGetNextPhase(), e);
}
@ -299,7 +299,12 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
}
}
private void raiseEarlyFailure(Exception e) {
/**
* This method should be called if a search phase failed to ensure all relevant search contexts and resources are released.
* this method will also notify the listener and sends back a failure to the user.
* @param e the exception explaining or causing the phase failure
*/
protected void raisePhaseFailure(Exception e) {
for (AtomicArray.Entry<FirstResult> entry : initialResults.asList()) {
try {
Transport.Connection connection = nodeIdToConnection.apply(entry.value.shardTarget().getNodeId());
@ -445,11 +450,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
@Override
public void onFailure(Exception e) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
super.onFailure(failure);
raisePhaseFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
}
});
}