Adjust DWPT pool concurrency to the number of cores. (#12216)

After upgrading Elasticsearch to a recent Lucene snapshot, we observed a few
indexing slowdowns when indexing with low numbers of cores. This appears to be
due to the fact that we lost too much of the bias towards larger DWPTs in
apache/lucene#12199. This change tries to add back more ordering by adjusting
the concurrency of `DWPTPool` to the number of cores that are available on the
local node.
This commit is contained in:
Adrien Grand 2023-03-31 15:07:48 +02:00 committed by GitHub
parent 172dfaf867
commit 56e65919b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 60 additions and 22 deletions

View File

@ -22,25 +22,49 @@ import java.util.function.Predicate;
/**
* Concurrent version of {@link ApproximatePriorityQueue}, which trades a bit more of ordering for
* better concurrency by maintaining 8 sub {@link ApproximatePriorityQueue}s that are locked
* independently.
* better concurrency by maintaining multiple sub {@link ApproximatePriorityQueue}s that are locked
* independently. The number of subs is computed dynamically based on hardware concurrency.
*/
final class ConcurrentApproximatePriorityQueue<T> {
/** Keeping 8 queues should already help a lot compared to a single one. */
static final int CONCURRENCY = 8;
static final int MIN_CONCURRENCY = 1;
static final int MAX_CONCURRENCY = 256;
private static final int MASK = 0x07;
private static final int getConcurrency() {
int coreCount = Runtime.getRuntime().availableProcessors();
// Aim for ~4 entries per slot when indexing with one thread per CPU core. The trade-off is
// that if we set the concurrency too high then we'll completely lose the bias towards larger
// DWPTs. And if we set it too low then we risk seeing contention.
int concurrency = coreCount / 4;
concurrency = Math.max(MIN_CONCURRENCY, concurrency);
concurrency = Math.min(MAX_CONCURRENCY, concurrency);
return concurrency;
}
final int concurrency;
final Lock[] locks;
final ApproximatePriorityQueue<T>[] queues;
ConcurrentApproximatePriorityQueue() {
locks = new Lock[CONCURRENCY];
this(getConcurrency());
}
ConcurrentApproximatePriorityQueue(int concurrency) {
if (concurrency < MIN_CONCURRENCY || concurrency > MAX_CONCURRENCY) {
throw new IllegalArgumentException(
"concurrency must be in ["
+ MIN_CONCURRENCY
+ ", "
+ MAX_CONCURRENCY
+ "], got "
+ concurrency);
}
this.concurrency = concurrency;
locks = new Lock[concurrency];
@SuppressWarnings({"rawtypes", "unchecked"})
ApproximatePriorityQueue<T>[] queues = new ApproximatePriorityQueue[CONCURRENCY];
ApproximatePriorityQueue<T>[] queues = new ApproximatePriorityQueue[concurrency];
this.queues = queues;
for (int i = 0; i < CONCURRENCY; ++i) {
for (int i = 0; i < concurrency; ++i) {
locks[i] = new ReentrantLock();
queues[i] = new ApproximatePriorityQueue<>();
}
@ -50,9 +74,9 @@ final class ConcurrentApproximatePriorityQueue<T> {
// Seed the order in which to look at entries based on the current thread. This helps distribute
// entries across queues and gives a bit of thread affinity between entries and threads, which
// can't hurt.
final int threadHash = Thread.currentThread().hashCode();
for (int i = 0; i < CONCURRENCY; ++i) {
final int index = (threadHash + i) & MASK;
final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
for (int i = 0; i < concurrency; ++i) {
final int index = (threadHash + i) % concurrency;
final Lock lock = locks[index];
final ApproximatePriorityQueue<T> queue = queues[index];
if (lock.tryLock()) {
@ -64,7 +88,7 @@ final class ConcurrentApproximatePriorityQueue<T> {
}
}
}
final int index = threadHash & MASK;
final int index = threadHash % concurrency;
final Lock lock = locks[index];
final ApproximatePriorityQueue<T> queue = queues[index];
lock.lock();
@ -76,9 +100,9 @@ final class ConcurrentApproximatePriorityQueue<T> {
}
T poll(Predicate<T> predicate) {
final int threadHash = Thread.currentThread().hashCode();
for (int i = 0; i < CONCURRENCY; ++i) {
final int index = (threadHash + i) & MASK;
final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
for (int i = 0; i < concurrency; ++i) {
final int index = (threadHash + i) % concurrency;
final Lock lock = locks[index];
final ApproximatePriorityQueue<T> queue = queues[index];
if (lock.tryLock()) {
@ -92,8 +116,8 @@ final class ConcurrentApproximatePriorityQueue<T> {
}
}
}
for (int i = 0; i < CONCURRENCY; ++i) {
final int index = (threadHash + i) & MASK;
for (int i = 0; i < concurrency; ++i) {
final int index = (threadHash + i) % concurrency;
final Lock lock = locks[index];
final ApproximatePriorityQueue<T> queue = queues[index];
lock.lock();
@ -117,7 +141,7 @@ final class ConcurrentApproximatePriorityQueue<T> {
throw new AssertionError("contains should only be used for assertions");
}
for (int i = 0; i < CONCURRENCY; ++i) {
for (int i = 0; i < concurrency; ++i) {
final Lock lock = locks[i];
final ApproximatePriorityQueue<T> queue = queues[i];
lock.lock();
@ -133,7 +157,7 @@ final class ConcurrentApproximatePriorityQueue<T> {
}
boolean remove(Object o) {
for (int i = 0; i < CONCURRENCY; ++i) {
for (int i = 0; i < concurrency; ++i) {
final Lock lock = locks[i];
final ApproximatePriorityQueue<T> queue = queues[i];
lock.lock();

View File

@ -18,12 +18,18 @@ package org.apache.lucene.index;
import java.util.concurrent.CountDownLatch;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.ThreadInterruptedException;
public class TestConcurrentApproximatePriorityQueue extends LuceneTestCase {
public void testPollFromSameThread() {
ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
ConcurrentApproximatePriorityQueue<Integer> pq =
new ConcurrentApproximatePriorityQueue<>(
TestUtil.nextInt(
random(),
ConcurrentApproximatePriorityQueue.MIN_CONCURRENCY,
ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY));
pq.add(3, 3);
pq.add(10, 10);
pq.add(7, 7);
@ -34,7 +40,12 @@ public class TestConcurrentApproximatePriorityQueue extends LuceneTestCase {
}
public void testPollFromDifferentThread() throws Exception {
ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
ConcurrentApproximatePriorityQueue<Integer> pq =
new ConcurrentApproximatePriorityQueue<>(
TestUtil.nextInt(
random(),
ConcurrentApproximatePriorityQueue.MIN_CONCURRENCY,
ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY));
pq.add(3, 3);
pq.add(10, 10);
pq.add(7, 7);
@ -53,7 +64,10 @@ public class TestConcurrentApproximatePriorityQueue extends LuceneTestCase {
}
public void testCurrentLockIsBusy() throws Exception {
ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
// This test needs a concurrency of 2 or more.
ConcurrentApproximatePriorityQueue<Integer> pq =
new ConcurrentApproximatePriorityQueue<>(
TestUtil.nextInt(random(), 2, ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY));
pq.add(3, 3);
CountDownLatch takeLock = new CountDownLatch(1);
CountDownLatch releaseLock = new CountDownLatch(1);