Tighten no pending scheduled refresh check (#45025)
Previously, we use ThreadPoolStats to ensure that the scheduledRefresh triggered by the internal refresh setting update is executed before we index a new document. With that change (#40387), this test did not fail for the last 3 months. However, using ThreadPoolStats is not entirely watertight as both "active" and "queue" count can be 0 in a very small interval when ThreadPoolExecutor pulls a task from the queue but before marking the corresponding worker as active (i.e., lock it). Closes #39565
This commit is contained in:
parent
c088bafbbc
commit
3a487379c3
|
@ -85,7 +85,6 @@ import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||||
import org.elasticsearch.test.IndexSettingsModule;
|
import org.elasticsearch.test.IndexSettingsModule;
|
||||||
import org.elasticsearch.test.InternalSettingsPlugin;
|
import org.elasticsearch.test.InternalSettingsPlugin;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.threadpool.ThreadPoolStats;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -102,6 +101,8 @@ import java.util.Locale;
|
||||||
import java.util.concurrent.BrokenBarrierException;
|
import java.util.concurrent.BrokenBarrierException;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
|
import java.util.concurrent.Phaser;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -772,20 +773,28 @@ public class IndexShardIT extends ESSingleNodeTestCase {
|
||||||
// wait for both to ensure we don't have in-flight operations
|
// wait for both to ensure we don't have in-flight operations
|
||||||
updateSettingsLatch.await();
|
updateSettingsLatch.await();
|
||||||
refreshLatch.await();
|
refreshLatch.await();
|
||||||
// ensure no scheduled refresh to compete with the scheduleRefresh we are going to verify.
|
// We need to ensure a `scheduledRefresh` triggered by the internal refresh setting update is executed before we index a new doc;
|
||||||
assertBusy(() -> {
|
// otherwise, it will compete to call `Engine#maybeRefresh` with the `scheduledRefresh` that we are going to verify.
|
||||||
for (ThreadPoolStats.Stats stat : indexService.getThreadPool().stats()) {
|
ensureNoPendingScheduledRefresh(indexService.getThreadPool());
|
||||||
if (stat.getName().equals(ThreadPool.Names.REFRESH) && (stat.getQueue() > 0 || stat.getActive() > 0)) {
|
|
||||||
throw new AssertionError(); // cause assert busy to retry
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
client().prepareIndex("test", "test", "2").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
|
client().prepareIndex("test", "test", "2").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
|
||||||
assertTrue(shard.scheduledRefresh());
|
assertTrue(shard.scheduledRefresh());
|
||||||
assertTrue(shard.isSearchIdle());
|
assertTrue(shard.isSearchIdle());
|
||||||
assertHitCount(client().prepareSearch().get(), 3);
|
assertHitCount(client().prepareSearch().get(), 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void ensureNoPendingScheduledRefresh(ThreadPool threadPool) {
|
||||||
|
// We can make sure that all scheduled refresh tasks are done by submitting *maximumPoolSize* blocking tasks,
|
||||||
|
// then wait until all of them completed. Note that using ThreadPoolStats is not watertight as both queue and
|
||||||
|
// active count can be 0 when ThreadPoolExecutor just takes a task out the queue but before marking it active.
|
||||||
|
ThreadPoolExecutor refreshThreadPoolExecutor = (ThreadPoolExecutor) threadPool.executor(ThreadPool.Names.REFRESH);
|
||||||
|
int maximumPoolSize = refreshThreadPoolExecutor.getMaximumPoolSize();
|
||||||
|
Phaser barrier = new Phaser(maximumPoolSize + 1);
|
||||||
|
for (int i = 0; i < maximumPoolSize; i++) {
|
||||||
|
refreshThreadPoolExecutor.execute(barrier::arriveAndAwaitAdvance);
|
||||||
|
}
|
||||||
|
barrier.arriveAndAwaitAdvance();
|
||||||
|
}
|
||||||
|
|
||||||
public void testGlobalCheckpointListeners() throws Exception {
|
public void testGlobalCheckpointListeners() throws Exception {
|
||||||
createIndex("test", Settings.builder()
|
createIndex("test", Settings.builder()
|
||||||
.put("index.number_of_shards", 1)
|
.put("index.number_of_shards", 1)
|
||||||
|
|
Loading…
Reference in New Issue