LUCENE-2474: add expert ReaderFinishedListener API, for external caches to evict entries for readers

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1063498 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2011-01-25 22:42:37 +00:00
parent fcf6e305b4
commit 323ea4134d
12 changed files with 205 additions and 33 deletions

View File

@ -813,6 +813,11 @@ New features
FieldInvertState so that it can be used in Similarity.computeNorm.
(Robert Muir)
* LUCENE-2474: Added expert ReaderFinishedListener API to
IndexReader, to allow apps that maintain external per-segment caches
to evict entries when a segment is finished. (Shay Banon, Yonik
Seeley, Mike McCandless)
Optimizations
* LUCENE-2494: Use CompletionService in ParallelMultiSearcher instead of

View File

@ -19,6 +19,7 @@ package org.apache.lucene.store.instantiated;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -52,6 +53,7 @@ public class InstantiatedIndexReader extends IndexReader {
public InstantiatedIndexReader(InstantiatedIndex index) {
super();
this.index = index;
readerFinishedListeners = Collections.synchronizedSet(new HashSet<ReaderFinishedListener>());
}
/**

View File

@ -25,6 +25,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
@ -758,6 +759,7 @@ public class MemoryIndex implements Serializable {
private MemoryIndexReader() {
super(); // avoid as much superclass baggage as possible
readerFinishedListeners = Collections.synchronizedSet(new HashSet<ReaderFinishedListener>());
}
private Info getInfo(String fieldName) {

View File

@ -37,8 +37,6 @@ import org.apache.lucene.index.codecs.CodecProvider;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.search.FieldCache; // not great (circular); used only to purge FieldCache entry on close
/**
* An IndexReader which reads indexes with multiple segments.
*/
@ -106,6 +104,7 @@ class DirectoryReader extends IndexReader implements Cloneable {
} else {
this.codecs = codecs;
}
readerFinishedListeners = Collections.synchronizedSet(new HashSet<ReaderFinishedListener>());
// To reduce the chance of hitting FileNotFound
// (and having to retry), we open segments in
@ -117,6 +116,7 @@ class DirectoryReader extends IndexReader implements Cloneable {
boolean success = false;
try {
readers[i] = SegmentReader.get(readOnly, sis.info(i), termInfosIndexDivisor);
readers[i].readerFinishedListeners = readerFinishedListeners;
success = true;
} finally {
if (!success) {
@ -146,6 +146,7 @@ class DirectoryReader extends IndexReader implements Cloneable {
} else {
this.codecs = codecs;
}
readerFinishedListeners = writer.getReaderFinishedListeners();
// IndexWriter synchronizes externally before calling
// us, which ensures infos will not change; so there's
@ -160,6 +161,7 @@ class DirectoryReader extends IndexReader implements Cloneable {
final SegmentInfo info = infos.info(i);
assert info.dir == dir;
readers[i] = writer.readerPool.getReadOnlyClone(info, true, termInfosIndexDivisor);
readers[i].readerFinishedListeners = readerFinishedListeners;
success = true;
} finally {
if (!success) {
@ -182,11 +184,14 @@ class DirectoryReader extends IndexReader implements Cloneable {
/** This constructor is only used for {@link #reopen()} */
DirectoryReader(Directory directory, SegmentInfos infos, SegmentReader[] oldReaders, int[] oldStarts,
boolean readOnly, boolean doClone, int termInfosIndexDivisor, CodecProvider codecs) throws IOException {
boolean readOnly, boolean doClone, int termInfosIndexDivisor, CodecProvider codecs,
Collection<ReaderFinishedListener> readerFinishedListeners) throws IOException {
this.directory = directory;
this.readOnly = readOnly;
this.segmentInfos = infos;
this.termInfosIndexDivisor = termInfosIndexDivisor;
this.readerFinishedListeners = readerFinishedListeners;
if (codecs == null) {
this.codecs = CodecProvider.getDefault();
} else {
@ -232,8 +237,10 @@ class DirectoryReader extends IndexReader implements Cloneable {
// this is a new reader; in case we hit an exception we can close it safely
newReader = SegmentReader.get(readOnly, infos.info(i), termInfosIndexDivisor);
newReader.readerFinishedListeners = readerFinishedListeners;
} else {
newReader = newReaders[i].reopenSegment(infos.info(i), doClone, readOnly);
assert newReader.readerFinishedListeners == readerFinishedListeners;
}
if (newReader == newReaders[i]) {
// this reader will be shared between the old and the new one,
@ -357,6 +364,7 @@ class DirectoryReader extends IndexReader implements Cloneable {
writeLock = null;
hasChanges = false;
}
assert newReader.readerFinishedListeners != null;
return newReader;
}
@ -391,7 +399,9 @@ class DirectoryReader extends IndexReader implements Cloneable {
// TODO: right now we *always* make a new reader; in
// the future we could have write make some effort to
// detect that no changes have occurred
return writer.getReader();
IndexReader reader = writer.getReader();
reader.readerFinishedListeners = readerFinishedListeners;
return reader;
}
private IndexReader doReopen(final boolean openReadOnly, IndexCommit commit) throws CorruptIndexException, IOException {
@ -458,7 +468,7 @@ class DirectoryReader extends IndexReader implements Cloneable {
private synchronized DirectoryReader doReopen(SegmentInfos infos, boolean doClone, boolean openReadOnly) throws CorruptIndexException, IOException {
DirectoryReader reader;
reader = new DirectoryReader(directory, infos, subReaders, starts, openReadOnly, doClone, termInfosIndexDivisor, codecs);
reader = new DirectoryReader(directory, infos, subReaders, starts, openReadOnly, doClone, termInfosIndexDivisor, codecs, readerFinishedListeners);
return reader;
}
@ -808,11 +818,6 @@ class DirectoryReader extends IndexReader implements Cloneable {
}
}
// NOTE: only needed in case someone had asked for
// FieldCache for top-level reader (which is generally
// not a good idea):
FieldCache.DEFAULT.purge(this);
if (writer != null) {
// Since we just closed, writer may now be able to
// delete unused files:

View File

@ -22,13 +22,14 @@ import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.index.IndexReader.ReaderContext;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.search.FieldCache; // not great (circular); used only to purge FieldCache entry on close
import org.apache.lucene.util.BytesRef;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Collections;
/** A <code>FilterIndexReader</code> contains another IndexReader, which it
* uses as its basic source of data, possibly transforming the data along the
@ -286,6 +287,7 @@ public class FilterIndexReader extends IndexReader {
public FilterIndexReader(IndexReader in) {
super();
this.in = in;
readerFinishedListeners = Collections.synchronizedSet(new HashSet<ReaderFinishedListener>());
}
@Override
@ -391,11 +393,6 @@ public class FilterIndexReader extends IndexReader {
@Override
protected void doClose() throws IOException {
in.close();
// NOTE: only needed in case someone had asked for
// FieldCache for top-level reader (which is generally
// not a good idea):
FieldCache.DEFAULT.purge(this);
}
@ -454,4 +451,16 @@ public class FilterIndexReader extends IndexReader {
buffer.append(')');
return buffer.toString();
}
@Override
public void addReaderFinishedListener(ReaderFinishedListener listener) {
super.addReaderFinishedListener(listener);
in.addReaderFinishedListener(listener);
}
@Override
public void removeReaderFinishedListener(ReaderFinishedListener listener) {
super.removeReaderFinishedListener(listener);
in.removeReaderFinishedListener(listener);
}
}

View File

@ -34,6 +34,7 @@ import java.io.IOException;
import java.io.Closeable;
import java.util.Collection;
import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@ -81,6 +82,65 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public abstract class IndexReader implements Cloneable,Closeable {
/**
* A custom listener that's invoked when the IndexReader
* is finished.
*
* <p>For a SegmentReader, this listener is called only
* once all SegmentReaders sharing the same core are
* closed. At this point it is safe for apps to evict
* this reader from any caches keyed on {@link
* #getCoreCacheKey}. This is the same interface that
* {@link FieldCache} uses, internally, to evict
* entries.</p>
*
* <p>For other readers, this listener is called when they
* are closed.</p>
*
* @lucene.experimental
*/
public static interface ReaderFinishedListener {
public void finished(IndexReader reader);
}
// Impls must set this if they may call add/removeReaderFinishedListener:
protected volatile Collection<ReaderFinishedListener> readerFinishedListeners;
/** Expert: adds a {@link ReaderFinishedListener}. The
* provided listener is also added to any sub-readers, if
* this is a composite reader. Also, any reader reopened
* or cloned from this one will also copy the listeners at
* the time of reopen.
*
* @lucene.experimental */
public void addReaderFinishedListener(ReaderFinishedListener listener) {
readerFinishedListeners.add(listener);
}
/** Expert: remove a previously added {@link ReaderFinishedListener}.
*
* @lucene.experimental */
public void removeReaderFinishedListener(ReaderFinishedListener listener) {
readerFinishedListeners.remove(listener);
}
protected void notifyReaderFinishedListeners() {
// Defensive (should never be null -- all impls must set
// this):
if (readerFinishedListeners != null) {
// Clone the set so that we don't have to sync on
// readerFinishedListeners while invoking them:
for(ReaderFinishedListener listener : new HashSet<ReaderFinishedListener>(readerFinishedListeners)) {
listener.finished(this);
}
}
}
protected void readerFinished() {
notifyReaderFinishedListeners();
}
/**
* Constants describing field properties, for example used for
* {@link IndexReader#getFieldNames(FieldOption)}.
@ -195,6 +255,7 @@ public abstract class IndexReader implements Cloneable,Closeable {
refCount.incrementAndGet();
}
}
readerFinished();
}
}

View File

@ -30,6 +30,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.Analyzer;
@ -365,6 +366,13 @@ public class IndexWriter implements Closeable {
return r;
}
// Used for all SegmentReaders we open
private final Collection<IndexReader.ReaderFinishedListener> readerFinishedListeners = Collections.synchronizedSet(new HashSet<IndexReader.ReaderFinishedListener>());
Collection<IndexReader.ReaderFinishedListener> getReaderFinishedListeners() throws IOException {
return readerFinishedListeners;
}
/** Holds shared SegmentReader instances. IndexWriter uses
* SegmentReaders for 1) applying deletes, 2) doing
* merges, 3) handing out a real-time reader. This pool
@ -574,6 +582,7 @@ public class IndexWriter implements Closeable {
// synchronized
// Returns a ref, which we xfer to readerMap:
sr = SegmentReader.get(false, info.dir, info, readBufferSize, doOpenStores, termsIndexDivisor);
sr.readerFinishedListeners = readerFinishedListeners;
if (info.dir == directory) {
// Only pool if reader is not external

View File

@ -20,10 +20,11 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.HashSet;
import java.util.Collections;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.search.FieldCache; // not great (circular); used only to purge FieldCache entry on close
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.ReaderUtil;
@ -82,6 +83,7 @@ public class MultiReader extends IndexReader implements Cloneable {
}
}
starts[subReaders.length] = maxDoc;
readerFinishedListeners = Collections.synchronizedSet(new HashSet<ReaderFinishedListener>());
return ReaderUtil.buildReaderContext(this);
}
@ -345,11 +347,6 @@ public class MultiReader extends IndexReader implements Cloneable {
subReaders[i].close();
}
}
// NOTE: only needed in case someone had asked for
// FieldCache for top-level reader (which is generally
// not a good idea):
FieldCache.DEFAULT.purge(this);
}
@Override
@ -389,4 +386,20 @@ public class MultiReader extends IndexReader implements Cloneable {
public ReaderContext getTopReaderContext() {
return topLevelContext;
}
@Override
public void addReaderFinishedListener(ReaderFinishedListener listener) {
super.addReaderFinishedListener(listener);
for(IndexReader sub : subReaders) {
sub.addReaderFinishedListener(listener);
}
}
@Override
public void removeReaderFinishedListener(ReaderFinishedListener listener) {
super.removeReaderFinishedListener(listener);
for(IndexReader sub : subReaders) {
sub.removeReaderFinishedListener(listener);
}
}
}

View File

@ -22,7 +22,6 @@ import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.document.FieldSelectorResult;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.util.Bits;
import org.apache.lucene.search.FieldCache; // not great (circular); used only to purge FieldCache entry on close
import org.apache.lucene.util.BytesRef;
import java.io.IOException;
@ -73,6 +72,7 @@ public class ParallelReader extends IndexReader {
public ParallelReader(boolean closeSubReaders) throws IOException {
super();
this.incRefReaders = !closeSubReaders;
readerFinishedListeners = Collections.synchronizedSet(new HashSet<ReaderFinishedListener>());
}
/** {@inheritDoc} */
@ -529,8 +529,6 @@ public class ParallelReader extends IndexReader {
readers.get(i).close();
}
}
FieldCache.DEFAULT.purge(this);
}
@Override
@ -548,6 +546,21 @@ public class ParallelReader extends IndexReader {
return topLevelReaderContext;
}
@Override
public void addReaderFinishedListener(ReaderFinishedListener listener) {
super.addReaderFinishedListener(listener);
for (IndexReader reader : readers) {
reader.addReaderFinishedListener(listener);
}
}
@Override
public void removeReaderFinishedListener(ReaderFinishedListener listener) {
super.removeReaderFinishedListener(listener);
for (IndexReader reader : readers) {
reader.removeReaderFinishedListener(listener);
}
}
}

View File

@ -38,7 +38,6 @@ import org.apache.lucene.util.BitVector;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.CloseableThreadLocal;
import org.apache.lucene.index.codecs.FieldsProducer;
import org.apache.lucene.search.FieldCache; // not great (circular); used only to purge FieldCache entry on close
import org.apache.lucene.util.BytesRef;
/**
@ -183,13 +182,9 @@ public class SegmentReader extends IndexReader implements Cloneable {
storeCFSReader.close();
}
// Force FieldCache to evict our entries at this
// point. If the exception occurred while
// initializing the core readers, then
// origInstance will be null, and we don't want
// to call FieldCache.purge (it leads to NPE):
// Now, notify any ReaderFinished listeners:
if (origInstance != null) {
FieldCache.DEFAULT.purge(origInstance);
origInstance.notifyReaderFinishedListeners();
}
}
}
@ -633,6 +628,7 @@ public class SegmentReader extends IndexReader implements Cloneable {
clone.si = si;
clone.readBufferSize = readBufferSize;
clone.pendingDeleteCount = pendingDeleteCount;
clone.readerFinishedListeners = readerFinishedListeners;
if (!openReadOnly && hasChanges) {
// My pending changes transfer to the new reader
@ -1203,4 +1199,14 @@ public class SegmentReader extends IndexReader implements Cloneable {
public int getTermInfosIndexDivisor() {
return core.termsIndexDivisor;
}
@Override
protected void readerFinished() {
// Do nothing here -- we have more careful control on
// when to notify that a SegmentReader has finished,
// because a given core is shared across many cloned
// SegmentReaders. We only notify once that core is no
// longer used (all SegmentReaders sharing it have been
// closed).
}
}

View File

@ -137,6 +137,13 @@ public class FieldCacheImpl implements FieldCache { // Made Public so that
public Object getValue() { return value; }
}
final static IndexReader.ReaderFinishedListener purgeReader = new IndexReader.ReaderFinishedListener() {
// @Override -- not until Java 1.6
public void finished(IndexReader reader) {
FieldCache.DEFAULT.purge(reader);
}
};
/** Expert: Internal cache. */
final static class Cache<T> {
Cache() {
@ -171,8 +178,10 @@ public class FieldCacheImpl implements FieldCache { // Made Public so that
synchronized (readerCache) {
innerCache = readerCache.get(readerKey);
if (innerCache == null) {
// First time this reader is using FieldCache
innerCache = new HashMap<Entry<T>,Object>();
readerCache.put(readerKey, innerCache);
reader.addReaderFinishedListener(purgeReader);
value = null;
} else {
value = innerCache.get(key);

View File

@ -1905,4 +1905,42 @@ public class TestIndexReader extends LuceneTestCase
dir.close();
}
}
// LUCENE-2474
public void testReaderFinishedListener() throws Exception {
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer()));
((LogMergePolicy) writer.getConfig().getMergePolicy()).setMergeFactor(3);
writer.setInfoStream(VERBOSE ? System.out : null);
writer.addDocument(new Document());
writer.commit();
writer.addDocument(new Document());
writer.commit();
final IndexReader reader = writer.getReader();
final int[] closeCount = new int[1];
final IndexReader.ReaderFinishedListener listener = new IndexReader.ReaderFinishedListener() {
public void finished(IndexReader reader) {
closeCount[0]++;
}
};
reader.addReaderFinishedListener(listener);
reader.close();
// Just the top reader
assertEquals(1, closeCount[0]);
writer.close();
// Now also the subs
assertEquals(3, closeCount[0]);
IndexReader reader2 = IndexReader.open(dir);
reader2.addReaderFinishedListener(listener);
closeCount[0] = 0;
reader2.close();
assertEquals(3, closeCount[0]);
dir.close();
}
}