Fixing rollup state tests after onFailure ordering change (#45784) (#45814)

After the PR #45676 onFailure is now called before the indexer state has transitioned out of indexing.

To fix these tests, I added a new check to make sure that we don't mark it as failed until AFTER doSaveState is called with a STARTED indexer.
This commit is contained in:
Benjamin Trent 2019-08-21 14:46:09 -05:00 committed by GitHub
parent 47b1e2b3d0
commit 3ebeaa2557
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 42 additions and 23 deletions

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.rollup.job; package org.elasticsearch.xpack.rollup.job;
import org.apache.lucene.search.TotalHits; import org.apache.lucene.search.TotalHits;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
@ -51,7 +50,6 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/45770")
public class RollupIndexerStateTests extends ESTestCase { public class RollupIndexerStateTests extends ESTestCase {
private static class EmptyRollupIndexer extends RollupIndexer { private static class EmptyRollupIndexer extends RollupIndexer {
EmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState, EmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
@ -262,7 +260,7 @@ public class RollupIndexerStateTests extends ESTestCase {
indexer.start(); indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
ESTestCase.awaitBusy(() -> indexer.getState() == IndexerState.STARTED); assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)));
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L)); assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
assertThat(indexer.getStats().getNumPages(), equalTo(1L)); assertThat(indexer.getStats().getNumPages(), equalTo(1L));
assertThat(indexer.getStats().getIndexFailures(), equalTo(0L)); assertThat(indexer.getStats().getIndexFailures(), equalTo(0L));
@ -286,9 +284,18 @@ public class RollupIndexerStateTests extends ESTestCase {
protected void onFinish(ActionListener<Void> listener) { protected void onFinish(ActionListener<Void> listener) {
super.onFinish(ActionListener.wrap(r -> { super.onFinish(ActionListener.wrap(r -> {
listener.onResponse(r); listener.onResponse(r);
isFinished.set(true);
}, listener::onFailure)); }, listener::onFailure));
} }
@Override
protected void doSaveState(IndexerState state, Map<String, Object> position, Runnable next) {
super.doSaveState(state, position, () -> {
if (state == IndexerState.STARTED) {
isFinished.set(true);
}
next.run();
});
}
}; };
final CountDownLatch latch = indexer.newLatch(); final CountDownLatch latch = indexer.newLatch();
indexer.start(); indexer.start();
@ -296,7 +303,7 @@ public class RollupIndexerStateTests extends ESTestCase {
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
latch.countDown(); latch.countDown();
ESTestCase.awaitBusy(() -> isFinished.get()); assertBusy(() -> assertTrue(isFinished.get()));
assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L)); assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
assertThat(indexer.getStats().getNumPages(), equalTo(1L)); assertThat(indexer.getStats().getNumPages(), equalTo(1L));
@ -309,7 +316,7 @@ public class RollupIndexerStateTests extends ESTestCase {
} }
} }
public void testStateChangeMidTrigger() throws Exception { public void testStateChangeMidTrigger() {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED); AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
RollupIndexerJobStats stats = new RollupIndexerJobStats(); RollupIndexerJobStats stats = new RollupIndexerJobStats();
@ -389,7 +396,7 @@ public class RollupIndexerStateTests extends ESTestCase {
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
latch.countDown(); latch.countDown();
ESTestCase.awaitBusy(() -> aborted.get()); assertBusy(() -> assertTrue(aborted.get()));
assertThat(indexer.getState(), equalTo(IndexerState.ABORTING)); assertThat(indexer.getState(), equalTo(IndexerState.ABORTING));
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L)); assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
assertThat(indexer.getStats().getNumPages(), equalTo(0L)); assertThat(indexer.getStats().getNumPages(), equalTo(0L));
@ -477,7 +484,7 @@ public class RollupIndexerStateTests extends ESTestCase {
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
doNextSearchLatch.countDown(); doNextSearchLatch.countDown();
ESTestCase.awaitBusy(() -> aborted.get()); assertBusy(() -> assertTrue(aborted.get()));
assertThat(indexer.getState(), equalTo(IndexerState.ABORTING)); assertThat(indexer.getState(), equalTo(IndexerState.ABORTING));
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L)); assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
assertThat(indexer.getStats().getNumPages(), equalTo(1L)); assertThat(indexer.getStats().getNumPages(), equalTo(1L));
@ -501,7 +508,7 @@ public class RollupIndexerStateTests extends ESTestCase {
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
assertThat(indexer.stop(), equalTo(IndexerState.STOPPING)); assertThat(indexer.stop(), equalTo(IndexerState.STOPPING));
latch.countDown(); latch.countDown();
ESTestCase.awaitBusy(() -> indexer.getState() == IndexerState.STOPPED); assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)));
assertTrue(indexer.abort()); assertTrue(indexer.abort());
} finally { } finally {
executor.shutdownNow(); executor.shutdownNow();
@ -528,14 +535,14 @@ public class RollupIndexerStateTests extends ESTestCase {
assertFalse(indexer.abort()); assertFalse(indexer.abort());
assertThat(indexer.getState(), equalTo(IndexerState.ABORTING)); assertThat(indexer.getState(), equalTo(IndexerState.ABORTING));
latch.countDown(); latch.countDown();
ESTestCase.awaitBusy(() -> isAborted.get()); assertBusy(() -> assertTrue(isAborted.get()));
assertFalse(indexer.abort()); assertFalse(indexer.abort());
} finally { } finally {
executor.shutdownNow(); executor.shutdownNow();
} }
} }
public void testAbortStarted() throws Exception { public void testAbortStarted() {
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED); AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1); final ExecutorService executor = Executors.newFixedThreadPool(1);
@ -582,7 +589,7 @@ public class RollupIndexerStateTests extends ESTestCase {
assertFalse(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertFalse(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
latch.countDown(); latch.countDown();
ESTestCase.awaitBusy(() -> indexer.getState() == IndexerState.STARTED); assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)));
assertThat(indexer.getStats().getNumInvocations(), equalTo((long) i + 1)); assertThat(indexer.getStats().getNumInvocations(), equalTo((long) i + 1));
assertThat(indexer.getStats().getNumPages(), equalTo((long) i + 1)); assertThat(indexer.getStats().getNumPages(), equalTo((long) i + 1));
} }
@ -591,7 +598,7 @@ public class RollupIndexerStateTests extends ESTestCase {
assertThat(indexer.stop(), equalTo(IndexerState.STOPPING)); assertThat(indexer.stop(), equalTo(IndexerState.STOPPING));
assertThat(indexer.getState(), equalTo(IndexerState.STOPPING)); assertThat(indexer.getState(), equalTo(IndexerState.STOPPING));
latch.countDown(); latch.countDown();
ESTestCase.awaitBusy(() -> indexer.getState() == IndexerState.STOPPED); assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)));
assertTrue(indexer.abort()); assertTrue(indexer.abort());
} finally { } finally {
executor.shutdownNow(); executor.shutdownNow();
@ -674,21 +681,25 @@ public class RollupIndexerStateTests extends ESTestCase {
Consumer<Exception> failureConsumer = e -> { Consumer<Exception> failureConsumer = e -> {
assertThat(e.getMessage(), equalTo("Could not identify key in agg [foo]")); assertThat(e.getMessage(), equalTo("Could not identify key in agg [foo]"));
isFinished.set(true); };
BiConsumer<IndexerState, Map<String, Object>> stateCheck = (i, p) -> {
if (i == IndexerState.STARTED) {
isFinished.set(true);
}
}; };
final ExecutorService executor = Executors.newFixedThreadPool(1); final ExecutorService executor = Executors.newFixedThreadPool(1);
try { try {
NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null, NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null,
searchFunction, bulkFunction, failureConsumer); searchFunction, bulkFunction, failureConsumer, stateCheck);
final CountDownLatch latch = indexer.newLatch(1); final CountDownLatch latch = indexer.newLatch(1);
indexer.start(); indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
latch.countDown(); latch.countDown();
ESTestCase.awaitBusy(() -> isFinished.get()); assertBusy(() -> assertTrue(isFinished.get()));
// Despite failure in bulk, we should move back to STARTED and wait to try again on next trigger // Despite failure in bulk, we should move back to STARTED and wait to try again on next trigger
assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
@ -800,7 +811,7 @@ public class RollupIndexerStateTests extends ESTestCase {
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
latch.countDown(); latch.countDown();
ESTestCase.awaitBusy(() -> isFinished.get()); assertBusy(() -> assertTrue(isFinished.get()));
// Despite failure in processing keys, we should continue moving to STOPPED // Despite failure in processing keys, we should continue moving to STOPPED
assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)); assertThat(indexer.getState(), equalTo(IndexerState.STOPPED));
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L)); assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
@ -830,21 +841,25 @@ public class RollupIndexerStateTests extends ESTestCase {
Consumer<Exception> failureConsumer = e -> { Consumer<Exception> failureConsumer = e -> {
assertThat(e.getMessage(), startsWith("Partial shards failure")); assertThat(e.getMessage(), startsWith("Partial shards failure"));
isFinished.set(true); };
BiConsumer<IndexerState, Map<String, Object>> stateCheck = (i, p) -> {
if (i == IndexerState.STARTED) {
isFinished.set(true);
}
}; };
final ExecutorService executor = Executors.newFixedThreadPool(1); final ExecutorService executor = Executors.newFixedThreadPool(1);
try { try {
NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null, NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null,
searchFunction, bulkFunction, failureConsumer); searchFunction, bulkFunction, failureConsumer, stateCheck);
final CountDownLatch latch = indexer.newLatch(1); final CountDownLatch latch = indexer.newLatch(1);
indexer.start(); indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
latch.countDown(); latch.countDown();
ESTestCase.awaitBusy(() -> isFinished.get()); assertBusy(() -> assertTrue(isFinished.get()));
// Despite failure in bulk, we should move back to STARTED and wait to try again on next trigger // Despite failure in bulk, we should move back to STARTED and wait to try again on next trigger
assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
@ -939,14 +954,18 @@ public class RollupIndexerStateTests extends ESTestCase {
Consumer<Exception> failureConsumer = e -> { Consumer<Exception> failureConsumer = e -> {
assertThat(e.getMessage(), equalTo("failed")); assertThat(e.getMessage(), equalTo("failed"));
isFinished.set(true); };
BiConsumer<IndexerState, Map<String, Object>> stateCheck = (i, p) -> {
if (i == IndexerState.STARTED) {
isFinished.set(true);
}
}; };
final ExecutorService executor = Executors.newFixedThreadPool(1); final ExecutorService executor = Executors.newFixedThreadPool(1);
try { try {
NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null, NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null,
searchFunction, bulkFunction, failureConsumer) { searchFunction, bulkFunction, failureConsumer, stateCheck) {
@Override @Override
protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase) { protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase) {
nextPhase.onFailure(new RuntimeException("failed")); nextPhase.onFailure(new RuntimeException("failed"));
@ -958,7 +977,7 @@ public class RollupIndexerStateTests extends ESTestCase {
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
latch.countDown(); latch.countDown();
ESTestCase.awaitBusy(() -> isFinished.get()); assertBusy(() -> assertTrue(isFinished.get()));
// Despite failure in bulk, we should move back to STARTED and wait to try again on next trigger // Despite failure in bulk, we should move back to STARTED and wait to try again on next trigger
assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertThat(indexer.getState(), equalTo(IndexerState.STARTED));