Revert "Add new parallel merge task executor for parallel actions within a si…" (#13189)

This reverts commit e3a34bfe56.
This commit is contained in:
Benjamin Trent 2024-03-18 08:31:42 -04:00 committed by GitHub
parent 9fd0474251
commit 5b48474286
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 50 additions and 357 deletions

View File

@ -208,10 +208,6 @@ Improvements
* GITHUB#13156: Hunspell: don't proceed with other suggestions if we found good REP ones (Peter Gromov)
* GITHUB#13124: MergeScheduler can now provide an executor for intra-merge parallelism. The first
implementation is the ConcurrentMergeScheduler and the Lucene99HnswVectorsFormat will use it if no other
executor is provided. (Ben Trent)
Optimizations
---------------------

View File

@ -24,8 +24,6 @@ import org.apache.lucene.codecs.KnnVectorsFormat;
import org.apache.lucene.codecs.KnnVectorsReader;
import org.apache.lucene.codecs.KnnVectorsWriter;
import org.apache.lucene.codecs.lucene90.IndexedDISI;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.search.DocIdSetIterator;
@ -165,8 +163,7 @@ public final class Lucene99HnswVectorsFormat extends KnnVectorsFormat {
* @param numMergeWorkers number of workers (threads) that will be used when doing merge. If
* larger than 1, a non-null {@link ExecutorService} must be passed as mergeExec
* @param mergeExec the {@link ExecutorService} that will be used by ALL vector writers that are
* generated by this format to do the merge. If null, the configured {@link
* MergeScheduler#getIntraMergeExecutor(MergePolicy.OneMerge)} is used.
* generated by this format to do the merge
*/
public Lucene99HnswVectorsFormat(
int maxConn, int beamWidth, int numMergeWorkers, ExecutorService mergeExec) {
@ -187,6 +184,10 @@ public final class Lucene99HnswVectorsFormat extends KnnVectorsFormat {
}
this.maxConn = maxConn;
this.beamWidth = beamWidth;
if (numMergeWorkers > 1 && mergeExec == null) {
throw new IllegalArgumentException(
"No executor service passed in when " + numMergeWorkers + " merge workers are requested");
}
if (numMergeWorkers == 1 && mergeExec != null) {
throw new IllegalArgumentException(
"No executor service is needed as we'll use single thread to merge");

View File

@ -352,14 +352,7 @@ public final class Lucene99HnswVectorsWriter extends KnnVectorsWriter {
int[][] vectorIndexNodeOffsets = null;
if (scorerSupplier.totalVectorCount() > 0) {
// build graph
HnswGraphMerger merger =
createGraphMerger(
fieldInfo,
scorerSupplier,
mergeState.intraMergeTaskExecutor == null
? null
: new TaskExecutor(mergeState.intraMergeTaskExecutor),
numMergeWorkers);
HnswGraphMerger merger = createGraphMerger(fieldInfo, scorerSupplier);
for (int i = 0; i < mergeState.liveDocs.length; i++) {
merger.addReader(
mergeState.knnVectorsReaders[i], mergeState.docMaps[i], mergeState.liveDocs[i]);
@ -496,23 +489,11 @@ public final class Lucene99HnswVectorsWriter extends KnnVectorsWriter {
}
private HnswGraphMerger createGraphMerger(
FieldInfo fieldInfo,
RandomVectorScorerSupplier scorerSupplier,
TaskExecutor parallelMergeTaskExecutor,
int numParallelMergeWorkers) {
FieldInfo fieldInfo, RandomVectorScorerSupplier scorerSupplier) {
if (mergeExec != null) {
return new ConcurrentHnswMerger(
fieldInfo, scorerSupplier, M, beamWidth, mergeExec, numMergeWorkers);
}
if (parallelMergeTaskExecutor != null) {
return new ConcurrentHnswMerger(
fieldInfo,
scorerSupplier,
M,
beamWidth,
parallelMergeTaskExecutor,
numParallelMergeWorkers);
}
return new IncrementalHnswGraphMerger(fieldInfo, scorerSupplier, M, beamWidth);
}

View File

@ -21,12 +21,9 @@ import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.internal.tests.ConcurrentMergeSchedulerAccess;
import org.apache.lucene.internal.tests.TestSecrets;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
@ -112,9 +109,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
private double forceMergeMBPerSec = Double.POSITIVE_INFINITY;
/** The executor provided for intra-merge parallelization */
protected CachedExecutor intraMergeExecutor;
/** Sole constructor, with all settings set to default values. */
public ConcurrentMergeScheduler() {}
@ -265,16 +259,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
assert false : "merge thread " + currentThread + " was not found";
}
@Override
public Executor getIntraMergeExecutor(OneMerge merge) {
assert intraMergeExecutor != null : "scaledExecutor is not initialized";
// don't do multithreaded merges for small merges
if (merge.estimatedMergeBytes < MIN_BIG_MERGE_MB * 1024 * 1024) {
return super.getIntraMergeExecutor(merge);
}
return intraMergeExecutor;
}
@Override
public Directory wrapForMerge(OneMerge merge, Directory in) {
Thread mergeThread = Thread.currentThread();
@ -462,13 +446,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
@Override
public void close() {
try {
sync();
} finally {
if (intraMergeExecutor != null) {
intraMergeExecutor.shutdown();
}
}
sync();
}
/**
@ -532,9 +510,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
void initialize(InfoStream infoStream, Directory directory) throws IOException {
super.initialize(infoStream, directory);
initDynamicDefaults(directory);
if (intraMergeExecutor == null) {
intraMergeExecutor = new CachedExecutor();
}
}
@Override
@ -780,16 +755,11 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
@Override
public String toString() {
return getClass().getSimpleName()
+ ": "
+ "maxThreadCount="
+ maxThreadCount
+ ", "
+ "maxMergeCount="
+ maxMergeCount
+ ", "
+ "ioThrottle="
+ doAutoIOThrottle;
StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": ");
sb.append("maxThreadCount=").append(maxThreadCount).append(", ");
sb.append("maxMergeCount=").append(maxMergeCount).append(", ");
sb.append("ioThrottle=").append(doAutoIOThrottle);
return sb.toString();
}
private boolean isBacklog(long now, OneMerge merge) {
@ -932,58 +902,12 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
}
static {
TestSecrets.setConcurrentMergeSchedulerAccess(ConcurrentMergeScheduler::setSuppressExceptions);
}
/**
* This executor provides intra-merge threads for parallel execution of merge tasks. It provides a
* limited number of threads to execute merge tasks. In particular, if the number of
* `mergeThreads` is equal to `maxThreadCount`, then the executor will execute the merge task in
* the calling thread.
*/
private class CachedExecutor implements Executor {
private final AtomicInteger activeCount = new AtomicInteger(0);
private final ThreadPoolExecutor executor;
public CachedExecutor() {
this.executor =
new ThreadPoolExecutor(0, 1024, 1L, TimeUnit.MINUTES, new SynchronousQueue<>());
}
void shutdown() {
executor.shutdown();
}
@Override
public void execute(Runnable command) {
final boolean isThreadAvailable;
// we need to check if a thread is available before submitting the task to the executor
// synchronize on CMS to get an accurate count of current threads
synchronized (ConcurrentMergeScheduler.this) {
int max = maxThreadCount - mergeThreads.size() - 1;
int value = activeCount.get();
if (value < max) {
activeCount.incrementAndGet();
assert activeCount.get() > 0 : "active count must be greater than 0 after increment";
isThreadAvailable = true;
} else {
isThreadAvailable = false;
}
}
if (isThreadAvailable) {
executor.execute(
() -> {
try {
command.run();
} finally {
activeCount.decrementAndGet();
assert activeCount.get() >= 0 : "unexpected negative active count";
}
});
} else {
command.run();
}
}
TestSecrets.setConcurrentMergeSchedulerAccess(
new ConcurrentMergeSchedulerAccess() {
@Override
public void setSuppressExceptions(ConcurrentMergeScheduler cms) {
cms.setSuppressExceptions();
}
});
}
}

View File

@ -3430,14 +3430,7 @@ public class IndexWriter
}
SegmentMerger merger =
new SegmentMerger(
readers,
segInfo,
infoStream,
trackingDir,
globalFieldNumberMap,
context,
mergeScheduler.getIntraMergeExecutor(merge));
new SegmentMerger(readers, segInfo, infoStream, trackingDir, globalFieldNumberMap, context);
if (!merger.shouldMerge()) {
return;
@ -5226,13 +5219,7 @@ public class IndexWriter
final SegmentMerger merger =
new SegmentMerger(
mergeReaders,
merge.info.info,
infoStream,
dirWrapper,
globalFieldNumberMap,
context,
mergeScheduler.getIntraMergeExecutor(merge));
mergeReaders, merge.info.info, infoStream, dirWrapper, globalFieldNumberMap, context);
merge.info.setSoftDelCount(Math.toIntExact(softDeleteCount.get()));
merge.checkAborted();

View File

@ -18,12 +18,10 @@ package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executor;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RateLimitedIndexOutput;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.SameThreadExecutorService;
/**
* Expert: {@link IndexWriter} uses an instance implementing this interface to execute the merges
@ -34,8 +32,6 @@ import org.apache.lucene.util.SameThreadExecutorService;
*/
public abstract class MergeScheduler implements Closeable {
private final Executor executor = new SameThreadExecutorService();
/** Sole constructor. (For invocation by subclass constructors, typically implicit.) */
protected MergeScheduler() {}
@ -56,15 +52,6 @@ public abstract class MergeScheduler implements Closeable {
return in;
}
/**
* Provides an executor for parallelism during a single merge operation. By default, the method
* returns a {@link SameThreadExecutorService} where all intra-merge actions occur in their
* calling thread.
*/
public Executor getIntraMergeExecutor(OneMerge merge) {
return executor;
}
/** Close this MergeScheduler. */
@Override
public abstract void close() throws IOException;

View File

@ -21,7 +21,6 @@ import static org.apache.lucene.index.IndexWriter.isCongruentSort;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.codecs.FieldsProducer;
@ -85,23 +84,15 @@ public class MergeState {
/** InfoStream for debugging messages. */
public final InfoStream infoStream;
/** Executor for intra merge activity */
public final Executor intraMergeTaskExecutor;
/** Indicates if the index needs to be sorted * */
public boolean needsIndexSort;
/** Sole constructor. */
MergeState(
List<CodecReader> readers,
SegmentInfo segmentInfo,
InfoStream infoStream,
Executor intraMergeTaskExecutor)
MergeState(List<CodecReader> readers, SegmentInfo segmentInfo, InfoStream infoStream)
throws IOException {
verifyIndexSort(readers, segmentInfo);
this.infoStream = infoStream;
int numReaders = readers.size();
this.intraMergeTaskExecutor = intraMergeTaskExecutor;
maxDocs = new int[numReaders];
fieldsProducers = new FieldsProducer[numReaders];

View File

@ -16,7 +16,6 @@
*/
package org.apache.lucene.index;
import java.util.concurrent.Executor;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.store.Directory;
@ -53,9 +52,4 @@ public final class NoMergeScheduler extends MergeScheduler {
public MergeScheduler clone() {
return this;
}
@Override
public Executor getIntraMergeExecutor(OneMerge merge) {
throw new UnsupportedOperationException("NoMergeScheduler does not support merges");
}
}

View File

@ -17,10 +17,7 @@
package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.DocValuesConsumer;
@ -31,7 +28,6 @@ import org.apache.lucene.codecs.NormsProducer;
import org.apache.lucene.codecs.PointsWriter;
import org.apache.lucene.codecs.StoredFieldsWriter;
import org.apache.lucene.codecs.TermVectorsWriter;
import org.apache.lucene.search.TaskExecutor;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.InfoStream;
@ -60,14 +56,13 @@ final class SegmentMerger {
InfoStream infoStream,
Directory dir,
FieldInfos.FieldNumbers fieldNumbers,
IOContext context,
Executor intraMergeTaskExecutor)
IOContext context)
throws IOException {
if (context.context != IOContext.Context.MERGE) {
throw new IllegalArgumentException(
"IOContext.context should be MERGE; got: " + context.context);
}
mergeState = new MergeState(readers, segmentInfo, infoStream, intraMergeTaskExecutor);
mergeState = new MergeState(readers, segmentInfo, infoStream);
directory = dir;
this.codec = segmentInfo.getCodec();
this.context = context;
@ -135,36 +130,19 @@ final class SegmentMerger {
IOContext.READ,
segmentWriteState.segmentSuffix);
TaskExecutor taskExecutor = new TaskExecutor(mergeState.intraMergeTaskExecutor);
List<Callable<Void>> mergingTasks = new ArrayList<>();
mergingTasks.add(
() -> {
if (mergeState.mergeFieldInfos.hasNorms()) {
mergeWithLogging(
this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged);
}
if (mergeState.mergeFieldInfos.hasNorms()) {
mergeWithLogging(this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged);
}
mergeWithLogging(
this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged);
return null;
});
mergeWithLogging(this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged);
if (mergeState.mergeFieldInfos.hasDocValues()) {
mergingTasks.add(
() -> {
mergeWithLogging(
this::mergeDocValues, segmentWriteState, segmentReadState, "doc values", numMerged);
return null;
});
mergeWithLogging(
this::mergeDocValues, segmentWriteState, segmentReadState, "doc values", numMerged);
}
if (mergeState.mergeFieldInfos.hasPointValues()) {
mergingTasks.add(
() -> {
mergeWithLogging(
this::mergePoints, segmentWriteState, segmentReadState, "points", numMerged);
return null;
});
mergeWithLogging(this::mergePoints, segmentWriteState, segmentReadState, "points", numMerged);
}
if (mergeState.mergeFieldInfos.hasVectorValues()) {
@ -177,14 +155,9 @@ final class SegmentMerger {
}
if (mergeState.mergeFieldInfos.hasVectors()) {
mergingTasks.add(
() -> {
mergeWithLogging(this::mergeTermVectors, "term vectors");
return null;
});
mergeWithLogging(this::mergeTermVectors, "term vectors");
}
taskExecutor.invokeAll(mergingTasks);
// write the merged infos
mergeWithLogging(
this::mergeFieldInfos, segmentWriteState, segmentReadState, "field infos", numMerged);

View File

@ -178,6 +178,9 @@ public class TestLucene99HnswQuantizedVectorsFormat extends BaseKnnVectorsFormat
expectThrows(
IllegalArgumentException.class,
() -> new Lucene99HnswScalarQuantizedVectorsFormat(20, 100, 0, 0.8f, null));
expectThrows(
IllegalArgumentException.class,
() -> new Lucene99HnswScalarQuantizedVectorsFormat(20, 100, 100, null, null));
expectThrows(
IllegalArgumentException.class,
() ->

View File

@ -49,6 +49,8 @@ public class TestLucene99HnswVectorsFormat extends BaseKnnVectorsFormatTestCase
expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, -1));
expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(512 + 1, 20));
expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, 3201));
expectThrows(
IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, 100, 100, null));
expectThrows(
IllegalArgumentException.class,
() -> new Lucene99HnswVectorsFormat(20, 100, 1, new SameThreadExecutorService()));

View File

@ -20,20 +20,16 @@ import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.KnnFloatVectorField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
@ -46,10 +42,7 @@ import org.apache.lucene.tests.store.MockDirectoryWrapper;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.SameThreadExecutorService;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.SuppressForbidden;
import org.apache.lucene.util.Version;
public class TestConcurrentMergeScheduler extends LuceneTestCase {
@ -97,22 +90,12 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
|| (th instanceof IllegalStateException
&& th.getMessage().contains("this writer hit an unrecoverable error"));
}
@Override
// override here to ensure even tiny merges get the parallel executor
public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
assert intraMergeExecutor != null : "intraMergeExecutor is not initialized";
return intraMergeExecutor;
}
});
}
IndexWriter writer = new IndexWriter(directory, iwc);
Document doc = new Document();
Field idField = newStringField("id", "", Field.Store.YES);
KnnFloatVectorField knnField = new KnnFloatVectorField("knn", new float[] {0.0f, 0.0f});
doc.add(idField);
// Add knn float vectors to test parallel merge
doc.add(knnField);
outer:
for (int i = 0; i < 10; i++) {
@ -122,7 +105,6 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
for (int j = 0; j < 20; j++) {
idField.setStringValue(Integer.toString(i * 20 + j));
knnField.setVectorValue(new float[] {random().nextFloat(), random().nextFloat()});
writer.addDocument(doc);
}
@ -244,35 +226,23 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
Directory directory = newDirectory();
Document doc = new Document();
Field idField = newStringField("id", "", Field.Store.YES);
KnnFloatVectorField knnField = new KnnFloatVectorField("knn", new float[] {0.0f, 0.0f});
doc.add(idField);
doc.add(knnField);
IndexWriterConfig iwc =
newIndexWriterConfig(new MockAnalyzer(random()))
// Force excessive merging:
.setMaxBufferedDocs(2)
.setMergePolicy(newLogMergePolicy(100))
.setCommitOnClose(false);
if (iwc.getMergeScheduler() instanceof ConcurrentMergeScheduler) {
iwc.setMergeScheduler(
new ConcurrentMergeScheduler() {
@Override
// override here to ensure even tiny merges get the parallel executor
public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
assert intraMergeExecutor != null : "scaledExecutor is not initialized";
return intraMergeExecutor;
}
});
}
IndexWriter writer = new IndexWriter(directory, iwc);
IndexWriter writer =
new IndexWriter(
directory,
newIndexWriterConfig(new MockAnalyzer(random()))
.
// Force excessive merging:
setMaxBufferedDocs(2)
.setMergePolicy(newLogMergePolicy(100))
.setCommitOnClose(false));
int numIters = TEST_NIGHTLY ? 10 : 3;
for (int iter = 0; iter < numIters; iter++) {
for (int j = 0; j < 201; j++) {
idField.setStringValue(Integer.toString(iter * 201 + j));
knnField.setVectorValue(new float[] {random().nextFloat(), random().nextFloat()});
writer.addDocument(doc);
}
@ -394,118 +364,6 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
dir.close();
}
public void testSmallMergesDonNotGetThreads() throws IOException {
Directory dir = newDirectory();
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
iwc.setMaxBufferedDocs(2);
iwc.setMergeScheduler(
new ConcurrentMergeScheduler() {
@Override
protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge)
throws IOException {
assertTrue(this.getIntraMergeExecutor(merge) instanceof SameThreadExecutorService);
super.doMerge(mergeSource, merge);
}
});
IndexWriter w = new IndexWriter(dir, iwc);
for (int i = 0; i < 10; i++) {
Document doc = new Document();
doc.add(new StringField("id", "" + i, Field.Store.NO));
w.addDocument(doc);
}
w.forceMerge(1);
w.close();
dir.close();
}
@SuppressForbidden(reason = "Thread sleep")
public void testIntraMergeThreadPoolIsLimitedByMaxThreads() throws IOException {
ConcurrentMergeScheduler mergeScheduler = new ConcurrentMergeScheduler();
MergeScheduler.MergeSource mergeSource =
new MergeScheduler.MergeSource() {
@Override
public MergePolicy.OneMerge getNextMerge() {
fail("should not be called");
return null;
}
@Override
public void onMergeFinished(MergePolicy.OneMerge merge) {
fail("should not be called");
}
@Override
public boolean hasPendingMerges() {
fail("should not be called");
return false;
}
@Override
public void merge(MergePolicy.OneMerge merge) throws IOException {
fail("should not be called");
}
};
try (Directory dir = newDirectory();
mergeScheduler) {
MergePolicy.OneMerge merge =
new MergePolicy.OneMerge(
List.of(
new SegmentCommitInfo(
new SegmentInfo(
dir,
Version.LATEST,
null,
"test",
0,
false,
false,
Codec.getDefault(),
Collections.emptyMap(),
StringHelper.randomId(),
new HashMap<>(),
null),
0,
0,
0,
0,
0,
new byte[16])));
mergeScheduler.initialize(InfoStream.NO_OUTPUT, dir);
mergeScheduler.setMaxMergesAndThreads(6, 6);
Executor executor = mergeScheduler.intraMergeExecutor;
AtomicInteger threadsExecutedOnPool = new AtomicInteger();
AtomicInteger threadsExecutedOnSelf = new AtomicInteger();
for (int i = 0; i < 4; i++) {
mergeScheduler.mergeThreads.add(
mergeScheduler.new MergeThread(mergeSource, merge) {
@Override
@SuppressForbidden(reason = "Thread sleep")
public void run() {
executor.execute(
() -> {
if (Thread.currentThread() == this) {
threadsExecutedOnSelf.incrementAndGet();
} else {
threadsExecutedOnPool.incrementAndGet();
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
});
}
for (ConcurrentMergeScheduler.MergeThread thread : mergeScheduler.mergeThreads) {
thread.start();
}
mergeScheduler.sync();
assertEquals(3, threadsExecutedOnSelf.get());
assertEquals(1, threadsExecutedOnPool.get());
}
}
private static class TrackingCMS extends ConcurrentMergeScheduler {
long totMergedBytes;
CountDownLatch atLeastOneMerge;

View File

@ -45,7 +45,6 @@ import org.apache.lucene.tests.store.MockDirectoryWrapper;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.SameThreadExecutorService;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.Version;
@ -238,8 +237,7 @@ public class TestDoc extends LuceneTestCase {
InfoStream.getDefault(),
trackingDir,
new FieldInfos.FieldNumbers(null, null),
context,
new SameThreadExecutorService());
context);
merger.merge();
r1.close();

View File

@ -32,7 +32,6 @@ import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.SameThreadExecutorService;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.Version;
import org.apache.lucene.util.packed.PackedLongValues;
@ -106,8 +105,7 @@ public class TestSegmentMerger extends LuceneTestCase {
InfoStream.getDefault(),
mergedDir,
new FieldInfos.FieldNumbers(null, null),
newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1))),
new SameThreadExecutorService());
newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1))));
MergeState mergeState = merger.merge();
int docsMerged = mergeState.segmentInfo.maxDoc();
assertTrue(docsMerged == 2);