LUCENE-4752: Fix SortingMergePolicy behavior when merges occur concurrently with deletions.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1461732 13f79535-47bb-0310-9956-ffa450edef68
Author: Adrien Grand
Date:   2013-03-27 17:31:01 +00:00
Parent: 51b597832e
Commit: d3eb052c45
5 changed files with 166 additions and 41 deletions
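In short (my gloss of the change, not text from the commit): SortingMergePolicy rewrites doc IDs while it merges, but IndexWriter used to apply deletes that arrived during a running merge at their natural doc IDs, so they could land on the wrong documents of the sorted segment. The patch adds MergePolicy.DocMap, lets a OneMerge expose its mapping from natural to actual doc IDs, and makes IndexWriter route every carried-over delete through that map.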

org/apache/lucene/index/IndexWriter.java

@@ -2975,7 +2975,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
    * saves the resulting deletes file (incrementing the
    * delete generation for merge.info). If no deletes were
    * flushed, no new deletes file is saved. */
-  synchronized private ReadersAndLiveDocs commitMergedDeletes(MergePolicy.OneMerge merge) throws IOException {
+  synchronized private ReadersAndLiveDocs commitMergedDeletes(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
     assert testPoint("startCommitMergeDeletes");
@@ -2992,6 +2992,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
     // Lazy init (only when we find a delete to carry over):
     ReadersAndLiveDocs mergedDeletes = null;
+    MergePolicy.DocMap docMap = null;
     for(int i=0; i < sourceSegments.size(); i++) {
       SegmentInfoPerCommit info = sourceSegments.get(i);
@@ -3037,8 +3038,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
           if (mergedDeletes == null) {
             mergedDeletes = readerPool.get(merge.info, true);
             mergedDeletes.initWritableLiveDocs();
+            docMap = merge.getDocMap(mergeState);
+            assert docMap.isConsistent(merge.info.info.getDocCount());
           }
-          mergedDeletes.delete(docUpto);
+          mergedDeletes.delete(docMap.map(docUpto));
         }
         docUpto++;
       }
@@ -3055,8 +3058,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
         if (mergedDeletes == null) {
           mergedDeletes = readerPool.get(merge.info, true);
           mergedDeletes.initWritableLiveDocs();
+          docMap = merge.getDocMap(mergeState);
+          assert docMap.isConsistent(merge.info.info.getDocCount());
         }
-        mergedDeletes.delete(docUpto);
+        mergedDeletes.delete(docMap.map(docUpto));
       }
       docUpto++;
     }
@@ -3081,7 +3086,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
     return mergedDeletes;
   }

-  synchronized private boolean commitMerge(MergePolicy.OneMerge merge) throws IOException {
+  synchronized private boolean commitMerge(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
     assert testPoint("startCommitMerge");
@@ -3109,7 +3114,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
       return false;
     }

-    final ReadersAndLiveDocs mergedDeletes = merge.info.info.getDocCount() == 0 ? null : commitMergedDeletes(merge);
+    final ReadersAndLiveDocs mergedDeletes = merge.info.info.getDocCount() == 0 ? null : commitMergedDeletes(merge, mergeState);

     assert mergedDeletes == null || mergedDeletes.getPendingDeleteCount() != 0;
@@ -3780,7 +3785,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
       // Force READ context because we merge deletes onto
       // this reader:
-      if (!commitMerge(merge)) {
+      if (!commitMerge(merge, mergeState)) {
         // commitMerge will return false if this merge was aborted
         return 0;
       }
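A note on the mechanics above (my gloss, not text from the commit): commitMergedDeletes walks the source segments and, for each document deleted after the merge started, now routes the natural doc ID through the merge's DocMap before marking it in the merged segment's live docs. For a plain OneMerge that map is the identity, so non-reordering merge policies keep their old behavior:

    // Sketch, not from the patch: with the default identity DocMap the new
    // call is equivalent to the old one; only reordering merges behave differently.
    MergePolicy.DocMap docMap = merge.getDocMap(mergeState); // identity for a plain OneMerge
    assert docMap.map(docUpto) == docUpto;                   // true of the identity map
    mergedDeletes.delete(docMap.map(docUpto));               // same effect as delete(docUpto)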

org/apache/lucene/index/MergePolicy.java

@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.MergeInfo;
+import org.apache.lucene.util.FixedBitSet;
 import org.apache.lucene.util.SetOnce.AlreadySetException;
 import org.apache.lucene.util.SetOnce;
@@ -58,7 +59,29 @@ import org.apache.lucene.util.SetOnce;
  */
 public abstract class MergePolicy implements java.io.Closeable, Cloneable {

+  /** A map of doc IDs. */
+  public static abstract class DocMap {
+
+    /** Return the new doc ID according to its old value. */
+    public abstract int map(int old);
+
+    /** Useful from an assert. */
+    boolean isConsistent(int maxDoc) {
+      final FixedBitSet targets = new FixedBitSet(maxDoc);
+      for (int i = 0; i < maxDoc; ++i) {
+        final int target = map(i);
+        if (target < 0 || target >= maxDoc) {
+          assert false : "out of range: " + target + " not in [0-" + maxDoc + "[";
+          return false;
+        } else if (targets.get(target)) {
+          assert false : target + " is already taken (" + i + ")";
+          return false;
+        }
+        targets.set(target); // mark the slot so a second mapping to it fails the check above
+      }
+      return true;
+    }
+  }
+
   /** OneMerge provides the information necessary to perform
    *  an individual primitive merge operation, resulting in
    *  a single new segment.  The merge spec includes the
@@ -105,9 +128,12 @@ public abstract class MergePolicy implements java.io.Closeable, Cloneable {
       totalDocCount = count;
     }

-    /** Get the list of readers to merge. Note that this list does not
+    /** Expert: Get the list of readers to merge. Note that this list does not
      *  necessarily match the list of segments to merge and should only be used
-     *  to feed SegmentMerger to initialize a merge. */
+     *  to feed SegmentMerger to initialize a merge. When a {@link OneMerge}
+     *  reorders doc IDs, it must override {@link #getDocMap} too so that
+     *  deletes that happened during the merge can be applied to the newly
+     *  merged segment. */
     public List<AtomicReader> getMergeReaders() throws IOException {
       if (readers == null) {
         throw new IllegalStateException("IndexWriter has not initialized readers from the segment infos yet");
@@ -121,6 +147,20 @@ public abstract class MergePolicy implements java.io.Closeable, Cloneable {
       return Collections.unmodifiableList(readers);
     }

+    /** Expert: If {@link #getMergeReaders()} reorders document IDs, this method
+     *  must be overridden to return a mapping from the <i>natural</i> doc ID
+     *  (the doc ID that would result from a natural merge) to the actual doc
+     *  ID. This mapping is used to apply deletions that happened during the
+     *  merge to the new segment. */
+    public DocMap getDocMap(MergeState mergeState) {
+      return new DocMap() {
+        @Override
+        public int map(int docID) {
+          return docID;
+        }
+      };
+    }
+
     /** Record that an exception occurred while executing
      *  this merge */
     synchronized void setException(Throwable error) {
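To make the contract concrete, here is a hedged sketch (mine, not part of the commit) of a OneMerge whose getMergeReaders() would present documents in reverse order; it must hand back the matching DocMap, otherwise IndexWriter would delete the wrong documents. map() has to be a bijection on [0, maxDoc), which is exactly what DocMap.isConsistent() asserts. A real implementation would of course also override getMergeReaders() to do the actual reversing:

    import java.util.List;
    import org.apache.lucene.index.MergePolicy;
    import org.apache.lucene.index.MergeState;
    import org.apache.lucene.index.SegmentInfoPerCommit;

    // Hypothetical example, not from the patch: a merge that reverses doc order.
    class ReversingOneMerge extends MergePolicy.OneMerge {
      ReversingOneMerge(List<SegmentInfoPerCommit> segments) {
        super(segments);
      }
      @Override
      public MergePolicy.DocMap getDocMap(MergeState mergeState) {
        final int maxDoc = totalDocCount; // docs going into the merged segment
        return new MergePolicy.DocMap() {
          @Override
          public int map(int old) {
            return maxDoc - 1 - old; // natural doc ID -> its reversed position
          }
        };
      }
    }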

org/apache/lucene/index/sorter/SortingAtomicReader.java

@@ -626,7 +626,11 @@ public class SortingAtomicReader extends FilterAtomicReader {
    *  defined by <code>sorter</code>. If the reader is already sorted, this
    *  method might return the reader as-is. */
   public static AtomicReader wrap(AtomicReader reader, Sorter sorter) throws IOException {
-    final Sorter.DocMap docMap = sorter.sort(reader);
+    return wrap(reader, sorter.sort(reader));
+  }
+
+  /** Expert: same as {@link #wrap(AtomicReader, Sorter)} but operates directly on a {@link Sorter.DocMap}. */
+  public static AtomicReader wrap(AtomicReader reader, Sorter.DocMap docMap) {
     if (docMap == null) {
       // the reader is already sorted
       return reader;
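A small usage sketch of the Sorter-based entry point (mine; the sorter and field name are taken from the test below):

    // Assuming the Lucene 4.x sorter module: present a reader's documents in
    // the order of the "ndv" numeric doc values field. If the reader is
    // already sorted, wrap() hands it back unchanged so that codec-level
    // bulk merging remains possible.
    Sorter sorter = new NumericDocValuesSorter("ndv");
    AtomicReader sorted = SortingAtomicReader.wrap(unsortedReader, sorter);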

org/apache/lucene/index/sorter/SortingMergePolicy.java

@@ -26,11 +26,14 @@ import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.MergeState;
 import org.apache.lucene.index.MultiReader;
 import org.apache.lucene.index.SegmentInfoPerCommit;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.SlowCompositeReaderWrapper;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.packed.MonotonicAppendingLongBuffer;

 /** A {@link MergePolicy} that reorders documents according to a {@link Sorter}
  *  before merging them. As a consequence, all segments resulting from a merge
@@ -42,7 +45,11 @@ import org.apache.lucene.store.Directory;
  *  @lucene.experimental */
 public final class SortingMergePolicy extends MergePolicy {

-  private class SortingOneMerge extends OneMerge {
+  class SortingOneMerge extends OneMerge {
+
+    List<AtomicReader> unsortedReaders;
+    Sorter.DocMap docMap;
+    AtomicReader sortedView;

     SortingOneMerge(List<SegmentInfoPerCommit> segments) {
       super(segments);
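Worth spelling out (my reading of the new fields): SortingOneMerge memoizes unsortedReaders, the sort permutation docMap, and the sortedView on first use, so that getMergeReaders(), which SegmentMerger calls to pull the documents, and getDocMap(), which IndexWriter calls later from commitMergedDeletes, are guaranteed to describe one and the same permutation.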
@@ -50,28 +57,62 @@ public final class SortingMergePolicy extends MergePolicy {

     @Override
     public List<AtomicReader> getMergeReaders() throws IOException {
-      final List<AtomicReader> readers = super.getMergeReaders();
-      switch (readers.size()) {
-        case 0:
-          return readers;
-        case 1:
-          return Collections.singletonList(SortingAtomicReader.wrap(readers.get(0), sorter));
-        default:
-          final IndexReader multiReader = new MultiReader(readers.toArray(new AtomicReader[readers.size()]));
-          final AtomicReader atomicReader = SlowCompositeReaderWrapper.wrap(multiReader);
-          final AtomicReader sortingReader = SortingAtomicReader.wrap(atomicReader, sorter);
-          if (sortingReader == atomicReader) {
-            // already sorted, return the original list of readers so that
-            // codec-specific bulk-merge methods can be used
-            return readers;
-          }
-          return Collections.singletonList(sortingReader);
-      }
+      if (unsortedReaders == null) {
+        unsortedReaders = super.getMergeReaders();
+        final AtomicReader atomicView;
+        if (unsortedReaders.size() == 1) {
+          atomicView = unsortedReaders.get(0);
+        } else {
+          final IndexReader multiReader = new MultiReader(unsortedReaders.toArray(new AtomicReader[unsortedReaders.size()]));
+          atomicView = SlowCompositeReaderWrapper.wrap(multiReader);
+        }
+        docMap = sorter.sort(atomicView);
+        sortedView = SortingAtomicReader.wrap(atomicView, docMap);
+      }
+      // a null doc map means that the readers are already sorted
+      return docMap == null ? unsortedReaders : Collections.singletonList(sortedView);
     }

+    private MonotonicAppendingLongBuffer getDeletes(List<AtomicReader> readers) {
+      MonotonicAppendingLongBuffer deletes = new MonotonicAppendingLongBuffer();
+      int deleteCount = 0;
+      for (AtomicReader reader : readers) {
+        final int maxDoc = reader.maxDoc();
+        final Bits liveDocs = reader.getLiveDocs();
+        for (int i = 0; i < maxDoc; ++i) {
+          if (liveDocs != null && !liveDocs.get(i)) {
+            ++deleteCount;
+          } else {
+            deletes.add(deleteCount);
+          }
+        }
+      }
+      return deletes;
+    }
+
+    @Override
+    public MergePolicy.DocMap getDocMap(final MergeState mergeState) {
+      if (unsortedReaders == null) {
+        throw new IllegalStateException();
+      }
+      if (docMap == null) {
+        return super.getDocMap(mergeState);
+      }
+      assert mergeState.docMaps.length == 1; // we returned a singleton reader
+      final MonotonicAppendingLongBuffer deletes = getDeletes(unsortedReaders);
+      return new MergePolicy.DocMap() {
+        @Override
+        public int map(int old) {
+          final int oldWithDeletes = old + (int) deletes.get(old);
+          final int newWithDeletes = docMap.oldToNew(oldWithDeletes);
+          return mergeState.docMaps[0].get(newWithDeletes);
+        }
+      };
+    }
+
   }

-  private class SortingMergeSpecification extends MergeSpecification {
+  class SortingMergeSpecification extends MergeSpecification {

     @Override
     public void add(OneMerge merge) {
@@ -96,8 +137,8 @@ public final class SortingMergePolicy extends MergePolicy {
     return sortingSpec;
   }

-  private final MergePolicy in;
-  private final Sorter sorter;
+  final MergePolicy in;
+  final Sorter sorter;

   /** Create a new {@link MergePolicy} that sorts documents with <code>sorter</code>. */
   public SortingMergePolicy(MergePolicy in, Sorter sorter) {
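The composed map in SortingOneMerge.getDocMap is the heart of the fix, and a worked example (my own numbers, for illustration) may help. getDeletes records, for each live document in natural unsorted order, how many deleted documents precede it, so old + deletes.get(old) re-expands a delete-compacted ordinal into a doc ID of the unsorted view; docMap.oldToNew then applies the sort permutation, and mergeState.docMaps[0] finally compacts the deletions away in the merged segment:

    // Worked toy example (invented values, not from the commit).
    // Unsorted view: 5 docs, doc 1 was already deleted when the merge started.
    // getDeletes() yields [0, 1, 1, 1]: live ordinals 0..3 correspond to docs
    // 0, 2, 3, 4 of the unsorted view.
    //
    // IndexWriter reports a concurrent delete of natural live ordinal old = 2:
    //   oldWithDeletes = 2 + deletes.get(2)   // = 2 + 1 = 3 -> unsorted doc 3
    //   newWithDeletes = docMap.oldToNew(3)   // where sorting moved doc 3, say 0
    //   mergeState.docMaps[0].get(0)          // final doc ID once SegmentMerger
    //                                         // has dropped the deleted docs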

org/apache/lucene/index/sorter/TestSortingMergePolicy.java

@@ -18,7 +18,11 @@ package org.apache.lucene.index.sorter;
  */

 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Random;
+import java.util.Set;

 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
@@ -29,24 +33,28 @@ import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.LogMergePolicy;
+import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.RandomIndexWriter;
-import org.apache.lucene.index.SerialMergeScheduler;
 import org.apache.lucene.index.SlowCompositeReaderWrapper;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util._TestUtil;

+import com.carrotsearch.randomizedtesting.generators.RandomPicks;
+
 public class TestSortingMergePolicy extends LuceneTestCase {

-  private static final String DELETE_TERM = "abc";
-
+  private List<String> terms;
   private Directory dir1, dir2;
   private Sorter sorter;
   private IndexReader reader;
   private IndexReader sortedReader;

+  @Override
   public void setUp() throws Exception {
     super.setUp();
     sorter = new NumericDocValuesSorter("ndv");
@@ -56,32 +64,58 @@ public class TestSortingMergePolicy extends LuceneTestCase {

   private Document randomDocument() {
     final Document doc = new Document();
     doc.add(new NumericDocValuesField("ndv", random().nextLong()));
-    doc.add(new StringField("s", rarely() ? DELETE_TERM : _TestUtil.randomSimpleString(random(), 3), Store.YES));
+    doc.add(new StringField("s", RandomPicks.randomFrom(random(), terms), Store.YES));
     return doc;
   }

+  private static MergePolicy newSortingMergePolicy(Sorter sorter) {
+    // create a MP with a low merge factor so that many merges happen
+    MergePolicy mp;
+    if (random().nextBoolean()) {
+      TieredMergePolicy tmp = newTieredMergePolicy(random());
+      final int numSegs = _TestUtil.nextInt(random(), 3, 5);
+      tmp.setSegmentsPerTier(numSegs);
+      tmp.setMaxMergeAtOnce(_TestUtil.nextInt(random(), 2, numSegs));
+      mp = tmp;
+    } else {
+      LogMergePolicy lmp = newLogMergePolicy(random());
+      lmp.setMergeFactor(_TestUtil.nextInt(random(), 3, 5));
+      mp = lmp;
+    }
+    // wrap it with a sorting mp
+    return new SortingMergePolicy(mp, sorter);
+  }
+
   private void createRandomIndexes() throws IOException {
     dir1 = newDirectory();
     dir2 = newDirectory();
-    final int numDocs = atLeast(100);
+    final int numDocs = atLeast(150);
+    final int numTerms = _TestUtil.nextInt(random(), 1, numDocs / 5);
+    Set<String> randomTerms = new HashSet<String>();
+    while (randomTerms.size() < numTerms) {
+      randomTerms.add(_TestUtil.randomSimpleString(random()));
+    }
+    terms = new ArrayList<String>(randomTerms);
     final long seed = random().nextLong();
     final IndexWriterConfig iwc1 = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(new Random(seed)));
     final IndexWriterConfig iwc2 = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(new Random(seed)));
-    iwc2.setMergePolicy(new SortingMergePolicy(iwc2.getMergePolicy(), sorter));
-    iwc2.setMergeScheduler(new SerialMergeScheduler()); // Remove this line when LUCENE-4752 is fixed
+    iwc2.setMergePolicy(newSortingMergePolicy(sorter));
     final RandomIndexWriter iw1 = new RandomIndexWriter(new Random(seed), dir1, iwc1);
     final RandomIndexWriter iw2 = new RandomIndexWriter(new Random(seed), dir2, iwc2);
     for (int i = 0; i < numDocs; ++i) {
       final Document doc = randomDocument();
       iw1.addDocument(doc);
       iw2.addDocument(doc);
-      if (i == numDocs / 2 || (i != numDocs - 1 && rarely())) {
+      if (i == numDocs / 2 || (i != numDocs - 1 && random().nextInt(8) == 0)) {
         iw1.commit();
         iw2.commit();
       }
+      if (random().nextInt(5) == 0) {
+        final String term = RandomPicks.randomFrom(random(), terms);
+        iw1.deleteDocuments(new Term("s", term));
+        iw2.deleteDocuments(new Term("s", term));
+      }
     }
-    iw1.deleteDocuments(new Term("s", DELETE_TERM));
-    iw2.deleteDocuments(new Term("s", DELETE_TERM));
     iw1.forceMerge(1);
     iw2.forceMerge(1);
     iw1.close();
@@ -90,6 +124,7 @@ public class TestSortingMergePolicy extends LuceneTestCase {
     sortedReader = DirectoryReader.open(dir2);
   }

+  @Override
   public void tearDown() throws Exception {
     reader.close();
     sortedReader.close();
@@ -98,7 +133,7 @@ public class TestSortingMergePolicy extends LuceneTestCase {
     super.tearDown();
   }

-  private void assertSorted(AtomicReader reader) throws IOException {
+  private static void assertSorted(AtomicReader reader) throws IOException {
     final NumericDocValues ndv = reader.getNumericDocValues("ndv");
     for (int i = 1; i < reader.maxDoc(); ++i) {
       assertTrue(ndv.get(i-1) < ndv.get(i));
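To close the loop, a hedged end-to-end sketch of the feature under test (my own assembly of the public API shown above; the directory, analyzer, and version constant are illustrative assumptions, not from the commit):

    // Illustrative wiring of SortingMergePolicy into an IndexWriter (Lucene 4.x).
    Directory dir = new RAMDirectory();
    Sorter sorter = new NumericDocValuesSorter("ndv");
    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_42,
        new WhitespaceAnalyzer(Version.LUCENE_42));
    conf.setMergePolicy(new SortingMergePolicy(conf.getMergePolicy(), sorter));
    IndexWriter writer = new IndexWriter(dir, conf);
    // Every segment produced by a merge now comes out ordered by the "ndv"
    // field, and deletes that race with a running merge are remapped through
    // getDocMap() instead of hitting the wrong documents.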