Reduce allocation rate in HNSW concurrent merge (backport of #14011) (#14037)

Viliam Durina authored on 2024-12-04 21:07:48 +01:00, committed by GitHub
parent 05ed6ebd3b
commit 173a799fe6
4 changed files with 24 additions and 24 deletions
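
In short: HnswLock.read and HnswLock.write used to allocate a fresh LockedRow record (bundling the row's NeighborArray with the held Lock) on every call, so that callers could release it via try-with-resources. After this change they return the pooled Lock itself; callers fetch the NeighborArray from the graph and unlock in a finally block, eliminating one short-lived allocation per locked row access during concurrent merge. A sketch of the before/after pattern follows the HnswConcurrentMergeBuilder hunks below.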

lucene/CHANGES.txt

@@ -106,6 +106,8 @@ Optimizations
 * GITHUB#14031: Ensure Panama float vector distance impls inlinable.
   (Robert Muir, Chris Hegarty)
 
+* GITHUB#14011: Reduce allocation rate in HNSW concurrent merge. (Viliam Durina)
+
 Bug Fixes
 ---------------------
 * GITHUB#13832: Fixed an issue where the DefaultPassageFormatter.format method did not format passages as intended

lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java

@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
 import org.apache.lucene.search.TaskExecutor;
 import org.apache.lucene.util.BitSet;
 import org.apache.lucene.util.FixedBitSet;
@@ -56,7 +57,7 @@ public class HnswConcurrentMergeBuilder implements HnswBuilder {
     this.taskExecutor = taskExecutor;
     AtomicInteger workProgress = new AtomicInteger(0);
     workers = new ConcurrentMergeWorker[numWorker];
-    hnswLock = new HnswLock(hnsw);
+    hnswLock = new HnswLock();
     for (int i = 0; i < numWorker; i++) {
       workers[i] =
           new ConcurrentMergeWorker(
@@ -221,13 +222,16 @@ public class HnswConcurrentMergeBuilder implements HnswBuilder {
     @Override
     void graphSeek(HnswGraph graph, int level, int targetNode) {
-      try (HnswLock.LockedRow rowLock = hnswLock.read(level, targetNode)) {
-        NeighborArray neighborArray = rowLock.row();
+      Lock lock = hnswLock.read(level, targetNode);
+      try {
+        NeighborArray neighborArray = ((OnHeapHnswGraph) graph).getNeighbors(level, targetNode);
         if (nodeBuffer == null || nodeBuffer.length < neighborArray.size()) {
           nodeBuffer = new int[neighborArray.size()];
         }
         size = neighborArray.size();
-        if (size >= 0) System.arraycopy(neighborArray.nodes(), 0, nodeBuffer, 0, size);
+        System.arraycopy(neighborArray.nodes(), 0, nodeBuffer, 0, size);
+      } finally {
+        lock.unlock();
       }
       upto = -1;
     }
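
For context, a stand-alone sketch of the calling-convention change in graphSeek above. Only the two shapes of the pattern mirror the diff; the class and member names (LockPatternSketch, readOld, readNew) are hypothetical stand-ins, not Lucene code.

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: contrasts the old holder-record shape with the new
// pooled-lock shape. Not Lucene code.
class LockPatternSketch {
  static final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();

  // Old shape: a short-lived record is allocated on every access so the
  // caller can release the lock via try-with-resources.
  record LockedRow(int[] row, Lock lock) implements AutoCloseable {
    @Override
    public void close() {
      lock.unlock();
    }
  }

  static LockedRow readOld(int[] row) {
    Lock lock = rw.readLock();
    lock.lock();
    return new LockedRow(row, lock); // one allocation per call
  }

  // New shape: hand back the pooled Lock itself; nothing is allocated.
  static Lock readNew() {
    Lock lock = rw.readLock();
    lock.lock();
    return lock;
  }

  public static void main(String[] args) {
    int[] row = {1, 2, 3};

    // Old calling convention:
    try (LockedRow locked = readOld(row)) {
      System.out.println(locked.row().length);
    }

    // New calling convention: the caller fetches the data itself and
    // releases the lock in a finally block.
    Lock lock = readNew();
    try {
      System.out.println(row.length);
    } finally {
      lock.unlock();
    }
  }
}

The trade-off is explicitness for allocation: the new shape gives up try-with-resources but removes one small object per locked row access on the merge hot path, rather than relying on escape analysis to elide it.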

lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java

@@ -27,6 +27,7 @@ import java.util.Locale;
 import java.util.Objects;
 import java.util.SplittableRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
 import org.apache.lucene.search.KnnCollector;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.util.FixedBitSet;
@@ -338,9 +339,12 @@ public class HnswGraphBuilder implements HnswBuilder {
       }
       int nbr = candidates.nodes()[i];
       if (hnswLock != null) {
-        try (HnswLock.LockedRow rowLock = hnswLock.write(level, nbr)) {
-          NeighborArray nbrsOfNbr = rowLock.row();
+        Lock lock = hnswLock.write(level, nbr);
+        try {
+          NeighborArray nbrsOfNbr = getGraph().getNeighbors(level, nbr);
           nbrsOfNbr.addAndEnsureDiversity(node, candidates.scores()[i], nbr, scorerSupplier);
+        } finally {
+          lock.unlock();
         }
       } else {
         NeighborArray nbrsOfNbr = hnsw.getNeighbors(level, nbr);

lucene/core/src/java/org/apache/lucene/util/hnsw/HnswLock.java

@@ -17,49 +17,39 @@
 package org.apache.lucene.util.hnsw;
 
-import java.io.Closeable;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
- * Provide (read-and-write) locked access to rows of an OnHeapHnswGraph. For use by
- * HnswConcurrentMerger and its HnswGraphBuilders.
+ * Provide (read-and-write) striped locks for access to nodes of an {@link OnHeapHnswGraph}. For use
+ * by {@link HnswConcurrentMergeBuilder} and its HnswGraphBuilders.
  */
 final class HnswLock {
   private static final int NUM_LOCKS = 512;
   private final ReentrantReadWriteLock[] locks;
-  private final OnHeapHnswGraph graph;
 
-  HnswLock(OnHeapHnswGraph graph) {
-    this.graph = graph;
+  HnswLock() {
     locks = new ReentrantReadWriteLock[NUM_LOCKS];
     for (int i = 0; i < NUM_LOCKS; i++) {
       locks[i] = new ReentrantReadWriteLock();
     }
   }
 
-  LockedRow read(int level, int node) {
+  Lock read(int level, int node) {
     int lockid = hash(level, node) % NUM_LOCKS;
     Lock lock = locks[lockid].readLock();
     lock.lock();
-    return new LockedRow(graph.getNeighbors(level, node), lock);
+    return lock;
   }
 
-  LockedRow write(int level, int node) {
+  Lock write(int level, int node) {
     int lockid = hash(level, node) % NUM_LOCKS;
     Lock lock = locks[lockid].writeLock();
     lock.lock();
-    return new LockedRow(graph.getNeighbors(level, node), lock);
+    return lock;
   }
 
-  record LockedRow(NeighborArray row, Lock lock) implements Closeable {
-    @Override
-    public void close() {
-      lock.unlock();
-    }
-  }
-
-  static int hash(int v1, int v2) {
+  private static int hash(int v1, int v2) {
     return v1 * 31 + v2;
   }
 }
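
A small worked example of the stripe selection in hash above. Distinct (level, node) pairs can map to the same stripe; that is safe (the shared lock is merely coarser for those rows) and keeps the pool at a fixed 512 locks, all allocated once in the constructor. The names below (StripeDemo, stripe) are hypothetical, not Lucene code.

// Hypothetical demo of HnswLock's stripe arithmetic; not Lucene code.
public class StripeDemo {
  static final int NUM_LOCKS = 512; // fixed pool size, as in HnswLock

  // Mirrors hash(level, node) % NUM_LOCKS. Assumes the hash stays
  // non-negative, which holds for small non-negative levels and node ids.
  static int stripe(int level, int node) {
    return (level * 31 + node) % NUM_LOCKS;
  }

  public static void main(String[] args) {
    System.out.println(stripe(0, 7));   // 7
    System.out.println(stripe(0, 512)); // 0: wraps around the pool
    System.out.println(stripe(1, 481)); // 0: collides with (0, 512)
  }
}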