From e02d5563f4ef794f47e4329dceccde9dbcf7797f Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 9 Feb 2017 09:30:13 +0100 Subject: [PATCH] Harden ops counting in AbstractSearchAsyncAction (#23045) Today we account for too many response with an `IllegalStateException` in `AbstractSearchAsyncAction` while this is something that should never happen we should rather assert that we are always have less or equal the number of expected ops when waiting for responses. --- .../action/search/AbstractSearchAsyncAction.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 2a4f6893243..0b5af75674a 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -170,8 +170,9 @@ abstract class AbstractSearchAsyncAction if (xTotalOps == expectedTotalOps) { executePhase(initialPhaseName(), innerGetNextPhase(), null); } else if (xTotalOps > expectedTotalOps) { - raisePhaseFailure(new IllegalStateException("unexpected higher total ops [" + xTotalOps + "] compared " + - "to expected [" + expectedTotalOps + "]")); + // this is fatal - something is completely wrong here? + throw new AssertionError( "unexpected higher total ops [" + xTotalOps + "] compared to expected [" + + expectedTotalOps + "]"); } } @@ -302,19 +303,19 @@ abstract class AbstractSearchAsyncAction /** * This method should be called if a search phase failed to ensure all relevant search contexts and resources are released. * this method will also notify the listener and sends back a failure to the user. - * @param e the exception explaining or causing the phase failure + * @param exception the exception explaining or causing the phase failure */ - protected void raisePhaseFailure(Exception e) { + protected void raisePhaseFailure(SearchPhaseExecutionException exception) { for (AtomicArray.Entry entry : initialResults.asList()) { try { Transport.Connection connection = nodeIdToConnection.apply(entry.value.shardTarget().getNodeId()); sendReleaseSearchContext(entry.value.id(), connection); } catch (Exception inner) { - inner.addSuppressed(e); + inner.addSuppressed(exception); logger.trace("failed to release context", inner); } } - listener.onFailure(e); + listener.onFailure(exception); } protected void sendReleaseSearchContext(long contextId, Transport.Connection connection) {