From 56e65919b18dccb368d080cf9188080727bd63f1 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Fri, 31 Mar 2023 15:07:48 +0200 Subject: [PATCH] Adjust DWPT pool concurrency to the number of cores. (#12216) After upgrading Elasticsearch to a recent Lucene snapshot, we observed a few indexing slowdowns when indexing with low numbers of cores. This appears to be due to the fact that we lost too much of the bias towards larger DWPTs in apache/lucene#12199. This change tries to add back more ordering by adjusting the concurrency of `DWPTPool` to the number of cores that are available on the local node. --- .../ConcurrentApproximatePriorityQueue.java | 62 +++++++++++++------ ...estConcurrentApproximatePriorityQueue.java | 20 +++++- 2 files changed, 60 insertions(+), 22 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java index 036ba24c962..8a8fc72ab4c 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java +++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java @@ -22,25 +22,49 @@ import java.util.function.Predicate; /** * Concurrent version of {@link ApproximatePriorityQueue}, which trades a bit more of ordering for - * better concurrency by maintaining 8 sub {@link ApproximatePriorityQueue}s that are locked - * independently. + * better concurrency by maintaining multiple sub {@link ApproximatePriorityQueue}s that are locked + * independently. The number of subs is computed dynamically based on hardware concurrency. */ final class ConcurrentApproximatePriorityQueue { - /** Keeping 8 queues should already help a lot compared to a single one. */ - static final int CONCURRENCY = 8; + static final int MIN_CONCURRENCY = 1; + static final int MAX_CONCURRENCY = 256; - private static final int MASK = 0x07; + private static final int getConcurrency() { + int coreCount = Runtime.getRuntime().availableProcessors(); + // Aim for ~4 entries per slot when indexing with one thread per CPU core. The trade-off is + // that if we set the concurrency too high then we'll completely lose the bias towards larger + // DWPTs. And if we set it too low then we risk seeing contention. + int concurrency = coreCount / 4; + concurrency = Math.max(MIN_CONCURRENCY, concurrency); + concurrency = Math.min(MAX_CONCURRENCY, concurrency); + return concurrency; + } + final int concurrency; final Lock[] locks; final ApproximatePriorityQueue[] queues; ConcurrentApproximatePriorityQueue() { - locks = new Lock[CONCURRENCY]; + this(getConcurrency()); + } + + ConcurrentApproximatePriorityQueue(int concurrency) { + if (concurrency < MIN_CONCURRENCY || concurrency > MAX_CONCURRENCY) { + throw new IllegalArgumentException( + "concurrency must be in [" + + MIN_CONCURRENCY + + ", " + + MAX_CONCURRENCY + + "], got " + + concurrency); + } + this.concurrency = concurrency; + locks = new Lock[concurrency]; @SuppressWarnings({"rawtypes", "unchecked"}) - ApproximatePriorityQueue[] queues = new ApproximatePriorityQueue[CONCURRENCY]; + ApproximatePriorityQueue[] queues = new ApproximatePriorityQueue[concurrency]; this.queues = queues; - for (int i = 0; i < CONCURRENCY; ++i) { + for (int i = 0; i < concurrency; ++i) { locks[i] = new ReentrantLock(); queues[i] = new ApproximatePriorityQueue<>(); } @@ -50,9 +74,9 @@ final class ConcurrentApproximatePriorityQueue { // Seed the order in which to look at entries based on the current thread. This helps distribute // entries across queues and gives a bit of thread affinity between entries and threads, which // can't hurt. - final int threadHash = Thread.currentThread().hashCode(); - for (int i = 0; i < CONCURRENCY; ++i) { - final int index = (threadHash + i) & MASK; + final int threadHash = Thread.currentThread().hashCode() & 0xFFFF; + for (int i = 0; i < concurrency; ++i) { + final int index = (threadHash + i) % concurrency; final Lock lock = locks[index]; final ApproximatePriorityQueue queue = queues[index]; if (lock.tryLock()) { @@ -64,7 +88,7 @@ final class ConcurrentApproximatePriorityQueue { } } } - final int index = threadHash & MASK; + final int index = threadHash % concurrency; final Lock lock = locks[index]; final ApproximatePriorityQueue queue = queues[index]; lock.lock(); @@ -76,9 +100,9 @@ final class ConcurrentApproximatePriorityQueue { } T poll(Predicate predicate) { - final int threadHash = Thread.currentThread().hashCode(); - for (int i = 0; i < CONCURRENCY; ++i) { - final int index = (threadHash + i) & MASK; + final int threadHash = Thread.currentThread().hashCode() & 0xFFFF; + for (int i = 0; i < concurrency; ++i) { + final int index = (threadHash + i) % concurrency; final Lock lock = locks[index]; final ApproximatePriorityQueue queue = queues[index]; if (lock.tryLock()) { @@ -92,8 +116,8 @@ final class ConcurrentApproximatePriorityQueue { } } } - for (int i = 0; i < CONCURRENCY; ++i) { - final int index = (threadHash + i) & MASK; + for (int i = 0; i < concurrency; ++i) { + final int index = (threadHash + i) % concurrency; final Lock lock = locks[index]; final ApproximatePriorityQueue queue = queues[index]; lock.lock(); @@ -117,7 +141,7 @@ final class ConcurrentApproximatePriorityQueue { throw new AssertionError("contains should only be used for assertions"); } - for (int i = 0; i < CONCURRENCY; ++i) { + for (int i = 0; i < concurrency; ++i) { final Lock lock = locks[i]; final ApproximatePriorityQueue queue = queues[i]; lock.lock(); @@ -133,7 +157,7 @@ final class ConcurrentApproximatePriorityQueue { } boolean remove(Object o) { - for (int i = 0; i < CONCURRENCY; ++i) { + for (int i = 0; i < concurrency; ++i) { final Lock lock = locks[i]; final ApproximatePriorityQueue queue = queues[i]; lock.lock(); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java index 49584d9e697..2656e4a3885 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java @@ -18,12 +18,18 @@ package org.apache.lucene.index; import java.util.concurrent.CountDownLatch; import org.apache.lucene.tests.util.LuceneTestCase; +import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.ThreadInterruptedException; public class TestConcurrentApproximatePriorityQueue extends LuceneTestCase { public void testPollFromSameThread() { - ConcurrentApproximatePriorityQueue pq = new ConcurrentApproximatePriorityQueue<>(); + ConcurrentApproximatePriorityQueue pq = + new ConcurrentApproximatePriorityQueue<>( + TestUtil.nextInt( + random(), + ConcurrentApproximatePriorityQueue.MIN_CONCURRENCY, + ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY)); pq.add(3, 3); pq.add(10, 10); pq.add(7, 7); @@ -34,7 +40,12 @@ public class TestConcurrentApproximatePriorityQueue extends LuceneTestCase { } public void testPollFromDifferentThread() throws Exception { - ConcurrentApproximatePriorityQueue pq = new ConcurrentApproximatePriorityQueue<>(); + ConcurrentApproximatePriorityQueue pq = + new ConcurrentApproximatePriorityQueue<>( + TestUtil.nextInt( + random(), + ConcurrentApproximatePriorityQueue.MIN_CONCURRENCY, + ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY)); pq.add(3, 3); pq.add(10, 10); pq.add(7, 7); @@ -53,7 +64,10 @@ public class TestConcurrentApproximatePriorityQueue extends LuceneTestCase { } public void testCurrentLockIsBusy() throws Exception { - ConcurrentApproximatePriorityQueue pq = new ConcurrentApproximatePriorityQueue<>(); + // This test needs a concurrency of 2 or more. + ConcurrentApproximatePriorityQueue pq = + new ConcurrentApproximatePriorityQueue<>( + TestUtil.nextInt(random(), 2, ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY)); pq.add(3, 3); CountDownLatch takeLock = new CountDownLatch(1); CountDownLatch releaseLock = new CountDownLatch(1);