diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index e61fcd526b8..3c639f84cc9 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -45,6 +45,9 @@ Improvements * LUCENE-10416: Update Korean Dictionary to mecab-ko-dic-2.1.1-20180720 for Nori. (Uihyun Kim) +* LUCENE-10216: Use MergePolicy to define and MergeScheduler to trigger the reader merges + required by addIndexes(CodecReader[]) API. (Vigya Sharma, Michael McCandless) + Optimizations --------------------- (No changes) diff --git a/lucene/core/src/java/org/apache/lucene/index/FieldInfos.java b/lucene/core/src/java/org/apache/lucene/index/FieldInfos.java index 76cf9bb654a..5013ca755c3 100644 --- a/lucene/core/src/java/org/apache/lucene/index/FieldInfos.java +++ b/lucene/core/src/java/org/apache/lucene/index/FieldInfos.java @@ -352,6 +352,14 @@ public class FieldInfos implements Iterable { this.softDeletesFieldName = softDeletesFieldName; } + void verifyFieldInfo(FieldInfo fi) { + String fieldName = fi.getName(); + verifySoftDeletedFieldName(fieldName, fi.isSoftDeletesField()); + if (nameToNumber.containsKey(fieldName)) { + verifySameSchema(fi); + } + } + /** * Returns the global field number for the given field name. If the name does not exist yet it * tries to add it with the given preferred field number assigned if possible otherwise the diff --git a/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java index 6eeb61cde73..08c0196a413 100644 --- a/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java @@ -47,6 +47,11 @@ public class FilterMergePolicy extends MergePolicy implements UnwrappableNOTE: empty segments are dropped by this method and not added to this index. * - *

<p><b>NOTE:</b> this merges all given {@link LeafReader}s in one merge. If you intend to merge
-   * a large number of readers, it may be better to call this method multiple times, each time with
-   * a small set of readers. In principle, if you use a merge policy with a {@code mergeFactor} or
-   * {@code maxMergeAtOnce} parameter, you should pass that many readers in one call.
-   *
-   * <p><b>NOTE:</b> this method does not call or make use of the {@link MergeScheduler}, so any
-   * custom bandwidth throttling is at the moment ignored.
+   * <p><b>
NOTE: provided {@link LeafReader}s are merged as specified by the {@link + * MergePolicy#findMerges(CodecReader...)} API. Default behavior is to merge all provided readers + * into a single segment. You can modify this by overriding the findMerge API in your + * custom merge policy. * * @return The sequence number for this operation * @throws CorruptIndexException if the index is corrupt @@ -3135,144 +3140,315 @@ public class IndexWriter // long so we can detect int overflow: long numDocs = 0; long seqNo; - try { - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "flush at addIndexes(CodecReader...)"); - } - flush(false, true); - String mergedName = newSegmentName(); - int numSoftDeleted = 0; + try { + // Best effort up front validations for (CodecReader leaf : readers) { - numDocs += leaf.numDocs(); validateMergeReader(leaf); - if (softDeletesEnabled) { - Bits liveDocs = leaf.getLiveDocs(); - numSoftDeleted += - PendingSoftDeletes.countSoftDeletes( - FieldExistsQuery.getDocValuesDocIdSetIterator(config.getSoftDeletesField(), leaf), - liveDocs); + for (FieldInfo fi : leaf.getFieldInfos()) { + globalFieldNumberMap.verifyFieldInfo(fi); + } + numDocs += leaf.numDocs(); + } + testReserveDocs(numDocs); + + synchronized (this) { + ensureOpen(); + if (merges.areEnabled() == false) { + throw new AlreadyClosedException( + "this IndexWriter is closed. Cannot execute addIndexes(CodecReaders...) API"); } } - // Best-effort up front check: - testReserveDocs(numDocs); - - final IOContext context = - new IOContext( - new MergeInfo(Math.toIntExact(numDocs), -1, false, UNBOUNDED_MAX_MERGE_SEGMENTS)); - - // TODO: somehow we should fix this merge so it's - // abortable so that IW.close(false) is able to stop it - TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(directory); - Codec codec = config.getCodec(); - // We set the min version to null for now, it will be set later by SegmentMerger - SegmentInfo info = - new SegmentInfo( - directoryOrig, - Version.LATEST, - null, - mergedName, - -1, - false, - codec, - Collections.emptyMap(), - StringHelper.randomId(), - Collections.emptyMap(), - config.getIndexSort()); - - SegmentMerger merger = - new SegmentMerger( - Arrays.asList(readers), info, infoStream, trackingDir, globalFieldNumberMap, context); - - if (!merger.shouldMerge()) { + MergePolicy mergePolicy = config.getMergePolicy(); + MergePolicy.MergeSpecification spec = mergePolicy.findMerges(readers); + boolean mergeSuccess = false; + if (spec != null && spec.merges.size() > 0) { + try { + spec.merges.forEach(addIndexesMergeSource::registerMerge); + mergeScheduler.merge(addIndexesMergeSource, MergeTrigger.ADD_INDEXES); + spec.await(); + mergeSuccess = + spec.merges.stream().allMatch(m -> m.hasCompletedSuccessfully().orElse(false)); + } finally { + if (mergeSuccess == false) { + for (MergePolicy.OneMerge merge : spec.merges) { + if (merge.getMergeInfo() != null) { + deleteNewFiles(merge.getMergeInfo().files()); + } + } + } + } + } else { + if (infoStream.isEnabled("IW")) { + if (spec == null) { + infoStream.message( + "addIndexes(CodecReaders...)", + "received null mergeSpecification from MergePolicy. No indexes to add, returning.."); + } else { + infoStream.message( + "addIndexes(CodecReaders...)", + "received empty mergeSpecification from MergePolicy. 
No indexes to add, returning.."); + } + } return docWriter.getNextSequenceNumber(); } - synchronized (this) { - ensureOpen(); - assert merges.areEnabled(); - runningAddIndexesMerges.add(merger); - } - try { - merger.merge(); // merge 'em - } finally { + if (mergeSuccess) { + List infos = new ArrayList<>(); + long totalDocs = 0; + for (MergePolicy.OneMerge merge : spec.merges) { + totalDocs += merge.totalMaxDoc; + if (merge.getMergeInfo() != null) { + infos.add(merge.getMergeInfo()); + } + } + synchronized (this) { - runningAddIndexesMerges.remove(merger); - notifyAll(); + if (infos.isEmpty() == false) { + boolean registerSegmentSuccess = false; + try { + ensureOpen(); + // Reserve the docs, just before we update SIS: + reserveDocs(totalDocs); + registerSegmentSuccess = true; + } finally { + if (registerSegmentSuccess == false) { + for (SegmentCommitInfo sipc : infos) { + // Safe: these files must exist + deleteNewFiles(sipc.files()); + } + } + } + segmentInfos.addAll(infos); + checkpoint(); + } + seqNo = docWriter.getNextSequenceNumber(); } - } - SegmentCommitInfo infoPerCommit = - new SegmentCommitInfo(info, 0, numSoftDeleted, -1L, -1L, -1L, StringHelper.randomId()); - - info.setFiles(new HashSet<>(trackingDir.getCreatedFiles())); - trackingDir.clearCreatedFiles(); - - setDiagnostics(info, SOURCE_ADDINDEXES_READERS); - - final MergePolicy mergePolicy = config.getMergePolicy(); - boolean useCompoundFile; - synchronized (this) { // Guard segmentInfos - if (merges.areEnabled() == false) { - // Safe: these files must exist - deleteNewFiles(infoPerCommit.files()); - - return docWriter.getNextSequenceNumber(); + } else { + if (infoStream.isEnabled("IW")) { + infoStream.message( + "addIndexes(CodecReaders...)", "failed to successfully merge all provided readers."); } - ensureOpen(); - useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, infoPerCommit, this); - } - - // Now create the compound file if needed - if (useCompoundFile) { - Collection filesToDelete = infoPerCommit.files(); - TrackingDirectoryWrapper trackingCFSDir = new TrackingDirectoryWrapper(directory); - // TODO: unlike merge, on exception we arent sniping any trash cfs files here? - // createCompoundFile tries to cleanup, but it might not always be able to... - try { - createCompoundFile(infoStream, trackingCFSDir, info, context, this::deleteNewFiles); - } finally { - // delete new non cfs files directly: they were never - // registered with IFD - deleteNewFiles(filesToDelete); + for (MergePolicy.OneMerge merge : spec.merges) { + if (merge.isAborted()) { + throw new MergePolicy.MergeAbortedException("merge was aborted."); + } + Throwable t = merge.getException(); + if (t != null) { + IOUtils.rethrowAlways(t); + } } - info.setUseCompoundFile(true); - } - - // Have codec write SegmentInfo. 
Must do this after - // creating CFS so that 1) .si isn't slurped into CFS, - // and 2) .si reflects useCompoundFile=true change - // above: - codec.segmentInfoFormat().write(trackingDir, info, context); - - info.addFiles(trackingDir.getCreatedFiles()); - - // Register the new segment - synchronized (this) { - if (merges.areEnabled() == false) { - // Safe: these files must exist - deleteNewFiles(infoPerCommit.files()); - - return docWriter.getNextSequenceNumber(); - } - ensureOpen(); - - // Now reserve the docs, just before we update SIS: - reserveDocs(numDocs); - - segmentInfos.add(infoPerCommit); - seqNo = docWriter.getNextSequenceNumber(); - checkpoint(); + // If no merge hit an exception, and merge was not aborted, but we still failed to add + // indexes, fail the API + throw new RuntimeException( + "failed to successfully merge all provided readers in addIndexes(CodecReader...)"); } } catch (VirtualMachineError tragedy) { tragicEvent(tragedy, "addIndexes(CodecReader...)"); throw tragedy; } - maybeMerge(); + maybeMerge(); return seqNo; } + private class AddIndexesMergeSource implements MergeScheduler.MergeSource { + + private final Queue pendingAddIndexesMerges = new ArrayDeque<>(); + private final IndexWriter writer; + + public AddIndexesMergeSource(IndexWriter writer) { + this.writer = writer; + } + + public void registerMerge(MergePolicy.OneMerge merge) { + synchronized (IndexWriter.this) { + pendingAddIndexesMerges.add(merge); + } + } + + @Override + public MergePolicy.OneMerge getNextMerge() { + synchronized (IndexWriter.this) { + if (hasPendingMerges() == false) { + return null; + } + MergePolicy.OneMerge merge = pendingAddIndexesMerges.remove(); + runningMerges.add(merge); + return merge; + } + } + + @Override + public void onMergeFinished(MergePolicy.OneMerge merge) { + synchronized (IndexWriter.this) { + runningMerges.remove(merge); + } + } + + @Override + public boolean hasPendingMerges() { + return pendingAddIndexesMerges.size() > 0; + } + + public void abortPendingMerges() throws IOException { + synchronized (IndexWriter.this) { + IOUtils.applyToAll( + pendingAddIndexesMerges, + merge -> { + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "now abort pending addIndexes merge"); + } + merge.setAborted(); + merge.close(false, false, mr -> {}); + onMergeFinished(merge); + }); + pendingAddIndexesMerges.clear(); + } + } + + @Override + public void merge(MergePolicy.OneMerge merge) throws IOException { + boolean success = false; + try { + writer.addIndexesReaderMerge(merge); + success = true; + } catch (Throwable t) { + handleMergeException(t, merge); + } finally { + synchronized (IndexWriter.this) { + merge.close(success, false, mr -> {}); + onMergeFinished(merge); + } + } + } + } + + /** + * Runs a single merge operation for {@link IndexWriter#addIndexes(CodecReader...)}. + * + *

Merges and creates a SegmentInfo, for the readers grouped together in provided OneMerge. + * + * @param merge OneMerge object initialized from readers. + * @throws IOException if there is a low-level IO error + */ + public void addIndexesReaderMerge(MergePolicy.OneMerge merge) throws IOException { + + merge.mergeInit(); + merge.checkAborted(); + + // long so we can detect int overflow: + long numDocs = 0; + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "flush at addIndexes(CodecReader...)"); + } + flush(false, true); + + String mergedName = newSegmentName(); + Directory mergeDirectory = mergeScheduler.wrapForMerge(merge, directory); + int numSoftDeleted = 0; + for (MergePolicy.MergeReader reader : merge.getMergeReader()) { + CodecReader leaf = reader.codecReader; + numDocs += leaf.numDocs(); + if (softDeletesEnabled) { + Bits liveDocs = reader.hardLiveDocs; + numSoftDeleted += + PendingSoftDeletes.countSoftDeletes( + FieldExistsQuery.getDocValuesDocIdSetIterator(config.getSoftDeletesField(), leaf), + liveDocs); + } + } + + // Best-effort up front check: + testReserveDocs(numDocs); + + final IOContext context = + new IOContext( + new MergeInfo(Math.toIntExact(numDocs), -1, false, UNBOUNDED_MAX_MERGE_SEGMENTS)); + + TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(mergeDirectory); + Codec codec = config.getCodec(); + // We set the min version to null for now, it will be set later by SegmentMerger + SegmentInfo segInfo = + new SegmentInfo( + directoryOrig, + Version.LATEST, + null, + mergedName, + -1, + false, + codec, + Collections.emptyMap(), + StringHelper.randomId(), + Collections.emptyMap(), + config.getIndexSort()); + + List readers = + merge.getMergeReader().stream().map(r -> r.codecReader).collect(Collectors.toList()); + SegmentMerger merger = + new SegmentMerger(readers, segInfo, infoStream, trackingDir, globalFieldNumberMap, context); + + if (!merger.shouldMerge()) { + return; + } + + merge.checkAborted(); + synchronized (this) { + runningAddIndexesMerges.add(merger); + } + merge.mergeStartNS = System.nanoTime(); + try { + merger.merge(); // merge 'em + } finally { + synchronized (this) { + runningAddIndexesMerges.remove(merger); + notifyAll(); + } + } + + merge.setMergeInfo( + new SegmentCommitInfo(segInfo, 0, numSoftDeleted, -1L, -1L, -1L, StringHelper.randomId())); + merge.getMergeInfo().info.setFiles(new HashSet<>(trackingDir.getCreatedFiles())); + trackingDir.clearCreatedFiles(); + + setDiagnostics(merge.getMergeInfo().info, SOURCE_ADDINDEXES_READERS); + + final MergePolicy mergePolicy = config.getMergePolicy(); + boolean useCompoundFile; + synchronized (this) { + merge.checkAborted(); + useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, merge.getMergeInfo(), this); + } + + // Now create the compound file if needed + if (useCompoundFile) { + Collection filesToDelete = merge.getMergeInfo().files(); + TrackingDirectoryWrapper trackingCFSDir = new TrackingDirectoryWrapper(mergeDirectory); + // TODO: unlike merge, on exception we arent sniping any trash cfs files here? + // createCompoundFile tries to cleanup, but it might not always be able to... + try { + createCompoundFile( + infoStream, trackingCFSDir, merge.getMergeInfo().info, context, this::deleteNewFiles); + } finally { + // delete new non cfs files directly: they were never + // registered with IFD + deleteNewFiles(filesToDelete); + } + merge.getMergeInfo().info.setUseCompoundFile(true); + } + + // Have codec write SegmentInfo. 
Must do this after + // creating CFS so that 1) .si isn't slurped into CFS, + // and 2) .si reflects useCompoundFile=true change + // above: + codec.segmentInfoFormat().write(trackingDir, merge.getMergeInfo().info, context); + merge.getMergeInfo().info.addFiles(trackingDir.getCreatedFiles()); + // Return without registering the segment files with IndexWriter. + // We do this together for all merges triggered by an addIndexes API, + // to keep the API transactional. + } + /** Copies the segment files as-is into the IndexWriter's directory. */ private SegmentCommitInfo copySegmentAsIs( SegmentCommitInfo info, String segName, IOContext context) throws IOException { @@ -4818,19 +4994,21 @@ public class IndexWriter suppressExceptions == false, droppedSegment, mr -> { - final SegmentReader sr = mr.reader; - final ReadersAndUpdates rld = getPooledInstance(sr.getOriginalSegmentInfo(), false); - // We still hold a ref so it should not have been removed: - assert rld != null; - if (drop) { - rld.dropChanges(); - } else { - rld.dropMergingUpdates(); - } - rld.release(sr); - release(rld); - if (drop) { - readerPool.drop(rld.info); + if (merge.usesPooledReaders) { + final SegmentReader sr = mr.reader; + final ReadersAndUpdates rld = getPooledInstance(sr.getOriginalSegmentInfo(), false); + // We still hold a ref so it should not have been removed: + assert rld != null; + if (drop) { + rld.dropChanges(); + } else { + rld.dropMergingUpdates(); + } + rld.release(sr); + release(rld); + if (drop) { + readerPool.drop(rld.info); + } } }); } else { diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java index b84958ab609..a5d712bfdbb 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -193,6 +194,7 @@ public abstract class MergePolicy { long mergeGen; // used by IndexWriter boolean isExternal; // used by IndexWriter int maxNumSegments = -1; // used by IndexWriter + boolean usesPooledReaders; // used by IndexWriter to drop readers while closing /** Estimated size in bytes of the merged segment. */ public volatile long estimatedMergeBytes; // used by IndexWriter @@ -229,6 +231,28 @@ public abstract class MergePolicy { totalMaxDoc = segments.stream().mapToInt(i -> i.info.maxDoc()).sum(); mergeProgress = new OneMergeProgress(); mergeReaders = List.of(); + usesPooledReaders = true; + } + + /** + * Create a OneMerge directly from CodecReaders. Used to merge incoming readers in {@link + * IndexWriter#addIndexes(CodecReader...)}. This OneMerge works directly on readers and has an + * empty segments list. + * + * @param codecReaders Codec readers to merge + */ + public OneMerge(CodecReader... 
codecReaders) { + List readers = new ArrayList<>(codecReaders.length); + int totalDocs = 0; + for (CodecReader r : codecReaders) { + readers.add(new MergeReader(r, r.getLiveDocs())); + totalDocs += r.numDocs(); + } + mergeReaders = List.copyOf(readers); + segments = List.of(); + totalMaxDoc = totalDocs; + mergeProgress = new OneMergeProgress(); + usesPooledReaders = false; } /** @@ -472,15 +496,31 @@ public abstract class MergePolicy { return b.toString(); } + CompletableFuture getMergeCompletedFutures() { + return CompletableFuture.allOf( + merges.stream() + .map(m -> m.mergeCompleted) + .collect(Collectors.toList()) + .toArray(CompletableFuture[]::new)); + } + + /** Waits, until interrupted, for all merges to complete. */ + boolean await() { + try { + CompletableFuture future = getMergeCompletedFutures(); + future.get(); + return true; + } catch (InterruptedException e) { + throw new ThreadInterruptedException(e); + } catch (@SuppressWarnings("unused") ExecutionException | CancellationException e) { + return false; + } + } + /** Waits if necessary for at most the given time for all merges. */ boolean await(long timeout, TimeUnit unit) { try { - CompletableFuture future = - CompletableFuture.allOf( - merges.stream() - .map(m -> m.mergeCompleted) - .collect(Collectors.toList()) - .toArray(CompletableFuture[]::new)); + CompletableFuture future = getMergeCompletedFutures(); future.get(timeout, unit); return true; } catch (InterruptedException e) { @@ -569,6 +609,24 @@ public abstract class MergePolicy { MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException; + /** + * Define the set of merge operations to perform on provided codec readers in {@link + * IndexWriter#addIndexes(CodecReader...)}. + * + *

The merge operation is required to convert provided readers into segments that can be added + * to the writer. This API can be overridden in custom merge policies to control the concurrency + * for addIndexes. Default implementation creates a single merge operation for all provided + * readers (lowest concurrency). Creating a merge for each reader, would provide the highest level + * of concurrency possible with the configured merge scheduler. + * + * @param readers CodecReader(s) to merge into the main index + */ + public MergeSpecification findMerges(CodecReader... readers) throws IOException { + MergeSpecification mergeSpec = new MergeSpecification(); + mergeSpec.add(new OneMerge(readers)); + return mergeSpec; + } + /** * Determine what set of merge operations is necessary in order to merge to {@code <=} the * specified segment count. {@link IndexWriter} calls this when its {@link IndexWriter#forceMerge} @@ -844,12 +902,24 @@ public abstract class MergePolicy { } static final class MergeReader { + final CodecReader codecReader; final SegmentReader reader; final Bits hardLiveDocs; MergeReader(SegmentReader reader, Bits hardLiveDocs) { + this.codecReader = reader; this.reader = reader; this.hardLiveDocs = hardLiveDocs; } + + MergeReader(CodecReader reader, Bits hardLiveDocs) { + if (SegmentReader.class.isAssignableFrom(reader.getClass())) { + this.reader = (SegmentReader) reader; + } else { + this.reader = null; + } + this.codecReader = reader; + this.hardLiveDocs = hardLiveDocs; + } } } diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java b/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java index b6b06988ef1..ca23658592a 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java @@ -43,4 +43,6 @@ public enum MergeTrigger { COMMIT, /** Merge was triggered on opening NRT readers. */ GET_READER, + /** Merge was triggered by an {@link IndexWriter#addIndexes(CodecReader...)} operation */ + ADD_INDEXES, } diff --git a/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java index 3429b6174d0..ce7b1ce006b 100644 --- a/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java @@ -39,6 +39,16 @@ public final class NoMergePolicy extends MergePolicy { return null; } + @Override + public MergeSpecification findMerges(CodecReader... readers) throws IOException { + // addIndexes(CodecReader...) now uses MergePolicy and MergeScheduler to add + // provided readers (LUCENE-10216). We retain the default behavior here to enable + // addIndexes for consumers like IndexRearranger. + // This does not merge existing segments, but uses SegmentMerger to add + // new incoming readers to the index. 
+ return super.findMerges(readers); + } + @Override public MergeSpecification findForcedMerges( SegmentInfos segmentInfos, diff --git a/lucene/core/src/test/org/apache/lucene/index/TestAddIndexes.java b/lucene/core/src/test/org/apache/lucene/index/TestAddIndexes.java index 45d42b86d6c..c49aabfc166 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestAddIndexes.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestAddIndexes.java @@ -20,7 +20,9 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.NoSuchFileException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.FilterCodec; import org.apache.lucene.codecs.PostingsFormat; @@ -683,6 +685,290 @@ public class TestAddIndexes extends LuceneTestCase { writer.addDocument(doc); } + private class ConcurrentAddIndexesMergePolicy extends TieredMergePolicy { + + @Override + public MergeSpecification findMerges(CodecReader... readers) throws IOException { + // create a oneMerge for each reader to let them get concurrently processed by addIndexes() + MergeSpecification mergeSpec = new MergeSpecification(); + for (CodecReader reader : readers) { + mergeSpec.add(new OneMerge(reader)); + } + if (VERBOSE) { + System.out.println( + "No. of addIndexes merges returned by MergePolicy: " + mergeSpec.merges.size()); + } + return mergeSpec; + } + } + + private class AddIndexesWithReadersSetup { + Directory dir, destDir; + IndexWriter destWriter; + final DirectoryReader[] readers; + final int ADDED_DOCS_PER_READER = 15; + final int INIT_DOCS = 25; + final int NUM_READERS = 15; + + public AddIndexesWithReadersSetup(MergeScheduler ms, MergePolicy mp) throws IOException { + dir = new MockDirectoryWrapper(random(), new ByteBuffersDirectory()); + IndexWriter writer = + new IndexWriter( + dir, new IndexWriterConfig(new MockAnalyzer(random())).setMaxBufferedDocs(2)); + for (int i = 0; i < ADDED_DOCS_PER_READER; i++) addDoc(writer); + writer.close(); + + destDir = newDirectory(); + IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random())); + iwc.setMergePolicy(mp); + iwc.setMergeScheduler(ms); + destWriter = new IndexWriter(destDir, iwc); + for (int i = 0; i < INIT_DOCS; i++) addDoc(destWriter); + destWriter.commit(); + + readers = new DirectoryReader[NUM_READERS]; + for (int i = 0; i < NUM_READERS; i++) readers[i] = DirectoryReader.open(dir); + } + + public void closeAll() throws IOException { + destWriter.close(); + for (int i = 0; i < NUM_READERS; i++) readers[i].close(); + destDir.close(); + dir.close(); + } + } + + public void testAddIndexesWithConcurrentMerges() throws Exception { + ConcurrentAddIndexesMergePolicy mp = new ConcurrentAddIndexesMergePolicy(); + AddIndexesWithReadersSetup c = + new AddIndexesWithReadersSetup(new ConcurrentMergeScheduler(), mp); + TestUtil.addIndexesSlowly(c.destWriter, c.readers); + c.destWriter.commit(); + try (IndexReader reader = DirectoryReader.open(c.destDir)) { + assertEquals(c.INIT_DOCS + c.NUM_READERS * c.ADDED_DOCS_PER_READER, reader.numDocs()); + } + c.closeAll(); + } + + private class PartialMergeScheduler extends MergeScheduler { + + int mergesToDo; + int mergesTriggered = 0; + + public PartialMergeScheduler(int mergesToDo) { + this.mergesToDo = mergesToDo; + if (VERBOSE) { + System.out.println( + "PartialMergeScheduler configured to mark all merges as failed, " + + "after triggering [" + + mergesToDo + + "] merges."); + } 
+ } + + @Override + public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException { + while (true) { + MergePolicy.OneMerge merge = mergeSource.getNextMerge(); + if (merge == null) { + break; + } + if (mergesTriggered >= mergesToDo) { + merge.close(false, false, mr -> {}); + mergeSource.onMergeFinished(merge); + } else { + mergeSource.merge(merge); + mergesTriggered++; + } + } + } + + @Override + public void close() throws IOException {} + } + + public void testAddIndexesWithPartialMergeFailures() throws Exception { + final List merges = new ArrayList<>(); + ConcurrentAddIndexesMergePolicy mp = + new ConcurrentAddIndexesMergePolicy() { + @Override + public MergeSpecification findMerges(CodecReader... readers) throws IOException { + MergeSpecification spec = super.findMerges(readers); + merges.addAll(spec.merges); + return spec; + } + }; + AddIndexesWithReadersSetup c = new AddIndexesWithReadersSetup(new PartialMergeScheduler(2), mp); + assertThrows(RuntimeException.class, () -> TestUtil.addIndexesSlowly(c.destWriter, c.readers)); + c.destWriter.commit(); + + // verify no docs got added and all interim files from successful merges have been deleted + try (IndexReader reader = DirectoryReader.open(c.destDir)) { + assertEquals(c.INIT_DOCS, reader.numDocs()); + } + for (MergePolicy.OneMerge merge : merges) { + if (merge.getMergeInfo() != null) { + assertFalse( + Arrays.stream(c.destDir.listAll()) + .collect(Collectors.toSet()) + .containsAll(merge.getMergeInfo().files())); + } + } + c.closeAll(); + } + + public void testAddIndexesWithNullMergeSpec() throws Exception { + MergePolicy mp = + new TieredMergePolicy() { + @Override + public MergeSpecification findMerges(CodecReader... readers) throws IOException { + return null; + } + }; + AddIndexesWithReadersSetup c = + new AddIndexesWithReadersSetup(new ConcurrentMergeScheduler(), mp); + TestUtil.addIndexesSlowly(c.destWriter, c.readers); + c.destWriter.commit(); + try (IndexReader reader = DirectoryReader.open(c.destDir)) { + assertEquals(c.INIT_DOCS, reader.numDocs()); + } + c.closeAll(); + } + + public void testAddIndexesWithEmptyMergeSpec() throws Exception { + MergePolicy mp = + new TieredMergePolicy() { + @Override + public MergeSpecification findMerges(CodecReader... 
readers) throws IOException { + return new MergeSpecification(); + } + }; + AddIndexesWithReadersSetup c = + new AddIndexesWithReadersSetup(new ConcurrentMergeScheduler(), mp); + TestUtil.addIndexesSlowly(c.destWriter, c.readers); + c.destWriter.commit(); + try (IndexReader reader = DirectoryReader.open(c.destDir)) { + assertEquals(c.INIT_DOCS, reader.numDocs()); + } + c.closeAll(); + } + + private class CountingSerialMergeScheduler extends MergeScheduler { + + int explicitMerges = 0; + int addIndexesMerges = 0; + + @Override + public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException { + while (true) { + MergePolicy.OneMerge merge = mergeSource.getNextMerge(); + if (merge == null) { + break; + } + mergeSource.merge(merge); + if (trigger == MergeTrigger.EXPLICIT) { + explicitMerges++; + } + if (trigger == MergeTrigger.ADD_INDEXES) { + addIndexesMerges++; + } + } + } + + @Override + public void close() throws IOException {} + } + + public void testAddIndexesWithEmptyReaders() throws Exception { + Directory destDir = newDirectory(); + IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random())); + iwc.setMergePolicy(new ConcurrentAddIndexesMergePolicy()); + CountingSerialMergeScheduler ms = new CountingSerialMergeScheduler(); + iwc.setMergeScheduler(ms); + IndexWriter destWriter = new IndexWriter(destDir, iwc); + final int initialDocs = 15; + for (int i = 0; i < initialDocs; i++) addDoc(destWriter); + destWriter.commit(); + + // create empty readers + Directory dir = new MockDirectoryWrapper(random(), new ByteBuffersDirectory()); + IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new MockAnalyzer(random()))); + writer.close(); + final int numReaders = 20; + DirectoryReader[] readers = new DirectoryReader[numReaders]; + for (int i = 0; i < numReaders; i++) readers[i] = DirectoryReader.open(dir); + + TestUtil.addIndexesSlowly(destWriter, readers); + destWriter.commit(); + + // verify no docs were added + try (IndexReader reader = DirectoryReader.open(destDir)) { + assertEquals(initialDocs, reader.numDocs()); + } + // verify no merges were triggered + assertEquals(0, ms.addIndexesMerges); + + destWriter.close(); + for (int i = 0; i < numReaders; i++) readers[i].close(); + destDir.close(); + dir.close(); + } + + public void testCascadingMergesTriggered() throws Exception { + ConcurrentAddIndexesMergePolicy mp = new ConcurrentAddIndexesMergePolicy(); + CountingSerialMergeScheduler ms = new CountingSerialMergeScheduler(); + AddIndexesWithReadersSetup c = new AddIndexesWithReadersSetup(ms, mp); + TestUtil.addIndexesSlowly(c.destWriter, c.readers); + assertTrue(ms.explicitMerges > 0); + c.closeAll(); + } + + public void testAddIndexesHittingMaxDocsLimit() throws Exception { + final int writerMaxDocs = 15; + IndexWriter.setMaxDocs(writerMaxDocs); + + // create destination writer + Directory destDir = newDirectory(); + IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random())); + iwc.setMergePolicy(new ConcurrentAddIndexesMergePolicy()); + CountingSerialMergeScheduler ms = new CountingSerialMergeScheduler(); + iwc.setMergeScheduler(ms); + IndexWriter destWriter = new IndexWriter(destDir, iwc); + for (int i = 0; i < writerMaxDocs; i++) addDoc(destWriter); + destWriter.commit(); + + // create readers to add + Directory dir = new MockDirectoryWrapper(random(), new ByteBuffersDirectory()); + IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new MockAnalyzer(random()))); + for (int i = 0; i < 10; i++) addDoc(writer); + 
writer.close(); + final int numReaders = 20; + DirectoryReader[] readers = new DirectoryReader[numReaders]; + for (int i = 0; i < numReaders; i++) readers[i] = DirectoryReader.open(dir); + + boolean success = false; + try { + TestUtil.addIndexesSlowly(destWriter, readers); + success = true; + } catch (IllegalArgumentException e) { + assertTrue( + e.getMessage() + .contains("number of documents in the index cannot exceed " + writerMaxDocs)); + } + assertFalse(success); + + // verify no docs were added + destWriter.commit(); + try (IndexReader reader = DirectoryReader.open(destDir)) { + assertEquals(writerMaxDocs, reader.numDocs()); + } + + destWriter.close(); + for (int i = 0; i < numReaders; i++) readers[i].close(); + destDir.close(); + dir.close(); + } + private abstract class RunAddIndexesThreads { Directory dir, dir2; @@ -704,7 +990,10 @@ public class TestAddIndexes extends LuceneTestCase { writer.close(); dir2 = newDirectory(); - writer2 = new IndexWriter(dir2, new IndexWriterConfig(new MockAnalyzer(random()))); + MergePolicy mp = new ConcurrentAddIndexesMergePolicy(); + IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random())); + iwc.setMergePolicy(mp); + writer2 = new IndexWriter(dir2, iwc); writer2.commit(); readers = new DirectoryReader[NUM_COPY]; @@ -985,7 +1274,6 @@ public class TestAddIndexes extends LuceneTestCase { System.out.println("TEST: done join threads"); } c.closeDir(); - assertTrue(c.failures.size() == 0); } @@ -1014,7 +1302,6 @@ public class TestAddIndexes extends LuceneTestCase { } c.closeDir(); - assertTrue(c.failures.size() == 0); } @@ -1488,7 +1775,9 @@ public class TestAddIndexes extends LuceneTestCase { } writer.commit(); writer.close(); + DirectoryReader reader = DirectoryReader.open(dir1); + // wrappedReader filters out soft deleted docs DirectoryReader wrappedReader = new SoftDeletesDirectoryReaderWrapper(reader, "soft_delete"); Directory dir2 = newDirectory(); int numDocs = reader.numDocs(); @@ -1504,9 +1793,13 @@ public class TestAddIndexes extends LuceneTestCase { assertEquals(wrappedReader.numDocs(), writer.getDocStats().numDocs); assertEquals(maxDoc, writer.getDocStats().maxDoc); writer.commit(); - SegmentCommitInfo commitInfo = writer.cloneSegmentInfos().info(0); - assertEquals(maxDoc - wrappedReader.numDocs(), commitInfo.getSoftDelCount()); + int softDeleteCount = + writer.cloneSegmentInfos().asList().stream() + .mapToInt(SegmentCommitInfo::getSoftDelCount) + .sum(); + assertEquals(maxDoc - wrappedReader.numDocs(), softDeleteCount); writer.close(); + Directory dir3 = newDirectory(); iwc1 = newIndexWriterConfig(new MockAnalyzer(random())).setSoftDeletesField("soft_delete"); writer = new IndexWriter(dir3, iwc1); @@ -1517,6 +1810,7 @@ public class TestAddIndexes extends LuceneTestCase { } writer.addIndexes(readers); assertEquals(wrappedReader.numDocs(), writer.getDocStats().numDocs); + // soft deletes got filtered out when wrappedReader(s) were added. 
assertEquals(wrappedReader.numDocs(), writer.getDocStats().maxDoc); IOUtils.close(reader, writer, dir3, dir2, dir1); } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java index 0847ff63fd1..19d3284b11d 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java @@ -352,7 +352,7 @@ public class TestIndexWriterOnDiskFull extends LuceneTestCase { done = true; } - } catch (IllegalStateException | IOException e) { + } catch (IllegalStateException | IOException | MergePolicy.MergeException e) { success = false; err = e; if (VERBOSE) { @@ -360,7 +360,7 @@ public class TestIndexWriterOnDiskFull extends LuceneTestCase { e.printStackTrace(System.out); } - if (1 == x) { + if (1 == x && (e instanceof MergePolicy.MergeException == false)) { e.printStackTrace(System.out); fail(methodName + " hit IOException after disk space was freed up"); }
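
Illustrative sketch (not part of the patch): the new MergePolicy#findMerges(CodecReader...) hook defaults to a single OneMerge over all incoming readers; a custom merge policy can instead emit one OneMerge per reader so the configured MergeScheduler can run the addIndexes(CodecReader...) merges concurrently, in the spirit of the ConcurrentAddIndexesMergePolicy used in TestAddIndexes above. The class name PerReaderAddIndexesMergePolicy and the variables analyzer, dir, and codecReaders below are hypothetical.

import java.io.IOException;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.TieredMergePolicy;

public class PerReaderAddIndexesMergePolicy extends TieredMergePolicy {
  @Override
  public MergeSpecification findMerges(CodecReader... readers) throws IOException {
    // Default MergePolicy behavior merges all provided readers into one segment;
    // emitting one OneMerge per reader lets the scheduler process them in parallel.
    MergeSpecification spec = new MergeSpecification();
    for (CodecReader reader : readers) {
      spec.add(new OneMerge(reader));
    }
    return spec;
  }
}

// Possible wiring (hypothetical names):
// IndexWriterConfig iwc = new IndexWriterConfig(analyzer)
//     .setMergePolicy(new PerReaderAddIndexesMergePolicy())
//     .setMergeScheduler(new ConcurrentMergeScheduler());
// try (IndexWriter writer = new IndexWriter(dir, iwc)) {
//   writer.addIndexes(codecReaders); // one merged segment per incoming reader
// }

Whether per-reader merges help depends on the scheduler and hardware; the default single-merge behavior keeps the resulting index at one additional segment, while per-reader merges trade that for higher addIndexes concurrency.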