mirror of https://github.com/apache/lucene.git
Add new parallel merge task executor for parallel actions within a single merge action (#13190)
This commit adds a new interface to all MergeScheduler classes that allows the scheduler to provide an Executor for intra-merge parallelism. The first subclass to satisfy this new interface is the ConcurrentMergeScheduler (CMS). In particular, the CMS limits the amount of parallelism based on the current number of mergeThreads and the configured maxThreadCount setting. Parallelism is now added by default to the SegmentMerger, where each merging task is executed in parallel with the others. Additionally, the Lucene99 HNSW codec will use the executor provided by the MergeScheduler if no executor is provided directly.

Relates to: https://github.com/apache/lucene/issues/12740
Relates to: https://github.com/apache/lucene/issues/9626

This is take 2 of: https://github.com/apache/lucene/pull/13124
parent d5f8853fda
commit 75e1ebc450
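For orientation before the diff: a minimal sketch of how an application picks up the new intra-merge parallelism. The hook names come from the diff below; the analyzer, directory, and document choices are illustrative only, not part of this patch.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class IntraMergeParallelismSketch {
  public static void main(String[] args) throws Exception {
    // After this commit the CMS also hands out an intra-merge Executor via
    // getIntraMergeExecutor(OneMerge); SegmentMerger uses it to run the
    // postings/doc values/points/vectors parts of one merge in parallel.
    ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
    cms.setMaxMergesAndThreads(6, 6);

    IndexWriterConfig iwc =
        new IndexWriterConfig(new StandardAnalyzer()).setMergeScheduler(cms);
    try (Directory dir = new ByteBuffersDirectory();
        IndexWriter writer = new IndexWriter(dir, iwc)) {
      Document doc = new Document();
      doc.add(new TextField("body", "hello world", Field.Store.NO));
      writer.addDocument(doc);
      writer.forceMerge(1); // merges triggered here may now parallelize internally
    }
  }
}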
@@ -213,6 +213,10 @@ Improvements
 
 * GITHUB#13066: Support getMaxScore of DisjunctionSumScorer for non top level scoring clause (Shintaro Murakami)
 
+* GITHUB#13124: MergeScheduler can now provide an executor for intra-merge parallelism. The first
+  implementation is the ConcurrentMergeScheduler and the Lucene99HnswVectorsFormat will use it if no other
+  executor is provided. (Ben Trent)
+
 Optimizations
 ---------------------
 
@@ -24,6 +24,8 @@ import org.apache.lucene.codecs.KnnVectorsFormat;
 import org.apache.lucene.codecs.KnnVectorsReader;
 import org.apache.lucene.codecs.KnnVectorsWriter;
 import org.apache.lucene.codecs.lucene90.IndexedDISI;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.MergeScheduler;
 import org.apache.lucene.index.SegmentReadState;
 import org.apache.lucene.index.SegmentWriteState;
 import org.apache.lucene.search.DocIdSetIterator;

@@ -163,7 +165,8 @@ public final class Lucene99HnswVectorsFormat extends KnnVectorsFormat {
   * @param numMergeWorkers number of workers (threads) that will be used when doing merge. If
   *     larger than 1, a non-null {@link ExecutorService} must be passed as mergeExec
   * @param mergeExec the {@link ExecutorService} that will be used by ALL vector writers that are
-  *     generated by this format to do the merge
+  *     generated by this format to do the merge. If null, the configured {@link
+  *     MergeScheduler#getIntraMergeExecutor(MergePolicy.OneMerge)} is used.
   */
  public Lucene99HnswVectorsFormat(
      int maxConn, int beamWidth, int numMergeWorkers, ExecutorService mergeExec) {

@@ -184,10 +187,6 @@ public final class Lucene99HnswVectorsFormat extends KnnVectorsFormat {
     }
     this.maxConn = maxConn;
     this.beamWidth = beamWidth;
-    if (numMergeWorkers > 1 && mergeExec == null) {
-      throw new IllegalArgumentException(
-          "No executor service passed in when " + numMergeWorkers + " merge workers are requested");
-    }
     if (numMergeWorkers == 1 && mergeExec != null) {
       throw new IllegalArgumentException(
           "No executor service is needed as we'll use single thread to merge");
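The constructor change above relaxes the old rule that numMergeWorkers > 1 required a non-null executor; a null mergeExec now means "use the MergeScheduler's intra-merge executor". A sketch of the now-legal call, with the constructor signature taken from the diff and the wrapper class purely illustrative:

import org.apache.lucene.codecs.KnnVectorsFormat;
import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsFormat;

public class HnswFormatSketch {
  public static void main(String[] args) {
    // maxConn=16, beamWidth=100, numMergeWorkers=4, mergeExec=null.
    // Before this change the null executor threw IllegalArgumentException;
    // now it falls back to MergeScheduler#getIntraMergeExecutor at merge time.
    KnnVectorsFormat format = new Lucene99HnswVectorsFormat(16, 100, 4, null);
    System.out.println(format);
  }
}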
@@ -352,7 +352,14 @@ public final class Lucene99HnswVectorsWriter extends KnnVectorsWriter {
     int[][] vectorIndexNodeOffsets = null;
     if (scorerSupplier.totalVectorCount() > 0) {
       // build graph
-      HnswGraphMerger merger = createGraphMerger(fieldInfo, scorerSupplier);
+      HnswGraphMerger merger =
+          createGraphMerger(
+              fieldInfo,
+              scorerSupplier,
+              mergeState.intraMergeTaskExecutor == null
+                  ? null
+                  : new TaskExecutor(mergeState.intraMergeTaskExecutor),
+              numMergeWorkers);
       for (int i = 0; i < mergeState.liveDocs.length; i++) {
         merger.addReader(
             mergeState.knnVectorsReaders[i], mergeState.docMaps[i], mergeState.liveDocs[i]);

@@ -489,11 +496,23 @@ public final class Lucene99HnswVectorsWriter extends KnnVectorsWriter {
   }
 
   private HnswGraphMerger createGraphMerger(
-      FieldInfo fieldInfo, RandomVectorScorerSupplier scorerSupplier) {
+      FieldInfo fieldInfo,
+      RandomVectorScorerSupplier scorerSupplier,
+      TaskExecutor parallelMergeTaskExecutor,
+      int numParallelMergeWorkers) {
     if (mergeExec != null) {
       return new ConcurrentHnswMerger(
           fieldInfo, scorerSupplier, M, beamWidth, mergeExec, numMergeWorkers);
     }
+    if (parallelMergeTaskExecutor != null) {
+      return new ConcurrentHnswMerger(
+          fieldInfo,
+          scorerSupplier,
+          M,
+          beamWidth,
+          parallelMergeTaskExecutor,
+          numParallelMergeWorkers);
+    }
     return new IncrementalHnswGraphMerger(fieldInfo, scorerSupplier, M, beamWidth);
   }
 
@@ -21,9 +21,12 @@ import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
+import java.util.concurrent.Executor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.lucene.index.MergePolicy.OneMerge;
-import org.apache.lucene.internal.tests.ConcurrentMergeSchedulerAccess;
 import org.apache.lucene.internal.tests.TestSecrets;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
@@ -109,6 +112,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
 
   private double forceMergeMBPerSec = Double.POSITIVE_INFINITY;
 
+  /** The executor provided for intra-merge parallelization */
+  protected CachedExecutor intraMergeExecutor;
+
   /** Sole constructor, with all settings set to default values. */
   public ConcurrentMergeScheduler() {}
 

@@ -259,6 +265,16 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
     assert false : "merge thread " + currentThread + " was not found";
   }
 
+  @Override
+  public Executor getIntraMergeExecutor(OneMerge merge) {
+    assert intraMergeExecutor != null : "scaledExecutor is not initialized";
+    // don't do multithreaded merges for small merges
+    if (merge.estimatedMergeBytes < MIN_BIG_MERGE_MB * 1024 * 1024) {
+      return super.getIntraMergeExecutor(merge);
+    }
+    return intraMergeExecutor;
+  }
+
   @Override
   public Directory wrapForMerge(OneMerge merge, Directory in) {
     Thread mergeThread = Thread.currentThread();

@@ -268,6 +284,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
     }
 
     // Return a wrapped Directory which has rate-limited output.
+    // Note: the rate limiter is only per thread. So, if there are multiple merge threads running
+    // and throttling is required, each thread will be throttled independently.
+    // The implication of this, is that the total IO rate could be higher than the target rate.
     RateLimiter rateLimiter = ((MergeThread) mergeThread).rateLimiter;
     return new FilterDirectory(in) {
       @Override

@@ -279,14 +298,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
         // somewhere that is failing to pass down the right IOContext:
         assert context.context == IOContext.Context.MERGE : "got context=" + context.context;
 
-        // Because rateLimiter is bound to a particular merge thread, this method should
-        // always be called from that context. Verify this.
-        assert mergeThread == Thread.currentThread()
-            : "Not the same merge thread, current="
-                + Thread.currentThread()
-                + ", expected="
-                + mergeThread;
-
         return new RateLimitedIndexOutput(rateLimiter, in.createOutput(name, context));
       }
     };

@@ -445,8 +456,15 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
   }
 
   @Override
-  public void close() {
-    sync();
+  public void close() throws IOException {
+    super.close();
+    try {
+      sync();
+    } finally {
+      if (intraMergeExecutor != null) {
+        intraMergeExecutor.shutdown();
+      }
+    }
   }
 
   /**

@@ -510,6 +528,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
   void initialize(InfoStream infoStream, Directory directory) throws IOException {
     super.initialize(infoStream, directory);
     initDynamicDefaults(directory);
+    if (intraMergeExecutor == null) {
+      intraMergeExecutor = new CachedExecutor();
+    }
   }
 
   @Override

@@ -755,11 +776,16 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
 
   @Override
   public String toString() {
-    StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": ");
-    sb.append("maxThreadCount=").append(maxThreadCount).append(", ");
-    sb.append("maxMergeCount=").append(maxMergeCount).append(", ");
-    sb.append("ioThrottle=").append(doAutoIOThrottle);
-    return sb.toString();
+    return getClass().getSimpleName()
+        + ": "
+        + "maxThreadCount="
+        + maxThreadCount
+        + ", "
+        + "maxMergeCount="
+        + maxMergeCount
+        + ", "
+        + "ioThrottle="
+        + doAutoIOThrottle;
   }
 
   private boolean isBacklog(long now, OneMerge merge) {

@@ -902,12 +928,64 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
   }
 
   static {
-    TestSecrets.setConcurrentMergeSchedulerAccess(
-        new ConcurrentMergeSchedulerAccess() {
-          @Override
-          public void setSuppressExceptions(ConcurrentMergeScheduler cms) {
-            cms.setSuppressExceptions();
-          }
-        });
+    TestSecrets.setConcurrentMergeSchedulerAccess(ConcurrentMergeScheduler::setSuppressExceptions);
   }
+
+  /**
+   * This executor provides intra-merge threads for parallel execution of merge tasks. It provides
+   * a limited number of threads to execute merge tasks. In particular, if the number of
+   * `mergeThreads` is equal to `maxThreadCount`, then the executor will execute the merge task in
+   * the calling thread.
+   */
+  private class CachedExecutor implements Executor {
+
+    private final AtomicInteger activeCount = new AtomicInteger(0);
+    private final ThreadPoolExecutor executor;
+
+    public CachedExecutor() {
+      this.executor =
+          new ThreadPoolExecutor(0, 1024, 1L, TimeUnit.MINUTES, new SynchronousQueue<>());
+    }
+
+    void shutdown() {
+      executor.shutdown();
+    }
+
+    @Override
+    public void execute(Runnable command) {
+      final boolean isThreadAvailable;
+      // we need to check if a thread is available before submitting the task to the executor
+      // synchronize on CMS to get an accurate count of current threads
+      synchronized (ConcurrentMergeScheduler.this) {
+        int max = maxThreadCount - mergeThreads.size() - 1;
+        int value = activeCount.get();
+        if (value < max) {
+          activeCount.incrementAndGet();
+          assert activeCount.get() > 0 : "active count must be greater than 0 after increment";
+          isThreadAvailable = true;
+        } else {
+          isThreadAvailable = false;
+        }
+      }
+      if (isThreadAvailable) {
+        executor.execute(
+            () -> {
+              try {
+                command.run();
+              } catch (Throwable exc) {
+                if (suppressExceptions == false) {
+                  // suppressExceptions is normally only set during
+                  // testing.
+                  handleMergeException(exc);
+                }
+              } finally {
+                activeCount.decrementAndGet();
+                assert activeCount.get() >= 0 : "unexpected negative active count";
+              }
+            });
+      } else {
+        command.run();
+      }
+    }
+  }
 }
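Reading the CachedExecutor budget above: execute() hands a task to the pool only while activeCount < maxThreadCount - mergeThreads.size() - 1, and otherwise runs it inline on the submitting merge thread. So with setMaxMergesAndThreads(6, 6) and four live merge threads, the budget is 6 - 4 - 1 = 1: of four concurrently submitted tasks, one lands on the pool and three run on their callers, which is exactly what testIntraMergeThreadPoolIsLimitedByMaxThreads asserts further down.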
@@ -3430,7 +3430,14 @@ public class IndexWriter
     }
 
     SegmentMerger merger =
-        new SegmentMerger(readers, segInfo, infoStream, trackingDir, globalFieldNumberMap, context);
+        new SegmentMerger(
+            readers,
+            segInfo,
+            infoStream,
+            trackingDir,
+            globalFieldNumberMap,
+            context,
+            mergeScheduler.getIntraMergeExecutor(merge));
 
     if (!merger.shouldMerge()) {
       return;

@@ -5219,7 +5226,13 @@ public class IndexWriter
 
     final SegmentMerger merger =
         new SegmentMerger(
-            mergeReaders, merge.info.info, infoStream, dirWrapper, globalFieldNumberMap, context);
+            mergeReaders,
+            merge.info.info,
+            infoStream,
+            dirWrapper,
+            globalFieldNumberMap,
+            context,
+            mergeScheduler.getIntraMergeExecutor(merge));
     merge.info.setSoftDelCount(Math.toIntExact(softDeleteCount.get()));
     merge.checkAborted();
 
@@ -136,14 +136,6 @@ public abstract class MergePolicy {
      */
     public void pauseNanos(long pauseNanos, PauseReason reason, BooleanSupplier condition)
         throws InterruptedException {
-      if (Thread.currentThread() != owner) {
-        throw new RuntimeException(
-            "Only the merge owner thread can call pauseNanos(). This thread: "
-                + Thread.currentThread().getName()
-                + ", owner thread: "
-                + owner);
-      }
-
       long start = System.nanoTime();
       AtomicLong timeUpdate = pauseTimesNS.get(reason);
       pauseLock.lock();
@@ -39,7 +39,7 @@ public class MergeRateLimiter extends RateLimiter {
   private volatile double mbPerSec;
   private volatile long minPauseCheckBytes;
 
-  private long lastNS;
+  private AtomicLong lastNS = new AtomicLong(0);
 
   private AtomicLong totalBytesWritten = new AtomicLong();
 

@@ -89,7 +89,7 @@ public class MergeRateLimiter extends RateLimiter {
     // is changed while we were pausing:
     long paused = 0;
     long delta;
-    while ((delta = maybePause(bytes, System.nanoTime())) >= 0) {
+    while ((delta = maybePause(bytes)) >= 0) {
       // Keep waiting.
       paused += delta;
     }

@@ -112,30 +112,45 @@ public class MergeRateLimiter extends RateLimiter {
    * applied. If the thread needs pausing, this method delegates to the linked {@link
    * OneMergeProgress}.
    */
-  private long maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedException {
+  private long maybePause(long bytes) throws MergePolicy.MergeAbortedException {
     // Now is a good time to abort the merge:
     if (mergeProgress.isAborted()) {
       throw new MergePolicy.MergeAbortedException("Merge aborted.");
     }
 
-    double rate = mbPerSec; // read from volatile rate once.
-    double secondsToPause = (bytes / 1024. / 1024.) / rate;
+    final double rate = mbPerSec; // read from volatile rate once.
+    final double secondsToPause = (bytes / 1024. / 1024.) / rate;
 
-    // Time we should sleep until; this is purely instantaneous
-    // rate (just adds seconds onto the last time we had paused to);
-    // maybe we should also offer decayed recent history one?
-    long targetNS = lastNS + (long) (1000000000 * secondsToPause);
-
-    long curPauseNS = targetNS - curNS;
-
-    // We don't bother with thread pausing if the pause is smaller than 2 msec.
-    if (curPauseNS <= MIN_PAUSE_NS) {
-      // Set to curNS, not targetNS, to enforce the instant rate, not
-      // the "averaged over all history" rate:
-      lastNS = curNS;
+    AtomicLong curPauseNSSetter = new AtomicLong();
+    // While we use updateAndGet to avoid a race condition between multiple threads, this doesn't
+    // mean
+    // that multiple threads will end up getting paused at the same time.
+    // We only pause the calling thread. This means if the upstream caller (e.g.
+    // ConcurrentMergeScheduler)
+    // is using multiple intra-threads, they will all be paused independently.
+    lastNS.updateAndGet(
+        last -> {
+          long curNS = System.nanoTime();
+          // Time we should sleep until; this is purely instantaneous
+          // rate (just adds seconds onto the last time we had paused to);
+          // maybe we should also offer decayed recent history one?
+          long targetNS = last + (long) (1000000000 * secondsToPause);
+          long curPauseNS = targetNS - curNS;
+          // We don't bother with thread pausing if the pause is smaller than 2 msec.
+          if (curPauseNS <= MIN_PAUSE_NS) {
+            // Set to curNS, not targetNS, to enforce the instant rate, not
+            // the "averaged over all history" rate:
+            curPauseNSSetter.set(0);
+            return curNS;
+          }
+          curPauseNSSetter.set(curPauseNS);
+          return last;
+        });
 
+    if (curPauseNSSetter.get() == 0) {
       return -1;
     }
 
+    long curPauseNS = curPauseNSSetter.get();
     // Defensive: don't sleep for too long; the loop above will call us again if
     // we should keep sleeping and the rate may be adjusted in between.
     if (curPauseNS > MAX_PAUSE_NS) {
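To isolate the concurrency idiom above: lastNS becomes an AtomicLong, the pause decision is computed inside updateAndGet, and the result escapes through a capture variable. A stripped-down sketch with hypothetical names (updateAndGet may re-run the lambda under contention; the last write to the capture wins, as in the committed code):

import java.util.concurrent.atomic.AtomicLong;

class PaceKeeper {
  private final AtomicLong lastNS = new AtomicLong();

  /** Returns how many nanos the caller should pause, or 0 for no pause. */
  long pauseNanosFor(long costNS) {
    AtomicLong pauseOut = new AtomicLong(); // carries a result out of the lambda
    lastNS.updateAndGet(
        last -> {
          long now = System.nanoTime();
          long target = last + costNS;
          if (target - now <= 0) {
            pauseOut.set(0);
            return now; // caught up: advance the clock, no pause needed
          }
          pauseOut.set(target - now);
          return last; // ahead of schedule: leave the clock alone; caller sleeps
        });
    return pauseOut.get();
  }
}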
@@ -18,10 +18,13 @@ package org.apache.lucene.index;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import org.apache.lucene.index.MergePolicy.OneMerge;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.RateLimitedIndexOutput;
 import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.SameThreadExecutorService;
 
 /**
  * Expert: {@link IndexWriter} uses an instance implementing this interface to execute the merges

@@ -32,6 +35,8 @@ import org.apache.lucene.util.InfoStream;
  */
 public abstract class MergeScheduler implements Closeable {
 
+  private final ExecutorService executor = new SameThreadExecutorService();
+
   /** Sole constructor. (For invocation by subclass constructors, typically implicit.) */
   protected MergeScheduler() {}
 

@@ -52,9 +57,20 @@ public abstract class MergeScheduler implements Closeable {
     return in;
   }
 
+  /**
+   * Provides an executor for parallelism during a single merge operation. By default, the method
+   * returns a {@link SameThreadExecutorService} where all intra-merge actions occur in their
+   * calling thread.
+   */
+  public Executor getIntraMergeExecutor(OneMerge merge) {
+    return executor;
+  }
+
   /** Close this MergeScheduler. */
   @Override
-  public abstract void close() throws IOException;
+  public void close() throws IOException {
+    executor.shutdown();
+  }
 
   /** For messages about merge scheduling */
   protected InfoStream infoStream;
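The default above turns getIntraMergeExecutor into an overridable extension point on every MergeScheduler. A minimal sketch of a hypothetical subclass that supplies its own pool; the merge loop mirrors SerialMergeScheduler, and this class is illustrative, not part of the patch:

import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.MergeTrigger;

public class FixedPoolMergeScheduler extends MergeScheduler {
  private final ExecutorService intraMergePool = Executors.newFixedThreadPool(4);

  @Override
  public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
    OneMerge merge;
    while ((merge = mergeSource.getNextMerge()) != null) {
      mergeSource.merge(merge); // run whole merges serially on the calling thread
    }
  }

  @Override
  public Executor getIntraMergeExecutor(OneMerge merge) {
    return intraMergePool; // but parallelize the work inside each merge
  }

  @Override
  public void close() throws IOException {
    try {
      super.close(); // shuts down the default same-thread executor
    } finally {
      intraMergePool.shutdown();
    }
  }
}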
@@ -21,6 +21,7 @@ import static org.apache.lucene.index.IndexWriter.isCongruentSort;
 import java.io.IOException;
 import java.util.List;
 import java.util.Locale;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import org.apache.lucene.codecs.DocValuesProducer;
 import org.apache.lucene.codecs.FieldsProducer;

@@ -84,15 +85,23 @@ public class MergeState {
   /** InfoStream for debugging messages. */
   public final InfoStream infoStream;
 
+  /** Executor for intra merge activity */
+  public final Executor intraMergeTaskExecutor;
+
   /** Indicates if the index needs to be sorted * */
   public boolean needsIndexSort;
 
   /** Sole constructor. */
-  MergeState(List<CodecReader> readers, SegmentInfo segmentInfo, InfoStream infoStream)
+  MergeState(
+      List<CodecReader> readers,
+      SegmentInfo segmentInfo,
+      InfoStream infoStream,
+      Executor intraMergeTaskExecutor)
       throws IOException {
     verifyIndexSort(readers, segmentInfo);
     this.infoStream = infoStream;
     int numReaders = readers.size();
+    this.intraMergeTaskExecutor = intraMergeTaskExecutor;
 
     maxDocs = new int[numReaders];
     fieldsProducers = new FieldsProducer[numReaders];
@@ -16,6 +16,7 @@
  */
 package org.apache.lucene.index;
 
+import java.util.concurrent.Executor;
 import org.apache.lucene.index.MergePolicy.OneMerge;
 import org.apache.lucene.store.Directory;
 

@@ -52,4 +53,9 @@ public final class NoMergeScheduler extends MergeScheduler {
   public MergeScheduler clone() {
     return this;
   }
+
+  @Override
+  public Executor getIntraMergeExecutor(OneMerge merge) {
+    throw new UnsupportedOperationException("NoMergeScheduler does not support merges");
+  }
 }
@@ -17,7 +17,10 @@
 package org.apache.lucene.index;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.codecs.DocValuesConsumer;

@@ -28,6 +31,7 @@ import org.apache.lucene.codecs.NormsProducer;
 import org.apache.lucene.codecs.PointsWriter;
 import org.apache.lucene.codecs.StoredFieldsWriter;
 import org.apache.lucene.codecs.TermVectorsWriter;
+import org.apache.lucene.search.TaskExecutor;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.util.InfoStream;

@@ -56,13 +60,14 @@ final class SegmentMerger {
       InfoStream infoStream,
       Directory dir,
       FieldInfos.FieldNumbers fieldNumbers,
-      IOContext context)
+      IOContext context,
+      Executor intraMergeTaskExecutor)
       throws IOException {
     if (context.context != IOContext.Context.MERGE) {
       throw new IllegalArgumentException(
          "IOContext.context should be MERGE; got: " + context.context);
     }
-    mergeState = new MergeState(readers, segmentInfo, infoStream);
+    mergeState = new MergeState(readers, segmentInfo, infoStream, intraMergeTaskExecutor);
     directory = dir;
     this.codec = segmentInfo.getCodec();
     this.context = context;

@@ -130,19 +135,36 @@ final class SegmentMerger {
         IOContext.READ,
         segmentWriteState.segmentSuffix);
 
-    if (mergeState.mergeFieldInfos.hasNorms()) {
-      mergeWithLogging(this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged);
-    }
+    TaskExecutor taskExecutor = new TaskExecutor(mergeState.intraMergeTaskExecutor);
+    List<Callable<Void>> mergingTasks = new ArrayList<>();
+    mergingTasks.add(
+        () -> {
+          if (mergeState.mergeFieldInfos.hasNorms()) {
+            mergeWithLogging(
+                this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged);
+          }
 
-    mergeWithLogging(this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged);
+          mergeWithLogging(
+              this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged);
+          return null;
+        });
 
     if (mergeState.mergeFieldInfos.hasDocValues()) {
-      mergeWithLogging(
-          this::mergeDocValues, segmentWriteState, segmentReadState, "doc values", numMerged);
+      mergingTasks.add(
+          () -> {
+            mergeWithLogging(
+                this::mergeDocValues, segmentWriteState, segmentReadState, "doc values", numMerged);
+            return null;
+          });
     }
 
     if (mergeState.mergeFieldInfos.hasPointValues()) {
-      mergeWithLogging(this::mergePoints, segmentWriteState, segmentReadState, "points", numMerged);
+      mergingTasks.add(
+          () -> {
+            mergeWithLogging(
+                this::mergePoints, segmentWriteState, segmentReadState, "points", numMerged);
+            return null;
+          });
     }
 
     if (mergeState.mergeFieldInfos.hasVectorValues()) {

@@ -155,9 +177,14 @@ final class SegmentMerger {
     }
 
     if (mergeState.mergeFieldInfos.hasVectors()) {
-      mergeWithLogging(this::mergeTermVectors, "term vectors");
+      mergingTasks.add(
+          () -> {
+            mergeWithLogging(this::mergeTermVectors, "term vectors");
+            return null;
+          });
     }
 
+    taskExecutor.invokeAll(mergingTasks);
     // write the merged infos
     mergeWithLogging(
         this::mergeFieldInfos, segmentWriteState, segmentReadState, "field infos", numMerged);
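The merge() rewrite above collects each per-format step into a Callable and drives the batch through TaskExecutor#invokeAll, which blocks until every task has finished. Stripped of the merge specifics, the pattern looks like this sketch (placeholder tasks and pool; illustrative only):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.lucene.search.TaskExecutor;

public class InvokeAllSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      TaskExecutor taskExecutor = new TaskExecutor(pool);
      List<Callable<Void>> tasks = new ArrayList<>();
      tasks.add(() -> { System.out.println("merge postings"); return null; });
      tasks.add(() -> { System.out.println("merge doc values"); return null; });
      // Blocks until the whole batch is done; task failures surface here,
      // so no merge subtask is left running in the background.
      taskExecutor.invokeAll(tasks);
    } finally {
      pool.shutdown();
    }
  }
}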
@@ -178,9 +178,6 @@ public class TestLucene99HnswQuantizedVectorsFormat extends BaseKnnVectorsFormatTestCase {
     expectThrows(
         IllegalArgumentException.class,
         () -> new Lucene99HnswScalarQuantizedVectorsFormat(20, 100, 0, 0.8f, null));
-    expectThrows(
-        IllegalArgumentException.class,
-        () -> new Lucene99HnswScalarQuantizedVectorsFormat(20, 100, 100, null, null));
     expectThrows(
         IllegalArgumentException.class,
         () ->
@@ -49,8 +49,6 @@ public class TestLucene99HnswVectorsFormat extends BaseKnnVectorsFormatTestCase
     expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, -1));
     expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(512 + 1, 20));
     expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, 3201));
-    expectThrows(
-        IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, 100, 100, null));
     expectThrows(
         IllegalArgumentException.class,
         () -> new Lucene99HnswVectorsFormat(20, 100, 1, new SameThreadExecutorService()));
@@ -20,16 +20,20 @@ import com.carrotsearch.randomizedtesting.generators.RandomStrings;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
+import org.apache.lucene.document.KnnFloatVectorField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
@@ -42,7 +46,10 @@ import org.apache.lucene.tests.store.MockDirectoryWrapper;
 import org.apache.lucene.tests.util.LuceneTestCase;
 import org.apache.lucene.tests.util.TestUtil;
 import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.SameThreadExecutorService;
+import org.apache.lucene.util.StringHelper;
 import org.apache.lucene.util.SuppressForbidden;
+import org.apache.lucene.util.Version;
 
 public class TestConcurrentMergeScheduler extends LuceneTestCase {
 

@@ -90,12 +97,22 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
                 || (th instanceof IllegalStateException
                     && th.getMessage().contains("this writer hit an unrecoverable error"));
           }
+
+          @Override
+          // override here to ensure even tiny merges get the parallel executor
+          public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
+            assert intraMergeExecutor != null : "intraMergeExecutor is not initialized";
+            return intraMergeExecutor;
+          }
         });
     }
     IndexWriter writer = new IndexWriter(directory, iwc);
     Document doc = new Document();
     Field idField = newStringField("id", "", Field.Store.YES);
+    KnnFloatVectorField knnField = new KnnFloatVectorField("knn", new float[] {0.0f, 0.0f});
     doc.add(idField);
+    // Add knn float vectors to test parallel merge
+    doc.add(knnField);
 
     outer:
     for (int i = 0; i < 10; i++) {

@@ -105,6 +122,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
 
       for (int j = 0; j < 20; j++) {
         idField.setStringValue(Integer.toString(i * 20 + j));
+        knnField.setVectorValue(new float[] {random().nextFloat(), random().nextFloat()});
         writer.addDocument(doc);
       }
 

@@ -226,23 +244,35 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
     Directory directory = newDirectory();
     Document doc = new Document();
     Field idField = newStringField("id", "", Field.Store.YES);
+    KnnFloatVectorField knnField = new KnnFloatVectorField("knn", new float[] {0.0f, 0.0f});
     doc.add(idField);
+    doc.add(knnField);
+    IndexWriterConfig iwc =
+        newIndexWriterConfig(new MockAnalyzer(random()))
+            // Force excessive merging:
+            .setMaxBufferedDocs(2)
+            .setMergePolicy(newLogMergePolicy(100))
+            .setCommitOnClose(false);
+    if (iwc.getMergeScheduler() instanceof ConcurrentMergeScheduler) {
+      iwc.setMergeScheduler(
+          new ConcurrentMergeScheduler() {
+            @Override
+            // override here to ensure even tiny merges get the parallel executor
+            public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
+              assert intraMergeExecutor != null : "scaledExecutor is not initialized";
+              return intraMergeExecutor;
+            }
+          });
+    }
 
-    IndexWriter writer =
-        new IndexWriter(
-            directory,
-            newIndexWriterConfig(new MockAnalyzer(random()))
-                .
-                // Force excessive merging:
-                setMaxBufferedDocs(2)
-                .setMergePolicy(newLogMergePolicy(100))
-                .setCommitOnClose(false));
+    IndexWriter writer = new IndexWriter(directory, iwc);
 
     int numIters = TEST_NIGHTLY ? 10 : 3;
     for (int iter = 0; iter < numIters; iter++) {
 
       for (int j = 0; j < 201; j++) {
         idField.setStringValue(Integer.toString(iter * 201 + j));
+        knnField.setVectorValue(new float[] {random().nextFloat(), random().nextFloat()});
         writer.addDocument(doc);
       }
 

@@ -364,6 +394,118 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
     dir.close();
   }
 
+  public void testSmallMergesDonNotGetThreads() throws IOException {
+    Directory dir = newDirectory();
+    IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
+    iwc.setMaxBufferedDocs(2);
+    iwc.setMergeScheduler(
+        new ConcurrentMergeScheduler() {
+          @Override
+          protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge)
+              throws IOException {
+            assertTrue(this.getIntraMergeExecutor(merge) instanceof SameThreadExecutorService);
+            super.doMerge(mergeSource, merge);
+          }
+        });
+    IndexWriter w = new IndexWriter(dir, iwc);
+    for (int i = 0; i < 10; i++) {
+      Document doc = new Document();
+      doc.add(new StringField("id", "" + i, Field.Store.NO));
+      w.addDocument(doc);
+    }
+    w.forceMerge(1);
+    w.close();
+    dir.close();
+  }
+
+  @SuppressForbidden(reason = "Thread sleep")
+  public void testIntraMergeThreadPoolIsLimitedByMaxThreads() throws IOException {
+    ConcurrentMergeScheduler mergeScheduler = new ConcurrentMergeScheduler();
+    MergeScheduler.MergeSource mergeSource =
+        new MergeScheduler.MergeSource() {
+          @Override
+          public MergePolicy.OneMerge getNextMerge() {
+            fail("should not be called");
+            return null;
+          }
+
+          @Override
+          public void onMergeFinished(MergePolicy.OneMerge merge) {
+            fail("should not be called");
+          }
+
+          @Override
+          public boolean hasPendingMerges() {
+            fail("should not be called");
+            return false;
+          }
+
+          @Override
+          public void merge(MergePolicy.OneMerge merge) throws IOException {
+            fail("should not be called");
+          }
+        };
+    try (Directory dir = newDirectory();
+        mergeScheduler) {
+      MergePolicy.OneMerge merge =
+          new MergePolicy.OneMerge(
+              List.of(
+                  new SegmentCommitInfo(
+                      new SegmentInfo(
+                          dir,
+                          Version.LATEST,
+                          null,
+                          "test",
+                          0,
+                          false,
+                          false,
+                          Codec.getDefault(),
+                          Collections.emptyMap(),
+                          StringHelper.randomId(),
+                          new HashMap<>(),
+                          null),
+                      0,
+                      0,
+                      0,
+                      0,
+                      0,
+                      new byte[16])));
+      mergeScheduler.initialize(InfoStream.NO_OUTPUT, dir);
+      mergeScheduler.setMaxMergesAndThreads(6, 6);
+      Executor executor = mergeScheduler.intraMergeExecutor;
+      AtomicInteger threadsExecutedOnPool = new AtomicInteger();
+      AtomicInteger threadsExecutedOnSelf = new AtomicInteger();
+      for (int i = 0; i < 4; i++) {
+        mergeScheduler.mergeThreads.add(
+            mergeScheduler.new MergeThread(mergeSource, merge) {
+              @Override
+              @SuppressForbidden(reason = "Thread sleep")
+              public void run() {
+                executor.execute(
+                    () -> {
+                      if (Thread.currentThread() == this) {
+                        threadsExecutedOnSelf.incrementAndGet();
+                      } else {
+                        threadsExecutedOnPool.incrementAndGet();
+                      }
+                      try {
+                        Thread.sleep(100);
+                      } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                      }
+                    });
+              }
+            });
+      }
+      for (ConcurrentMergeScheduler.MergeThread thread : mergeScheduler.mergeThreads) {
+        thread.start();
+      }
+      mergeScheduler.sync();
+      assertEquals(3, threadsExecutedOnSelf.get());
+      assertEquals(1, threadsExecutedOnPool.get());
+    }
+  }
+
   private static class TrackingCMS extends ConcurrentMergeScheduler {
     long totMergedBytes;
     CountDownLatch atLeastOneMerge;
@@ -45,6 +45,7 @@ import org.apache.lucene.tests.store.MockDirectoryWrapper;
 import org.apache.lucene.tests.util.LuceneTestCase;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.SameThreadExecutorService;
 import org.apache.lucene.util.StringHelper;
 import org.apache.lucene.util.Version;
 

@@ -237,7 +238,8 @@ public class TestDoc extends LuceneTestCase {
             InfoStream.getDefault(),
             trackingDir,
             new FieldInfos.FieldNumbers(null, null),
-            context);
+            context,
+            new SameThreadExecutorService());
 
     merger.merge();
     r1.close();
@@ -42,6 +42,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;

@@ -937,6 +938,7 @@ public class TestIndexWriter extends LuceneTestCase {
           @Override
           protected boolean isOK(Throwable th) {
             return th instanceof AlreadyClosedException
+                || th instanceof RejectedExecutionException
                 || (th instanceof IllegalStateException
                     && th.getMessage()
                         .contains("this writer hit an unrecoverable error"));
@@ -32,6 +32,7 @@ import org.apache.lucene.tests.util.TestUtil;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.FixedBitSet;
 import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.SameThreadExecutorService;
 import org.apache.lucene.util.StringHelper;
 import org.apache.lucene.util.Version;
 import org.apache.lucene.util.packed.PackedLongValues;

@@ -105,7 +106,8 @@ public class TestSegmentMerger extends LuceneTestCase {
             InfoStream.getDefault(),
             mergedDir,
             new FieldInfos.FieldNumbers(null, null),
-            newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1))));
+            newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1))),
+            new SameThreadExecutorService());
     MergeState mergeState = merger.merge();
     int docsMerged = mergeState.segmentInfo.maxDoc();
     assertTrue(docsMerged == 2);