Add new parallel merge task executor for parallel actions within a single merge action (#13124)

This commit adds a new method to the MergeScheduler base class that allows the scheduler to provide an Executor for intra-merge parallelism. The first subclass to implement it is the ConcurrentMergeScheduler (CMS). In particular, the CMS limits the amount of parallelism based on the current number of merge threads and the configured maxThreadCount setting.

Parallelism is now enabled by default in the SegmentMerger, where the individual merging tasks (postings, doc values, points, term vectors, and so on) are executed in parallel with one another.

Additionally, the Lucene99 HNSW codec will use the executor provided by the MergeScheduler when no executor is supplied to it directly.
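As a usage illustration, a minimal sketch (not part of the diff; the index path, analyzer, and thread counts are placeholder choices):

import java.nio.file.Path;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

// A plain CMS is enough to opt in: the intra-merge thread budget is derived
// from maxThreadCount, with no further configuration required.
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
cms.setMaxMergesAndThreads(6, 6); // maxMergeCount, maxThreadCount
IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
iwc.setMergeScheduler(cms);
try (Directory dir = FSDirectory.open(Path.of("index"));
    IndexWriter writer = new IndexWriter(dir, iwc)) {
  // sufficiently large merges triggered through this writer may now fan
  // their sub-tasks out across spare CMS threads
  writer.forceMerge(1);
}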

Relates to: https://github.com/apache/lucene/issues/12740
Relates to: https://github.com/apache/lucene/issues/9626
Benjamin Trent, 2024-03-14 15:58:41 -04:00, committed by GitHub
parent 3ad73336ae
commit e3a34bfe56
14 changed files with 357 additions and 50 deletions

lucene/CHANGES.txt

@@ -208,6 +208,10 @@ Improvements
* GITHUB#13156: Hunspell: don't proceed with other suggestions if we found good REP ones (Peter Gromov)
* GITHUB#13124: MergeScheduler can now provide an executor for intra-merge parallelism. The first
implementation is the ConcurrentMergeScheduler and the Lucene99HnswVectorsFormat will use it if no other
executor is provided. (Ben Trent)
Optimizations
---------------------

Lucene99HnswVectorsFormat.java

@@ -24,6 +24,8 @@ import org.apache.lucene.codecs.KnnVectorsFormat;
import org.apache.lucene.codecs.KnnVectorsReader;
import org.apache.lucene.codecs.KnnVectorsWriter;
import org.apache.lucene.codecs.lucene90.IndexedDISI;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.search.DocIdSetIterator;
@@ -163,7 +165,8 @@ public final class Lucene99HnswVectorsFormat extends KnnVectorsFormat {
* @param numMergeWorkers number of workers (threads) that will be used when doing merge. If
* larger than 1, a non-null {@link ExecutorService} must be passed as mergeExec
* @param mergeExec the {@link ExecutorService} that will be used by ALL vector writers that are
* generated by this format to do the merge
* generated by this format to do the merge. If null, the configured {@link
* MergeScheduler#getIntraMergeExecutor(MergePolicy.OneMerge)} is used.
*/
public Lucene99HnswVectorsFormat(
int maxConn, int beamWidth, int numMergeWorkers, ExecutorService mergeExec) {
@@ -184,10 +187,6 @@ public final class Lucene99HnswVectorsFormat extends KnnVectorsFormat {
}
this.maxConn = maxConn;
this.beamWidth = beamWidth;
if (numMergeWorkers > 1 && mergeExec == null) {
throw new IllegalArgumentException(
"No executor service passed in when " + numMergeWorkers + " merge workers are requested");
}
if (numMergeWorkers == 1 && mergeExec != null) {
throw new IllegalArgumentException(
"No executor service is needed as we'll use single thread to merge");

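As a usage note (a hedged sketch; the parameter values below are arbitrary): with the removed check above gone, the format may be constructed with several merge workers and no explicit executor, deferring to the merge scheduler at merge time.

import java.util.concurrent.ExecutorService;
import org.apache.lucene.codecs.KnnVectorsFormat;
import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsFormat;

// maxConn=16, beamWidth=100, numMergeWorkers=4, mergeExec=null: previously an
// IllegalArgumentException, now legal; the writer falls back to the executor
// from MergeScheduler#getIntraMergeExecutor(MergePolicy.OneMerge) when merging.
KnnVectorsFormat format = new Lucene99HnswVectorsFormat(16, 100, 4, (ExecutorService) null);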
Lucene99HnswVectorsWriter.java

@@ -352,7 +352,14 @@ public final class Lucene99HnswVectorsWriter extends KnnVectorsWriter {
int[][] vectorIndexNodeOffsets = null;
if (scorerSupplier.totalVectorCount() > 0) {
// build graph
HnswGraphMerger merger = createGraphMerger(fieldInfo, scorerSupplier);
HnswGraphMerger merger =
createGraphMerger(
fieldInfo,
scorerSupplier,
mergeState.intraMergeTaskExecutor == null
? null
: new TaskExecutor(mergeState.intraMergeTaskExecutor),
numMergeWorkers);
for (int i = 0; i < mergeState.liveDocs.length; i++) {
merger.addReader(
mergeState.knnVectorsReaders[i], mergeState.docMaps[i], mergeState.liveDocs[i]);
@@ -489,11 +496,23 @@ public final class Lucene99HnswVectorsWriter extends KnnVectorsWriter {
}
private HnswGraphMerger createGraphMerger(
FieldInfo fieldInfo, RandomVectorScorerSupplier scorerSupplier) {
FieldInfo fieldInfo,
RandomVectorScorerSupplier scorerSupplier,
TaskExecutor parallelMergeTaskExecutor,
int numParallelMergeWorkers) {
if (mergeExec != null) {
return new ConcurrentHnswMerger(
fieldInfo, scorerSupplier, M, beamWidth, mergeExec, numMergeWorkers);
}
if (parallelMergeTaskExecutor != null) {
return new ConcurrentHnswMerger(
fieldInfo,
scorerSupplier,
M,
beamWidth,
parallelMergeTaskExecutor,
numParallelMergeWorkers);
}
return new IncrementalHnswGraphMerger(fieldInfo, scorerSupplier, M, beamWidth);
}
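Note the precedence encoded above: an ExecutorService handed to the format constructor still takes priority; failing that, a scheduler-provided intra-merge executor drives a ConcurrentHnswMerger; only when neither is available does merging fall back to the single-threaded IncrementalHnswGraphMerger.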

ConcurrentMergeScheduler.java

@@ -21,9 +21,12 @@ import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.internal.tests.ConcurrentMergeSchedulerAccess;
import org.apache.lucene.internal.tests.TestSecrets;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
@@ -109,6 +112,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
private double forceMergeMBPerSec = Double.POSITIVE_INFINITY;
/** The executor provided for intra-merge parallelization */
protected CachedExecutor intraMergeExecutor;
/** Sole constructor, with all settings set to default values. */
public ConcurrentMergeScheduler() {}
@@ -259,6 +265,16 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
assert false : "merge thread " + currentThread + " was not found";
}
@Override
public Executor getIntraMergeExecutor(OneMerge merge) {
assert intraMergeExecutor != null : "intraMergeExecutor is not initialized";
// don't do multithreaded merges for small merges
if (merge.estimatedMergeBytes < MIN_BIG_MERGE_MB * 1024 * 1024) {
return super.getIntraMergeExecutor(merge);
}
return intraMergeExecutor;
}
@Override
public Directory wrapForMerge(OneMerge merge, Directory in) {
Thread mergeThread = Thread.currentThread();
@@ -446,7 +462,13 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
@Override
public void close() {
sync();
try {
sync();
} finally {
if (intraMergeExecutor != null) {
intraMergeExecutor.shutdown();
}
}
}
/**
@@ -510,6 +532,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
void initialize(InfoStream infoStream, Directory directory) throws IOException {
super.initialize(infoStream, directory);
initDynamicDefaults(directory);
if (intraMergeExecutor == null) {
intraMergeExecutor = new CachedExecutor();
}
}
@Override
@@ -755,11 +780,16 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
@Override
public String toString() {
StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": ");
sb.append("maxThreadCount=").append(maxThreadCount).append(", ");
sb.append("maxMergeCount=").append(maxMergeCount).append(", ");
sb.append("ioThrottle=").append(doAutoIOThrottle);
return sb.toString();
return getClass().getSimpleName()
+ ": "
+ "maxThreadCount="
+ maxThreadCount
+ ", "
+ "maxMergeCount="
+ maxMergeCount
+ ", "
+ "ioThrottle="
+ doAutoIOThrottle;
}
private boolean isBacklog(long now, OneMerge merge) {
@@ -902,12 +932,58 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
}
static {
TestSecrets.setConcurrentMergeSchedulerAccess(
new ConcurrentMergeSchedulerAccess() {
@Override
public void setSuppressExceptions(ConcurrentMergeScheduler cms) {
cms.setSuppressExceptions();
}
});
TestSecrets.setConcurrentMergeSchedulerAccess(ConcurrentMergeScheduler::setSuppressExceptions);
}
/**
* This executor provides intra-merge threads for parallel execution of merge tasks. It provides a
* limited number of threads to execute merge tasks. In particular, if the number of
* `mergeThreads` is equal to `maxThreadCount`, then the executor will execute the merge task in
* the calling thread.
*/
private class CachedExecutor implements Executor {
private final AtomicInteger activeCount = new AtomicInteger(0);
private final ThreadPoolExecutor executor;
public CachedExecutor() {
this.executor =
new ThreadPoolExecutor(0, 1024, 1L, TimeUnit.MINUTES, new SynchronousQueue<>());
}
void shutdown() {
executor.shutdown();
}
@Override
public void execute(Runnable command) {
final boolean isThreadAvailable;
// we need to check if a thread is available before submitting the task to the executor
// synchronize on CMS to get an accurate count of current threads
synchronized (ConcurrentMergeScheduler.this) {
int max = maxThreadCount - mergeThreads.size() - 1;
int value = activeCount.get();
if (value < max) {
activeCount.incrementAndGet();
assert activeCount.get() > 0 : "active count must be greater than 0 after increment";
isThreadAvailable = true;
} else {
isThreadAvailable = false;
}
}
if (isThreadAvailable) {
executor.execute(
() -> {
try {
command.run();
} finally {
activeCount.decrementAndGet();
assert activeCount.get() >= 0 : "unexpected negative active count";
}
});
} else {
command.run();
}
}
}
}
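To make the thread budget concrete, a worked example (values mirror the testIntraMergeThreadPoolIsLimitedByMaxThreads test further down):

// With maxThreadCount = 6 and 4 merge threads currently registered, the pool
// may run max = 6 - 4 - 1 = 1 sub-task concurrently; every additional
// sub-task executes in the calling merge thread rather than queueing.
int maxThreadCount = 6;
int runningMergeThreads = 4;
int poolBudget = maxThreadCount - runningMergeThreads - 1; // == 1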

IndexWriter.java

@@ -3430,7 +3430,14 @@ public class IndexWriter
}
SegmentMerger merger =
new SegmentMerger(readers, segInfo, infoStream, trackingDir, globalFieldNumberMap, context);
new SegmentMerger(
readers,
segInfo,
infoStream,
trackingDir,
globalFieldNumberMap,
context,
mergeScheduler.getIntraMergeExecutor(merge));
if (!merger.shouldMerge()) {
return;
@@ -5219,7 +5226,13 @@ public class IndexWriter
final SegmentMerger merger =
new SegmentMerger(
mergeReaders, merge.info.info, infoStream, dirWrapper, globalFieldNumberMap, context);
mergeReaders,
merge.info.info,
infoStream,
dirWrapper,
globalFieldNumberMap,
context,
mergeScheduler.getIntraMergeExecutor(merge));
merge.info.setSoftDelCount(Math.toIntExact(softDeleteCount.get()));
merge.checkAborted();

MergeScheduler.java

@@ -18,10 +18,12 @@ package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executor;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RateLimitedIndexOutput;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.SameThreadExecutorService;
/**
* Expert: {@link IndexWriter} uses an instance implementing this interface to execute the merges
@@ -32,6 +34,8 @@ import org.apache.lucene.util.InfoStream;
*/
public abstract class MergeScheduler implements Closeable {
private final Executor executor = new SameThreadExecutorService();
/** Sole constructor. (For invocation by subclass constructors, typically implicit.) */
protected MergeScheduler() {}
@@ -52,6 +56,15 @@ public abstract class MergeScheduler implements Closeable {
return in;
}
/**
* Provides an executor for parallelism during a single merge operation. By default, the method
* returns a {@link SameThreadExecutorService} where all intra-merge actions occur in their
* calling thread.
*/
public Executor getIntraMergeExecutor(OneMerge merge) {
return executor;
}
/** Close this MergeScheduler. */
@Override
public abstract void close() throws IOException;
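To show the extension point, a hypothetical subclass (illustrative only; the class name and pool size are invented) that always routes intra-merge work to its own fixed pool:

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SerialMergeScheduler;

public class FixedPoolMergeScheduler extends SerialMergeScheduler {
  // hypothetical choice: four threads dedicated to intra-merge sub-tasks
  private final ExecutorService pool = Executors.newFixedThreadPool(4);

  @Override
  public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
    return pool;
  }

  @Override
  public void close() {
    super.close();
    pool.shutdown(); // release the pool alongside the scheduler
  }
}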

MergeState.java

@@ -21,6 +21,7 @@ import static org.apache.lucene.index.IndexWriter.isCongruentSort;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.codecs.FieldsProducer;
@@ -84,15 +85,23 @@ public class MergeState {
/** InfoStream for debugging messages. */
public final InfoStream infoStream;
/** Executor for intra merge activity */
public final Executor intraMergeTaskExecutor;
/** Indicates if the index needs to be sorted */
public boolean needsIndexSort;
/** Sole constructor. */
MergeState(List<CodecReader> readers, SegmentInfo segmentInfo, InfoStream infoStream)
MergeState(
List<CodecReader> readers,
SegmentInfo segmentInfo,
InfoStream infoStream,
Executor intraMergeTaskExecutor)
throws IOException {
verifyIndexSort(readers, segmentInfo);
this.infoStream = infoStream;
int numReaders = readers.size();
this.intraMergeTaskExecutor = intraMergeTaskExecutor;
maxDocs = new int[numReaders];
fieldsProducers = new FieldsProducer[numReaders];

NoMergeScheduler.java

@@ -16,6 +16,7 @@
*/
package org.apache.lucene.index;
import java.util.concurrent.Executor;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.store.Directory;
@@ -52,4 +53,9 @@ public final class NoMergeScheduler extends MergeScheduler {
public MergeScheduler clone() {
return this;
}
@Override
public Executor getIntraMergeExecutor(OneMerge merge) {
throw new UnsupportedOperationException("NoMergeScheduler does not support merges");
}
}

SegmentMerger.java

@@ -17,7 +17,10 @@
package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.DocValuesConsumer;
@@ -28,6 +31,7 @@ import org.apache.lucene.codecs.NormsProducer;
import org.apache.lucene.codecs.PointsWriter;
import org.apache.lucene.codecs.StoredFieldsWriter;
import org.apache.lucene.codecs.TermVectorsWriter;
import org.apache.lucene.search.TaskExecutor;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.InfoStream;
@@ -56,13 +60,14 @@ final class SegmentMerger {
InfoStream infoStream,
Directory dir,
FieldInfos.FieldNumbers fieldNumbers,
IOContext context)
IOContext context,
Executor intraMergeTaskExecutor)
throws IOException {
if (context.context != IOContext.Context.MERGE) {
throw new IllegalArgumentException(
"IOContext.context should be MERGE; got: " + context.context);
}
mergeState = new MergeState(readers, segmentInfo, infoStream);
mergeState = new MergeState(readers, segmentInfo, infoStream, intraMergeTaskExecutor);
directory = dir;
this.codec = segmentInfo.getCodec();
this.context = context;
@@ -130,19 +135,36 @@ final class SegmentMerger {
IOContext.READ,
segmentWriteState.segmentSuffix);
if (mergeState.mergeFieldInfos.hasNorms()) {
mergeWithLogging(this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged);
}
TaskExecutor taskExecutor = new TaskExecutor(mergeState.intraMergeTaskExecutor);
List<Callable<Void>> mergingTasks = new ArrayList<>();
mergingTasks.add(
() -> {
if (mergeState.mergeFieldInfos.hasNorms()) {
mergeWithLogging(
this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged);
}
mergeWithLogging(this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged);
mergeWithLogging(
this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged);
return null;
});
if (mergeState.mergeFieldInfos.hasDocValues()) {
mergeWithLogging(
this::mergeDocValues, segmentWriteState, segmentReadState, "doc values", numMerged);
mergingTasks.add(
() -> {
mergeWithLogging(
this::mergeDocValues, segmentWriteState, segmentReadState, "doc values", numMerged);
return null;
});
}
if (mergeState.mergeFieldInfos.hasPointValues()) {
mergeWithLogging(this::mergePoints, segmentWriteState, segmentReadState, "points", numMerged);
mergingTasks.add(
() -> {
mergeWithLogging(
this::mergePoints, segmentWriteState, segmentReadState, "points", numMerged);
return null;
});
}
if (mergeState.mergeFieldInfos.hasVectorValues()) {
@@ -155,9 +177,14 @@ final class SegmentMerger {
}
if (mergeState.mergeFieldInfos.hasVectors()) {
mergeWithLogging(this::mergeTermVectors, "term vectors");
mergingTasks.add(
() -> {
mergeWithLogging(this::mergeTermVectors, "term vectors");
return null;
});
}
taskExecutor.invokeAll(mergingTasks);
// write the merged infos
mergeWithLogging(
this::mergeFieldInfos, segmentWriteState, segmentReadState, "field infos", numMerged);
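The fan-out used above, in a self-contained form (a sketch with a throwaway pool; the task bodies are placeholders for the real merge steps):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.lucene.search.TaskExecutor;

ExecutorService pool = Executors.newFixedThreadPool(2);
try {
  TaskExecutor taskExecutor = new TaskExecutor(pool);
  List<Callable<Void>> mergingTasks = new ArrayList<>();
  mergingTasks.add(() -> { /* e.g. merge postings */ return null; });
  mergingTasks.add(() -> { /* e.g. merge doc values */ return null; });
  // blocks until every task has completed; the first failure is rethrown
  taskExecutor.invokeAll(mergingTasks);
} catch (IOException e) {
  throw new UncheckedIOException(e);
} finally {
  pool.shutdown();
}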

TestLucene99HnswQuantizedVectorsFormat.java

@@ -178,9 +178,6 @@ public class TestLucene99HnswQuantizedVectorsFormat extends BaseKnnVectorsFormat
expectThrows(
IllegalArgumentException.class,
() -> new Lucene99HnswScalarQuantizedVectorsFormat(20, 100, 0, 0.8f, null));
expectThrows(
IllegalArgumentException.class,
() -> new Lucene99HnswScalarQuantizedVectorsFormat(20, 100, 100, null, null));
expectThrows(
IllegalArgumentException.class,
() ->

TestLucene99HnswVectorsFormat.java

@@ -49,8 +49,6 @@ public class TestLucene99HnswVectorsFormat extends BaseKnnVectorsFormatTestCase
expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, -1));
expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(512 + 1, 20));
expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, 3201));
expectThrows(
IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, 100, 100, null));
expectThrows(
IllegalArgumentException.class,
() -> new Lucene99HnswVectorsFormat(20, 100, 1, new SameThreadExecutorService()));

TestConcurrentMergeScheduler.java

@@ -20,16 +20,20 @@ import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.KnnFloatVectorField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
@@ -42,7 +46,10 @@ import org.apache.lucene.tests.store.MockDirectoryWrapper;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.SameThreadExecutorService;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.SuppressForbidden;
import org.apache.lucene.util.Version;
public class TestConcurrentMergeScheduler extends LuceneTestCase {
@@ -90,12 +97,22 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
|| (th instanceof IllegalStateException
&& th.getMessage().contains("this writer hit an unrecoverable error"));
}
@Override
// override here to ensure even tiny merges get the parallel executor
public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
assert intraMergeExecutor != null : "intraMergeExecutor is not initialized";
return intraMergeExecutor;
}
});
}
IndexWriter writer = new IndexWriter(directory, iwc);
Document doc = new Document();
Field idField = newStringField("id", "", Field.Store.YES);
KnnFloatVectorField knnField = new KnnFloatVectorField("knn", new float[] {0.0f, 0.0f});
doc.add(idField);
// Add knn float vectors to test parallel merge
doc.add(knnField);
outer:
for (int i = 0; i < 10; i++) {
@@ -105,6 +122,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
for (int j = 0; j < 20; j++) {
idField.setStringValue(Integer.toString(i * 20 + j));
knnField.setVectorValue(new float[] {random().nextFloat(), random().nextFloat()});
writer.addDocument(doc);
}
@@ -226,23 +244,35 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
Directory directory = newDirectory();
Document doc = new Document();
Field idField = newStringField("id", "", Field.Store.YES);
KnnFloatVectorField knnField = new KnnFloatVectorField("knn", new float[] {0.0f, 0.0f});
doc.add(idField);
doc.add(knnField);
IndexWriterConfig iwc =
newIndexWriterConfig(new MockAnalyzer(random()))
// Force excessive merging:
.setMaxBufferedDocs(2)
.setMergePolicy(newLogMergePolicy(100))
.setCommitOnClose(false);
if (iwc.getMergeScheduler() instanceof ConcurrentMergeScheduler) {
iwc.setMergeScheduler(
new ConcurrentMergeScheduler() {
@Override
// override here to ensure even tiny merges get the parallel executor
public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
assert intraMergeExecutor != null : "intraMergeExecutor is not initialized";
return intraMergeExecutor;
}
});
}
IndexWriter writer =
new IndexWriter(
directory,
newIndexWriterConfig(new MockAnalyzer(random()))
.
// Force excessive merging:
setMaxBufferedDocs(2)
.setMergePolicy(newLogMergePolicy(100))
.setCommitOnClose(false));
IndexWriter writer = new IndexWriter(directory, iwc);
int numIters = TEST_NIGHTLY ? 10 : 3;
for (int iter = 0; iter < numIters; iter++) {
for (int j = 0; j < 201; j++) {
idField.setStringValue(Integer.toString(iter * 201 + j));
knnField.setVectorValue(new float[] {random().nextFloat(), random().nextFloat()});
writer.addDocument(doc);
}
@@ -364,6 +394,118 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
dir.close();
}
public void testSmallMergesDoNotGetThreads() throws IOException {
Directory dir = newDirectory();
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
iwc.setMaxBufferedDocs(2);
iwc.setMergeScheduler(
new ConcurrentMergeScheduler() {
@Override
protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge)
throws IOException {
assertTrue(this.getIntraMergeExecutor(merge) instanceof SameThreadExecutorService);
super.doMerge(mergeSource, merge);
}
});
IndexWriter w = new IndexWriter(dir, iwc);
for (int i = 0; i < 10; i++) {
Document doc = new Document();
doc.add(new StringField("id", "" + i, Field.Store.NO));
w.addDocument(doc);
}
w.forceMerge(1);
w.close();
dir.close();
}
@SuppressForbidden(reason = "Thread sleep")
public void testIntraMergeThreadPoolIsLimitedByMaxThreads() throws IOException {
ConcurrentMergeScheduler mergeScheduler = new ConcurrentMergeScheduler();
MergeScheduler.MergeSource mergeSource =
new MergeScheduler.MergeSource() {
@Override
public MergePolicy.OneMerge getNextMerge() {
fail("should not be called");
return null;
}
@Override
public void onMergeFinished(MergePolicy.OneMerge merge) {
fail("should not be called");
}
@Override
public boolean hasPendingMerges() {
fail("should not be called");
return false;
}
@Override
public void merge(MergePolicy.OneMerge merge) throws IOException {
fail("should not be called");
}
};
try (Directory dir = newDirectory();
mergeScheduler) {
MergePolicy.OneMerge merge =
new MergePolicy.OneMerge(
List.of(
new SegmentCommitInfo(
new SegmentInfo(
dir,
Version.LATEST,
null,
"test",
0,
false,
false,
Codec.getDefault(),
Collections.emptyMap(),
StringHelper.randomId(),
new HashMap<>(),
null),
0,
0,
0,
0,
0,
new byte[16])));
mergeScheduler.initialize(InfoStream.NO_OUTPUT, dir);
mergeScheduler.setMaxMergesAndThreads(6, 6);
Executor executor = mergeScheduler.intraMergeExecutor;
AtomicInteger threadsExecutedOnPool = new AtomicInteger();
AtomicInteger threadsExecutedOnSelf = new AtomicInteger();
for (int i = 0; i < 4; i++) {
mergeScheduler.mergeThreads.add(
mergeScheduler.new MergeThread(mergeSource, merge) {
@Override
@SuppressForbidden(reason = "Thread sleep")
public void run() {
executor.execute(
() -> {
if (Thread.currentThread() == this) {
threadsExecutedOnSelf.incrementAndGet();
} else {
threadsExecutedOnPool.incrementAndGet();
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
});
}
for (ConcurrentMergeScheduler.MergeThread thread : mergeScheduler.mergeThreads) {
thread.start();
}
mergeScheduler.sync();
assertEquals(3, threadsExecutedOnSelf.get());
assertEquals(1, threadsExecutedOnPool.get());
}
}
private static class TrackingCMS extends ConcurrentMergeScheduler {
long totMergedBytes;
CountDownLatch atLeastOneMerge;

TestDoc.java

@@ -45,6 +45,7 @@ import org.apache.lucene.tests.store.MockDirectoryWrapper;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.SameThreadExecutorService;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.Version;
@@ -237,7 +238,8 @@ public class TestDoc extends LuceneTestCase {
InfoStream.getDefault(),
trackingDir,
new FieldInfos.FieldNumbers(null, null),
context);
context,
new SameThreadExecutorService());
merger.merge();
r1.close();

TestSegmentMerger.java

@@ -32,6 +32,7 @@ import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.SameThreadExecutorService;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.Version;
import org.apache.lucene.util.packed.PackedLongValues;
@@ -105,7 +106,8 @@ public class TestSegmentMerger extends LuceneTestCase {
InfoStream.getDefault(),
mergedDir,
new FieldInfos.FieldNumbers(null, null),
newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1))));
newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1))),
new SameThreadExecutorService());
MergeState mergeState = merger.merge();
int docsMerged = mergeState.segmentInfo.maxDoc();
assertTrue(docsMerged == 2);