AsyncTwoPhaseIndexerTests race condition fixed (#37830)

The unlucky timing can cause this test to fail when the indexing is triggered from `maybeTriggerAsyncJob`. As this is asynchronous, in can finish quicker then the test stepping over to next assertion
The introduced barrier solves the problem
closes #37695
This commit is contained in:
Przemyslaw Gomulka 2019-01-25 16:26:16 +01:00 committed by GitHub
parent 27c3fb8e0d
commit 85acc11ef7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 19 additions and 3 deletions

View File

@ -21,9 +21,11 @@ import org.elasticsearch.test.ESTestCase;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -35,11 +37,14 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
private class MockIndexer extends AsyncTwoPhaseIndexer<Integer, MockJobStats> { private class MockIndexer extends AsyncTwoPhaseIndexer<Integer, MockJobStats> {
private final CountDownLatch latch;
// test the execution order // test the execution order
private int step; private int step;
protected MockIndexer(Executor executor, AtomicReference<IndexerState> initialState, Integer initialPosition) { protected MockIndexer(Executor executor, AtomicReference<IndexerState> initialState, Integer initialPosition,
CountDownLatch latch) {
super(executor, initialState, initialPosition, new MockJobStats()); super(executor, initialState, initialPosition, new MockJobStats());
this.latch = latch;
} }
@Override @Override
@ -49,11 +54,20 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
@Override @Override
protected IterationResult<Integer> doProcess(SearchResponse searchResponse) { protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
awaitForLatch();
assertThat(step, equalTo(3)); assertThat(step, equalTo(3));
++step; ++step;
return new IterationResult<Integer>(Collections.emptyList(), 3, true); return new IterationResult<Integer>(Collections.emptyList(), 3, true);
} }
private void awaitForLatch() {
try {
latch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override @Override
protected SearchRequest buildSearchRequest() { protected SearchRequest buildSearchRequest() {
assertThat(step, equalTo(1)); assertThat(step, equalTo(1));
@ -196,12 +210,14 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
final ExecutorService executor = Executors.newFixedThreadPool(1); final ExecutorService executor = Executors.newFixedThreadPool(1);
isFinished.set(false); isFinished.set(false);
try { try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MockIndexer indexer = new MockIndexer(executor, state, 2); MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch);
indexer.start(); indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
countDownLatch.countDown();
assertThat(indexer.getPosition(), equalTo(2)); assertThat(indexer.getPosition(), equalTo(2));
ESTestCase.awaitBusy(() -> isFinished.get()); ESTestCase.awaitBusy(() -> isFinished.get());
assertThat(indexer.getStep(), equalTo(6)); assertThat(indexer.getStep(), equalTo(6));