diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java index 4c8375e759d..8c0d591085d 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java @@ -33,7 +33,8 @@ import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction; import org.elasticsearch.xpack.core.rollup.job.RollupJob; import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; -import org.junit.AfterClass; +import org.junit.After; +import org.junit.Before; import java.time.Clock; import java.util.Collections; @@ -56,14 +57,17 @@ public class RollupJobTaskTests extends ESTestCase { private static final Settings SETTINGS = Settings.builder() .put(Node.NODE_NAME_SETTING.getKey(), "test") .build(); - private static ThreadPool pool = new TestThreadPool("test"); - @AfterClass - public static void stopThreadPool() { - if (pool != null) { - pool.shutdownNow(); - pool = null; - } + private ThreadPool pool; + + @Before + public void createThreadPool() { + pool = new TestThreadPool("test"); + } + + @After + public void stopThreadPool() { + assertThat(ThreadPool.terminate(pool, 10L, TimeUnit.SECONDS), equalTo(true)); } public void testInitialStatusStopped() { @@ -249,14 +253,14 @@ public class RollupJobTaskTests extends ESTestCase { fail("Should not have entered onFailure"); } }); - latch.await(3, TimeUnit.SECONDS); + assertUnblockIn10s(latch); task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); assertThat(task.getStats().getNumInvocations(), equalTo(1L)); // wait until the search request is send, this is unblocked in the client - block.await(3, TimeUnit.SECONDS); + assertUnblockIn10s(block); task.stop(new ActionListener() { @Override public void onResponse(StopRollupJobAction.Response response) { @@ -286,7 +290,7 @@ public class RollupJobTaskTests extends ESTestCase { latch2.countDown(); } }); - latch2.await(3, TimeUnit.SECONDS); + assertUnblockIn10s(latch2); // the the client answer unblock.countDown(); @@ -762,14 +766,14 @@ public class RollupJobTaskTests extends ESTestCase { fail("Should not have entered onFailure"); } }); - latch.await(3, TimeUnit.SECONDS); + assertUnblockIn10s(latch); task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); assertThat(task.getStats().getNumInvocations(), equalTo(1L)); // wait until the search request is send, this is unblocked in the client - block.await(3, TimeUnit.SECONDS); + assertUnblockIn10s(block); task.stop(new ActionListener() { @Override @@ -799,7 +803,7 @@ public class RollupJobTaskTests extends ESTestCase { fail("Should not have entered onFailure"); } }); - latch2.await(3, TimeUnit.SECONDS); + assertUnblockIn10s(latch2); unblock.countDown(); } } @@ -846,18 +850,22 @@ public class RollupJobTaskTests extends ESTestCase { latch.await(3, TimeUnit.SECONDS); } + private static void assertUnblockIn10s(CountDownLatch latch) { + try { + assertThat(latch.await(10, TimeUnit.SECONDS), equalTo(true)); + } catch (InterruptedException e) { + throw new AssertionError("Should not have been interrupted", e); + } + } + private NoOpClient getEmptySearchResponseClient(CountDownLatch unblock, CountDownLatch block) { return new NoOpClient(getTestName()) { @SuppressWarnings("unchecked") @Override protected void doExecute(ActionType action, Request request, ActionListener listener) { - try { - unblock.countDown(); - block.await(3, TimeUnit.SECONDS); - } catch (InterruptedException e) { - fail("Should not have timed out"); - } + unblock.countDown(); + assertUnblockIn10s(block); listener.onResponse((Response) mock(SearchResponse.class)); } };