Don't reduce twice if failure occurs.

This commit is contained in:
Martijn van Groningen 2013-08-25 21:50:04 +02:00
parent 020e68f2a0
commit a09f217b45
2 changed files with 9 additions and 2 deletions

View File

@ -203,7 +203,12 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
assert expectedOperationsPerItem[item.slot()].get() >= 1;
if (expectedOperationsPerItem[item.slot()].decrementAndGet() == 0) {
try {
reduce(item.slot(), percolateRequests, expectedOperations, reducedResponses, listener, responsesByItemAndShard);
} catch (Throwable t) {
// Don't let any failure bubble up, otherwise expectedOperationsPerItem will be decremented twice
listener.onFailure(t);
}
}
}
} catch (Throwable e) {
@ -225,7 +230,7 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
}
shardResults.set(shardId.id(), new BroadcastShardOperationFailedException(shardId, e));
assert expectedOperationsPerItem[slot].get() >= 1;
assert expectedOperationsPerItem[slot].get() >= 1 : "Caused by: " + e.getMessage();
if (expectedOperationsPerItem[slot].decrementAndGet() == 0) {
reduce(slot, percolateRequests, expectedOperations, reducedResponses, listener, responsesByItemAndShard);
}

View File

@ -363,6 +363,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
for (MultiPercolateResponse.Item item : response) {
assertThat(item.isFailure(), equalTo(false));
assertNoFailures(item.getResponse());
assertThat(item.getResponse().getSuccessfulShards(), equalTo(2));
assertThat(item.getResponse().getCount(), equalTo((long) numQueries));
assertThat(item.getResponse().getMatches().length, equalTo(numQueries));
}
@ -382,6 +383,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
.execute().actionGet();
}
assertNoFailures(response);
assertThat(response.getSuccessfulShards(), equalTo(2));
assertThat(response.getCount(), equalTo((long) numQueries));
assertThat(response.getMatches().length, equalTo(numQueries));
}