From 97ec69bb69ccbed1a06160a632d64f167482ac86 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Fri, 22 Mar 2013 14:22:58 +0000 Subject: [PATCH] LUCENE-4752: Added SortingMergePolicy that sorts documents when merging. git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1459794 13f79535-47bb-0310-9956-ffa450edef68 --- lucene/CHANGES.txt | 3 + .../org/apache/lucene/index/IndexWriter.java | 22 +-- .../org/apache/lucene/index/MergePolicy.java | 17 ++ .../org/apache/lucene/index/MergeState.java | 14 +- .../apache/lucene/index/SegmentMerger.java | 30 +--- .../test/org/apache/lucene/index/TestDoc.java | 8 +- .../lucene/index/TestSegmentMerger.java | 9 +- .../index/sorter/SortingMergePolicy.java | 148 ++++++++++++++++++ .../index/sorter/TestSortingMergePolicy.java | 113 +++++++++++++ 9 files changed, 315 insertions(+), 49 deletions(-) create mode 100644 lucene/misc/src/java/org/apache/lucene/index/sorter/SortingMergePolicy.java create mode 100644 lucene/misc/src/test/org/apache/lucene/index/sorter/TestSortingMergePolicy.java diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 77fd428b866..1a63b1c8133 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -127,6 +127,9 @@ New Features IndexReader leaf by throwing a CollectionTerminatedException in Collector.collect. (Adrien Grand, Shai Erera) +* LUCENE-4752: New SortingMergePolicy (in lucene/misc) that sorts documents + before merging segments. (Adrien Grand, Shai Erera, David Smiley) + API Changes * LUCENE-4844: removed TaxonomyReader.getParent(), you should use diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index c33a805cb15..50776956921 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -2395,8 +2395,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { flush(false, true); String mergedName = newSegmentName(); + final List mergeReaders = new ArrayList(); for (IndexReader indexReader : readers) { numDocs += indexReader.numDocs(); + for (AtomicReaderContext ctx : indexReader.leaves()) { + mergeReaders.add(ctx.reader()); + } } final IOContext context = new IOContext(new MergeInfo(numDocs, -1, true, -1)); @@ -2407,13 +2411,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { SegmentInfo info = new SegmentInfo(directory, Constants.LUCENE_MAIN_VERSION, mergedName, -1, false, codec, null, null); - SegmentMerger merger = new SegmentMerger(info, infoStream, trackingDir, config.getTermIndexInterval(), + SegmentMerger merger = new SegmentMerger(mergeReaders, info, infoStream, trackingDir, config.getTermIndexInterval(), MergeState.CheckAbort.NONE, globalFieldNumberMap, context); - for (IndexReader reader : readers) { // add new indexes - merger.add(reader); - } - MergeState mergeState; boolean success = false; try { @@ -3561,9 +3561,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { final MergeState.CheckAbort checkAbort = new MergeState.CheckAbort(merge, directory); final TrackingDirectoryWrapper dirWrapper = new TrackingDirectoryWrapper(directory); - SegmentMerger merger = new SegmentMerger(merge.info.info, infoStream, dirWrapper, config.getTermIndexInterval(), checkAbort, - globalFieldNumberMap, context); - if (infoStream.isEnabled("IW")) { infoStream.message("IW", "merging " + segString(merge.segments)); } @@ -3633,12 +3630,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { merge.readers.add(reader); assert delCount <= info.info.getDocCount(): "delCount=" + delCount + " info.docCount=" + info.info.getDocCount() + " rld.pendingDeleteCount=" + rld.getPendingDeleteCount() + " info.getDelCount()=" + info.getDelCount(); - if (delCount < info.info.getDocCount()) { - merger.add(reader); - } segUpto++; } + // we pass merge.getMergeReaders() instead of merge.readers to allow the + // OneMerge to return a view over the actual segments to merge + final SegmentMerger merger = new SegmentMerger(merge.getMergeReaders(), + merge.info.info, infoStream, dirWrapper, config.getTermIndexInterval(), + checkAbort, globalFieldNumberMap, context); + merge.checkAborted(directory); // This is where all the work happens: diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java index 656c7e56fd2..66a9db26335 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java @@ -19,6 +19,7 @@ package org.apache.lucene.index; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -104,6 +105,22 @@ public abstract class MergePolicy implements java.io.Closeable, Cloneable { totalDocCount = count; } + /** Get the list of readers to merge. Note that this list does not + * necessarily match the list of segments to merge and should only be used + * to feed SegmentMerger to initialize a merge. */ + public List getMergeReaders() throws IOException { + if (readers == null) { + throw new IllegalStateException("IndexWriter has not initialized readers from the segment infos yet"); + } + final List readers = new ArrayList(this.readers.size()); + for (AtomicReader reader : this.readers) { + if (reader.numDocs() > 0) { + readers.add(reader); + } + } + return Collections.unmodifiableList(readers); + } + /** Record that an exception occurred while executing * this merge */ synchronized void setException(Throwable error) { diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeState.java b/lucene/core/src/java/org/apache/lucene/index/MergeState.java index 16b78cd81de..46551e4b5f8 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergeState.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergeState.java @@ -129,13 +129,13 @@ public class MergeState { } /** {@link SegmentInfo} of the newly merged segment. */ - public SegmentInfo segmentInfo; + public final SegmentInfo segmentInfo; /** {@link FieldInfos} of the newly merged segment. */ public FieldInfos fieldInfos; /** Readers being merged. */ - public List readers; + public final List readers; /** Maps docIDs around deletions. */ public DocMap[] docMaps; @@ -145,10 +145,10 @@ public class MergeState { /** Holds the CheckAbort instance, which is invoked * periodically to see if the merge has been aborted. */ - public CheckAbort checkAbort; + public final CheckAbort checkAbort; /** InfoStream for debugging messages. */ - public InfoStream infoStream; + public final InfoStream infoStream; // TODO: get rid of this? it tells you which segments are 'aligned' (e.g. for bulk merging) // but is this really so expensive to compute again in different components, versus once in SM? @@ -162,7 +162,11 @@ public class MergeState { public int matchedCount; /** Sole constructor. */ - MergeState() { + MergeState(List readers, SegmentInfo segmentInfo, InfoStream infoStream, CheckAbort checkAbort) { + this.readers = readers; + this.segmentInfo = segmentInfo; + this.infoStream = infoStream; + this.checkAbort = checkAbort; } /** diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java b/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java index 4afb9116a36..6631020f759 100644 --- a/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java +++ b/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java @@ -34,12 +34,11 @@ import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.InfoStream; /** - * The SegmentMerger class combines two or more Segments, represented by an IndexReader ({@link #add}, - * into a single Segment. After adding the appropriate readers, call the merge method to combine the + * The SegmentMerger class combines two or more Segments, represented by an + * IndexReader, into a single Segment. Call the merge method to combine the * segments. * * @see #merge - * @see #add */ final class SegmentMerger { private final Directory directory; @@ -49,16 +48,13 @@ final class SegmentMerger { private final IOContext context; - private final MergeState mergeState = new MergeState(); + private final MergeState mergeState; private final FieldInfos.Builder fieldInfosBuilder; // note, just like in codec apis Directory 'dir' is NOT the same as segmentInfo.dir!! - SegmentMerger(SegmentInfo segmentInfo, InfoStream infoStream, Directory dir, int termIndexInterval, + SegmentMerger(List readers, SegmentInfo segmentInfo, InfoStream infoStream, Directory dir, int termIndexInterval, MergeState.CheckAbort checkAbort, FieldInfos.FieldNumbers fieldNumbers, IOContext context) { - mergeState.segmentInfo = segmentInfo; - mergeState.infoStream = infoStream; - mergeState.readers = new ArrayList(); - mergeState.checkAbort = checkAbort; + mergeState = new MergeState(readers, segmentInfo, infoStream, checkAbort); directory = dir; this.termIndexInterval = termIndexInterval; this.codec = segmentInfo.getCodec(); @@ -67,21 +63,7 @@ final class SegmentMerger { } /** - * Add an IndexReader to the collection of readers that are to be merged - */ - void add(IndexReader reader) { - for (final AtomicReaderContext ctx : reader.leaves()) { - final AtomicReader r = ctx.reader(); - mergeState.readers.add(r); - } - } - - void add(SegmentReader reader) { - mergeState.readers.add(reader); - } - - /** - * Merges the readers specified by the {@link #add} method into the directory passed to the constructor + * Merges the readers into the directory passed to the constructor * @return The number of documents that were merged * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDoc.java b/lucene/core/src/test/org/apache/lucene/index/TestDoc.java index db78ffd4b86..04b397bfe58 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestDoc.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestDoc.java @@ -25,6 +25,7 @@ import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.io.StringWriter; import java.io.Writer; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.LinkedList; @@ -218,11 +219,10 @@ public class TestDoc extends LuceneTestCase { TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(si1.info.dir); final SegmentInfo si = new SegmentInfo(si1.info.dir, Constants.LUCENE_MAIN_VERSION, merged, -1, false, codec, null, null); - SegmentMerger merger = new SegmentMerger(si, InfoStream.getDefault(), trackingDir, IndexWriterConfig.DEFAULT_TERM_INDEX_INTERVAL, - MergeState.CheckAbort.NONE, new FieldInfos.FieldNumbers(), context); + SegmentMerger merger = new SegmentMerger(Arrays.asList(r1, r2), + si, InfoStream.getDefault(), trackingDir, IndexWriterConfig.DEFAULT_TERM_INDEX_INTERVAL, + MergeState.CheckAbort.NONE, new FieldInfos.FieldNumbers(), context); - merger.add(r1); - merger.add(r2); MergeState mergeState = merger.merge(); r1.close(); r2.close(); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java b/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java index a9c825ade4a..a95049796e4 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java @@ -18,6 +18,7 @@ package org.apache.lucene.index; */ import java.io.IOException; +import java.util.Arrays; import org.apache.lucene.codecs.Codec; import org.apache.lucene.document.Document; @@ -29,7 +30,6 @@ import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util._TestUtil; -import org.apache.lucene.util.packed.PackedInts; public class TestSegmentMerger extends LuceneTestCase { //The variables for the new merged segment @@ -80,10 +80,9 @@ public class TestSegmentMerger extends LuceneTestCase { final Codec codec = Codec.getDefault(); final SegmentInfo si = new SegmentInfo(mergedDir, Constants.LUCENE_MAIN_VERSION, mergedSegment, -1, false, codec, null, null); - SegmentMerger merger = new SegmentMerger(si, InfoStream.getDefault(), mergedDir, IndexWriterConfig.DEFAULT_TERM_INDEX_INTERVAL, - MergeState.CheckAbort.NONE, new FieldInfos.FieldNumbers(), newIOContext(random())); - merger.add(reader1); - merger.add(reader2); + SegmentMerger merger = new SegmentMerger(Arrays.asList(reader1, reader2), + si, InfoStream.getDefault(), mergedDir, IndexWriterConfig.DEFAULT_TERM_INDEX_INTERVAL, + MergeState.CheckAbort.NONE, new FieldInfos.FieldNumbers(), newIOContext(random())); MergeState mergeState = merger.merge(); int docsMerged = mergeState.segmentInfo.getDocCount(); assertTrue(docsMerged == 2); diff --git a/lucene/misc/src/java/org/apache/lucene/index/sorter/SortingMergePolicy.java b/lucene/misc/src/java/org/apache/lucene/index/sorter/SortingMergePolicy.java new file mode 100644 index 00000000000..cfaa5c36984 --- /dev/null +++ b/lucene/misc/src/java/org/apache/lucene/index/sorter/SortingMergePolicy.java @@ -0,0 +1,148 @@ +package org.apache.lucene.index.sorter; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.lucene.index.AtomicReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.MultiReader; +import org.apache.lucene.index.SegmentInfoPerCommit; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.SlowCompositeReaderWrapper; +import org.apache.lucene.store.Directory; + +/** A {@link MergePolicy} that reorders documents according to a {@link Sorter} + * before merging them. As a consequence, all segments resulting from a merge + * will be sorted while segments resulting from a flush will be in the order + * in which documents have been added. + *

Never use this {@link MergePolicy} if you rely on + * {@link IndexWriter#addDocuments(Iterable, org.apache.lucene.analysis.Analyzer)} + * to have sequentially-assigned doc IDs, this policy will scatter doc IDs. + * @lucene.experimental */ +public final class SortingMergePolicy extends MergePolicy { + + private class SortingOneMerge extends OneMerge { + + SortingOneMerge(List segments) { + super(segments); + } + + @Override + public List getMergeReaders() throws IOException { + final List readers = super.getMergeReaders(); + switch (readers.size()) { + case 0: + return readers; + case 1: + return Collections.singletonList(SortingAtomicReader.wrap(readers.get(0), sorter)); + default: + final IndexReader multiReader = new MultiReader(readers.toArray(new AtomicReader[readers.size()])); + final AtomicReader atomicReader = SlowCompositeReaderWrapper.wrap(multiReader); + final AtomicReader sortingReader = SortingAtomicReader.wrap(atomicReader, sorter); + if (sortingReader == atomicReader) { + // already sorted, return the original list of readers so that + // codec-specific bulk-merge methods can be used + return readers; + } + return Collections.singletonList(sortingReader); + } + } + + } + + private class SortingMergeSpecification extends MergeSpecification { + + @Override + public void add(OneMerge merge) { + super.add(new SortingOneMerge(merge.segments)); + } + + @Override + public String segString(Directory dir) { + return "SortingMergeSpec(" + super.segString(dir) + ", sorter=" + sorter + ")"; + } + + } + + private MergeSpecification sortedMergeSpecification(MergeSpecification specification) { + if (specification == null) { + return null; + } + MergeSpecification sortingSpec = new SortingMergeSpecification(); + for (OneMerge merge : specification.merges) { + sortingSpec.add(merge); + } + return sortingSpec; + } + + private final MergePolicy in; + private final Sorter sorter; + + /** Create a new {@link MergePolicy} that sorts documents with sorter. */ + public SortingMergePolicy(MergePolicy in, Sorter sorter) { + this.in = in; + this.sorter = sorter; + } + + @Override + public MergeSpecification findMerges(MergeTrigger mergeTrigger, + SegmentInfos segmentInfos) throws IOException { + return sortedMergeSpecification(in.findMerges(mergeTrigger, segmentInfos)); + } + + @Override + public MergeSpecification findForcedMerges(SegmentInfos segmentInfos, + int maxSegmentCount, Map segmentsToMerge) + throws IOException { + return sortedMergeSpecification(in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge)); + } + + @Override + public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos) + throws IOException { + return sortedMergeSpecification(in.findForcedDeletesMerges(segmentInfos)); + } + + @Override + public void close() { + in.close(); + } + + @Override + public boolean useCompoundFile(SegmentInfos segments, + SegmentInfoPerCommit newSegment) throws IOException { + return in.useCompoundFile(segments, newSegment); + } + + @Override + public void setIndexWriter(IndexWriter writer) { + in.setIndexWriter(writer); + } + + @Override + public String toString() { + return "SortingMergePolicy(" + in + ", sorter=" + sorter + ")"; + } + +} diff --git a/lucene/misc/src/test/org/apache/lucene/index/sorter/TestSortingMergePolicy.java b/lucene/misc/src/test/org/apache/lucene/index/sorter/TestSortingMergePolicy.java new file mode 100644 index 00000000000..ae8ee463816 --- /dev/null +++ b/lucene/misc/src/test/org/apache/lucene/index/sorter/TestSortingMergePolicy.java @@ -0,0 +1,113 @@ +package org.apache.lucene.index.sorter; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; + +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.AtomicReader; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.SlowCompositeReaderWrapper; +import org.apache.lucene.index.Term; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util._TestUtil; + +public class TestSortingMergePolicy extends LuceneTestCase { + + private static final String DELETE_TERM = "abc"; + + private Directory dir1, dir2; + private Sorter sorter; + private IndexReader reader; + private IndexReader sortedReader; + + public void setUp() throws Exception { + super.setUp(); + sorter = new NumericDocValuesSorter("ndv"); + createRandomIndexes(); + } + + private Document randomDocument() { + final Document doc = new Document(); + doc.add(new NumericDocValuesField("ndv", random().nextLong())); + doc.add(new StringField("s", rarely() ? DELETE_TERM : _TestUtil.randomSimpleString(random(), 3), Store.YES)); + return doc; + } + + private void createRandomIndexes() throws IOException { + dir1 = newDirectory(); + dir2 = newDirectory(); + final int numDocs = atLeast(100); + final IndexWriterConfig iwc1 = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + final IndexWriterConfig iwc2 = iwc1.clone(); + iwc2.setMergePolicy(new SortingMergePolicy(iwc2.getMergePolicy(), sorter)); + final IndexWriter iw1 = new IndexWriter(dir1, iwc1); + final IndexWriter iw2 = new IndexWriter(dir2, iwc2); + for (int i = 0; i < numDocs; ++i) { + final Document doc = randomDocument(); + iw1.addDocument(doc); + iw2.addDocument(doc); + if (i == numDocs / 2 || rarely()) { + iw1.commit(); + iw2.commit(); + } + } + iw1.deleteDocuments(new Term("s", DELETE_TERM)); + iw2.deleteDocuments(new Term("s", DELETE_TERM)); + iw1.forceMerge(1); + iw2.forceMerge(1); + iw1.close(); + iw2.close(); + reader = DirectoryReader.open(dir1); + sortedReader = DirectoryReader.open(dir2); + } + + public void tearDown() throws Exception { + reader.close(); + sortedReader.close(); + dir1.close(); + dir2.close(); + super.tearDown(); + } + + private void assertSorted(AtomicReader reader) throws IOException { + final NumericDocValues ndv = reader.getNumericDocValues("ndv"); + for (int i = 1; i < reader.maxDoc(); ++i) { + assertTrue(ndv.get(i-1) < ndv.get(i)); + } + } + + public void testSortingMP() throws IOException { + final AtomicReader sortedReader1 = SortingAtomicReader.wrap(SlowCompositeReaderWrapper.wrap(reader), sorter); + final AtomicReader sortedReader2 = SlowCompositeReaderWrapper.wrap(sortedReader); + + assertSorted(sortedReader1); + assertSorted(sortedReader2); + assertReaderEquals("", sortedReader1, sortedReader2); + } + +}