add a test to check excecution flow (#44481)

add a test for the execution flow of a2p indexer
This commit is contained in:
Hendrik Muhs 2019-07-22 15:42:05 +02:00
parent 5273a548a4
commit 4387d81e5b
1 changed files with 112 additions and 0 deletions

View File

@ -8,8 +8,10 @@ package org.elasticsearch.xpack.core.indexing;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponseSections;
@ -146,6 +148,100 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
}
private class MockIndexerFiveRuns extends AsyncTwoPhaseIndexer<Integer, MockJobStats> {
// counters
private volatile boolean started = false;
private volatile int searchRequests = 0;
private volatile int searchOps = 0;
private volatile int processOps = 0;
private volatile int bulkOps = 0;
protected MockIndexerFiveRuns(Executor executor, AtomicReference<IndexerState> initialState, Integer initialPosition) {
super(executor, initialState, initialPosition, new MockJobStats());
}
@Override
protected String getJobId() {
return "mock_5_runs";
}
@Override
protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
++processOps;
if (processOps == 5) {
return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, true);
}
else if (processOps % 2 == 0) {
return new IterationResult<>(Collections.emptyList(), processOps, false);
}
return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, false);
}
@Override
protected SearchRequest buildSearchRequest() {
++searchRequests;
return new SearchRequest();
}
@Override
protected void onStart(long now, ActionListener<Void> listener) {
started = true;
listener.onResponse(null);
}
@Override
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
++searchOps;
final SearchResponseSections sections = new SearchResponseSections(
new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0), null,
null, false, null, null, 1);
nextPhase.onResponse(new SearchResponse(sections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, null));
}
@Override
protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase) {
++bulkOps;
nextPhase.onResponse(new BulkResponse(new BulkItemResponse[0], 100));
}
@Override
protected void doSaveState(IndexerState state, Integer position, Runnable next) {
next.run();
}
@Override
protected void onFailure(Exception exc) {
fail(exc.getMessage());
}
@Override
protected void onFinish(ActionListener<Void> listener) {
assertTrue(isFinished.compareAndSet(false, true));
listener.onResponse(null);
}
@Override
protected void onStop() {
assertTrue(isStopped.compareAndSet(false, true));
}
@Override
protected void onAbort() {
}
public void assertCounters() {
assertTrue(started);
assertEquals(5L, searchRequests);
assertEquals(5L, searchOps);
assertEquals(5L, processOps);
assertEquals(2L, bulkOps);
}
}
private class MockIndexerThrowsFirstSearch extends AsyncTwoPhaseIndexer<Integer, MockJobStats> {
// test the execution order
@ -288,4 +384,20 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
executor.shutdownNow();
}
}
public void testFiveRuns() throws InterruptedException {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (executor, state, 2);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
assertTrue(awaitBusy(() -> isFinished.get()));
indexer.assertCounters();
} finally {
executor.shutdownNow();
}
}
}