From e1d24589f29752d04609b76301831aef9bb8ccf0 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Mon, 18 Mar 2024 08:15:55 -0400 Subject: [PATCH] =?UTF-8?q?Revert=20"Add=20new=20parallel=20merge=20task?= =?UTF-8?q?=20executor=20for=20parallel=20actions=20within=20a=20si?= =?UTF-8?q?=E2=80=A6"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit e3a34bfe56ae555fdf36f7a43c2f0d2698101c89. --- lucene/CHANGES.txt | 4 - .../lucene99/Lucene99HnswVectorsFormat.java | 9 +- .../lucene99/Lucene99HnswVectorsWriter.java | 23 +-- .../index/ConcurrentMergeScheduler.java | 104 ++---------- .../org/apache/lucene/index/IndexWriter.java | 17 +- .../apache/lucene/index/MergeScheduler.java | 13 -- .../org/apache/lucene/index/MergeState.java | 11 +- .../apache/lucene/index/NoMergeScheduler.java | 6 - .../apache/lucene/index/SegmentMerger.java | 47 ++--- ...estLucene99HnswQuantizedVectorsFormat.java | 3 + .../TestLucene99HnswVectorsFormat.java | 2 + .../index/TestConcurrentMergeScheduler.java | 160 +----------------- .../test/org/apache/lucene/index/TestDoc.java | 4 +- .../lucene/index/TestSegmentMerger.java | 4 +- 14 files changed, 50 insertions(+), 357 deletions(-) diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index c11c77528f3..2d98bc55d77 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -208,10 +208,6 @@ Improvements * GITHUB#13156: Hunspell: don't proceed with other suggestions if we found good REP ones (Peter Gromov) -* GITHUB#13124: MergeScheduler can now provide an executor for intra-merge parallelism. The first - implementation is the ConcurrentMergeScheduler and the Lucene99HnswVectorsFormat will use it if no other - executor is provided. (Ben Trent) - Optimizations --------------------- diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsFormat.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsFormat.java index bcea4017086..582155d9b76 100644 --- a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsFormat.java +++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsFormat.java @@ -24,8 +24,6 @@ import org.apache.lucene.codecs.KnnVectorsFormat; import org.apache.lucene.codecs.KnnVectorsReader; import org.apache.lucene.codecs.KnnVectorsWriter; import org.apache.lucene.codecs.lucene90.IndexedDISI; -import org.apache.lucene.index.MergePolicy; -import org.apache.lucene.index.MergeScheduler; import org.apache.lucene.index.SegmentReadState; import org.apache.lucene.index.SegmentWriteState; import org.apache.lucene.search.DocIdSetIterator; @@ -165,8 +163,7 @@ public final class Lucene99HnswVectorsFormat extends KnnVectorsFormat { * @param numMergeWorkers number of workers (threads) that will be used when doing merge. If * larger than 1, a non-null {@link ExecutorService} must be passed as mergeExec * @param mergeExec the {@link ExecutorService} that will be used by ALL vector writers that are - * generated by this format to do the merge. If null, the configured {@link - * MergeScheduler#getIntraMergeExecutor(MergePolicy.OneMerge)} is used. + * generated by this format to do the merge */ public Lucene99HnswVectorsFormat( int maxConn, int beamWidth, int numMergeWorkers, ExecutorService mergeExec) { @@ -187,6 +184,10 @@ public final class Lucene99HnswVectorsFormat extends KnnVectorsFormat { } this.maxConn = maxConn; this.beamWidth = beamWidth; + if (numMergeWorkers > 1 && mergeExec == null) { + throw new IllegalArgumentException( + "No executor service passed in when " + numMergeWorkers + " merge workers are requested"); + } if (numMergeWorkers == 1 && mergeExec != null) { throw new IllegalArgumentException( "No executor service is needed as we'll use single thread to merge"); diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsWriter.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsWriter.java index 1bd2c53b6d6..a236dd7c65b 100644 --- a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsWriter.java @@ -352,14 +352,7 @@ public final class Lucene99HnswVectorsWriter extends KnnVectorsWriter { int[][] vectorIndexNodeOffsets = null; if (scorerSupplier.totalVectorCount() > 0) { // build graph - HnswGraphMerger merger = - createGraphMerger( - fieldInfo, - scorerSupplier, - mergeState.intraMergeTaskExecutor == null - ? null - : new TaskExecutor(mergeState.intraMergeTaskExecutor), - numMergeWorkers); + HnswGraphMerger merger = createGraphMerger(fieldInfo, scorerSupplier); for (int i = 0; i < mergeState.liveDocs.length; i++) { merger.addReader( mergeState.knnVectorsReaders[i], mergeState.docMaps[i], mergeState.liveDocs[i]); @@ -496,23 +489,11 @@ public final class Lucene99HnswVectorsWriter extends KnnVectorsWriter { } private HnswGraphMerger createGraphMerger( - FieldInfo fieldInfo, - RandomVectorScorerSupplier scorerSupplier, - TaskExecutor parallelMergeTaskExecutor, - int numParallelMergeWorkers) { + FieldInfo fieldInfo, RandomVectorScorerSupplier scorerSupplier) { if (mergeExec != null) { return new ConcurrentHnswMerger( fieldInfo, scorerSupplier, M, beamWidth, mergeExec, numMergeWorkers); } - if (parallelMergeTaskExecutor != null) { - return new ConcurrentHnswMerger( - fieldInfo, - scorerSupplier, - M, - beamWidth, - parallelMergeTaskExecutor, - numParallelMergeWorkers); - } return new IncrementalHnswGraphMerger(fieldInfo, scorerSupplier, M, beamWidth); } diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java index 25b489a744f..8ffbbd759a4 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java +++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java @@ -21,12 +21,9 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.List; import java.util.Locale; -import java.util.concurrent.Executor; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.index.MergePolicy.OneMerge; +import org.apache.lucene.internal.tests.ConcurrentMergeSchedulerAccess; import org.apache.lucene.internal.tests.TestSecrets; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; @@ -112,9 +109,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler { private double forceMergeMBPerSec = Double.POSITIVE_INFINITY; - /** The executor provided for intra-merge parallelization */ - protected CachedExecutor intraMergeExecutor; - /** Sole constructor, with all settings set to default values. */ public ConcurrentMergeScheduler() {} @@ -265,16 +259,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler { assert false : "merge thread " + currentThread + " was not found"; } - @Override - public Executor getIntraMergeExecutor(OneMerge merge) { - assert intraMergeExecutor != null : "scaledExecutor is not initialized"; - // don't do multithreaded merges for small merges - if (merge.estimatedMergeBytes < MIN_BIG_MERGE_MB * 1024 * 1024) { - return super.getIntraMergeExecutor(merge); - } - return intraMergeExecutor; - } - @Override public Directory wrapForMerge(OneMerge merge, Directory in) { Thread mergeThread = Thread.currentThread(); @@ -462,13 +446,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler { @Override public void close() { - try { - sync(); - } finally { - if (intraMergeExecutor != null) { - intraMergeExecutor.shutdown(); - } - } + sync(); } /** @@ -532,9 +510,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler { void initialize(InfoStream infoStream, Directory directory) throws IOException { super.initialize(infoStream, directory); initDynamicDefaults(directory); - if (intraMergeExecutor == null) { - intraMergeExecutor = new CachedExecutor(); - } } @Override @@ -780,16 +755,11 @@ public class ConcurrentMergeScheduler extends MergeScheduler { @Override public String toString() { - return getClass().getSimpleName() - + ": " - + "maxThreadCount=" - + maxThreadCount - + ", " - + "maxMergeCount=" - + maxMergeCount - + ", " - + "ioThrottle=" - + doAutoIOThrottle; + StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": "); + sb.append("maxThreadCount=").append(maxThreadCount).append(", "); + sb.append("maxMergeCount=").append(maxMergeCount).append(", "); + sb.append("ioThrottle=").append(doAutoIOThrottle); + return sb.toString(); } private boolean isBacklog(long now, OneMerge merge) { @@ -932,58 +902,12 @@ public class ConcurrentMergeScheduler extends MergeScheduler { } static { - TestSecrets.setConcurrentMergeSchedulerAccess(ConcurrentMergeScheduler::setSuppressExceptions); - } - - /** - * This executor provides intra-merge threads for parallel execution of merge tasks. It provides a - * limited number of threads to execute merge tasks. In particular, if the number of - * `mergeThreads` is equal to `maxThreadCount`, then the executor will execute the merge task in - * the calling thread. - */ - private class CachedExecutor implements Executor { - - private final AtomicInteger activeCount = new AtomicInteger(0); - private final ThreadPoolExecutor executor; - - public CachedExecutor() { - this.executor = - new ThreadPoolExecutor(0, 1024, 1L, TimeUnit.MINUTES, new SynchronousQueue<>()); - } - - void shutdown() { - executor.shutdown(); - } - - @Override - public void execute(Runnable command) { - final boolean isThreadAvailable; - // we need to check if a thread is available before submitting the task to the executor - // synchronize on CMS to get an accurate count of current threads - synchronized (ConcurrentMergeScheduler.this) { - int max = maxThreadCount - mergeThreads.size() - 1; - int value = activeCount.get(); - if (value < max) { - activeCount.incrementAndGet(); - assert activeCount.get() > 0 : "active count must be greater than 0 after increment"; - isThreadAvailable = true; - } else { - isThreadAvailable = false; - } - } - if (isThreadAvailable) { - executor.execute( - () -> { - try { - command.run(); - } finally { - activeCount.decrementAndGet(); - assert activeCount.get() >= 0 : "unexpected negative active count"; - } - }); - } else { - command.run(); - } - } + TestSecrets.setConcurrentMergeSchedulerAccess( + new ConcurrentMergeSchedulerAccess() { + @Override + public void setSuppressExceptions(ConcurrentMergeScheduler cms) { + cms.setSuppressExceptions(); + } + }); } } diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index 6797a74e21d..c81abc3f1b0 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -3430,14 +3430,7 @@ public class IndexWriter } SegmentMerger merger = - new SegmentMerger( - readers, - segInfo, - infoStream, - trackingDir, - globalFieldNumberMap, - context, - mergeScheduler.getIntraMergeExecutor(merge)); + new SegmentMerger(readers, segInfo, infoStream, trackingDir, globalFieldNumberMap, context); if (!merger.shouldMerge()) { return; @@ -5226,13 +5219,7 @@ public class IndexWriter final SegmentMerger merger = new SegmentMerger( - mergeReaders, - merge.info.info, - infoStream, - dirWrapper, - globalFieldNumberMap, - context, - mergeScheduler.getIntraMergeExecutor(merge)); + mergeReaders, merge.info.info, infoStream, dirWrapper, globalFieldNumberMap, context); merge.info.setSoftDelCount(Math.toIntExact(softDeleteCount.get())); merge.checkAborted(); diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java index b6cfe73a472..101720488a9 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java @@ -18,12 +18,10 @@ package org.apache.lucene.index; import java.io.Closeable; import java.io.IOException; -import java.util.concurrent.Executor; import org.apache.lucene.index.MergePolicy.OneMerge; import org.apache.lucene.store.Directory; import org.apache.lucene.store.RateLimitedIndexOutput; import org.apache.lucene.util.InfoStream; -import org.apache.lucene.util.SameThreadExecutorService; /** * Expert: {@link IndexWriter} uses an instance implementing this interface to execute the merges @@ -34,8 +32,6 @@ import org.apache.lucene.util.SameThreadExecutorService; */ public abstract class MergeScheduler implements Closeable { - private final Executor executor = new SameThreadExecutorService(); - /** Sole constructor. (For invocation by subclass constructors, typically implicit.) */ protected MergeScheduler() {} @@ -56,15 +52,6 @@ public abstract class MergeScheduler implements Closeable { return in; } - /** - * Provides an executor for parallelism during a single merge operation. By default, the method - * returns a {@link SameThreadExecutorService} where all intra-merge actions occur in their - * calling thread. - */ - public Executor getIntraMergeExecutor(OneMerge merge) { - return executor; - } - /** Close this MergeScheduler. */ @Override public abstract void close() throws IOException; diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeState.java b/lucene/core/src/java/org/apache/lucene/index/MergeState.java index de3c8d8e416..6153a57693b 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergeState.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergeState.java @@ -21,7 +21,6 @@ import static org.apache.lucene.index.IndexWriter.isCongruentSort; import java.io.IOException; import java.util.List; import java.util.Locale; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.FieldsProducer; @@ -85,23 +84,15 @@ public class MergeState { /** InfoStream for debugging messages. */ public final InfoStream infoStream; - /** Executor for intra merge activity */ - public final Executor intraMergeTaskExecutor; - /** Indicates if the index needs to be sorted * */ public boolean needsIndexSort; /** Sole constructor. */ - MergeState( - List readers, - SegmentInfo segmentInfo, - InfoStream infoStream, - Executor intraMergeTaskExecutor) + MergeState(List readers, SegmentInfo segmentInfo, InfoStream infoStream) throws IOException { verifyIndexSort(readers, segmentInfo); this.infoStream = infoStream; int numReaders = readers.size(); - this.intraMergeTaskExecutor = intraMergeTaskExecutor; maxDocs = new int[numReaders]; fieldsProducers = new FieldsProducer[numReaders]; diff --git a/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java index fd6d2a8e81b..0142288d383 100644 --- a/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java +++ b/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java @@ -16,7 +16,6 @@ */ package org.apache.lucene.index; -import java.util.concurrent.Executor; import org.apache.lucene.index.MergePolicy.OneMerge; import org.apache.lucene.store.Directory; @@ -53,9 +52,4 @@ public final class NoMergeScheduler extends MergeScheduler { public MergeScheduler clone() { return this; } - - @Override - public Executor getIntraMergeExecutor(OneMerge merge) { - throw new UnsupportedOperationException("NoMergeScheduler does not support merges"); - } } diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java b/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java index 4589f7ec53e..7fc2f046a65 100644 --- a/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java +++ b/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java @@ -17,10 +17,7 @@ package org.apache.lucene.index; import java.io.IOException; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.DocValuesConsumer; @@ -31,7 +28,6 @@ import org.apache.lucene.codecs.NormsProducer; import org.apache.lucene.codecs.PointsWriter; import org.apache.lucene.codecs.StoredFieldsWriter; import org.apache.lucene.codecs.TermVectorsWriter; -import org.apache.lucene.search.TaskExecutor; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.InfoStream; @@ -60,14 +56,13 @@ final class SegmentMerger { InfoStream infoStream, Directory dir, FieldInfos.FieldNumbers fieldNumbers, - IOContext context, - Executor intraMergeTaskExecutor) + IOContext context) throws IOException { if (context.context != IOContext.Context.MERGE) { throw new IllegalArgumentException( "IOContext.context should be MERGE; got: " + context.context); } - mergeState = new MergeState(readers, segmentInfo, infoStream, intraMergeTaskExecutor); + mergeState = new MergeState(readers, segmentInfo, infoStream); directory = dir; this.codec = segmentInfo.getCodec(); this.context = context; @@ -135,36 +130,19 @@ final class SegmentMerger { IOContext.READ, segmentWriteState.segmentSuffix); - TaskExecutor taskExecutor = new TaskExecutor(mergeState.intraMergeTaskExecutor); - List> mergingTasks = new ArrayList<>(); - mergingTasks.add( - () -> { - if (mergeState.mergeFieldInfos.hasNorms()) { - mergeWithLogging( - this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged); - } + if (mergeState.mergeFieldInfos.hasNorms()) { + mergeWithLogging(this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged); + } - mergeWithLogging( - this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged); - return null; - }); + mergeWithLogging(this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged); if (mergeState.mergeFieldInfos.hasDocValues()) { - mergingTasks.add( - () -> { - mergeWithLogging( - this::mergeDocValues, segmentWriteState, segmentReadState, "doc values", numMerged); - return null; - }); + mergeWithLogging( + this::mergeDocValues, segmentWriteState, segmentReadState, "doc values", numMerged); } if (mergeState.mergeFieldInfos.hasPointValues()) { - mergingTasks.add( - () -> { - mergeWithLogging( - this::mergePoints, segmentWriteState, segmentReadState, "points", numMerged); - return null; - }); + mergeWithLogging(this::mergePoints, segmentWriteState, segmentReadState, "points", numMerged); } if (mergeState.mergeFieldInfos.hasVectorValues()) { @@ -177,14 +155,9 @@ final class SegmentMerger { } if (mergeState.mergeFieldInfos.hasVectors()) { - mergingTasks.add( - () -> { - mergeWithLogging(this::mergeTermVectors, "term vectors"); - return null; - }); + mergeWithLogging(this::mergeTermVectors, "term vectors"); } - taskExecutor.invokeAll(mergingTasks); // write the merged infos mergeWithLogging( this::mergeFieldInfos, segmentWriteState, segmentReadState, "field infos", numMerged); diff --git a/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswQuantizedVectorsFormat.java b/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswQuantizedVectorsFormat.java index 66ae7bc68d1..382389bc8f3 100644 --- a/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswQuantizedVectorsFormat.java +++ b/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswQuantizedVectorsFormat.java @@ -178,6 +178,9 @@ public class TestLucene99HnswQuantizedVectorsFormat extends BaseKnnVectorsFormat expectThrows( IllegalArgumentException.class, () -> new Lucene99HnswScalarQuantizedVectorsFormat(20, 100, 0, 0.8f, null)); + expectThrows( + IllegalArgumentException.class, + () -> new Lucene99HnswScalarQuantizedVectorsFormat(20, 100, 100, null, null)); expectThrows( IllegalArgumentException.class, () -> diff --git a/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswVectorsFormat.java b/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswVectorsFormat.java index 493b2cd5b92..c444802679a 100644 --- a/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswVectorsFormat.java +++ b/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswVectorsFormat.java @@ -49,6 +49,8 @@ public class TestLucene99HnswVectorsFormat extends BaseKnnVectorsFormatTestCase expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, -1)); expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(512 + 1, 20)); expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, 3201)); + expectThrows( + IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, 100, 100, null)); expectThrows( IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, 100, 1, new SameThreadExecutorService())); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java index 08b9bf9ac1b..17ea6e8c146 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java @@ -20,20 +20,16 @@ import com.carrotsearch.randomizedtesting.generators.RandomStrings; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.lucene.codecs.Codec; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; -import org.apache.lucene.document.KnnFloatVectorField; import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.IndexWriterConfig.OpenMode; @@ -46,10 +42,7 @@ import org.apache.lucene.tests.store.MockDirectoryWrapper; import org.apache.lucene.tests.util.LuceneTestCase; import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.InfoStream; -import org.apache.lucene.util.SameThreadExecutorService; -import org.apache.lucene.util.StringHelper; import org.apache.lucene.util.SuppressForbidden; -import org.apache.lucene.util.Version; public class TestConcurrentMergeScheduler extends LuceneTestCase { @@ -97,22 +90,12 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase { || (th instanceof IllegalStateException && th.getMessage().contains("this writer hit an unrecoverable error")); } - - @Override - // override here to ensure even tiny merges get the parallel executor - public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) { - assert intraMergeExecutor != null : "intraMergeExecutor is not initialized"; - return intraMergeExecutor; - } }); } IndexWriter writer = new IndexWriter(directory, iwc); Document doc = new Document(); Field idField = newStringField("id", "", Field.Store.YES); - KnnFloatVectorField knnField = new KnnFloatVectorField("knn", new float[] {0.0f, 0.0f}); doc.add(idField); - // Add knn float vectors to test parallel merge - doc.add(knnField); outer: for (int i = 0; i < 10; i++) { @@ -122,7 +105,6 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase { for (int j = 0; j < 20; j++) { idField.setStringValue(Integer.toString(i * 20 + j)); - knnField.setVectorValue(new float[] {random().nextFloat(), random().nextFloat()}); writer.addDocument(doc); } @@ -244,35 +226,23 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase { Directory directory = newDirectory(); Document doc = new Document(); Field idField = newStringField("id", "", Field.Store.YES); - KnnFloatVectorField knnField = new KnnFloatVectorField("knn", new float[] {0.0f, 0.0f}); doc.add(idField); - doc.add(knnField); - IndexWriterConfig iwc = - newIndexWriterConfig(new MockAnalyzer(random())) - // Force excessive merging: - .setMaxBufferedDocs(2) - .setMergePolicy(newLogMergePolicy(100)) - .setCommitOnClose(false); - if (iwc.getMergeScheduler() instanceof ConcurrentMergeScheduler) { - iwc.setMergeScheduler( - new ConcurrentMergeScheduler() { - @Override - // override here to ensure even tiny merges get the parallel executor - public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) { - assert intraMergeExecutor != null : "scaledExecutor is not initialized"; - return intraMergeExecutor; - } - }); - } - IndexWriter writer = new IndexWriter(directory, iwc); + IndexWriter writer = + new IndexWriter( + directory, + newIndexWriterConfig(new MockAnalyzer(random())) + . + // Force excessive merging: + setMaxBufferedDocs(2) + .setMergePolicy(newLogMergePolicy(100)) + .setCommitOnClose(false)); int numIters = TEST_NIGHTLY ? 10 : 3; for (int iter = 0; iter < numIters; iter++) { for (int j = 0; j < 201; j++) { idField.setStringValue(Integer.toString(iter * 201 + j)); - knnField.setVectorValue(new float[] {random().nextFloat(), random().nextFloat()}); writer.addDocument(doc); } @@ -394,118 +364,6 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase { dir.close(); } - public void testSmallMergesDonNotGetThreads() throws IOException { - Directory dir = newDirectory(); - IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random())); - iwc.setMaxBufferedDocs(2); - iwc.setMergeScheduler( - new ConcurrentMergeScheduler() { - @Override - protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) - throws IOException { - assertTrue(this.getIntraMergeExecutor(merge) instanceof SameThreadExecutorService); - super.doMerge(mergeSource, merge); - } - }); - IndexWriter w = new IndexWriter(dir, iwc); - for (int i = 0; i < 10; i++) { - Document doc = new Document(); - doc.add(new StringField("id", "" + i, Field.Store.NO)); - w.addDocument(doc); - } - w.forceMerge(1); - w.close(); - dir.close(); - } - - @SuppressForbidden(reason = "Thread sleep") - public void testIntraMergeThreadPoolIsLimitedByMaxThreads() throws IOException { - ConcurrentMergeScheduler mergeScheduler = new ConcurrentMergeScheduler(); - MergeScheduler.MergeSource mergeSource = - new MergeScheduler.MergeSource() { - @Override - public MergePolicy.OneMerge getNextMerge() { - fail("should not be called"); - return null; - } - - @Override - public void onMergeFinished(MergePolicy.OneMerge merge) { - fail("should not be called"); - } - - @Override - public boolean hasPendingMerges() { - fail("should not be called"); - return false; - } - - @Override - public void merge(MergePolicy.OneMerge merge) throws IOException { - fail("should not be called"); - } - }; - try (Directory dir = newDirectory(); - mergeScheduler) { - MergePolicy.OneMerge merge = - new MergePolicy.OneMerge( - List.of( - new SegmentCommitInfo( - new SegmentInfo( - dir, - Version.LATEST, - null, - "test", - 0, - false, - false, - Codec.getDefault(), - Collections.emptyMap(), - StringHelper.randomId(), - new HashMap<>(), - null), - 0, - 0, - 0, - 0, - 0, - new byte[16]))); - mergeScheduler.initialize(InfoStream.NO_OUTPUT, dir); - mergeScheduler.setMaxMergesAndThreads(6, 6); - Executor executor = mergeScheduler.intraMergeExecutor; - AtomicInteger threadsExecutedOnPool = new AtomicInteger(); - AtomicInteger threadsExecutedOnSelf = new AtomicInteger(); - for (int i = 0; i < 4; i++) { - mergeScheduler.mergeThreads.add( - mergeScheduler.new MergeThread(mergeSource, merge) { - @Override - @SuppressForbidden(reason = "Thread sleep") - public void run() { - executor.execute( - () -> { - if (Thread.currentThread() == this) { - threadsExecutedOnSelf.incrementAndGet(); - } else { - threadsExecutedOnPool.incrementAndGet(); - } - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); - } - }); - } - for (ConcurrentMergeScheduler.MergeThread thread : mergeScheduler.mergeThreads) { - thread.start(); - } - mergeScheduler.sync(); - assertEquals(3, threadsExecutedOnSelf.get()); - assertEquals(1, threadsExecutedOnPool.get()); - } - } - private static class TrackingCMS extends ConcurrentMergeScheduler { long totMergedBytes; CountDownLatch atLeastOneMerge; diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDoc.java b/lucene/core/src/test/org/apache/lucene/index/TestDoc.java index 0cdd645e7e0..3b245dd4132 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestDoc.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestDoc.java @@ -45,7 +45,6 @@ import org.apache.lucene.tests.store.MockDirectoryWrapper; import org.apache.lucene.tests.util.LuceneTestCase; import org.apache.lucene.util.Bits; import org.apache.lucene.util.InfoStream; -import org.apache.lucene.util.SameThreadExecutorService; import org.apache.lucene.util.StringHelper; import org.apache.lucene.util.Version; @@ -238,8 +237,7 @@ public class TestDoc extends LuceneTestCase { InfoStream.getDefault(), trackingDir, new FieldInfos.FieldNumbers(null, null), - context, - new SameThreadExecutorService()); + context); merger.merge(); r1.close(); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java b/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java index 94a1849a35e..615accbb4df 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java @@ -32,7 +32,6 @@ import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.InfoStream; -import org.apache.lucene.util.SameThreadExecutorService; import org.apache.lucene.util.StringHelper; import org.apache.lucene.util.Version; import org.apache.lucene.util.packed.PackedLongValues; @@ -106,8 +105,7 @@ public class TestSegmentMerger extends LuceneTestCase { InfoStream.getDefault(), mergedDir, new FieldInfos.FieldNumbers(null, null), - newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1))), - new SameThreadExecutorService()); + newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1)))); MergeState mergeState = merger.merge(); int docsMerged = mergeState.segmentInfo.maxDoc(); assertTrue(docsMerged == 2);