LUCENE-4752: Added SortingMergePolicy that sorts documents when merging.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1459794 13f79535-47bb-0310-9956-ffa450edef68
Adrien Grand 2013-03-22 14:22:58 +00:00
parent 276edac42e
commit 97ec69bb69
9 changed files with 315 additions and 49 deletions

lucene/CHANGES.txt

@@ -127,6 +127,9 @@ New Features
   IndexReader leaf by throwing a CollectionTerminatedException in
   Collector.collect. (Adrien Grand, Shai Erera)
 
+* LUCENE-4752: New SortingMergePolicy (in lucene/misc) that sorts documents
+  before merging segments. (Adrien Grand, Shai Erera, David Smiley)
+
 API Changes
 
 * LUCENE-4844: removed TaxonomyReader.getParent(), you should use

lucene/core/src/java/org/apache/lucene/index/IndexWriter.java

@@ -2395,8 +2395,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
       flush(false, true);
 
       String mergedName = newSegmentName();
+      final List<AtomicReader> mergeReaders = new ArrayList<AtomicReader>();
       for (IndexReader indexReader : readers) {
         numDocs += indexReader.numDocs();
+        for (AtomicReaderContext ctx : indexReader.leaves()) {
+          mergeReaders.add(ctx.reader());
+        }
       }
       final IOContext context = new IOContext(new MergeInfo(numDocs, -1, true, -1));
@@ -2407,13 +2411,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
       SegmentInfo info = new SegmentInfo(directory, Constants.LUCENE_MAIN_VERSION, mergedName, -1,
                                          false, codec, null, null);
 
-      SegmentMerger merger = new SegmentMerger(info, infoStream, trackingDir, config.getTermIndexInterval(),
+      SegmentMerger merger = new SegmentMerger(mergeReaders, info, infoStream, trackingDir, config.getTermIndexInterval(),
                                                MergeState.CheckAbort.NONE, globalFieldNumberMap, context);
 
-      for (IndexReader reader : readers) {    // add new indexes
-        merger.add(reader);
-      }
-
       MergeState mergeState;
       boolean success = false;
       try {
@@ -3561,9 +3561,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
     final MergeState.CheckAbort checkAbort = new MergeState.CheckAbort(merge, directory);
     final TrackingDirectoryWrapper dirWrapper = new TrackingDirectoryWrapper(directory);
 
-    SegmentMerger merger = new SegmentMerger(merge.info.info, infoStream, dirWrapper, config.getTermIndexInterval(), checkAbort,
-                                             globalFieldNumberMap, context);
-
     if (infoStream.isEnabled("IW")) {
       infoStream.message("IW", "merging " + segString(merge.segments));
     }
@@ -3633,12 +3630,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
         merge.readers.add(reader);
         assert delCount <= info.info.getDocCount(): "delCount=" + delCount + " info.docCount=" + info.info.getDocCount() + " rld.pendingDeleteCount=" + rld.getPendingDeleteCount() + " info.getDelCount()=" + info.getDelCount();
-        if (delCount < info.info.getDocCount()) {
-          merger.add(reader);
-        }
         segUpto++;
       }
 
+      // we pass merge.getMergeReaders() instead of merge.readers to allow the
+      // OneMerge to return a view over the actual segments to merge
+      final SegmentMerger merger = new SegmentMerger(merge.getMergeReaders(),
+          merge.info.info, infoStream, dirWrapper, config.getTermIndexInterval(),
+          checkAbort, globalFieldNumberMap, context);
+
       merge.checkAborted(directory);
 
       // This is where all the work happens:

lucene/core/src/java/org/apache/lucene/index/MergePolicy.java

@@ -19,6 +19,7 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -104,6 +105,22 @@ public abstract class MergePolicy implements java.io.Closeable, Cloneable {
       totalDocCount = count;
     }
 
+    /** Get the list of readers to merge. Note that this list does not
+     *  necessarily match the list of segments to merge and should only be used
+     *  to feed SegmentMerger to initialize a merge. */
+    public List<AtomicReader> getMergeReaders() throws IOException {
+      if (readers == null) {
+        throw new IllegalStateException("IndexWriter has not initialized readers from the segment infos yet");
+      }
+      final List<AtomicReader> readers = new ArrayList<AtomicReader>(this.readers.size());
+      for (AtomicReader reader : this.readers) {
+        if (reader.numDocs() > 0) {
+          readers.add(reader);
+        }
+      }
+      return Collections.unmodifiableList(readers);
+    }
+
     /** Record that an exception occurred while executing
      *  this merge */
     synchronized void setException(Throwable error) {
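
The new getMergeReaders() method is the extension point the rest of this commit builds on: a OneMerge subclass can hand SegmentMerger a view over the merge readers instead of the readers themselves. SortingMergePolicy below overrides it to return a sorted view; a minimal sketch of the pattern (the subclass name and the logging are illustrative, not part of this commit):

    // Hypothetical OneMerge subclass: observes the readers about to be merged.
    class LoggingOneMerge extends MergePolicy.OneMerge {
      LoggingOneMerge(List<SegmentInfoPerCommit> segments) {
        super(segments);
      }

      @Override
      public List<AtomicReader> getMergeReaders() throws IOException {
        // the default implementation already filters out fully-deleted segments
        final List<AtomicReader> readers = super.getMergeReaders();
        System.out.println("about to merge " + readers.size() + " readers");
        return readers;
      }
    }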

lucene/core/src/java/org/apache/lucene/index/MergeState.java

@@ -129,13 +129,13 @@ public class MergeState {
   }
 
   /** {@link SegmentInfo} of the newly merged segment. */
-  public SegmentInfo segmentInfo;
+  public final SegmentInfo segmentInfo;
 
   /** {@link FieldInfos} of the newly merged segment. */
   public FieldInfos fieldInfos;
 
   /** Readers being merged. */
-  public List<AtomicReader> readers;
+  public final List<AtomicReader> readers;
 
   /** Maps docIDs around deletions. */
   public DocMap[] docMaps;
@@ -145,10 +145,10 @@ public class MergeState {
 
   /** Holds the CheckAbort instance, which is invoked
    *  periodically to see if the merge has been aborted. */
-  public CheckAbort checkAbort;
+  public final CheckAbort checkAbort;
 
   /** InfoStream for debugging messages. */
-  public InfoStream infoStream;
+  public final InfoStream infoStream;
 
   // TODO: get rid of this? it tells you which segments are 'aligned' (e.g. for bulk merging)
   // but is this really so expensive to compute again in different components, versus once in SM?
@@ -162,7 +162,11 @@ public class MergeState {
   public int matchedCount;
 
   /** Sole constructor. */
-  MergeState() {
+  MergeState(List<AtomicReader> readers, SegmentInfo segmentInfo, InfoStream infoStream, CheckAbort checkAbort) {
+    this.readers = readers;
+    this.segmentInfo = segmentInfo;
+    this.infoStream = infoStream;
+    this.checkAbort = checkAbort;
   }
 
   /**

lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java

@@ -34,12 +34,11 @@ import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
 
 /**
- * The SegmentMerger class combines two or more Segments, represented by an IndexReader ({@link #add},
- * into a single Segment.  After adding the appropriate readers, call the merge method to combine the
+ * The SegmentMerger class combines two or more Segments, represented by an
+ * IndexReader, into a single Segment. Call the merge method to combine the
  * segments.
  *
  * @see #merge
- * @see #add
  */
 final class SegmentMerger {
   private final Directory directory;
@@ -49,16 +48,13 @@ final class SegmentMerger {
   private final IOContext context;
 
-  private final MergeState mergeState = new MergeState();
+  private final MergeState mergeState;
   private final FieldInfos.Builder fieldInfosBuilder;
 
   // note, just like in codec apis Directory 'dir' is NOT the same as segmentInfo.dir!!
-  SegmentMerger(SegmentInfo segmentInfo, InfoStream infoStream, Directory dir, int termIndexInterval,
+  SegmentMerger(List<AtomicReader> readers, SegmentInfo segmentInfo, InfoStream infoStream, Directory dir, int termIndexInterval,
                 MergeState.CheckAbort checkAbort, FieldInfos.FieldNumbers fieldNumbers, IOContext context) {
-    mergeState.segmentInfo = segmentInfo;
-    mergeState.infoStream = infoStream;
-    mergeState.readers = new ArrayList<AtomicReader>();
-    mergeState.checkAbort = checkAbort;
+    mergeState = new MergeState(readers, segmentInfo, infoStream, checkAbort);
     directory = dir;
     this.termIndexInterval = termIndexInterval;
     this.codec = segmentInfo.getCodec();
@@ -67,21 +63,7 @@ final class SegmentMerger {
   }
 
   /**
-   * Add an IndexReader to the collection of readers that are to be merged
-   */
-  void add(IndexReader reader) {
-    for (final AtomicReaderContext ctx : reader.leaves()) {
-      final AtomicReader r = ctx.reader();
-      mergeState.readers.add(r);
-    }
-  }
-
-  void add(SegmentReader reader) {
-    mergeState.readers.add(reader);
-  }
-
-  /**
-   * Merges the readers specified by the {@link #add} method into the directory passed to the constructor
+   * Merges the readers into the directory passed to the constructor
    * @return The number of documents that were merged
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error

lucene/core/src/test/org/apache/lucene/index/TestDoc.java

@@ -25,6 +25,7 @@ import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.io.Writer;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -218,11 +219,10 @@ public class TestDoc extends LuceneTestCase {
 
       TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(si1.info.dir);
       final SegmentInfo si = new SegmentInfo(si1.info.dir, Constants.LUCENE_MAIN_VERSION, merged, -1, false, codec, null, null);
-      SegmentMerger merger = new SegmentMerger(si, InfoStream.getDefault(), trackingDir, IndexWriterConfig.DEFAULT_TERM_INDEX_INTERVAL,
-                                               MergeState.CheckAbort.NONE, new FieldInfos.FieldNumbers(), context);
+      SegmentMerger merger = new SegmentMerger(Arrays.<AtomicReader>asList(r1, r2),
+          si, InfoStream.getDefault(), trackingDir, IndexWriterConfig.DEFAULT_TERM_INDEX_INTERVAL,
+          MergeState.CheckAbort.NONE, new FieldInfos.FieldNumbers(), context);
 
-      merger.add(r1);
-      merger.add(r2);
       MergeState mergeState = merger.merge();
       r1.close();
       r2.close();

lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java

@@ -18,6 +18,7 @@ package org.apache.lucene.index;
  */
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.document.Document;
@@ -29,7 +30,6 @@ import org.apache.lucene.util.FixedBitSet;
 import org.apache.lucene.util.InfoStream;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util._TestUtil;
-import org.apache.lucene.util.packed.PackedInts;
 
 public class TestSegmentMerger extends LuceneTestCase {
   //The variables for the new merged segment
@@ -80,10 +80,9 @@ public class TestSegmentMerger extends LuceneTestCase {
     final Codec codec = Codec.getDefault();
     final SegmentInfo si = new SegmentInfo(mergedDir, Constants.LUCENE_MAIN_VERSION, mergedSegment, -1, false, codec, null, null);
-    SegmentMerger merger = new SegmentMerger(si, InfoStream.getDefault(), mergedDir, IndexWriterConfig.DEFAULT_TERM_INDEX_INTERVAL,
-                                             MergeState.CheckAbort.NONE, new FieldInfos.FieldNumbers(), newIOContext(random()));
-    merger.add(reader1);
-    merger.add(reader2);
+    SegmentMerger merger = new SegmentMerger(Arrays.<AtomicReader>asList(reader1, reader2),
+        si, InfoStream.getDefault(), mergedDir, IndexWriterConfig.DEFAULT_TERM_INDEX_INTERVAL,
+        MergeState.CheckAbort.NONE, new FieldInfos.FieldNumbers(), newIOContext(random()));
     MergeState mergeState = merger.merge();
     int docsMerged = mergeState.segmentInfo.getDocCount();
     assertTrue(docsMerged == 2);

lucene/misc/src/java/org/apache/lucene/index/sorter/SortingMergePolicy.java (new file)

@@ -0,0 +1,148 @@
package org.apache.lucene.index.sorter;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.index.SegmentInfoPerCommit;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SlowCompositeReaderWrapper;
import org.apache.lucene.store.Directory;
/** A {@link MergePolicy} that reorders documents according to a {@link Sorter}
 *  before merging them. As a consequence, all segments resulting from a merge
 *  will be sorted while segments resulting from a flush will be in the order
 *  in which documents have been added.
 *  <p>Never use this {@link MergePolicy} if you rely on
 *  {@link IndexWriter#addDocuments(Iterable, org.apache.lucene.analysis.Analyzer)}
 *  to have sequentially-assigned doc IDs, because this policy will scatter doc IDs.
 *  @lucene.experimental */
public final class SortingMergePolicy extends MergePolicy {

  private class SortingOneMerge extends OneMerge {

    SortingOneMerge(List<SegmentInfoPerCommit> segments) {
      super(segments);
    }

    @Override
    public List<AtomicReader> getMergeReaders() throws IOException {
      final List<AtomicReader> readers = super.getMergeReaders();
      switch (readers.size()) {
        case 0:
          return readers;
        case 1:
          return Collections.singletonList(SortingAtomicReader.wrap(readers.get(0), sorter));
        default:
          final IndexReader multiReader = new MultiReader(readers.toArray(new AtomicReader[readers.size()]));
          final AtomicReader atomicReader = SlowCompositeReaderWrapper.wrap(multiReader);
          final AtomicReader sortingReader = SortingAtomicReader.wrap(atomicReader, sorter);
          if (sortingReader == atomicReader) {
            // already sorted, return the original list of readers so that
            // codec-specific bulk-merge methods can be used
            return readers;
          }
          return Collections.singletonList(sortingReader);
      }
    }

  }

  private class SortingMergeSpecification extends MergeSpecification {

    @Override
    public void add(OneMerge merge) {
      super.add(new SortingOneMerge(merge.segments));
    }

    @Override
    public String segString(Directory dir) {
      return "SortingMergeSpec(" + super.segString(dir) + ", sorter=" + sorter + ")";
    }

  }

  private MergeSpecification sortedMergeSpecification(MergeSpecification specification) {
    if (specification == null) {
      return null;
    }
    MergeSpecification sortingSpec = new SortingMergeSpecification();
    for (OneMerge merge : specification.merges) {
      sortingSpec.add(merge);
    }
    return sortingSpec;
  }

  private final MergePolicy in;
  private final Sorter sorter;

  /** Create a new {@link MergePolicy} that sorts documents with <code>sorter</code>. */
  public SortingMergePolicy(MergePolicy in, Sorter sorter) {
    this.in = in;
    this.sorter = sorter;
  }

  @Override
  public MergeSpecification findMerges(MergeTrigger mergeTrigger,
      SegmentInfos segmentInfos) throws IOException {
    return sortedMergeSpecification(in.findMerges(mergeTrigger, segmentInfos));
  }

  @Override
  public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
      int maxSegmentCount, Map<SegmentInfoPerCommit,Boolean> segmentsToMerge)
      throws IOException {
    return sortedMergeSpecification(in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge));
  }

  @Override
  public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos)
      throws IOException {
    return sortedMergeSpecification(in.findForcedDeletesMerges(segmentInfos));
  }

  @Override
  public void close() {
    in.close();
  }

  @Override
  public boolean useCompoundFile(SegmentInfos segments,
      SegmentInfoPerCommit newSegment) throws IOException {
    return in.useCompoundFile(segments, newSegment);
  }

  @Override
  public void setIndexWriter(IndexWriter writer) {
    in.setIndexWriter(writer);
  }

  @Override
  public String toString() {
    return "SortingMergePolicy(" + in + ", sorter=" + sorter + ")";
  }

}
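
For reference, a hedged usage sketch (the directory, analyzer, and "timestamp" field name are assumptions; NumericDocValuesSorter is the Sorter implementation exercised by TestSortingMergePolicy below): wrap whatever merge policy the config already carries so that every merged segment comes out ordered by a NumericDocValues field.

    // Sketch only, not the canonical setup: sort merged segments by the
    // "timestamp" NumericDocValues field (field name is an assumption).
    Directory dir = new RAMDirectory();
    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_CURRENT,
        new StandardAnalyzer(Version.LUCENE_CURRENT));
    Sorter sorter = new NumericDocValuesSorter("timestamp");
    conf.setMergePolicy(new SortingMergePolicy(conf.getMergePolicy(), sorter));
    IndexWriter writer = new IndexWriter(dir, conf);
    // segments produced by merges are now sorted by "timestamp"; segments
    // produced by flushes keep insertion order until they get merged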

lucene/misc/src/test/org/apache/lucene/index/sorter/TestSortingMergePolicy.java (new file)

@@ -0,0 +1,113 @@
package org.apache.lucene.index.sorter;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SlowCompositeReaderWrapper;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util._TestUtil;
public class TestSortingMergePolicy extends LuceneTestCase {

  private static final String DELETE_TERM = "abc";

  private Directory dir1, dir2;
  private Sorter sorter;
  private IndexReader reader;
  private IndexReader sortedReader;

  @Override
  public void setUp() throws Exception {
    super.setUp();
    sorter = new NumericDocValuesSorter("ndv");
    createRandomIndexes();
  }

  private Document randomDocument() {
    final Document doc = new Document();
    doc.add(new NumericDocValuesField("ndv", random().nextLong()));
    doc.add(new StringField("s", rarely() ? DELETE_TERM : _TestUtil.randomSimpleString(random(), 3), Store.YES));
    return doc;
  }

  private void createRandomIndexes() throws IOException {
    dir1 = newDirectory();
    dir2 = newDirectory();
    final int numDocs = atLeast(100);
    final IndexWriterConfig iwc1 = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
    final IndexWriterConfig iwc2 = iwc1.clone();
    iwc2.setMergePolicy(new SortingMergePolicy(iwc2.getMergePolicy(), sorter));
    final IndexWriter iw1 = new IndexWriter(dir1, iwc1);
    final IndexWriter iw2 = new IndexWriter(dir2, iwc2);
    for (int i = 0; i < numDocs; ++i) {
      final Document doc = randomDocument();
      iw1.addDocument(doc);
      iw2.addDocument(doc);
      if (i == numDocs / 2 || rarely()) {
        iw1.commit();
        iw2.commit();
      }
    }
    iw1.deleteDocuments(new Term("s", DELETE_TERM));
    iw2.deleteDocuments(new Term("s", DELETE_TERM));
    iw1.forceMerge(1);
    iw2.forceMerge(1);
    iw1.close();
    iw2.close();
    reader = DirectoryReader.open(dir1);
    sortedReader = DirectoryReader.open(dir2);
  }

  @Override
  public void tearDown() throws Exception {
    reader.close();
    sortedReader.close();
    dir1.close();
    dir2.close();
    super.tearDown();
  }

  private void assertSorted(AtomicReader reader) throws IOException {
    final NumericDocValues ndv = reader.getNumericDocValues("ndv");
    for (int i = 1; i < reader.maxDoc(); ++i) {
      // use <= rather than <: a stable sort may leave equal consecutive values
      assertTrue(ndv.get(i-1) <= ndv.get(i));
    }
  }

  public void testSortingMP() throws IOException {
    final AtomicReader sortedReader1 = SortingAtomicReader.wrap(SlowCompositeReaderWrapper.wrap(reader), sorter);
    final AtomicReader sortedReader2 = SlowCompositeReaderWrapper.wrap(sortedReader);

    assertSorted(sortedReader1);
    assertSorted(sortedReader2);
    assertReaderEquals("", sortedReader1, sortedReader2);
  }

}