diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 98664a56655..690ec48bcc6 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -85,7 +85,6 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.ThreadPoolStats; import org.junit.Assert; import java.io.IOException; @@ -102,6 +101,8 @@ import java.util.Locale; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Phaser; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -772,20 +773,28 @@ public class IndexShardIT extends ESSingleNodeTestCase { // wait for both to ensure we don't have in-flight operations updateSettingsLatch.await(); refreshLatch.await(); - // ensure no scheduled refresh to compete with the scheduleRefresh we are going to verify. - assertBusy(() -> { - for (ThreadPoolStats.Stats stat : indexService.getThreadPool().stats()) { - if (stat.getName().equals(ThreadPool.Names.REFRESH) && (stat.getQueue() > 0 || stat.getActive() > 0)) { - throw new AssertionError(); // cause assert busy to retry - } - } - }); + // We need to ensure a `scheduledRefresh` triggered by the internal refresh setting update is executed before we index a new doc; + // otherwise, it will compete to call `Engine#maybeRefresh` with the `scheduledRefresh` that we are going to verify. + ensureNoPendingScheduledRefresh(indexService.getThreadPool()); client().prepareIndex("test", "test", "2").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); assertTrue(shard.scheduledRefresh()); assertTrue(shard.isSearchIdle()); assertHitCount(client().prepareSearch().get(), 3); } + private void ensureNoPendingScheduledRefresh(ThreadPool threadPool) { + // We can make sure that all scheduled refresh tasks are done by submitting *maximumPoolSize* blocking tasks, + // then wait until all of them completed. Note that using ThreadPoolStats is not watertight as both queue and + // active count can be 0 when ThreadPoolExecutor just takes a task out the queue but before marking it active. + ThreadPoolExecutor refreshThreadPoolExecutor = (ThreadPoolExecutor) threadPool.executor(ThreadPool.Names.REFRESH); + int maximumPoolSize = refreshThreadPoolExecutor.getMaximumPoolSize(); + Phaser barrier = new Phaser(maximumPoolSize + 1); + for (int i = 0; i < maximumPoolSize; i++) { + refreshThreadPoolExecutor.execute(barrier::arriveAndAwaitAdvance); + } + barrier.arriveAndAwaitAdvance(); + } + public void testGlobalCheckpointListeners() throws Exception { createIndex("test", Settings.builder() .put("index.number_of_shards", 1)