LUCENE-5701: Move core closed listeners to AtomicReader.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1597180 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Adrien Grand 2014-05-23 20:54:51 +00:00
parent 32a87a7bbc
commit ed66d23ebc
12 changed files with 217 additions and 68 deletions

View File

@ -86,6 +86,9 @@ API Changes
removed, because buffering and checksumming is provided by FilterOutputStreams,
provided by the JDK. (Uwe Schindler, Mike McCandless)
* LUCENE-5701: Core closed listeners are now available in the AtomicReader API,
they used to sit only in SegmentReader. (Adrien Grand, Robert Muir)
Documentation
* LUCENE-5392: Add/improve analysis package documentation to reflect

View File

@ -19,7 +19,7 @@ package org.apache.lucene.index;
import java.io.IOException;
import org.apache.lucene.search.SearcherManager; // javadocs
import org.apache.lucene.index.IndexReader.ReaderClosedListener;
import org.apache.lucene.util.Bits;
/** {@code AtomicReader} is an abstract class, providing an interface for accessing an
@ -47,8 +47,8 @@ import org.apache.lucene.util.Bits;
public abstract class AtomicReader extends IndexReader {
private final AtomicReaderContext readerContext = new AtomicReaderContext(this);
/** Sole constructor. (For invocation by subclass
/** Sole constructor. (For invocation by subclass
* constructors, typically implicit.) */
protected AtomicReader() {
super();
@ -60,13 +60,87 @@ public abstract class AtomicReader extends IndexReader {
return readerContext;
}
/**
* Called when the shared core for this {@link AtomicReader}
* is closed.
* <p>
* If this {@link AtomicReader} impl has the ability to share
* resources across instances that might only vary through
* deleted documents and doc values updates, then this listener
* will only be called when the shared core is closed.
* Otherwise, this listener will be called when this reader is
* closed.</p>
* <p>
* This is typically useful to manage per-segment caches: when
* the listener is called, it is safe to evict this reader from
* any caches keyed on {@link #getCoreCacheKey}.</p>
*
* @lucene.experimental
*/
public static interface CoreClosedListener {
/** Invoked when the shared core of the original {@code
* SegmentReader} has closed. */
public void onClose(Object ownerCoreCacheKey);
}
private static class CoreClosedListenerWrapper implements ReaderClosedListener {
private final CoreClosedListener listener;
CoreClosedListenerWrapper(CoreClosedListener listener) {
this.listener = listener;
}
@Override
public void onClose(IndexReader reader) {
listener.onClose(reader.getCoreCacheKey());
}
@Override
public int hashCode() {
return listener.hashCode();
}
@Override
public boolean equals(Object other) {
if (!(other instanceof CoreClosedListenerWrapper)) {
return false;
}
return listener.equals(((CoreClosedListenerWrapper) other).listener);
}
}
/** Add a {@link CoreClosedListener} as a {@link ReaderClosedListener}. This
* method is typically useful for {@link AtomicReader} implementations that
* don't have the concept of a core that is shared across several
* {@link AtomicReader} instances in which case the {@link CoreClosedListener}
* is called when closing the reader. */
protected static void addCoreClosedListenerAsReaderClosedListener(IndexReader reader, CoreClosedListener listener) {
reader.addReaderClosedListener(new CoreClosedListenerWrapper(listener));
}
/** Remove a {@link CoreClosedListener} which has been added with
* {@link #addCoreClosedListenerAsReaderClosedListener(IndexReader, CoreClosedListener)}. */
protected static void removeCoreClosedListenerAsReaderClosedListener(IndexReader reader, CoreClosedListener listener) {
reader.removeReaderClosedListener(new CoreClosedListenerWrapper(listener));
}
/** Expert: adds a CoreClosedListener to this reader's shared core
* @lucene.experimental */
public abstract void addCoreClosedListener(CoreClosedListener listener);
/** Expert: removes a CoreClosedListener from this reader's shared core
* @lucene.experimental */
public abstract void removeCoreClosedListener(CoreClosedListener listener);
/**
* Returns {@link Fields} for this reader.
* This method may return null if the reader has no
* postings.
*/
public abstract Fields fields() throws IOException;
@Override
public final int docFreq(Term term) throws IOException {
final Fields fields = fields();
@ -107,7 +181,7 @@ public abstract class AtomicReader extends IndexReader {
return 0;
}
}
@Override
public final long getSumDocFreq(String field) throws IOException {
final Terms terms = terms(field);
@ -116,7 +190,7 @@ public abstract class AtomicReader extends IndexReader {
}
return terms.getSumDocFreq();
}
@Override
public final int getDocCount(String field) throws IOException {
final Terms terms = terms(field);
@ -125,7 +199,7 @@ public abstract class AtomicReader extends IndexReader {
}
return terms.getDocCount();
}
@Override
public final long getSumTotalTermFreq(String field) throws IOException {
final Terms terms = terms(field);
@ -146,7 +220,7 @@ public abstract class AtomicReader extends IndexReader {
/** Returns {@link DocsEnum} for the specified term.
* This will return null if either the field or
* term does not exist.
* term does not exist.
* @see TermsEnum#docs(Bits, DocsEnum) */
public final DocsEnum termDocsEnum(Term term) throws IOException {
assert term.field() != null;
@ -166,7 +240,7 @@ public abstract class AtomicReader extends IndexReader {
/** Returns {@link DocsAndPositionsEnum} for the specified
* term. This will return null if the
* field or term does not exist or positions weren't indexed.
* field or term does not exist or positions weren't indexed.
* @see TermsEnum#docsAndPositions(Bits, DocsAndPositionsEnum) */
public final DocsAndPositionsEnum termPositionsEnum(Term term) throws IOException {
assert term.field() != null;
@ -201,14 +275,14 @@ public abstract class AtomicReader extends IndexReader {
* this field. The returned instance should only be
* used by a single thread. */
public abstract SortedDocValues getSortedDocValues(String field) throws IOException;
/** Returns {@link SortedSetDocValues} for this field, or
* null if no {@link SortedSetDocValues} were indexed for
* this field. The returned instance should only be
* used by a single thread. */
public abstract SortedSetDocValues getSortedSetDocValues(String field) throws IOException;
/** Returns a {@link Bits} at the size of <code>reader.maxDoc()</code>,
/** Returns a {@link Bits} at the size of <code>reader.maxDoc()</code>,
* with turned on bits for each docid that does have a value for this field,
* or null if no DocValues were indexed for this field. The
* returned instance should only be used by a single thread */
@ -226,7 +300,7 @@ public abstract class AtomicReader extends IndexReader {
* @lucene.experimental
*/
public abstract FieldInfos getFieldInfos();
/** Returns the {@link Bits} representing live (not
* deleted) docs. A set bit indicates the doc ID has not
* been deleted. If this method returns null it means
@ -238,11 +312,11 @@ public abstract class AtomicReader extends IndexReader {
* synchronization.
*/
public abstract Bits getLiveDocs();
/**
/**
* Checks consistency of this reader.
* <p>
* Note that this may be costly in terms of I/O, e.g.
* Note that this may be costly in terms of I/O, e.g.
* may involve computing a checksum value against large data files.
* @lucene.internal
*/

View File

@ -331,6 +331,16 @@ public class FilterAtomicReader extends AtomicReader {
in.registerParentReader(this);
}
@Override
public void addCoreClosedListener(CoreClosedListener listener) {
in.addCoreClosedListener(listener);
}
@Override
public void removeCoreClosedListener(CoreClosedListener listener) {
in.removeCoreClosedListener(listener);
}
@Override
public Bits getLiveDocs() {
ensureOpen();

View File

@ -100,6 +100,8 @@ public abstract class IndexReader implements Closeable {
/** Expert: adds a {@link ReaderClosedListener}. The
* provided listener will be invoked when this reader is closed.
* At this point, it is safe for apps to evict this reader from
* any caches keyed on {@link #getCombinedCoreAndDeletesKey()}.
*
* @lucene.experimental */
public final void addReaderClosedListener(ReaderClosedListener listener) {

View File

@ -148,7 +148,17 @@ public class ParallelAtomicReader extends AtomicReader {
}
return buffer.append(')').toString();
}
@Override
public void addCoreClosedListener(CoreClosedListener listener) {
addCoreClosedListenerAsReaderClosedListener(this, listener);
}
@Override
public void removeCoreClosedListener(CoreClosedListener listener) {
removeCoreClosedListenerAsReaderClosedListener(this, listener);
}
// Single instance of this, per ParallelReader instance
private final class ParallelFields extends Fields {
final Map<String,Terms> fields = new TreeMap<>();

View File

@ -23,7 +23,7 @@ import org.apache.lucene.codecs.FieldsProducer;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.codecs.TermVectorsReader;
import org.apache.lucene.index.SegmentReader.CoreClosedListener;
import org.apache.lucene.index.AtomicReader.CoreClosedListener;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.CompoundFileDirectory;
import org.apache.lucene.store.Directory;

View File

@ -557,33 +557,14 @@ public final class SegmentReader extends AtomicReader {
ensureOpen();
return core.getNormValues(fieldInfos, field);
}
/**
* Called when the shared core for this SegmentReader
* is closed.
* <p>
* This listener is called only once all SegmentReaders
* sharing the same core are closed. At this point it
* is safe for apps to evict this reader from any caches
* keyed on {@link #getCoreCacheKey}. This is the same
* interface that {@link CachingWrapperFilter} uses, internally,
* to evict entries.</p>
*
* @lucene.experimental
*/
public static interface CoreClosedListener {
/** Invoked when the shared core of the original {@code
* SegmentReader} has closed. */
public void onClose(Object ownerCoreCacheKey);
}
/** Expert: adds a CoreClosedListener to this reader's shared core */
@Override
public void addCoreClosedListener(CoreClosedListener listener) {
ensureOpen();
core.addCoreClosedListener(listener);
}
/** Expert: removes a CoreClosedListener from this reader's shared core */
@Override
public void removeCoreClosedListener(CoreClosedListener listener) {
ensureOpen();
core.removeCoreClosedListener(listener);

View File

@ -77,6 +77,16 @@ public final class SlowCompositeReaderWrapper extends AtomicReader {
return "SlowCompositeReaderWrapper(" + in + ")";
}
@Override
public void addCoreClosedListener(CoreClosedListener listener) {
addCoreClosedListenerAsReaderClosedListener(in, listener);
}
@Override
public void removeCoreClosedListener(CoreClosedListener listener) {
removeCoreClosedListenerAsReaderClosedListener(in, listener);
}
@Override
public Fields fields() {
ensureOpen();

View File

@ -17,15 +17,18 @@ package org.apache.lucene.index;
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.TestUtil;
/**
*/
@ -50,7 +53,6 @@ public class TestIndexReaderClose extends LuceneTestCase {
}
}
};
List<IndexReader.ReaderClosedListener> listeners = new ArrayList<>();
int listenerCount = random().nextInt(20);
AtomicInteger count = new AtomicInteger();
boolean faultySet = false;
@ -92,6 +94,64 @@ public class TestIndexReaderClose extends LuceneTestCase {
}
}
public void testCoreListenerOnWrapper() throws IOException {
RandomIndexWriter w = new RandomIndexWriter(random(), newDirectory());
final int numDocs = TestUtil.nextInt(random(), 1, 5);
for (int i = 0; i < numDocs; ++i) {
w.addDocument(new Document());
if (random().nextBoolean()) {
w.commit();
}
}
w.commit();
w.close();
final IndexReader reader = DirectoryReader.open(w.w.getDirectory());
final AtomicReader atomicReader = SlowCompositeReaderWrapper.wrap(reader);
final int numListeners = TestUtil.nextInt(random(), 1, 10);
final List<AtomicReader.CoreClosedListener> listeners = new ArrayList<>();
AtomicInteger counter = new AtomicInteger(numListeners);
for (int i = 0; i < numListeners; ++i) {
CountCoreListener listener = new CountCoreListener(counter);
listeners.add(listener);
atomicReader.addCoreClosedListener(listener);
}
for (int i = 0; i < 100; ++i) {
atomicReader.addCoreClosedListener(listeners.get(random().nextInt(listeners.size())));
}
final int removed = random().nextInt(numListeners);
Collections.shuffle(listeners);
for (int i = 0; i < removed; ++i) {
atomicReader.removeCoreClosedListener(listeners.get(i));
}
assertEquals(numListeners, counter.get());
// make sure listeners are registered on the wrapped reader and that closing any of them has the same effect
if (random().nextBoolean()) {
reader.close();
} else {
atomicReader.close();
}
assertEquals(removed, counter.get());
w.w.getDirectory().close();
}
private static final class CountCoreListener implements AtomicReader.CoreClosedListener {
private final AtomicInteger count;
public CountCoreListener(AtomicInteger count) {
this.count = count;
}
@Override
public void onClose(Object coreCacheKey) {
count.decrementAndGet();
}
}
private static final class CountListener implements IndexReader.ReaderClosedListener {
private final AtomicInteger count;

View File

@ -749,7 +749,17 @@ public class MemoryIndex {
private MemoryIndexReader() {
super(); // avoid as much superclass baggage as possible
}
@Override
public void addCoreClosedListener(CoreClosedListener listener) {
addCoreClosedListenerAsReaderClosedListener(this, listener);
}
@Override
public void removeCoreClosedListener(CoreClosedListener listener) {
removeCoreClosedListenerAsReaderClosedListener(this, listener);
}
private Info getInfo(String fieldName) {
return fields.get(fieldName);
}

View File

@ -110,30 +110,9 @@ class FieldCacheImpl implements FieldCache {
FieldCacheImpl.this.purgeByCacheKey(ownerCoreCacheKey);
}
};
// composite/SlowMultiReaderWrapper fieldcaches don't purge until composite reader is closed.
final IndexReader.ReaderClosedListener purgeReader = new IndexReader.ReaderClosedListener() {
@Override
public void onClose(IndexReader owner) {
assert owner instanceof AtomicReader;
FieldCacheImpl.this.purgeByCacheKey(((AtomicReader) owner).getCoreCacheKey());
}
};
private void initReader(AtomicReader reader) {
if (reader instanceof SegmentReader) {
((SegmentReader) reader).addCoreClosedListener(purgeCore);
} else {
// we have a slow reader of some sort, try to register a purge event
// rather than relying on gc:
Object key = reader.getCoreCacheKey();
if (key instanceof AtomicReader) {
((AtomicReader)key).addReaderClosedListener(purgeReader);
} else {
// last chance
reader.addReaderClosedListener(purgeReader);
}
}
reader.addCoreClosedListener(purgeCore);
}
/** Expert: Internal cache. */

View File

@ -361,6 +361,16 @@ public class TestDocSet extends LuceneTestCase {
return maxDoc;
}
@Override
public void addCoreClosedListener(CoreClosedListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void removeCoreClosedListener(CoreClosedListener listener) {
throw new UnsupportedOperationException();
}
@Override
public FieldInfos getFieldInfos() {
return new FieldInfos(new FieldInfo[0]);