mirror of https://github.com/apache/lucene.git
LUCENE-2382: check for aborted merge when merging postings
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1062889 13f79535-47bb-0310-9956-ffa450edef68
parent 3e8f55cd0d
commit 8d1994b2e3
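
Summary of the patch (as reflected in the hunks below): the CheckAbort helper moves out of SegmentMerger and into MergeState as a public nested class, and MergePolicy.OneMerge.checkAborted becomes public, so that codec-level postings merging in TermsConsumer can periodically test whether the merge has been aborted. CompoundFileWriter and SegmentMerger switch to the relocated type, SegmentMerger publishes the checker on the MergeState it hands to the codecs, and TermsConsumer drives it while merging postings; a small diagnostic message is also added in LogMergePolicy.
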
@@ -17,15 +17,15 @@ package org.apache.lucene.index;
  * limitations under the License.
  */
 
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.util.IOUtils;
-
-import java.util.LinkedList;
-import java.util.HashSet;
-
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+
+import org.apache.lucene.index.codecs.MergeState;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.IOUtils;
 
 /**
  * Combines multiple files into a single compound file.
@@ -76,7 +76,7 @@ final class CompoundFileWriter {
   private HashSet<String> ids;
   private LinkedList<FileEntry> entries;
   private boolean merged = false;
-  private SegmentMerger.CheckAbort checkAbort;
+  private MergeState.CheckAbort checkAbort;
 
   /** Create the compound stream in the specified file. The file name is the
    * entire name (no extensions are added).
@@ -86,7 +86,7 @@ final class CompoundFileWriter {
     this(dir, name, null);
   }
 
-  CompoundFileWriter(Directory dir, String name, SegmentMerger.CheckAbort checkAbort) {
+  CompoundFileWriter(Directory dir, String name, MergeState.CheckAbort checkAbort) {
     if (dir == null)
       throw new NullPointerException("directory cannot be null");
     if (name == null)
@@ -481,6 +481,7 @@ public abstract class LogMergePolicy extends MergePolicy {
       if (size < 1)
         size = 1;
       levels[i] = (float) Math.log(size)/norm;
+      message("seg " + info.name + " level=" + levels[i]);
     }
 
     final float levelFloor;
@@ -110,7 +110,7 @@ public abstract class MergePolicy implements java.io.Closeable {
       return aborted;
     }
 
-    synchronized void checkAborted(Directory dir) throws MergeAbortedException {
+    public synchronized void checkAborted(Directory dir) throws MergeAbortedException {
       if (aborted) {
         throw new MergeAbortedException("merge is aborted: " + segString(dir));
       }
@@ -59,7 +59,7 @@ final class SegmentMerger {
 
   private int mergedDocs;
 
-  private final CheckAbort checkAbort;
+  private final MergeState.CheckAbort checkAbort;
 
   /** Maximum number of contiguous documents to bulk-copy
       when merging stored fields */
@@ -78,9 +78,9 @@ final class SegmentMerger {
     this.fieldInfos = fieldInfos;
     segment = name;
     if (merge != null) {
-      checkAbort = new CheckAbort(merge, directory);
+      checkAbort = new MergeState.CheckAbort(merge, directory);
     } else {
-      checkAbort = new CheckAbort(null, null) {
+      checkAbort = new MergeState.CheckAbort(null, null) {
         @Override
         public void work(double units) throws MergeAbortedException {
           // do nothing
@@ -508,6 +508,7 @@ final class SegmentMerger {
     mergeState.hasPayloadProcessorProvider = payloadProcessorProvider != null;
     mergeState.dirPayloadProcessor = new PayloadProcessorProvider.DirPayloadProcessor[mergeState.readerCount];
     mergeState.currentPayloadProcessor = new PayloadProcessorProvider.PayloadProcessor[mergeState.readerCount];
+    mergeState.checkAbort = checkAbort;
 
     docBase = 0;
     int inputDocBase = 0;
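
The single added assignment above is the hand-off that makes the new abort checks possible: the MergeState passed down to the codec layer now carries SegmentMerger's CheckAbort instance, which TermsConsumer (last hunks below) drives while merging postings.
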
@@ -612,31 +613,4 @@ final class SegmentMerger {
       }
     }
   }
-
-  static class CheckAbort {
-    private double workCount;
-    private MergePolicy.OneMerge merge;
-    private Directory dir;
-    public CheckAbort(MergePolicy.OneMerge merge, Directory dir) {
-      this.merge = merge;
-      this.dir = dir;
-    }
-
-    /**
-     * Records the fact that roughly units amount of work
-     * have been done since this method was last called.
-     * When adding time-consuming code into SegmentMerger,
-     * you should test different values for units to ensure
-     * that the time in between calls to merge.checkAborted
-     * is up to ~ 1 second.
-     */
-    public void work(double units) throws MergePolicy.MergeAbortedException {
-      workCount += units;
-      if (workCount >= 10000.0) {
-        merge.checkAborted(dir);
-        workCount = 0;
-      }
-    }
-  }
-
 }
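
The CheckAbort class deleted here is not dropped: it reappears essentially unchanged as the public nested class MergeState.CheckAbort in the next hunks, where code in the codecs package can reach it.
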
@@ -17,13 +17,16 @@ package org.apache.lucene.index.codecs;
  * limitations under the License.
  */
 
+import java.util.List;
+
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
 import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
+import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.Bits;
-import java.util.List;
 
 /** Holds common state used during segment merging
  *
@@ -37,6 +40,7 @@ public class MergeState {
   public int[] docBase; // New docID base per reader
   public int mergedDocCount; // Total # merged docs
   public Bits multiDeletedDocs;
+  public CheckAbort checkAbort;
 
   // Updated per field;
   public FieldInfo fieldInfo;
@@ -45,5 +49,30 @@ public class MergeState {
   public boolean hasPayloadProcessorProvider;
   public DirPayloadProcessor[] dirPayloadProcessor;
   public PayloadProcessor[] currentPayloadProcessor;
 
+  public static class CheckAbort {
+    private double workCount;
+    private MergePolicy.OneMerge merge;
+    private Directory dir;
+    public CheckAbort(MergePolicy.OneMerge merge, Directory dir) {
+      this.merge = merge;
+      this.dir = dir;
+    }
+
+    /**
+     * Records the fact that roughly units amount of work
+     * have been done since this method was last called.
+     * When adding time-consuming code into SegmentMerger,
+     * you should test different values for units to ensure
+     * that the time in between calls to merge.checkAborted
+     * is up to ~ 1 second.
+     */
+    public void work(double units) throws MergePolicy.MergeAbortedException {
+      workCount += units;
+      if (workCount >= 10000.0) {
+        merge.checkAborted(dir);
+        workCount = 0;
+      }
+    }
+  }
 }
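
For readers unfamiliar with the pattern, the following is a minimal self-contained sketch (plain Java, no Lucene types; all class and variable names are illustrative, not part of the patch) of how CheckAbort is meant to be driven: callers report coarse work units, and the helper only performs the comparatively expensive synchronized abort check once roughly 10000 units have accumulated, keeping the check infrequent relative to the copying work.

import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-in for MergePolicy.MergeAbortedException.
class MergeAbortedException extends Exception {
  MergeAbortedException(String message) {
    super(message);
  }
}

// Simplified stand-in for MergeState.CheckAbort: accumulate reported work
// units and only test the abort flag once enough work has piled up.
class AbortChecker {
  private final AtomicBoolean aborted;
  private double workCount;

  AbortChecker(AtomicBoolean aborted) {
    this.aborted = aborted;
  }

  void work(double units) throws MergeAbortedException {
    workCount += units;
    if (workCount >= 10000.0) {           // same threshold as CheckAbort.work()
      if (aborted.get()) {
        throw new MergeAbortedException("merge is aborted");
      }
      workCount = 0;
    }
  }
}

public class CheckAbortSketch {
  public static void main(String[] args) {
    AtomicBoolean aborted = new AtomicBoolean(false);
    AbortChecker checker = new AbortChecker(aborted);
    try {
      for (int doc = 0; doc < 1000000; doc++) {
        // ... pretend to copy one document's postings here ...
        if (doc == 500000) {
          aborted.set(true);              // e.g. the writer rolled the merge back
        }
        checker.work(1.0);                // cheap: real check only every 10000 units
      }
      System.out.println("merge completed");
    } catch (MergeAbortedException e) {
      System.out.println("merge stopped early: " + e.getMessage());
    }
  }
}
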
@@ -56,6 +56,7 @@ public abstract class TermsConsumer {
     BytesRef term;
     assert termsEnum != null;
     long sumTotalTermFreq = 0;
+    long sumDF = 0;
 
     if (mergeState.fieldInfo.omitTermFreqAndPositions) {
       if (docsEnum == null) {
@@ -73,6 +74,11 @@ public abstract class TermsConsumer {
         final TermStats stats = postingsConsumer.merge(mergeState, docsEnum);
         if (stats.docFreq > 0) {
           finishTerm(term, stats);
+          sumDF += stats.docFreq;
+          if (sumDF > 60000) {
+            mergeState.checkAbort.work(sumDF/5.0);
+            sumDF = 0;
+          }
         }
       }
     }
@@ -99,6 +105,11 @@ public abstract class TermsConsumer {
         if (stats.docFreq > 0) {
           finishTerm(term, stats);
           sumTotalTermFreq += stats.totalTermFreq;
+          sumDF += stats.docFreq;
+          if (sumDF > 60000) {
+            mergeState.checkAbort.work(sumDF/5.0);
+            sumDF = 0;
+          }
         }
       }
     }
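
As the two TermsConsumer hunks show, the postings merge accumulates the merged document frequency in sumDF and, once it exceeds 60000, reports sumDF/5.0 (a little over 12000 units) to mergeState.checkAbort.work(). Since CheckAbort only invokes merge.checkAborted(dir) after 10000 accumulated units, every such report performs a real abort check, so an aborted merge is noticed roughly once per ~60000 merged postings within a field rather than the postings merge running on unchecked.
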