From 323ea4134d277535176ddebc6fc23d58f961e6b3 Mon Sep 17 00:00:00 2001 From: Michael McCandless Date: Tue, 25 Jan 2011 22:42:37 +0000 Subject: [PATCH] LUCENE-2474: add expert ReaderFinishedListener API, for external caches to evict entries for readers git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1063498 13f79535-47bb-0310-9956-ffa450edef68 --- lucene/CHANGES.txt | 5 ++ .../instantiated/InstantiatedIndexReader.java | 2 + .../lucene/index/memory/MemoryIndex.java | 2 + .../apache/lucene/index/DirectoryReader.java | 25 +++++--- .../lucene/index/FilterIndexReader.java | 23 ++++--- .../org/apache/lucene/index/IndexReader.java | 61 +++++++++++++++++++ .../org/apache/lucene/index/IndexWriter.java | 9 +++ .../org/apache/lucene/index/MultiReader.java | 25 ++++++-- .../apache/lucene/index/ParallelReader.java | 19 +++++- .../apache/lucene/index/SegmentReader.java | 20 +++--- .../apache/lucene/search/FieldCacheImpl.java | 9 +++ .../apache/lucene/index/TestIndexReader.java | 38 ++++++++++++ 12 files changed, 205 insertions(+), 33 deletions(-) diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 843def76ff1..f40b16255a9 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -812,6 +812,11 @@ New features * LUCENE-2864: Add getMaxTermFrequency (maximum within-document TF) to FieldInvertState so that it can be used in Similarity.computeNorm. (Robert Muir) + +* LUCENE-2474: Added expert ReaderFinishedListener API to + IndexReader, to allow apps that maintain external per-segment caches + to evict entries when a segment is finished. (Shay Banon, Yonik + Seeley, Mike McCandless) Optimizations diff --git a/lucene/contrib/instantiated/src/java/org/apache/lucene/store/instantiated/InstantiatedIndexReader.java b/lucene/contrib/instantiated/src/java/org/apache/lucene/store/instantiated/InstantiatedIndexReader.java index 7cece688d33..301ff986fc2 100644 --- a/lucene/contrib/instantiated/src/java/org/apache/lucene/store/instantiated/InstantiatedIndexReader.java +++ b/lucene/contrib/instantiated/src/java/org/apache/lucene/store/instantiated/InstantiatedIndexReader.java @@ -19,6 +19,7 @@ package org.apache.lucene.store.instantiated; import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -52,6 +53,7 @@ public class InstantiatedIndexReader extends IndexReader { public InstantiatedIndexReader(InstantiatedIndex index) { super(); this.index = index; + readerFinishedListeners = Collections.synchronizedSet(new HashSet()); } /** diff --git a/lucene/contrib/memory/src/java/org/apache/lucene/index/memory/MemoryIndex.java b/lucene/contrib/memory/src/java/org/apache/lucene/index/memory/MemoryIndex.java index 3fc82b7651b..437d313b9c9 100644 --- a/lucene/contrib/memory/src/java/org/apache/lucene/index/memory/MemoryIndex.java +++ b/lucene/contrib/memory/src/java/org/apache/lucene/index/memory/MemoryIndex.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; @@ -758,6 +759,7 @@ public class MemoryIndex implements Serializable { private MemoryIndexReader() { super(); // avoid as much superclass baggage as possible + readerFinishedListeners = Collections.synchronizedSet(new HashSet()); } private Info getInfo(String fieldName) { diff --git a/lucene/src/java/org/apache/lucene/index/DirectoryReader.java b/lucene/src/java/org/apache/lucene/index/DirectoryReader.java index f339133b7ca..aa372be4b66 100644 --- a/lucene/src/java/org/apache/lucene/index/DirectoryReader.java +++ b/lucene/src/java/org/apache/lucene/index/DirectoryReader.java @@ -37,8 +37,6 @@ import org.apache.lucene.index.codecs.CodecProvider; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; -import org.apache.lucene.search.FieldCache; // not great (circular); used only to purge FieldCache entry on close - /** * An IndexReader which reads indexes with multiple segments. */ @@ -106,6 +104,7 @@ class DirectoryReader extends IndexReader implements Cloneable { } else { this.codecs = codecs; } + readerFinishedListeners = Collections.synchronizedSet(new HashSet()); // To reduce the chance of hitting FileNotFound // (and having to retry), we open segments in @@ -117,6 +116,7 @@ class DirectoryReader extends IndexReader implements Cloneable { boolean success = false; try { readers[i] = SegmentReader.get(readOnly, sis.info(i), termInfosIndexDivisor); + readers[i].readerFinishedListeners = readerFinishedListeners; success = true; } finally { if (!success) { @@ -146,6 +146,7 @@ class DirectoryReader extends IndexReader implements Cloneable { } else { this.codecs = codecs; } + readerFinishedListeners = writer.getReaderFinishedListeners(); // IndexWriter synchronizes externally before calling // us, which ensures infos will not change; so there's @@ -160,6 +161,7 @@ class DirectoryReader extends IndexReader implements Cloneable { final SegmentInfo info = infos.info(i); assert info.dir == dir; readers[i] = writer.readerPool.getReadOnlyClone(info, true, termInfosIndexDivisor); + readers[i].readerFinishedListeners = readerFinishedListeners; success = true; } finally { if (!success) { @@ -182,11 +184,14 @@ class DirectoryReader extends IndexReader implements Cloneable { /** This constructor is only used for {@link #reopen()} */ DirectoryReader(Directory directory, SegmentInfos infos, SegmentReader[] oldReaders, int[] oldStarts, - boolean readOnly, boolean doClone, int termInfosIndexDivisor, CodecProvider codecs) throws IOException { + boolean readOnly, boolean doClone, int termInfosIndexDivisor, CodecProvider codecs, + Collection readerFinishedListeners) throws IOException { this.directory = directory; this.readOnly = readOnly; this.segmentInfos = infos; this.termInfosIndexDivisor = termInfosIndexDivisor; + this.readerFinishedListeners = readerFinishedListeners; + if (codecs == null) { this.codecs = CodecProvider.getDefault(); } else { @@ -232,8 +237,10 @@ class DirectoryReader extends IndexReader implements Cloneable { // this is a new reader; in case we hit an exception we can close it safely newReader = SegmentReader.get(readOnly, infos.info(i), termInfosIndexDivisor); + newReader.readerFinishedListeners = readerFinishedListeners; } else { newReader = newReaders[i].reopenSegment(infos.info(i), doClone, readOnly); + assert newReader.readerFinishedListeners == readerFinishedListeners; } if (newReader == newReaders[i]) { // this reader will be shared between the old and the new one, @@ -357,6 +364,7 @@ class DirectoryReader extends IndexReader implements Cloneable { writeLock = null; hasChanges = false; } + assert newReader.readerFinishedListeners != null; return newReader; } @@ -391,7 +399,9 @@ class DirectoryReader extends IndexReader implements Cloneable { // TODO: right now we *always* make a new reader; in // the future we could have write make some effort to // detect that no changes have occurred - return writer.getReader(); + IndexReader reader = writer.getReader(); + reader.readerFinishedListeners = readerFinishedListeners; + return reader; } private IndexReader doReopen(final boolean openReadOnly, IndexCommit commit) throws CorruptIndexException, IOException { @@ -458,7 +468,7 @@ class DirectoryReader extends IndexReader implements Cloneable { private synchronized DirectoryReader doReopen(SegmentInfos infos, boolean doClone, boolean openReadOnly) throws CorruptIndexException, IOException { DirectoryReader reader; - reader = new DirectoryReader(directory, infos, subReaders, starts, openReadOnly, doClone, termInfosIndexDivisor, codecs); + reader = new DirectoryReader(directory, infos, subReaders, starts, openReadOnly, doClone, termInfosIndexDivisor, codecs, readerFinishedListeners); return reader; } @@ -808,11 +818,6 @@ class DirectoryReader extends IndexReader implements Cloneable { } } - // NOTE: only needed in case someone had asked for - // FieldCache for top-level reader (which is generally - // not a good idea): - FieldCache.DEFAULT.purge(this); - if (writer != null) { // Since we just closed, writer may now be able to // delete unused files: diff --git a/lucene/src/java/org/apache/lucene/index/FilterIndexReader.java b/lucene/src/java/org/apache/lucene/index/FilterIndexReader.java index f7874dc4ec0..6dc2f48227e 100644 --- a/lucene/src/java/org/apache/lucene/index/FilterIndexReader.java +++ b/lucene/src/java/org/apache/lucene/index/FilterIndexReader.java @@ -22,13 +22,14 @@ import org.apache.lucene.document.FieldSelector; import org.apache.lucene.index.IndexReader.ReaderContext; import org.apache.lucene.store.Directory; import org.apache.lucene.util.Bits; -import org.apache.lucene.search.FieldCache; // not great (circular); used only to purge FieldCache entry on close import org.apache.lucene.util.BytesRef; import java.io.IOException; import java.util.Collection; import java.util.Map; import java.util.Comparator; +import java.util.HashSet; +import java.util.Collections; /** A FilterIndexReader contains another IndexReader, which it * uses as its basic source of data, possibly transforming the data along the @@ -286,6 +287,7 @@ public class FilterIndexReader extends IndexReader { public FilterIndexReader(IndexReader in) { super(); this.in = in; + readerFinishedListeners = Collections.synchronizedSet(new HashSet()); } @Override @@ -391,11 +393,6 @@ public class FilterIndexReader extends IndexReader { @Override protected void doClose() throws IOException { in.close(); - - // NOTE: only needed in case someone had asked for - // FieldCache for top-level reader (which is generally - // not a good idea): - FieldCache.DEFAULT.purge(this); } @@ -454,4 +451,16 @@ public class FilterIndexReader extends IndexReader { buffer.append(')'); return buffer.toString(); } -} \ No newline at end of file + + @Override + public void addReaderFinishedListener(ReaderFinishedListener listener) { + super.addReaderFinishedListener(listener); + in.addReaderFinishedListener(listener); + } + + @Override + public void removeReaderFinishedListener(ReaderFinishedListener listener) { + super.removeReaderFinishedListener(listener); + in.removeReaderFinishedListener(listener); + } +} diff --git a/lucene/src/java/org/apache/lucene/index/IndexReader.java b/lucene/src/java/org/apache/lucene/index/IndexReader.java index 7f1b736cf1e..29d7869f214 100644 --- a/lucene/src/java/org/apache/lucene/index/IndexReader.java +++ b/lucene/src/java/org/apache/lucene/index/IndexReader.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.io.Closeable; import java.util.Collection; import java.util.List; +import java.util.HashSet; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -81,6 +82,65 @@ import java.util.concurrent.atomic.AtomicInteger; */ public abstract class IndexReader implements Cloneable,Closeable { + /** + * A custom listener that's invoked when the IndexReader + * is finished. + * + *

For a SegmentReader, this listener is called only + * once all SegmentReaders sharing the same core are + * closed. At this point it is safe for apps to evict + * this reader from any caches keyed on {@link + * #getCoreCacheKey}. This is the same interface that + * {@link FieldCache} uses, internally, to evict + * entries.

+ * + *

For other readers, this listener is called when they + * are closed.

+ * + * @lucene.experimental + */ + public static interface ReaderFinishedListener { + public void finished(IndexReader reader); + } + + // Impls must set this if they may call add/removeReaderFinishedListener: + protected volatile Collection readerFinishedListeners; + + /** Expert: adds a {@link ReaderFinishedListener}. The + * provided listener is also added to any sub-readers, if + * this is a composite reader. Also, any reader reopened + * or cloned from this one will also copy the listeners at + * the time of reopen. + * + * @lucene.experimental */ + public void addReaderFinishedListener(ReaderFinishedListener listener) { + readerFinishedListeners.add(listener); + } + + /** Expert: remove a previously added {@link ReaderFinishedListener}. + * + * @lucene.experimental */ + public void removeReaderFinishedListener(ReaderFinishedListener listener) { + readerFinishedListeners.remove(listener); + } + + protected void notifyReaderFinishedListeners() { + // Defensive (should never be null -- all impls must set + // this): + if (readerFinishedListeners != null) { + + // Clone the set so that we don't have to sync on + // readerFinishedListeners while invoking them: + for(ReaderFinishedListener listener : new HashSet(readerFinishedListeners)) { + listener.finished(this); + } + } + } + + protected void readerFinished() { + notifyReaderFinishedListeners(); + } + /** * Constants describing field properties, for example used for * {@link IndexReader#getFieldNames(FieldOption)}. @@ -195,6 +255,7 @@ public abstract class IndexReader implements Cloneable,Closeable { refCount.incrementAndGet(); } } + readerFinished(); } } diff --git a/lucene/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/src/java/org/apache/lucene/index/IndexWriter.java index 710822bd15d..b7573b5b4e4 100644 --- a/lucene/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/src/java/org/apache/lucene/index/IndexWriter.java @@ -30,6 +30,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.analysis.Analyzer; @@ -365,6 +366,13 @@ public class IndexWriter implements Closeable { return r; } + // Used for all SegmentReaders we open + private final Collection readerFinishedListeners = Collections.synchronizedSet(new HashSet()); + + Collection getReaderFinishedListeners() throws IOException { + return readerFinishedListeners; + } + /** Holds shared SegmentReader instances. IndexWriter uses * SegmentReaders for 1) applying deletes, 2) doing * merges, 3) handing out a real-time reader. This pool @@ -574,6 +582,7 @@ public class IndexWriter implements Closeable { // synchronized // Returns a ref, which we xfer to readerMap: sr = SegmentReader.get(false, info.dir, info, readBufferSize, doOpenStores, termsIndexDivisor); + sr.readerFinishedListeners = readerFinishedListeners; if (info.dir == directory) { // Only pool if reader is not external diff --git a/lucene/src/java/org/apache/lucene/index/MultiReader.java b/lucene/src/java/org/apache/lucene/index/MultiReader.java index 8a5dca94f22..1e95cb272d9 100644 --- a/lucene/src/java/org/apache/lucene/index/MultiReader.java +++ b/lucene/src/java/org/apache/lucene/index/MultiReader.java @@ -20,10 +20,11 @@ package org.apache.lucene.index; import java.io.IOException; import java.util.Collection; import java.util.Map; +import java.util.HashSet; +import java.util.Collections; import org.apache.lucene.document.Document; import org.apache.lucene.document.FieldSelector; -import org.apache.lucene.search.FieldCache; // not great (circular); used only to purge FieldCache entry on close import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.ReaderUtil; @@ -82,6 +83,7 @@ public class MultiReader extends IndexReader implements Cloneable { } } starts[subReaders.length] = maxDoc; + readerFinishedListeners = Collections.synchronizedSet(new HashSet()); return ReaderUtil.buildReaderContext(this); } @@ -345,11 +347,6 @@ public class MultiReader extends IndexReader implements Cloneable { subReaders[i].close(); } } - - // NOTE: only needed in case someone had asked for - // FieldCache for top-level reader (which is generally - // not a good idea): - FieldCache.DEFAULT.purge(this); } @Override @@ -389,4 +386,20 @@ public class MultiReader extends IndexReader implements Cloneable { public ReaderContext getTopReaderContext() { return topLevelContext; } + + @Override + public void addReaderFinishedListener(ReaderFinishedListener listener) { + super.addReaderFinishedListener(listener); + for(IndexReader sub : subReaders) { + sub.addReaderFinishedListener(listener); + } + } + + @Override + public void removeReaderFinishedListener(ReaderFinishedListener listener) { + super.removeReaderFinishedListener(listener); + for(IndexReader sub : subReaders) { + sub.removeReaderFinishedListener(listener); + } + } } diff --git a/lucene/src/java/org/apache/lucene/index/ParallelReader.java b/lucene/src/java/org/apache/lucene/index/ParallelReader.java index b1ffd23834b..8b789e02058 100644 --- a/lucene/src/java/org/apache/lucene/index/ParallelReader.java +++ b/lucene/src/java/org/apache/lucene/index/ParallelReader.java @@ -22,7 +22,6 @@ import org.apache.lucene.document.FieldSelector; import org.apache.lucene.document.FieldSelectorResult; import org.apache.lucene.document.Fieldable; import org.apache.lucene.util.Bits; -import org.apache.lucene.search.FieldCache; // not great (circular); used only to purge FieldCache entry on close import org.apache.lucene.util.BytesRef; import java.io.IOException; @@ -73,6 +72,7 @@ public class ParallelReader extends IndexReader { public ParallelReader(boolean closeSubReaders) throws IOException { super(); this.incRefReaders = !closeSubReaders; + readerFinishedListeners = Collections.synchronizedSet(new HashSet()); } /** {@inheritDoc} */ @@ -529,8 +529,6 @@ public class ParallelReader extends IndexReader { readers.get(i).close(); } } - - FieldCache.DEFAULT.purge(this); } @Override @@ -548,6 +546,21 @@ public class ParallelReader extends IndexReader { return topLevelReaderContext; } + @Override + public void addReaderFinishedListener(ReaderFinishedListener listener) { + super.addReaderFinishedListener(listener); + for (IndexReader reader : readers) { + reader.addReaderFinishedListener(listener); + } + } + + @Override + public void removeReaderFinishedListener(ReaderFinishedListener listener) { + super.removeReaderFinishedListener(listener); + for (IndexReader reader : readers) { + reader.removeReaderFinishedListener(listener); + } + } } diff --git a/lucene/src/java/org/apache/lucene/index/SegmentReader.java b/lucene/src/java/org/apache/lucene/index/SegmentReader.java index 462ceaaceb8..ac36827bfc6 100644 --- a/lucene/src/java/org/apache/lucene/index/SegmentReader.java +++ b/lucene/src/java/org/apache/lucene/index/SegmentReader.java @@ -38,7 +38,6 @@ import org.apache.lucene.util.BitVector; import org.apache.lucene.util.Bits; import org.apache.lucene.util.CloseableThreadLocal; import org.apache.lucene.index.codecs.FieldsProducer; -import org.apache.lucene.search.FieldCache; // not great (circular); used only to purge FieldCache entry on close import org.apache.lucene.util.BytesRef; /** @@ -183,13 +182,9 @@ public class SegmentReader extends IndexReader implements Cloneable { storeCFSReader.close(); } - // Force FieldCache to evict our entries at this - // point. If the exception occurred while - // initializing the core readers, then - // origInstance will be null, and we don't want - // to call FieldCache.purge (it leads to NPE): + // Now, notify any ReaderFinished listeners: if (origInstance != null) { - FieldCache.DEFAULT.purge(origInstance); + origInstance.notifyReaderFinishedListeners(); } } } @@ -633,6 +628,7 @@ public class SegmentReader extends IndexReader implements Cloneable { clone.si = si; clone.readBufferSize = readBufferSize; clone.pendingDeleteCount = pendingDeleteCount; + clone.readerFinishedListeners = readerFinishedListeners; if (!openReadOnly && hasChanges) { // My pending changes transfer to the new reader @@ -1203,4 +1199,14 @@ public class SegmentReader extends IndexReader implements Cloneable { public int getTermInfosIndexDivisor() { return core.termsIndexDivisor; } + + @Override + protected void readerFinished() { + // Do nothing here -- we have more careful control on + // when to notify that a SegmentReader has finished, + // because a given core is shared across many cloned + // SegmentReaders. We only notify once that core is no + // longer used (all SegmentReaders sharing it have been + // closed). + } } diff --git a/lucene/src/java/org/apache/lucene/search/FieldCacheImpl.java b/lucene/src/java/org/apache/lucene/search/FieldCacheImpl.java index b583dc6fe78..971d7459840 100644 --- a/lucene/src/java/org/apache/lucene/search/FieldCacheImpl.java +++ b/lucene/src/java/org/apache/lucene/search/FieldCacheImpl.java @@ -137,6 +137,13 @@ public class FieldCacheImpl implements FieldCache { // Made Public so that public Object getValue() { return value; } } + final static IndexReader.ReaderFinishedListener purgeReader = new IndexReader.ReaderFinishedListener() { + // @Override -- not until Java 1.6 + public void finished(IndexReader reader) { + FieldCache.DEFAULT.purge(reader); + } + }; + /** Expert: Internal cache. */ final static class Cache { Cache() { @@ -171,8 +178,10 @@ public class FieldCacheImpl implements FieldCache { // Made Public so that synchronized (readerCache) { innerCache = readerCache.get(readerKey); if (innerCache == null) { + // First time this reader is using FieldCache innerCache = new HashMap,Object>(); readerCache.put(readerKey, innerCache); + reader.addReaderFinishedListener(purgeReader); value = null; } else { value = innerCache.get(key); diff --git a/lucene/src/test/org/apache/lucene/index/TestIndexReader.java b/lucene/src/test/org/apache/lucene/index/TestIndexReader.java index ef87922f311..01b73877385 100644 --- a/lucene/src/test/org/apache/lucene/index/TestIndexReader.java +++ b/lucene/src/test/org/apache/lucene/index/TestIndexReader.java @@ -1905,4 +1905,42 @@ public class TestIndexReader extends LuceneTestCase dir.close(); } } + + // LUCENE-2474 + public void testReaderFinishedListener() throws Exception { + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer())); + ((LogMergePolicy) writer.getConfig().getMergePolicy()).setMergeFactor(3); + writer.setInfoStream(VERBOSE ? System.out : null); + writer.addDocument(new Document()); + writer.commit(); + writer.addDocument(new Document()); + writer.commit(); + final IndexReader reader = writer.getReader(); + final int[] closeCount = new int[1]; + final IndexReader.ReaderFinishedListener listener = new IndexReader.ReaderFinishedListener() { + public void finished(IndexReader reader) { + closeCount[0]++; + } + }; + + reader.addReaderFinishedListener(listener); + + reader.close(); + + // Just the top reader + assertEquals(1, closeCount[0]); + writer.close(); + + // Now also the subs + assertEquals(3, closeCount[0]); + + IndexReader reader2 = IndexReader.open(dir); + reader2.addReaderFinishedListener(listener); + + closeCount[0] = 0; + reader2.close(); + assertEquals(3, closeCount[0]); + dir.close(); + } }