Parallel apply term/query delete.

This commit is contained in:
zhouhui 2024-08-24 16:33:11 +08:00
parent ad81402791
commit a20ddcf266
1 changed files with 137 additions and 78 deletions

View File

@ -22,6 +22,10 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntConsumer;
@ -33,8 +37,10 @@ import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.ThreadInterruptedException;
/**
* Holds buffered deletes and updates by term or query, once pushed. Pushed deletes/updates are
@ -363,60 +369,87 @@ final class FrozenBufferedUpdates {
long startNS = System.nanoTime();
long delCount = 0;
for (BufferedUpdatesStream.SegmentState segState : segStates) {
if (delGen < segState.delGen) {
// segment is newer than this deletes packet
continue;
}
Throwable exc = null;
try (ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor()) {
ExecutorCompletionService<Long> executorCompletionService =
new ExecutorCompletionService<>(executorService);
if (segState.rld.refCount() == 1) {
// This means we are the only remaining reference to this segment, meaning
// it was merged away while we were running, so we can safely skip running
// because we will run on the newly merged segment next:
continue;
}
// Submit task(apply delete).
for (BufferedUpdatesStream.SegmentState segState : segStates) {
executorCompletionService.submit(
() -> {
long subDelCount = 0;
if (delGen < segState.delGen) {
// segment is newer than this deletes packet
return subDelCount;
}
final LeafReaderContext readerContext = segState.reader.getContext();
for (int i = 0; i < deleteQueries.length; i++) {
Query query = deleteQueries[i];
int limit;
if (delGen == segState.delGen) {
assert privateSegment != null;
limit = deleteQueryLimits[i];
} else {
limit = Integer.MAX_VALUE;
}
final IndexSearcher searcher = new IndexSearcher(readerContext.reader());
searcher.setQueryCache(null);
query = searcher.rewrite(query);
final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1);
final Scorer scorer = weight.scorer(readerContext);
if (scorer != null) {
final DocIdSetIterator it = scorer.iterator();
if (segState.rld.sortMap != null && limit != Integer.MAX_VALUE) {
assert privateSegment != null;
// This segment was sorted on flush; we must apply seg-private deletes carefully in this
// case:
int docID;
while ((docID = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
// The limit is in the pre-sorted doc space:
if (segState.rld.sortMap.newToOld(docID) < limit) {
if (segState.rld.delete(docID)) {
delCount++;
if (segState.rld.refCount() == 1) {
// This means we are the only remaining reference to this segment, meaning
// it was merged away while we were running, so we can safely skip running
// because we will run on the newly merged segment next:
return subDelCount;
}
final LeafReaderContext readerContext = segState.reader.getContext();
for (int i = 0; i < deleteQueries.length; i++) {
Query query = deleteQueries[i];
int limit;
if (delGen == segState.delGen) {
assert privateSegment != null;
limit = deleteQueryLimits[i];
} else {
limit = Integer.MAX_VALUE;
}
final IndexSearcher searcher = new IndexSearcher(readerContext.reader());
searcher.setQueryCache(null);
query = searcher.rewrite(query);
final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1);
final Scorer scorer = weight.scorer(readerContext);
if (scorer != null) {
final DocIdSetIterator it = scorer.iterator();
if (segState.rld.sortMap != null && limit != Integer.MAX_VALUE) {
assert privateSegment != null;
// This segment was sorted on flush; we must apply seg-private deletes carefully
// in this
// case:
int docID;
while ((docID = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
// The limit is in the pre-sorted doc space:
if (segState.rld.sortMap.newToOld(docID) < limit) {
if (segState.rld.delete(docID)) {
subDelCount++;
}
}
}
} else {
int docID;
while ((docID = it.nextDoc()) < limit) {
if (segState.rld.delete(docID)) {
subDelCount++;
}
}
}
}
}
}
} else {
int docID;
while ((docID = it.nextDoc()) < limit) {
if (segState.rld.delete(docID)) {
delCount++;
}
}
}
}
return subDelCount;
});
}
// Get completed result.
for (int i = 0; i < segStates.length; i++) {
delCount += executorCompletionService.take().get();
}
} catch (ExecutionException e) {
exc = IOUtils.useOrSuppress(exc, e.getCause());
} catch (InterruptedException e) {
exc = IOUtils.useOrSuppress(exc, new ThreadInterruptedException(e));
}
// Handle Exception.
if (exc != null) {
throw IOUtils.rethrowAlways(exc);
}
if (infoStream.isEnabled("BD")) {
@ -447,39 +480,65 @@ final class FrozenBufferedUpdates {
long delCount = 0;
for (BufferedUpdatesStream.SegmentState segState : segStates) {
assert segState.delGen != delGen
: "segState.delGen=" + segState.delGen + " vs this.gen=" + delGen;
if (segState.delGen > delGen) {
// our deletes don't apply to this segment
continue;
}
if (segState.rld.refCount() == 1) {
// This means we are the only remaining reference to this segment, meaning
// it was merged away while we were running, so we can safely skip running
// because we will run on the newly merged segment next:
continue;
Throwable exc = null;
try (ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor()) {
ExecutorCompletionService<Long> executorCompletionService =
new ExecutorCompletionService<>(executorService);
// Submit task(apply delete).
for (BufferedUpdatesStream.SegmentState segState : segStates) {
executorCompletionService.submit(
() -> {
long subDelCount = 0;
assert segState.delGen != delGen
: "segState.delGen=" + segState.delGen + " vs this.gen=" + delGen;
if (segState.delGen > delGen) {
// our deletes don't apply to this segment
return subDelCount;
}
if (segState.rld.refCount() == 1) {
// This means we are the only remaining reference to this segment, meaning
// it was merged away while we were running, so we can safely skip running
// because we will run on the newly merged segment next:
return subDelCount;
}
FieldTermIterator iter = deleteTerms.iterator();
BytesRef delTerm;
TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, true);
while ((delTerm = iter.next()) != null) {
final DocIdSetIterator iterator = termDocsIterator.nextTerm(iter.field(), delTerm);
if (iterator != null) {
int docID;
while ((docID = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
// NOTE: there is no limit check on the docID
// when deleting by Term (unlike by Query)
// because on flush we apply all Term deletes to
// each segment. So all Term deleting here is
// against prior segments:
if (segState.rld.delete(docID)) {
subDelCount++;
}
}
}
}
return subDelCount;
});
}
FieldTermIterator iter = deleteTerms.iterator();
BytesRef delTerm;
TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, true);
while ((delTerm = iter.next()) != null) {
final DocIdSetIterator iterator = termDocsIterator.nextTerm(iter.field(), delTerm);
if (iterator != null) {
int docID;
while ((docID = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
// NOTE: there is no limit check on the docID
// when deleting by Term (unlike by Query)
// because on flush we apply all Term deletes to
// each segment. So all Term deleting here is
// against prior segments:
if (segState.rld.delete(docID)) {
delCount++;
}
}
}
// Get completed result.
for (int i = 0; i < segStates.length; i++) {
delCount += executorCompletionService.take().get();
}
} catch (ExecutionException e) {
exc = IOUtils.useOrSuppress(exc, e.getCause());
} catch (InterruptedException e) {
exc = IOUtils.useOrSuppress(exc, new ThreadInterruptedException(e));
}
// Handle Exception.
if (exc != null) {
throw IOUtils.rethrowAlways(exc);
}
if (infoStream.isEnabled("BD")) {