Harden ops counting in AbstractSearchAsyncAction (#23045)
Today we account for too many response with an `IllegalStateException` in `AbstractSearchAsyncAction` while this is something that should never happen we should rather assert that we are always have less or equal the number of expected ops when waiting for responses.
This commit is contained in:
parent
b5f5356c4a
commit
e02d5563f4
|
@ -170,8 +170,9 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
|
||||||
if (xTotalOps == expectedTotalOps) {
|
if (xTotalOps == expectedTotalOps) {
|
||||||
executePhase(initialPhaseName(), innerGetNextPhase(), null);
|
executePhase(initialPhaseName(), innerGetNextPhase(), null);
|
||||||
} else if (xTotalOps > expectedTotalOps) {
|
} else if (xTotalOps > expectedTotalOps) {
|
||||||
raisePhaseFailure(new IllegalStateException("unexpected higher total ops [" + xTotalOps + "] compared " +
|
// this is fatal - something is completely wrong here?
|
||||||
"to expected [" + expectedTotalOps + "]"));
|
throw new AssertionError( "unexpected higher total ops [" + xTotalOps + "] compared to expected ["
|
||||||
|
+ expectedTotalOps + "]");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -302,19 +303,19 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
|
||||||
/**
|
/**
|
||||||
* This method should be called if a search phase failed to ensure all relevant search contexts and resources are released.
|
* This method should be called if a search phase failed to ensure all relevant search contexts and resources are released.
|
||||||
* this method will also notify the listener and sends back a failure to the user.
|
* this method will also notify the listener and sends back a failure to the user.
|
||||||
* @param e the exception explaining or causing the phase failure
|
* @param exception the exception explaining or causing the phase failure
|
||||||
*/
|
*/
|
||||||
protected void raisePhaseFailure(Exception e) {
|
protected void raisePhaseFailure(SearchPhaseExecutionException exception) {
|
||||||
for (AtomicArray.Entry<FirstResult> entry : initialResults.asList()) {
|
for (AtomicArray.Entry<FirstResult> entry : initialResults.asList()) {
|
||||||
try {
|
try {
|
||||||
Transport.Connection connection = nodeIdToConnection.apply(entry.value.shardTarget().getNodeId());
|
Transport.Connection connection = nodeIdToConnection.apply(entry.value.shardTarget().getNodeId());
|
||||||
sendReleaseSearchContext(entry.value.id(), connection);
|
sendReleaseSearchContext(entry.value.id(), connection);
|
||||||
} catch (Exception inner) {
|
} catch (Exception inner) {
|
||||||
inner.addSuppressed(e);
|
inner.addSuppressed(exception);
|
||||||
logger.trace("failed to release context", inner);
|
logger.trace("failed to release context", inner);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
listener.onFailure(e);
|
listener.onFailure(exception);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void sendReleaseSearchContext(long contextId, Transport.Connection connection) {
|
protected void sendReleaseSearchContext(long contextId, Transport.Connection connection) {
|
||||||
|
|
Loading…
Reference in New Issue