diff --git a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
index 6849de77b08..4adfa8d69d9 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
@@ -22,6 +22,10 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.IntConsumer;
@@ -33,8 +37,10 @@ import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
 import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.ThreadInterruptedException;
 
 /**
  * Holds buffered deletes and updates by term or query, once pushed. Pushed deletes/updates are
@@ -363,60 +369,87 @@ final class FrozenBufferedUpdates {
     long startNS = System.nanoTime();
 
     long delCount = 0;
-    for (BufferedUpdatesStream.SegmentState segState : segStates) {
-      if (delGen < segState.delGen) {
-        // segment is newer than this deletes packet
-        continue;
-      }
+    Throwable exc = null;
+    try (ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor()) {
+      ExecutorCompletionService<Long> executorCompletionService =
+          new ExecutorCompletionService<>(executorService);
 
-      if (segState.rld.refCount() == 1) {
-        // This means we are the only remaining reference to this segment, meaning
-        // it was merged away while we were running, so we can safely skip running
-        // because we will run on the newly merged segment next:
-        continue;
-      }
+      // Submit one task per segment to apply the query deletes.
+      for (BufferedUpdatesStream.SegmentState segState : segStates) {
+        executorCompletionService.submit(
+            () -> {
+              long subDelCount = 0;
+              if (delGen < segState.delGen) {
+                // segment is newer than this deletes packet
+                return subDelCount;
+              }
 
-      final LeafReaderContext readerContext = segState.reader.getContext();
-      for (int i = 0; i < deleteQueries.length; i++) {
-        Query query = deleteQueries[i];
-        int limit;
-        if (delGen == segState.delGen) {
-          assert privateSegment != null;
-          limit = deleteQueryLimits[i];
-        } else {
-          limit = Integer.MAX_VALUE;
-        }
-        final IndexSearcher searcher = new IndexSearcher(readerContext.reader());
-        searcher.setQueryCache(null);
-        query = searcher.rewrite(query);
-        final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1);
-        final Scorer scorer = weight.scorer(readerContext);
-        if (scorer != null) {
-          final DocIdSetIterator it = scorer.iterator();
-          if (segState.rld.sortMap != null && limit != Integer.MAX_VALUE) {
-            assert privateSegment != null;
-            // This segment was sorted on flush; we must apply seg-private deletes carefully in this
-            // case:
-            int docID;
-            while ((docID = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
-              // The limit is in the pre-sorted doc space:
-              if (segState.rld.sortMap.newToOld(docID) < limit) {
-                if (segState.rld.delete(docID)) {
-                  delCount++;
-                }
-              }
-            }
-          } else {
-            int docID;
-            while ((docID = it.nextDoc()) < limit) {
-              if (segState.rld.delete(docID)) {
-                delCount++;
-              }
-            }
-          }
-        }
-      }
+              if (segState.rld.refCount() == 1) {
+                // This means we are the only remaining reference to this segment, meaning
+                // it was merged away while we were running, so we can safely skip running
+                // because we will run on the newly merged segment next:
+                return subDelCount;
+              }
+
+              final LeafReaderContext readerContext = segState.reader.getContext();
+              for (int i = 0; i < deleteQueries.length; i++) {
+                Query query = deleteQueries[i];
+                int limit;
+                if (delGen == segState.delGen) {
+                  assert privateSegment != null;
+                  limit = deleteQueryLimits[i];
+                } else {
+                  limit = Integer.MAX_VALUE;
+                }
+                final IndexSearcher searcher = new IndexSearcher(readerContext.reader());
+                searcher.setQueryCache(null);
+                query = searcher.rewrite(query);
+                final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1);
+                final Scorer scorer = weight.scorer(readerContext);
+                if (scorer != null) {
+                  final DocIdSetIterator it = scorer.iterator();
+                  if (segState.rld.sortMap != null && limit != Integer.MAX_VALUE) {
+                    assert privateSegment != null;
+                    // This segment was sorted on flush; we must apply seg-private deletes
+                    // carefully in this case:
+                    int docID;
+                    while ((docID = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
+                      // The limit is in the pre-sorted doc space:
+                      if (segState.rld.sortMap.newToOld(docID) < limit) {
+                        if (segState.rld.delete(docID)) {
+                          subDelCount++;
+                        }
+                      }
+                    }
+                  } else {
+                    int docID;
+                    while ((docID = it.nextDoc()) < limit) {
+                      if (segState.rld.delete(docID)) {
+                        subDelCount++;
+                      }
+                    }
+                  }
+                }
+              }
+              return subDelCount;
+            });
+      }
+
+      // Collect the per-segment delete counts as the tasks complete.
+      for (int i = 0; i < segStates.length; i++) {
+        delCount += executorCompletionService.take().get();
+      }
+    } catch (ExecutionException e) {
+      exc = IOUtils.useOrSuppress(exc, e.getCause());
+    } catch (InterruptedException e) {
+      exc = IOUtils.useOrSuppress(exc, new ThreadInterruptedException(e));
+    }
+
+    // Rethrow any exception hit while applying the deletes.
+    if (exc != null) {
+      throw IOUtils.rethrowAlways(exc);
     }
 
     if (infoStream.isEnabled("BD")) {
@@ -447,39 +480,65 @@ final class FrozenBufferedUpdates {
 
     long delCount = 0;
-    for (BufferedUpdatesStream.SegmentState segState : segStates) {
-      assert segState.delGen != delGen
-          : "segState.delGen=" + segState.delGen + " vs this.gen=" + delGen;
-      if (segState.delGen > delGen) {
-        // our deletes don't apply to this segment
-        continue;
-      }
-      if (segState.rld.refCount() == 1) {
-        // This means we are the only remaining reference to this segment, meaning
-        // it was merged away while we were running, so we can safely skip running
-        // because we will run on the newly merged segment next:
-        continue;
-      }
+    Throwable exc = null;
+    try (ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor()) {
+      ExecutorCompletionService<Long> executorCompletionService =
+          new ExecutorCompletionService<>(executorService);
+
+      // Submit one task per segment to apply the term deletes.
+      for (BufferedUpdatesStream.SegmentState segState : segStates) {
+        executorCompletionService.submit(
+            () -> {
+              long subDelCount = 0;
+              assert segState.delGen != delGen
+                  : "segState.delGen=" + segState.delGen + " vs this.gen=" + delGen;
+              if (segState.delGen > delGen) {
+                // our deletes don't apply to this segment
+                return subDelCount;
+              }
+              if (segState.rld.refCount() == 1) {
+                // This means we are the only remaining reference to this segment, meaning
+                // it was merged away while we were running, so we can safely skip running
+                // because we will run on the newly merged segment next:
+                return subDelCount;
+              }
 
-      FieldTermIterator iter = deleteTerms.iterator();
-      BytesRef delTerm;
-      TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, true);
-      while ((delTerm = iter.next()) != null) {
-        final DocIdSetIterator iterator = termDocsIterator.nextTerm(iter.field(), delTerm);
-        if (iterator != null) {
-          int docID;
-          while ((docID = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
-            // NOTE: there is no limit check on the docID
-            // when deleting by Term (unlike by Query)
-            // because on flush we apply all Term deletes to
-            // each segment. So all Term deleting here is
-            // against prior segments:
-            if (segState.rld.delete(docID)) {
-              delCount++;
-            }
-          }
-        }
-      }
+              FieldTermIterator iter = deleteTerms.iterator();
+              BytesRef delTerm;
+              TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, true);
+              while ((delTerm = iter.next()) != null) {
+                final DocIdSetIterator iterator = termDocsIterator.nextTerm(iter.field(), delTerm);
+                if (iterator != null) {
+                  int docID;
+                  while ((docID = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
+                    // NOTE: there is no limit check on the docID
+                    // when deleting by Term (unlike by Query)
+                    // because on flush we apply all Term deletes to
+                    // each segment. So all Term deleting here is
+                    // against prior segments:
+                    if (segState.rld.delete(docID)) {
+                      subDelCount++;
+                    }
+                  }
+                }
+              }
+              return subDelCount;
+            });
+      }
+
+      // Collect the per-segment delete counts as the tasks complete.
+      for (int i = 0; i < segStates.length; i++) {
+        delCount += executorCompletionService.take().get();
+      }
+    } catch (ExecutionException e) {
+      exc = IOUtils.useOrSuppress(exc, e.getCause());
+    } catch (InterruptedException e) {
+      exc = IOUtils.useOrSuppress(exc, new ThreadInterruptedException(e));
+    }
+
+    // Rethrow any exception hit while applying the deletes.
+    if (exc != null) {
+      throw IOUtils.rethrowAlways(exc);
     }
 
     if (infoStream.isEnabled("BD")) {
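
Both hunks apply the same concurrency pattern: fan one task per segment out to a virtual-thread-per-task executor, drain the per-segment delete counts through an `ExecutorCompletionService`, and rethrow the first task failure after the executor closes. Below is a minimal, self-contained sketch of that pattern, assuming Java 21+; `applyDeletes` and `segments` are hypothetical stand-ins for the per-segment work, not Lucene APIs.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PerSegmentFanOutSketch {

  // Hypothetical stand-in for the per-segment work in the patch above;
  // returns the number of deletes it applied.
  private static long applyDeletes(String segment) {
    return segment.length(); // placeholder "delete count"
  }

  public static void main(String[] args) throws Exception {
    List<String> segments = List.of("_0", "_1", "_2", "_3");
    long delCount = 0;

    // ExecutorService is AutoCloseable since Java 19; close() waits for
    // submitted tasks, so the try block cannot exit with work in flight.
    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
      // Parameterize with the task result type: with a raw type,
      // take().get() would return Object and the += below would not compile.
      ExecutorCompletionService<Long> completionService =
          new ExecutorCompletionService<>(executor);

      // Fan out: one virtual thread per segment.
      for (String segment : segments) {
        completionService.submit(() -> applyDeletes(segment));
      }

      // Fan in: take() hands back futures in completion order, so one slow
      // segment does not delay collecting the counts of the finished ones.
      for (int i = 0; i < segments.size(); i++) {
        delCount += completionService.take().get();
      }
    } catch (ExecutionException e) {
      // Unwrap: the task's own exception is the one worth rethrowing.
      throw new RuntimeException(e.getCause());
    }

    System.out.println("deleted " + delCount + " docs");
  }
}
```

The patch accumulates into a task-local `subDelCount` and sums on the draining thread for the same reason this sketch does: the tasks never share a mutable counter, so no synchronization is needed beyond what the completion service already provides.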