diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index 053e41d9b2a..1593dfbb755 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -8,8 +8,10 @@ package org.elasticsearch.xpack.core.indexing; import org.apache.lucene.search.TotalHits; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponseSections; @@ -146,6 +148,100 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { } + private class MockIndexerFiveRuns extends AsyncTwoPhaseIndexer { + + // counters + private volatile boolean started = false; + private volatile int searchRequests = 0; + private volatile int searchOps = 0; + private volatile int processOps = 0; + private volatile int bulkOps = 0; + + protected MockIndexerFiveRuns(Executor executor, AtomicReference initialState, Integer initialPosition) { + super(executor, initialState, initialPosition, new MockJobStats()); + } + + @Override + protected String getJobId() { + return "mock_5_runs"; + } + + @Override + protected IterationResult doProcess(SearchResponse searchResponse) { + ++processOps; + if (processOps == 5) { + return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, true); + } + else if (processOps % 2 == 0) { + return new IterationResult<>(Collections.emptyList(), processOps, false); + } + + return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, false); + } + + @Override + protected SearchRequest buildSearchRequest() { + ++searchRequests; + return new SearchRequest(); + } + + @Override + protected void onStart(long now, ActionListener listener) { + started = true; + listener.onResponse(null); + } + + @Override + protected void doNextSearch(SearchRequest request, ActionListener nextPhase) { + ++searchOps; + final SearchResponseSections sections = new SearchResponseSections( + new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0), null, + null, false, null, null, 1); + + nextPhase.onResponse(new SearchResponse(sections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, null)); + } + + @Override + protected void doNextBulk(BulkRequest request, ActionListener nextPhase) { + ++bulkOps; + nextPhase.onResponse(new BulkResponse(new BulkItemResponse[0], 100)); + } + + @Override + protected void doSaveState(IndexerState state, Integer position, Runnable next) { + next.run(); + } + + @Override + protected void onFailure(Exception exc) { + fail(exc.getMessage()); + } + + @Override + protected void onFinish(ActionListener listener) { + assertTrue(isFinished.compareAndSet(false, true)); + listener.onResponse(null); + } + + @Override + protected void onStop() { + assertTrue(isStopped.compareAndSet(false, true)); + } + + @Override + protected void onAbort() { + } + + public void assertCounters() { + assertTrue(started); + assertEquals(5L, searchRequests); + assertEquals(5L, searchOps); + assertEquals(5L, processOps); + assertEquals(2L, bulkOps); + } + + } + private class MockIndexerThrowsFirstSearch extends AsyncTwoPhaseIndexer { // test the execution order @@ -288,4 +384,20 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { executor.shutdownNow(); } } + + public void testFiveRuns() throws InterruptedException { + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + final ExecutorService executor = Executors.newFixedThreadPool(1); + try { + MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (executor, state, 2); + indexer.start(); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + assertTrue(awaitBusy(() -> isFinished.get())); + indexer.assertCounters(); + } finally { + executor.shutdownNow(); + } + } }