move latch await to doNextSearch (#42275)
move latch await to doNextSearch, fixes a race condition when the executor thread is faster than the coordinator thread fixes #42084
This commit is contained in:
parent
34dda75cdf
commit
3493f3b637
|
@ -64,7 +64,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
|
protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
|
||||||
awaitForLatch();
|
assertFalse("should not be called as stoppedBeforeFinished is false", stoppedBeforeFinished);
|
||||||
assertThat(step, equalTo(3));
|
assertThat(step, equalTo(3));
|
||||||
++step;
|
++step;
|
||||||
return new IterationResult<>(Collections.emptyList(), 3, true);
|
return new IterationResult<>(Collections.emptyList(), 3, true);
|
||||||
|
@ -99,6 +99,9 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
|
||||||
final SearchResponseSections sections = new SearchResponseSections(
|
final SearchResponseSections sections = new SearchResponseSections(
|
||||||
new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0), null,
|
new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0), null,
|
||||||
null, false, null, null, 1);
|
null, false, null, null, 1);
|
||||||
|
|
||||||
|
// block till latch has been counted down, simulating network latency
|
||||||
|
awaitForLatch();
|
||||||
nextPhase.onResponse(new SearchResponse(sections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, null));
|
nextPhase.onResponse(new SearchResponse(sections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, null));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -222,7 +225,6 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42084")
|
|
||||||
public void testStateMachine() throws Exception {
|
public void testStateMachine() throws Exception {
|
||||||
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
|
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
|
||||||
final ExecutorService executor = Executors.newFixedThreadPool(1);
|
final ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||||
|
@ -265,7 +267,6 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42084")
|
|
||||||
public void testStop_AfterIndexerIsFinished() throws InterruptedException {
|
public void testStop_AfterIndexerIsFinished() throws InterruptedException {
|
||||||
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
|
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
|
||||||
final ExecutorService executor = Executors.newFixedThreadPool(1);
|
final ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||||
|
@ -285,7 +286,6 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42084")
|
|
||||||
public void testStop_WhileIndexing() throws InterruptedException {
|
public void testStop_WhileIndexing() throws InterruptedException {
|
||||||
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
|
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
|
||||||
final ExecutorService executor = Executors.newFixedThreadPool(1);
|
final ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||||
|
|
Loading…
Reference in New Issue