mirror of https://github.com/apache/lucene.git
Revert "Add new parallel merge task executor for parallel actions within a si…"
This reverts commit e3a34bfe56.

parent 9fd0474251
commit e1d24589f2

CHANGES.txt
@@ -208,10 +208,6 @@ Improvements
 * GITHUB#13156: Hunspell: don't proceed with other suggestions if we found good REP ones (Peter Gromov)
 
-* GITHUB#13124: MergeScheduler can now provide an executor for intra-merge parallelism. The first
-  implementation is the ConcurrentMergeScheduler and the Lucene99HnswVectorsFormat will use it if no other
-  executor is provided. (Ben Trent)
-
 Optimizations
 ---------------------
 
 
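For context: the reverted CHANGES entry describes a scheduler-provided executor that IndexWriter requested internally; user code only ever configures the scheduler. A minimal sketch of that configuration (standard Lucene API; the directory and analyzer choices here are illustrative):

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.index.ConcurrentMergeScheduler;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.ByteBuffersDirectory;

    public class CmsSetupSketch {
      public static void main(String[] args) throws Exception {
        ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
        // Allow up to 6 queued merges and 3 concurrent merge threads.
        cms.setMaxMergesAndThreads(6, 3);
        IndexWriterConfig iwc =
            new IndexWriterConfig(new StandardAnalyzer()).setMergeScheduler(cms);
        try (IndexWriter writer = new IndexWriter(new ByteBuffersDirectory(), iwc)) {
          // Before this revert, IndexWriter also asked the scheduler for an
          // intra-merge executor so one merge could fan out its own subtasks.
          writer.commit();
        }
      }
    }
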

Lucene99HnswVectorsFormat.java
@@ -24,8 +24,6 @@ import org.apache.lucene.codecs.KnnVectorsFormat;
 import org.apache.lucene.codecs.KnnVectorsReader;
 import org.apache.lucene.codecs.KnnVectorsWriter;
 import org.apache.lucene.codecs.lucene90.IndexedDISI;
-import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.MergeScheduler;
 import org.apache.lucene.index.SegmentReadState;
 import org.apache.lucene.index.SegmentWriteState;
 import org.apache.lucene.search.DocIdSetIterator;
@@ -165,8 +163,7 @@ public final class Lucene99HnswVectorsFormat extends KnnVectorsFormat {
    * @param numMergeWorkers number of workers (threads) that will be used when doing merge. If
    *     larger than 1, a non-null {@link ExecutorService} must be passed as mergeExec
    * @param mergeExec the {@link ExecutorService} that will be used by ALL vector writers that are
-   *     generated by this format to do the merge. If null, the configured {@link
-   *     MergeScheduler#getIntraMergeExecutor(MergePolicy.OneMerge)} is used.
+   *     generated by this format to do the merge
    */
   public Lucene99HnswVectorsFormat(
       int maxConn, int beamWidth, int numMergeWorkers, ExecutorService mergeExec) {
@@ -187,6 +184,10 @@ public final class Lucene99HnswVectorsFormat extends KnnVectorsFormat {
     }
     this.maxConn = maxConn;
     this.beamWidth = beamWidth;
+    if (numMergeWorkers > 1 && mergeExec == null) {
+      throw new IllegalArgumentException(
+          "No executor service passed in when " + numMergeWorkers + " merge workers are requested");
+    }
     if (numMergeWorkers == 1 && mergeExec != null) {
       throw new IllegalArgumentException(
           "No executor service is needed as we'll use single thread to merge");
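
The restored checks pin down the constructor contract: more than one merge worker requires an executor, and a single worker must not pass one. A hedged sketch of the resulting behavior (constructor signature as in the diff; the pool sizing is illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsFormat;

    public class HnswFormatCtorSketch {
      public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(4);
        // OK: several merge workers, executor supplied.
        new Lucene99HnswVectorsFormat(16, 100, 4, exec);
        try {
          // Throws: >1 workers but no executor (check restored by this revert).
          new Lucene99HnswVectorsFormat(16, 100, 4, null);
        } catch (IllegalArgumentException expected) {
        }
        try {
          // Throws: a single worker must not pass an executor.
          new Lucene99HnswVectorsFormat(16, 100, 1, exec);
        } catch (IllegalArgumentException expected) {
        }
        exec.shutdown();
      }
    }
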

Lucene99HnswVectorsWriter.java
@@ -352,14 +352,7 @@ public final class Lucene99HnswVectorsWriter extends KnnVectorsWriter {
     int[][] vectorIndexNodeOffsets = null;
     if (scorerSupplier.totalVectorCount() > 0) {
       // build graph
-      HnswGraphMerger merger =
-          createGraphMerger(
-              fieldInfo,
-              scorerSupplier,
-              mergeState.intraMergeTaskExecutor == null
-                  ? null
-                  : new TaskExecutor(mergeState.intraMergeTaskExecutor),
-              numMergeWorkers);
+      HnswGraphMerger merger = createGraphMerger(fieldInfo, scorerSupplier);
       for (int i = 0; i < mergeState.liveDocs.length; i++) {
         merger.addReader(
             mergeState.knnVectorsReaders[i], mergeState.docMaps[i], mergeState.liveDocs[i]);
@@ -496,23 +489,11 @@ public final class Lucene99HnswVectorsWriter extends KnnVectorsWriter {
   }
 
   private HnswGraphMerger createGraphMerger(
-      FieldInfo fieldInfo,
-      RandomVectorScorerSupplier scorerSupplier,
-      TaskExecutor parallelMergeTaskExecutor,
-      int numParallelMergeWorkers) {
+      FieldInfo fieldInfo, RandomVectorScorerSupplier scorerSupplier) {
     if (mergeExec != null) {
       return new ConcurrentHnswMerger(
           fieldInfo, scorerSupplier, M, beamWidth, mergeExec, numMergeWorkers);
     }
-    if (parallelMergeTaskExecutor != null) {
-      return new ConcurrentHnswMerger(
-          fieldInfo,
-          scorerSupplier,
-          M,
-          beamWidth,
-          parallelMergeTaskExecutor,
-          numParallelMergeWorkers);
-    }
     return new IncrementalHnswGraphMerger(fieldInfo, scorerSupplier, M, beamWidth);
   }
 

ConcurrentMergeScheduler.java
@@ -21,12 +21,9 @@ import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
-import java.util.concurrent.Executor;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.lucene.index.MergePolicy.OneMerge;
+import org.apache.lucene.internal.tests.ConcurrentMergeSchedulerAccess;
 import org.apache.lucene.internal.tests.TestSecrets;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
@@ -112,9 +109,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
 
   private double forceMergeMBPerSec = Double.POSITIVE_INFINITY;
 
-  /** The executor provided for intra-merge parallelization */
-  protected CachedExecutor intraMergeExecutor;
-
   /** Sole constructor, with all settings set to default values. */
   public ConcurrentMergeScheduler() {}
 
@@ -265,16 +259,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
     assert false : "merge thread " + currentThread + " was not found";
   }
 
-  @Override
-  public Executor getIntraMergeExecutor(OneMerge merge) {
-    assert intraMergeExecutor != null : "scaledExecutor is not initialized";
-    // don't do multithreaded merges for small merges
-    if (merge.estimatedMergeBytes < MIN_BIG_MERGE_MB * 1024 * 1024) {
-      return super.getIntraMergeExecutor(merge);
-    }
-    return intraMergeExecutor;
-  }
-
   @Override
   public Directory wrapForMerge(OneMerge merge, Directory in) {
     Thread mergeThread = Thread.currentThread();
@@ -462,13 +446,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
 
   @Override
   public void close() {
-    try {
-      sync();
-    } finally {
-      if (intraMergeExecutor != null) {
-        intraMergeExecutor.shutdown();
-      }
-    }
+    sync();
   }
 
   /**
@@ -532,9 +510,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
   void initialize(InfoStream infoStream, Directory directory) throws IOException {
     super.initialize(infoStream, directory);
     initDynamicDefaults(directory);
-    if (intraMergeExecutor == null) {
-      intraMergeExecutor = new CachedExecutor();
-    }
   }
 
   @Override
@@ -780,16 +755,11 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
 
   @Override
   public String toString() {
-    return getClass().getSimpleName()
-        + ": "
-        + "maxThreadCount="
-        + maxThreadCount
-        + ", "
-        + "maxMergeCount="
-        + maxMergeCount
-        + ", "
-        + "ioThrottle="
-        + doAutoIOThrottle;
+    StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": ");
+    sb.append("maxThreadCount=").append(maxThreadCount).append(", ");
+    sb.append("maxMergeCount=").append(maxMergeCount).append(", ");
+    sb.append("ioThrottle=").append(doAutoIOThrottle);
+    return sb.toString();
   }
 
   private boolean isBacklog(long now, OneMerge merge) {
@@ -932,58 +902,12 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
   }
 
   static {
-    TestSecrets.setConcurrentMergeSchedulerAccess(ConcurrentMergeScheduler::setSuppressExceptions);
-  }
-
-  /**
-   * This executor provides intra-merge threads for parallel execution of merge tasks. It provides a
-   * limited number of threads to execute merge tasks. In particular, if the number of
-   * `mergeThreads` is equal to `maxThreadCount`, then the executor will execute the merge task in
-   * the calling thread.
-   */
-  private class CachedExecutor implements Executor {
-
-    private final AtomicInteger activeCount = new AtomicInteger(0);
-    private final ThreadPoolExecutor executor;
-
-    public CachedExecutor() {
-      this.executor =
-          new ThreadPoolExecutor(0, 1024, 1L, TimeUnit.MINUTES, new SynchronousQueue<>());
-    }
-
-    void shutdown() {
-      executor.shutdown();
-    }
-
-    @Override
-    public void execute(Runnable command) {
-      final boolean isThreadAvailable;
-      // we need to check if a thread is available before submitting the task to the executor
-      // synchronize on CMS to get an accurate count of current threads
-      synchronized (ConcurrentMergeScheduler.this) {
-        int max = maxThreadCount - mergeThreads.size() - 1;
-        int value = activeCount.get();
-        if (value < max) {
-          activeCount.incrementAndGet();
-          assert activeCount.get() > 0 : "active count must be greater than 0 after increment";
-          isThreadAvailable = true;
-        } else {
-          isThreadAvailable = false;
-        }
-      }
-      if (isThreadAvailable) {
-        executor.execute(
-            () -> {
-              try {
-                command.run();
-              } finally {
-                activeCount.decrementAndGet();
-                assert activeCount.get() >= 0 : "unexpected negative active count";
-              }
-            });
-      } else {
-        command.run();
-      }
-    }
+    TestSecrets.setConcurrentMergeSchedulerAccess(
+        new ConcurrentMergeSchedulerAccess() {
+          @Override
+          public void setSuppressExceptions(ConcurrentMergeScheduler cms) {
+            cms.setSuppressExceptions();
+          }
+        });
   }
 }
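
The removed CachedExecutor is a caller-runs bounded executor: under the CMS lock it computes a budget of maxThreadCount - mergeThreads.size() - 1 helper slots, hands a task to the pool only if a slot is free, and otherwise runs it on the submitting thread. A standalone sketch of the same pattern (plain Java, no Lucene types; unlike the original, the budget here is fixed rather than recomputed from the live merge-thread count):

    import java.util.concurrent.Executor;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    final class BoundedCallerRunsExecutor implements Executor {
      private final AtomicInteger active = new AtomicInteger();
      private final int budget; // max tasks allowed on the pool at once
      private final ThreadPoolExecutor pool =
          new ThreadPoolExecutor(0, 1024, 1L, TimeUnit.MINUTES, new SynchronousQueue<>());

      BoundedCallerRunsExecutor(int budget) {
        this.budget = budget;
      }

      @Override
      public void execute(Runnable command) {
        if (active.getAndIncrement() < budget) {
          // Slot reserved: run on the pool and release the slot when done.
          pool.execute(
              () -> {
                try {
                  command.run();
                } finally {
                  active.decrementAndGet();
                }
              });
        } else {
          active.decrementAndGet(); // no slot: undo the reservation
          command.run(); // degrade gracefully to the calling thread
        }
      }

      void shutdown() {
        pool.shutdown();
      }
    }
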

IndexWriter.java
@@ -3430,14 +3430,7 @@ public class IndexWriter
     }
 
     SegmentMerger merger =
-        new SegmentMerger(
-            readers,
-            segInfo,
-            infoStream,
-            trackingDir,
-            globalFieldNumberMap,
-            context,
-            mergeScheduler.getIntraMergeExecutor(merge));
+        new SegmentMerger(readers, segInfo, infoStream, trackingDir, globalFieldNumberMap, context);
 
     if (!merger.shouldMerge()) {
       return;
@@ -5226,13 +5219,7 @@ public class IndexWriter
 
     final SegmentMerger merger =
         new SegmentMerger(
-            mergeReaders,
-            merge.info.info,
-            infoStream,
-            dirWrapper,
-            globalFieldNumberMap,
-            context,
-            mergeScheduler.getIntraMergeExecutor(merge));
+            mergeReaders, merge.info.info, infoStream, dirWrapper, globalFieldNumberMap, context);
     merge.info.setSoftDelCount(Math.toIntExact(softDeleteCount.get()));
     merge.checkAborted();
 

MergeScheduler.java
@@ -18,12 +18,10 @@ package org.apache.lucene.index;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.concurrent.Executor;
 import org.apache.lucene.index.MergePolicy.OneMerge;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.RateLimitedIndexOutput;
 import org.apache.lucene.util.InfoStream;
-import org.apache.lucene.util.SameThreadExecutorService;
 
 /**
  * Expert: {@link IndexWriter} uses an instance implementing this interface to execute the merges
@@ -34,8 +32,6 @@ import org.apache.lucene.util.SameThreadExecutorService;
  */
 public abstract class MergeScheduler implements Closeable {
 
-  private final Executor executor = new SameThreadExecutorService();
-
   /** Sole constructor. (For invocation by subclass constructors, typically implicit.) */
   protected MergeScheduler() {}
 
@@ -56,15 +52,6 @@ public abstract class MergeScheduler implements Closeable {
     return in;
   }
 
-  /**
-   * Provides an executor for parallelism during a single merge operation. By default, the method
-   * returns a {@link SameThreadExecutorService} where all intra-merge actions occur in their
-   * calling thread.
-   */
-  public Executor getIntraMergeExecutor(OneMerge merge) {
-    return executor;
-  }
-
   /** Close this MergeScheduler. */
   @Override
   public abstract void close() throws IOException;
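
Before the revert, any MergeScheduler subclass could reroute intra-merge work by overriding the hook deleted above; the tests further down do exactly that to force even tiny merges onto the parallel executor. A sketch of that pre-revert extension point (compiles only against a tree that still has getIntraMergeExecutor; the pool size is illustrative):

    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    import org.apache.lucene.index.ConcurrentMergeScheduler;
    import org.apache.lucene.index.MergePolicy;

    // Pre-revert API: route intra-merge subtasks to a dedicated pool,
    // bypassing the "small merges stay single-threaded" heuristic.
    class PooledIntraMergeScheduler extends ConcurrentMergeScheduler {
      private final Executor pool = Executors.newFixedThreadPool(4);

      @Override
      public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
        return pool;
      }
    }
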

MergeState.java
@@ -21,7 +21,6 @@ import static org.apache.lucene.index.IndexWriter.isCongruentSort;
 import java.io.IOException;
 import java.util.List;
 import java.util.Locale;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import org.apache.lucene.codecs.DocValuesProducer;
 import org.apache.lucene.codecs.FieldsProducer;
@@ -85,23 +84,15 @@ public class MergeState {
   /** InfoStream for debugging messages. */
   public final InfoStream infoStream;
 
-  /** Executor for intra merge activity */
-  public final Executor intraMergeTaskExecutor;
-
   /** Indicates if the index needs to be sorted * */
   public boolean needsIndexSort;
 
   /** Sole constructor. */
-  MergeState(
-      List<CodecReader> readers,
-      SegmentInfo segmentInfo,
-      InfoStream infoStream,
-      Executor intraMergeTaskExecutor)
+  MergeState(List<CodecReader> readers, SegmentInfo segmentInfo, InfoStream infoStream)
       throws IOException {
     verifyIndexSort(readers, segmentInfo);
     this.infoStream = infoStream;
     int numReaders = readers.size();
-    this.intraMergeTaskExecutor = intraMergeTaskExecutor;
 
     maxDocs = new int[numReaders];
     fieldsProducers = new FieldsProducer[numReaders];

NoMergeScheduler.java
@@ -16,7 +16,6 @@
  */
 package org.apache.lucene.index;
 
-import java.util.concurrent.Executor;
 import org.apache.lucene.index.MergePolicy.OneMerge;
 import org.apache.lucene.store.Directory;
 
@@ -53,9 +52,4 @@ public final class NoMergeScheduler extends MergeScheduler {
   public MergeScheduler clone() {
     return this;
   }
-
-  @Override
-  public Executor getIntraMergeExecutor(OneMerge merge) {
-    throw new UnsupportedOperationException("NoMergeScheduler does not support merges");
-  }
 }

SegmentMerger.java
@@ -17,10 +17,7 @@
 package org.apache.lucene.index;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.codecs.DocValuesConsumer;
@@ -31,7 +28,6 @@ import org.apache.lucene.codecs.NormsProducer;
 import org.apache.lucene.codecs.PointsWriter;
 import org.apache.lucene.codecs.StoredFieldsWriter;
 import org.apache.lucene.codecs.TermVectorsWriter;
-import org.apache.lucene.search.TaskExecutor;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.util.InfoStream;
@@ -60,14 +56,13 @@ final class SegmentMerger {
       InfoStream infoStream,
       Directory dir,
       FieldInfos.FieldNumbers fieldNumbers,
-      IOContext context,
-      Executor intraMergeTaskExecutor)
+      IOContext context)
       throws IOException {
     if (context.context != IOContext.Context.MERGE) {
       throw new IllegalArgumentException(
           "IOContext.context should be MERGE; got: " + context.context);
     }
-    mergeState = new MergeState(readers, segmentInfo, infoStream, intraMergeTaskExecutor);
+    mergeState = new MergeState(readers, segmentInfo, infoStream);
     directory = dir;
     this.codec = segmentInfo.getCodec();
     this.context = context;
@@ -135,36 +130,19 @@ final class SegmentMerger {
         IOContext.READ,
         segmentWriteState.segmentSuffix);
 
-    TaskExecutor taskExecutor = new TaskExecutor(mergeState.intraMergeTaskExecutor);
-    List<Callable<Void>> mergingTasks = new ArrayList<>();
-    mergingTasks.add(
-        () -> {
-          if (mergeState.mergeFieldInfos.hasNorms()) {
-            mergeWithLogging(
-                this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged);
-          }
+    if (mergeState.mergeFieldInfos.hasNorms()) {
+      mergeWithLogging(this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged);
+    }
 
-          mergeWithLogging(
-              this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged);
-          return null;
-        });
+    mergeWithLogging(this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged);
 
     if (mergeState.mergeFieldInfos.hasDocValues()) {
-      mergingTasks.add(
-          () -> {
-            mergeWithLogging(
-                this::mergeDocValues, segmentWriteState, segmentReadState, "doc values", numMerged);
-            return null;
-          });
+      mergeWithLogging(
+          this::mergeDocValues, segmentWriteState, segmentReadState, "doc values", numMerged);
     }
 
     if (mergeState.mergeFieldInfos.hasPointValues()) {
-      mergingTasks.add(
-          () -> {
-            mergeWithLogging(
-                this::mergePoints, segmentWriteState, segmentReadState, "points", numMerged);
-            return null;
-          });
+      mergeWithLogging(this::mergePoints, segmentWriteState, segmentReadState, "points", numMerged);
     }
 
     if (mergeState.mergeFieldInfos.hasVectorValues()) {
@@ -177,14 +155,9 @@ final class SegmentMerger {
     }
 
     if (mergeState.mergeFieldInfos.hasVectors()) {
-      mergingTasks.add(
-          () -> {
-            mergeWithLogging(this::mergeTermVectors, "term vectors");
-            return null;
-          });
+      mergeWithLogging(this::mergeTermVectors, "term vectors");
     }
 
-    taskExecutor.invokeAll(mergingTasks);
     // write the merged infos
     mergeWithLogging(
         this::mergeFieldInfos, segmentWriteState, segmentReadState, "field infos", numMerged);
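
The deleted lines fanned the independent merge steps out as Callable<Void> tasks and joined them with Lucene's TaskExecutor; the revert runs them sequentially again. A minimal standalone sketch of that pattern (org.apache.lucene.search.TaskExecutor as in the removed code; the pool and task bodies are illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.lucene.search.TaskExecutor;

    public class InvokeAllSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        TaskExecutor taskExecutor = new TaskExecutor(pool);

        // Each independent merge step becomes one Callable<Void>.
        List<Callable<Void>> tasks = new ArrayList<>();
        tasks.add(
            () -> {
              System.out.println("merge postings");
              return null;
            });
        tasks.add(
            () -> {
              System.out.println("merge doc values");
              return null;
            });

        // invokeAll blocks until all tasks complete, rethrowing any failure.
        taskExecutor.invokeAll(tasks);
        pool.shutdown();
      }
    }
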

TestLucene99HnswQuantizedVectorsFormat.java
@@ -178,6 +178,9 @@ public class TestLucene99HnswQuantizedVectorsFormat extends BaseKnnVectorsFormatTestCase {
     expectThrows(
         IllegalArgumentException.class,
         () -> new Lucene99HnswScalarQuantizedVectorsFormat(20, 100, 0, 0.8f, null));
+    expectThrows(
+        IllegalArgumentException.class,
+        () -> new Lucene99HnswScalarQuantizedVectorsFormat(20, 100, 100, null, null));
     expectThrows(
         IllegalArgumentException.class,
         () ->

TestLucene99HnswVectorsFormat.java
@@ -49,6 +49,8 @@ public class TestLucene99HnswVectorsFormat extends BaseKnnVectorsFormatTestCase {
     expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, -1));
     expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(512 + 1, 20));
     expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, 3201));
+    expectThrows(
+        IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, 100, 100, null));
     expectThrows(
         IllegalArgumentException.class,
         () -> new Lucene99HnswVectorsFormat(20, 100, 1, new SameThreadExecutorService()));

TestConcurrentMergeScheduler.java
@@ -20,20 +20,16 @@ import com.carrotsearch.randomizedtesting.generators.RandomStrings;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
-import org.apache.lucene.document.KnnFloatVectorField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
@@ -46,10 +42,7 @@ import org.apache.lucene.tests.store.MockDirectoryWrapper;
 import org.apache.lucene.tests.util.LuceneTestCase;
 import org.apache.lucene.tests.util.TestUtil;
 import org.apache.lucene.util.InfoStream;
-import org.apache.lucene.util.SameThreadExecutorService;
-import org.apache.lucene.util.StringHelper;
 import org.apache.lucene.util.SuppressForbidden;
-import org.apache.lucene.util.Version;
 
 public class TestConcurrentMergeScheduler extends LuceneTestCase {
 
@@ -97,22 +90,12 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
                 || (th instanceof IllegalStateException
                     && th.getMessage().contains("this writer hit an unrecoverable error"));
           }
-
-          @Override
-          // override here to ensure even tiny merges get the parallel executor
-          public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
-            assert intraMergeExecutor != null : "intraMergeExecutor is not initialized";
-            return intraMergeExecutor;
-          }
         });
     }
     IndexWriter writer = new IndexWriter(directory, iwc);
     Document doc = new Document();
     Field idField = newStringField("id", "", Field.Store.YES);
-    KnnFloatVectorField knnField = new KnnFloatVectorField("knn", new float[] {0.0f, 0.0f});
     doc.add(idField);
-    // Add knn float vectors to test parallel merge
-    doc.add(knnField);
 
     outer:
     for (int i = 0; i < 10; i++) {
@@ -122,7 +105,6 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
 
       for (int j = 0; j < 20; j++) {
         idField.setStringValue(Integer.toString(i * 20 + j));
-        knnField.setVectorValue(new float[] {random().nextFloat(), random().nextFloat()});
         writer.addDocument(doc);
       }
 
@@ -244,35 +226,23 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
     Directory directory = newDirectory();
     Document doc = new Document();
     Field idField = newStringField("id", "", Field.Store.YES);
-    KnnFloatVectorField knnField = new KnnFloatVectorField("knn", new float[] {0.0f, 0.0f});
     doc.add(idField);
-    doc.add(knnField);
-    IndexWriterConfig iwc =
-        newIndexWriterConfig(new MockAnalyzer(random()))
-            // Force excessive merging:
-            .setMaxBufferedDocs(2)
-            .setMergePolicy(newLogMergePolicy(100))
-            .setCommitOnClose(false);
-    if (iwc.getMergeScheduler() instanceof ConcurrentMergeScheduler) {
-      iwc.setMergeScheduler(
-          new ConcurrentMergeScheduler() {
-            @Override
-            // override here to ensure even tiny merges get the parallel executor
-            public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
-              assert intraMergeExecutor != null : "scaledExecutor is not initialized";
-              return intraMergeExecutor;
-            }
-          });
-    }
 
-    IndexWriter writer = new IndexWriter(directory, iwc);
+    IndexWriter writer =
+        new IndexWriter(
+            directory,
+            newIndexWriterConfig(new MockAnalyzer(random()))
+                .
+                // Force excessive merging:
+                setMaxBufferedDocs(2)
+                .setMergePolicy(newLogMergePolicy(100))
+                .setCommitOnClose(false));
 
     int numIters = TEST_NIGHTLY ? 10 : 3;
     for (int iter = 0; iter < numIters; iter++) {
 
       for (int j = 0; j < 201; j++) {
         idField.setStringValue(Integer.toString(iter * 201 + j));
-        knnField.setVectorValue(new float[] {random().nextFloat(), random().nextFloat()});
         writer.addDocument(doc);
       }
 
@@ -394,118 +364,6 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
     dir.close();
   }
 
-  public void testSmallMergesDonNotGetThreads() throws IOException {
-    Directory dir = newDirectory();
-    IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
-    iwc.setMaxBufferedDocs(2);
-    iwc.setMergeScheduler(
-        new ConcurrentMergeScheduler() {
-          @Override
-          protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge)
-              throws IOException {
-            assertTrue(this.getIntraMergeExecutor(merge) instanceof SameThreadExecutorService);
-            super.doMerge(mergeSource, merge);
-          }
-        });
-    IndexWriter w = new IndexWriter(dir, iwc);
-    for (int i = 0; i < 10; i++) {
-      Document doc = new Document();
-      doc.add(new StringField("id", "" + i, Field.Store.NO));
-      w.addDocument(doc);
-    }
-    w.forceMerge(1);
-    w.close();
-    dir.close();
-  }
-
-  @SuppressForbidden(reason = "Thread sleep")
-  public void testIntraMergeThreadPoolIsLimitedByMaxThreads() throws IOException {
-    ConcurrentMergeScheduler mergeScheduler = new ConcurrentMergeScheduler();
-    MergeScheduler.MergeSource mergeSource =
-        new MergeScheduler.MergeSource() {
-          @Override
-          public MergePolicy.OneMerge getNextMerge() {
-            fail("should not be called");
-            return null;
-          }
-
-          @Override
-          public void onMergeFinished(MergePolicy.OneMerge merge) {
-            fail("should not be called");
-          }
-
-          @Override
-          public boolean hasPendingMerges() {
-            fail("should not be called");
-            return false;
-          }
-
-          @Override
-          public void merge(MergePolicy.OneMerge merge) throws IOException {
-            fail("should not be called");
-          }
-        };
-    try (Directory dir = newDirectory();
-        mergeScheduler) {
-      MergePolicy.OneMerge merge =
-          new MergePolicy.OneMerge(
-              List.of(
-                  new SegmentCommitInfo(
-                      new SegmentInfo(
-                          dir,
-                          Version.LATEST,
-                          null,
-                          "test",
-                          0,
-                          false,
-                          false,
-                          Codec.getDefault(),
-                          Collections.emptyMap(),
-                          StringHelper.randomId(),
-                          new HashMap<>(),
-                          null),
-                      0,
-                      0,
-                      0,
-                      0,
-                      0,
-                      new byte[16])));
-      mergeScheduler.initialize(InfoStream.NO_OUTPUT, dir);
-      mergeScheduler.setMaxMergesAndThreads(6, 6);
-      Executor executor = mergeScheduler.intraMergeExecutor;
-      AtomicInteger threadsExecutedOnPool = new AtomicInteger();
-      AtomicInteger threadsExecutedOnSelf = new AtomicInteger();
-      for (int i = 0; i < 4; i++) {
-        mergeScheduler.mergeThreads.add(
-            mergeScheduler.new MergeThread(mergeSource, merge) {
-              @Override
-              @SuppressForbidden(reason = "Thread sleep")
-              public void run() {
-                executor.execute(
-                    () -> {
-                      if (Thread.currentThread() == this) {
-                        threadsExecutedOnSelf.incrementAndGet();
-                      } else {
-                        threadsExecutedOnPool.incrementAndGet();
-                      }
-                      try {
-                        Thread.sleep(100);
-                      } catch (InterruptedException e) {
-                        throw new RuntimeException(e);
-                      }
-                    });
-              }
-            });
-      }
-      for (ConcurrentMergeScheduler.MergeThread thread : mergeScheduler.mergeThreads) {
-        thread.start();
-      }
-      mergeScheduler.sync();
-      assertEquals(3, threadsExecutedOnSelf.get());
-      assertEquals(1, threadsExecutedOnPool.get());
-    }
-  }
-
   private static class TrackingCMS extends ConcurrentMergeScheduler {
     long totMergedBytes;
     CountDownLatch atLeastOneMerge;
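
The removed assertions are worth unpacking: the test registers 4 merge threads with maxThreadCount = 6, so CachedExecutor's budget is maxThreadCount - mergeThreads.size() - 1 = 6 - 4 - 1 = 1 pool slot. Exactly one of the four submitted tasks therefore lands on the pool (threadsExecutedOnPool == 1), while the other three fall back to their calling merge threads (threadsExecutedOnSelf == 3).
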

TestDoc.java
@@ -45,7 +45,6 @@ import org.apache.lucene.tests.store.MockDirectoryWrapper;
 import org.apache.lucene.tests.util.LuceneTestCase;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.InfoStream;
-import org.apache.lucene.util.SameThreadExecutorService;
 import org.apache.lucene.util.StringHelper;
 import org.apache.lucene.util.Version;
 
@@ -238,8 +237,7 @@ public class TestDoc extends LuceneTestCase {
             InfoStream.getDefault(),
             trackingDir,
             new FieldInfos.FieldNumbers(null, null),
-            context,
-            new SameThreadExecutorService());
+            context);
 
     merger.merge();
     r1.close();

TestSegmentMerger.java
@@ -32,7 +32,6 @@ import org.apache.lucene.tests.util.TestUtil;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.FixedBitSet;
 import org.apache.lucene.util.InfoStream;
-import org.apache.lucene.util.SameThreadExecutorService;
 import org.apache.lucene.util.StringHelper;
 import org.apache.lucene.util.Version;
 import org.apache.lucene.util.packed.PackedLongValues;
@@ -106,8 +105,7 @@ public class TestSegmentMerger extends LuceneTestCase {
             InfoStream.getDefault(),
             mergedDir,
             new FieldInfos.FieldNumbers(null, null),
-            newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1))),
-            new SameThreadExecutorService());
+            newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1))));
     MergeState mergeState = merger.merge();
     int docsMerged = mergeState.segmentInfo.maxDoc();
     assertTrue(docsMerged == 2);