From f7cab1645017d863331b42900581b67d3591e2da Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Thu, 23 Nov 2023 13:25:00 +0100 Subject: [PATCH] Add a merge policy wrapper that performs recursive graph bisection on merge. (#12622) This adds `BPReorderingMergePolicy`, a merge policy wrapper that reorders doc IDs on merge using a `BPIndexReorderer`. - Reordering always run on forced merges. - A `minNaturalMergeNumDocs` parameter helps only enable reordering on the larger merged segments. This way, small merges retain all merging optimizations like bulk copying of stored fields, and only the larger segments - which are the most important for search performance - get reordered. - If not enough RAM is available to perform reordering, reordering is skipped. To make this work, I had to add the ability for any merge to reorder doc IDs of the merged segment via `OneMerge#reorder`. `MockRandomMergePolicy` from the test framework randomly reverts the order of documents in a merged segment to make sure this logic is properly exercised. --- .../org/apache/lucene/index/IndexWriter.java | 116 +- .../org/apache/lucene/index/MergePolicy.java | 38 +- .../SlowCompositeCodecReaderWrapper.java | 1005 +++++++++++++++++ .../apache/lucene/index/TestAddIndexes.java | 51 + .../index/TestIndexWriterMergePolicy.java | 60 + ...ParentBlockJoinKnnVectorQueryTestCase.java | 23 +- .../lucene/search/join/TestBlockJoin.java | 73 +- .../search/join/TestBlockJoinValidation.java | 4 +- .../search/join/TestCheckJoinIndex.java | 8 +- .../lucene/search/join/TestJoinUtil.java | 13 +- ...estParentBlockJoinFloatKnnVectorQuery.java | 4 +- .../TestParentChildrenBlockJoinQuery.java | 4 +- .../lucene/misc/index/BPIndexReorderer.java | 73 +- .../misc/index/BPReorderingMergePolicy.java | 204 ++++ .../index/TestBPReorderingMergePolicy.java | 260 +++++ .../tests/index/MockRandomMergePolicy.java | 28 + 16 files changed, 1884 insertions(+), 80 deletions(-) create mode 100644 lucene/core/src/java/org/apache/lucene/index/SlowCompositeCodecReaderWrapper.java create mode 100644 lucene/misc/src/java/org/apache/lucene/misc/index/BPReorderingMergePolicy.java create mode 100644 lucene/misc/src/test/org/apache/lucene/misc/index/TestBPReorderingMergePolicy.java diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index 209eca938bc..1deb2c80ed5 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -33,6 +33,7 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -55,6 +56,8 @@ import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate; import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; import org.apache.lucene.index.FieldInfos.FieldNumbers; import org.apache.lucene.index.IndexWriterConfig.OpenMode; +import org.apache.lucene.index.MergePolicy.MergeReader; +import org.apache.lucene.index.Sorter.DocMap; import org.apache.lucene.internal.tests.IndexPackageAccess; import org.apache.lucene.internal.tests.IndexWriterAccess; import org.apache.lucene.internal.tests.TestSecrets; @@ -3413,8 +3416,20 @@ public class IndexWriter Collections.emptyMap(), config.getIndexSort()); - List readers = - merge.getMergeReader().stream().map(r -> r.codecReader).collect(Collectors.toList()); + List readers 
= new ArrayList<>(); + for (MergeReader mr : merge.getMergeReader()) { + CodecReader reader = merge.wrapForMerge(mr.codecReader); + readers.add(reader); + } + + if (config.getIndexSort() == null && readers.isEmpty() == false) { + CodecReader mergedReader = SlowCompositeCodecReaderWrapper.wrap(readers); + DocMap docMap = merge.reorder(mergedReader, directory); + if (docMap != null) { + readers = Collections.singletonList(SortingCodecReader.wrap(mergedReader, docMap, null)); + } + } + SegmentMerger merger = new SegmentMerger(readers, segInfo, infoStream, trackingDir, globalFieldNumberMap, context); @@ -3464,6 +3479,8 @@ public class IndexWriter merge.getMergeInfo().info.setUseCompoundFile(true); } + merge.setMergeInfo(merge.info); + // Have codec write SegmentInfo. Must do this after // creating CFS so that 1) .si isn't slurped into CFS, // and 2) .si reflects useCompoundFile=true change @@ -3791,7 +3808,7 @@ public class IndexWriter new OneMergeWrappingMergePolicy( config.getMergePolicy(), toWrap -> - new MergePolicy.OneMerge(toWrap.segments) { + new MergePolicy.OneMerge(toWrap) { SegmentCommitInfo origInfo; final AtomicBoolean onlyOnce = new AtomicBoolean(false); @@ -3890,6 +3907,18 @@ public class IndexWriter public CodecReader wrapForMerge(CodecReader reader) throws IOException { return toWrap.wrapForMerge(reader); // must delegate } + + @Override + public Sorter.DocMap reorder(CodecReader reader, Directory dir) + throws IOException { + return toWrap.reorder(reader, dir); // must delegate + } + + @Override + public void setMergeInfo(SegmentCommitInfo info) { + super.setMergeInfo(info); + toWrap.setMergeInfo(info); + } }), trigger, UNBOUNDED_MAX_MERGE_SEGMENTS); @@ -4312,7 +4341,7 @@ public class IndexWriter * merge.info). If no deletes were flushed, no new deletes file is saved. */ private synchronized ReadersAndUpdates commitMergedDeletesAndUpdates( - MergePolicy.OneMerge merge, MergeState mergeState) throws IOException { + MergePolicy.OneMerge merge, MergeState.DocMap[] docMaps) throws IOException { mergeFinishedGen.incrementAndGet(); @@ -4336,7 +4365,7 @@ public class IndexWriter boolean anyDVUpdates = false; - assert sourceSegments.size() == mergeState.docMaps.length; + assert sourceSegments.size() == docMaps.length; for (int i = 0; i < sourceSegments.size(); i++) { SegmentCommitInfo info = sourceSegments.get(i); minGen = Math.min(info.getBufferedDeletesGen(), minGen); @@ -4346,12 +4375,11 @@ public class IndexWriter // the pool: assert rld != null : "seg=" + info.info.name; - MergeState.DocMap segDocMap = mergeState.docMaps[i]; + MergeState.DocMap segDocMap = docMaps[i]; carryOverHardDeletes( mergedDeletesAndUpdates, maxDoc, - mergeState.liveDocs[i], merge.getMergeReader().get(i).hardLiveDocs, rld.getHardLiveDocs(), segDocMap); @@ -4454,26 +4482,21 @@ public class IndexWriter private static void carryOverHardDeletes( ReadersAndUpdates mergedReadersAndUpdates, int maxDoc, - Bits mergeLiveDocs, // the liveDocs used to build the segDocMaps Bits prevHardLiveDocs, // the hard deletes when the merge reader was pulled Bits currentHardLiveDocs, // the current hard deletes MergeState.DocMap segDocMap) throws IOException { - assert mergeLiveDocs == null || mergeLiveDocs.length() == maxDoc; // if we mix soft and hard deletes we need to make sure that we only carry over deletes // that were not deleted before. Otherwise the segDocMap doesn't contain a mapping. // yet this is also required if any MergePolicy modifies the liveDocs since this is // what the segDocMap is build on. 
final IntPredicate carryOverDelete = - mergeLiveDocs == null || mergeLiveDocs == prevHardLiveDocs - ? docId -> currentHardLiveDocs.get(docId) == false - : docId -> mergeLiveDocs.get(docId) && currentHardLiveDocs.get(docId) == false; + docId -> segDocMap.get(docId) != -1 && currentHardLiveDocs.get(docId) == false; if (prevHardLiveDocs != null) { // If we had deletions on starting the merge we must // still have deletions now: assert currentHardLiveDocs != null; - assert mergeLiveDocs != null; assert prevHardLiveDocs.length() == maxDoc; assert currentHardLiveDocs.length() == maxDoc; @@ -4516,7 +4539,7 @@ public class IndexWriter } @SuppressWarnings("try") - private synchronized boolean commitMerge(MergePolicy.OneMerge merge, MergeState mergeState) + private synchronized boolean commitMerge(MergePolicy.OneMerge merge, MergeState.DocMap[] docMaps) throws IOException { merge.onMergeComplete(); testPoint("startCommitMerge"); @@ -4559,7 +4582,7 @@ public class IndexWriter } final ReadersAndUpdates mergedUpdates = - merge.info.info.maxDoc() == 0 ? null : commitMergedDeletesAndUpdates(merge, mergeState); + merge.info.info.maxDoc() == 0 ? null : commitMergedDeletesAndUpdates(merge, docMaps); // If the doc store we are using has been closed and // is in now compound format (but wasn't when we @@ -5163,12 +5186,68 @@ public class IndexWriter } mergeReaders.add(wrappedReader); } + + MergeState.DocMap[] reorderDocMaps = null; + if (config.getIndexSort() == null) { + // Create a merged view of the input segments. This effectively does the merge. + CodecReader mergedView = SlowCompositeCodecReaderWrapper.wrap(mergeReaders); + Sorter.DocMap docMap = merge.reorder(mergedView, directory); + if (docMap != null) { + reorderDocMaps = new MergeState.DocMap[mergeReaders.size()]; + int docBase = 0; + int i = 0; + for (CodecReader reader : mergeReaders) { + final int currentDocBase = docBase; + reorderDocMaps[i] = + new MergeState.DocMap() { + @Override + public int get(int docID) { + Objects.checkIndex(docID, reader.maxDoc()); + return docMap.oldToNew(currentDocBase + docID); + } + }; + i++; + docBase += reader.maxDoc(); + } + // This makes merging more expensive as it disables some bulk merging optimizations, so + // only do this if a non-null DocMap is returned. + mergeReaders = + Collections.singletonList(SortingCodecReader.wrap(mergedView, docMap, null)); + } + } + final SegmentMerger merger = new SegmentMerger( mergeReaders, merge.info.info, infoStream, dirWrapper, globalFieldNumberMap, context); merge.info.setSoftDelCount(Math.toIntExact(softDeleteCount.get())); merge.checkAborted(); + MergeState mergeState = merger.mergeState; + MergeState.DocMap[] docMaps; + if (reorderDocMaps == null) { + docMaps = mergeState.docMaps; + } else { + // Since the reader was reordered, we passed a merged view to MergeState and from its + // perspective there is a single input segment to the merge and the + // SlowCompositeCodecReaderWrapper is effectively doing the merge. 
+ assert mergeState.docMaps.length == 1 + : "Got " + mergeState.docMaps.length + " docMaps, but expected 1"; + MergeState.DocMap compactionDocMap = mergeState.docMaps[0]; + docMaps = new MergeState.DocMap[reorderDocMaps.length]; + for (int i = 0; i < docMaps.length; ++i) { + MergeState.DocMap reorderDocMap = reorderDocMaps[i]; + docMaps[i] = + new MergeState.DocMap() { + @Override + public int get(int docID) { + int reorderedDocId = reorderDocMap.get(docID); + int compactedDocId = compactionDocMap.get(reorderedDocId); + return compactedDocId; + } + }; + } + } + merge.mergeStartNS = System.nanoTime(); // This is where all the work happens: @@ -5176,7 +5255,6 @@ public class IndexWriter merger.merge(); } - MergeState mergeState = merger.mergeState; assert mergeState.segmentInfo == merge.info.info; merge.info.info.setFiles(new HashSet<>(dirWrapper.getCreatedFiles())); Codec codec = config.getCodec(); @@ -5229,7 +5307,7 @@ public class IndexWriter // Merge would produce a 0-doc segment, so we do nothing except commit the merge to remove // all the 0-doc segments that we "merged": assert merge.info.info.maxDoc() == 0; - success = commitMerge(merge, mergeState); + success = commitMerge(merge, docMaps); return 0; } @@ -5309,6 +5387,8 @@ public class IndexWriter success = false; } + merge.setMergeInfo(merge.info); + // Have codec write SegmentInfo. Must do this after // creating CFS so that 1) .si isn't slurped into CFS, // and 2) .si reflects useCompoundFile=true change @@ -5352,7 +5432,7 @@ public class IndexWriter } } - if (!commitMerge(merge, mergeState)) { + if (!commitMerge(merge, docMaps)) { // commitMerge will return false if this merge was // aborted return 0; diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java index 3eb527edb19..0b7a2cb7296 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java @@ -255,6 +255,15 @@ public abstract class MergePolicy { usesPooledReaders = false; } + /** Constructor for wrapping. */ + protected OneMerge(OneMerge oneMerge) { + this.segments = oneMerge.segments; + this.mergeReaders = oneMerge.mergeReaders; + this.totalMaxDoc = oneMerge.totalMaxDoc; + this.mergeProgress = new OneMergeProgress(); + this.usesPooledReaders = oneMerge.usesPooledReaders; + } + /** * Called by {@link IndexWriter} after the merge started and from the thread that will be * executing the merge. @@ -288,11 +297,32 @@ public abstract class MergePolicy { } } - /** Wrap the reader in order to add/remove information to the merged segment. */ + /** + * Wrap a reader prior to merging in order to add/remove fields or documents. + * + *
<p>
NOTE: It is illegal to reorder doc IDs here, use {@link + * #reorder(CodecReader,Directory)} instead. + */ public CodecReader wrapForMerge(CodecReader reader) throws IOException { return reader; } + /** + * Extend this method if you wish to renumber doc IDs. This method will be called when index + * sorting is disabled on a merged view of the {@link OneMerge}. A {@code null} return value + * indicates that doc IDs should not be reordered. + * + *
<p>
NOTE: Returning a non-null value here disables several optimizations and increases + * the merging overhead. + * + * @param reader The reader to reorder. + * @param dir The {@link Directory} of the index, which may be used to create temporary files. + * @lucene.experimental + */ + public Sorter.DocMap reorder(CodecReader reader, Directory dir) throws IOException { + return null; + } + /** * Expert: Sets the {@link SegmentCommitInfo} of the merged segment. Allows sub-classes to e.g. * {@link SegmentInfo#addDiagnostics(Map) add diagnostic} properties. @@ -355,11 +385,7 @@ public abstract class MergePolicy { * not indicate the number of documents after the merge. */ public int totalNumDocs() { - int total = 0; - for (SegmentCommitInfo info : segments) { - total += info.info.maxDoc(); - } - return total; + return totalMaxDoc; } /** Return {@link MergeInfo} describing this merge. */ diff --git a/lucene/core/src/java/org/apache/lucene/index/SlowCompositeCodecReaderWrapper.java b/lucene/core/src/java/org/apache/lucene/index/SlowCompositeCodecReaderWrapper.java new file mode 100644 index 00000000000..849872a2bd5 --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/index/SlowCompositeCodecReaderWrapper.java @@ -0,0 +1,1005 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.index; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.codecs.FieldsProducer; +import org.apache.lucene.codecs.KnnVectorsReader; +import org.apache.lucene.codecs.NormsProducer; +import org.apache.lucene.codecs.PointsReader; +import org.apache.lucene.codecs.StoredFieldsReader; +import org.apache.lucene.codecs.TermVectorsReader; +import org.apache.lucene.index.MultiDocValues.MultiSortedDocValues; +import org.apache.lucene.index.MultiDocValues.MultiSortedSetDocValues; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.KnnCollector; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.Version; + +/** + * A merged {@link CodecReader} view of multiple {@link CodecReader}. This view is primarily + * targeted at merging, not searching. 
+ */ +final class SlowCompositeCodecReaderWrapper extends CodecReader { + + static CodecReader wrap(List readers) throws IOException { + switch (readers.size()) { + case 0: + throw new IllegalArgumentException("Must take at least one reader, got 0"); + case 1: + return readers.get(0); + default: + return new SlowCompositeCodecReaderWrapper(readers); + } + } + + private final LeafMetaData meta; + private final CodecReader[] codecReaders; + private final int[] docStarts; + private final FieldInfos fieldInfos; + private final Bits liveDocs; + + private SlowCompositeCodecReaderWrapper(List codecReaders) throws IOException { + this.codecReaders = codecReaders.toArray(CodecReader[]::new); + docStarts = new int[codecReaders.size() + 1]; + int i = 0; + int docStart = 0; + for (CodecReader reader : codecReaders) { + i++; + docStart += reader.maxDoc(); + docStarts[i] = docStart; + } + int majorVersion = -1; + Version minVersion = null; + boolean hasBlocks = false; + for (CodecReader reader : codecReaders) { + LeafMetaData readerMeta = reader.getMetaData(); + if (majorVersion == -1) { + majorVersion = readerMeta.getCreatedVersionMajor(); + } else if (majorVersion != readerMeta.getCreatedVersionMajor()) { + throw new IllegalArgumentException( + "Cannot combine leaf readers created with different major versions"); + } + if (minVersion == null) { + minVersion = readerMeta.getMinVersion(); + } else if (minVersion.onOrAfter(readerMeta.getMinVersion())) { + minVersion = readerMeta.getMinVersion(); + } + hasBlocks |= readerMeta.hasBlocks(); + } + meta = new LeafMetaData(majorVersion, minVersion, null, hasBlocks); + MultiReader multiReader = new MultiReader(codecReaders.toArray(CodecReader[]::new)); + fieldInfos = FieldInfos.getMergedFieldInfos(multiReader); + liveDocs = MultiBits.getLiveDocs(multiReader); + } + + private int docIdToReaderId(int doc) { + Objects.checkIndex(doc, docStarts[docStarts.length - 1]); + int readerId = Arrays.binarySearch(docStarts, doc); + if (readerId < 0) { + readerId = -2 - readerId; + } + return readerId; + } + + @Override + public StoredFieldsReader getFieldsReader() { + StoredFieldsReader[] readers = + Arrays.stream(codecReaders) + .map(CodecReader::getFieldsReader) + .toArray(StoredFieldsReader[]::new); + return new SlowCompositeStoredFieldsReaderWrapper(readers, docStarts); + } + + // Remap FieldInfos to make sure consumers only see field infos from the composite reader, not + // from individual leaves + private FieldInfo remap(FieldInfo info) { + return fieldInfos.fieldInfo(info.name); + } + + private class SlowCompositeStoredFieldsReaderWrapper extends StoredFieldsReader { + + private final StoredFieldsReader[] readers; + private final int[] docStarts; + + SlowCompositeStoredFieldsReaderWrapper(StoredFieldsReader[] readers, int[] docStarts) { + this.readers = readers; + this.docStarts = docStarts; + } + + @Override + public void close() throws IOException { + IOUtils.close(readers); + } + + @Override + public StoredFieldsReader clone() { + return new SlowCompositeStoredFieldsReaderWrapper( + Arrays.stream(readers).map(StoredFieldsReader::clone).toArray(StoredFieldsReader[]::new), + docStarts); + } + + @Override + public void checkIntegrity() throws IOException { + for (StoredFieldsReader reader : readers) { + if (reader != null) { + reader.checkIntegrity(); + } + } + } + + @Override + public void document(int docID, StoredFieldVisitor visitor) throws IOException { + int readerId = docIdToReaderId(docID); + readers[readerId].document( + docID - docStarts[readerId], + new 
StoredFieldVisitor() { + + @Override + public Status needsField(FieldInfo fieldInfo) throws IOException { + return visitor.needsField(remap(fieldInfo)); + } + + @Override + public void binaryField(FieldInfo fieldInfo, byte[] value) throws IOException { + visitor.binaryField(remap(fieldInfo), value); + } + + @Override + public void stringField(FieldInfo fieldInfo, String value) throws IOException { + visitor.stringField(remap(fieldInfo), value); + } + + @Override + public void intField(FieldInfo fieldInfo, int value) throws IOException { + visitor.intField(remap(fieldInfo), value); + } + + @Override + public void longField(FieldInfo fieldInfo, long value) throws IOException { + visitor.longField(remap(fieldInfo), value); + } + + @Override + public void floatField(FieldInfo fieldInfo, float value) throws IOException { + visitor.floatField(remap(fieldInfo), value); + } + + @Override + public void doubleField(FieldInfo fieldInfo, double value) throws IOException { + visitor.doubleField(remap(fieldInfo), value); + } + }); + } + } + + @Override + public TermVectorsReader getTermVectorsReader() { + TermVectorsReader[] readers = + Arrays.stream(codecReaders) + .map(CodecReader::getTermVectorsReader) + .toArray(TermVectorsReader[]::new); + return new SlowCompositeTermVectorsReaderWrapper(readers, docStarts); + } + + private class SlowCompositeTermVectorsReaderWrapper extends TermVectorsReader { + + private final TermVectorsReader[] readers; + private final int[] docStarts; + + SlowCompositeTermVectorsReaderWrapper(TermVectorsReader[] readers, int[] docStarts) { + this.readers = readers; + this.docStarts = docStarts; + } + + @Override + public void close() throws IOException { + IOUtils.close(readers); + } + + @Override + public TermVectorsReader clone() { + return new SlowCompositeTermVectorsReaderWrapper( + Arrays.stream(readers).map(TermVectorsReader::clone).toArray(TermVectorsReader[]::new), + docStarts); + } + + @Override + public void checkIntegrity() throws IOException { + for (TermVectorsReader reader : readers) { + if (reader != null) { + reader.checkIntegrity(); + } + } + } + + @Override + public Fields get(int doc) throws IOException { + int readerId = docIdToReaderId(doc); + TermVectorsReader reader = readers[readerId]; + if (reader == null) { + return null; + } + return reader.get(doc - docStarts[readerId]); + } + } + + @Override + public NormsProducer getNormsReader() { + return new SlowCompositeNormsProducer(codecReaders); + } + + private static class SlowCompositeNormsProducer extends NormsProducer { + + private final CodecReader[] codecReaders; + private final NormsProducer[] producers; + + SlowCompositeNormsProducer(CodecReader[] codecReaders) { + this.codecReaders = codecReaders; + this.producers = + Arrays.stream(codecReaders) + .map(CodecReader::getNormsReader) + .toArray(NormsProducer[]::new); + } + + @Override + public void close() throws IOException { + IOUtils.close(producers); + } + + @Override + public NumericDocValues getNorms(FieldInfo field) throws IOException { + return MultiDocValues.getNormValues(new MultiReader(codecReaders), field.name); + } + + @Override + public void checkIntegrity() throws IOException { + for (NormsProducer producer : producers) { + if (producer != null) { + producer.checkIntegrity(); + } + } + } + } + + private static class DocValuesSub { + private final T sub; + private final int docStart; + private final int docEnd; + + DocValuesSub(T sub, int docStart, int docEnd) { + this.sub = sub; + this.docStart = docStart; + this.docEnd = docEnd; + } + 
} + + private static class MergedDocIdSetIterator extends DocIdSetIterator { + + final Iterator> it; + final long cost; + DocValuesSub current; + int currentIndex = 0; + int doc = -1; + + MergedDocIdSetIterator(List> subs) { + long cost = 0; + for (DocValuesSub sub : subs) { + if (sub.sub != null) { + cost += sub.sub.cost(); + } + } + this.cost = cost; + this.it = subs.iterator(); + current = it.next(); + } + + private boolean advanceSub(int target) { + while (current.sub == null || current.docEnd <= target) { + if (it.hasNext() == false) { + doc = NO_MORE_DOCS; + return false; + } + current = it.next(); + currentIndex++; + } + return true; + } + + @Override + public int docID() { + return doc; + } + + @Override + public int nextDoc() throws IOException { + while (true) { + if (current.sub != null) { + int next = current.sub.nextDoc(); + if (next != NO_MORE_DOCS) { + return doc = current.docStart + next; + } + } + if (it.hasNext() == false) { + return doc = NO_MORE_DOCS; + } + current = it.next(); + currentIndex++; + } + } + + @Override + public int advance(int target) throws IOException { + while (true) { + if (advanceSub(target) == false) { + return DocIdSetIterator.NO_MORE_DOCS; + } + int next = current.sub.advance(target - current.docStart); + if (next == DocIdSetIterator.NO_MORE_DOCS) { + target = current.docEnd; + } else { + return doc = current.docStart + next; + } + } + } + + @Override + public long cost() { + return cost; + } + } + + @Override + public DocValuesProducer getDocValuesReader() { + return new SlowCompositeDocValuesProducerWrapper(codecReaders, docStarts); + } + + private static class SlowCompositeDocValuesProducerWrapper extends DocValuesProducer { + + private final CodecReader[] codecReaders; + private final DocValuesProducer[] producers; + private final int[] docStarts; + private final Map cachedOrdMaps = new HashMap<>(); + + SlowCompositeDocValuesProducerWrapper(CodecReader[] codecReaders, int[] docStarts) { + this.codecReaders = codecReaders; + this.producers = + Arrays.stream(codecReaders) + .map(CodecReader::getDocValuesReader) + .toArray(DocValuesProducer[]::new); + this.docStarts = docStarts; + } + + @Override + public void close() throws IOException { + IOUtils.close(producers); + } + + @Override + public void checkIntegrity() throws IOException { + for (DocValuesProducer producer : producers) { + if (producer != null) { + producer.checkIntegrity(); + } + } + } + + @Override + public NumericDocValues getNumeric(FieldInfo field) throws IOException { + return MultiDocValues.getNumericValues(new MultiReader(codecReaders), field.name); + } + + @Override + public BinaryDocValues getBinary(FieldInfo field) throws IOException { + return MultiDocValues.getBinaryValues(new MultiReader(codecReaders), field.name); + } + + @Override + public SortedDocValues getSorted(FieldInfo field) throws IOException { + OrdinalMap map = null; + synchronized (cachedOrdMaps) { + map = cachedOrdMaps.get(field.name); + if (map == null) { + // uncached, or not a multi dv + SortedDocValues dv = + MultiDocValues.getSortedValues(new MultiReader(codecReaders), field.name); + if (dv instanceof MultiSortedDocValues) { + map = ((MultiSortedDocValues) dv).mapping; + cachedOrdMaps.put(field.name, map); + } + return dv; + } + } + int size = codecReaders.length; + final SortedDocValues[] values = new SortedDocValues[size]; + long totalCost = 0; + for (int i = 0; i < size; i++) { + final LeafReader reader = codecReaders[i]; + SortedDocValues v = reader.getSortedDocValues(field.name); + if (v == null) 
{ + v = DocValues.emptySorted(); + } + values[i] = v; + totalCost += v.cost(); + } + return new MultiSortedDocValues(values, docStarts, map, totalCost); + } + + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { + return MultiDocValues.getSortedNumericValues(new MultiReader(codecReaders), field.name); + } + + @Override + public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException { + OrdinalMap map = null; + synchronized (cachedOrdMaps) { + map = cachedOrdMaps.get(field.name); + if (map == null) { + // uncached, or not a multi dv + SortedSetDocValues dv = + MultiDocValues.getSortedSetValues(new MultiReader(codecReaders), field.name); + if (dv instanceof MultiSortedSetDocValues) { + map = ((MultiSortedSetDocValues) dv).mapping; + cachedOrdMaps.put(field.name, map); + } + return dv; + } + } + + assert map != null; + int size = codecReaders.length; + final SortedSetDocValues[] values = new SortedSetDocValues[size]; + long totalCost = 0; + for (int i = 0; i < size; i++) { + final LeafReader reader = codecReaders[i]; + SortedSetDocValues v = reader.getSortedSetDocValues(field.name); + if (v == null) { + v = DocValues.emptySortedSet(); + } + values[i] = v; + totalCost += v.cost(); + } + return new MultiSortedSetDocValues(values, docStarts, map, totalCost); + } + } + + @Override + public FieldsProducer getPostingsReader() { + FieldsProducer[] producers = + Arrays.stream(codecReaders) + .map(CodecReader::getPostingsReader) + .toArray(FieldsProducer[]::new); + return new SlowCompositeFieldsProducerWrapper(producers, docStarts); + } + + private static class SlowCompositeFieldsProducerWrapper extends FieldsProducer { + + private final FieldsProducer[] producers; + private final MultiFields fields; + + SlowCompositeFieldsProducerWrapper(FieldsProducer[] producers, int[] docStarts) { + this.producers = producers; + List subs = new ArrayList<>(); + List slices = new ArrayList<>(); + int i = 0; + for (FieldsProducer producer : producers) { + if (producer != null) { + subs.add(producer); + slices.add(new ReaderSlice(docStarts[i], docStarts[i + 1], i)); + } + i++; + } + fields = new MultiFields(subs.toArray(Fields[]::new), slices.toArray(ReaderSlice[]::new)); + } + + @Override + public void close() throws IOException { + IOUtils.close(producers); + } + + @Override + public void checkIntegrity() throws IOException { + for (FieldsProducer producer : producers) { + if (producer != null) { + producer.checkIntegrity(); + } + } + } + + @Override + public Iterator iterator() { + return fields.iterator(); + } + + @Override + public Terms terms(String field) throws IOException { + return fields.terms(field); + } + + @Override + public int size() { + return fields.size(); + } + } + + @Override + public PointsReader getPointsReader() { + return new SlowCompositePointsReaderWrapper(codecReaders, docStarts); + } + + private static class PointValuesSub { + private final PointValues sub; + private final int docBase; + + PointValuesSub(PointValues sub, int docBase) { + this.sub = sub; + this.docBase = docBase; + } + } + + private static class SlowCompositePointsReaderWrapper extends PointsReader { + + private final CodecReader[] codecReaders; + private final PointsReader[] readers; + private final int[] docStarts; + + SlowCompositePointsReaderWrapper(CodecReader[] codecReaders, int[] docStarts) { + this.codecReaders = codecReaders; + this.readers = + Arrays.stream(codecReaders) + .map(CodecReader::getPointsReader) + .toArray(PointsReader[]::new); + 
this.docStarts = docStarts; + } + + @Override + public void close() throws IOException { + IOUtils.close(readers); + } + + @Override + public void checkIntegrity() throws IOException { + for (PointsReader reader : readers) { + if (reader != null) { + reader.checkIntegrity(); + } + } + } + + @Override + public PointValues getValues(String field) throws IOException { + List values = new ArrayList<>(); + for (int i = 0; i < readers.length; ++i) { + FieldInfo fi = codecReaders[i].getFieldInfos().fieldInfo(field); + if (fi != null && fi.getPointDimensionCount() > 0) { + PointValues v = readers[i].getValues(field); + values.add(new PointValuesSub(v, docStarts[i])); + } + } + if (values.isEmpty()) { + return null; + } + return new PointValues() { + + @Override + public PointTree getPointTree() throws IOException { + return new PointTree() { + + @Override + public PointTree clone() { + return this; + } + + @Override + public void visitDocValues(IntersectVisitor visitor) throws IOException { + for (PointValuesSub sub : values) { + sub.sub.getPointTree().visitDocValues(wrapIntersectVisitor(visitor, sub.docBase)); + } + } + + @Override + public void visitDocIDs(IntersectVisitor visitor) throws IOException { + for (PointValuesSub sub : values) { + sub.sub.getPointTree().visitDocIDs(wrapIntersectVisitor(visitor, sub.docBase)); + } + } + + private IntersectVisitor wrapIntersectVisitor(IntersectVisitor visitor, int docStart) { + return new IntersectVisitor() { + + @Override + public void visit(int docID, byte[] packedValue) throws IOException { + visitor.visit(docStart + docID, packedValue); + } + + @Override + public void visit(int docID) throws IOException { + visitor.visit(docStart + docID); + } + + @Override + public Relation compare(byte[] minPackedValue, byte[] maxPackedValue) { + return visitor.compare(minPackedValue, maxPackedValue); + } + }; + } + + @Override + public long size() { + long size = 0; + for (PointValuesSub sub : values) { + size += sub.sub.size(); + } + return size; + } + + @Override + public boolean moveToSibling() throws IOException { + return false; + } + + @Override + public boolean moveToParent() throws IOException { + return false; + } + + @Override + public boolean moveToChild() throws IOException { + return false; + } + + @Override + public byte[] getMinPackedValue() { + try { + byte[] minPackedValue = null; + for (PointValuesSub sub : values) { + if (minPackedValue == null) { + minPackedValue = sub.sub.getMinPackedValue().clone(); + } else { + byte[] leafMinPackedValue = sub.sub.getMinPackedValue(); + int numIndexDimensions = sub.sub.getNumIndexDimensions(); + int numBytesPerDimension = sub.sub.getBytesPerDimension(); + ArrayUtil.ByteArrayComparator comparator = + ArrayUtil.getUnsignedComparator(numBytesPerDimension); + for (int i = 0; i < numIndexDimensions; ++i) { + if (comparator.compare( + leafMinPackedValue, + i * numBytesPerDimension, + minPackedValue, + i * numBytesPerDimension) + < 0) { + System.arraycopy( + leafMinPackedValue, + i * numBytesPerDimension, + minPackedValue, + i * numBytesPerDimension, + numBytesPerDimension); + } + } + } + } + return minPackedValue; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public byte[] getMaxPackedValue() { + try { + byte[] maxPackedValue = null; + for (PointValuesSub sub : values) { + if (maxPackedValue == null) { + maxPackedValue = sub.sub.getMaxPackedValue().clone(); + } else { + byte[] leafMinPackedValue = sub.sub.getMaxPackedValue(); + int numIndexDimensions = 
sub.sub.getNumIndexDimensions(); + int numBytesPerDimension = sub.sub.getBytesPerDimension(); + ArrayUtil.ByteArrayComparator comparator = + ArrayUtil.getUnsignedComparator(numBytesPerDimension); + for (int i = 0; i < numIndexDimensions; ++i) { + if (comparator.compare( + leafMinPackedValue, + i * numBytesPerDimension, + maxPackedValue, + i * numBytesPerDimension) + > 0) { + System.arraycopy( + leafMinPackedValue, + i * numBytesPerDimension, + maxPackedValue, + i * numBytesPerDimension, + numBytesPerDimension); + } + } + } + } + return maxPackedValue; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + }; + } + + @Override + public byte[] getMinPackedValue() throws IOException { + return getPointTree().getMinPackedValue(); + } + + @Override + public byte[] getMaxPackedValue() throws IOException { + return getPointTree().getMaxPackedValue(); + } + + @Override + public int getNumDimensions() throws IOException { + return values.get(0).sub.getNumDimensions(); + } + + @Override + public int getNumIndexDimensions() throws IOException { + return values.get(0).sub.getNumIndexDimensions(); + } + + @Override + public int getBytesPerDimension() throws IOException { + return values.get(0).sub.getBytesPerDimension(); + } + + @Override + public long size() { + try { + return getPointTree().size(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public int getDocCount() { + int docCount = 0; + for (PointValuesSub sub : values) { + docCount += sub.sub.getDocCount(); + } + return docCount; + } + }; + } + } + + @Override + public KnnVectorsReader getVectorReader() { + return new SlowCompositeKnnVectorsReaderWrapper(codecReaders, docStarts); + } + + private static class SlowCompositeKnnVectorsReaderWrapper extends KnnVectorsReader { + + private final CodecReader[] codecReaders; + private final KnnVectorsReader[] readers; + private final int[] docStarts; + + SlowCompositeKnnVectorsReaderWrapper(CodecReader[] codecReaders, int[] docStarts) { + this.codecReaders = codecReaders; + this.readers = + Arrays.stream(codecReaders) + .map(CodecReader::getVectorReader) + .toArray(KnnVectorsReader[]::new); + this.docStarts = docStarts; + } + + @Override + public void close() throws IOException { + IOUtils.close(readers); + } + + @Override + public long ramBytesUsed() { + long ramBytesUsed = 0; + for (KnnVectorsReader reader : readers) { + ramBytesUsed += reader.ramBytesUsed(); + } + return ramBytesUsed; + } + + @Override + public void checkIntegrity() throws IOException { + for (KnnVectorsReader reader : readers) { + if (reader != null) { + reader.checkIntegrity(); + } + } + } + + @Override + public FloatVectorValues getFloatVectorValues(String field) throws IOException { + List> subs = new ArrayList<>(); + int i = 0; + int dimension = -1; + int size = 0; + for (CodecReader reader : codecReaders) { + FloatVectorValues values = reader.getFloatVectorValues(field); + if (values != null) { + if (dimension == -1) { + dimension = values.dimension(); + } + size += values.size(); + } + subs.add(new DocValuesSub<>(values, docStarts[i], docStarts[i + 1])); + i++; + } + final int finalDimension = dimension; + final int finalSize = size; + MergedDocIdSetIterator mergedIterator = new MergedDocIdSetIterator<>(subs); + return new FloatVectorValues() { + + @Override + public int dimension() { + return finalDimension; + } + + @Override + public int size() { + return finalSize; + } + + @Override + public float[] vectorValue() throws IOException { + return 
mergedIterator.current.sub.vectorValue(); + } + + @Override + public int docID() { + return mergedIterator.docID(); + } + + @Override + public int nextDoc() throws IOException { + return mergedIterator.nextDoc(); + } + + @Override + public int advance(int target) throws IOException { + return mergedIterator.advance(target); + } + }; + } + + @Override + public ByteVectorValues getByteVectorValues(String field) throws IOException { + List> subs = new ArrayList<>(); + int i = 0; + int dimension = -1; + int size = 0; + for (CodecReader reader : codecReaders) { + ByteVectorValues values = reader.getByteVectorValues(field); + if (values != null) { + if (dimension == -1) { + dimension = values.dimension(); + } + size += values.size(); + } + subs.add(new DocValuesSub<>(values, docStarts[i], docStarts[i + 1])); + i++; + } + final int finalDimension = dimension; + final int finalSize = size; + MergedDocIdSetIterator mergedIterator = new MergedDocIdSetIterator<>(subs); + return new ByteVectorValues() { + + @Override + public int dimension() { + return finalDimension; + } + + @Override + public int size() { + return finalSize; + } + + @Override + public byte[] vectorValue() throws IOException { + return mergedIterator.current.sub.vectorValue(); + } + + @Override + public int docID() { + return mergedIterator.docID(); + } + + @Override + public int nextDoc() throws IOException { + return mergedIterator.nextDoc(); + } + + @Override + public int advance(int target) throws IOException { + return mergedIterator.advance(target); + } + }; + } + + @Override + public void search(String field, float[] target, KnnCollector knnCollector, Bits acceptDocs) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void search(String field, byte[] target, KnnCollector knnCollector, Bits acceptDocs) + throws IOException { + throw new UnsupportedOperationException(); + } + } + + @Override + public CacheHelper getCoreCacheHelper() { + return null; + } + + @Override + public FieldInfos getFieldInfos() { + return fieldInfos; + } + + @Override + public Bits getLiveDocs() { + return liveDocs; + } + + @Override + public LeafMetaData getMetaData() { + return meta; + } + + int numDocs = -1; + + @Override + public synchronized int numDocs() { + // Compute the number of docs lazily, in case some leaves need to recompute it the first time it + // is called, see BaseCompositeReader#numDocs. 
+ if (numDocs == -1) { + numDocs = 0; + for (CodecReader reader : codecReaders) { + numDocs += reader.numDocs(); + } + } + return numDocs; + } + + @Override + public int maxDoc() { + return docStarts[docStarts.length - 1]; + } + + @Override + public CacheHelper getReaderCacheHelper() { + return null; + } +} diff --git a/lucene/core/src/test/org/apache/lucene/index/TestAddIndexes.java b/lucene/core/src/test/org/apache/lucene/index/TestAddIndexes.java index 666ec1e44e9..cba815fbde3 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestAddIndexes.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestAddIndexes.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import org.apache.lucene.codecs.Codec; @@ -1886,4 +1887,54 @@ public class TestAddIndexes extends LuceneTestCase { } } } + + public void testSetDiagnostics() throws IOException { + MergePolicy myMergePolicy = + new FilterMergePolicy(newLogMergePolicy(4)) { + @Override + public MergeSpecification findMerges(CodecReader... readers) throws IOException { + MergeSpecification spec = super.findMerges(readers); + if (spec == null) { + return null; + } + MergeSpecification newSpec = new MergeSpecification(); + for (OneMerge merge : spec.merges) { + newSpec.add( + new OneMerge(merge) { + @Override + public void setMergeInfo(SegmentCommitInfo info) { + super.setMergeInfo(info); + info.info.addDiagnostics( + Collections.singletonMap("merge_policy", "my_merge_policy")); + } + }); + } + return newSpec; + } + }; + Directory sourceDir = newDirectory(); + try (IndexWriter w = new IndexWriter(sourceDir, newIndexWriterConfig())) { + Document doc = new Document(); + w.addDocument(doc); + } + DirectoryReader reader = DirectoryReader.open(sourceDir); + CodecReader codecReader = SlowCodecReaderWrapper.wrap(reader.leaves().get(0).reader()); + + Directory targetDir = newDirectory(); + try (IndexWriter w = + new IndexWriter(targetDir, newIndexWriterConfig().setMergePolicy(myMergePolicy))) { + w.addIndexes(codecReader); + } + + SegmentInfos si = SegmentInfos.readLatestCommit(targetDir); + assertNotEquals(0, si.size()); + for (SegmentCommitInfo sci : si) { + assertEquals( + IndexWriter.SOURCE_ADDINDEXES_READERS, sci.info.getDiagnostics().get(IndexWriter.SOURCE)); + assertEquals("my_merge_policy", sci.info.getDiagnostics().get("merge_policy")); + } + reader.close(); + targetDir.close(); + sourceDir.close(); + } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java index 727f9ce3bd5..4e9bfb0533f 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java @@ -18,6 +18,7 @@ package org.apache.lucene.index; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -890,4 +891,63 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase { return null; } } + + public void testSetDiagnostics() throws IOException { + MergePolicy myMergePolicy = + new FilterMergePolicy(newLogMergePolicy(4)) { + @Override + public MergeSpecification findMerges( + MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) 
+ throws IOException { + return wrapSpecification(super.findMerges(mergeTrigger, segmentInfos, mergeContext)); + } + + @Override + public MergeSpecification findFullFlushMerges( + MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) + throws IOException { + return wrapSpecification( + super.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext)); + } + + private MergeSpecification wrapSpecification(MergeSpecification spec) { + if (spec == null) { + return null; + } + MergeSpecification newSpec = new MergeSpecification(); + for (OneMerge merge : spec.merges) { + newSpec.add( + new OneMerge(merge) { + @Override + public void setMergeInfo(SegmentCommitInfo info) { + super.setMergeInfo(info); + info.info.addDiagnostics( + Collections.singletonMap("merge_policy", "my_merge_policy")); + } + }); + } + return newSpec; + } + }; + Directory dir = newDirectory(); + IndexWriter w = + new IndexWriter( + dir, newIndexWriterConfig().setMergePolicy(myMergePolicy).setMaxBufferedDocs(2)); + Document doc = new Document(); + for (int i = 0; i < 20; ++i) { + w.addDocument(doc); + } + w.close(); + SegmentInfos si = SegmentInfos.readLatestCommit(dir); + boolean hasOneMergedSegment = false; + for (SegmentCommitInfo sci : si) { + if (IndexWriter.SOURCE_MERGE.equals(sci.info.getDiagnostics().get(IndexWriter.SOURCE))) { + assertEquals("my_merge_policy", sci.info.getDiagnostics().get("merge_policy")); + hasOneMergedSegment = true; + } + } + assertTrue(hasOneMergedSegment); + w.close(); + dir.close(); + } } diff --git a/lucene/join/src/test/org/apache/lucene/search/join/ParentBlockJoinKnnVectorQueryTestCase.java b/lucene/join/src/test/org/apache/lucene/search/join/ParentBlockJoinKnnVectorQueryTestCase.java index 8f8544eac11..e8e415ca8ca 100644 --- a/lucene/join/src/test/org/apache/lucene/search/join/ParentBlockJoinKnnVectorQueryTestCase.java +++ b/lucene/join/src/test/org/apache/lucene/search/join/ParentBlockJoinKnnVectorQueryTestCase.java @@ -29,7 +29,6 @@ import org.apache.lucene.document.StringField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.Term; import org.apache.lucene.index.VectorSimilarityFunction; import org.apache.lucene.search.DocIdSetIterator; @@ -90,7 +89,9 @@ abstract class ParentBlockJoinKnnVectorQueryTestCase extends LuceneTestCase { public void testIndexWithNoVectorsNorParents() throws IOException { try (Directory d = newDirectory()) { - try (IndexWriter w = new IndexWriter(d, new IndexWriterConfig())) { + try (IndexWriter w = + new IndexWriter( + d, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)))) { // Add some documents without a vector for (int i = 0; i < 5; i++) { Document doc = new Document(); @@ -123,7 +124,9 @@ abstract class ParentBlockJoinKnnVectorQueryTestCase extends LuceneTestCase { public void testIndexWithNoParents() throws IOException { try (Directory d = newDirectory()) { - try (IndexWriter w = new IndexWriter(d, new IndexWriterConfig())) { + try (IndexWriter w = + new IndexWriter( + d, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)))) { for (int i = 0; i < 3; ++i) { Document doc = new Document(); doc.add(getKnnVectorField("field", new float[] {2, 2})); @@ -175,7 +178,9 @@ abstract class ParentBlockJoinKnnVectorQueryTestCase extends LuceneTestCase { public void testScoringWithMultipleChildren() throws IOException { try (Directory d 
= newDirectory()) { - try (IndexWriter w = new IndexWriter(d, new IndexWriterConfig())) { + try (IndexWriter w = + new IndexWriter( + d, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)))) { List toAdd = new ArrayList<>(); for (int j = 1; j <= 5; j++) { Document doc = new Document(); @@ -227,7 +232,9 @@ abstract class ParentBlockJoinKnnVectorQueryTestCase extends LuceneTestCase { * randomly fail to find one). */ try (Directory d = newDirectory()) { - try (IndexWriter w = new IndexWriter(d, new IndexWriterConfig())) { + try (IndexWriter w = + new IndexWriter( + d, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)))) { int r = 0; for (int i = 0; i < 5; i++) { for (int j = 0; j < 5; j++) { @@ -273,7 +280,11 @@ abstract class ParentBlockJoinKnnVectorQueryTestCase extends LuceneTestCase { Directory getIndexStore(String field, float[]... contents) throws IOException { Directory indexStore = newDirectory(); - RandomIndexWriter writer = new RandomIndexWriter(random(), indexStore); + RandomIndexWriter writer = + new RandomIndexWriter( + random(), + indexStore, + newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); for (int i = 0; i < contents.length; ++i) { List toAdd = new ArrayList<>(); Document doc = new Document(); diff --git a/lucene/join/src/test/org/apache/lucene/search/join/TestBlockJoin.java b/lucene/join/src/test/org/apache/lucene/search/join/TestBlockJoin.java index 1d1b5c65311..4e445d7eeb2 100644 --- a/lucene/join/src/test/org/apache/lucene/search/join/TestBlockJoin.java +++ b/lucene/join/src/test/org/apache/lucene/search/join/TestBlockJoin.java @@ -176,7 +176,9 @@ public class TestBlockJoin extends LuceneTestCase { // You must use ToParentBlockJoinSearcher if you want to do BQ SHOULD queries: public void testBQShouldJoinedChild() throws Exception { final Directory dir = newDirectory(); - final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + final RandomIndexWriter w = + new RandomIndexWriter( + random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); final List docs = new ArrayList<>(); @@ -248,7 +250,9 @@ public class TestBlockJoin extends LuceneTestCase { public void testSimpleKnn() throws Exception { final Directory dir = newDirectory(); - final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + final RandomIndexWriter w = + new RandomIndexWriter( + random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); final List docs = new ArrayList<>(); @@ -294,7 +298,9 @@ public class TestBlockJoin extends LuceneTestCase { public void testSimple() throws Exception { final Directory dir = newDirectory(); - final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + final RandomIndexWriter w = + new RandomIndexWriter( + random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); final List docs = new ArrayList<>(); @@ -383,7 +389,9 @@ public class TestBlockJoin extends LuceneTestCase { public void testSimpleFilter() throws Exception { final Directory dir = newDirectory(); - final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + final RandomIndexWriter w = + new RandomIndexWriter( + random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); final List docs = new ArrayList<>(); docs.add(makeJob("java", 2007)); @@ -515,7 +523,9 @@ public class TestBlockJoin extends LuceneTestCase { public void testBoostBug() throws Exception { final Directory dir = newDirectory(); - final 
RandomIndexWriter w = new RandomIndexWriter(random(), dir); + final RandomIndexWriter w = + new RandomIndexWriter( + random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); IndexReader r = w.getReader(); w.close(); IndexSearcher s = newSearcher(r); @@ -608,8 +618,14 @@ public class TestBlockJoin extends LuceneTestCase { final List toDelete = new ArrayList<>(); // TODO: parallel star join, nested join cases too! - final RandomIndexWriter w = new RandomIndexWriter(random(), dir); - final RandomIndexWriter joinW = new RandomIndexWriter(random(), joinDir); + final RandomIndexWriter w = + new RandomIndexWriter( + random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); + final RandomIndexWriter joinW = + new RandomIndexWriter( + random(), + joinDir, + newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); for (int parentDocID = 0; parentDocID < numParentDocs; parentDocID++) { Document parentDoc = new Document(); Document parentJoinDoc = new Document(); @@ -1187,7 +1203,9 @@ public class TestBlockJoin extends LuceneTestCase { public void testMultiChildTypes() throws Exception { final Directory dir = newDirectory(); - final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + final RandomIndexWriter w = + new RandomIndexWriter( + random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); final List docs = new ArrayList<>(); @@ -1259,7 +1277,9 @@ public class TestBlockJoin extends LuceneTestCase { public void testAdvanceSingleParentSingleChild() throws Exception { Directory dir = newDirectory(); - RandomIndexWriter w = new RandomIndexWriter(random(), dir); + RandomIndexWriter w = + new RandomIndexWriter( + random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); Document childDoc = new Document(); childDoc.add(newStringField("child", "1", Field.Store.NO)); Document parentDoc = new Document(); @@ -1322,7 +1342,9 @@ public class TestBlockJoin extends LuceneTestCase { // LUCENE-4968 public void testChildQueryNeverMatches() throws Exception { Directory d = newDirectory(); - RandomIndexWriter w = new RandomIndexWriter(random(), d); + RandomIndexWriter w = + new RandomIndexWriter( + random(), d, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); Document parent = new Document(); parent.add(new StoredField("parentID", "0")); parent.add(new SortedDocValuesField("parentID", new BytesRef("0"))); @@ -1392,7 +1414,9 @@ public class TestBlockJoin extends LuceneTestCase { public void testAdvanceSingleDeletedParentNoChild() throws Exception { final Directory dir = newDirectory(); - final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + final RandomIndexWriter w = + new RandomIndexWriter( + random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); // First doc with 1 children Document parentDoc = new Document(); @@ -1437,7 +1461,9 @@ public class TestBlockJoin extends LuceneTestCase { public void testIntersectionWithRandomApproximation() throws IOException { final Directory dir = newDirectory(); - final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + final RandomIndexWriter w = + new RandomIndexWriter( + random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); final int numBlocks = atLeast(100); for (int i = 0; i < numBlocks; ++i) { @@ -1483,7 +1509,9 @@ public class TestBlockJoin extends LuceneTestCase { // delete documents to simulate FilteredQuery 
applying a filter as acceptDocs public void testParentScoringBug() throws Exception { final Directory dir = newDirectory(); - final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + final RandomIndexWriter w = + new RandomIndexWriter( + random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); final List docs = new ArrayList<>(); docs.add(makeJob("java", 2007)); @@ -1521,7 +1549,9 @@ public class TestBlockJoin extends LuceneTestCase { public void testToChildBlockJoinQueryExplain() throws Exception { final Directory dir = newDirectory(); - final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + final RandomIndexWriter w = + new RandomIndexWriter( + random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); final List docs = new ArrayList<>(); docs.add(makeJob("java", 2007)); @@ -1563,7 +1593,9 @@ public class TestBlockJoin extends LuceneTestCase { public void testToChildInitialAdvanceParentButNoKids() throws Exception { final Directory dir = newDirectory(); - final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + final RandomIndexWriter w = + new RandomIndexWriter( + random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); // degenerate case: first doc has no children w.addDocument(makeResume("first", "nokids")); @@ -1601,7 +1633,9 @@ public class TestBlockJoin extends LuceneTestCase { public void testMultiChildQueriesOfDiffParentLevels() throws Exception { final Directory dir = newDirectory(); - final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + final RandomIndexWriter w = + new RandomIndexWriter( + random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); // randomly generate resume->jobs[]->qualifications[] final int numResumes = atLeast(100); @@ -1680,7 +1714,12 @@ public class TestBlockJoin extends LuceneTestCase { }; Directory dir = newDirectory(); RandomIndexWriter w = - new RandomIndexWriter(random(), dir, newIndexWriterConfig().setSimilarity(sim)); + new RandomIndexWriter( + random(), + dir, + newIndexWriterConfig() + .setSimilarity(sim) + .setMergePolicy(newMergePolicy(random(), false))); w.addDocuments( Arrays.asList( Collections.singleton(newTextField("foo", "bar bar", Store.NO)), diff --git a/lucene/join/src/test/org/apache/lucene/search/join/TestBlockJoinValidation.java b/lucene/join/src/test/org/apache/lucene/search/join/TestBlockJoinValidation.java index f943ff544de..a1b85fe9bbb 100644 --- a/lucene/join/src/test/org/apache/lucene/search/join/TestBlockJoinValidation.java +++ b/lucene/join/src/test/org/apache/lucene/search/join/TestBlockJoinValidation.java @@ -58,7 +58,9 @@ public class TestBlockJoinValidation extends LuceneTestCase { public void setUp() throws Exception { super.setUp(); directory = newDirectory(); - final IndexWriterConfig config = new IndexWriterConfig(new MockAnalyzer(random())); + final IndexWriterConfig config = + new IndexWriterConfig(new MockAnalyzer(random())) + .setMergePolicy(newMergePolicy(random(), false)); final IndexWriter indexWriter = new IndexWriter(directory, config); for (int i = 0; i < AMOUNT_OF_SEGMENTS; i++) { List segmentDocs = createDocsForSegment(i); diff --git a/lucene/join/src/test/org/apache/lucene/search/join/TestCheckJoinIndex.java b/lucene/join/src/test/org/apache/lucene/search/join/TestCheckJoinIndex.java index 2d7e0af8131..2c4a94011c6 100644 --- a/lucene/join/src/test/org/apache/lucene/search/join/TestCheckJoinIndex.java +++ 
b/lucene/join/src/test/org/apache/lucene/search/join/TestCheckJoinIndex.java @@ -37,7 +37,9 @@ public class TestCheckJoinIndex extends LuceneTestCase { public void testNoParent() throws IOException { final Directory dir = newDirectory(); - final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + final RandomIndexWriter w = + new RandomIndexWriter( + random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); final int numDocs = TestUtil.nextInt(random(), 1, 3); for (int i = 0; i < numDocs; ++i) { w.addDocument(new Document()); @@ -55,7 +57,9 @@ public class TestCheckJoinIndex extends LuceneTestCase { public void testOrphans() throws IOException { final Directory dir = newDirectory(); - final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + final RandomIndexWriter w = + new RandomIndexWriter( + random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); { // Add a first valid block diff --git a/lucene/join/src/test/org/apache/lucene/search/join/TestJoinUtil.java b/lucene/join/src/test/org/apache/lucene/search/join/TestJoinUtil.java index ae55587edcb..d02577004c4 100644 --- a/lucene/join/src/test/org/apache/lucene/search/join/TestJoinUtil.java +++ b/lucene/join/src/test/org/apache/lucene/search/join/TestJoinUtil.java @@ -489,7 +489,8 @@ public class TestJoinUtil extends LuceneTestCase { new RandomIndexWriter( random(), dir, - newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.KEYWORD, false))); + newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.KEYWORD, false)) + .setMergePolicy(newMergePolicy(random(), false))); Map lowestScoresPerParent = new HashMap<>(); Map highestScoresPerParent = new HashMap<>(); @@ -632,7 +633,8 @@ public class TestJoinUtil extends LuceneTestCase { new RandomIndexWriter( random(), dir, - newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.KEYWORD, false))); + newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.KEYWORD, false)) + .setMergePolicy(newMergePolicy(random(), false))); int minChildDocsPerParent = 2; int maxChildDocsPerParent = 16; @@ -700,7 +702,9 @@ public class TestJoinUtil extends LuceneTestCase { public void testRewrite() throws IOException { Directory dir = newDirectory(); - RandomIndexWriter w = new RandomIndexWriter(random(), dir); + RandomIndexWriter w = + new RandomIndexWriter( + random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); Document doc = new Document(); doc.add(new SortedDocValuesField("join_field", new BytesRef("abc"))); w.addDocument(doc); @@ -1637,7 +1641,8 @@ public class TestJoinUtil extends LuceneTestCase { new RandomIndexWriter( random, dir, - newIndexWriterConfig(new MockAnalyzer(random, MockTokenizer.KEYWORD, false))); + newIndexWriterConfig(new MockAnalyzer(random, MockTokenizer.KEYWORD, false)) + .setMergePolicy(newMergePolicy(random(), false))); IndexIterationContext context = new IndexIterationContext(); int numRandomValues = nDocs / RandomNumbers.randomIntBetween(random, 1, 4); diff --git a/lucene/join/src/test/org/apache/lucene/search/join/TestParentBlockJoinFloatKnnVectorQuery.java b/lucene/join/src/test/org/apache/lucene/search/join/TestParentBlockJoinFloatKnnVectorQuery.java index 775806795ff..5983ff06a65 100644 --- a/lucene/join/src/test/org/apache/lucene/search/join/TestParentBlockJoinFloatKnnVectorQuery.java +++ b/lucene/join/src/test/org/apache/lucene/search/join/TestParentBlockJoinFloatKnnVectorQuery.java @@ -49,7 +49,9 @@ public class 
TestParentBlockJoinFloatKnnVectorQuery extends ParentBlockJoinKnnVe public void testScoreCosine() throws IOException { try (Directory d = newDirectory()) { - try (IndexWriter w = new IndexWriter(d, new IndexWriterConfig())) { + try (IndexWriter w = + new IndexWriter( + d, new IndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)))) { for (int j = 1; j <= 5; j++) { List toAdd = new ArrayList<>(); Document doc = new Document(); diff --git a/lucene/join/src/test/org/apache/lucene/search/join/TestParentChildrenBlockJoinQuery.java b/lucene/join/src/test/org/apache/lucene/search/join/TestParentChildrenBlockJoinQuery.java index 10cbe16a100..54b0bf01718 100644 --- a/lucene/join/src/test/org/apache/lucene/search/join/TestParentChildrenBlockJoinQuery.java +++ b/lucene/join/src/test/org/apache/lucene/search/join/TestParentChildrenBlockJoinQuery.java @@ -46,7 +46,9 @@ public class TestParentChildrenBlockJoinQuery extends LuceneTestCase { int maxChildDocsPerParent = 8 + random().nextInt(8); Directory dir = newDirectory(); - RandomIndexWriter writer = new RandomIndexWriter(random(), dir); + RandomIndexWriter writer = + new RandomIndexWriter( + random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false))); for (int i = 0; i < numParentDocs; i++) { int numChildDocs = random().nextInt(maxChildDocsPerParent); List docs = new ArrayList<>(numChildDocs + 1); diff --git a/lucene/misc/src/java/org/apache/lucene/misc/index/BPIndexReorderer.java b/lucene/misc/src/java/org/apache/lucene/misc/index/BPIndexReorderer.java index dd885f40c74..457d72bc4a3 100644 --- a/lucene/misc/src/java/org/apache/lucene/misc/index/BPIndexReorderer.java +++ b/lucene/misc/src/java/org/apache/lucene/misc/index/BPIndexReorderer.java @@ -31,6 +31,7 @@ import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.PostingsEnum; import org.apache.lucene.index.Sorter; +import org.apache.lucene.index.Sorter.DocMap; import org.apache.lucene.index.SortingCodecReader; import org.apache.lucene.index.Terms; import org.apache.lucene.index.TermsEnum; @@ -116,6 +117,7 @@ public final class BPIndexReorderer { public static final int DEFAULT_MAX_ITERS = 20; private int minDocFreq; + private float maxDocFreq; private int minPartitionSize; private int maxIters; private ForkJoinPool forkJoinPool; @@ -125,6 +127,7 @@ public final class BPIndexReorderer { /** Constructor. */ public BPIndexReorderer() { setMinDocFreq(DEFAULT_MIN_DOC_FREQ); + setMaxDocFreq(1f); setMinPartitionSize(DEFAULT_MIN_PARTITION_SIZE); setMaxIters(DEFAULT_MAX_ITERS); setForkJoinPool(null); @@ -141,6 +144,19 @@ public final class BPIndexReorderer { this.minDocFreq = minDocFreq; } + /** + * Set the maximum document frequency for terms to be considered, as a ratio of {@code maxDoc}. + * This is useful because very frequent terms (stop words) add significant overhead to the + * reordering logic while not being very relevant for ordering. This value must be in (0, 1]. + * Default value is 1. + */ + public void setMaxDocFreq(float maxDocFreq) { + if (maxDocFreq > 0 == false || maxDocFreq <= 1 == false) { + throw new IllegalArgumentException("maxDocFreq must be in (0, 1], got " + maxDocFreq); + } + this.maxDocFreq = maxDocFreq; + } + /** Set the minimum partition size, when the algorithm stops recursing, 32 by default. 
*/ public void setMinPartitionSize(int minPartitionSize) { if (minPartitionSize < 1) { @@ -616,6 +632,7 @@ public final class BPIndexReorderer { ((ramBudgetMB * 1024 * 1024 - docRAMRequirements(reader.maxDoc())) / getParallelism() / termRAMRequirementsPerThreadPerTerm()); + final int maxDocFreq = (int) ((double) this.maxDocFreq * reader.maxDoc()); int numTerms = 0; for (String field : fields) { @@ -633,7 +650,8 @@ public final class BPIndexReorderer { TermsEnum iterator = terms.iterator(); PostingsEnum postings = null; for (BytesRef term = iterator.next(); term != null; term = iterator.next()) { - if (iterator.docFreq() < minDocFreq) { + final int docFreq = iterator.docFreq(); + if (docFreq < minDocFreq || docFreq > maxDocFreq) { continue; } if (numTerms >= ArrayUtil.MAX_ARRAY_LENGTH) { @@ -723,15 +741,11 @@ public final class BPIndexReorderer { } /** - * Reorder the given {@link CodecReader} into a reader that tries to minimize the log gap between - * consecutive documents in postings, which usually helps improve space efficiency and query - * evaluation efficiency. Note that the returned {@link CodecReader} is slow and should typically - * be used in a call to {@link IndexWriter#addIndexes(CodecReader...)}. - * - * @throws NotEnoughRAMException if not enough RAM is provided + * Expert: Compute the {@link DocMap} that holds the new doc ID numbering. This is exposed to + * enable integration into {@link BPReorderingMergePolicy}, {@link #reorder(CodecReader, + * Directory)} should be preferred in general. */ - public CodecReader reorder(CodecReader reader, Directory tempDir) throws IOException { - + public Sorter.DocMap computeDocMap(CodecReader reader, Directory tempDir) throws IOException { if (docRAMRequirements(reader.maxDoc()) >= ramBudgetMB * 1024 * 1024) { throw new NotEnoughRAMException( "At least " @@ -756,24 +770,35 @@ public final class BPIndexReorderer { for (int i = 0; i < newToOld.length; ++i) { oldToNew[newToOld[i]] = i; } - final Sorter.DocMap docMap = - new Sorter.DocMap() { + return new Sorter.DocMap() { - @Override - public int size() { - return newToOld.length; - } + @Override + public int size() { + return newToOld.length; + } - @Override - public int oldToNew(int docID) { - return oldToNew[docID]; - } + @Override + public int oldToNew(int docID) { + return oldToNew[docID]; + } - @Override - public int newToOld(int docID) { - return newToOld[docID]; - } - }; + @Override + public int newToOld(int docID) { + return newToOld[docID]; + } + }; + } + + /** + * Reorder the given {@link CodecReader} into a reader that tries to minimize the log gap between + * consecutive documents in postings, which usually helps improve space efficiency and query + * evaluation efficiency. Note that the returned {@link CodecReader} is slow and should typically + * be used in a call to {@link IndexWriter#addIndexes(CodecReader...)}. 
+ * + * @throws NotEnoughRAMException if not enough RAM is provided + */ + public CodecReader reorder(CodecReader reader, Directory tempDir) throws IOException { + Sorter.DocMap docMap = computeDocMap(reader, tempDir); return SortingCodecReader.wrap(reader, docMap, null); } diff --git a/lucene/misc/src/java/org/apache/lucene/misc/index/BPReorderingMergePolicy.java b/lucene/misc/src/java/org/apache/lucene/misc/index/BPReorderingMergePolicy.java new file mode 100644 index 00000000000..077b3891556 --- /dev/null +++ b/lucene/misc/src/java/org/apache/lucene/misc/index/BPReorderingMergePolicy.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.misc.index; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import org.apache.lucene.index.CodecReader; +import org.apache.lucene.index.FilterMergePolicy; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.MergeTrigger; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.Sorter; +import org.apache.lucene.misc.index.BPIndexReorderer.NotEnoughRAMException; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.SetOnce; + +/** + * A merge policy that reorders merged segments according to a {@link BPIndexReorderer}. When + * reordering doesn't have enough RAM, it simply skips reordering in order not to fail the merge. So + * make sure to give enough RAM to your {@link BPIndexReorderer} via {@link + * BPIndexReorderer#setRAMBudgetMB(double)}. + */ +public final class BPReorderingMergePolicy extends FilterMergePolicy { + + /** Whether a segment has been reordered. */ + static final String REORDERED = "bp.reordered"; + + private final BPIndexReorderer reorderer; + private int minNaturalMergeNumDocs = 1; + private float minNaturalMergeRatioFromBiggestSegment = 0f; + + /** + * Sole constructor. It takes the merge policy that should be used to compute merges, and will + * then reorder doc IDs from all merges above the configured minimum doc count, as well as all + * forced merges. + * + *
<p>
If you wish to only run reordering upon forced merges, pass {@link Integer#MAX_VALUE} as a + * {@code minNaturalMergeNumDocs}. Otherwise a default value of {@code 2^18 = 262,144} is + * suggested. This should help retain merging optimizations on small merges while reordering the + * larger segments that are important for good search performance. + * + * @param in the merge policy to use to compute merges + * @param reorderer the {@link BPIndexReorderer} to use to renumber doc IDs + */ + public BPReorderingMergePolicy(MergePolicy in, BPIndexReorderer reorderer) { + super(in); + this.reorderer = reorderer; + } + + /** + * Set the minimum number of docs that a merge must have for the resulting segment to be + * reordered. + */ + public void setMinNaturalMergeNumDocs(int minNaturalMergeNumDocs) { + if (minNaturalMergeNumDocs < 1) { + throw new IllegalArgumentException( + "minNaturalMergeNumDocs must be at least 1, got " + minNaturalMergeNumDocs); + } + this.minNaturalMergeNumDocs = minNaturalMergeNumDocs; + } + + /** + * Set the minimum number of docs that a merge must have for the resulting segment to be + * reordered, as a ratio of the total number of documents of the current biggest segment in the + * index. This parameter helps only enable reordering on segments that are large enough that they + * will significantly contribute to overall search performance. + */ + public void setMinNaturalMergeRatioFromBiggestSegment( + float minNaturalMergeRatioFromBiggestSegment) { + if (minNaturalMergeRatioFromBiggestSegment >= 0 == false + || minNaturalMergeRatioFromBiggestSegment < 1 == false) { + throw new IllegalArgumentException( + "minNaturalMergeRatioFromBiggestSegment must be in [0, 1), got " + + minNaturalMergeRatioFromBiggestSegment); + } + this.minNaturalMergeRatioFromBiggestSegment = minNaturalMergeRatioFromBiggestSegment; + } + + private MergeSpecification maybeReorder( + MergeSpecification spec, boolean forced, SegmentInfos infos) { + if (spec == null) { + return null; + } + + final int minNumDocs; + if (forced) { + // No minimum size for forced merges + minNumDocs = 1; + } else { + int maxMaxDoc = 0; + if (infos != null) { + for (SegmentCommitInfo sci : infos) { + maxMaxDoc = Math.max(sci.info.maxDoc(), maxMaxDoc); + } + } + minNumDocs = + Math.max( + this.minNaturalMergeNumDocs, + (int) ((double) minNaturalMergeRatioFromBiggestSegment * maxMaxDoc)); + } + + MergeSpecification newSpec = new MergeSpecification(); + for (OneMerge oneMerge : spec.merges) { + + newSpec.add( + new OneMerge(oneMerge) { + + private final SetOnce reordered = new SetOnce<>(); + + @Override + public CodecReader wrapForMerge(CodecReader reader) throws IOException { + return oneMerge.wrapForMerge(reader); + } + + @Override + public Sorter.DocMap reorder(CodecReader reader, Directory dir) throws IOException { + Sorter.DocMap docMap = null; + if (reader.numDocs() >= minNumDocs) { + try { + docMap = reorderer.computeDocMap(reader, dir); + } catch ( + @SuppressWarnings("unused") + NotEnoughRAMException e) { + // skip reordering, we don't have enough RAM anyway + } + } + reordered.set(docMap != null); + return docMap; + } + + @Override + public void setMergeInfo(SegmentCommitInfo info) { + Boolean reordered = this.reordered.get(); + if (reordered == null) { + // reordering was not called, likely because an index sort is configured + reordered = false; + } + info.info.addDiagnostics( + Collections.singletonMap(REORDERED, Boolean.toString(reordered))); + super.setMergeInfo(info); + } + }); + } + return newSpec; + } + + 
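To make the interplay of the thresholds above concrete, here is a minimal configuration sketch. It is illustrative only and not part of the patch: the helper class name, the 256 MB RAM budget, the 0.5 document-frequency cutoff and the 2^18 doc threshold are example values (the last one is the default suggested in the constructor javadoc).

    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.TieredMergePolicy;
    import org.apache.lucene.misc.index.BPIndexReorderer;
    import org.apache.lucene.misc.index.BPReorderingMergePolicy;

    // Hypothetical helper, shown only to illustrate how the knobs combine.
    final class BPReorderingConfigSketch {
      static IndexWriterConfig newConfig() {
        BPIndexReorderer reorderer = new BPIndexReorderer();
        // If this budget is exceeded, the wrapped OneMerge#reorder above returns a null DocMap
        // and the merge still succeeds, just without reordering.
        reorderer.setRAMBudgetMB(256);
        // Ignore terms that occur in more than half of the documents (stop-word-like terms).
        reorderer.setMaxDocFreq(0.5f);
        BPReorderingMergePolicy mp = new BPReorderingMergePolicy(new TieredMergePolicy(), reorderer);
        // Natural merges below 2^18 docs keep regular merge optimizations; forced merges always reorder.
        mp.setMinNaturalMergeNumDocs(1 << 18);
        return new IndexWriterConfig().setMergePolicy(mp);
      }
    }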
@Override + public MergeSpecification findMerges( + MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) + throws IOException { + return maybeReorder( + super.findMerges(mergeTrigger, segmentInfos, mergeContext), false, segmentInfos); + } + + @Override + public MergeSpecification findForcedMerges( + SegmentInfos segmentInfos, + int maxSegmentCount, + Map segmentsToMerge, + MergeContext mergeContext) + throws IOException { + return maybeReorder( + super.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, mergeContext), + true, + segmentInfos); + } + + @Override + public MergeSpecification findForcedDeletesMerges( + SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException { + return maybeReorder( + super.findForcedDeletesMerges(segmentInfos, mergeContext), true, segmentInfos); + } + + @Override + public MergeSpecification findFullFlushMerges( + MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) + throws IOException { + return maybeReorder( + super.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext), false, segmentInfos); + } + + @Override + public MergeSpecification findMerges(CodecReader... readers) throws IOException { + // addIndexes is considered a forced merge + return maybeReorder(super.findMerges(readers), true, null); + } +} diff --git a/lucene/misc/src/test/org/apache/lucene/misc/index/TestBPReorderingMergePolicy.java b/lucene/misc/src/test/org/apache/lucene/misc/index/TestBPReorderingMergePolicy.java new file mode 100644 index 00000000000..b4c68b30c85 --- /dev/null +++ b/lucene/misc/src/test/org/apache/lucene/misc/index/TestBPReorderingMergePolicy.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.lucene.misc.index; + +import java.io.IOException; +import java.io.UncheckedIOException; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.CodecReader; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.PostingsEnum; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.index.SlowCodecReaderWrapper; +import org.apache.lucene.index.StoredFields; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.store.Directory; +import org.apache.lucene.tests.util.LuceneTestCase; +import org.apache.lucene.util.IOUtils; + +public class TestBPReorderingMergePolicy extends LuceneTestCase { + + public void testReorderOnMerge() throws IOException { + Directory dir1 = newDirectory(); + Directory dir2 = newDirectory(); + IndexWriter w1 = + new IndexWriter(dir1, newIndexWriterConfig().setMergePolicy(newLogMergePolicy())); + BPIndexReorderer reorderer = new BPIndexReorderer(); + reorderer.setMinDocFreq(2); + reorderer.setMinPartitionSize(2); + BPReorderingMergePolicy mp = new BPReorderingMergePolicy(newLogMergePolicy(), reorderer); + mp.setMinNaturalMergeNumDocs(2); + IndexWriter w2 = new IndexWriter(dir2, newIndexWriterConfig().setMergePolicy(mp)); + Document doc = new Document(); + StringField idField = new StringField("id", "", Store.YES); + doc.add(idField); + StringField bodyField = new StringField("body", "", Store.YES); + doc.add(bodyField); + + for (int i = 0; i < 10000; ++i) { + idField.setStringValue(Integer.toString(i)); + bodyField.setStringValue(Integer.toString(i % 2 == 0 ? 0 : i % 10)); + w1.addDocument(doc); + w2.addDocument(doc); + + if (i % 10 == 0) { + w1.deleteDocuments(new Term("id", Integer.toString(i / 3))); + w2.deleteDocuments(new Term("id", Integer.toString(i / 3))); + } + if (i % 3 == 0) { + DirectoryReader.open(w2).close(); + } + } + + w1.forceMerge(1); + w2.forceMerge(1); + + IndexReader reader1 = DirectoryReader.open(w1); + IndexReader reader2 = DirectoryReader.open(w2); + assertEquals(reader1.maxDoc(), reader2.maxDoc()); + + StoredFields storedFields1 = reader1.storedFields(); + StoredFields storedFields2 = reader2.storedFields(); + + // Check that data is consistent + for (int i = 0; i < reader1.maxDoc(); ++i) { + Document doc1 = storedFields1.document(i); + String id = doc1.get("id"); + String body = doc1.get("body"); + + PostingsEnum pe = reader2.leaves().get(0).reader().postings(new Term("id", id)); + assertNotNull(pe); + int docID2 = pe.nextDoc(); + assertNotEquals(DocIdSetIterator.NO_MORE_DOCS, docID2); + assertEquals(DocIdSetIterator.NO_MORE_DOCS, pe.nextDoc()); + + Document doc2 = storedFields2.document(docID2); + assertEquals(id, doc2.get("id")); + assertEquals(body, doc2.get("body")); + } + + // Check that reader2 actually got reordered. This can only happen due to BPIndexReorderer since + // it uses a log merge-policy under the hood, which only merges adjacent segments. 
+ boolean reordered = false; + int previousId = -1; + for (int i = 0; i < reader2.maxDoc(); ++i) { + Document doc2 = storedFields2.document(i); + String idString = doc2.get("id"); + int id = Integer.parseInt(idString); + if (id < previousId) { + reordered = true; + break; + } + previousId = id; + } + assertTrue(reordered); + + SegmentReader sr = (SegmentReader) reader2.leaves().get(0).reader(); + final String reorderedString = + sr.getSegmentInfo().info.getDiagnostics().get(BPReorderingMergePolicy.REORDERED); + assertEquals(Boolean.TRUE.toString(), reorderedString); + + IOUtils.close(reader1, reader2, w1, w2, dir1, dir2); + } + + public void testReorderOnAddIndexes() throws IOException { + Directory dir1 = newDirectory(); + IndexWriter w1 = + new IndexWriter(dir1, newIndexWriterConfig().setMergePolicy(newLogMergePolicy())); + + Document doc = new Document(); + StringField idField = new StringField("id", "", Store.YES); + doc.add(idField); + StringField bodyField = new StringField("body", "", Store.YES); + doc.add(bodyField); + + for (int i = 0; i < 10000; ++i) { + idField.setStringValue(Integer.toString(i)); + bodyField.setStringValue(Integer.toString(i % 2 == 0 ? 0 : i % 10)); + w1.addDocument(doc); + + if (i % 3 == 0) { + DirectoryReader.open(w1).close(); + } + } + + for (int i = 0; i < 10000; i += 10) { + w1.deleteDocuments(new Term("id", Integer.toString(i / 3))); + } + + Directory dir2 = newDirectory(); + BPIndexReorderer reorderer = new BPIndexReorderer(); + reorderer.setMinDocFreq(2); + reorderer.setMinPartitionSize(2); + BPReorderingMergePolicy mp = new BPReorderingMergePolicy(newLogMergePolicy(), reorderer); + mp.setMinNaturalMergeNumDocs(2); + IndexWriter w2 = new IndexWriter(dir2, newIndexWriterConfig().setMergePolicy(mp)); + IndexReader reader1 = DirectoryReader.open(w1); + CodecReader[] codecReaders = + reader1.leaves().stream() + .map(LeafReaderContext::reader) + .map( + t -> { + try { + return SlowCodecReaderWrapper.wrap(t); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }) + .toArray(CodecReader[]::new); + w2.addIndexes(codecReaders); + w1.forceMerge(1); + + reader1.close(); + reader1 = DirectoryReader.open(w1); + IndexReader reader2 = DirectoryReader.open(w2); + assertEquals(1, reader2.leaves().size()); + assertEquals(reader1.maxDoc(), reader2.maxDoc()); + + StoredFields storedFields1 = reader1.storedFields(); + StoredFields storedFields2 = reader2.storedFields(); + + // Check that data is consistent + for (int i = 0; i < reader1.maxDoc(); ++i) { + Document doc1 = storedFields1.document(i); + String id = doc1.get("id"); + String body = doc1.get("body"); + + PostingsEnum pe = reader2.leaves().get(0).reader().postings(new Term("id", id)); + assertNotNull(pe); + int docID2 = pe.nextDoc(); + assertNotEquals(DocIdSetIterator.NO_MORE_DOCS, docID2); + assertEquals(DocIdSetIterator.NO_MORE_DOCS, pe.nextDoc()); + + Document doc2 = storedFields2.document(docID2); + assertEquals(id, doc2.get("id")); + assertEquals(body, doc2.get("body")); + } + + // Check that reader2 actually got reordered. This can only happen due to BPIndexReorderer since + // it uses a log merge-policy under the hood, which only merges adjacent segments. 
+ boolean reordered = false; + int previousId = -1; + for (int i = 0; i < reader2.maxDoc(); ++i) { + Document doc2 = storedFields2.document(i); + String idString = doc2.get("id"); + int id = Integer.parseInt(idString); + if (id < previousId) { + reordered = true; + break; + } + previousId = id; + } + assertTrue(reordered); + + SegmentReader sr = (SegmentReader) reader2.leaves().get(0).reader(); + final String reorderedString = + sr.getSegmentInfo().info.getDiagnostics().get(BPReorderingMergePolicy.REORDERED); + assertEquals(Boolean.TRUE.toString(), reorderedString); + + IOUtils.close(reader1, reader2, w1, w2, dir1, dir2); + } + + public void testReorderDoesntHaveEnoughRAM() throws IOException { + // This just makes sure that reordering the index on merge does not corrupt its content + Directory dir = newDirectory(); + BPIndexReorderer reorderer = new BPIndexReorderer(); + reorderer.setMinDocFreq(2); + reorderer.setMinPartitionSize(2); + reorderer.setRAMBudgetMB(Double.MIN_VALUE); + BPReorderingMergePolicy mp = new BPReorderingMergePolicy(newLogMergePolicy(), reorderer); + mp.setMinNaturalMergeNumDocs(2); + IndexWriter w = new IndexWriter(dir, newIndexWriterConfig().setMergePolicy(mp)); + Document doc = new Document(); + StringField idField = new StringField("id", "", Store.YES); + doc.add(idField); + StringField bodyField = new StringField("body", "", Store.YES); + doc.add(bodyField); + + for (int i = 0; i < 10; ++i) { + idField.setStringValue(Integer.toString(i)); + bodyField.setStringValue(Integer.toString(i % 2 == 0 ? 0 : i % 10)); + w.addDocument(doc); + DirectoryReader.open(w).close(); + } + + w.forceMerge(1); + + DirectoryReader reader = DirectoryReader.open(w); + StoredFields storedFields = reader.storedFields(); + + // This test fails if exceptions get thrown due to lack of RAM + // We expect BP to not run, so the doc ID order should not be modified + for (int i = 0; i < reader.maxDoc(); ++i) { + Document storedDoc = storedFields.document(i); + String id = storedDoc.get("id"); + assertEquals(Integer.toString(i), id); + } + + IOUtils.close(reader, w, dir); + } +} diff --git a/lucene/test-framework/src/java/org/apache/lucene/tests/index/MockRandomMergePolicy.java b/lucene/test-framework/src/java/org/apache/lucene/tests/index/MockRandomMergePolicy.java index 3a9657e16b9..19a74ceed8d 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/tests/index/MockRandomMergePolicy.java +++ b/lucene/test-framework/src/java/org/apache/lucene/tests/index/MockRandomMergePolicy.java @@ -30,6 +30,8 @@ import org.apache.lucene.index.MergeTrigger; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SlowCodecReaderWrapper; +import org.apache.lucene.index.Sorter; +import org.apache.lucene.store.Directory; import org.apache.lucene.tests.util.LuceneTestCase; import org.apache.lucene.tests.util.TestUtil; @@ -235,5 +237,31 @@ public class MockRandomMergePolicy extends MergePolicy { return reader; } } + + @Override + public Sorter.DocMap reorder(CodecReader reader, Directory dir) throws IOException { + if (r.nextBoolean()) { + // Reverse the doc ID order + final int maxDoc = reader.maxDoc(); + return new Sorter.DocMap() { + + @Override + public int size() { + return maxDoc; + } + + @Override + public int oldToNew(int docID) { + return maxDoc - 1 - docID; + } + + @Override + public int newToOld(int docID) { + return maxDoc - 1 - docID; + } + }; + } + return null; + } } }
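The tests above verify reordering indirectly through the "bp.reordered" diagnostic. A minimal end-to-end sketch of the same check outside the test framework could look as follows; the class name, the "body" field, the 100,000-doc count and the 1,000-doc flush size are illustrative assumptions, not values from the patch.

    import java.io.IOException;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field.Store;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.SegmentReader;
    import org.apache.lucene.index.TieredMergePolicy;
    import org.apache.lucene.misc.index.BPIndexReorderer;
    import org.apache.lucene.misc.index.BPReorderingMergePolicy;
    import org.apache.lucene.store.ByteBuffersDirectory;
    import org.apache.lucene.store.Directory;

    public class BPReorderingDemo {
      public static void main(String[] args) throws IOException {
        BPIndexReorderer reorderer = new BPIndexReorderer();
        BPReorderingMergePolicy mp = new BPReorderingMergePolicy(new TieredMergePolicy(), reorderer);
        IndexWriterConfig config = new IndexWriterConfig();
        config.setMergePolicy(mp);
        config.setMaxBufferedDocs(1_000); // force several flushed segments so forceMerge performs a real merge
        try (Directory dir = new ByteBuffersDirectory();
            IndexWriter writer = new IndexWriter(dir, config)) {
          for (int i = 0; i < 100_000; ++i) {
            Document doc = new Document();
            doc.add(new StringField("body", Integer.toString(i % 10), Store.YES));
            writer.addDocument(doc);
          }
          // Reordering always runs on forced merges, so the resulting segment should carry the
          // "bp.reordered" diagnostic written by the setMergeInfo override above.
          writer.forceMerge(1);
          try (DirectoryReader reader = DirectoryReader.open(writer)) {
            SegmentReader sr = (SegmentReader) reader.leaves().get(0).reader();
            System.out.println(
                "reordered: " + sr.getSegmentInfo().info.getDiagnostics().get("bp.reordered"));
          }
        }
      }
    }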