mirror of https://github.com/apache/lucene.git
LUCENE-4953: Fixed ParallelCompositeReader to inform ReaderClosedListeners of its synthetic subreaders. FieldCaches keyed on the atomic children will be purged earlier and FC insanity prevented. In addition, ParallelCompositeReader's toString() was changed to better reflect the reader structure.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1476526 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
aa73a22aca
commit
60151ce379
|
@ -59,6 +59,12 @@ Bug Fixes
|
|||
* LUCENE-4955: NGramTokenizer now supports inputs larger than 1024 chars.
|
||||
(Adrien Grand)
|
||||
|
||||
* LUCENE-4953: Fixed ParallelCompositeReader to inform ReaderClosedListeners of
|
||||
its synthetic subreaders. FieldCaches keyed on the atomic children will be purged
|
||||
earlier and FC insanity prevented. In addition, ParallelCompositeReader's
|
||||
toString() was changed to better reflect the reader structure.
|
||||
(Mike McCandless, Uwe Schindler)
|
||||
|
||||
Optimizations
|
||||
|
||||
* LUCENE-4938: Don't use an unnecessarily large priority queue in IndexSearcher
|
||||
|
|
|
@ -66,7 +66,13 @@ public abstract class CompositeReader extends IndexReader {
|
|||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder buffer = new StringBuilder();
|
||||
buffer.append(getClass().getSimpleName());
|
||||
// walk up through class hierarchy to get a non-empty simple name (anonymous classes have no name):
|
||||
for (Class<?> clazz = getClass(); clazz != null; clazz = clazz.getSuperclass()) {
|
||||
if (!clazz.isAnonymousClass()) {
|
||||
buffer.append(clazz.getSimpleName());
|
||||
break;
|
||||
}
|
||||
}
|
||||
buffer.append('(');
|
||||
final List<? extends IndexReader> subReaders = getSequentialSubReaders();
|
||||
assert subReaders != null;
|
||||
|
|
|
@ -47,7 +47,7 @@ import org.apache.lucene.util.Bits;
|
|||
* same order to the other indexes. <em>Failure to do so will result in
|
||||
* undefined behavior</em>.
|
||||
*/
|
||||
public final class ParallelAtomicReader extends AtomicReader {
|
||||
public class ParallelAtomicReader extends AtomicReader {
|
||||
private final FieldInfos fieldInfos;
|
||||
private final ParallelFields fields = new ParallelFields();
|
||||
private final AtomicReader[] parallelReaders, storedFieldsReaders;
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.lucene.index;
|
|||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -47,10 +46,10 @@ import java.util.Set;
|
|||
* by number of documents per segment. If you use different {@link MergePolicy}s
|
||||
* it might happen that the segment structure of your index is no longer predictable.
|
||||
*/
|
||||
public final class ParallelCompositeReader extends BaseCompositeReader<IndexReader> {
|
||||
public class ParallelCompositeReader extends BaseCompositeReader<IndexReader> {
|
||||
private final boolean closeSubReaders;
|
||||
private final Set<CompositeReader> completeReaderSet =
|
||||
Collections.newSetFromMap(new IdentityHashMap<CompositeReader,Boolean>());
|
||||
private final Set<IndexReader> completeReaderSet =
|
||||
Collections.newSetFromMap(new IdentityHashMap<IndexReader,Boolean>());
|
||||
|
||||
/** Create a ParallelCompositeReader based on the provided
|
||||
* readers; auto-closes the given readers on {@link #close()}. */
|
||||
|
@ -72,12 +71,14 @@ public final class ParallelCompositeReader extends BaseCompositeReader<IndexRead
|
|||
this.closeSubReaders = closeSubReaders;
|
||||
Collections.addAll(completeReaderSet, readers);
|
||||
Collections.addAll(completeReaderSet, storedFieldReaders);
|
||||
// do this finally so any Exceptions occurred before don't affect refcounts:
|
||||
// update ref-counts (like MultiReader):
|
||||
if (!closeSubReaders) {
|
||||
for (CompositeReader reader : completeReaderSet) {
|
||||
for (final IndexReader reader : completeReaderSet) {
|
||||
reader.incRef();
|
||||
}
|
||||
}
|
||||
// finally add our own synthetic readers, so we close or decRef them, too (it does not matter what we do)
|
||||
completeReaderSet.addAll(getSequentialSubReaders());
|
||||
}
|
||||
|
||||
private static IndexReader[] prepareSubReaders(CompositeReader[] readers, CompositeReader[] storedFieldsReaders) throws IOException {
|
||||
|
@ -112,10 +113,12 @@ public final class ParallelCompositeReader extends BaseCompositeReader<IndexRead
|
|||
for (int j = 0; j < storedFieldsReaders.length; j++) {
|
||||
storedSubs[j] = (AtomicReader) storedFieldsReaders[j].getSequentialSubReaders().get(i);
|
||||
}
|
||||
// we simply enable closing of subReaders, to prevent incRefs on subReaders
|
||||
// -> for synthetic subReaders, close() is never
|
||||
// called by our doClose()
|
||||
subReaders[i] = new ParallelAtomicReader(true, atomicSubs, storedSubs);
|
||||
// We pass true for closeSubs and we prevent closing of subreaders in doClose():
|
||||
// By this the synthetic throw-away readers used here are completely invisible to ref-counting
|
||||
subReaders[i] = new ParallelAtomicReader(true, atomicSubs, storedSubs) {
|
||||
@Override
|
||||
protected void doClose() {}
|
||||
};
|
||||
} else {
|
||||
assert firstSubReaders.get(i) instanceof CompositeReader;
|
||||
final CompositeReader[] compositeSubs = new CompositeReader[readers.length];
|
||||
|
@ -126,9 +129,12 @@ public final class ParallelCompositeReader extends BaseCompositeReader<IndexRead
|
|||
for (int j = 0; j < storedFieldsReaders.length; j++) {
|
||||
storedSubs[j] = (CompositeReader) storedFieldsReaders[j].getSequentialSubReaders().get(i);
|
||||
}
|
||||
// we simply enable closing of subReaders, to prevent incRefs on subReaders
|
||||
// -> for synthetic subReaders, close() is never called by our doClose()
|
||||
subReaders[i] = new ParallelCompositeReader(true, compositeSubs, storedSubs);
|
||||
// We pass true for closeSubs and we prevent closing of subreaders in doClose():
|
||||
// By this the synthetic throw-away readers used here are completely invisible to ref-counting
|
||||
subReaders[i] = new ParallelCompositeReader(true, compositeSubs, storedSubs) {
|
||||
@Override
|
||||
protected void doClose() {}
|
||||
};
|
||||
}
|
||||
}
|
||||
return subReaders;
|
||||
|
@ -158,20 +164,10 @@ public final class ParallelCompositeReader extends BaseCompositeReader<IndexRead
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder buffer = new StringBuilder("ParallelCompositeReader(");
|
||||
for (final Iterator<CompositeReader> iter = completeReaderSet.iterator(); iter.hasNext();) {
|
||||
buffer.append(iter.next());
|
||||
if (iter.hasNext()) buffer.append(", ");
|
||||
}
|
||||
return buffer.append(')').toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void doClose() throws IOException {
|
||||
IOException ioe = null;
|
||||
for (final CompositeReader reader : completeReaderSet) {
|
||||
for (final IndexReader reader : completeReaderSet) {
|
||||
try {
|
||||
if (closeSubReaders) {
|
||||
reader.close();
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.lucene.document.Document;
|
|||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.search.BooleanClause.Occur;
|
||||
import org.apache.lucene.search.*;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util._TestUtil;
|
||||
|
@ -114,6 +115,29 @@ public class TestParallelAtomicReader extends LuceneTestCase {
|
|||
dir2.close();
|
||||
}
|
||||
|
||||
public void testCloseInnerReader() throws Exception {
|
||||
Directory dir1 = getDir1(random());
|
||||
AtomicReader ir1 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1));
|
||||
|
||||
// with overlapping
|
||||
ParallelAtomicReader pr = new ParallelAtomicReader(true,
|
||||
new AtomicReader[] {ir1},
|
||||
new AtomicReader[] {ir1});
|
||||
|
||||
ir1.close();
|
||||
|
||||
try {
|
||||
pr.document(0);
|
||||
fail("ParallelAtomicReader should be already closed because inner reader was closed!");
|
||||
} catch (AlreadyClosedException e) {
|
||||
// pass
|
||||
}
|
||||
|
||||
// noop:
|
||||
pr.close();
|
||||
dir1.close();
|
||||
}
|
||||
|
||||
public void testIncompatibleIndexes() throws IOException {
|
||||
// two documents:
|
||||
Directory dir1 = getDir1(random());
|
||||
|
|
|
@ -23,8 +23,10 @@ import java.util.Random;
|
|||
import org.apache.lucene.analysis.MockAnalyzer;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.index.IndexReader.ReaderClosedListener;
|
||||
import org.apache.lucene.search.BooleanClause.Occur;
|
||||
import org.apache.lucene.search.*;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
|
||||
|
@ -82,12 +84,15 @@ public class TestParallelCompositeReader extends LuceneTestCase {
|
|||
// close subreaders, ParallelReader will not change refCounts, but close on its own close
|
||||
ParallelCompositeReader pr = new ParallelCompositeReader(ir1 = DirectoryReader.open(dir1),
|
||||
ir2 = DirectoryReader.open(dir2));
|
||||
IndexReader psub1 = pr.getSequentialSubReaders().get(0);
|
||||
// check RefCounts
|
||||
assertEquals(1, ir1.getRefCount());
|
||||
assertEquals(1, ir2.getRefCount());
|
||||
assertEquals(1, psub1.getRefCount());
|
||||
pr.close();
|
||||
assertEquals(0, ir1.getRefCount());
|
||||
assertEquals(0, ir2.getRefCount());
|
||||
assertEquals(0, psub1.getRefCount());
|
||||
dir1.close();
|
||||
dir2.close();
|
||||
}
|
||||
|
@ -100,20 +105,115 @@ public class TestParallelCompositeReader extends LuceneTestCase {
|
|||
|
||||
// don't close subreaders, so ParallelReader will increment refcounts
|
||||
ParallelCompositeReader pr = new ParallelCompositeReader(false, ir1, ir2);
|
||||
IndexReader psub1 = pr.getSequentialSubReaders().get(0);
|
||||
// check RefCounts
|
||||
assertEquals(2, ir1.getRefCount());
|
||||
assertEquals(2, ir2.getRefCount());
|
||||
assertEquals("refCount must be 1, as the synthetic reader was created by ParallelCompositeReader", 1, psub1.getRefCount());
|
||||
pr.close();
|
||||
assertEquals(1, ir1.getRefCount());
|
||||
assertEquals(1, ir2.getRefCount());
|
||||
assertEquals("refcount must be 0 because parent was closed", 0, psub1.getRefCount());
|
||||
ir1.close();
|
||||
ir2.close();
|
||||
assertEquals(0, ir1.getRefCount());
|
||||
assertEquals(0, ir2.getRefCount());
|
||||
assertEquals("refcount should not change anymore", 0, psub1.getRefCount());
|
||||
dir1.close();
|
||||
dir2.close();
|
||||
}
|
||||
|
||||
// closeSubreaders=false
|
||||
public void testReaderClosedListener1() throws Exception {
|
||||
Directory dir1 = getDir1(random());
|
||||
CompositeReader ir1 = DirectoryReader.open(dir1);
|
||||
|
||||
// with overlapping
|
||||
ParallelCompositeReader pr = new ParallelCompositeReader(false,
|
||||
new CompositeReader[] {ir1},
|
||||
new CompositeReader[] {ir1});
|
||||
|
||||
final int[] listenerClosedCount = new int[1];
|
||||
|
||||
assertEquals(3, pr.leaves().size());
|
||||
|
||||
for(AtomicReaderContext cxt : pr.leaves()) {
|
||||
cxt.reader().addReaderClosedListener(new ReaderClosedListener() {
|
||||
@Override
|
||||
public void onClose(IndexReader reader) {
|
||||
listenerClosedCount[0]++;
|
||||
}
|
||||
});
|
||||
}
|
||||
pr.close();
|
||||
ir1.close();
|
||||
assertEquals(3, listenerClosedCount[0]);
|
||||
dir1.close();
|
||||
}
|
||||
|
||||
// closeSubreaders=true
|
||||
public void testReaderClosedListener2() throws Exception {
|
||||
Directory dir1 = getDir1(random());
|
||||
CompositeReader ir1 = DirectoryReader.open(dir1);
|
||||
|
||||
// with overlapping
|
||||
ParallelCompositeReader pr = new ParallelCompositeReader(true,
|
||||
new CompositeReader[] {ir1},
|
||||
new CompositeReader[] {ir1});
|
||||
|
||||
final int[] listenerClosedCount = new int[1];
|
||||
|
||||
assertEquals(3, pr.leaves().size());
|
||||
|
||||
for(AtomicReaderContext cxt : pr.leaves()) {
|
||||
cxt.reader().addReaderClosedListener(new ReaderClosedListener() {
|
||||
@Override
|
||||
public void onClose(IndexReader reader) {
|
||||
listenerClosedCount[0]++;
|
||||
}
|
||||
});
|
||||
}
|
||||
pr.close();
|
||||
assertEquals(3, listenerClosedCount[0]);
|
||||
dir1.close();
|
||||
}
|
||||
|
||||
public void testCloseInnerReader() throws Exception {
|
||||
Directory dir1 = getDir1(random());
|
||||
CompositeReader ir1 = DirectoryReader.open(dir1);
|
||||
assertEquals(1, ir1.getSequentialSubReaders().get(0).getRefCount());
|
||||
|
||||
// with overlapping
|
||||
ParallelCompositeReader pr = new ParallelCompositeReader(true,
|
||||
new CompositeReader[] {ir1},
|
||||
new CompositeReader[] {ir1});
|
||||
|
||||
IndexReader psub = pr.getSequentialSubReaders().get(0);
|
||||
assertEquals(1, psub.getRefCount());
|
||||
|
||||
ir1.close();
|
||||
|
||||
assertEquals("refCount of synthetic subreader should be unchanged", 1, psub.getRefCount());
|
||||
try {
|
||||
psub.document(0);
|
||||
fail("Subreader should be already closed because inner reader was closed!");
|
||||
} catch (AlreadyClosedException e) {
|
||||
// pass
|
||||
}
|
||||
|
||||
try {
|
||||
pr.document(0);
|
||||
fail("ParallelCompositeReader should be already closed because inner reader was closed!");
|
||||
} catch (AlreadyClosedException e) {
|
||||
// pass
|
||||
}
|
||||
|
||||
// noop:
|
||||
pr.close();
|
||||
assertEquals(0, psub.getRefCount());
|
||||
dir1.close();
|
||||
}
|
||||
|
||||
public void testIncompatibleIndexes1() throws IOException {
|
||||
// two documents:
|
||||
Directory dir1 = getDir1(random());
|
||||
|
@ -277,6 +377,30 @@ public class TestParallelCompositeReader extends LuceneTestCase {
|
|||
dir2.close();
|
||||
}
|
||||
|
||||
public void testToString() throws IOException {
|
||||
Directory dir1 = getDir1(random());
|
||||
CompositeReader ir1 = DirectoryReader.open(dir1);
|
||||
ParallelCompositeReader pr = new ParallelCompositeReader(new CompositeReader[] {ir1});
|
||||
|
||||
final String s = pr.toString();
|
||||
assertTrue("toString incorrect: " + s, s.startsWith("ParallelCompositeReader(ParallelAtomicReader("));
|
||||
|
||||
pr.close();
|
||||
dir1.close();
|
||||
}
|
||||
|
||||
public void testToStringCompositeComposite() throws IOException {
|
||||
Directory dir1 = getDir1(random());
|
||||
CompositeReader ir1 = DirectoryReader.open(dir1);
|
||||
ParallelCompositeReader pr = new ParallelCompositeReader(new CompositeReader[] {new MultiReader(ir1)});
|
||||
|
||||
final String s = pr.toString();
|
||||
assertTrue("toString incorrect: " + s, s.startsWith("ParallelCompositeReader(ParallelCompositeReader(ParallelAtomicReader("));
|
||||
|
||||
pr.close();
|
||||
dir1.close();
|
||||
}
|
||||
|
||||
private void queryTest(Query query) throws IOException {
|
||||
ScoreDoc[] parallelHits = parallel.search(query, null, 1000).scoreDocs;
|
||||
ScoreDoc[] singleHits = single.search(query, null, 1000).scoreDocs;
|
||||
|
|
Loading…
Reference in New Issue