Ensure threads running before closing node (#43240)

There are a few tests within NodeTests that submit items to the
threadpool and then close the node. The tests are designed to check
how running tasks are affected during node close. These tests can cause
CI failures since the submitted tasks may not be running when the node
is closed and then execute after the thread context is closed, which
triggers an unexpected exception. This change ensures the threads are
running so we avoid the unexpected exception and can test these cases.

The test of task submittal while a node is closing is also important so
an additional but muted test has been added that tests the case where a
task may be getting submitted while the node is closing and ensuring we
do not trigger anything unexpected in these cases.

Relates #42774
Relates #42577
This commit is contained in:
Jay Modi 2019-06-14 12:29:43 -06:00 committed by jaymode
parent 4b1d8e4433
commit c3f1e6a542
1 changed files with 54 additions and 2 deletions

View File

@ -50,7 +50,6 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42577")
@LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") @LuceneTestCase.SuppressFileSystems(value = "ExtrasFS")
public class NodeTests extends ESTestCase { public class NodeTests extends ESTestCase {
@ -154,10 +153,56 @@ public class NodeTests extends ESTestCase {
node.start(); node.start();
ThreadPool threadpool = node.injector().getInstance(ThreadPool.class); ThreadPool threadpool = node.injector().getInstance(ThreadPool.class);
AtomicBoolean shouldRun = new AtomicBoolean(true); AtomicBoolean shouldRun = new AtomicBoolean(true);
final CountDownLatch threadRunning = new CountDownLatch(1);
threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> {
threadRunning.countDown();
while (shouldRun.get());
});
threadRunning.await();
node.close();
shouldRun.set(false);
assertTrue(node.awaitClose(1, TimeUnit.DAYS));
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42577")
public void testCloseRaceWithTaskExecution() throws Exception {
Node node = new MockNode(baseSettings().build(), basePlugins());
node.start();
ThreadPool threadpool = node.injector().getInstance(ThreadPool.class);
AtomicBoolean shouldRun = new AtomicBoolean(true);
final CountDownLatch running = new CountDownLatch(3);
Thread submitThread = new Thread(() -> {
running.countDown();
try {
running.await();
} catch (InterruptedException e) {
throw new AssertionError("interrupted while waiting", e);
}
threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> { threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> {
while (shouldRun.get()); while (shouldRun.get());
}); });
});
Thread closeThread = new Thread(() -> {
running.countDown();
try {
running.await();
} catch (InterruptedException e) {
throw new AssertionError("interrupted while waiting", e);
}
try {
node.close(); node.close();
} catch (IOException e) {
throw new AssertionError("node close failed", e);
}
});
submitThread.start();
closeThread.start();
running.countDown();
running.await();
submitThread.join();
closeThread.join();
shouldRun.set(false); shouldRun.set(false);
assertTrue(node.awaitClose(1, TimeUnit.DAYS)); assertTrue(node.awaitClose(1, TimeUnit.DAYS));
} }
@ -167,22 +212,28 @@ public class NodeTests extends ESTestCase {
node.start(); node.start();
ThreadPool threadpool = node.injector().getInstance(ThreadPool.class); ThreadPool threadpool = node.injector().getInstance(ThreadPool.class);
AtomicBoolean shouldRun = new AtomicBoolean(true); AtomicBoolean shouldRun = new AtomicBoolean(true);
final CountDownLatch threadRunning = new CountDownLatch(1);
threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> { threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> {
threadRunning.countDown();
while (shouldRun.get()); while (shouldRun.get());
}); });
threadRunning.await();
node.close(); node.close();
assertFalse(node.awaitClose(0, TimeUnit.MILLISECONDS)); assertFalse(node.awaitClose(0, TimeUnit.MILLISECONDS));
shouldRun.set(false); shouldRun.set(false);
assertTrue(node.awaitClose(1, TimeUnit.DAYS));
} }
public void testCloseOnInterruptibleTask() throws Exception { public void testCloseOnInterruptibleTask() throws Exception {
Node node = new MockNode(baseSettings().build(), basePlugins()); Node node = new MockNode(baseSettings().build(), basePlugins());
node.start(); node.start();
ThreadPool threadpool = node.injector().getInstance(ThreadPool.class); ThreadPool threadpool = node.injector().getInstance(ThreadPool.class);
CountDownLatch latch = new CountDownLatch(1); final CountDownLatch threadRunning = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch finishLatch = new CountDownLatch(1); final CountDownLatch finishLatch = new CountDownLatch(1);
final AtomicBoolean interrupted = new AtomicBoolean(false); final AtomicBoolean interrupted = new AtomicBoolean(false);
threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> { threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> {
threadRunning.countDown();
try { try {
latch.await(); latch.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -192,6 +243,7 @@ public class NodeTests extends ESTestCase {
finishLatch.countDown(); finishLatch.countDown();
} }
}); });
threadRunning.await();
node.close(); node.close();
// close should not interrput ongoing tasks // close should not interrput ongoing tasks
assertFalse(interrupted.get()); assertFalse(interrupted.get());