diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index fef4592d750..63149267b16 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -14,9 +14,13 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; import java.util.List; -import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -38,15 +42,70 @@ import java.util.concurrent.atomic.AtomicReference; public abstract class AsyncTwoPhaseIndexer { private static final Logger logger = LogManager.getLogger(AsyncTwoPhaseIndexer.class.getName()); + // max time to wait for during throttling + private static final TimeValue MAX_THROTTLE_WAIT_TIME = TimeValue.timeValueHours(1); + // min time to trigger delayed execution, this avoids scheduling tasks with super short amount of time + private static final TimeValue MIN_THROTTLE_WAIT_TIME = TimeValue.timeValueMillis(10); + + private final ActionListener searchResponseListener = ActionListener.wrap( + this::onSearchResponse, + this::finishWithSearchFailure + ); + private final JobStats stats; private final AtomicReference state; private final AtomicReference position; - private final Executor executor; + private final ThreadPool threadPool; + private final String executorName; - protected AsyncTwoPhaseIndexer(Executor executor, AtomicReference initialState, - JobPosition initialPosition, JobStats jobStats) { - this.executor = executor; + // throttling implementation + private volatile float currentMaxDocsPerSecond; + private volatile long lastSearchStartTimeNanos = 0; + private volatile long lastDocCount = 0; + private volatile ScheduledRunnable scheduledNextSearch; + + /** + * Task wrapper for throttled execution, we need this wrapper in order to cancel and re-issue scheduled searches + */ + class ScheduledRunnable { + private final ThreadPool threadPool; + private final String executorName; + private final Runnable command; + private Scheduler.ScheduledCancellable scheduled; + + ScheduledRunnable(ThreadPool threadPool, String executorName, TimeValue delay, Runnable command) { + this.threadPool = threadPool; + this.executorName = executorName; + + // with wrapping the command in RunOnce we ensure the command isn't executed twice, e.g. if the + // future is already running and cancel returns true + this.command = new RunOnce(command); + this.scheduled = threadPool.schedule(() -> { command.run(); }, delay, executorName); + } + + public void reschedule(TimeValue delay) { + // note: cancel return true if the runnable is currently executing + if (scheduled.cancel()) { + if (delay.duration() > 0) { + scheduled = threadPool.schedule(() -> command.run(), delay, executorName); + } else { + threadPool.executor(executorName).execute(() -> command.run()); + } + } + } + + } + + protected AsyncTwoPhaseIndexer( + ThreadPool threadPool, + String executorName, + AtomicReference initialState, + JobPosition initialPosition, + JobStats jobStats + ) { + this.threadPool = threadPool; + this.executorName = executorName; this.state = initialState; this.position = new AtomicReference<>(initialPosition); this.stats = jobStats; @@ -96,7 +155,7 @@ public abstract class AsyncTwoPhaseIndexer { + IndexerState indexerState = state.updateAndGet(previousState -> { if (previousState == IndexerState.INDEXING) { return IndexerState.STOPPING; } else if (previousState == IndexerState.STARTED) { @@ -105,6 +164,13 @@ public abstract class AsyncTwoPhaseIndexer { + threadPool.executor(executorName).execute(() -> { onStart(now, ActionListener.wrap(r -> { assert r != null; if (r) { - nextSearch(ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure)); + nextSearch(); } else { onFinish(ActionListener.wrap( onFinishResponse -> doSaveState(finishAndSetState(), position.get(), () -> {}), @@ -178,6 +244,34 @@ public abstract class AsyncTwoPhaseIndexer iterationResult = doProcess(searchResponse); + // record the number of documents returned to base throttling on the output + lastDocCount = stats.getNumDocuments() - numDocumentsBefore; + if (iterationResult.isDone()) { - logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down."); + logger.debug("Finished indexing for job [{}], saving state and shutting down.", getJobId()); position.set(iterationResult.getPosition()); stats.markEndProcessing(); @@ -375,7 +478,7 @@ public abstract class AsyncTwoPhaseIndexer listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure); - nextSearch(listener); + nextSearch(); } catch (Exception e) { finishWithFailure(e); } @@ -409,26 +511,74 @@ public abstract class AsyncTwoPhaseIndexer listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure); // TODO probably something more intelligent than every-50 is needed if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) { doSaveState(IndexerState.INDEXING, position, () -> { - nextSearch(listener); + nextSearch(); }); } else { - nextSearch(listener); + nextSearch(); } } catch (Exception e) { finishWithIndexingFailure(e); } } - private void nextSearch(ActionListener listener) { + protected void nextSearch() { + currentMaxDocsPerSecond = getMaxDocsPerSecond(); + if (currentMaxDocsPerSecond > 0 && lastDocCount > 0) { + TimeValue executionDelay = calculateThrottlingDelay( + currentMaxDocsPerSecond, + lastDocCount, + lastSearchStartTimeNanos, + getTimeNanos() + ); + + if (executionDelay.duration() > 0) { + logger.debug( + "throttling job [{}], wait for {} ({} {})", + getJobId(), + executionDelay, + currentMaxDocsPerSecond, + lastDocCount + ); + scheduledNextSearch = new ScheduledRunnable( + threadPool, + executorName, + executionDelay, + () -> triggerNextSearch(executionDelay.getNanos()) + ); + + // corner case: if for whatever reason stop() has been called meanwhile fast forward + if (getState().equals(IndexerState.STOPPING)) { + scheduledNextSearch.reschedule(TimeValue.ZERO); + } + return; + } + } + + triggerNextSearch(0L); + } + + private void triggerNextSearch(long waitTimeInNanos) { + if (checkState(getState()) == false) { + return; + } + stats.markStartSearch(); + lastSearchStartTimeNanos = getTimeNanos(); + // ensure that partial results are not accepted and cause a search failure - SearchRequest searchRequest = buildSearchRequest().allowPartialSearchResults(false); - doNextSearch(searchRequest, listener); + SearchRequest searchRequest = buildSearchRequest(waitTimeInNanos).allowPartialSearchResults(false); + + doNextSearch(searchRequest, searchResponseListener); } /** @@ -461,4 +611,41 @@ public abstract class AsyncTwoPhaseIndexer initialState, Integer initialPosition, - CountDownLatch latch, boolean stoppedBeforeFinished) { - super(executor, initialState, initialPosition, new MockJobStats()); + protected MockIndexer(ThreadPool threadPool, String executorName, AtomicReference initialState, + Integer initialPosition, CountDownLatch latch, boolean stoppedBeforeFinished) { + super(threadPool, executorName, initialState, initialPosition, new MockJobStats()); this.latch = latch; this.stoppedBeforeFinished = stoppedBeforeFinished; } @@ -81,7 +86,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { } @Override - protected SearchRequest buildSearchRequest() { + protected SearchRequest buildSearchRequest(long waitTimeInNanos) { assertThat(step, equalTo(1)); ++step; return new SearchRequest(); @@ -114,8 +119,10 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { @Override protected void doSaveState(IndexerState state, Integer position, Runnable next) { - int expectedStep = stoppedBeforeFinished ? 3 : 5; - assertThat(step, equalTo(expectedStep)); + // for stop before finished we do not know if its stopped before are after the search + if (stoppedBeforeFinished == false) { + assertThat(step, equalTo(5)); + } ++step; next.run(); } @@ -150,15 +157,29 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { private class MockIndexerFiveRuns extends AsyncTwoPhaseIndexer { + private final long startTime; + private final CountDownLatch latch; + private volatile float maxDocsPerSecond; + // counters private volatile boolean started = false; + private volatile boolean waitingForLatch = false; private volatile int searchRequests = 0; private volatile int searchOps = 0; private volatile int processOps = 0; private volatile int bulkOps = 0; - protected MockIndexerFiveRuns(Executor executor, AtomicReference initialState, Integer initialPosition) { - super(executor, initialState, initialPosition, new MockJobStats()); + protected MockIndexerFiveRuns(ThreadPool threadPool, String executorName, AtomicReference initialState, + Integer initialPosition, float maxDocsPerSecond, CountDownLatch latch) { + super(threadPool, executorName, initialState, initialPosition, new MockJobStats()); + startTime = System.nanoTime(); + this.latch = latch; + this.maxDocsPerSecond = maxDocsPerSecond; + } + + public void rethrottle(float maxDocsPerSecond) { + this.maxDocsPerSecond = maxDocsPerSecond; + rethrottle(); } @Override @@ -166,8 +187,16 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { return "mock_5_runs"; } + @Override + protected float getMaxDocsPerSecond() { + return maxDocsPerSecond; + } + @Override protected IterationResult doProcess(SearchResponse searchResponse) { + // increment doc count for throttling + getStats().incrementNumDocuments(1000); + ++processOps; if (processOps == 5) { return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, true); @@ -180,7 +209,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { } @Override - protected SearchRequest buildSearchRequest() { + protected SearchRequest buildSearchRequest(long waitTimeInNanos) { ++searchRequests; return new SearchRequest(); } @@ -191,6 +220,23 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { listener.onResponse(true); } + private void awaitForLatch() { + if (latch == null) { + return; + } + try { + waitingForLatch = true; + latch.await(10, TimeUnit.SECONDS); + waitingForLatch = false; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + public boolean waitingForLatchCountDown() { + return waitingForLatch; + } + @Override protected void doNextSearch(SearchRequest request, ActionListener nextPhase) { ++searchOps; @@ -198,6 +244,10 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0), null, null, false, null, null, 1); + if (processOps == 3) { + awaitForLatch(); + } + nextPhase.onResponse(new SearchResponse(sections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, null)); } @@ -232,6 +282,11 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { protected void onAbort() { } + @Override + protected long getTimeNanos() { + return startTime + searchOps * 50_000_000L; + } + public void assertCounters() { assertTrue(started); assertEquals(5L, searchRequests); @@ -247,8 +302,9 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { // test the execution order private int step; - protected MockIndexerThrowsFirstSearch(Executor executor, AtomicReference initialState, Integer initialPosition) { - super(executor, initialState, initialPosition, new MockJobStats()); + protected MockIndexerThrowsFirstSearch(ThreadPool threadPool, String executorName, AtomicReference initialState, + Integer initialPosition) { + super(threadPool, executorName, initialState, initialPosition, new MockJobStats()); } @Override @@ -263,7 +319,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { } @Override - protected SearchRequest buildSearchRequest() { + protected SearchRequest buildSearchRequest(long waitTimeInNanos) { assertThat(step, equalTo(1)); ++step; return new SearchRequest(); @@ -321,12 +377,32 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { } } + private class MockThreadPool extends TestThreadPool { + + private List delays = new ArrayList<>(); + + MockThreadPool(String name, ExecutorBuilder... customBuilders) { + super(name, Settings.EMPTY, customBuilders); + } + + @Override + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) { + delays.add(delay); + + return super.schedule(command, TimeValue.ZERO, executor); + } + + public void assertCountersAndDelay(Collection expectedDelays) { + assertThat(delays, equalTo(expectedDelays)); + } + } + public void testStateMachine() throws Exception { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { CountDownLatch countDownLatch = new CountDownLatch(1); - MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false); + MockIndexer indexer = new MockIndexer(threadPool, ThreadPool.Names.GENERIC, state, 2, countDownLatch, false); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); @@ -344,33 +420,32 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L)); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } public void testStateMachineBrokenSearch() throws Exception { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { - MockIndexerThrowsFirstSearch indexer = new MockIndexerThrowsFirstSearch(executor, state, 2); + MockIndexerThrowsFirstSearch indexer = new MockIndexerThrowsFirstSearch(threadPool, ThreadPool.Names.GENERIC, state, 2); indexer.start(); - assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertBusy(() -> assertTrue(isFinished.get()), 10000, TimeUnit.SECONDS); assertThat(indexer.getStep(), equalTo(3)); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } public void testStop_WhileIndexing() throws Exception { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { CountDownLatch countDownLatch = new CountDownLatch(1); - MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, true); + MockIndexer indexer = new MockIndexer(threadPool, ThreadPool.Names.GENERIC, state, 2, countDownLatch, true); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); @@ -382,22 +457,135 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { assertBusy(() -> assertTrue(isStopped.get())); assertFalse(isFinished.get()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } public void testFiveRuns() throws Exception { + doTestFiveRuns(-1, Collections.emptyList()); + } + + public void testFiveRunsThrottled100() throws Exception { + // expect throttling to kick in + doTestFiveRuns(100, timeValueCollectionFromMilliseconds(9950L, 9950L, 9950L, 9950L)); + } + + public void testFiveRunsThrottled1000() throws Exception { + // expect throttling to kick in + doTestFiveRuns(1_000, timeValueCollectionFromMilliseconds(950L, 950L, 950L, 950L)); + } + + public void testFiveRunsThrottled18000() throws Exception { + // expect throttling to not kick in due to min wait time + doTestFiveRuns(18_000, Collections.emptyList()); + } + + public void testFiveRunsThrottled1000000() throws Exception { + // docs per seconds is set high, so throttling does not kick in + doTestFiveRuns(1_000_000, Collections.emptyList()); + } + + public void doTestFiveRuns(float docsPerSecond, Collection expectedDelays) throws Exception { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final MockThreadPool threadPool = new MockThreadPool(getTestName()); try { - MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (executor, state, 2); + MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (threadPool, ThreadPool.Names.GENERIC, state, 2, docsPerSecond, + null); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertBusy(() -> assertTrue(isFinished.get())); indexer.assertCounters(); + threadPool.assertCountersAndDelay(expectedDelays); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } + + public void testFiveRunsRethrottle0_100() throws Exception { + doTestFiveRunsRethrottle(-1, 100, timeValueCollectionFromMilliseconds(9950L)); + } + + public void testFiveRunsRethrottle100_0() throws Exception { + doTestFiveRunsRethrottle(100, 0, timeValueCollectionFromMilliseconds(9950L, 9950L, 9950L)); + } + + public void testFiveRunsRethrottle100_1000() throws Exception { + doTestFiveRunsRethrottle(100, 1000, timeValueCollectionFromMilliseconds(9950L, 9950L, 9950L, 950L)); + } + + public void testFiveRunsRethrottle1000_100() throws Exception { + doTestFiveRunsRethrottle(1000, 100, timeValueCollectionFromMilliseconds(950L, 950L, 950L, 9950L)); + } + + public void doTestFiveRunsRethrottle( + float docsPerSecond, + float docsPerSecondRethrottle, + Collection expectedDelays + ) throws Exception { + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + + final MockThreadPool threadPool = new MockThreadPool(getTestName()); + try { + CountDownLatch latch = new CountDownLatch(1); + MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (threadPool, ThreadPool.Names.GENERIC, state, 2, docsPerSecond, + latch); + indexer.start(); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + // wait until the indexer starts waiting on the latch + assertBusy(() -> assertTrue(indexer.waitingForLatchCountDown())); + // rethrottle + indexer.rethrottle(docsPerSecondRethrottle); + latch.countDown(); + // let it finish + assertBusy(() -> assertTrue(isFinished.get())); + indexer.assertCounters(); + threadPool.assertCountersAndDelay(expectedDelays); + } finally { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + } + } + + public void testCalculateThrottlingDelay() { + // negative docs per second, throttling turned off + assertThat(AsyncTwoPhaseIndexer.calculateThrottlingDelay(-100, 100, 1_000, 1_000), equalTo(TimeValue.ZERO)); + + // negative docs per second, throttling turned off + assertThat(AsyncTwoPhaseIndexer.calculateThrottlingDelay(0, 100, 1_000, 1_000), equalTo(TimeValue.ZERO)); + + // 100 docs/s with 100 docs -> 1s delay + assertThat(AsyncTwoPhaseIndexer.calculateThrottlingDelay(100, 100, 1_000_000, 1_000_000), equalTo(TimeValue.timeValueSeconds(1))); + + // 100 docs/s with 100 docs, 200ms passed -> 800ms delay + assertThat( + AsyncTwoPhaseIndexer.calculateThrottlingDelay(100, 100, 1_000_000_000L, 1_200_000_000L), + equalTo(TimeValue.timeValueMillis(800)) + ); + + // 100 docs/s with 100 docs done, time passed -> no delay + assertThat(AsyncTwoPhaseIndexer.calculateThrottlingDelay(100, 100, 1_000_000_000L, 5_000_000_000L), equalTo(TimeValue.ZERO)); + + // 1_000_000 docs/s with 1 doc done, time passed -> no delay + assertThat(AsyncTwoPhaseIndexer.calculateThrottlingDelay(1_000_000, 1, 1_000_000_000L, 1_000_000_000L), equalTo(TimeValue.ZERO)); + + // max: 1 docs/s with 1_000_000 docs done, time passed -> no delay + assertThat( + AsyncTwoPhaseIndexer.calculateThrottlingDelay(1, 1_000_000, 1_000_000_000L, 1_000_000_000L), + equalTo(TimeValue.timeValueHours(1)) + ); + + // min: 100 docs/s with 100 docs, 995ms passed -> no delay, because minimum not reached + assertThat(AsyncTwoPhaseIndexer.calculateThrottlingDelay(100, 100, 1_000_000_000L, 1_995_000_000L), equalTo(TimeValue.ZERO)); + + } + + private static Collection timeValueCollectionFromMilliseconds(Long... milliseconds) { + List timeValues = new ArrayList<>(); + for (Long m: milliseconds) { + timeValues.add(TimeValue.timeValueMillis(m)); + } + + return timeValues; + } } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java index f9c063f3266..57281c75cc2 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java @@ -29,6 +29,7 @@ import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilde import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; @@ -48,7 +49,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -67,29 +67,31 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer initialState, Map initialPosition, - AtomicBoolean upgradedDocumentID) { - this(executor, job, initialState, initialPosition, upgradedDocumentID, new RollupIndexerJobStats()); + RollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, + Map initialPosition, AtomicBoolean upgradedDocumentID) { + this(threadPool, executorName, job, initialState, initialPosition, upgradedDocumentID, new RollupIndexerJobStats()); } /** * Ctr - * @param executor Executor to use to fire the first request of a background job. + * @param threadPool ThreadPool to use to fire the first request of a background job. + * @param executorName Name of the executor to use to fire the first request of a background job. * @param job The rollup job * @param initialState Initial state for the indexer * @param initialPosition The last indexed bucket of the task * @param upgradedDocumentID whether job has updated IDs (for BWC) * @param jobStats jobstats instance for collecting stats */ - RollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, Map initialPosition, - AtomicBoolean upgradedDocumentID, RollupIndexerJobStats jobStats) { - super(executor, initialState, initialPosition, jobStats); + RollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, + Map initialPosition, AtomicBoolean upgradedDocumentID, RollupIndexerJobStats jobStats) { + super(threadPool, executorName, initialState, initialPosition, jobStats); this.job = job; this.compositeBuilder = createCompositeBuilder(job.getConfig()); this.upgradedDocumentID = upgradedDocumentID; @@ -123,7 +125,7 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer position = getPosition(); diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index 21fdb2f4998..599e8ea9ee0 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -103,7 +103,7 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE ClientRollupPageManager(RollupJob job, IndexerState initialState, Map initialPosition, Client client, AtomicBoolean upgradedDocumentID) { - super(threadPool.executor(ThreadPool.Names.GENERIC), job, new AtomicReference<>(initialState), + super(threadPool, ThreadPool.Names.GENERIC, job, new AtomicReference<>(initialState), initialPosition, upgradedDocumentID); this.client = client; this.job = job; diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java index 16e5a3eeffa..8ab5b68b7ae 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java @@ -47,6 +47,8 @@ import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig; @@ -71,9 +73,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -524,14 +523,15 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase { IndexReader reader = DirectoryReader.open(dir); IndexSearcher searcher = new IndexSearcher(reader); String dateHistoField = config.getGroupConfig().getDateHistogram().getField(); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); + try { RollupJob job = new RollupJob(config, Collections.emptyMap()); - final SyncRollupIndexer action = new SyncRollupIndexer(executor, job, searcher, + final SyncRollupIndexer action = new SyncRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, searcher, fieldTypeLookup.values().toArray(new MappedFieldType[0]), fieldTypeLookup.get(dateHistoField)); rollupConsumer.accept(action.triggerAndWaitForCompletion(now)); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); reader.close(); dir.close(); } @@ -627,9 +627,9 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase { private final CountDownLatch latch = new CountDownLatch(1); private Exception exc; - SyncRollupIndexer(Executor executor, RollupJob job, IndexSearcher searcher, + SyncRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, IndexSearcher searcher, MappedFieldType[] fieldTypes, MappedFieldType timestampField) { - super(executor, job, new AtomicReference<>(IndexerState.STARTED), null, new AtomicBoolean(newIDScheme)); + super(threadPool, executorName, job, new AtomicReference<>(IndexerState.STARTED), null, new AtomicBoolean(newIDScheme)); this.searcher = searcher; this.fieldTypes = fieldTypes; this.timestampField = timestampField; diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java index 30ee1db25d5..51b9bb4e315 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java @@ -22,6 +22,8 @@ import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import org.elasticsearch.xpack.core.rollup.RollupField; @@ -35,9 +37,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -52,19 +52,19 @@ import static org.mockito.Mockito.spy; public class RollupIndexerStateTests extends ESTestCase { private static class EmptyRollupIndexer extends RollupIndexer { - EmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + EmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, Map initialPosition, boolean upgraded, RollupIndexerJobStats stats) { - super(executor, job, initialState, initialPosition, new AtomicBoolean(upgraded), stats); + super(threadPool, executorName, job, initialState, initialPosition, new AtomicBoolean(upgraded), stats); } - EmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + EmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, Map initialPosition, boolean upgraded) { - super(executor, job, initialState, initialPosition, new AtomicBoolean(upgraded)); + super(threadPool, executorName, job, initialState, initialPosition, new AtomicBoolean(upgraded)); } - EmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + EmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, Map initialPosition) { - this(executor, job, initialState, initialPosition, randomBoolean()); + this(threadPool, executorName, job, initialState, initialPosition, randomBoolean()); } @@ -140,19 +140,19 @@ public class RollupIndexerStateTests extends ESTestCase { private static class DelayedEmptyRollupIndexer extends EmptyRollupIndexer { protected CountDownLatch latch; - DelayedEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + DelayedEmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, Map initialPosition, boolean upgraded) { - super(executor, job, initialState, initialPosition, upgraded); + super(threadPool, executorName, job, initialState, initialPosition, upgraded); } - DelayedEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + DelayedEmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, Map initialPosition) { - super(executor, job, initialState, initialPosition, randomBoolean()); + super(threadPool, executorName, job, initialState, initialPosition, randomBoolean()); } - DelayedEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + DelayedEmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, Map initialPosition, RollupIndexerJobStats stats) { - super(executor, job, initialState, initialPosition, randomBoolean(), stats); + super(threadPool, executorName, job, initialState, initialPosition, randomBoolean(), stats); } private CountDownLatch newLatch() { @@ -178,17 +178,17 @@ public class RollupIndexerStateTests extends ESTestCase { final BiConsumer> saveStateCheck; private CountDownLatch latch; - NonEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + NonEmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, Map initialPosition, Function searchFunction, Function bulkFunction, Consumer failureConsumer) { - this(executor, job, initialState, initialPosition, searchFunction, bulkFunction, failureConsumer, (i, m) -> {}); + this(threadPool, executorName, job, initialState, initialPosition, searchFunction, bulkFunction, failureConsumer, (i, m) -> {}); } - NonEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + NonEmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, Map initialPosition, Function searchFunction, Function bulkFunction, Consumer failureConsumer, BiConsumer> saveStateCheck) { - super(executor, job, initialState, initialPosition, new AtomicBoolean(randomBoolean())); + super(threadPool, executorName, job, initialState, initialPosition, new AtomicBoolean(randomBoolean())); this.searchFunction = searchFunction; this.bulkFunction = bulkFunction; this.failureConsumer = failureConsumer; @@ -253,9 +253,9 @@ public class RollupIndexerStateTests extends ESTestCase { public void testStarted() throws Exception { RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { - RollupIndexer indexer = new EmptyRollupIndexer(executor, job, state, null, true); + RollupIndexer indexer = new EmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null, true); assertTrue(indexer.isUpgradedDocumentID()); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); @@ -269,17 +269,17 @@ public class RollupIndexerStateTests extends ESTestCase { assertThat(indexer.getStats().getIndexTotal(), equalTo(0L)); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } public void testIndexing() throws Exception { RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { AtomicBoolean isFinished = new AtomicBoolean(false); - DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) { + DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null) { @Override protected void onFinish(ActionListener listener) { super.onFinish(ActionListener.wrap(r -> { @@ -312,7 +312,7 @@ public class RollupIndexerStateTests extends ESTestCase { assertThat(indexer.getStats().getSearchTotal(), equalTo(1L)); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } @@ -334,10 +334,11 @@ public class RollupIndexerStateTests extends ESTestCase { doAnswer(forwardAndChangeState).when(spyStats).incrementNumInvocations(1L); RollupJob job = new RollupJob(config, Collections.emptyMap()); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { AtomicBoolean isFinished = new AtomicBoolean(false); - DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null, spyStats) { + DelayedEmptyRollupIndexer indexer = + new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null, spyStats) { @Override protected void onFinish(ActionListener listener) { super.onFinish(ActionListener.wrap(r -> { @@ -357,7 +358,7 @@ public class RollupIndexerStateTests extends ESTestCase { assertThat(indexer.getStats().getNumPages(), equalTo(0L)); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } @@ -365,10 +366,10 @@ public class RollupIndexerStateTests extends ESTestCase { final AtomicBoolean aborted = new AtomicBoolean(false); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); final CountDownLatch latch = new CountDownLatch(1); try { - EmptyRollupIndexer indexer = new EmptyRollupIndexer(executor, job, state, null) { + EmptyRollupIndexer indexer = new EmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null) { @Override protected void onFinish(ActionListener listener) { fail("Should not have called onFinish"); @@ -402,7 +403,7 @@ public class RollupIndexerStateTests extends ESTestCase { assertThat(indexer.getStats().getNumPages(), equalTo(0L)); assertThat(indexer.getStats().getSearchFailures(), equalTo(0L)); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } @@ -410,13 +411,13 @@ public class RollupIndexerStateTests extends ESTestCase { final AtomicBoolean aborted = new AtomicBoolean(false); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); // Don't use the indexer's latch because we completely change doNextSearch() final CountDownLatch doNextSearchLatch = new CountDownLatch(1); try { - DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) { + DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null) { @Override protected void onAbort() { aborted.set(true); @@ -489,16 +490,16 @@ public class RollupIndexerStateTests extends ESTestCase { assertThat(indexer.getStats().getNumInvocations(), equalTo(1L)); assertThat(indexer.getStats().getNumPages(), equalTo(1L)); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } public void testStopIndexing() throws Exception { RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { - DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null); + DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null); final CountDownLatch latch = indexer.newLatch(); assertFalse(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)); @@ -511,17 +512,17 @@ public class RollupIndexerStateTests extends ESTestCase { assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STOPPED))); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } public void testAbortIndexing() throws Exception { RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { final AtomicBoolean isAborted = new AtomicBoolean(false); - DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) { + DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null) { @Override protected void onAbort() { isAborted.set(true); @@ -538,17 +539,17 @@ public class RollupIndexerStateTests extends ESTestCase { assertBusy(() -> assertTrue(isAborted.get())); assertFalse(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } public void testAbortStarted() { RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { final AtomicBoolean isAborted = new AtomicBoolean(false); - DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) { + DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null) { @Override protected void onAbort() { isAborted.set(true); @@ -564,17 +565,17 @@ public class RollupIndexerStateTests extends ESTestCase { assertThat(indexer.getStats().getNumPages(), equalTo(0L)); assertFalse(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } public void testMultipleJobTriggering() throws Exception { RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { final AtomicBoolean isAborted = new AtomicBoolean(false); - DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) { + DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null) { @Override protected void onAbort() { isAborted.set(true); @@ -601,7 +602,7 @@ public class RollupIndexerStateTests extends ESTestCase { assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STOPPED))); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } @@ -688,10 +689,10 @@ public class RollupIndexerStateTests extends ESTestCase { } }; - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { - NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null, + NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null, searchFunction, bulkFunction, failureConsumer, stateCheck); final CountDownLatch latch = indexer.newLatch(1); indexer.start(); @@ -713,7 +714,7 @@ public class RollupIndexerStateTests extends ESTestCase { assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L)); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } @@ -800,10 +801,10 @@ public class RollupIndexerStateTests extends ESTestCase { isFinished.set(true); }; - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { - NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null, + NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null, searchFunction, bulkFunction, failureConsumer, doSaveStateCheck); final CountDownLatch latch = indexer.newLatch(1); indexer.start(); @@ -824,7 +825,7 @@ public class RollupIndexerStateTests extends ESTestCase { assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L)); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } @@ -848,10 +849,10 @@ public class RollupIndexerStateTests extends ESTestCase { } }; - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { - NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null, + NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null, searchFunction, bulkFunction, failureConsumer, stateCheck); final CountDownLatch latch = indexer.newLatch(1); indexer.start(); @@ -873,7 +874,7 @@ public class RollupIndexerStateTests extends ESTestCase { assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L)); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } @@ -961,10 +962,10 @@ public class RollupIndexerStateTests extends ESTestCase { } }; - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { - NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null, + NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null, searchFunction, bulkFunction, failureConsumer, stateCheck) { @Override protected void doNextBulk(BulkRequest request, ActionListener nextPhase) { @@ -991,7 +992,7 @@ public class RollupIndexerStateTests extends ESTestCase { assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L)); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index 6e247689ad5..35651e49ae2 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -22,6 +22,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -42,7 +43,6 @@ import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder; import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -56,7 +56,8 @@ class ClientTransformIndexer extends TransformIndexer { private final AtomicReference seqNoPrimaryTermAndIndex; ClientTransformIndexer( - Executor executor, + ThreadPool threadPool, + String executorName, TransformConfigManager transformsConfigManager, CheckpointProvider checkpointProvider, TransformProgressGatherer progressGatherer, @@ -75,7 +76,8 @@ class ClientTransformIndexer extends TransformIndexer { boolean shouldStopAtCheckpoint ) { super( - ExceptionsHelper.requireNonNull(executor, "executor"), + ExceptionsHelper.requireNonNull(threadPool, "threadPool"), + executorName, transformsConfigManager, checkpointProvider, progressGatherer, diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java index d7c699b5870..c3739dfd761 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.transform.transforms; import org.elasticsearch.client.ParentTaskAssigningClient; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; @@ -20,7 +21,6 @@ import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import java.util.Map; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; class ClientTransformIndexerBuilder { @@ -43,11 +43,12 @@ class ClientTransformIndexerBuilder { this.initialStats = new TransformIndexerStats(); } - ClientTransformIndexer build(Executor executor, TransformContext context) { + ClientTransformIndexer build(ThreadPool threadPool, String executorName, TransformContext context) { CheckpointProvider checkpointProvider = transformsCheckpointService.getCheckpointProvider(parentTaskClient, transformConfig); return new ClientTransformIndexer( - executor, + threadPool, + executorName, transformsConfigManager, checkpointProvider, new TransformProgressGatherer(parentTaskClient), diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 80b9c9ca1dd..e46626aa8b5 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -26,6 +26,7 @@ import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; @@ -50,7 +51,6 @@ import java.time.Instant; import java.util.Collections; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -114,7 +114,8 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer listener) { - long now = getTime(); + long now = getTimeNanos() * 1000; long checkpointLowerBound = context.getCheckpoint() - NUMBER_OF_CHECKPOINTS_TO_KEEP; long lowerBoundEpochMs = now - RETENTION_OF_CHECKPOINTS_MS; @@ -704,7 +705,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer failureConsumer ) { super( - executor, + threadPool, + executorName, transformsConfigManager, checkpointProvider, progressGatherer, @@ -216,11 +218,13 @@ public class TransformIndexerTests extends ESTestCase { @Before public void setUpMocks() { client = new NoOpClient(getTestName()); + threadPool = new TestThreadPool(getTestName()); } @After public void tearDownClient() { client.close(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } public void testPageSizeAdapt() throws Exception { @@ -248,8 +252,6 @@ public class TransformIndexerTests extends ESTestCase { Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); - final ExecutorService executor = Executors.newFixedThreadPool(1); - try { TransformAuditor auditor = new TransformAuditor(client, "node_1"); TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class)); @@ -259,7 +261,8 @@ public class TransformIndexerTests extends ESTestCase { searchFunction, bulkFunction, null, - executor, + threadPool, + ThreadPool.Names.GENERIC, auditor, context ); @@ -289,10 +292,6 @@ public class TransformIndexerTests extends ESTestCase { // assert that page size has been reduced again assertThat(pageSizeAfterFirstReduction, greaterThan((long) indexer.getPageSize())); assertThat(pageSizeAfterFirstReduction, greaterThan((long) TransformIndexer.MINIMUM_PAGE_SIZE)); - - } finally { - executor.shutdownNow(); - } } public void testDoProcessAggNullCheck() { @@ -330,8 +329,6 @@ public class TransformIndexerTests extends ESTestCase { Function searchFunction = searchRequest -> searchResponse; Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); - final ExecutorService executor = Executors.newFixedThreadPool(1); - try { TransformAuditor auditor = mock(TransformAuditor.class); TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class)); @@ -341,7 +338,8 @@ public class TransformIndexerTests extends ESTestCase { searchFunction, bulkFunction, null, - executor, + threadPool, + ThreadPool.Names.GENERIC, auditor, context ); @@ -351,9 +349,6 @@ public class TransformIndexerTests extends ESTestCase { assertThat(newPosition.getPosition(), is(nullValue())); assertThat(newPosition.isDone(), is(true)); verify(auditor, times(1)).info(anyString(), anyString()); - } finally { - executor.shutdownNow(); - } } public void testScriptError() throws Exception { @@ -397,8 +392,6 @@ public class TransformIndexerTests extends ESTestCase { failureMessage.compareAndSet(null, message); }; - final ExecutorService executor = Executors.newFixedThreadPool(1); - try { MockTransformAuditor auditor = new MockTransformAuditor(); TransformContext.Listener contextListener = mock(TransformContext.Listener.class); TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener); @@ -409,7 +402,8 @@ public class TransformIndexerTests extends ESTestCase { searchFunction, bulkFunction, failureConsumer, - executor, + threadPool, + ThreadPool.Names.GENERIC, auditor, context ); @@ -433,9 +427,6 @@ public class TransformIndexerTests extends ESTestCase { failureMessage.get(), matchesRegex("Failed to execute script with error: \\[.*ArithmeticException: / by zero\\], stack trace: \\[stack\\]") ); - } finally { - executor.shutdownNow(); - } } private MockedTransformIndexer createMockIndexer( @@ -444,12 +435,14 @@ public class TransformIndexerTests extends ESTestCase { Function searchFunction, Function bulkFunction, Consumer failureConsumer, - final ExecutorService executor, + ThreadPool threadPool, + String executorName, TransformAuditor auditor, TransformContext context ) { return new MockedTransformIndexer( - executor, + threadPool, + executorName, mock(IndexBasedTransformConfigManager.class), mock(CheckpointProvider.class), new TransformProgressGatherer(client),