diff --git a/server/src/test/java/org/elasticsearch/node/NodeTests.java b/server/src/test/java/org/elasticsearch/node/NodeTests.java index a5653eb88e1..777f4eb13ee 100644 --- a/server/src/test/java/org/elasticsearch/node/NodeTests.java +++ b/server/src/test/java/org/elasticsearch/node/NodeTests.java @@ -50,7 +50,6 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42577") @LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") public class NodeTests extends ESTestCase { @@ -154,35 +153,87 @@ public class NodeTests extends ESTestCase { node.start(); ThreadPool threadpool = node.injector().getInstance(ThreadPool.class); AtomicBoolean shouldRun = new AtomicBoolean(true); + final CountDownLatch threadRunning = new CountDownLatch(1); threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> { + threadRunning.countDown(); while (shouldRun.get()); }); + threadRunning.await(); node.close(); shouldRun.set(false); assertTrue(node.awaitClose(1, TimeUnit.DAYS)); } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42577") + public void testCloseRaceWithTaskExecution() throws Exception { + Node node = new MockNode(baseSettings().build(), basePlugins()); + node.start(); + ThreadPool threadpool = node.injector().getInstance(ThreadPool.class); + AtomicBoolean shouldRun = new AtomicBoolean(true); + final CountDownLatch running = new CountDownLatch(3); + Thread submitThread = new Thread(() -> { + running.countDown(); + try { + running.await(); + } catch (InterruptedException e) { + throw new AssertionError("interrupted while waiting", e); + } + threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> { + while (shouldRun.get()); + }); + }); + Thread closeThread = new Thread(() -> { + running.countDown(); + try { + running.await(); + } catch (InterruptedException e) { + throw new AssertionError("interrupted while waiting", e); + } + try { + node.close(); + } catch (IOException e) { + throw new AssertionError("node close failed", e); + } + }); + submitThread.start(); + closeThread.start(); + running.countDown(); + running.await(); + + submitThread.join(); + closeThread.join(); + + shouldRun.set(false); + assertTrue(node.awaitClose(1, TimeUnit.DAYS)); + } + public void testAwaitCloseTimeoutsOnNonInterruptibleTask() throws Exception { Node node = new MockNode(baseSettings().build(), basePlugins()); node.start(); ThreadPool threadpool = node.injector().getInstance(ThreadPool.class); AtomicBoolean shouldRun = new AtomicBoolean(true); + final CountDownLatch threadRunning = new CountDownLatch(1); threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> { + threadRunning.countDown(); while (shouldRun.get()); }); + threadRunning.await(); node.close(); assertFalse(node.awaitClose(0, TimeUnit.MILLISECONDS)); shouldRun.set(false); + assertTrue(node.awaitClose(1, TimeUnit.DAYS)); } public void testCloseOnInterruptibleTask() throws Exception { Node node = new MockNode(baseSettings().build(), basePlugins()); node.start(); ThreadPool threadpool = node.injector().getInstance(ThreadPool.class); - CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch threadRunning = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch finishLatch = new CountDownLatch(1); final AtomicBoolean interrupted = new AtomicBoolean(false); threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> { + threadRunning.countDown(); try { latch.await(); } catch (InterruptedException e) { @@ -192,6 +243,7 @@ public class NodeTests extends ESTestCase { finishLatch.countDown(); } }); + threadRunning.await(); node.close(); // close should not interrput ongoing tasks assertFalse(interrupted.get());