Add a merge policy wrapper that performs recursive graph bisection on merge. (#12622)

This adds `BPReorderingMergePolicy`, a merge policy wrapper that reorders doc
IDs on merge using a `BPIndexReorderer`.
 - Reordering always runs on forced merges.
 - A `minNaturalMergeNumDocs` parameter makes it possible to only enable
   reordering on the larger merged segments. This way, small merges retain all
   merging optimizations like bulk copying of stored fields, and only the larger
   segments, which are the most important for search performance, get reordered
   (see the configuration sketch below).
 - If not enough RAM is available to perform reordering, reordering is skipped.
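
For illustration, here is a minimal setup sketch (not part of this change). The
wrapped `TieredMergePolicy`, the 256 MB RAM budget, the 262,144-doc threshold,
the index path and the class/method names are arbitrary choices; only the
`BPIndexReorderer` and `BPReorderingMergePolicy` calls are the APIs introduced
or used by this PR.

import java.nio.file.Paths;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.misc.index.BPIndexReorderer;
import org.apache.lucene.misc.index.BPReorderingMergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class ReorderingSetupExample {
  public static void main(String[] args) throws Exception {
    BPIndexReorderer reorderer = new BPIndexReorderer();
    // Reordering is silently skipped on merges that would need more RAM than this budget.
    reorderer.setRAMBudgetMB(256);

    // Wrap any merge policy; reordering kicks in on forced merges and on natural
    // merges of at least minNaturalMergeNumDocs documents.
    BPReorderingMergePolicy mergePolicy =
        new BPReorderingMergePolicy(new TieredMergePolicy(), reorderer);
    mergePolicy.setMinNaturalMergeNumDocs(262_144);

    IndexWriterConfig config = new IndexWriterConfig().setMergePolicy(mergePolicy);
    try (Directory dir = FSDirectory.open(Paths.get("/tmp/bp-index"));
        IndexWriter writer = new IndexWriter(dir, config)) {
      // Index documents as usual, then force-merge to reorder the resulting segment.
      writer.forceMerge(1);
    }
  }
}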

To make this work, I had to add the ability for any merge to reorder doc IDs of
the merged segment via `OneMerge#reorder`. `MockRandomMergePolicy` from the
test framework randomly reverses the order of documents in a merged segment to
make sure this logic is properly exercised.
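
As a sketch of what the new hook enables (the class name below is hypothetical;
the pattern mirrors `BPReorderingMergePolicy` and `MockRandomMergePolicy` from
this change): a policy wraps each `OneMerge` using the new copy constructor and
overrides `reorder`, which is only invoked when no index sort is configured.
Returning null keeps the existing doc ID order and all bulk-merge optimizations.

import java.io.IOException;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.FilterMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Sorter;
import org.apache.lucene.store.Directory;

/** Hypothetical policy that reverses doc IDs of merged segments via OneMerge#reorder. */
public class ReverseOnMergePolicy extends FilterMergePolicy {

  public ReverseOnMergePolicy(MergePolicy in) {
    super(in);
  }

  @Override
  public MergeSpecification findMerges(
      MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
      throws IOException {
    MergeSpecification spec = super.findMerges(mergeTrigger, segmentInfos, mergeContext);
    if (spec == null) {
      return null;
    }
    MergeSpecification newSpec = new MergeSpecification();
    for (OneMerge merge : spec.merges) {
      newSpec.add(
          new OneMerge(merge) { // wrapping constructor added by this change
            @Override
            public CodecReader wrapForMerge(CodecReader reader) throws IOException {
              return merge.wrapForMerge(reader); // must delegate
            }

            @Override
            public Sorter.DocMap reorder(CodecReader reader, Directory dir) throws IOException {
              // Only called when index sorting is disabled; a non-null map reorders the
              // merged segment at the cost of some bulk-merge optimizations.
              final int maxDoc = reader.maxDoc();
              return new Sorter.DocMap() {
                @Override
                public int size() {
                  return maxDoc;
                }

                @Override
                public int oldToNew(int docID) {
                  return maxDoc - 1 - docID;
                }

                @Override
                public int newToOld(int docID) {
                  return maxDoc - 1 - docID;
                }
              };
            }
          });
    }
    return newSpec;
  }
}

A real implementation would wrap the specifications returned by the other
find*Merges methods the same way, as `BPReorderingMergePolicy` does in the diff
below.
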
Adrien Grand, 2023-11-23 13:25:00 +01:00, committed by GitHub
parent 76fe6bdbc1
commit f7cab16450
16 changed files with 1884 additions and 80 deletions

@ -33,6 +33,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -55,6 +56,8 @@ import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
import org.apache.lucene.index.FieldInfos.FieldNumbers;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.index.MergePolicy.MergeReader;
import org.apache.lucene.index.Sorter.DocMap;
import org.apache.lucene.internal.tests.IndexPackageAccess;
import org.apache.lucene.internal.tests.IndexWriterAccess;
import org.apache.lucene.internal.tests.TestSecrets;
@ -3413,8 +3416,20 @@ public class IndexWriter
Collections.emptyMap(),
config.getIndexSort());
List<CodecReader> readers =
merge.getMergeReader().stream().map(r -> r.codecReader).collect(Collectors.toList());
List<CodecReader> readers = new ArrayList<>();
for (MergeReader mr : merge.getMergeReader()) {
CodecReader reader = merge.wrapForMerge(mr.codecReader);
readers.add(reader);
}
if (config.getIndexSort() == null && readers.isEmpty() == false) {
CodecReader mergedReader = SlowCompositeCodecReaderWrapper.wrap(readers);
DocMap docMap = merge.reorder(mergedReader, directory);
if (docMap != null) {
readers = Collections.singletonList(SortingCodecReader.wrap(mergedReader, docMap, null));
}
}
SegmentMerger merger =
new SegmentMerger(readers, segInfo, infoStream, trackingDir, globalFieldNumberMap, context);
@ -3464,6 +3479,8 @@ public class IndexWriter
merge.getMergeInfo().info.setUseCompoundFile(true);
}
merge.setMergeInfo(merge.info);
// Have codec write SegmentInfo. Must do this after
// creating CFS so that 1) .si isn't slurped into CFS,
// and 2) .si reflects useCompoundFile=true change
@ -3791,7 +3808,7 @@ public class IndexWriter
new OneMergeWrappingMergePolicy(
config.getMergePolicy(),
toWrap ->
new MergePolicy.OneMerge(toWrap.segments) {
new MergePolicy.OneMerge(toWrap) {
SegmentCommitInfo origInfo;
final AtomicBoolean onlyOnce = new AtomicBoolean(false);
@ -3890,6 +3907,18 @@ public class IndexWriter
public CodecReader wrapForMerge(CodecReader reader) throws IOException {
return toWrap.wrapForMerge(reader); // must delegate
}
@Override
public Sorter.DocMap reorder(CodecReader reader, Directory dir)
throws IOException {
return toWrap.reorder(reader, dir); // must delegate
}
@Override
public void setMergeInfo(SegmentCommitInfo info) {
super.setMergeInfo(info);
toWrap.setMergeInfo(info);
}
}),
trigger,
UNBOUNDED_MAX_MERGE_SEGMENTS);
@ -4312,7 +4341,7 @@ public class IndexWriter
* merge.info). If no deletes were flushed, no new deletes file is saved.
*/
private synchronized ReadersAndUpdates commitMergedDeletesAndUpdates(
MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
MergePolicy.OneMerge merge, MergeState.DocMap[] docMaps) throws IOException {
mergeFinishedGen.incrementAndGet();
@ -4336,7 +4365,7 @@ public class IndexWriter
boolean anyDVUpdates = false;
assert sourceSegments.size() == mergeState.docMaps.length;
assert sourceSegments.size() == docMaps.length;
for (int i = 0; i < sourceSegments.size(); i++) {
SegmentCommitInfo info = sourceSegments.get(i);
minGen = Math.min(info.getBufferedDeletesGen(), minGen);
@ -4346,12 +4375,11 @@ public class IndexWriter
// the pool:
assert rld != null : "seg=" + info.info.name;
MergeState.DocMap segDocMap = mergeState.docMaps[i];
MergeState.DocMap segDocMap = docMaps[i];
carryOverHardDeletes(
mergedDeletesAndUpdates,
maxDoc,
mergeState.liveDocs[i],
merge.getMergeReader().get(i).hardLiveDocs,
rld.getHardLiveDocs(),
segDocMap);
@ -4454,26 +4482,21 @@ public class IndexWriter
private static void carryOverHardDeletes(
ReadersAndUpdates mergedReadersAndUpdates,
int maxDoc,
Bits mergeLiveDocs, // the liveDocs used to build the segDocMaps
Bits prevHardLiveDocs, // the hard deletes when the merge reader was pulled
Bits currentHardLiveDocs, // the current hard deletes
MergeState.DocMap segDocMap)
throws IOException {
assert mergeLiveDocs == null || mergeLiveDocs.length() == maxDoc;
// if we mix soft and hard deletes we need to make sure that we only carry over deletes
// that were not deleted before. Otherwise the segDocMap doesn't contain a mapping.
// yet this is also required if any MergePolicy modifies the liveDocs since this is
// what the segDocMap is build on.
final IntPredicate carryOverDelete =
mergeLiveDocs == null || mergeLiveDocs == prevHardLiveDocs
? docId -> currentHardLiveDocs.get(docId) == false
: docId -> mergeLiveDocs.get(docId) && currentHardLiveDocs.get(docId) == false;
docId -> segDocMap.get(docId) != -1 && currentHardLiveDocs.get(docId) == false;
if (prevHardLiveDocs != null) {
// If we had deletions on starting the merge we must
// still have deletions now:
assert currentHardLiveDocs != null;
assert mergeLiveDocs != null;
assert prevHardLiveDocs.length() == maxDoc;
assert currentHardLiveDocs.length() == maxDoc;
@ -4516,7 +4539,7 @@ public class IndexWriter
}
@SuppressWarnings("try")
private synchronized boolean commitMerge(MergePolicy.OneMerge merge, MergeState mergeState)
private synchronized boolean commitMerge(MergePolicy.OneMerge merge, MergeState.DocMap[] docMaps)
throws IOException {
merge.onMergeComplete();
testPoint("startCommitMerge");
@ -4559,7 +4582,7 @@ public class IndexWriter
}
final ReadersAndUpdates mergedUpdates =
merge.info.info.maxDoc() == 0 ? null : commitMergedDeletesAndUpdates(merge, mergeState);
merge.info.info.maxDoc() == 0 ? null : commitMergedDeletesAndUpdates(merge, docMaps);
// If the doc store we are using has been closed and
// is in now compound format (but wasn't when we
@ -5163,12 +5186,68 @@ public class IndexWriter
}
mergeReaders.add(wrappedReader);
}
MergeState.DocMap[] reorderDocMaps = null;
if (config.getIndexSort() == null) {
// Create a merged view of the input segments. This effectively does the merge.
CodecReader mergedView = SlowCompositeCodecReaderWrapper.wrap(mergeReaders);
Sorter.DocMap docMap = merge.reorder(mergedView, directory);
if (docMap != null) {
reorderDocMaps = new MergeState.DocMap[mergeReaders.size()];
int docBase = 0;
int i = 0;
for (CodecReader reader : mergeReaders) {
final int currentDocBase = docBase;
reorderDocMaps[i] =
new MergeState.DocMap() {
@Override
public int get(int docID) {
Objects.checkIndex(docID, reader.maxDoc());
return docMap.oldToNew(currentDocBase + docID);
}
};
i++;
docBase += reader.maxDoc();
}
// This makes merging more expensive as it disables some bulk merging optimizations, so
// only do this if a non-null DocMap is returned.
mergeReaders =
Collections.singletonList(SortingCodecReader.wrap(mergedView, docMap, null));
}
}
final SegmentMerger merger =
new SegmentMerger(
mergeReaders, merge.info.info, infoStream, dirWrapper, globalFieldNumberMap, context);
merge.info.setSoftDelCount(Math.toIntExact(softDeleteCount.get()));
merge.checkAborted();
MergeState mergeState = merger.mergeState;
MergeState.DocMap[] docMaps;
if (reorderDocMaps == null) {
docMaps = mergeState.docMaps;
} else {
// Since the reader was reordered, we passed a merged view to MergeState and from its
// perspective there is a single input segment to the merge and the
// SlowCompositeCodecReaderWrapper is effectively doing the merge.
assert mergeState.docMaps.length == 1
: "Got " + mergeState.docMaps.length + " docMaps, but expected 1";
MergeState.DocMap compactionDocMap = mergeState.docMaps[0];
docMaps = new MergeState.DocMap[reorderDocMaps.length];
for (int i = 0; i < docMaps.length; ++i) {
MergeState.DocMap reorderDocMap = reorderDocMaps[i];
docMaps[i] =
new MergeState.DocMap() {
@Override
public int get(int docID) {
int reorderedDocId = reorderDocMap.get(docID);
int compactedDocId = compactionDocMap.get(reorderedDocId);
return compactedDocId;
}
};
}
}
merge.mergeStartNS = System.nanoTime();
// This is where all the work happens:
@ -5176,7 +5255,6 @@ public class IndexWriter
merger.merge();
}
MergeState mergeState = merger.mergeState;
assert mergeState.segmentInfo == merge.info.info;
merge.info.info.setFiles(new HashSet<>(dirWrapper.getCreatedFiles()));
Codec codec = config.getCodec();
@ -5229,7 +5307,7 @@ public class IndexWriter
// Merge would produce a 0-doc segment, so we do nothing except commit the merge to remove
// all the 0-doc segments that we "merged":
assert merge.info.info.maxDoc() == 0;
success = commitMerge(merge, mergeState);
success = commitMerge(merge, docMaps);
return 0;
}
@ -5309,6 +5387,8 @@ public class IndexWriter
success = false;
}
merge.setMergeInfo(merge.info);
// Have codec write SegmentInfo. Must do this after
// creating CFS so that 1) .si isn't slurped into CFS,
// and 2) .si reflects useCompoundFile=true change
@ -5352,7 +5432,7 @@ public class IndexWriter
}
}
if (!commitMerge(merge, mergeState)) {
if (!commitMerge(merge, docMaps)) {
// commitMerge will return false if this merge was
// aborted
return 0;

@ -255,6 +255,15 @@ public abstract class MergePolicy {
usesPooledReaders = false;
}
/** Constructor for wrapping. */
protected OneMerge(OneMerge oneMerge) {
this.segments = oneMerge.segments;
this.mergeReaders = oneMerge.mergeReaders;
this.totalMaxDoc = oneMerge.totalMaxDoc;
this.mergeProgress = new OneMergeProgress();
this.usesPooledReaders = oneMerge.usesPooledReaders;
}
/**
* Called by {@link IndexWriter} after the merge started and from the thread that will be
* executing the merge.
@ -288,11 +297,32 @@ public abstract class MergePolicy {
}
}
/** Wrap the reader in order to add/remove information to the merged segment. */
/**
* Wrap a reader prior to merging in order to add/remove fields or documents.
*
* <p><b>NOTE:</b> It is illegal to reorder doc IDs here, use {@link
* #reorder(CodecReader,Directory)} instead.
*/
public CodecReader wrapForMerge(CodecReader reader) throws IOException {
return reader;
}
/**
* Extend this method if you wish to renumber doc IDs. This method will be called when index
* sorting is disabled on a merged view of the {@link OneMerge}. A {@code null} return value
* indicates that doc IDs should not be reordered.
*
* <p><b>NOTE:</b> Returning a non-null value here disables several optimizations and increases
* the merging overhead.
*
* @param reader The reader to reorder.
* @param dir The {@link Directory} of the index, which may be used to create temporary files.
* @lucene.experimental
*/
public Sorter.DocMap reorder(CodecReader reader, Directory dir) throws IOException {
return null;
}
/**
* Expert: Sets the {@link SegmentCommitInfo} of the merged segment. Allows sub-classes to e.g.
* {@link SegmentInfo#addDiagnostics(Map) add diagnostic} properties.
@ -355,11 +385,7 @@ public abstract class MergePolicy {
* not indicate the number of documents after the merge.
*/
public int totalNumDocs() {
int total = 0;
for (SegmentCommitInfo info : segments) {
total += info.info.maxDoc();
}
return total;
return totalMaxDoc;
}
/** Return {@link MergeInfo} describing this merge. */

File diff suppressed because it is too large.

@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.lucene.codecs.Codec;
@ -1886,4 +1887,54 @@ public class TestAddIndexes extends LuceneTestCase {
}
}
}
public void testSetDiagnostics() throws IOException {
MergePolicy myMergePolicy =
new FilterMergePolicy(newLogMergePolicy(4)) {
@Override
public MergeSpecification findMerges(CodecReader... readers) throws IOException {
MergeSpecification spec = super.findMerges(readers);
if (spec == null) {
return null;
}
MergeSpecification newSpec = new MergeSpecification();
for (OneMerge merge : spec.merges) {
newSpec.add(
new OneMerge(merge) {
@Override
public void setMergeInfo(SegmentCommitInfo info) {
super.setMergeInfo(info);
info.info.addDiagnostics(
Collections.singletonMap("merge_policy", "my_merge_policy"));
}
});
}
return newSpec;
}
};
Directory sourceDir = newDirectory();
try (IndexWriter w = new IndexWriter(sourceDir, newIndexWriterConfig())) {
Document doc = new Document();
w.addDocument(doc);
}
DirectoryReader reader = DirectoryReader.open(sourceDir);
CodecReader codecReader = SlowCodecReaderWrapper.wrap(reader.leaves().get(0).reader());
Directory targetDir = newDirectory();
try (IndexWriter w =
new IndexWriter(targetDir, newIndexWriterConfig().setMergePolicy(myMergePolicy))) {
w.addIndexes(codecReader);
}
SegmentInfos si = SegmentInfos.readLatestCommit(targetDir);
assertNotEquals(0, si.size());
for (SegmentCommitInfo sci : si) {
assertEquals(
IndexWriter.SOURCE_ADDINDEXES_READERS, sci.info.getDiagnostics().get(IndexWriter.SOURCE));
assertEquals("my_merge_policy", sci.info.getDiagnostics().get("merge_policy"));
}
reader.close();
targetDir.close();
sourceDir.close();
}
}

@ -18,6 +18,7 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@ -890,4 +891,63 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
return null;
}
}
public void testSetDiagnostics() throws IOException {
MergePolicy myMergePolicy =
new FilterMergePolicy(newLogMergePolicy(4)) {
@Override
public MergeSpecification findMerges(
MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
throws IOException {
return wrapSpecification(super.findMerges(mergeTrigger, segmentInfos, mergeContext));
}
@Override
public MergeSpecification findFullFlushMerges(
MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
throws IOException {
return wrapSpecification(
super.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext));
}
private MergeSpecification wrapSpecification(MergeSpecification spec) {
if (spec == null) {
return null;
}
MergeSpecification newSpec = new MergeSpecification();
for (OneMerge merge : spec.merges) {
newSpec.add(
new OneMerge(merge) {
@Override
public void setMergeInfo(SegmentCommitInfo info) {
super.setMergeInfo(info);
info.info.addDiagnostics(
Collections.singletonMap("merge_policy", "my_merge_policy"));
}
});
}
return newSpec;
}
};
Directory dir = newDirectory();
IndexWriter w =
new IndexWriter(
dir, newIndexWriterConfig().setMergePolicy(myMergePolicy).setMaxBufferedDocs(2));
Document doc = new Document();
for (int i = 0; i < 20; ++i) {
w.addDocument(doc);
}
w.close();
SegmentInfos si = SegmentInfos.readLatestCommit(dir);
boolean hasOneMergedSegment = false;
for (SegmentCommitInfo sci : si) {
if (IndexWriter.SOURCE_MERGE.equals(sci.info.getDiagnostics().get(IndexWriter.SOURCE))) {
assertEquals("my_merge_policy", sci.info.getDiagnostics().get("merge_policy"));
hasOneMergedSegment = true;
}
}
assertTrue(hasOneMergedSegment);
w.close();
dir.close();
}
}

@ -29,7 +29,6 @@ import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.VectorSimilarityFunction;
import org.apache.lucene.search.DocIdSetIterator;
@ -90,7 +89,9 @@ abstract class ParentBlockJoinKnnVectorQueryTestCase extends LuceneTestCase {
public void testIndexWithNoVectorsNorParents() throws IOException {
try (Directory d = newDirectory()) {
try (IndexWriter w = new IndexWriter(d, new IndexWriterConfig())) {
try (IndexWriter w =
new IndexWriter(
d, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)))) {
// Add some documents without a vector
for (int i = 0; i < 5; i++) {
Document doc = new Document();
@ -123,7 +124,9 @@ abstract class ParentBlockJoinKnnVectorQueryTestCase extends LuceneTestCase {
public void testIndexWithNoParents() throws IOException {
try (Directory d = newDirectory()) {
try (IndexWriter w = new IndexWriter(d, new IndexWriterConfig())) {
try (IndexWriter w =
new IndexWriter(
d, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)))) {
for (int i = 0; i < 3; ++i) {
Document doc = new Document();
doc.add(getKnnVectorField("field", new float[] {2, 2}));
@ -175,7 +178,9 @@ abstract class ParentBlockJoinKnnVectorQueryTestCase extends LuceneTestCase {
public void testScoringWithMultipleChildren() throws IOException {
try (Directory d = newDirectory()) {
try (IndexWriter w = new IndexWriter(d, new IndexWriterConfig())) {
try (IndexWriter w =
new IndexWriter(
d, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)))) {
List<Document> toAdd = new ArrayList<>();
for (int j = 1; j <= 5; j++) {
Document doc = new Document();
@ -227,7 +232,9 @@ abstract class ParentBlockJoinKnnVectorQueryTestCase extends LuceneTestCase {
* randomly fail to find one).
*/
try (Directory d = newDirectory()) {
try (IndexWriter w = new IndexWriter(d, new IndexWriterConfig())) {
try (IndexWriter w =
new IndexWriter(
d, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)))) {
int r = 0;
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 5; j++) {
@ -273,7 +280,11 @@ abstract class ParentBlockJoinKnnVectorQueryTestCase extends LuceneTestCase {
Directory getIndexStore(String field, float[]... contents) throws IOException {
Directory indexStore = newDirectory();
RandomIndexWriter writer = new RandomIndexWriter(random(), indexStore);
RandomIndexWriter writer =
new RandomIndexWriter(
random(),
indexStore,
newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
for (int i = 0; i < contents.length; ++i) {
List<Document> toAdd = new ArrayList<>();
Document doc = new Document();

@ -176,7 +176,9 @@ public class TestBlockJoin extends LuceneTestCase {
// You must use ToParentBlockJoinSearcher if you want to do BQ SHOULD queries:
public void testBQShouldJoinedChild() throws Exception {
final Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
final RandomIndexWriter w =
new RandomIndexWriter(
random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
final List<Document> docs = new ArrayList<>();
@ -248,7 +250,9 @@ public class TestBlockJoin extends LuceneTestCase {
public void testSimpleKnn() throws Exception {
final Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
final RandomIndexWriter w =
new RandomIndexWriter(
random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
final List<Document> docs = new ArrayList<>();
@ -294,7 +298,9 @@ public class TestBlockJoin extends LuceneTestCase {
public void testSimple() throws Exception {
final Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
final RandomIndexWriter w =
new RandomIndexWriter(
random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
final List<Document> docs = new ArrayList<>();
@ -383,7 +389,9 @@ public class TestBlockJoin extends LuceneTestCase {
public void testSimpleFilter() throws Exception {
final Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
final RandomIndexWriter w =
new RandomIndexWriter(
random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
final List<Document> docs = new ArrayList<>();
docs.add(makeJob("java", 2007));
@ -515,7 +523,9 @@ public class TestBlockJoin extends LuceneTestCase {
public void testBoostBug() throws Exception {
final Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
final RandomIndexWriter w =
new RandomIndexWriter(
random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
IndexReader r = w.getReader();
w.close();
IndexSearcher s = newSearcher(r);
@ -608,8 +618,14 @@ public class TestBlockJoin extends LuceneTestCase {
final List<Integer> toDelete = new ArrayList<>();
// TODO: parallel star join, nested join cases too!
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
final RandomIndexWriter joinW = new RandomIndexWriter(random(), joinDir);
final RandomIndexWriter w =
new RandomIndexWriter(
random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
final RandomIndexWriter joinW =
new RandomIndexWriter(
random(),
joinDir,
newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
for (int parentDocID = 0; parentDocID < numParentDocs; parentDocID++) {
Document parentDoc = new Document();
Document parentJoinDoc = new Document();
@ -1187,7 +1203,9 @@ public class TestBlockJoin extends LuceneTestCase {
public void testMultiChildTypes() throws Exception {
final Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
final RandomIndexWriter w =
new RandomIndexWriter(
random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
final List<Document> docs = new ArrayList<>();
@ -1259,7 +1277,9 @@ public class TestBlockJoin extends LuceneTestCase {
public void testAdvanceSingleParentSingleChild() throws Exception {
Directory dir = newDirectory();
RandomIndexWriter w = new RandomIndexWriter(random(), dir);
RandomIndexWriter w =
new RandomIndexWriter(
random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
Document childDoc = new Document();
childDoc.add(newStringField("child", "1", Field.Store.NO));
Document parentDoc = new Document();
@ -1322,7 +1342,9 @@ public class TestBlockJoin extends LuceneTestCase {
// LUCENE-4968
public void testChildQueryNeverMatches() throws Exception {
Directory d = newDirectory();
RandomIndexWriter w = new RandomIndexWriter(random(), d);
RandomIndexWriter w =
new RandomIndexWriter(
random(), d, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
Document parent = new Document();
parent.add(new StoredField("parentID", "0"));
parent.add(new SortedDocValuesField("parentID", new BytesRef("0")));
@ -1392,7 +1414,9 @@ public class TestBlockJoin extends LuceneTestCase {
public void testAdvanceSingleDeletedParentNoChild() throws Exception {
final Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
final RandomIndexWriter w =
new RandomIndexWriter(
random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
// First doc with 1 children
Document parentDoc = new Document();
@ -1437,7 +1461,9 @@ public class TestBlockJoin extends LuceneTestCase {
public void testIntersectionWithRandomApproximation() throws IOException {
final Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
final RandomIndexWriter w =
new RandomIndexWriter(
random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
final int numBlocks = atLeast(100);
for (int i = 0; i < numBlocks; ++i) {
@ -1483,7 +1509,9 @@ public class TestBlockJoin extends LuceneTestCase {
// delete documents to simulate FilteredQuery applying a filter as acceptDocs
public void testParentScoringBug() throws Exception {
final Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
final RandomIndexWriter w =
new RandomIndexWriter(
random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
final List<Document> docs = new ArrayList<>();
docs.add(makeJob("java", 2007));
@ -1521,7 +1549,9 @@ public class TestBlockJoin extends LuceneTestCase {
public void testToChildBlockJoinQueryExplain() throws Exception {
final Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
final RandomIndexWriter w =
new RandomIndexWriter(
random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
final List<Document> docs = new ArrayList<>();
docs.add(makeJob("java", 2007));
@ -1563,7 +1593,9 @@ public class TestBlockJoin extends LuceneTestCase {
public void testToChildInitialAdvanceParentButNoKids() throws Exception {
final Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
final RandomIndexWriter w =
new RandomIndexWriter(
random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
// degenerate case: first doc has no children
w.addDocument(makeResume("first", "nokids"));
@ -1601,7 +1633,9 @@ public class TestBlockJoin extends LuceneTestCase {
public void testMultiChildQueriesOfDiffParentLevels() throws Exception {
final Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
final RandomIndexWriter w =
new RandomIndexWriter(
random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
// randomly generate resume->jobs[]->qualifications[]
final int numResumes = atLeast(100);
@ -1680,7 +1714,12 @@ public class TestBlockJoin extends LuceneTestCase {
};
Directory dir = newDirectory();
RandomIndexWriter w =
new RandomIndexWriter(random(), dir, newIndexWriterConfig().setSimilarity(sim));
new RandomIndexWriter(
random(),
dir,
newIndexWriterConfig()
.setSimilarity(sim)
.setMergePolicy(newMergePolicy(random(), false)));
w.addDocuments(
Arrays.asList(
Collections.singleton(newTextField("foo", "bar bar", Store.NO)),

@ -58,7 +58,9 @@ public class TestBlockJoinValidation extends LuceneTestCase {
public void setUp() throws Exception {
super.setUp();
directory = newDirectory();
final IndexWriterConfig config = new IndexWriterConfig(new MockAnalyzer(random()));
final IndexWriterConfig config =
new IndexWriterConfig(new MockAnalyzer(random()))
.setMergePolicy(newMergePolicy(random(), false));
final IndexWriter indexWriter = new IndexWriter(directory, config);
for (int i = 0; i < AMOUNT_OF_SEGMENTS; i++) {
List<Document> segmentDocs = createDocsForSegment(i);

@ -37,7 +37,9 @@ public class TestCheckJoinIndex extends LuceneTestCase {
public void testNoParent() throws IOException {
final Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
final RandomIndexWriter w =
new RandomIndexWriter(
random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
final int numDocs = TestUtil.nextInt(random(), 1, 3);
for (int i = 0; i < numDocs; ++i) {
w.addDocument(new Document());
@ -55,7 +57,9 @@ public class TestCheckJoinIndex extends LuceneTestCase {
public void testOrphans() throws IOException {
final Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
final RandomIndexWriter w =
new RandomIndexWriter(
random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
{
// Add a first valid block

@ -489,7 +489,8 @@ public class TestJoinUtil extends LuceneTestCase {
new RandomIndexWriter(
random(),
dir,
newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.KEYWORD, false)));
newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.KEYWORD, false))
.setMergePolicy(newMergePolicy(random(), false)));
Map<String, Float> lowestScoresPerParent = new HashMap<>();
Map<String, Float> highestScoresPerParent = new HashMap<>();
@ -632,7 +633,8 @@ public class TestJoinUtil extends LuceneTestCase {
new RandomIndexWriter(
random(),
dir,
newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.KEYWORD, false)));
newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.KEYWORD, false))
.setMergePolicy(newMergePolicy(random(), false)));
int minChildDocsPerParent = 2;
int maxChildDocsPerParent = 16;
@ -700,7 +702,9 @@ public class TestJoinUtil extends LuceneTestCase {
public void testRewrite() throws IOException {
Directory dir = newDirectory();
RandomIndexWriter w = new RandomIndexWriter(random(), dir);
RandomIndexWriter w =
new RandomIndexWriter(
random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
Document doc = new Document();
doc.add(new SortedDocValuesField("join_field", new BytesRef("abc")));
w.addDocument(doc);
@ -1637,7 +1641,8 @@ public class TestJoinUtil extends LuceneTestCase {
new RandomIndexWriter(
random,
dir,
newIndexWriterConfig(new MockAnalyzer(random, MockTokenizer.KEYWORD, false)));
newIndexWriterConfig(new MockAnalyzer(random, MockTokenizer.KEYWORD, false))
.setMergePolicy(newMergePolicy(random(), false)));
IndexIterationContext context = new IndexIterationContext();
int numRandomValues = nDocs / RandomNumbers.randomIntBetween(random, 1, 4);

@ -49,7 +49,9 @@ public class TestParentBlockJoinFloatKnnVectorQuery extends ParentBlockJoinKnnVe
public void testScoreCosine() throws IOException {
try (Directory d = newDirectory()) {
try (IndexWriter w = new IndexWriter(d, new IndexWriterConfig())) {
try (IndexWriter w =
new IndexWriter(
d, new IndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)))) {
for (int j = 1; j <= 5; j++) {
List<Document> toAdd = new ArrayList<>();
Document doc = new Document();

@ -46,7 +46,9 @@ public class TestParentChildrenBlockJoinQuery extends LuceneTestCase {
int maxChildDocsPerParent = 8 + random().nextInt(8);
Directory dir = newDirectory();
RandomIndexWriter writer = new RandomIndexWriter(random(), dir);
RandomIndexWriter writer =
new RandomIndexWriter(
random(), dir, newIndexWriterConfig().setMergePolicy(newMergePolicy(random(), false)));
for (int i = 0; i < numParentDocs; i++) {
int numChildDocs = random().nextInt(maxChildDocsPerParent);
List<Document> docs = new ArrayList<>(numChildDocs + 1);

@ -31,6 +31,7 @@ import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Sorter;
import org.apache.lucene.index.Sorter.DocMap;
import org.apache.lucene.index.SortingCodecReader;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
@ -116,6 +117,7 @@ public final class BPIndexReorderer {
public static final int DEFAULT_MAX_ITERS = 20;
private int minDocFreq;
private float maxDocFreq;
private int minPartitionSize;
private int maxIters;
private ForkJoinPool forkJoinPool;
@ -125,6 +127,7 @@ public final class BPIndexReorderer {
/** Constructor. */
public BPIndexReorderer() {
setMinDocFreq(DEFAULT_MIN_DOC_FREQ);
setMaxDocFreq(1f);
setMinPartitionSize(DEFAULT_MIN_PARTITION_SIZE);
setMaxIters(DEFAULT_MAX_ITERS);
setForkJoinPool(null);
@ -141,6 +144,19 @@ public final class BPIndexReorderer {
this.minDocFreq = minDocFreq;
}
/**
* Set the maximum document frequency for terms to be considered, as a ratio of {@code maxDoc}.
* This is useful because very frequent terms (stop words) add significant overhead to the
* reordering logic while not being very relevant for ordering. This value must be in (0, 1].
* Default value is 1.
*/
public void setMaxDocFreq(float maxDocFreq) {
if (maxDocFreq > 0 == false || maxDocFreq <= 1 == false) {
throw new IllegalArgumentException("maxDocFreq must be in (0, 1], got " + maxDocFreq);
}
this.maxDocFreq = maxDocFreq;
}
/** Set the minimum partition size, when the algorithm stops recursing, 32 by default. */
public void setMinPartitionSize(int minPartitionSize) {
if (minPartitionSize < 1) {
@ -616,6 +632,7 @@ public final class BPIndexReorderer {
((ramBudgetMB * 1024 * 1024 - docRAMRequirements(reader.maxDoc()))
/ getParallelism()
/ termRAMRequirementsPerThreadPerTerm());
final int maxDocFreq = (int) ((double) this.maxDocFreq * reader.maxDoc());
int numTerms = 0;
for (String field : fields) {
@ -633,7 +650,8 @@ public final class BPIndexReorderer {
TermsEnum iterator = terms.iterator();
PostingsEnum postings = null;
for (BytesRef term = iterator.next(); term != null; term = iterator.next()) {
if (iterator.docFreq() < minDocFreq) {
final int docFreq = iterator.docFreq();
if (docFreq < minDocFreq || docFreq > maxDocFreq) {
continue;
}
if (numTerms >= ArrayUtil.MAX_ARRAY_LENGTH) {
@ -723,15 +741,11 @@ public final class BPIndexReorderer {
}
/**
* Reorder the given {@link CodecReader} into a reader that tries to minimize the log gap between
* consecutive documents in postings, which usually helps improve space efficiency and query
* evaluation efficiency. Note that the returned {@link CodecReader} is slow and should typically
* be used in a call to {@link IndexWriter#addIndexes(CodecReader...)}.
*
* @throws NotEnoughRAMException if not enough RAM is provided
* Expert: Compute the {@link DocMap} that holds the new doc ID numbering. This is exposed to
* enable integration into {@link BPReorderingMergePolicy}, {@link #reorder(CodecReader,
* Directory)} should be preferred in general.
*/
public CodecReader reorder(CodecReader reader, Directory tempDir) throws IOException {
public Sorter.DocMap computeDocMap(CodecReader reader, Directory tempDir) throws IOException {
if (docRAMRequirements(reader.maxDoc()) >= ramBudgetMB * 1024 * 1024) {
throw new NotEnoughRAMException(
"At least "
@ -756,8 +770,7 @@ public final class BPIndexReorderer {
for (int i = 0; i < newToOld.length; ++i) {
oldToNew[newToOld[i]] = i;
}
final Sorter.DocMap docMap =
new Sorter.DocMap() {
return new Sorter.DocMap() {
@Override
public int size() {
@ -774,6 +787,18 @@ public final class BPIndexReorderer {
return newToOld[docID];
}
};
}
/**
* Reorder the given {@link CodecReader} into a reader that tries to minimize the log gap between
* consecutive documents in postings, which usually helps improve space efficiency and query
* evaluation efficiency. Note that the returned {@link CodecReader} is slow and should typically
* be used in a call to {@link IndexWriter#addIndexes(CodecReader...)}.
*
* @throws NotEnoughRAMException if not enough RAM is provided
*/
public CodecReader reorder(CodecReader reader, Directory tempDir) throws IOException {
Sorter.DocMap docMap = computeDocMap(reader, tempDir);
return SortingCodecReader.wrap(reader, docMap, null);
}

@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.misc.index;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.FilterMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Sorter;
import org.apache.lucene.misc.index.BPIndexReorderer.NotEnoughRAMException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.SetOnce;
/**
* A merge policy that reorders merged segments according to a {@link BPIndexReorderer}. When
* reordering doesn't have enough RAM, it simply skips reordering in order not to fail the merge. So
* make sure to give enough RAM to your {@link BPIndexReorderer} via {@link
* BPIndexReorderer#setRAMBudgetMB(double)}.
*/
public final class BPReorderingMergePolicy extends FilterMergePolicy {
/** Whether a segment has been reordered. */
static final String REORDERED = "bp.reordered";
private final BPIndexReorderer reorderer;
private int minNaturalMergeNumDocs = 1;
private float minNaturalMergeRatioFromBiggestSegment = 0f;
/**
* Sole constructor. It takes the merge policy that should be used to compute merges, and will
* then reorder doc IDs from all merges above the configured minimum doc count, as well as all
* forced merges.
*
* <p>If you wish to only run reordering upon forced merges, pass {@link Integer#MAX_VALUE} as a
* {@code minNaturalMergeNumDocs}. Otherwise a default value of {@code 2^18 = 262,144} is
* suggested. This should help retain merging optimizations on small merges while reordering the
* larger segments that are important for good search performance.
*
* @param in the merge policy to use to compute merges
* @param reorderer the {@link BPIndexReorderer} to use to renumber doc IDs
*/
public BPReorderingMergePolicy(MergePolicy in, BPIndexReorderer reorderer) {
super(in);
this.reorderer = reorderer;
}
/**
* Set the minimum number of docs that a merge must have for the resulting segment to be
* reordered.
*/
public void setMinNaturalMergeNumDocs(int minNaturalMergeNumDocs) {
if (minNaturalMergeNumDocs < 1) {
throw new IllegalArgumentException(
"minNaturalMergeNumDocs must be at least 1, got " + minNaturalMergeNumDocs);
}
this.minNaturalMergeNumDocs = minNaturalMergeNumDocs;
}
/**
* Set the minimum number of docs that a merge must have for the resulting segment to be
* reordered, as a ratio of the total number of documents of the current biggest segment in the
* index. This parameter helps only enable reordering on segments that are large enough that they
* will significantly contribute to overall search performance.
*/
public void setMinNaturalMergeRatioFromBiggestSegment(
float minNaturalMergeRatioFromBiggestSegment) {
if (minNaturalMergeRatioFromBiggestSegment >= 0 == false
|| minNaturalMergeRatioFromBiggestSegment < 1 == false) {
throw new IllegalArgumentException(
"minNaturalMergeRatioFromBiggestSegment must be in [0, 1), got "
+ minNaturalMergeRatioFromBiggestSegment);
}
this.minNaturalMergeRatioFromBiggestSegment = minNaturalMergeRatioFromBiggestSegment;
}
private MergeSpecification maybeReorder(
MergeSpecification spec, boolean forced, SegmentInfos infos) {
if (spec == null) {
return null;
}
final int minNumDocs;
if (forced) {
// No minimum size for forced merges
minNumDocs = 1;
} else {
int maxMaxDoc = 0;
if (infos != null) {
for (SegmentCommitInfo sci : infos) {
maxMaxDoc = Math.max(sci.info.maxDoc(), maxMaxDoc);
}
}
minNumDocs =
Math.max(
this.minNaturalMergeNumDocs,
(int) ((double) minNaturalMergeRatioFromBiggestSegment * maxMaxDoc));
}
MergeSpecification newSpec = new MergeSpecification();
for (OneMerge oneMerge : spec.merges) {
newSpec.add(
new OneMerge(oneMerge) {
private final SetOnce<Boolean> reordered = new SetOnce<>();
@Override
public CodecReader wrapForMerge(CodecReader reader) throws IOException {
return oneMerge.wrapForMerge(reader);
}
@Override
public Sorter.DocMap reorder(CodecReader reader, Directory dir) throws IOException {
Sorter.DocMap docMap = null;
if (reader.numDocs() >= minNumDocs) {
try {
docMap = reorderer.computeDocMap(reader, dir);
} catch (
@SuppressWarnings("unused")
NotEnoughRAMException e) {
// skip reordering, we don't have enough RAM anyway
}
}
reordered.set(docMap != null);
return docMap;
}
@Override
public void setMergeInfo(SegmentCommitInfo info) {
Boolean reordered = this.reordered.get();
if (reordered == null) {
// reordering was not called, likely because an index sort is configured
reordered = false;
}
info.info.addDiagnostics(
Collections.singletonMap(REORDERED, Boolean.toString(reordered)));
super.setMergeInfo(info);
}
});
}
return newSpec;
}
@Override
public MergeSpecification findMerges(
MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
throws IOException {
return maybeReorder(
super.findMerges(mergeTrigger, segmentInfos, mergeContext), false, segmentInfos);
}
@Override
public MergeSpecification findForcedMerges(
SegmentInfos segmentInfos,
int maxSegmentCount,
Map<SegmentCommitInfo, Boolean> segmentsToMerge,
MergeContext mergeContext)
throws IOException {
return maybeReorder(
super.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, mergeContext),
true,
segmentInfos);
}
@Override
public MergeSpecification findForcedDeletesMerges(
SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
return maybeReorder(
super.findForcedDeletesMerges(segmentInfos, mergeContext), true, segmentInfos);
}
@Override
public MergeSpecification findFullFlushMerges(
MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
throws IOException {
return maybeReorder(
super.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext), false, segmentInfos);
}
@Override
public MergeSpecification findMerges(CodecReader... readers) throws IOException {
// addIndexes is considered a forced merge
return maybeReorder(super.findMerges(readers), true, null);
}
}

@ -0,0 +1,260 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.misc.index;
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.SlowCodecReaderWrapper;
import org.apache.lucene.index.StoredFields;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.util.IOUtils;
public class TestBPReorderingMergePolicy extends LuceneTestCase {
public void testReorderOnMerge() throws IOException {
Directory dir1 = newDirectory();
Directory dir2 = newDirectory();
IndexWriter w1 =
new IndexWriter(dir1, newIndexWriterConfig().setMergePolicy(newLogMergePolicy()));
BPIndexReorderer reorderer = new BPIndexReorderer();
reorderer.setMinDocFreq(2);
reorderer.setMinPartitionSize(2);
BPReorderingMergePolicy mp = new BPReorderingMergePolicy(newLogMergePolicy(), reorderer);
mp.setMinNaturalMergeNumDocs(2);
IndexWriter w2 = new IndexWriter(dir2, newIndexWriterConfig().setMergePolicy(mp));
Document doc = new Document();
StringField idField = new StringField("id", "", Store.YES);
doc.add(idField);
StringField bodyField = new StringField("body", "", Store.YES);
doc.add(bodyField);
for (int i = 0; i < 10000; ++i) {
idField.setStringValue(Integer.toString(i));
bodyField.setStringValue(Integer.toString(i % 2 == 0 ? 0 : i % 10));
w1.addDocument(doc);
w2.addDocument(doc);
if (i % 10 == 0) {
w1.deleteDocuments(new Term("id", Integer.toString(i / 3)));
w2.deleteDocuments(new Term("id", Integer.toString(i / 3)));
}
if (i % 3 == 0) {
DirectoryReader.open(w2).close();
}
}
w1.forceMerge(1);
w2.forceMerge(1);
IndexReader reader1 = DirectoryReader.open(w1);
IndexReader reader2 = DirectoryReader.open(w2);
assertEquals(reader1.maxDoc(), reader2.maxDoc());
StoredFields storedFields1 = reader1.storedFields();
StoredFields storedFields2 = reader2.storedFields();
// Check that data is consistent
for (int i = 0; i < reader1.maxDoc(); ++i) {
Document doc1 = storedFields1.document(i);
String id = doc1.get("id");
String body = doc1.get("body");
PostingsEnum pe = reader2.leaves().get(0).reader().postings(new Term("id", id));
assertNotNull(pe);
int docID2 = pe.nextDoc();
assertNotEquals(DocIdSetIterator.NO_MORE_DOCS, docID2);
assertEquals(DocIdSetIterator.NO_MORE_DOCS, pe.nextDoc());
Document doc2 = storedFields2.document(docID2);
assertEquals(id, doc2.get("id"));
assertEquals(body, doc2.get("body"));
}
// Check that reader2 actually got reordered. This can only happen due to BPIndexReorderer since
// it uses a log merge-policy under the hood, which only merges adjacent segments.
boolean reordered = false;
int previousId = -1;
for (int i = 0; i < reader2.maxDoc(); ++i) {
Document doc2 = storedFields2.document(i);
String idString = doc2.get("id");
int id = Integer.parseInt(idString);
if (id < previousId) {
reordered = true;
break;
}
previousId = id;
}
assertTrue(reordered);
SegmentReader sr = (SegmentReader) reader2.leaves().get(0).reader();
final String reorderedString =
sr.getSegmentInfo().info.getDiagnostics().get(BPReorderingMergePolicy.REORDERED);
assertEquals(Boolean.TRUE.toString(), reorderedString);
IOUtils.close(reader1, reader2, w1, w2, dir1, dir2);
}
public void testReorderOnAddIndexes() throws IOException {
Directory dir1 = newDirectory();
IndexWriter w1 =
new IndexWriter(dir1, newIndexWriterConfig().setMergePolicy(newLogMergePolicy()));
Document doc = new Document();
StringField idField = new StringField("id", "", Store.YES);
doc.add(idField);
StringField bodyField = new StringField("body", "", Store.YES);
doc.add(bodyField);
for (int i = 0; i < 10000; ++i) {
idField.setStringValue(Integer.toString(i));
bodyField.setStringValue(Integer.toString(i % 2 == 0 ? 0 : i % 10));
w1.addDocument(doc);
if (i % 3 == 0) {
DirectoryReader.open(w1).close();
}
}
for (int i = 0; i < 10000; i += 10) {
w1.deleteDocuments(new Term("id", Integer.toString(i / 3)));
}
Directory dir2 = newDirectory();
BPIndexReorderer reorderer = new BPIndexReorderer();
reorderer.setMinDocFreq(2);
reorderer.setMinPartitionSize(2);
BPReorderingMergePolicy mp = new BPReorderingMergePolicy(newLogMergePolicy(), reorderer);
mp.setMinNaturalMergeNumDocs(2);
IndexWriter w2 = new IndexWriter(dir2, newIndexWriterConfig().setMergePolicy(mp));
IndexReader reader1 = DirectoryReader.open(w1);
CodecReader[] codecReaders =
reader1.leaves().stream()
.map(LeafReaderContext::reader)
.map(
t -> {
try {
return SlowCodecReaderWrapper.wrap(t);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
})
.toArray(CodecReader[]::new);
w2.addIndexes(codecReaders);
w1.forceMerge(1);
reader1.close();
reader1 = DirectoryReader.open(w1);
IndexReader reader2 = DirectoryReader.open(w2);
assertEquals(1, reader2.leaves().size());
assertEquals(reader1.maxDoc(), reader2.maxDoc());
StoredFields storedFields1 = reader1.storedFields();
StoredFields storedFields2 = reader2.storedFields();
// Check that data is consistent
for (int i = 0; i < reader1.maxDoc(); ++i) {
Document doc1 = storedFields1.document(i);
String id = doc1.get("id");
String body = doc1.get("body");
PostingsEnum pe = reader2.leaves().get(0).reader().postings(new Term("id", id));
assertNotNull(pe);
int docID2 = pe.nextDoc();
assertNotEquals(DocIdSetIterator.NO_MORE_DOCS, docID2);
assertEquals(DocIdSetIterator.NO_MORE_DOCS, pe.nextDoc());
Document doc2 = storedFields2.document(docID2);
assertEquals(id, doc2.get("id"));
assertEquals(body, doc2.get("body"));
}
// Check that reader2 actually got reordered. This can only happen due to BPIndexReorderer since
// it uses a log merge-policy under the hood, which only merges adjacent segments.
boolean reordered = false;
int previousId = -1;
for (int i = 0; i < reader2.maxDoc(); ++i) {
Document doc2 = storedFields2.document(i);
String idString = doc2.get("id");
int id = Integer.parseInt(idString);
if (id < previousId) {
reordered = true;
break;
}
previousId = id;
}
assertTrue(reordered);
SegmentReader sr = (SegmentReader) reader2.leaves().get(0).reader();
final String reorderedString =
sr.getSegmentInfo().info.getDiagnostics().get(BPReorderingMergePolicy.REORDERED);
assertEquals(Boolean.TRUE.toString(), reorderedString);
IOUtils.close(reader1, reader2, w1, w2, dir1, dir2);
}
public void testReorderDoesntHaveEnoughRAM() throws IOException {
// This just makes sure that reordering the index on merge does not corrupt its content
Directory dir = newDirectory();
BPIndexReorderer reorderer = new BPIndexReorderer();
reorderer.setMinDocFreq(2);
reorderer.setMinPartitionSize(2);
reorderer.setRAMBudgetMB(Double.MIN_VALUE);
BPReorderingMergePolicy mp = new BPReorderingMergePolicy(newLogMergePolicy(), reorderer);
mp.setMinNaturalMergeNumDocs(2);
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig().setMergePolicy(mp));
Document doc = new Document();
StringField idField = new StringField("id", "", Store.YES);
doc.add(idField);
StringField bodyField = new StringField("body", "", Store.YES);
doc.add(bodyField);
for (int i = 0; i < 10; ++i) {
idField.setStringValue(Integer.toString(i));
bodyField.setStringValue(Integer.toString(i % 2 == 0 ? 0 : i % 10));
w.addDocument(doc);
DirectoryReader.open(w).close();
}
w.forceMerge(1);
DirectoryReader reader = DirectoryReader.open(w);
StoredFields storedFields = reader.storedFields();
// This test fails if exceptions get thrown due to lack of RAM
// We expect BP to not run, so the doc ID order should not be modified
for (int i = 0; i < reader.maxDoc(); ++i) {
Document storedDoc = storedFields.document(i);
String id = storedDoc.get("id");
assertEquals(Integer.toString(i), id);
}
IOUtils.close(reader, w, dir);
}
}

@ -30,6 +30,8 @@ import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SlowCodecReaderWrapper;
import org.apache.lucene.index.Sorter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TestUtil;
@ -235,5 +237,31 @@ public class MockRandomMergePolicy extends MergePolicy {
return reader;
}
}
@Override
public Sorter.DocMap reorder(CodecReader reader, Directory dir) throws IOException {
if (r.nextBoolean()) {
// Reverse the doc ID order
final int maxDoc = reader.maxDoc();
return new Sorter.DocMap() {
@Override
public int size() {
return maxDoc;
}
@Override
public int oldToNew(int docID) {
return maxDoc - 1 - docID;
}
@Override
public int newToOld(int docID) {
return maxDoc - 1 - docID;
}
};
}
return null;
}
}
}