LUCENE-10216: Use MergeScheduler and MergePolicy to run addIndexes(CodecReader[]) merges. (#633)

* Use merge policy and merge scheduler to run addIndexes merges

* wrapped reader does not see deletes - debug

* Partially fixed tests in TestAddIndexes

* Use writer object to invoke addIndexes merge

* Use merge object info

* Add javadocs for new methods

* TestAddIndexes passing

* verify field info schemas upfront from incoming readers

* rename flag to track pooled readers

* Keep addIndexes API transactional

* Maintain transactionality - register segments with iw after all merges complete

* fix checkstyle

* PR comments

* Fix pendingDocs - numDocs mismatch bug

* Tests with 1-1 merges and partial merge failures

* variable renaming and better comments

* add test for partial merge failures. change tests to use 1-1 findMerges

* abort pending merges gracefully

* test null and empty merge specs

* test interim files are deleted

* test with empty readers

* test cascading merges triggered

* remove nocommits

* gradle check errors

* remove unused line

* remove printf

* spotless apply

* update TestIndexWriterOnDiskFull to accept mergeException from failing addIndexes calls

* return singleton reader mergespec in NoMergePolicy

* rethrow exceptions seen in merge threads on failure

* spotless apply

* update test to new exception type thrown

* spotlessApply

* test for maxDoc limit in IndexWriter

* spotlessApply

* Use DocValuesIterator instead of DocValuesFieldExistsQuery for counting soft deletes

* spotless apply

* change exception message for closed IW

* remove non-essential comments

* update api doc string

* doc string update

* spotless

* Changes file entry

* simplify findMerges API, add 1-1 merges to MockRandomMergePolicy

* update merge policies to new api

* remove unused imports

* spotless apply

* move changes entry to end of list

* fix testAddIndicesWithSoftDeletes

* test with 1-1 merge policy always enabled

* please spotcheck

* tidy

* test - never use 1-1 merge policy

* use 1-1 merge policy randomly

* Remove concurrent addIndexes findMerges from MockRandomMergePolicy

* Bug Fix: RuntimeException in addIndexes

Aborted pending merges were slipping through the merge exception check in the
API, and getting caught later in the RuntimeException check.

* tidy

* Rebase on main. Move changes to 10.0

* Synchronize IW.AddIndexesMergeSource on outer class IW object

* tidy

Author: Vigya Sharma, 2022-07-06 15:15:47 -07:00 (committed by GitHub)
Parent: d537013e70
Commit: 698f40ad51
9 changed files with 719 additions and 149 deletions

CHANGES.txt

@@ -45,6 +45,9 @@ Improvements
* LUCENE-10416: Update Korean Dictionary to mecab-ko-dic-2.1.1-20180720 for Nori.
(Uihyun Kim)
* LUCENE-10216: Use MergePolicy to define and MergeScheduler to trigger the reader merges
required by addIndexes(CodecReader[]) API. (Vigya Sharma, Michael McCandless)
Optimizations
---------------------
(No changes)

FieldInfos.java

@@ -352,6 +352,14 @@ public class FieldInfos implements Iterable<FieldInfo> {
this.softDeletesFieldName = softDeletesFieldName;
}
void verifyFieldInfo(FieldInfo fi) {
String fieldName = fi.getName();
verifySoftDeletedFieldName(fieldName, fi.isSoftDeletesField());
if (nameToNumber.containsKey(fieldName)) {
verifySameSchema(fi);
}
}
/**
* Returns the global field number for the given field name. If the name does not exist yet it
* tries to add it with the given preferred field number assigned if possible otherwise the

FilterMergePolicy.java

@@ -47,6 +47,11 @@ public class FilterMergePolicy extends MergePolicy implements Unwrappable<MergePolicy> {
return in.findMerges(mergeTrigger, segmentInfos, mergeContext);
}
@Override
public MergeSpecification findMerges(CodecReader... readers) throws IOException {
return in.findMerges(readers);
}
@Override
public MergeSpecification findForcedMerges(
SegmentInfos segmentInfos,

IndexWriter.java

@@ -276,6 +276,7 @@ public class IndexWriter
final DocumentsWriter docWriter;
private final EventQueue eventQueue = new EventQueue(this);
private final MergeScheduler.MergeSource mergeSource = new IndexWriterMergeSource(this);
private final AddIndexesMergeSource addIndexesMergeSource = new AddIndexesMergeSource(this);
private final ReentrantLock writeDocValuesLock = new ReentrantLock();
@@ -2335,6 +2336,10 @@ public class IndexWriter
case COMMIT:
spec = mergePolicy.findFullFlushMerges(trigger, segmentInfos, this);
break;
case ADD_INDEXES:
throw new IllegalStateException(
"Merges with ADD_INDEXES trigger should be "
+ "called from within the addIndexes() API flow");
case EXPLICIT:
case FULL_FLUSH:
case MERGE_FINISHED:
@@ -2671,6 +2676,9 @@ public class IndexWriter
});
pendingMerges.clear();
// abort any merges pending from addIndexes(CodecReader...)
addIndexesMergeSource.abortPendingMerges();
for (final MergePolicy.OneMerge merge : runningMerges) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now abort running merge " + segString(merge.segments));
@@ -3115,13 +3123,10 @@ public class IndexWriter
*
* <p><b>NOTE:</b> empty segments are dropped by this method and not added to this index.
*
* <p><b>NOTE:</b> this merges all given {@link LeafReader}s in one merge. If you intend to merge
* a large number of readers, it may be better to call this method multiple times, each time with
* a small set of readers. In principle, if you use a merge policy with a {@code mergeFactor} or
* {@code maxMergeAtOnce} parameter, you should pass that many readers in one call.
*
* <p><b>NOTE:</b> this method does not call or make use of the {@link MergeScheduler}, so any
* custom bandwidth throttling is at the moment ignored.
* <p><b>NOTE:</b> provided {@link LeafReader}s are merged as specified by the {@link
* MergePolicy#findMerges(CodecReader...)} API. Default behavior is to merge all provided readers
* into a single segment. You can modify this by overriding the <code>findMerges</code> API in your
* custom merge policy.
*
* @return The <a href="#sequence_number">sequence number</a> for this operation
* @throws CorruptIndexException if the index is corrupt
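An illustrative sketch, not part of this patch: a typical caller wraps the leaves of a DirectoryReader into CodecReaders with SlowCodecReaderWrapper and hands them to addIndexes; which merges those readers land in is decided by MergePolicy#findMerges(CodecReader...), and the merges are run by the configured MergeScheduler. The names AddIndexesExample, addIndex, destWriter and srcDir are hypothetical.

// Sketch: add every leaf of a source index to destWriter through addIndexes(CodecReader...).
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SlowCodecReaderWrapper;
import org.apache.lucene.store.Directory;

class AddIndexesExample {
  static long addIndex(IndexWriter destWriter, Directory srcDir) throws IOException {
    try (DirectoryReader reader = DirectoryReader.open(srcDir)) {
      List<CodecReader> codecReaders = new ArrayList<>();
      for (LeafReaderContext ctx : reader.leaves()) {
        // Wrap each leaf so it can be passed to addIndexes(CodecReader...).
        codecReaders.add(SlowCodecReaderWrapper.wrap(ctx.reader()));
      }
      // With the default MergePolicy#findMerges(CodecReader...), all of these readers
      // are merged into a single new segment; a custom policy may split them up.
      return destWriter.addIndexes(codecReaders.toArray(new CodecReader[0]));
    }
  }
}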
@@ -3135,19 +3140,219 @@ public class IndexWriter
// long so we can detect int overflow:
long numDocs = 0;
long seqNo;
try {
// Best effort up front validations
for (CodecReader leaf : readers) {
validateMergeReader(leaf);
for (FieldInfo fi : leaf.getFieldInfos()) {
globalFieldNumberMap.verifyFieldInfo(fi);
}
numDocs += leaf.numDocs();
}
testReserveDocs(numDocs);
synchronized (this) {
ensureOpen();
if (merges.areEnabled() == false) {
throw new AlreadyClosedException(
"this IndexWriter is closed. Cannot execute addIndexes(CodecReaders...) API");
}
}
MergePolicy mergePolicy = config.getMergePolicy();
MergePolicy.MergeSpecification spec = mergePolicy.findMerges(readers);
boolean mergeSuccess = false;
if (spec != null && spec.merges.size() > 0) {
try {
spec.merges.forEach(addIndexesMergeSource::registerMerge);
mergeScheduler.merge(addIndexesMergeSource, MergeTrigger.ADD_INDEXES);
spec.await();
mergeSuccess =
spec.merges.stream().allMatch(m -> m.hasCompletedSuccessfully().orElse(false));
} finally {
if (mergeSuccess == false) {
for (MergePolicy.OneMerge merge : spec.merges) {
if (merge.getMergeInfo() != null) {
deleteNewFiles(merge.getMergeInfo().files());
}
}
}
}
} else {
if (infoStream.isEnabled("IW")) {
if (spec == null) {
infoStream.message(
"addIndexes(CodecReaders...)",
"received null mergeSpecification from MergePolicy. No indexes to add, returning..");
} else {
infoStream.message(
"addIndexes(CodecReaders...)",
"received empty mergeSpecification from MergePolicy. No indexes to add, returning..");
}
}
return docWriter.getNextSequenceNumber();
}
if (mergeSuccess) {
List<SegmentCommitInfo> infos = new ArrayList<>();
long totalDocs = 0;
for (MergePolicy.OneMerge merge : spec.merges) {
totalDocs += merge.totalMaxDoc;
if (merge.getMergeInfo() != null) {
infos.add(merge.getMergeInfo());
}
}
synchronized (this) {
if (infos.isEmpty() == false) {
boolean registerSegmentSuccess = false;
try {
ensureOpen();
// Reserve the docs, just before we update SIS:
reserveDocs(totalDocs);
registerSegmentSuccess = true;
} finally {
if (registerSegmentSuccess == false) {
for (SegmentCommitInfo sipc : infos) {
// Safe: these files must exist
deleteNewFiles(sipc.files());
}
}
}
segmentInfos.addAll(infos);
checkpoint();
}
seqNo = docWriter.getNextSequenceNumber();
}
} else {
if (infoStream.isEnabled("IW")) {
infoStream.message(
"addIndexes(CodecReaders...)", "failed to successfully merge all provided readers.");
}
for (MergePolicy.OneMerge merge : spec.merges) {
if (merge.isAborted()) {
throw new MergePolicy.MergeAbortedException("merge was aborted.");
}
Throwable t = merge.getException();
if (t != null) {
IOUtils.rethrowAlways(t);
}
}
// If no merge hit an exception, and merge was not aborted, but we still failed to add
// indexes, fail the API
throw new RuntimeException(
"failed to successfully merge all provided readers in addIndexes(CodecReader...)");
}
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "addIndexes(CodecReader...)");
throw tragedy;
}
maybeMerge();
return seqNo;
}
private class AddIndexesMergeSource implements MergeScheduler.MergeSource {
private final Queue<MergePolicy.OneMerge> pendingAddIndexesMerges = new ArrayDeque<>();
private final IndexWriter writer;
public AddIndexesMergeSource(IndexWriter writer) {
this.writer = writer;
}
public void registerMerge(MergePolicy.OneMerge merge) {
synchronized (IndexWriter.this) {
pendingAddIndexesMerges.add(merge);
}
}
@Override
public MergePolicy.OneMerge getNextMerge() {
synchronized (IndexWriter.this) {
if (hasPendingMerges() == false) {
return null;
}
MergePolicy.OneMerge merge = pendingAddIndexesMerges.remove();
runningMerges.add(merge);
return merge;
}
}
@Override
public void onMergeFinished(MergePolicy.OneMerge merge) {
synchronized (IndexWriter.this) {
runningMerges.remove(merge);
}
}
@Override
public boolean hasPendingMerges() {
return pendingAddIndexesMerges.size() > 0;
}
public void abortPendingMerges() throws IOException {
synchronized (IndexWriter.this) {
IOUtils.applyToAll(
pendingAddIndexesMerges,
merge -> {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now abort pending addIndexes merge");
}
merge.setAborted();
merge.close(false, false, mr -> {});
onMergeFinished(merge);
});
pendingAddIndexesMerges.clear();
}
}
@Override
public void merge(MergePolicy.OneMerge merge) throws IOException {
boolean success = false;
try {
writer.addIndexesReaderMerge(merge);
success = true;
} catch (Throwable t) {
handleMergeException(t, merge);
} finally {
synchronized (IndexWriter.this) {
merge.close(success, false, mr -> {});
onMergeFinished(merge);
}
}
}
}
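For context, an illustrative sketch, not part of this patch, of the scheduler side of this contract: a MergeScheduler drains MergeSource.getNextMerge() and calls back into mergeSource.merge(), which for the AddIndexesMergeSource above delegates to IndexWriter.addIndexesReaderMerge below; these merges arrive with MergeTrigger.ADD_INDEXES. The class name SerialDrainMergeScheduler is hypothetical; the CountingSerialMergeScheduler in TestAddIndexes follows the same pattern.

import java.io.IOException;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.MergeTrigger;

/** Sketch: a scheduler that runs every pending merge serially on the calling thread. */
class SerialDrainMergeScheduler extends MergeScheduler {
  @Override
  public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
    // Merges registered through addIndexes(CodecReader...) arrive with MergeTrigger.ADD_INDEXES.
    MergePolicy.OneMerge merge;
    while ((merge = mergeSource.getNextMerge()) != null) {
      // For AddIndexesMergeSource this ends up in IndexWriter#addIndexesReaderMerge.
      mergeSource.merge(merge);
    }
  }

  @Override
  public void close() throws IOException {}
}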
/**
* Runs a single merge operation for {@link IndexWriter#addIndexes(CodecReader...)}.
*
* <p>Merges and creates a SegmentInfo for the readers grouped together in the provided OneMerge.
*
* @param merge OneMerge object initialized from readers.
* @throws IOException if there is a low-level IO error
*/
public void addIndexesReaderMerge(MergePolicy.OneMerge merge) throws IOException {
merge.mergeInit();
merge.checkAborted();
// long so we can detect int overflow:
long numDocs = 0;
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "flush at addIndexes(CodecReader...)");
}
flush(false, true);
String mergedName = newSegmentName();
Directory mergeDirectory = mergeScheduler.wrapForMerge(merge, directory);
int numSoftDeleted = 0;
for (CodecReader leaf : readers) {
for (MergePolicy.MergeReader reader : merge.getMergeReader()) {
CodecReader leaf = reader.codecReader;
numDocs += leaf.numDocs();
validateMergeReader(leaf);
if (softDeletesEnabled) {
Bits liveDocs = leaf.getLiveDocs();
Bits liveDocs = reader.hardLiveDocs;
numSoftDeleted +=
PendingSoftDeletes.countSoftDeletes(
FieldExistsQuery.getDocValuesDocIdSetIterator(config.getSoftDeletesField(), leaf),
@@ -3162,12 +3367,10 @@ public class IndexWriter
new IOContext(
new MergeInfo(Math.toIntExact(numDocs), -1, false, UNBOUNDED_MAX_MERGE_SEGMENTS));
// TODO: somehow we should fix this merge so it's
// abortable so that IW.close(false) is able to stop it
TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(directory);
TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(mergeDirectory);
Codec codec = config.getCodec();
// We set the min version to null for now, it will be set later by SegmentMerger
SegmentInfo info =
SegmentInfo segInfo =
new SegmentInfo(
directoryOrig,
Version.LATEST,
@@ -3181,19 +3384,20 @@ public class IndexWriter
Collections.emptyMap(),
config.getIndexSort());
List<CodecReader> readers =
merge.getMergeReader().stream().map(r -> r.codecReader).collect(Collectors.toList());
SegmentMerger merger =
new SegmentMerger(
Arrays.asList(readers), info, infoStream, trackingDir, globalFieldNumberMap, context);
new SegmentMerger(readers, segInfo, infoStream, trackingDir, globalFieldNumberMap, context);
if (!merger.shouldMerge()) {
return docWriter.getNextSequenceNumber();
return;
}
merge.checkAborted();
synchronized (this) {
ensureOpen();
assert merges.areEnabled();
runningAddIndexesMerges.add(merger);
}
merge.mergeStartNS = System.nanoTime();
try {
merger.merge(); // merge 'em
} finally {
@@ -3202,75 +3406,47 @@ public class IndexWriter
notifyAll();
}
}
SegmentCommitInfo infoPerCommit =
new SegmentCommitInfo(info, 0, numSoftDeleted, -1L, -1L, -1L, StringHelper.randomId());
info.setFiles(new HashSet<>(trackingDir.getCreatedFiles()));
merge.setMergeInfo(
new SegmentCommitInfo(segInfo, 0, numSoftDeleted, -1L, -1L, -1L, StringHelper.randomId()));
merge.getMergeInfo().info.setFiles(new HashSet<>(trackingDir.getCreatedFiles()));
trackingDir.clearCreatedFiles();
setDiagnostics(info, SOURCE_ADDINDEXES_READERS);
setDiagnostics(merge.getMergeInfo().info, SOURCE_ADDINDEXES_READERS);
final MergePolicy mergePolicy = config.getMergePolicy();
boolean useCompoundFile;
synchronized (this) { // Guard segmentInfos
if (merges.areEnabled() == false) {
// Safe: these files must exist
deleteNewFiles(infoPerCommit.files());
return docWriter.getNextSequenceNumber();
}
ensureOpen();
useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, infoPerCommit, this);
synchronized (this) {
merge.checkAborted();
useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, merge.getMergeInfo(), this);
}
// Now create the compound file if needed
if (useCompoundFile) {
Collection<String> filesToDelete = infoPerCommit.files();
TrackingDirectoryWrapper trackingCFSDir = new TrackingDirectoryWrapper(directory);
Collection<String> filesToDelete = merge.getMergeInfo().files();
TrackingDirectoryWrapper trackingCFSDir = new TrackingDirectoryWrapper(mergeDirectory);
// TODO: unlike merge, on exception we arent sniping any trash cfs files here?
// createCompoundFile tries to cleanup, but it might not always be able to...
try {
createCompoundFile(infoStream, trackingCFSDir, info, context, this::deleteNewFiles);
createCompoundFile(
infoStream, trackingCFSDir, merge.getMergeInfo().info, context, this::deleteNewFiles);
} finally {
// delete new non cfs files directly: they were never
// registered with IFD
deleteNewFiles(filesToDelete);
}
info.setUseCompoundFile(true);
merge.getMergeInfo().info.setUseCompoundFile(true);
}
// Have codec write SegmentInfo. Must do this after
// creating CFS so that 1) .si isn't slurped into CFS,
// and 2) .si reflects useCompoundFile=true change
// above:
codec.segmentInfoFormat().write(trackingDir, info, context);
info.addFiles(trackingDir.getCreatedFiles());
// Register the new segment
synchronized (this) {
if (merges.areEnabled() == false) {
// Safe: these files must exist
deleteNewFiles(infoPerCommit.files());
return docWriter.getNextSequenceNumber();
}
ensureOpen();
// Now reserve the docs, just before we update SIS:
reserveDocs(numDocs);
segmentInfos.add(infoPerCommit);
seqNo = docWriter.getNextSequenceNumber();
checkpoint();
}
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "addIndexes(CodecReader...)");
throw tragedy;
}
maybeMerge();
return seqNo;
codec.segmentInfoFormat().write(trackingDir, merge.getMergeInfo().info, context);
merge.getMergeInfo().info.addFiles(trackingDir.getCreatedFiles());
// Return without registering the segment files with IndexWriter.
// We do this together for all merges triggered by an addIndexes API,
// to keep the API transactional.
}
/** Copies the segment files as-is into the IndexWriter's directory. */
@@ -4818,6 +4994,7 @@ public class IndexWriter
suppressExceptions == false,
droppedSegment,
mr -> {
if (merge.usesPooledReaders) {
final SegmentReader sr = mr.reader;
final ReadersAndUpdates rld = getPooledInstance(sr.getOriginalSegmentInfo(), false);
// We still hold a ref so it should not have been removed:
@@ -4832,6 +5009,7 @@ public class IndexWriter
if (drop) {
readerPool.drop(rld.info);
}
}
});
} else {
assert merge.getMergeReader().isEmpty()

MergePolicy.java

@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -193,6 +194,7 @@ public abstract class MergePolicy {
long mergeGen; // used by IndexWriter
boolean isExternal; // used by IndexWriter
int maxNumSegments = -1; // used by IndexWriter
boolean usesPooledReaders; // used by IndexWriter to drop readers while closing
/** Estimated size in bytes of the merged segment. */
public volatile long estimatedMergeBytes; // used by IndexWriter
@@ -229,6 +231,28 @@ public abstract class MergePolicy {
totalMaxDoc = segments.stream().mapToInt(i -> i.info.maxDoc()).sum();
mergeProgress = new OneMergeProgress();
mergeReaders = List.of();
usesPooledReaders = true;
}
/**
* Create a OneMerge directly from CodecReaders. Used to merge incoming readers in {@link
* IndexWriter#addIndexes(CodecReader...)}. This OneMerge works directly on readers and has an
* empty segments list.
*
* @param codecReaders Codec readers to merge
*/
public OneMerge(CodecReader... codecReaders) {
List<MergeReader> readers = new ArrayList<>(codecReaders.length);
int totalDocs = 0;
for (CodecReader r : codecReaders) {
readers.add(new MergeReader(r, r.getLiveDocs()));
totalDocs += r.numDocs();
}
mergeReaders = List.copyOf(readers);
segments = List.of();
totalMaxDoc = totalDocs;
mergeProgress = new OneMergeProgress();
usesPooledReaders = false;
}
/**
@@ -472,15 +496,31 @@ public abstract class MergePolicy {
return b.toString();
}
/** Waits if necessary for at most the given time for all merges. */
boolean await(long timeout, TimeUnit unit) {
try {
CompletableFuture<Void> future =
CompletableFuture.allOf(
CompletableFuture<Void> getMergeCompletedFutures() {
return CompletableFuture.allOf(
merges.stream()
.map(m -> m.mergeCompleted)
.collect(Collectors.toList())
.toArray(CompletableFuture<?>[]::new));
}
/** Waits, until interrupted, for all merges to complete. */
boolean await() {
try {
CompletableFuture<Void> future = getMergeCompletedFutures();
future.get();
return true;
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
} catch (@SuppressWarnings("unused") ExecutionException | CancellationException e) {
return false;
}
}
/** Waits if necessary for at most the given time for all merges. */
boolean await(long timeout, TimeUnit unit) {
try {
CompletableFuture<Void> future = getMergeCompletedFutures();
future.get(timeout, unit);
return true;
} catch (InterruptedException e) {
@@ -569,6 +609,24 @@ public abstract class MergePolicy {
MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
throws IOException;
/**
* Define the set of merge operations to perform on provided codec readers in {@link
* IndexWriter#addIndexes(CodecReader...)}.
*
* <p>The merge operation is required to convert provided readers into segments that can be added
* to the writer. This API can be overridden in custom merge policies to control the concurrency
* for addIndexes. Default implementation creates a single merge operation for all provided
* readers (lowest concurrency). Creating a merge for each reader would provide the highest level
* of concurrency possible with the configured merge scheduler.
*
* @param readers CodecReader(s) to merge into the main index
*/
public MergeSpecification findMerges(CodecReader... readers) throws IOException {
MergeSpecification mergeSpec = new MergeSpecification();
mergeSpec.add(new OneMerge(readers));
return mergeSpec;
}
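An illustrative sketch, not part of this patch, of the higher-concurrency option described in the javadoc above: returning one OneMerge per incoming reader lets a concurrent MergeScheduler process the addIndexes merges in parallel. The class name PerReaderAddIndexesMergePolicy and the TieredMergePolicy base are arbitrary choices; the ConcurrentAddIndexesMergePolicy in TestAddIndexes below does essentially the same thing.

import java.io.IOException;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.TieredMergePolicy;

/** Sketch: one addIndexes merge per reader, the highest concurrency the scheduler can exploit. */
class PerReaderAddIndexesMergePolicy extends TieredMergePolicy {
  @Override
  public MergeSpecification findMerges(CodecReader... readers) throws IOException {
    MergeSpecification spec = new MergeSpecification();
    for (CodecReader reader : readers) {
      // Uses the new OneMerge(CodecReader...) constructor added in this change.
      spec.add(new OneMerge(reader));
    }
    // IndexWriter treats an empty specification as "nothing to add".
    return spec;
  }
}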
/**
* Determine what set of merge operations is necessary in order to merge to {@code <=} the
* specified segment count. {@link IndexWriter} calls this when its {@link IndexWriter#forceMerge}
@@ -844,12 +902,24 @@ public abstract class MergePolicy {
}
static final class MergeReader {
final CodecReader codecReader;
final SegmentReader reader;
final Bits hardLiveDocs;
MergeReader(SegmentReader reader, Bits hardLiveDocs) {
this.codecReader = reader;
this.reader = reader;
this.hardLiveDocs = hardLiveDocs;
}
MergeReader(CodecReader reader, Bits hardLiveDocs) {
if (SegmentReader.class.isAssignableFrom(reader.getClass())) {
this.reader = (SegmentReader) reader;
} else {
this.reader = null;
}
this.codecReader = reader;
this.hardLiveDocs = hardLiveDocs;
}
}
}

MergeTrigger.java

@@ -43,4 +43,6 @@ public enum MergeTrigger {
COMMIT,
/** Merge was triggered on opening NRT readers. */
GET_READER,
/** Merge was triggered by an {@link IndexWriter#addIndexes(CodecReader...)} operation */
ADD_INDEXES,
}

NoMergePolicy.java

@@ -39,6 +39,16 @@ public final class NoMergePolicy extends MergePolicy {
return null;
}
@Override
public MergeSpecification findMerges(CodecReader... readers) throws IOException {
// addIndexes(CodecReader...) now uses MergePolicy and MergeScheduler to add
// provided readers (LUCENE-10216). We retain the default behavior here to enable
// addIndexes for consumers like IndexRearranger.
// This does not merge existing segments, but uses SegmentMerger to add
// new incoming readers to the index.
return super.findMerges(readers);
}
@Override
public MergeSpecification findForcedMerges(
SegmentInfos segmentInfos,

TestAddIndexes.java

@@ -20,7 +20,9 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.FilterCodec;
import org.apache.lucene.codecs.PostingsFormat;
@@ -683,6 +685,290 @@ public class TestAddIndexes extends LuceneTestCase {
writer.addDocument(doc);
}
private class ConcurrentAddIndexesMergePolicy extends TieredMergePolicy {
@Override
public MergeSpecification findMerges(CodecReader... readers) throws IOException {
// create a oneMerge for each reader to let them get concurrently processed by addIndexes()
MergeSpecification mergeSpec = new MergeSpecification();
for (CodecReader reader : readers) {
mergeSpec.add(new OneMerge(reader));
}
if (VERBOSE) {
System.out.println(
"No. of addIndexes merges returned by MergePolicy: " + mergeSpec.merges.size());
}
return mergeSpec;
}
}
private class AddIndexesWithReadersSetup {
Directory dir, destDir;
IndexWriter destWriter;
final DirectoryReader[] readers;
final int ADDED_DOCS_PER_READER = 15;
final int INIT_DOCS = 25;
final int NUM_READERS = 15;
public AddIndexesWithReadersSetup(MergeScheduler ms, MergePolicy mp) throws IOException {
dir = new MockDirectoryWrapper(random(), new ByteBuffersDirectory());
IndexWriter writer =
new IndexWriter(
dir, new IndexWriterConfig(new MockAnalyzer(random())).setMaxBufferedDocs(2));
for (int i = 0; i < ADDED_DOCS_PER_READER; i++) addDoc(writer);
writer.close();
destDir = newDirectory();
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
iwc.setMergePolicy(mp);
iwc.setMergeScheduler(ms);
destWriter = new IndexWriter(destDir, iwc);
for (int i = 0; i < INIT_DOCS; i++) addDoc(destWriter);
destWriter.commit();
readers = new DirectoryReader[NUM_READERS];
for (int i = 0; i < NUM_READERS; i++) readers[i] = DirectoryReader.open(dir);
}
public void closeAll() throws IOException {
destWriter.close();
for (int i = 0; i < NUM_READERS; i++) readers[i].close();
destDir.close();
dir.close();
}
}
public void testAddIndexesWithConcurrentMerges() throws Exception {
ConcurrentAddIndexesMergePolicy mp = new ConcurrentAddIndexesMergePolicy();
AddIndexesWithReadersSetup c =
new AddIndexesWithReadersSetup(new ConcurrentMergeScheduler(), mp);
TestUtil.addIndexesSlowly(c.destWriter, c.readers);
c.destWriter.commit();
try (IndexReader reader = DirectoryReader.open(c.destDir)) {
assertEquals(c.INIT_DOCS + c.NUM_READERS * c.ADDED_DOCS_PER_READER, reader.numDocs());
}
c.closeAll();
}
private class PartialMergeScheduler extends MergeScheduler {
int mergesToDo;
int mergesTriggered = 0;
public PartialMergeScheduler(int mergesToDo) {
this.mergesToDo = mergesToDo;
if (VERBOSE) {
System.out.println(
"PartialMergeScheduler configured to mark all merges as failed, "
+ "after triggering ["
+ mergesToDo
+ "] merges.");
}
}
@Override
public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
while (true) {
MergePolicy.OneMerge merge = mergeSource.getNextMerge();
if (merge == null) {
break;
}
if (mergesTriggered >= mergesToDo) {
merge.close(false, false, mr -> {});
mergeSource.onMergeFinished(merge);
} else {
mergeSource.merge(merge);
mergesTriggered++;
}
}
}
@Override
public void close() throws IOException {}
}
public void testAddIndexesWithPartialMergeFailures() throws Exception {
final List<MergePolicy.OneMerge> merges = new ArrayList<>();
ConcurrentAddIndexesMergePolicy mp =
new ConcurrentAddIndexesMergePolicy() {
@Override
public MergeSpecification findMerges(CodecReader... readers) throws IOException {
MergeSpecification spec = super.findMerges(readers);
merges.addAll(spec.merges);
return spec;
}
};
AddIndexesWithReadersSetup c = new AddIndexesWithReadersSetup(new PartialMergeScheduler(2), mp);
assertThrows(RuntimeException.class, () -> TestUtil.addIndexesSlowly(c.destWriter, c.readers));
c.destWriter.commit();
// verify no docs got added and all interim files from successful merges have been deleted
try (IndexReader reader = DirectoryReader.open(c.destDir)) {
assertEquals(c.INIT_DOCS, reader.numDocs());
}
for (MergePolicy.OneMerge merge : merges) {
if (merge.getMergeInfo() != null) {
assertFalse(
Arrays.stream(c.destDir.listAll())
.collect(Collectors.toSet())
.containsAll(merge.getMergeInfo().files()));
}
}
c.closeAll();
}
public void testAddIndexesWithNullMergeSpec() throws Exception {
MergePolicy mp =
new TieredMergePolicy() {
@Override
public MergeSpecification findMerges(CodecReader... readers) throws IOException {
return null;
}
};
AddIndexesWithReadersSetup c =
new AddIndexesWithReadersSetup(new ConcurrentMergeScheduler(), mp);
TestUtil.addIndexesSlowly(c.destWriter, c.readers);
c.destWriter.commit();
try (IndexReader reader = DirectoryReader.open(c.destDir)) {
assertEquals(c.INIT_DOCS, reader.numDocs());
}
c.closeAll();
}
public void testAddIndexesWithEmptyMergeSpec() throws Exception {
MergePolicy mp =
new TieredMergePolicy() {
@Override
public MergeSpecification findMerges(CodecReader... readers) throws IOException {
return new MergeSpecification();
}
};
AddIndexesWithReadersSetup c =
new AddIndexesWithReadersSetup(new ConcurrentMergeScheduler(), mp);
TestUtil.addIndexesSlowly(c.destWriter, c.readers);
c.destWriter.commit();
try (IndexReader reader = DirectoryReader.open(c.destDir)) {
assertEquals(c.INIT_DOCS, reader.numDocs());
}
c.closeAll();
}
private class CountingSerialMergeScheduler extends MergeScheduler {
int explicitMerges = 0;
int addIndexesMerges = 0;
@Override
public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
while (true) {
MergePolicy.OneMerge merge = mergeSource.getNextMerge();
if (merge == null) {
break;
}
mergeSource.merge(merge);
if (trigger == MergeTrigger.EXPLICIT) {
explicitMerges++;
}
if (trigger == MergeTrigger.ADD_INDEXES) {
addIndexesMerges++;
}
}
}
@Override
public void close() throws IOException {}
}
public void testAddIndexesWithEmptyReaders() throws Exception {
Directory destDir = newDirectory();
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
iwc.setMergePolicy(new ConcurrentAddIndexesMergePolicy());
CountingSerialMergeScheduler ms = new CountingSerialMergeScheduler();
iwc.setMergeScheduler(ms);
IndexWriter destWriter = new IndexWriter(destDir, iwc);
final int initialDocs = 15;
for (int i = 0; i < initialDocs; i++) addDoc(destWriter);
destWriter.commit();
// create empty readers
Directory dir = new MockDirectoryWrapper(random(), new ByteBuffersDirectory());
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new MockAnalyzer(random())));
writer.close();
final int numReaders = 20;
DirectoryReader[] readers = new DirectoryReader[numReaders];
for (int i = 0; i < numReaders; i++) readers[i] = DirectoryReader.open(dir);
TestUtil.addIndexesSlowly(destWriter, readers);
destWriter.commit();
// verify no docs were added
try (IndexReader reader = DirectoryReader.open(destDir)) {
assertEquals(initialDocs, reader.numDocs());
}
// verify no merges were triggered
assertEquals(0, ms.addIndexesMerges);
destWriter.close();
for (int i = 0; i < numReaders; i++) readers[i].close();
destDir.close();
dir.close();
}
public void testCascadingMergesTriggered() throws Exception {
ConcurrentAddIndexesMergePolicy mp = new ConcurrentAddIndexesMergePolicy();
CountingSerialMergeScheduler ms = new CountingSerialMergeScheduler();
AddIndexesWithReadersSetup c = new AddIndexesWithReadersSetup(ms, mp);
TestUtil.addIndexesSlowly(c.destWriter, c.readers);
assertTrue(ms.explicitMerges > 0);
c.closeAll();
}
public void testAddIndexesHittingMaxDocsLimit() throws Exception {
final int writerMaxDocs = 15;
IndexWriter.setMaxDocs(writerMaxDocs);
// create destination writer
Directory destDir = newDirectory();
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
iwc.setMergePolicy(new ConcurrentAddIndexesMergePolicy());
CountingSerialMergeScheduler ms = new CountingSerialMergeScheduler();
iwc.setMergeScheduler(ms);
IndexWriter destWriter = new IndexWriter(destDir, iwc);
for (int i = 0; i < writerMaxDocs; i++) addDoc(destWriter);
destWriter.commit();
// create readers to add
Directory dir = new MockDirectoryWrapper(random(), new ByteBuffersDirectory());
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new MockAnalyzer(random())));
for (int i = 0; i < 10; i++) addDoc(writer);
writer.close();
final int numReaders = 20;
DirectoryReader[] readers = new DirectoryReader[numReaders];
for (int i = 0; i < numReaders; i++) readers[i] = DirectoryReader.open(dir);
boolean success = false;
try {
TestUtil.addIndexesSlowly(destWriter, readers);
success = true;
} catch (IllegalArgumentException e) {
assertTrue(
e.getMessage()
.contains("number of documents in the index cannot exceed " + writerMaxDocs));
}
assertFalse(success);
// verify no docs were added
destWriter.commit();
try (IndexReader reader = DirectoryReader.open(destDir)) {
assertEquals(writerMaxDocs, reader.numDocs());
}
destWriter.close();
for (int i = 0; i < numReaders; i++) readers[i].close();
destDir.close();
dir.close();
}
private abstract class RunAddIndexesThreads {
Directory dir, dir2;
@@ -704,7 +990,10 @@ public class TestAddIndexes extends LuceneTestCase {
writer.close();
dir2 = newDirectory();
writer2 = new IndexWriter(dir2, new IndexWriterConfig(new MockAnalyzer(random())));
MergePolicy mp = new ConcurrentAddIndexesMergePolicy();
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
iwc.setMergePolicy(mp);
writer2 = new IndexWriter(dir2, iwc);
writer2.commit();
readers = new DirectoryReader[NUM_COPY];
@@ -985,7 +1274,6 @@ public class TestAddIndexes extends LuceneTestCase {
System.out.println("TEST: done join threads");
}
c.closeDir();
assertTrue(c.failures.size() == 0);
}
@@ -1014,7 +1302,6 @@ public class TestAddIndexes extends LuceneTestCase {
}
c.closeDir();
assertTrue(c.failures.size() == 0);
}
@@ -1488,7 +1775,9 @@ public class TestAddIndexes extends LuceneTestCase {
}
writer.commit();
writer.close();
DirectoryReader reader = DirectoryReader.open(dir1);
// wrappedReader filters out soft deleted docs
DirectoryReader wrappedReader = new SoftDeletesDirectoryReaderWrapper(reader, "soft_delete");
Directory dir2 = newDirectory();
int numDocs = reader.numDocs();
@@ -1504,9 +1793,13 @@ public class TestAddIndexes extends LuceneTestCase {
assertEquals(wrappedReader.numDocs(), writer.getDocStats().numDocs);
assertEquals(maxDoc, writer.getDocStats().maxDoc);
writer.commit();
SegmentCommitInfo commitInfo = writer.cloneSegmentInfos().info(0);
assertEquals(maxDoc - wrappedReader.numDocs(), commitInfo.getSoftDelCount());
int softDeleteCount =
writer.cloneSegmentInfos().asList().stream()
.mapToInt(SegmentCommitInfo::getSoftDelCount)
.sum();
assertEquals(maxDoc - wrappedReader.numDocs(), softDeleteCount);
writer.close();
Directory dir3 = newDirectory();
iwc1 = newIndexWriterConfig(new MockAnalyzer(random())).setSoftDeletesField("soft_delete");
writer = new IndexWriter(dir3, iwc1);
@@ -1517,6 +1810,7 @@ public class TestAddIndexes extends LuceneTestCase {
}
writer.addIndexes(readers);
assertEquals(wrappedReader.numDocs(), writer.getDocStats().numDocs);
// soft deletes got filtered out when wrappedReader(s) were added.
assertEquals(wrappedReader.numDocs(), writer.getDocStats().maxDoc);
IOUtils.close(reader, writer, dir3, dir2, dir1);
}

TestIndexWriterOnDiskFull.java

@@ -352,7 +352,7 @@ public class TestIndexWriterOnDiskFull extends LuceneTestCase {
done = true;
}
} catch (IllegalStateException | IOException e) {
} catch (IllegalStateException | IOException | MergePolicy.MergeException e) {
success = false;
err = e;
if (VERBOSE) {
@@ -360,7 +360,7 @@ public class TestIndexWriterOnDiskFull extends LuceneTestCase {
e.printStackTrace(System.out);
}
if (1 == x) {
if (1 == x && (e instanceof MergePolicy.MergeException == false)) {
e.printStackTrace(System.out);
fail(methodName + " hit IOException after disk space was freed up");
}