mirror of https://github.com/apache/lucene.git
LUCENE-3292: don't share SR instances for merging and searching in IW's readerPool
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1148938 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent a54eccd951
commit 3822c0e4cc
@@ -489,6 +489,11 @@ Optimizations
  MultiTermQuery now stores TermState per leaf reader during rewrite to re-
  seek the term dictionary in TermQuery / TermWeight.
  (Simon Willnauer, Mike McCandless, Robert Muir)

* LUCENE-3292: IndexWriter no longer shares the same SegmentReader
  instance for merging and NRT readers, which enables directory impls
  to separately tune IO flags for each. (Varun Thacker, Simon
  Willnauer, Mike McCandless)

Bug fixes

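The entry above is the heart of this commit: IndexWriter's internal ReaderPool switches from caching one SegmentReader per segment to caching one per segment-and-IO-context pair, so the reader opened for merging and the reader opened for NRT search never alias each other. As a rough, self-contained sketch of that keying idea (PoolKey, Context, and ReaderPoolSketch are simplified stand-ins for the SegmentCacheKey, IOContext.Context, and ReaderPool types changed in the hunks that follow; the cached value is left as a plain Object):

// Minimal sketch of per-context reader pooling, using simplified stand-in types.
import java.util.HashMap;
import java.util.Map;

enum Context { MERGE, READ }              // stand-in for IOContext.Context

final class PoolKey {                     // stand-in for SegmentCacheKey
  final String segment;                   // stand-in for SegmentInfo
  final Context context;

  PoolKey(String segment, Context context) {
    this.segment = segment;
    this.context = context;
  }

  @Override
  public int hashCode() {
    return segment.hashCode() + context.hashCode();
  }

  @Override
  public boolean equals(Object other) {
    if (!(other instanceof PoolKey)) {
      return false;
    }
    final PoolKey o = (PoolKey) other;
    return segment.equals(o.segment) && context == o.context;
  }
}

class ReaderPoolSketch {
  // One cached reader per (segment, context): a MERGE reader and a READ reader
  // of the same segment are always distinct instances.
  private final Map<PoolKey, Object> readerMap = new HashMap<PoolKey, Object>();

  synchronized Object get(String segment, Context context) {
    final PoolKey key = new PoolKey(segment, context);
    Object reader = readerMap.get(key);
    if (reader == null) {
      reader = new Object();              // hypothetical: open a real SegmentReader here
      readerMap.put(key, reader);
    }
    return reader;
  }
}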
@@ -233,7 +233,7 @@ public class NRTManager implements Closeable {
**/
public synchronized IndexSearcher get(long targetGen, boolean requireDeletes) {

assert noDeletesSearchingGen.get() >= searchingGen.get();
assert noDeletesSearchingGen.get() >= searchingGen.get(): "noDeletesSearchingGen=" + noDeletesSearchingGen.get() + " searchingGen=" + searchingGen.get();

if (targetGen > getCurrentSearchingGen(requireDeletes)) {
// Must wait
@@ -235,7 +235,7 @@ class BufferedDeletesStream {
delCount += applyQueryDeletes(packet.queriesIterable(), reader);
segAllDeletes = reader.numDocs() == 0;
} finally {
readerPool.release(reader);
readerPool.release(reader, IOContext.Context.READ);
}
anyNewDeletes |= delCount > 0;

@@ -277,7 +277,7 @@ class BufferedDeletesStream {
delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
segAllDeletes = reader.numDocs() == 0;
} finally {
readerPool.release(reader);
readerPool.release(reader, IOContext.Context.READ);
}
anyNewDeletes |= delCount > 0;
@@ -466,6 +466,8 @@ public class ConcurrentMergeScheduler extends MergeScheduler {

// Ignore the exception if it was due to abort:
if (!(exc instanceof MergePolicy.MergeAbortedException)) {
//System.out.println(Thread.currentThread().getName() + ": CMS: exc");
//exc.printStackTrace(System.out);
if (!suppressExceptions) {
// suppressExceptions is normally only set during
// testing.
@@ -143,12 +143,12 @@ class DirectoryReader extends IndexReader implements Cloneable {
}

// Used by near real-time search
DirectoryReader(IndexWriter writer, SegmentInfos infos, int termInfosIndexDivisor, CodecProvider codecs, boolean applyAllDeletes) throws IOException {
DirectoryReader(IndexWriter writer, SegmentInfos infos, CodecProvider codecs, boolean applyAllDeletes) throws IOException {
this.directory = writer.getDirectory();
this.readOnly = true;
this.applyAllDeletes = applyAllDeletes; // saved for reopen

this.termInfosIndexDivisor = termInfosIndexDivisor;
this.termInfosIndexDivisor = writer.getConfig().getReaderTermsIndexDivisor();
if (codecs == null) {
this.codecs = CodecProvider.getDefault();
} else {

@@ -171,8 +171,7 @@ class DirectoryReader extends IndexReader implements Cloneable {
try {
final SegmentInfo info = infos.info(i);
assert info.dir == dir;
final SegmentReader reader = writer.readerPool.getReadOnlyClone(info, true, termInfosIndexDivisor,
IOContext.READ);
final SegmentReader reader = writer.readerPool.getReadOnlyClone(info, IOContext.READ);
if (reader.numDocs() > 0 || writer.getKeepFullyDeletedSegments()) {
reader.readerFinishedListeners = readerFinishedListeners;
readers.add(reader);
@ -53,7 +53,6 @@ import org.apache.lucene.store.Lock;
|
|||
import org.apache.lucene.store.LockObtainFailedException;
|
||||
import org.apache.lucene.store.MergeInfo;
|
||||
import org.apache.lucene.util.BitVector;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.StringHelper;
|
||||
|
@ -378,7 +377,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
// just like we do when loading segments_N
|
||||
synchronized(this) {
|
||||
maybeApplyDeletes(applyAllDeletes);
|
||||
r = new DirectoryReader(this, segmentInfos, config.getReaderTermsIndexDivisor(), codecs, applyAllDeletes);
|
||||
r = new DirectoryReader(this, segmentInfos, codecs, applyAllDeletes);
|
||||
if (infoStream != null) {
|
||||
message("return reader version=" + r.getVersion() + " reader=" + r);
|
||||
}
|
||||
|
@@ -416,18 +415,48 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
* has been called on this instance). */

class ReaderPool {

final class SegmentCacheKey {
public final SegmentInfo si;
public final IOContext.Context context;

public SegmentCacheKey(SegmentInfo segInfo, IOContext.Context context) {
assert context == IOContext.Context.MERGE || context == IOContext.Context.READ;
this.si = segInfo;
this.context = context;
}

@Override
public int hashCode() {
return si.hashCode() + context.hashCode();
}

private final Map<SegmentInfo,SegmentReader> readerMap = new HashMap<SegmentInfo,SegmentReader>();
@Override
public String toString() {
return "SegmentCacheKey(" + si + "," + context + ")";
}

@Override
public boolean equals(Object _other) {
if (!(_other instanceof SegmentCacheKey)) {
return false;
}
final SegmentCacheKey other = (SegmentCacheKey) _other;
return si.equals(other.si) && context == other.context;
}
}

private final Map<SegmentCacheKey,SegmentReader> readerMap = new HashMap<SegmentCacheKey,SegmentReader>();

/** Forcefully clear changes for the specified segments. This is called on successful merge. */
synchronized void clear(List<SegmentInfo> infos) throws IOException {
if (infos == null) {
for (Map.Entry<SegmentInfo,SegmentReader> ent: readerMap.entrySet()) {
for (Map.Entry<SegmentCacheKey,SegmentReader> ent: readerMap.entrySet()) {
ent.getValue().hasChanges = false;
}
} else {
for (final SegmentInfo info: infos) {
final SegmentReader r = readerMap.get(info);
final SegmentReader r = readerMap.get(new SegmentCacheKey(info, IOContext.Context.MERGE));
if (r != null) {
r.hasChanges = false;
}
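Later hunks in this file change ReaderPool.get and ReaderPool.release to carry the IO context explicitly. A hedged usage sketch of the resulting calling convention follows; the surrounding readerPool and info variables, and the work done with the reader, are assumed for illustration and are not taken from the commit:

// Illustrative only: obtain a pooled reader in the READ context and release it
// with the same context, mirroring the get/release signatures introduced below.
SegmentReader reader = readerPool.get(info, new IOContext(IOContext.Context.READ));
try {
  // ... apply deletes or open an NRT view against this READ-context reader ...
} finally {
  // drop=false keeps pending changes; the context must match the one passed to get()
  readerPool.release(reader, false, IOContext.Context.READ);
}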
@ -437,9 +466,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
|
||||
// used only by asserts
|
||||
public synchronized boolean infoIsLive(SegmentInfo info) {
|
||||
return infoIsLive(info, "");
|
||||
}
|
||||
|
||||
public synchronized boolean infoIsLive(SegmentInfo info, String message) {
|
||||
int idx = segmentInfos.indexOf(info);
|
||||
assert idx != -1: "info=" + info + " isn't in pool";
|
||||
assert segmentInfos.info(idx) == info: "info=" + info + " doesn't match live info in segmentInfos";
|
||||
assert idx != -1: "info=" + info + " isn't live: " + message;
|
||||
assert segmentInfos.info(idx) == info: "info=" + info + " doesn't match live info in segmentInfos: " + message;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -460,8 +493,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
* @param sr
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized boolean release(SegmentReader sr) throws IOException {
|
||||
return release(sr, false);
|
||||
public synchronized boolean release(SegmentReader sr, IOContext.Context context) throws IOException {
|
||||
return release(sr, false, context);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -474,10 +507,32 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
* @throws IOException
|
||||
*/
|
||||
public synchronized boolean release(SegmentReader sr, boolean drop) throws IOException {
|
||||
final SegmentCacheKey cacheKey = new SegmentCacheKey(sr.getSegmentInfo(), IOContext.Context.READ);
|
||||
final SegmentReader other = readerMap.get(cacheKey);
|
||||
if (sr == other) {
|
||||
return release(sr, drop, IOContext.Context.READ);
|
||||
} else {
|
||||
assert sr == readerMap.get(new SegmentCacheKey(sr.getSegmentInfo(), IOContext.Context.MERGE));
|
||||
return release(sr, drop, IOContext.Context.MERGE);
|
||||
}
|
||||
}
|
||||
|
||||
final boolean pooled = readerMap.containsKey(sr.getSegmentInfo());
|
||||
/**
|
||||
* Release the segment reader (i.e. decRef it and close if there
|
||||
* are no more references.
|
||||
* @return true if this release altered the index (eg
|
||||
* the SegmentReader had pending changes to del docs and
|
||||
* was closed). Caller must call checkpoint() if so.
|
||||
* @param sr
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized boolean release(SegmentReader sr, boolean drop, IOContext.Context context) throws IOException {
|
||||
|
||||
assert !pooled || readerMap.get(sr.getSegmentInfo()) == sr;
|
||||
SegmentCacheKey cacheKey = new SegmentCacheKey(sr.getSegmentInfo(), context);
|
||||
|
||||
final boolean pooled = readerMap.containsKey(cacheKey);
|
||||
|
||||
assert !pooled || readerMap.get(cacheKey) == sr;
|
||||
|
||||
// Drop caller's ref; for an external reader (not
|
||||
// pooled), this decRef will close it
|
||||
|
@ -492,9 +547,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
// Discard (don't save) changes when we are dropping
|
||||
// the reader; this is used only on the sub-readers
|
||||
// after a successful merge.
|
||||
sr.hasChanges &= !drop;
|
||||
|
||||
final boolean hasChanges = sr.hasChanges;
|
||||
final boolean hasChanges;
|
||||
if (drop) {
|
||||
hasChanges = sr.hasChanges = false;
|
||||
} else {
|
||||
hasChanges = sr.hasChanges;
|
||||
}
|
||||
|
||||
// Drop our ref -- this will commit any pending
|
||||
// changes to the dir
|
||||
|
@ -502,7 +560,20 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
|
||||
// We are the last ref to this reader; since we're
|
||||
// not pooling readers, we release it:
|
||||
readerMap.remove(sr.getSegmentInfo());
|
||||
readerMap.remove(cacheKey);
|
||||
|
||||
if (drop && context == IOContext.Context.MERGE) {
|
||||
// Also drop the READ reader if present: we don't
|
||||
// need its deletes since they've been carried
|
||||
// over to the merged segment
|
||||
cacheKey = new SegmentCacheKey(sr.getSegmentInfo(), IOContext.Context.READ);
|
||||
SegmentReader sr2 = readerMap.get(cacheKey);
|
||||
if (sr2 != null) {
|
||||
readerMap.remove(cacheKey);
|
||||
sr2.hasChanges = false;
|
||||
sr2.close();
|
||||
}
|
||||
}
|
||||
|
||||
return hasChanges;
|
||||
}
|
||||
|
@ -511,16 +582,26 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
}
|
||||
|
||||
public synchronized void drop(List<SegmentInfo> infos) throws IOException {
|
||||
drop(infos, IOContext.Context.READ);
|
||||
drop(infos, IOContext.Context.MERGE);
|
||||
}
|
||||
|
||||
public synchronized void drop(List<SegmentInfo> infos, IOContext.Context context) throws IOException {
|
||||
for(SegmentInfo info : infos) {
|
||||
drop(info);
|
||||
drop(info, context);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void drop(SegmentInfo info) throws IOException {
|
||||
final SegmentReader sr = readerMap.get(info);
|
||||
if (sr != null) {
|
||||
drop(info, IOContext.Context.READ);
|
||||
drop(info, IOContext.Context.MERGE);
|
||||
}
|
||||
|
||||
public synchronized void drop(SegmentInfo info, IOContext.Context context) throws IOException {
|
||||
final SegmentReader sr;
|
||||
if ((sr = readerMap.remove(new SegmentCacheKey(info, context))) != null) {
|
||||
sr.hasChanges = false;
|
||||
readerMap.remove(info);
|
||||
readerMap.remove(new SegmentCacheKey(info, context));
|
||||
sr.close();
|
||||
}
|
||||
}
|
||||
|
@ -532,14 +613,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
// sync'd on IW:
|
||||
assert Thread.holdsLock(IndexWriter.this);
|
||||
|
||||
Iterator<Map.Entry<SegmentInfo,SegmentReader>> iter = readerMap.entrySet().iterator();
|
||||
Iterator<Map.Entry<SegmentCacheKey,SegmentReader>> iter = readerMap.entrySet().iterator();
|
||||
while (iter.hasNext()) {
|
||||
|
||||
Map.Entry<SegmentInfo,SegmentReader> ent = iter.next();
|
||||
Map.Entry<SegmentCacheKey,SegmentReader> ent = iter.next();
|
||||
|
||||
SegmentReader sr = ent.getValue();
|
||||
if (sr.hasChanges) {
|
||||
assert infoIsLive(sr.getSegmentInfo());
|
||||
assert infoIsLive(sr.getSegmentInfo(), "key=" + ent.getKey());
|
||||
sr.doCommit(null);
|
||||
|
||||
// Must checkpoint w/ deleter, because this
|
||||
|
@ -567,10 +648,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
// We invoke deleter.checkpoint below, so we must be
|
||||
// sync'd on IW:
|
||||
assert Thread.holdsLock(IndexWriter.this);
|
||||
|
||||
|
||||
for (SegmentInfo info : infos) {
|
||||
|
||||
final SegmentReader sr = readerMap.get(info);
|
||||
final SegmentReader sr = readerMap.get(new SegmentCacheKey(info, IOContext.Context.READ));
|
||||
if (sr != null && sr.hasChanges) {
|
||||
assert infoIsLive(info);
|
||||
sr.doCommit(null);
|
||||
|
@ -582,13 +662,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
}
|
||||
}
|
||||
|
||||
public synchronized SegmentReader getReadOnlyClone(SegmentInfo info, IOContext context) throws IOException {
|
||||
return getReadOnlyClone(info, true, context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a ref to a clone. NOTE: this clone is not
|
||||
* enrolled in the pool, so you should simply close()
|
||||
* it when you're done (ie, do not call release()).
|
||||
*/
|
||||
public synchronized SegmentReader getReadOnlyClone(SegmentInfo info, boolean doOpenStores, int termInfosIndexDivisor, IOContext context) throws IOException {
|
||||
SegmentReader sr = get(info, doOpenStores, context, termInfosIndexDivisor);
|
||||
public synchronized SegmentReader getReadOnlyClone(SegmentInfo info, boolean doOpenStores, IOContext context) throws IOException {
|
||||
SegmentReader sr = get(info, doOpenStores, context);
|
||||
try {
|
||||
return (SegmentReader) sr.clone(true);
|
||||
} finally {
|
||||
|
@ -596,6 +680,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
}
|
||||
}
|
||||
|
||||
public synchronized SegmentReader get(SegmentInfo info, IOContext context) throws IOException {
|
||||
return get(info, true, context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain a SegmentReader from the readerPool. The reader
|
||||
* must be returned by calling {@link #release(SegmentReader)}
|
||||
|
@ -605,53 +693,24 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
* @throws IOException
|
||||
*/
|
||||
public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores, IOContext context) throws IOException {
|
||||
return get(info, doOpenStores, context, config.getReaderTermsIndexDivisor());
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain a SegmentReader from the readerPool. The reader
|
||||
* must be returned by calling {@link #release(SegmentReader)}
|
||||
*
|
||||
* @see #release(SegmentReader)
|
||||
* @param info
|
||||
* @param doOpenStores
|
||||
* @param readBufferSize
|
||||
* @param termsIndexDivisor
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores, IOContext context, int termsIndexDivisor) throws IOException {
|
||||
|
||||
// if (poolReaders) {
|
||||
// readBufferSize = BufferedIndexInput.BUFFER_SIZE;
|
||||
// }
|
||||
|
||||
// TODO: context should be part of the key used to cache that reader in the pool.
|
||||
|
||||
SegmentReader sr = readerMap.get(info);
|
||||
SegmentCacheKey cacheKey = new SegmentCacheKey(info, context.context);
|
||||
SegmentReader sr = readerMap.get(cacheKey);
|
||||
if (sr == null) {
|
||||
// TODO: we may want to avoid doing this while
|
||||
// synchronized
|
||||
// Returns a ref, which we xfer to readerMap:
|
||||
sr = SegmentReader.get(false, info.dir, info, doOpenStores, termsIndexDivisor, context);
|
||||
sr = SegmentReader.get(false, info.dir, info, doOpenStores, context.context == IOContext.Context.MERGE ? -1 : config.getReaderTermsIndexDivisor(), context);
|
||||
sr.readerFinishedListeners = readerFinishedListeners;
|
||||
|
||||
if (info.dir == directory) {
|
||||
// Only pool if reader is not external
|
||||
readerMap.put(info, sr);
|
||||
readerMap.put(cacheKey, sr);
|
||||
}
|
||||
} else {
|
||||
if (doOpenStores) {
|
||||
sr.openDocStores();
|
||||
}
|
||||
if (termsIndexDivisor != -1) {
|
||||
// If this reader was originally opened because we
|
||||
// needed to merge it, we didn't load the terms
|
||||
// index. But now, if the caller wants the terms
|
||||
// index (eg because it's doing deletes, or an NRT
|
||||
// reader is being opened) we ask the reader to
|
||||
// load its terms index.
|
||||
sr.loadTermsIndex(termsIndexDivisor);
|
||||
}
|
||||
}
|
||||
|
||||
// Return a ref to our caller
|
||||
|
@ -664,13 +723,23 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
|
||||
// Returns a ref
|
||||
public synchronized SegmentReader getIfExists(SegmentInfo info) throws IOException {
|
||||
SegmentReader sr = readerMap.get(info);
|
||||
SegmentReader sr = getIfExists(info, IOContext.Context.READ);
|
||||
if (sr == null) {
|
||||
sr = getIfExists(info, IOContext.Context.MERGE);
|
||||
}
|
||||
return sr;
|
||||
}
|
||||
|
||||
// Returns a ref
|
||||
public synchronized SegmentReader getIfExists(SegmentInfo info, IOContext.Context context) throws IOException {
|
||||
SegmentCacheKey cacheKey = new SegmentCacheKey(info, context);
|
||||
SegmentReader sr = readerMap.get(cacheKey);
|
||||
if (sr != null) {
|
||||
sr.incRef();
|
||||
}
|
||||
return sr;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain the number of deleted docs for a pooled reader.
|
||||
|
@ -687,7 +756,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
}
|
||||
} finally {
|
||||
if (reader != null) {
|
||||
readerPool.release(reader);
|
||||
readerPool.release(reader, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2853,7 +2922,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
}
|
||||
if (!keepFullyDeletedSegments && result.allDeleted != null) {
|
||||
if (infoStream != null) {
|
||||
message("drop 100% deleted segments: " + result.allDeleted);
|
||||
message("drop 100% deleted segments: " + segString(result.allDeleted));
|
||||
}
|
||||
for (SegmentInfo info : result.allDeleted) {
|
||||
// If a merge has already registered for this
|
||||
|
@ -2929,16 +2998,27 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
for(int i=0; i < sourceSegments.size(); i++) {
|
||||
SegmentInfo info = sourceSegments.get(i);
|
||||
minGen = Math.min(info.getBufferedDeletesGen(), minGen);
|
||||
int docCount = info.docCount;
|
||||
final SegmentReader previousReader = merge.readerClones.get(i);
|
||||
if (previousReader == null) {
|
||||
// Reader was skipped because it was 100% deletions
|
||||
continue;
|
||||
final int docCount = info.docCount;
|
||||
final BitVector prevLiveDocs = merge.readerLiveDocs.get(i);
|
||||
final BitVector currentLiveDocs;
|
||||
{
|
||||
final SegmentReader currentReader = readerPool.getIfExists(info, IOContext.Context.READ);
|
||||
if (currentReader != null) {
|
||||
currentLiveDocs = (BitVector) currentReader.getLiveDocs();
|
||||
readerPool.release(currentReader, false, IOContext.Context.READ);
|
||||
} else {
|
||||
assert readerPool.infoIsLive(info);
|
||||
if (info.hasDeletions()) {
|
||||
currentLiveDocs = new BitVector(directory,
|
||||
info.getDelFileName(),
|
||||
new IOContext(IOContext.Context.READ));
|
||||
} else {
|
||||
currentLiveDocs = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
final Bits prevLiveDocs = previousReader.getLiveDocs();
|
||||
final SegmentReader currentReader = merge.readers.get(i);
|
||||
final Bits currentLiveDocs = currentReader.getLiveDocs();
|
||||
if (previousReader.hasDeletions()) {
|
||||
|
||||
if (prevLiveDocs != null) {
|
||||
|
||||
// There were deletes on this segment when the merge
|
||||
// started. The merge has collapsed away those
|
||||
|
@ -2947,14 +3027,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
// newly flushed deletes but mapping them to the new
|
||||
// docIDs.
|
||||
|
||||
if (currentReader.numDeletedDocs() > previousReader.numDeletedDocs()) {
|
||||
// This means this segment has had new deletes
|
||||
// committed since we started the merge, so we
|
||||
if (currentLiveDocs.count() < prevLiveDocs.count()) {
|
||||
// This means this segment received new deletes
|
||||
// since we started the merge, so we
|
||||
// must merge them:
|
||||
for(int j=0;j<docCount;j++) {
|
||||
if (!prevLiveDocs.get(j))
|
||||
if (!prevLiveDocs.get(j)) {
|
||||
assert !currentLiveDocs.get(j);
|
||||
else {
|
||||
} else {
|
||||
if (!currentLiveDocs.get(j)) {
|
||||
mergedReader.doDelete(docUpto);
|
||||
delCount++;
|
||||
|
@ -2963,9 +3043,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
docUpto += docCount - previousReader.numDeletedDocs();
|
||||
assert currentLiveDocs.count() == prevLiveDocs.count(): "currentLiveDocs.count()==" + currentLiveDocs.count() + " vs prevLiveDocs.count()=" + prevLiveDocs.count() + " info=" + info;
|
||||
docUpto += currentLiveDocs.count();
|
||||
}
|
||||
} else if (currentReader.hasDeletions()) {
|
||||
} else if (currentLiveDocs != null) {
|
||||
// This segment had no deletes before but now it
|
||||
// does:
|
||||
for(int j=0; j<docCount; j++) {
|
||||
|
@ -2975,9 +3056,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
}
|
||||
docUpto++;
|
||||
}
|
||||
} else
|
||||
} else {
|
||||
// No deletes before or after
|
||||
docUpto += info.docCount;
|
||||
}
|
||||
}
|
||||
|
||||
assert mergedReader.numDeletedDocs() == delCount;
|
||||
|
@ -3373,13 +3455,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
private final synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions) throws IOException {
|
||||
final int numSegments = merge.readers.size();
|
||||
Throwable th = null;
|
||||
|
||||
|
||||
boolean anyChanges = false;
|
||||
boolean drop = !suppressExceptions;
|
||||
|
||||
for (int i = 0; i < numSegments; i++) {
|
||||
if (merge.readers.get(i) != null) {
|
||||
try {
|
||||
anyChanges |= readerPool.release(merge.readers.get(i), drop);
|
||||
anyChanges |= readerPool.release(merge.readers.get(i), drop, IOContext.Context.MERGE);
|
||||
} catch (Throwable t) {
|
||||
if (th == null) {
|
||||
th = t;
|
||||
|
@ -3387,20 +3470,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
}
|
||||
merge.readers.set(i, null);
|
||||
}
|
||||
|
||||
if (i < merge.readerClones.size() && merge.readerClones.get(i) != null) {
|
||||
try {
|
||||
merge.readerClones.get(i).close();
|
||||
} catch (Throwable t) {
|
||||
if (th == null) {
|
||||
th = t;
|
||||
}
|
||||
}
|
||||
// This was a private clone and we had the
|
||||
// only reference
|
||||
assert merge.readerClones.get(i).getRefCount() == 0: "refCount should be 0 but is " + merge.readerClones.get(i).getRefCount();
|
||||
merge.readerClones.set(i, null);
|
||||
}
|
||||
}
|
||||
|
||||
if (suppressExceptions && anyChanges) {
|
||||
|
@ -3416,6 +3485,27 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
}
|
||||
}
|
||||
|
||||
private synchronized BitVector getLiveDocsClone(SegmentInfo info, SegmentReader other) throws IOException {
|
||||
final SegmentReader delReader = readerPool.getIfExists(info, IOContext.Context.READ);
|
||||
BitVector liveDocs;
|
||||
if (delReader != null) {
|
||||
liveDocs = (BitVector) delReader.getLiveDocs();
|
||||
readerPool.release(delReader, false, IOContext.Context.READ);
|
||||
if (liveDocs != null) {
|
||||
// We clone the del docs because other
|
||||
// deletes may come in while we're merging. We
|
||||
// need frozen deletes while merging, and then
|
||||
// we carry over any new deletions in
|
||||
// commitMergedDeletes.
|
||||
liveDocs = (BitVector) liveDocs.clone();
|
||||
}
|
||||
} else {
|
||||
liveDocs = (BitVector) other.getLiveDocs();
|
||||
}
|
||||
|
||||
return liveDocs;
|
||||
}
|
||||
|
||||
/** Does the actual (time-consuming) work of the merge,
|
||||
* but without holding synchronized lock on IndexWriter
|
||||
* instance */
|
||||
|
@ -3440,7 +3530,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
}
|
||||
|
||||
merge.readers = new ArrayList<SegmentReader>();
|
||||
merge.readerClones = new ArrayList<SegmentReader>();
|
||||
merge.readerLiveDocs = new ArrayList<BitVector>();
|
||||
|
||||
// This is try/finally to make sure merger's readers are
|
||||
// closed:
|
||||
boolean success = false;
|
||||
|
@ -3453,20 +3544,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
|
||||
// Hold onto the "live" reader; we will use this to
|
||||
// commit merged deletes
|
||||
final SegmentReader reader = readerPool.get(info, true,
|
||||
context,
|
||||
-config.getReaderTermsIndexDivisor());
|
||||
final SegmentReader reader = readerPool.get(info, context);
|
||||
|
||||
// Carefully pull the most recent live docs:
|
||||
final BitVector liveDocs = getLiveDocsClone(info, reader);
|
||||
|
||||
merge.readerLiveDocs.add(liveDocs);
|
||||
merge.readers.add(reader);
|
||||
|
||||
// We clone the segment readers because other
|
||||
// deletes may come in while we're merging so we
|
||||
// need readers that will not change
|
||||
final SegmentReader clone = (SegmentReader) reader.clone(true);
|
||||
merge.readerClones.add(clone);
|
||||
|
||||
if (clone.numDocs() > 0) {
|
||||
merger.add(clone);
|
||||
totDocCount += clone.numDocs();
|
||||
if (liveDocs == null || liveDocs.count() > 0) {
|
||||
merger.add(reader, liveDocs);
|
||||
totDocCount += liveDocs == null ? reader.maxDoc() : liveDocs.count();
|
||||
}
|
||||
segUpto++;
|
||||
}
|
||||
|
@ -3562,25 +3650,24 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
}
|
||||
|
||||
final IndexReaderWarmer mergedSegmentWarmer = config.getMergedSegmentWarmer();
|
||||
final int termsIndexDivisor;
|
||||
final boolean loadDocStores;
|
||||
|
||||
if (mergedSegmentWarmer != null) {
|
||||
// Load terms index & doc stores so the segment
|
||||
// warmer can run searches, load documents/term
|
||||
// vectors
|
||||
termsIndexDivisor = config.getReaderTermsIndexDivisor();
|
||||
loadDocStores = true;
|
||||
} else {
|
||||
termsIndexDivisor = -1;
|
||||
loadDocStores = false;
|
||||
}
|
||||
|
||||
// TODO: in the non-realtime case, we may want to only
|
||||
// keep deletes (it's costly to open entire reader
|
||||
// when we just need deletes)
|
||||
|
||||
final SegmentReader mergedReader = readerPool.get(merge.info, loadDocStores, context, termsIndexDivisor);
|
||||
final boolean loadDocStores;
|
||||
if (mergedSegmentWarmer != null) {
|
||||
// Load terms index & doc stores so the segment
|
||||
// warmer can run searches, load documents/term
|
||||
// vectors
|
||||
loadDocStores = true;
|
||||
} else {
|
||||
loadDocStores = false;
|
||||
}
|
||||
|
||||
// Force READ context because we merge deletes onto
|
||||
// this reader:
|
||||
final SegmentReader mergedReader = readerPool.get(merge.info, loadDocStores, new IOContext(IOContext.Context.READ));
|
||||
try {
|
||||
if (poolReaders && mergedSegmentWarmer != null) {
|
||||
mergedSegmentWarmer.warm(mergedReader);
|
||||
|
@ -3592,7 +3679,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
}
|
||||
} finally {
|
||||
synchronized(this) {
|
||||
if (readerPool.release(mergedReader)) {
|
||||
if (readerPool.release(mergedReader, IOContext.Context.READ)) {
|
||||
// Must checkpoint after releasing the
|
||||
// mergedReader since it may have written a new
|
||||
// deletes file:
|
||||
|
@ -3667,7 +3754,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
}
|
||||
} finally {
|
||||
if (reader != null) {
|
||||
readerPool.release(reader);
|
||||
readerPool.release(reader, false);
|
||||
}
|
||||
}
|
||||
return buffer.toString();
|
||||
|
|
|
@@ -17,15 +17,16 @@ package org.apache.lucene.index;
* limitations under the License.
*/

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.util.SetOnce;
import org.apache.lucene.util.BitVector;
import org.apache.lucene.util.SetOnce.AlreadySetException;

import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import org.apache.lucene.util.SetOnce;

/**
* <p>Expert: a MergePolicy determines the sequence of

@@ -75,7 +76,7 @@ public abstract class MergePolicy implements java.io.Closeable {
int maxNumSegmentsOptimize; // used by IndexWriter
public long estimatedMergeBytes; // used by IndexWriter
List<SegmentReader> readers; // used by IndexWriter
List<SegmentReader> readerClones; // used by IndexWriter
List<BitVector> readerLiveDocs; // used by IndexWriter
public final List<SegmentInfo> segments;
public final int totalDocCount;
boolean aborted;
@ -174,14 +174,6 @@ final class PerFieldCodecWrapper extends Codec {
|
|||
public void close() throws IOException {
|
||||
IOUtils.closeSafely(false, codecs.values());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void loadTermsIndex(int indexDivisor) throws IOException {
|
||||
Iterator<FieldsProducer> it = codecs.values().iterator();
|
||||
while (it.hasNext()) {
|
||||
it.next().loadTermsIndex(indexDivisor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -54,7 +54,7 @@ final class SegmentMerger {
|
|||
private String segment;
|
||||
private int termIndexInterval = IndexWriterConfig.DEFAULT_TERM_INDEX_INTERVAL;
|
||||
|
||||
private List<IndexReader> readers = new ArrayList<IndexReader>();
|
||||
private List<MergeState.IndexReaderAndLiveDocs> readers = new ArrayList<MergeState.IndexReaderAndLiveDocs>();
|
||||
private final FieldInfos fieldInfos;
|
||||
|
||||
private int mergedDocs;
|
||||
|
@ -100,7 +100,21 @@ final class SegmentMerger {
|
|||
* @param reader
|
||||
*/
|
||||
final void add(IndexReader reader) {
|
||||
ReaderUtil.gatherSubReaders(readers, reader);
|
||||
try {
|
||||
new ReaderUtil.Gather(reader) {
|
||||
@Override
|
||||
protected void add(int base, IndexReader r) {
|
||||
readers.add(new MergeState.IndexReaderAndLiveDocs(r, r.getLiveDocs()));
|
||||
}
|
||||
}.run();
|
||||
} catch (IOException ioe) {
|
||||
// won't happen
|
||||
throw new RuntimeException(ioe);
|
||||
}
|
||||
}
|
||||
|
||||
final void add(SegmentReader reader, Bits liveDocs) {
|
||||
readers.add(new MergeState.IndexReaderAndLiveDocs(reader, liveDocs));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -122,8 +136,9 @@ final class SegmentMerger {
|
|||
mergePerDoc();
|
||||
mergeNorms();
|
||||
|
||||
if (fieldInfos.hasVectors())
|
||||
if (fieldInfos.hasVectors()) {
|
||||
mergeVectors();
|
||||
}
|
||||
return mergedDocs;
|
||||
}
|
||||
|
||||
|
@ -188,9 +203,9 @@ final class SegmentMerger {
|
|||
// FieldInfos, then we can do a bulk copy of the
|
||||
// stored fields:
|
||||
for (int i = 0; i < numReaders; i++) {
|
||||
IndexReader reader = readers.get(i);
|
||||
if (reader instanceof SegmentReader) {
|
||||
SegmentReader segmentReader = (SegmentReader) reader;
|
||||
MergeState.IndexReaderAndLiveDocs reader = readers.get(i);
|
||||
if (reader.reader instanceof SegmentReader) {
|
||||
SegmentReader segmentReader = (SegmentReader) reader.reader;
|
||||
boolean same = true;
|
||||
FieldInfos segmentFieldInfos = segmentReader.fieldInfos();
|
||||
for (FieldInfo fi : segmentFieldInfos) {
|
||||
|
@ -215,7 +230,8 @@ final class SegmentMerger {
|
|||
* @throws IOException if there is a low-level IO error
|
||||
*/
|
||||
private int mergeFields() throws CorruptIndexException, IOException {
|
||||
for (IndexReader reader : readers) {
|
||||
for (MergeState.IndexReaderAndLiveDocs readerAndLiveDocs : readers) {
|
||||
final IndexReader reader = readerAndLiveDocs.reader;
|
||||
if (reader instanceof SegmentReader) {
|
||||
SegmentReader segmentReader = (SegmentReader) reader;
|
||||
FieldInfos readerFieldInfos = segmentReader.fieldInfos();
|
||||
|
@ -244,7 +260,7 @@ final class SegmentMerger {
|
|||
final FieldsWriter fieldsWriter = new FieldsWriter(directory, segment, context);
|
||||
try {
|
||||
int idx = 0;
|
||||
for (IndexReader reader : readers) {
|
||||
for (MergeState.IndexReaderAndLiveDocs reader : readers) {
|
||||
final SegmentReader matchingSegmentReader = matchingSegmentReaders[idx++];
|
||||
FieldsReader matchingFieldsReader = null;
|
||||
if (matchingSegmentReader != null) {
|
||||
|
@ -253,7 +269,7 @@ final class SegmentMerger {
|
|||
matchingFieldsReader = fieldsReader;
|
||||
}
|
||||
}
|
||||
if (reader.hasDeletions()) {
|
||||
if (reader.liveDocs != null) {
|
||||
docCount += copyFieldsWithDeletions(fieldsWriter,
|
||||
reader, matchingFieldsReader);
|
||||
} else {
|
||||
|
@ -280,12 +296,12 @@ final class SegmentMerger {
|
|||
return docCount;
|
||||
}
|
||||
|
||||
private int copyFieldsWithDeletions(final FieldsWriter fieldsWriter, final IndexReader reader,
|
||||
private int copyFieldsWithDeletions(final FieldsWriter fieldsWriter, final MergeState.IndexReaderAndLiveDocs reader,
|
||||
final FieldsReader matchingFieldsReader)
|
||||
throws IOException, MergeAbortedException, CorruptIndexException {
|
||||
int docCount = 0;
|
||||
final int maxDoc = reader.maxDoc();
|
||||
final Bits liveDocs = reader.getLiveDocs();
|
||||
final int maxDoc = reader.reader.maxDoc();
|
||||
final Bits liveDocs = reader.liveDocs;
|
||||
assert liveDocs != null;
|
||||
if (matchingFieldsReader != null) {
|
||||
// We can bulk-copy because the fieldInfos are "congruent"
|
||||
|
@ -321,7 +337,7 @@ final class SegmentMerger {
|
|||
}
|
||||
// NOTE: it's very important to first assign to doc then pass it to
|
||||
// termVectorsWriter.addAllDocVectors; see LUCENE-1282
|
||||
Document doc = reader.document(j);
|
||||
Document doc = reader.reader.document(j);
|
||||
fieldsWriter.addDocument(doc, fieldInfos);
|
||||
docCount++;
|
||||
checkAbort.work(300);
|
||||
|
@ -330,10 +346,10 @@ final class SegmentMerger {
|
|||
return docCount;
|
||||
}
|
||||
|
||||
private int copyFieldsNoDeletions(final FieldsWriter fieldsWriter, final IndexReader reader,
|
||||
private int copyFieldsNoDeletions(final FieldsWriter fieldsWriter, final MergeState.IndexReaderAndLiveDocs reader,
|
||||
final FieldsReader matchingFieldsReader)
|
||||
throws IOException, MergeAbortedException, CorruptIndexException {
|
||||
final int maxDoc = reader.maxDoc();
|
||||
final int maxDoc = reader.reader.maxDoc();
|
||||
int docCount = 0;
|
||||
if (matchingFieldsReader != null) {
|
||||
// We can bulk-copy because the fieldInfos are "congruent"
|
||||
|
@ -348,7 +364,7 @@ final class SegmentMerger {
|
|||
for (; docCount < maxDoc; docCount++) {
|
||||
// NOTE: it's very important to first assign to doc then pass it to
|
||||
// termVectorsWriter.addAllDocVectors; see LUCENE-1282
|
||||
Document doc = reader.document(docCount);
|
||||
Document doc = reader.reader.document(docCount);
|
||||
fieldsWriter.addDocument(doc, fieldInfos);
|
||||
checkAbort.work(300);
|
||||
}
|
||||
|
@ -361,12 +377,11 @@ final class SegmentMerger {
|
|||
* @throws IOException
|
||||
*/
|
||||
private final void mergeVectors() throws IOException {
|
||||
TermVectorsWriter termVectorsWriter =
|
||||
new TermVectorsWriter(directory, segment, fieldInfos, context);
|
||||
TermVectorsWriter termVectorsWriter = new TermVectorsWriter(directory, segment, fieldInfos, context);
|
||||
|
||||
try {
|
||||
int idx = 0;
|
||||
for (final IndexReader reader : readers) {
|
||||
for (final MergeState.IndexReaderAndLiveDocs reader : readers) {
|
||||
final SegmentReader matchingSegmentReader = matchingSegmentReaders[idx++];
|
||||
TermVectorsReader matchingVectorsReader = null;
|
||||
if (matchingSegmentReader != null) {
|
||||
|
@ -377,11 +392,10 @@ final class SegmentMerger {
|
|||
matchingVectorsReader = vectorsReader;
|
||||
}
|
||||
}
|
||||
if (reader.hasDeletions()) {
|
||||
if (reader.liveDocs != null) {
|
||||
copyVectorsWithDeletions(termVectorsWriter, matchingVectorsReader, reader);
|
||||
} else {
|
||||
copyVectorsNoDeletions(termVectorsWriter, matchingVectorsReader, reader);
|
||||
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
@ -402,10 +416,10 @@ final class SegmentMerger {
|
|||
|
||||
private void copyVectorsWithDeletions(final TermVectorsWriter termVectorsWriter,
|
||||
final TermVectorsReader matchingVectorsReader,
|
||||
final IndexReader reader)
|
||||
final MergeState.IndexReaderAndLiveDocs reader)
|
||||
throws IOException, MergeAbortedException {
|
||||
final int maxDoc = reader.maxDoc();
|
||||
final Bits liveDocs = reader.getLiveDocs();
|
||||
final int maxDoc = reader.reader.maxDoc();
|
||||
final Bits liveDocs = reader.liveDocs;
|
||||
if (matchingVectorsReader != null) {
|
||||
// We can bulk-copy because the fieldInfos are "congruent"
|
||||
for (int docNum = 0; docNum < maxDoc;) {
|
||||
|
@ -440,7 +454,7 @@ final class SegmentMerger {
|
|||
|
||||
// NOTE: it's very important to first assign to vectors then pass it to
|
||||
// termVectorsWriter.addAllDocVectors; see LUCENE-1282
|
||||
TermFreqVector[] vectors = reader.getTermFreqVectors(docNum);
|
||||
TermFreqVector[] vectors = reader.reader.getTermFreqVectors(docNum);
|
||||
termVectorsWriter.addAllDocVectors(vectors);
|
||||
checkAbort.work(300);
|
||||
}
|
||||
|
@ -449,9 +463,9 @@ final class SegmentMerger {
|
|||
|
||||
private void copyVectorsNoDeletions(final TermVectorsWriter termVectorsWriter,
|
||||
final TermVectorsReader matchingVectorsReader,
|
||||
final IndexReader reader)
|
||||
final MergeState.IndexReaderAndLiveDocs reader)
|
||||
throws IOException, MergeAbortedException {
|
||||
final int maxDoc = reader.maxDoc();
|
||||
final int maxDoc = reader.reader.maxDoc();
|
||||
if (matchingVectorsReader != null) {
|
||||
// We can bulk-copy because the fieldInfos are "congruent"
|
||||
int docCount = 0;
|
||||
|
@ -466,7 +480,7 @@ final class SegmentMerger {
|
|||
for (int docNum = 0; docNum < maxDoc; docNum++) {
|
||||
// NOTE: it's very important to first assign to vectors then pass it to
|
||||
// termVectorsWriter.addAllDocVectors; see LUCENE-1282
|
||||
TermFreqVector[] vectors = reader.getTermFreqVectors(docNum);
|
||||
TermFreqVector[] vectors = reader.reader.getTermFreqVectors(docNum);
|
||||
termVectorsWriter.addAllDocVectors(vectors);
|
||||
checkAbort.work(300);
|
||||
}
|
||||
|
@ -487,23 +501,17 @@ final class SegmentMerger {
|
|||
|
||||
final List<Fields> fields = new ArrayList<Fields>();
|
||||
final List<ReaderUtil.Slice> slices = new ArrayList<ReaderUtil.Slice>();
|
||||
final List<Bits> bits = new ArrayList<Bits>();
|
||||
final List<Integer> bitsStarts = new ArrayList<Integer>();
|
||||
|
||||
for(IndexReader r : readers) {
|
||||
final Fields f = r.fields();
|
||||
final int maxDoc = r.maxDoc();
|
||||
for(MergeState.IndexReaderAndLiveDocs r : readers) {
|
||||
final Fields f = r.reader.fields();
|
||||
final int maxDoc = r.reader.maxDoc();
|
||||
if (f != null) {
|
||||
slices.add(new ReaderUtil.Slice(docBase, maxDoc, fields.size()));
|
||||
fields.add(f);
|
||||
bits.add(r.getLiveDocs());
|
||||
bitsStarts.add(docBase);
|
||||
}
|
||||
docBase += maxDoc;
|
||||
}
|
||||
|
||||
bitsStarts.add(docBase);
|
||||
|
||||
// we may gather more readers than mergeState.readerCount
|
||||
mergeState = new MergeState();
|
||||
mergeState.readers = readers;
|
||||
|
@ -524,31 +532,32 @@ final class SegmentMerger {
|
|||
|
||||
for(int i=0;i<mergeState.readerCount;i++) {
|
||||
|
||||
final IndexReader reader = readers.get(i);
|
||||
final MergeState.IndexReaderAndLiveDocs reader = readers.get(i);
|
||||
|
||||
mergeState.docBase[i] = docBase;
|
||||
docBase += reader.numDocs();
|
||||
inputDocBase += reader.maxDoc();
|
||||
if (reader.hasDeletions()) {
|
||||
inputDocBase += reader.reader.maxDoc();
|
||||
final int maxDoc = reader.reader.maxDoc();
|
||||
if (reader.liveDocs != null) {
|
||||
int delCount = 0;
|
||||
final Bits liveDocs = reader.getLiveDocs();
|
||||
final Bits liveDocs = reader.liveDocs;
|
||||
assert liveDocs != null;
|
||||
final int maxDoc = reader.maxDoc();
|
||||
final int[] docMap = mergeState.docMaps[i] = new int[maxDoc];
|
||||
int newDocID = 0;
|
||||
for(int j=0;j<maxDoc;j++) {
|
||||
if (!liveDocs.get(j)) {
|
||||
docMap[j] = -1;
|
||||
delCount++; // only for assert
|
||||
delCount++;
|
||||
} else {
|
||||
docMap[j] = newDocID++;
|
||||
}
|
||||
}
|
||||
assert delCount == reader.numDeletedDocs(): "reader delCount=" + reader.numDeletedDocs() + " vs recomputed delCount=" + delCount;
|
||||
docBase += maxDoc - delCount;
|
||||
} else {
|
||||
docBase += maxDoc;
|
||||
}
|
||||
|
||||
if (payloadProcessorProvider != null) {
|
||||
mergeState.dirPayloadProcessor[i] = payloadProcessorProvider.getDirProcessor(reader.directory());
|
||||
mergeState.dirPayloadProcessor[i] = payloadProcessorProvider.getDirProcessor(reader.reader.directory());
|
||||
}
|
||||
}
|
||||
codec = segmentWriteState.segmentCodecs.codec();
|
||||
|
@ -565,22 +574,17 @@ final class SegmentMerger {
|
|||
private void mergePerDoc() throws IOException {
|
||||
final List<PerDocValues> perDocProducers = new ArrayList<PerDocValues>();
|
||||
final List<ReaderUtil.Slice> perDocSlices = new ArrayList<ReaderUtil.Slice>();
|
||||
final List<Bits> perDocBits = new ArrayList<Bits>();
|
||||
final List<Integer> perDocBitsStarts = new ArrayList<Integer>();
|
||||
int docBase = 0;
|
||||
for (IndexReader r : readers) {
|
||||
final int maxDoc = r.maxDoc();
|
||||
final PerDocValues producer = r.perDocValues();
|
||||
for (MergeState.IndexReaderAndLiveDocs r : readers) {
|
||||
final int maxDoc = r.reader.maxDoc();
|
||||
final PerDocValues producer = r.reader.perDocValues();
|
||||
if (producer != null) {
|
||||
perDocSlices.add(new ReaderUtil.Slice(docBase, maxDoc, perDocProducers
|
||||
.size()));
|
||||
perDocProducers.add(producer);
|
||||
perDocBits.add(r.getLiveDocs());
|
||||
perDocBitsStarts.add(docBase);
|
||||
}
|
||||
docBase += maxDoc;
|
||||
}
|
||||
perDocBitsStarts.add(docBase);
|
||||
if (!perDocSlices.isEmpty()) {
|
||||
final PerDocConsumer docsConsumer = codec
|
||||
.docsConsumer(new PerDocWriteState(segmentWriteState));
|
||||
|
@ -616,22 +620,22 @@ final class SegmentMerger {
|
|||
output = directory.createOutput(IndexFileNames.segmentFileName(segment, "", IndexFileNames.NORMS_EXTENSION), context);
|
||||
output.writeBytes(SegmentNorms.NORMS_HEADER, SegmentNorms.NORMS_HEADER.length);
|
||||
}
|
||||
for (IndexReader reader : readers) {
|
||||
final int maxDoc = reader.maxDoc();
|
||||
byte normBuffer[] = reader.norms(fi.name);
|
||||
for (MergeState.IndexReaderAndLiveDocs reader : readers) {
|
||||
final int maxDoc = reader.reader.maxDoc();
|
||||
byte normBuffer[] = reader.reader.norms(fi.name);
|
||||
if (normBuffer == null) {
|
||||
// Can be null if this segment doesn't have
|
||||
// any docs with this field
|
||||
normBuffer = new byte[maxDoc];
|
||||
Arrays.fill(normBuffer, (byte)0);
|
||||
}
|
||||
if (!reader.hasDeletions()) {
|
||||
if (reader.liveDocs == null) {
|
||||
//optimized case for segments without deleted docs
|
||||
output.writeBytes(normBuffer, maxDoc);
|
||||
} else {
|
||||
// this segment has deleted docs, so we have to
|
||||
// check for every doc if it is deleted or not
|
||||
final Bits liveDocs = reader.getLiveDocs();
|
||||
final Bits liveDocs = reader.liveDocs;
|
||||
for (int k = 0; k < maxDoc; k++) {
|
||||
if (liveDocs.get(k)) {
|
||||
output.writeByte(normBuffer[k]);
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.lucene.document.Document;
|
|||
import org.apache.lucene.document.FieldSelector;
|
||||
import org.apache.lucene.index.FieldInfo.IndexOptions;
|
||||
import org.apache.lucene.index.codecs.PerDocValues;
|
||||
import org.apache.lucene.store.BufferedIndexInput;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
|
@ -161,9 +160,6 @@ public class SegmentReader extends IndexReader implements Cloneable {
|
|||
// NOTE: the bitvector is stored using the regular directory, not cfs
|
||||
if (hasDeletions(si)) {
|
||||
liveDocs = new BitVector(directory(), si.getDelFileName(), new IOContext(context, true));
|
||||
if (liveDocs.getVersion() < BitVector.VERSION_DGAPS_CLEARED) {
|
||||
liveDocs.invertAll();
|
||||
}
|
||||
liveDocsRef = new AtomicInteger(1);
|
||||
assert checkLiveCounts();
|
||||
if (liveDocs.size() != si.docCount) {
|
||||
|
@ -637,15 +633,6 @@ public class SegmentReader extends IndexReader implements Cloneable {
|
|||
}
|
||||
}
|
||||
|
||||
// NOTE: only called from IndexWriter when a near
|
||||
// real-time reader is opened, or applyDeletes is run,
|
||||
// sharing a segment that's still being merged. This
|
||||
// method is not thread safe, and relies on the
|
||||
// synchronization in IndexWriter
|
||||
void loadTermsIndex(int indexDivisor) throws IOException {
|
||||
core.fields.loadTermsIndex(indexDivisor);
|
||||
}
|
||||
|
||||
// for testing only
|
||||
boolean normsClosed() {
|
||||
if (singleNormStream != null) {
|
||||
|
|
|
@ -164,11 +164,6 @@ public class BlockTermsReader extends FieldsProducer {
|
|||
input.seek(dirOffset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void loadTermsIndex(int indexDivisor) throws IOException {
|
||||
indexReader.loadTermsIndex(indexDivisor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
try {
|
||||
|
|
|
@ -103,21 +103,20 @@ public abstract class DocValuesConsumer {
|
|||
// TODO we need some kind of compatibility notation for values such
|
||||
// that two slightly different segments can be merged eg. fixed vs.
|
||||
// variable byte len or float32 vs. float64
|
||||
int docBase = 0;
|
||||
boolean merged = false;
|
||||
/*
|
||||
* We ignore the given DocValues here and merge from the subReaders directly
|
||||
* to support bulk copies on the DocValues Writer level. if this gets merged
|
||||
* with MultiDocValues the writer can not optimize for bulk-copyable data
|
||||
*/
|
||||
for (final IndexReader reader : mergeState.readers) {
|
||||
final IndexDocValues r = reader.docValues(mergeState.fieldInfo.name);
|
||||
for(int readerIDX=0;readerIDX<mergeState.readers.size();readerIDX++) {
|
||||
final org.apache.lucene.index.codecs.MergeState.IndexReaderAndLiveDocs reader = mergeState.readers.get(readerIDX);
|
||||
final IndexDocValues r = reader.reader.docValues(mergeState.fieldInfo.name);
|
||||
if (r != null) {
|
||||
merged = true;
|
||||
merge(new Writer.MergeState(r, docBase, reader.maxDoc(),
|
||||
reader.getLiveDocs()));
|
||||
merge(new Writer.MergeState(r, mergeState.docBase[readerIDX], reader.reader.maxDoc(),
|
||||
reader.liveDocs));
|
||||
}
|
||||
docBase += reader.numDocs();
|
||||
}
|
||||
if (merged) {
|
||||
finish(mergeState.mergedDocCount);
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.apache.lucene.index.Terms;
|
|||
|
||||
public abstract class FieldsProducer extends Fields implements Closeable {
|
||||
public abstract void close() throws IOException;
|
||||
public abstract void loadTermsIndex(int indexDivisor) throws IOException;
|
||||
|
||||
public static final FieldsProducer EMPTY = new FieldsProducer() {
|
||||
|
||||
|
@ -47,12 +46,7 @@ public abstract class FieldsProducer extends Fields implements Closeable {
|
|||
public FieldsEnum iterator() throws IOException {
|
||||
return FieldsEnum.EMPTY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void loadTermsIndex(int indexDivisor) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.lucene.index.codecs;
|
|||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.apache.lucene.store.IOContext.Context;
|
||||
import org.apache.lucene.index.FieldInfos;
|
||||
import org.apache.lucene.index.FieldInfo;
|
||||
import org.apache.lucene.index.SegmentInfo;
|
||||
|
@ -31,7 +30,6 @@ import org.apache.lucene.util.PagedBytes;
|
|||
import org.apache.lucene.util.packed.PackedInts;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.io.IOException;
|
||||
|
@ -75,6 +73,8 @@ public class FixedGapTermsIndexReader extends TermsIndexReaderBase {
|
|||
|
||||
this.termComp = termComp;
|
||||
|
||||
assert indexDivisor == -1 || indexDivisor > 0;
|
||||
|
||||
in = dir.openInput(IndexFileNames.segmentFileName(segment, codecId, FixedGapTermsIndexWriter.TERMS_INDEX_EXTENSION), context);
|
||||
|
||||
boolean success = false;
|
||||
|
@ -251,7 +251,7 @@ public class FixedGapTermsIndexReader extends TermsIndexReaderBase {
|
|||
}
|
||||
}
|
||||
|
||||
public void loadTermsIndex() throws IOException {
|
||||
private void loadTermsIndex() throws IOException {
|
||||
if (coreIndex == null) {
|
||||
coreIndex = new CoreFieldIndex(indexStart, termsStart, packedIndexStart, packedOffsetsStart, numIndexTerms);
|
||||
}
|
||||
|
@ -375,29 +375,6 @@ public class FixedGapTermsIndexReader extends TermsIndexReaderBase {
|
|||
}
|
||||
}
|
||||
|
||||
// Externally synced in IndexWriter
|
||||
@Override
|
||||
public void loadTermsIndex(int indexDivisor) throws IOException {
|
||||
if (!indexLoaded) {
|
||||
|
||||
if (indexDivisor < 0) {
|
||||
this.indexDivisor = -indexDivisor;
|
||||
} else {
|
||||
this.indexDivisor = indexDivisor;
|
||||
}
|
||||
this.totalIndexInterval = indexInterval * this.indexDivisor;
|
||||
|
||||
Iterator<FieldIndexData> it = fields.values().iterator();
|
||||
while(it.hasNext()) {
|
||||
it.next().loadTermsIndex();
|
||||
}
|
||||
|
||||
indexLoaded = true;
|
||||
in.close();
|
||||
termBytesReader = termBytes.freeze(true);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public FieldIndexEnum getFieldEnum(FieldInfo fieldInfo) {
|
||||
final FieldIndexData fieldData = fields.get(fieldInfo);
|
||||
|
|
|
@@ -26,13 +26,25 @@ import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;

/** Holds common state used during segment merging
*
* @lucene.experimental */
public class MergeState {

public static class IndexReaderAndLiveDocs {
public final IndexReader reader;
public final Bits liveDocs;

public IndexReaderAndLiveDocs(IndexReader reader, Bits liveDocs) {
this.reader = reader;
this.liveDocs = liveDocs;
}
}

public FieldInfos fieldInfos;
public List<IndexReader> readers; // Readers being merged
public List<IndexReaderAndLiveDocs> readers; // Readers & liveDocs being merged
public int readerCount; // Number of readers being merged
public int[][] docMaps; // Maps docIDs around deletions
public int[] docBase; // New docID base per reader
@ -43,8 +43,6 @@ public abstract class TermsIndexReaderBase implements Closeable {
|
|||
|
||||
public abstract FieldIndexEnum getFieldEnum(FieldInfo fieldInfo);
|
||||
|
||||
public abstract void loadTermsIndex(int indexDivisor) throws IOException;
|
||||
|
||||
public abstract void close() throws IOException;
|
||||
|
||||
public abstract void getExtensions(Collection<String> extensions);
|
||||
|
|
|
@ -23,7 +23,6 @@ import java.io.OutputStreamWriter; // for toDot
|
|||
import java.io.Writer; // for toDot
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.lucene.index.FieldInfo;
|
||||
import org.apache.lucene.index.FieldInfos;
|
||||
|
@ -63,6 +62,7 @@ public class VariableGapTermsIndexReader extends TermsIndexReaderBase {
|
|||
in = dir.openInput(IndexFileNames.segmentFileName(segment, codecId, VariableGapTermsIndexWriter.TERMS_INDEX_EXTENSION), new IOContext(context, true));
|
||||
this.segment = segment;
|
||||
boolean success = false;
|
||||
assert indexDivisor == -1 || indexDivisor > 0;
|
||||
|
||||
try {
|
||||
|
||||
|
@ -170,7 +170,7 @@ public class VariableGapTermsIndexReader extends TermsIndexReaderBase {
|
|||
}
|
||||
}
|
||||
|
||||
public void loadTermsIndex() throws IOException {
|
||||
private void loadTermsIndex() throws IOException {
|
||||
if (fst == null) {
|
||||
IndexInput clone = (IndexInput) in.clone();
|
||||
clone.seek(indexStart);
|
||||
|
@ -205,27 +205,6 @@ public class VariableGapTermsIndexReader extends TermsIndexReaderBase {
|
|||
}
|
||||
}
|
||||
|
||||
// Externally synced in IndexWriter
|
||||
@Override
|
||||
public void loadTermsIndex(int indexDivisor) throws IOException {
|
||||
if (!indexLoaded) {
|
||||
|
||||
if (indexDivisor < 0) {
|
||||
this.indexDivisor = -indexDivisor;
|
||||
} else {
|
||||
this.indexDivisor = indexDivisor;
|
||||
}
|
||||
|
||||
Iterator<FieldIndexData> it = fields.values().iterator();
|
||||
while(it.hasNext()) {
|
||||
it.next().loadTermsIndex();
|
||||
}
|
||||
|
||||
indexLoaded = true;
|
||||
in.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public FieldIndexEnum getFieldEnum(FieldInfo fieldInfo) {
|
||||
final FieldIndexData fieldData = fields.get(fieldInfo);
|
||||
|
|
|
@ -766,11 +766,6 @@ public class MemoryCodec extends Codec {
|
|||
return fields.get(field);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void loadTermsIndex(int indexDivisor) {
|
||||
// no op
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
// Drop ref to FST:
|
||||
|
|
|
@ -170,30 +170,6 @@ public class PreFlexFields extends FieldsProducer {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
synchronized public void loadTermsIndex(int indexDivisor) throws IOException {
|
||||
if (tis == null) {
|
||||
Directory dir0;
|
||||
if (si.getUseCompoundFile()) {
|
||||
// In some cases, we were originally opened when CFS
|
||||
// was not used, but then we are asked to open the
|
||||
// terms reader with index, the segment has switched
|
||||
// to CFS
|
||||
|
||||
if (!(dir instanceof CompoundFileDirectory)) {
|
||||
dir0 = cfsReader = dir.openCompoundInput(IndexFileNames.segmentFileName(si.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION), context);
|
||||
} else {
|
||||
dir0 = dir;
|
||||
}
|
||||
dir0 = cfsReader;
|
||||
} else {
|
||||
dir0 = dir;
|
||||
}
|
||||
|
||||
tis = new TermInfosReader(dir0, si.name, fieldInfos, context, indexDivisor);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (tis != null) {
|
||||
|
|
|
@@ -592,10 +592,6 @@ class SimpleTextFieldsReader extends FieldsProducer {
return terms;
}

@Override
public void loadTermsIndex(int indexDivisor) {
}

@Override
public void close() throws IOException {
in.close();
@@ -353,6 +353,11 @@ public final class BitVector implements Cloneable, Bits {
} else {
readBits(input);
}

if (version < VERSION_DGAPS_CLEARED) {
invertAll();
}

assert verifyCount();
} finally {
input.close();
@@ -272,14 +272,18 @@ public class MockDirectoryWrapper extends Directory {
}

void maybeThrowIOException() throws IOException {
maybeThrowIOException(null);
}

void maybeThrowIOException(String message) throws IOException {
if (randomIOExceptionRate > 0.0) {
int number = Math.abs(randomState.nextInt() % 1000);
if (number < randomIOExceptionRate*1000) {
if (LuceneTestCase.VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": MockDirectoryWrapper: now throw random exception");
System.out.println(Thread.currentThread().getName() + ": MockDirectoryWrapper: now throw random exception" + (message == null ? "" : " (" + message + ")"));
new Throwable().printStackTrace(System.out);
}
throw new IOException("a random IOException");
throw new IOException("a random IOException" + (message == null ? "" : "(" + message + ")"));
}
}
}
@@ -147,4 +147,10 @@ public class MockIndexInputWrapper extends IndexInput {
public long readVLong() throws IOException {
return delegate.readVLong();
}

@Override
public String toString() {
return "MockIndexInputWrapper(" + delegate + ")";
}
}
@@ -126,7 +126,7 @@ public class MockIndexOutputWrapper extends IndexOutput {
// Maybe throw random exception; only do this on first
// write to a new file:
first = false;
dir.maybeThrowIOException();
dir.maybeThrowIOException(name);
}
}

@@ -156,4 +156,9 @@ public class MockIndexOutputWrapper extends IndexOutput {
// TODO: we may need to check disk full here as well
dir.maybeThrowDeterministicException();
}

@Override
public String toString() {
return "MockIndexOutputWrapper(" + delegate + ")";
}
}
@@ -92,10 +92,6 @@ public class TestExternalCodecs extends LuceneTestCase {
@Override
public void close() {
}

@Override
public void loadTermsIndex(int indexDivisor) {
}
}

static class RAMField extends Terms {
@@ -551,7 +551,9 @@ public class TestIndexWriterDelete extends LuceneTestCase {
if (!success) {
// Must force the close else the writer can have
// open files which cause exc in MockRAMDir.close

if (VERBOSE) {
System.out.println("TEST: now rollback");
}
modifier.rollback();
}
@@ -90,6 +90,7 @@ public class TestRollingUpdates extends LuceneTestCase {
for (int r = 0; r < 3; r++) {
final IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(
TEST_VERSION_CURRENT, new MockAnalyzer(random)).setMaxBufferedDocs(2));
w.setInfoStream(VERBOSE ? System.out : null);
final int numUpdates = atLeast(20);
int numThreads = _TestUtil.nextInt(random, 2, 6);
IndexingThread[] threads = new IndexingThread[numThreads];