From c3f1e6a54230a11696df29d286e208a4b0034b5f Mon Sep 17 00:00:00 2001 From: Jay Modi Date: Fri, 14 Jun 2019 12:29:43 -0600 Subject: [PATCH] Ensure threads running before closing node (#43240) There are a few tests within NodeTests that submit items to the threadpool and then close the node. The tests are designed to check how running tasks are affected during node close. These tests can cause CI failures since the submitted tasks may not be running when the node is closed and then execute after the thread context is closed, which triggers an unexpected exception. This change ensures the threads are running so we avoid the unexpected exception and can test these cases. The test of task submittal while a node is closing is also important so an additional but muted test has been added that tests the case where a task may be getting submitted while the node is closing and ensuring we do not trigger anything unexpected in these cases. Relates #42774 Relates #42577 --- .../org/elasticsearch/node/NodeTests.java | 56 ++++++++++++++++++- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/node/NodeTests.java b/server/src/test/java/org/elasticsearch/node/NodeTests.java index a5653eb88e1..777f4eb13ee 100644 --- a/server/src/test/java/org/elasticsearch/node/NodeTests.java +++ b/server/src/test/java/org/elasticsearch/node/NodeTests.java @@ -50,7 +50,6 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42577") @LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") public class NodeTests extends ESTestCase { @@ -154,35 +153,87 @@ public class NodeTests extends ESTestCase { node.start(); ThreadPool threadpool = node.injector().getInstance(ThreadPool.class); AtomicBoolean shouldRun = new AtomicBoolean(true); + final CountDownLatch threadRunning = new CountDownLatch(1); threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> { + threadRunning.countDown(); while (shouldRun.get()); }); + threadRunning.await(); node.close(); shouldRun.set(false); assertTrue(node.awaitClose(1, TimeUnit.DAYS)); } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42577") + public void testCloseRaceWithTaskExecution() throws Exception { + Node node = new MockNode(baseSettings().build(), basePlugins()); + node.start(); + ThreadPool threadpool = node.injector().getInstance(ThreadPool.class); + AtomicBoolean shouldRun = new AtomicBoolean(true); + final CountDownLatch running = new CountDownLatch(3); + Thread submitThread = new Thread(() -> { + running.countDown(); + try { + running.await(); + } catch (InterruptedException e) { + throw new AssertionError("interrupted while waiting", e); + } + threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> { + while (shouldRun.get()); + }); + }); + Thread closeThread = new Thread(() -> { + running.countDown(); + try { + running.await(); + } catch (InterruptedException e) { + throw new AssertionError("interrupted while waiting", e); + } + try { + node.close(); + } catch (IOException e) { + throw new AssertionError("node close failed", e); + } + }); + submitThread.start(); + closeThread.start(); + running.countDown(); + running.await(); + + submitThread.join(); + closeThread.join(); + + shouldRun.set(false); + assertTrue(node.awaitClose(1, TimeUnit.DAYS)); + } + public void testAwaitCloseTimeoutsOnNonInterruptibleTask() throws Exception { Node node = new MockNode(baseSettings().build(), basePlugins()); node.start(); ThreadPool threadpool = node.injector().getInstance(ThreadPool.class); AtomicBoolean shouldRun = new AtomicBoolean(true); + final CountDownLatch threadRunning = new CountDownLatch(1); threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> { + threadRunning.countDown(); while (shouldRun.get()); }); + threadRunning.await(); node.close(); assertFalse(node.awaitClose(0, TimeUnit.MILLISECONDS)); shouldRun.set(false); + assertTrue(node.awaitClose(1, TimeUnit.DAYS)); } public void testCloseOnInterruptibleTask() throws Exception { Node node = new MockNode(baseSettings().build(), basePlugins()); node.start(); ThreadPool threadpool = node.injector().getInstance(ThreadPool.class); - CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch threadRunning = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch finishLatch = new CountDownLatch(1); final AtomicBoolean interrupted = new AtomicBoolean(false); threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> { + threadRunning.countDown(); try { latch.await(); } catch (InterruptedException e) { @@ -192,6 +243,7 @@ public class NodeTests extends ESTestCase { finishLatch.countDown(); } }); + threadRunning.await(); node.close(); // close should not interrput ongoing tasks assertFalse(interrupted.get());