Fix RollupJobTaskTests Leaking Threads on Slowness (#56438) (#56518)

We are ensuring order in the two tests changed by waiting on latches.
The problem is, that 3s is a pretty short wait and on CI can randomly be exceeded
by pure chance. If that happened we wouldn't have visibility on it since we didn't
assert that the waits actually worked.
=> Fixed by asserting that the waits work and upping the timeout to our standard 10s
Also, moved to a per-test threadpool to make it simpler to identify which test failed,
should an unexpected task run on a closed client's pool afterall.
This commit is contained in:
Armin Braun 2020-05-11 17:24:10 +02:00 committed by GitHub
parent 7ce0a25fbc
commit 3ab6eba6bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 28 additions and 20 deletions

View File

@ -33,7 +33,8 @@ import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction;
import org.elasticsearch.xpack.core.rollup.job.RollupJob; import org.elasticsearch.xpack.core.rollup.job.RollupJob;
import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus; import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.junit.AfterClass; import org.junit.After;
import org.junit.Before;
import java.time.Clock; import java.time.Clock;
import java.util.Collections; import java.util.Collections;
@ -56,14 +57,17 @@ public class RollupJobTaskTests extends ESTestCase {
private static final Settings SETTINGS = Settings.builder() private static final Settings SETTINGS = Settings.builder()
.put(Node.NODE_NAME_SETTING.getKey(), "test") .put(Node.NODE_NAME_SETTING.getKey(), "test")
.build(); .build();
private static ThreadPool pool = new TestThreadPool("test");
@AfterClass private ThreadPool pool;
public static void stopThreadPool() {
if (pool != null) { @Before
pool.shutdownNow(); public void createThreadPool() {
pool = null; pool = new TestThreadPool("test");
} }
@After
public void stopThreadPool() {
assertThat(ThreadPool.terminate(pool, 10L, TimeUnit.SECONDS), equalTo(true));
} }
public void testInitialStatusStopped() { public void testInitialStatusStopped() {
@ -249,14 +253,14 @@ public class RollupJobTaskTests extends ESTestCase {
fail("Should not have entered onFailure"); fail("Should not have entered onFailure");
} }
}); });
latch.await(3, TimeUnit.SECONDS); assertUnblockIn10s(latch);
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
assertThat(task.getStats().getNumInvocations(), equalTo(1L)); assertThat(task.getStats().getNumInvocations(), equalTo(1L));
// wait until the search request is send, this is unblocked in the client // wait until the search request is send, this is unblocked in the client
block.await(3, TimeUnit.SECONDS); assertUnblockIn10s(block);
task.stop(new ActionListener<StopRollupJobAction.Response>() { task.stop(new ActionListener<StopRollupJobAction.Response>() {
@Override @Override
public void onResponse(StopRollupJobAction.Response response) { public void onResponse(StopRollupJobAction.Response response) {
@ -286,7 +290,7 @@ public class RollupJobTaskTests extends ESTestCase {
latch2.countDown(); latch2.countDown();
} }
}); });
latch2.await(3, TimeUnit.SECONDS); assertUnblockIn10s(latch2);
// the the client answer // the the client answer
unblock.countDown(); unblock.countDown();
@ -762,14 +766,14 @@ public class RollupJobTaskTests extends ESTestCase {
fail("Should not have entered onFailure"); fail("Should not have entered onFailure");
} }
}); });
latch.await(3, TimeUnit.SECONDS); assertUnblockIn10s(latch);
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
assertThat(task.getStats().getNumInvocations(), equalTo(1L)); assertThat(task.getStats().getNumInvocations(), equalTo(1L));
// wait until the search request is send, this is unblocked in the client // wait until the search request is send, this is unblocked in the client
block.await(3, TimeUnit.SECONDS); assertUnblockIn10s(block);
task.stop(new ActionListener<StopRollupJobAction.Response>() { task.stop(new ActionListener<StopRollupJobAction.Response>() {
@Override @Override
@ -799,7 +803,7 @@ public class RollupJobTaskTests extends ESTestCase {
fail("Should not have entered onFailure"); fail("Should not have entered onFailure");
} }
}); });
latch2.await(3, TimeUnit.SECONDS); assertUnblockIn10s(latch2);
unblock.countDown(); unblock.countDown();
} }
} }
@ -846,18 +850,22 @@ public class RollupJobTaskTests extends ESTestCase {
latch.await(3, TimeUnit.SECONDS); latch.await(3, TimeUnit.SECONDS);
} }
private static void assertUnblockIn10s(CountDownLatch latch) {
try {
assertThat(latch.await(10, TimeUnit.SECONDS), equalTo(true));
} catch (InterruptedException e) {
throw new AssertionError("Should not have been interrupted", e);
}
}
private NoOpClient getEmptySearchResponseClient(CountDownLatch unblock, CountDownLatch block) { private NoOpClient getEmptySearchResponseClient(CountDownLatch unblock, CountDownLatch block) {
return new NoOpClient(getTestName()) { return new NoOpClient(getTestName()) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
protected <Request extends ActionRequest, Response extends ActionResponse> protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(ActionType<Response> action, Request request, ActionListener<Response> listener) { void doExecute(ActionType<Response> action, Request request, ActionListener<Response> listener) {
try { unblock.countDown();
unblock.countDown(); assertUnblockIn10s(block);
block.await(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
fail("Should not have timed out");
}
listener.onResponse((Response) mock(SearchResponse.class)); listener.onResponse((Response) mock(SearchResponse.class));
} }
}; };