mirror of https://github.com/apache/lucene.git
LUCENE-3736: ParallelReader was split into ParallelAtomicReader and ParallelCompositeReader. Lucene 3.x's ParallelReader is now ParallelAtomicReader, but the new composite variant has improved performance as it works on the atomic subreaders.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1242924 13f79535-47bb-0310-9956-ffa450edef68
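The commit message describes the split but shows no usage, so here is a minimal, hedged sketch of how the two new classes introduced by this commit can be combined. It relies only on constructors visible in the diff below (ParallelAtomicReader, ParallelCompositeReader, SlowCompositeReaderWrapper.wrap, DirectoryReader.open); the class name, directory paths, and the existence of two parallel indexes are illustrative assumptions, not part of the commit.

```java
import java.io.File;
import java.io.IOException;

import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.CompositeReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.ParallelAtomicReader;
import org.apache.lucene.index.ParallelCompositeReader;
import org.apache.lucene.index.SlowCompositeReaderWrapper;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

// Hypothetical usage sketch; "index-big" and "index-small" are assumed to be
// two parallel indexes with the same documents in the same order but different fields.
public class ParallelReaderUsageSketch {
  public static void main(String[] args) throws IOException {
    Directory bigDir = FSDirectory.open(new File("index-big"));
    Directory smallDir = FSDirectory.open(new File("index-small"));

    // Composite variant: works per subreader (faster), but requires that both
    // indexes have exactly the same segment/subreader structure.
    CompositeReader big = DirectoryReader.open(bigDir);
    CompositeReader small = DirectoryReader.open(smallDir);
    ParallelCompositeReader composite = new ParallelCompositeReader(big, small);
    // closeSubReaders defaults to true, so this also closes big and small:
    composite.close();

    // Atomic variant: if the segment structures differ, flatten each index to a
    // single atomic reader first, as the CHANGES entry suggests.
    AtomicReader a1 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(bigDir));
    AtomicReader a2 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(smallDir));
    ParallelAtomicReader atomic = new ParallelAtomicReader(a1, a2);
    atomic.close();

    bigDir.close();
    smallDir.close();
  }
}
```

The composite variant is the one the commit message calls faster: it builds a parallel reader per matching subreader pair instead of forcing everything through a single merged (slow-wrapped) view.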
parent c74d48b857
commit 6188bc66d7
@@ -235,6 +235,18 @@ Changes in backwards compatibility policy
  you really want a top-level fieldcache, use SlowMultiReaderWrapper.
  (Robert Muir)

* LUCENE-2858, LUCENE-3733: IndexReader was refactored into abstract
  AtomicReader, CompositeReader, and DirectoryReader. TODO: add more info
  (Uwe Schindler, Mike McCandless, Robert Muir)

* LUCENE-3736: ParallelReader was split into ParallelAtomicReader
  and ParallelCompositeReader. Lucene 3.x's ParallelReader is now
  ParallelAtomicReader, but the new composite variant has improved performance
  as it works on the atomic subreaders. It requires that all parallel
  composite readers have the same subreader structure. If you cannot provide this,
  you can use SlowCompositeReaderWrapper to make all parallel readers atomic
  and use ParallelAtomicReader. (Uwe Schindler, Mike McCandless, Robert Muir)

Changes in Runtime Behavior

* LUCENE-2846: omitNorms now behaves like omitTermFrequencyAndPositions, if you

@@ -0,0 +1,277 @@
|
|||
package org.apache.lucene.index;
|
||||
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.lucene.util.Bits;
|
||||
|
||||
|
||||
/** An {@link AtomicReader} which reads multiple, parallel indexes. Each index
|
||||
* added must have the same number of documents, but typically each contains
|
||||
* different fields. Deletions are taken from the first reader.
|
||||
* Each document contains the union of the fields of all documents
|
||||
* with the same document number. When searching, matches for a
|
||||
* query term are from the first index added that has the field.
|
||||
*
|
||||
* <p>This is useful, e.g., with collections that have large fields which
|
||||
* change rarely and small fields that change more frequently. The smaller
|
||||
* fields may be re-indexed in a new index and both indexes may be searched
|
||||
* together.
|
||||
*
|
||||
* <p><strong>Warning:</strong> It is up to you to make sure all indexes
|
||||
* are created and modified the same way. For example, if you add
|
||||
* documents to one index, you need to add the same documents in the
|
||||
* same order to the other indexes. <em>Failure to do so will result in
|
||||
* undefined behavior</em>.
|
||||
*/
|
||||
public final class ParallelAtomicReader extends AtomicReader {
|
||||
private final FieldInfos fieldInfos = new FieldInfos();
|
||||
private final ParallelFields fields = new ParallelFields();
|
||||
private final AtomicReader[] parallelReaders, storedFieldsReaders;
|
||||
private final Set<AtomicReader> completeReaderSet =
|
||||
Collections.newSetFromMap(new IdentityHashMap<AtomicReader,Boolean>());
|
||||
private final boolean closeSubReaders;
|
||||
private final int maxDoc, numDocs;
|
||||
private final boolean hasDeletions;
|
||||
final SortedMap<String,AtomicReader> fieldToReader = new TreeMap<String,AtomicReader>();
|
||||
|
||||
/** Create a ParallelAtomicReader based on the provided
|
||||
* readers; auto-closes the given readers on {@link #close()}. */
|
||||
public ParallelAtomicReader(AtomicReader... readers) throws IOException {
|
||||
this(true, readers);
|
||||
}
|
||||
|
||||
/** Create a ParallelAtomicReader based on the provided
|
||||
* readers. */
|
||||
public ParallelAtomicReader(boolean closeSubReaders, AtomicReader... readers) throws IOException {
|
||||
this(closeSubReaders, readers, readers);
|
||||
}
|
||||
|
||||
/** Expert: create a ParallelAtomicReader based on the provided
|
||||
* readers and storedFieldReaders; when a document is
|
||||
* loaded, only storedFieldsReaders will be used. */
|
||||
public ParallelAtomicReader(boolean closeSubReaders, AtomicReader[] readers, AtomicReader[] storedFieldsReaders) throws IOException {
|
||||
this.closeSubReaders = closeSubReaders;
|
||||
if (readers.length == 0 && storedFieldsReaders.length > 0)
|
||||
throw new IllegalArgumentException("There must be at least one main reader if storedFieldsReaders are used.");
|
||||
this.parallelReaders = readers.clone();
|
||||
this.storedFieldsReaders = storedFieldsReaders.clone();
|
||||
this.numDocs = (readers.length > 0) ? readers[0].numDocs() : 0;
|
||||
this.maxDoc = (readers.length > 0) ? readers[0].maxDoc() : 0;
|
||||
this.hasDeletions = (readers.length > 0) ? readers[0].hasDeletions() : false;
|
||||
Collections.addAll(completeReaderSet, this.parallelReaders);
|
||||
Collections.addAll(completeReaderSet, this.storedFieldsReaders);
|
||||
|
||||
// check compatibility:
|
||||
for(AtomicReader reader : completeReaderSet) {
|
||||
if (reader.maxDoc() != maxDoc) {
|
||||
throw new IllegalArgumentException("All readers must have same maxDoc: "+maxDoc+"!="+reader.maxDoc());
|
||||
}
|
||||
}
|
||||
|
||||
for (final AtomicReader reader : this.parallelReaders) {
|
||||
final FieldInfos readerFieldInfos = reader.getFieldInfos();
|
||||
for(FieldInfo fieldInfo : readerFieldInfos) { // update fieldToReader map
|
||||
// NOTE: first reader having a given field "wins":
|
||||
if (fieldToReader.get(fieldInfo.name) == null) {
|
||||
fieldInfos.add(fieldInfo);
|
||||
fieldToReader.put(fieldInfo.name, reader);
|
||||
this.fields.addField(fieldInfo.name, reader.terms(fieldInfo.name));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// do this last so any exceptions that occurred before don't affect refcounts:
|
||||
if (!closeSubReaders) {
|
||||
for (AtomicReader reader : completeReaderSet) {
|
||||
reader.incRef();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder buffer = new StringBuilder("ParallelAtomicReader(");
|
||||
for (final Iterator<AtomicReader> iter = completeReaderSet.iterator(); iter.hasNext();) {
|
||||
buffer.append(iter.next());
|
||||
if (iter.hasNext()) buffer.append(", ");
|
||||
}
|
||||
return buffer.append(')').toString();
|
||||
}
|
||||
|
||||
private final class ParallelFieldsEnum extends FieldsEnum {
|
||||
private String currentField;
|
||||
private final Iterator<String> keys;
|
||||
private final Fields fields;
|
||||
|
||||
ParallelFieldsEnum(Fields fields) {
|
||||
this.fields = fields;
|
||||
keys = fieldToReader.keySet().iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String next() throws IOException {
|
||||
if (keys.hasNext()) {
|
||||
currentField = keys.next();
|
||||
} else {
|
||||
currentField = null;
|
||||
}
|
||||
return currentField;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Terms terms() throws IOException {
|
||||
return fields.terms(currentField);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Single instance of this, per ParallelReader instance
|
||||
private final class ParallelFields extends Fields {
|
||||
final HashMap<String,Terms> fields = new HashMap<String,Terms>();
|
||||
|
||||
ParallelFields() {
|
||||
}
|
||||
|
||||
void addField(String fieldName, Terms terms) throws IOException {
|
||||
fields.put(fieldName, terms);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FieldsEnum iterator() throws IOException {
|
||||
return new ParallelFieldsEnum(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Terms terms(String field) throws IOException {
|
||||
return fields.get(field);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getUniqueFieldCount() throws IOException {
|
||||
return fields.size();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public FieldInfos getFieldInfos() {
|
||||
return fieldInfos;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Bits getLiveDocs() {
|
||||
ensureOpen();
|
||||
return hasDeletions ? parallelReaders[0].getLiveDocs() : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Fields fields() {
|
||||
ensureOpen();
|
||||
return fields;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int numDocs() {
|
||||
// Don't call ensureOpen() here (it could affect performance)
|
||||
return numDocs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int maxDoc() {
|
||||
// Don't call ensureOpen() here (it could affect performance)
|
||||
return maxDoc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasDeletions() {
|
||||
ensureOpen();
|
||||
return hasDeletions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void document(int docID, StoredFieldVisitor visitor) throws CorruptIndexException, IOException {
|
||||
ensureOpen();
|
||||
for (final AtomicReader reader: storedFieldsReaders) {
|
||||
reader.document(docID, visitor);
|
||||
}
|
||||
}
|
||||
|
||||
// get all vectors
|
||||
@Override
|
||||
public Fields getTermVectors(int docID) throws IOException {
|
||||
ensureOpen();
|
||||
ParallelFields fields = new ParallelFields();
|
||||
for (Map.Entry<String,AtomicReader> ent : fieldToReader.entrySet()) {
|
||||
String fieldName = ent.getKey();
|
||||
Terms vector = ent.getValue().getTermVector(docID, fieldName);
|
||||
if (vector != null) {
|
||||
fields.addField(fieldName, vector);
|
||||
}
|
||||
}
|
||||
|
||||
return fields;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNorms(String field) throws IOException {
|
||||
ensureOpen();
|
||||
AtomicReader reader = fieldToReader.get(field);
|
||||
return reader==null ? false : reader.hasNorms(field);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void doClose() throws IOException {
|
||||
IOException ioe = null;
|
||||
for (AtomicReader reader : completeReaderSet) {
|
||||
try {
|
||||
if (closeSubReaders) {
|
||||
reader.close();
|
||||
} else {
|
||||
reader.decRef();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if (ioe == null) ioe = e;
|
||||
}
|
||||
}
|
||||
// throw the first exception
|
||||
if (ioe != null) throw ioe;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DocValues docValues(String field) throws IOException {
|
||||
ensureOpen();
|
||||
AtomicReader reader = fieldToReader.get(field);
|
||||
return reader == null ? null : reader.docValues(field);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DocValues normValues(String field) throws IOException {
|
||||
ensureOpen();
|
||||
AtomicReader reader = fieldToReader.get(field);
|
||||
return reader == null ? null : reader.normValues(field);
|
||||
}
|
||||
}
|
|
@@ -0,0 +1,179 @@
|
|||
package org.apache.lucene.index;
|
||||
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
|
||||
/** A {@link CompositeReader} which reads multiple, parallel indexes. Each index added
|
||||
* must have the same number of documents, and exactly the same hierarchical subreader structure,
|
||||
* but typically each contains different fields. Deletions are taken from the first reader.
|
||||
* Each document contains the union of the fields of all
|
||||
* documents with the same document number. When searching, matches for a
|
||||
* query term are from the first index added that has the field.
|
||||
*
|
||||
* <p>This is useful, e.g., with collections that have large fields which
|
||||
* change rarely and small fields that change more frequently. The smaller
|
||||
* fields may be re-indexed in a new index and both indexes may be searched
|
||||
* together.
|
||||
*
|
||||
* <p><strong>Warning:</strong> It is up to you to make sure all indexes
|
||||
* are created and modified the same way. For example, if you add
|
||||
* documents to one index, you need to add the same documents in the
|
||||
* same order to the other indexes. <em>Failure to do so will result in
|
||||
* undefined behavior</em>.
|
||||
* A good strategy to create suitable indexes with {@link IndexWriter} is to use
|
||||
* {@link LogDocMergePolicy}, as this one does not reorder documents
|
||||
* during merging (unlike {@code TieredMergePolicy}) and triggers merges
|
||||
* by number of documents per segment. If you use different {@link MergePolicy}s
|
||||
* it might happen that the segment structure of your index is no longer predictable.
|
||||
*/
|
||||
public final class ParallelCompositeReader extends BaseMultiReader<IndexReader> {
|
||||
private final boolean closeSubReaders;
|
||||
private final Set<CompositeReader> completeReaderSet =
|
||||
Collections.newSetFromMap(new IdentityHashMap<CompositeReader,Boolean>());
|
||||
|
||||
/** Create a ParallelCompositeReader based on the provided
|
||||
* readers; auto-closes the given readers on {@link #close()}. */
|
||||
public ParallelCompositeReader(CompositeReader... readers) throws IOException {
|
||||
this(true, readers);
|
||||
}
|
||||
|
||||
/** Create a ParallelCompositeReader based on the provided
|
||||
* readers. */
|
||||
public ParallelCompositeReader(boolean closeSubReaders, CompositeReader... readers) throws IOException {
|
||||
this(closeSubReaders, readers, readers);
|
||||
}
|
||||
|
||||
/** Expert: create a ParallelCompositeReader based on the provided
|
||||
* readers and storedFieldReaders; when a document is
|
||||
* loaded, only storedFieldsReaders will be used. */
|
||||
public ParallelCompositeReader(boolean closeSubReaders, CompositeReader[] readers, CompositeReader[] storedFieldReaders) throws IOException {
|
||||
super(prepareSubReaders(readers, storedFieldReaders));
|
||||
this.closeSubReaders = closeSubReaders;
|
||||
Collections.addAll(completeReaderSet, readers);
|
||||
Collections.addAll(completeReaderSet, storedFieldReaders);
|
||||
// do this last so any exceptions that occurred before don't affect refcounts:
|
||||
if (!closeSubReaders) {
|
||||
for (CompositeReader reader : completeReaderSet) {
|
||||
reader.incRef();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static IndexReader[] prepareSubReaders(CompositeReader[] readers, CompositeReader[] storedFieldsReaders) throws IOException {
|
||||
if (readers.length == 0) {
|
||||
if (storedFieldsReaders.length > 0)
|
||||
throw new IllegalArgumentException("There must be at least one main reader if storedFieldsReaders are used.");
|
||||
return new IndexReader[0];
|
||||
} else {
|
||||
final IndexReader[] firstSubReaders = readers[0].getSequentialSubReaders();
|
||||
|
||||
// check compatibility:
|
||||
final int maxDoc = readers[0].maxDoc();
|
||||
final int[] childMaxDoc = new int[firstSubReaders.length];
|
||||
for (int i = 0; i < firstSubReaders.length; i++) {
|
||||
childMaxDoc[i] = firstSubReaders[i].maxDoc();
|
||||
}
|
||||
validate(readers, maxDoc, childMaxDoc);
|
||||
validate(storedFieldsReaders, maxDoc, childMaxDoc);
|
||||
|
||||
// hierarchically build the same subreader structure as the first CompositeReader with Parallel*Readers:
|
||||
final IndexReader[] subReaders = new IndexReader[firstSubReaders.length];
|
||||
for (int i = 0; i < subReaders.length; i++) {
|
||||
if (firstSubReaders[i] instanceof AtomicReader) {
|
||||
final AtomicReader[] atomicSubs = new AtomicReader[readers.length];
|
||||
for (int j = 0; j < readers.length; j++) {
|
||||
atomicSubs[j] = (AtomicReader) readers[j].getSequentialSubReaders()[i];
|
||||
}
|
||||
final AtomicReader[] storedSubs = new AtomicReader[storedFieldsReaders.length];
|
||||
for (int j = 0; j < storedFieldsReaders.length; j++) {
|
||||
storedSubs[j] = (AtomicReader) storedFieldsReaders[j].getSequentialSubReaders()[i];
|
||||
}
|
||||
// we simply enable closing of subReaders, to prevent incRefs on subReaders
|
||||
// -> for synthetic subReaders, close() is never
|
||||
// called by our doClose()
|
||||
subReaders[i] = new ParallelAtomicReader(true, atomicSubs, storedSubs);
|
||||
} else {
|
||||
assert firstSubReaders[i] instanceof CompositeReader;
|
||||
final CompositeReader[] compositeSubs = new CompositeReader[readers.length];
|
||||
for (int j = 0; j < readers.length; j++) {
|
||||
compositeSubs[j] = (CompositeReader) readers[j].getSequentialSubReaders()[i];
|
||||
}
|
||||
final CompositeReader[] storedSubs = new CompositeReader[storedFieldsReaders.length];
|
||||
for (int j = 0; j < storedFieldsReaders.length; j++) {
|
||||
storedSubs[j] = (CompositeReader) storedFieldsReaders[j].getSequentialSubReaders()[i];
|
||||
}
|
||||
// we simply enable closing of subReaders, to prevent incRefs on subReaders
|
||||
// -> for synthetic subReaders, close() is never called by our doClose()
|
||||
subReaders[i] = new ParallelCompositeReader(true, compositeSubs, storedSubs);
|
||||
}
|
||||
}
|
||||
return subReaders;
|
||||
}
|
||||
}
|
||||
|
||||
private static void validate(CompositeReader[] readers, int maxDoc, int[] childMaxDoc) {
|
||||
for (int i = 0; i < readers.length; i++) {
|
||||
final CompositeReader reader = readers[i];
|
||||
final IndexReader[] subs = reader.getSequentialSubReaders();
|
||||
if (reader.maxDoc() != maxDoc) {
|
||||
throw new IllegalArgumentException("All readers must have same maxDoc: "+maxDoc+"!="+reader.maxDoc());
|
||||
}
|
||||
if (subs.length != childMaxDoc.length) {
|
||||
throw new IllegalArgumentException("All readers must have same number of subReaders");
|
||||
}
|
||||
for (int subIDX = 0; subIDX < subs.length; subIDX++) {
|
||||
if (subs[subIDX].maxDoc() != childMaxDoc[subIDX]) {
|
||||
throw new IllegalArgumentException("All readers must have same subReader maxDoc");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder buffer = new StringBuilder("ParallelCompositeReader(");
|
||||
for (final Iterator<CompositeReader> iter = completeReaderSet.iterator(); iter.hasNext();) {
|
||||
buffer.append(iter.next());
|
||||
if (iter.hasNext()) buffer.append(", ");
|
||||
}
|
||||
return buffer.append(')').toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void doClose() throws IOException {
|
||||
IOException ioe = null;
|
||||
for (final CompositeReader reader : completeReaderSet) {
|
||||
try {
|
||||
if (closeSubReaders) {
|
||||
reader.close();
|
||||
} else {
|
||||
reader.decRef();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if (ioe == null) ioe = e;
|
||||
}
|
||||
}
|
||||
// throw the first exception
|
||||
if (ioe != null) throw ioe;
|
||||
}
|
||||
}
|
|
@@ -1,298 +0,0 @@
|
|||
package org.apache.lucene.index;
|
||||
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
|
||||
|
||||
/** An AtomicIndexReader which reads multiple, parallel indexes. Each index added
|
||||
* must have the same number of documents, but typically each contains
|
||||
* different fields. Each document contains the union of the fields of all
|
||||
* documents with the same document number. When searching, matches for a
|
||||
* query term are from the first index added that has the field.
|
||||
*
|
||||
* <p>This is useful, e.g., with collections that have large fields which
|
||||
* change rarely and small fields that change more frequently. The smaller
|
||||
* fields may be re-indexed in a new index and both indexes may be searched
|
||||
* together.
|
||||
*
|
||||
* <p><strong>Warning:</strong> It is up to you to make sure all indexes
|
||||
* are created and modified the same way. For example, if you add
|
||||
* documents to one index, you need to add the same documents in the
|
||||
* same order to the other indexes. <em>Failure to do so will result in
|
||||
* undefined behavior</em>.
|
||||
*/
|
||||
public class ParallelReader extends AtomicReader {
|
||||
private List<AtomicReader> readers = new ArrayList<AtomicReader>();
|
||||
private List<Boolean> decrefOnClose = new ArrayList<Boolean>(); // remember which subreaders to decRef on close
|
||||
boolean incRefReaders = false;
|
||||
private SortedMap<String,AtomicReader> fieldToReader = new TreeMap<String,AtomicReader>();
|
||||
private Map<AtomicReader,Collection<String>> readerToFields = new HashMap<AtomicReader,Collection<String>>();
|
||||
private List<AtomicReader> storedFieldReaders = new ArrayList<AtomicReader>();
|
||||
private Map<String, DocValues> normsCache = new HashMap<String,DocValues>();
|
||||
private int maxDoc;
|
||||
private int numDocs;
|
||||
private boolean hasDeletions;
|
||||
private final FieldInfos fieldInfos;
|
||||
|
||||
private final ParallelFields fields = new ParallelFields();
|
||||
|
||||
/** Construct a ParallelReader.
|
||||
* <p>Note that all subreaders are closed if this ParallelReader is closed.</p>
|
||||
*/
|
||||
public ParallelReader() throws IOException { this(true); }
|
||||
|
||||
/** Construct a ParallelReader.
|
||||
* @param closeSubReaders indicates whether the subreaders should be closed
|
||||
* when this ParallelReader is closed
|
||||
*/
|
||||
public ParallelReader(boolean closeSubReaders) throws IOException {
|
||||
super();
|
||||
this.incRefReaders = !closeSubReaders;
|
||||
fieldInfos = new FieldInfos();
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder buffer = new StringBuilder("ParallelReader(");
|
||||
final Iterator<AtomicReader> iter = readers.iterator();
|
||||
if (iter.hasNext()) {
|
||||
buffer.append(iter.next());
|
||||
}
|
||||
while (iter.hasNext()) {
|
||||
buffer.append(", ").append(iter.next());
|
||||
}
|
||||
buffer.append(')');
|
||||
return buffer.toString();
|
||||
}
|
||||
|
||||
/** Add an AtomicIndexReader.
|
||||
* @throws IOException if there is a low-level IO error
|
||||
*/
|
||||
public void add(AtomicReader reader) throws IOException {
|
||||
ensureOpen();
|
||||
add(reader, false);
|
||||
}
|
||||
|
||||
/** Add an AtomicIndexReader whose stored fields will not be returned. This can
|
||||
* accelerate search when stored fields are only needed from a subset of
|
||||
* the IndexReaders.
|
||||
*
|
||||
* @throws IllegalArgumentException if not all indexes contain the same number
|
||||
* of documents
|
||||
* @throws IllegalArgumentException if not all indexes have the same value
|
||||
* of {@link AtomicReader#maxDoc()}
|
||||
* @throws IOException if there is a low-level IO error
|
||||
*/
|
||||
public void add(AtomicReader reader, boolean ignoreStoredFields)
|
||||
throws IOException {
|
||||
|
||||
ensureOpen();
|
||||
if (readers.size() == 0) {
|
||||
this.maxDoc = reader.maxDoc();
|
||||
this.numDocs = reader.numDocs();
|
||||
this.hasDeletions = reader.hasDeletions();
|
||||
}
|
||||
|
||||
if (reader.maxDoc() != maxDoc) // check compatibility
|
||||
throw new IllegalArgumentException
|
||||
("All readers must have same maxDoc: "+maxDoc+"!="+reader.maxDoc());
|
||||
if (reader.numDocs() != numDocs)
|
||||
throw new IllegalArgumentException
|
||||
("All readers must have same numDocs: "+numDocs+"!="+reader.numDocs());
|
||||
|
||||
final FieldInfos readerFieldInfos = MultiFields.getMergedFieldInfos(reader);
|
||||
for(FieldInfo fieldInfo : readerFieldInfos) { // update fieldToReader map
|
||||
// NOTE: first reader having a given field "wins":
|
||||
if (fieldToReader.get(fieldInfo.name) == null) {
|
||||
fieldInfos.add(fieldInfo);
|
||||
fieldToReader.put(fieldInfo.name, reader);
|
||||
this.fields.addField(fieldInfo.name, reader.terms(fieldInfo.name));
|
||||
}
|
||||
}
|
||||
|
||||
if (!ignoreStoredFields)
|
||||
storedFieldReaders.add(reader); // add to storedFieldReaders
|
||||
readers.add(reader);
|
||||
|
||||
if (incRefReaders) {
|
||||
reader.incRef();
|
||||
}
|
||||
decrefOnClose.add(Boolean.valueOf(incRefReaders));
|
||||
synchronized(normsCache) {
|
||||
normsCache.clear(); // TODO: don't need to clear this for all fields really?
|
||||
}
|
||||
}
|
||||
|
||||
private class ParallelFieldsEnum extends FieldsEnum {
|
||||
String currentField;
|
||||
Iterator<String> keys;
|
||||
private final Fields fields;
|
||||
|
||||
ParallelFieldsEnum(Fields fields) {
|
||||
this.fields = fields;
|
||||
keys = fieldToReader.keySet().iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String next() throws IOException {
|
||||
if (keys.hasNext()) {
|
||||
currentField = keys.next();
|
||||
} else {
|
||||
currentField = null;
|
||||
}
|
||||
return currentField;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Terms terms() throws IOException {
|
||||
return fields.terms(currentField);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Single instance of this, per ParallelReader instance
|
||||
private class ParallelFields extends Fields {
|
||||
final HashMap<String,Terms> fields = new HashMap<String,Terms>();
|
||||
|
||||
public void addField(String fieldName, Terms terms) throws IOException {
|
||||
fields.put(fieldName, terms);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FieldsEnum iterator() throws IOException {
|
||||
return new ParallelFieldsEnum(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Terms terms(String field) throws IOException {
|
||||
return fields.get(field);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getUniqueFieldCount() throws IOException {
|
||||
return fields.size();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public FieldInfos getFieldInfos() {
|
||||
return fieldInfos;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Bits getLiveDocs() {
|
||||
ensureOpen();
|
||||
return readers.get(0).getLiveDocs();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Fields fields() {
|
||||
ensureOpen();
|
||||
return fields;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int numDocs() {
|
||||
// Don't call ensureOpen() here (it could affect performance)
|
||||
return numDocs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int maxDoc() {
|
||||
// Don't call ensureOpen() here (it could affect performance)
|
||||
return maxDoc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasDeletions() {
|
||||
ensureOpen();
|
||||
return hasDeletions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void document(int docID, StoredFieldVisitor visitor) throws CorruptIndexException, IOException {
|
||||
ensureOpen();
|
||||
for (final AtomicReader reader: storedFieldReaders) {
|
||||
reader.document(docID, visitor);
|
||||
}
|
||||
}
|
||||
|
||||
// get all vectors
|
||||
@Override
|
||||
public Fields getTermVectors(int docID) throws IOException {
|
||||
ensureOpen();
|
||||
ParallelFields fields = new ParallelFields();
|
||||
for (Map.Entry<String,AtomicReader> ent : fieldToReader.entrySet()) {
|
||||
String fieldName = ent.getKey();
|
||||
Terms vector = ent.getValue().getTermVector(docID, fieldName);
|
||||
if (vector != null) {
|
||||
fields.addField(fieldName, vector);
|
||||
}
|
||||
}
|
||||
|
||||
return fields;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNorms(String field) throws IOException {
|
||||
ensureOpen();
|
||||
AtomicReader reader = fieldToReader.get(field);
|
||||
return reader==null ? false : reader.hasNorms(field);
|
||||
}
|
||||
|
||||
// for testing
|
||||
AtomicReader[] getSubReaders() {
|
||||
return readers.toArray(new AtomicReader[readers.size()]);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void doClose() throws IOException {
|
||||
for (int i = 0; i < readers.size(); i++) {
|
||||
if (decrefOnClose.get(i).booleanValue()) {
|
||||
readers.get(i).decRef();
|
||||
} else {
|
||||
readers.get(i).close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: I suspect this is completely untested!!!!!
|
||||
@Override
|
||||
public DocValues docValues(String field) throws IOException {
|
||||
AtomicReader reader = fieldToReader.get(field);
|
||||
return reader == null ? null : reader.docValues(field);
|
||||
}
|
||||
|
||||
// TODO: I suspect this is completely untested!!!!!
|
||||
@Override
|
||||
public synchronized DocValues normValues(String field) throws IOException {
|
||||
DocValues values = normsCache.get(field);
|
||||
if (values == null) {
|
||||
AtomicReader reader = fieldToReader.get(field);
|
||||
values = reader == null ? null : reader.normValues(field);
|
||||
normsCache.put(field, values);
|
||||
}
|
||||
return values;
|
||||
}
|
||||
}
|
|
@@ -28,30 +28,15 @@ import org.apache.lucene.search.*;
|
|||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
|
||||
public class TestParallelReader extends LuceneTestCase {
|
||||
public class TestParallelAtomicReader extends LuceneTestCase {
|
||||
|
||||
private IndexSearcher parallel;
|
||||
private IndexSearcher single;
|
||||
private IndexSearcher parallel, single;
|
||||
private Directory dir, dir1, dir2;
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
single = single(random);
|
||||
parallel = parallel(random);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
single.getIndexReader().close();
|
||||
parallel.getIndexReader().close();
|
||||
dir.close();
|
||||
dir1.close();
|
||||
dir2.close();
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
public void testQueries() throws Exception {
|
||||
single = single(random);
|
||||
parallel = parallel(random);
|
||||
|
||||
queryTest(new TermQuery(new Term("f1", "v1")));
|
||||
queryTest(new TermQuery(new Term("f1", "v2")));
|
||||
queryTest(new TermQuery(new Term("f2", "v1")));
|
||||
|
@@ -65,14 +50,19 @@ public class TestParallelReader extends LuceneTestCase {
|
|||
bq1.add(new TermQuery(new Term("f1", "v1")), Occur.MUST);
|
||||
bq1.add(new TermQuery(new Term("f4", "v1")), Occur.MUST);
|
||||
queryTest(bq1);
|
||||
|
||||
single.getIndexReader().close(); single = null;
|
||||
parallel.getIndexReader().close(); parallel = null;
|
||||
dir.close(); dir = null;
|
||||
dir1.close(); dir1 = null;
|
||||
dir2.close(); dir2 = null;
|
||||
}
|
||||
|
||||
public void testFieldNames() throws Exception {
|
||||
Directory dir1 = getDir1(random);
|
||||
Directory dir2 = getDir2(random);
|
||||
ParallelReader pr = new ParallelReader();
|
||||
pr.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1)));
|
||||
pr.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir2)));
|
||||
ParallelAtomicReader pr = new ParallelAtomicReader(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1)),
|
||||
SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir2)));
|
||||
FieldInfos fieldInfos = pr.getFieldInfos();
|
||||
assertEquals(4, fieldInfos.size());
|
||||
assertNotNull(fieldInfos.fieldInfo("f1"));
|
||||
|
@@ -84,6 +74,45 @@ public class TestParallelReader extends LuceneTestCase {
|
|||
dir2.close();
|
||||
}
|
||||
|
||||
public void testRefCounts1() throws IOException {
|
||||
Directory dir1 = getDir1(random);
|
||||
Directory dir2 = getDir2(random);
|
||||
AtomicReader ir1, ir2;
|
||||
// close subreaders, ParallelReader will not change refCounts, but will close them on its own close()
|
||||
ParallelAtomicReader pr = new ParallelAtomicReader(ir1 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1)),
|
||||
ir2 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir2)));
|
||||
|
||||
// check RefCounts
|
||||
assertEquals(1, ir1.getRefCount());
|
||||
assertEquals(1, ir2.getRefCount());
|
||||
pr.close();
|
||||
assertEquals(0, ir1.getRefCount());
|
||||
assertEquals(0, ir2.getRefCount());
|
||||
dir1.close();
|
||||
dir2.close();
|
||||
}
|
||||
|
||||
public void testRefCounts2() throws IOException {
|
||||
Directory dir1 = getDir1(random);
|
||||
Directory dir2 = getDir2(random);
|
||||
AtomicReader ir1 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1));
|
||||
AtomicReader ir2 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir2));
|
||||
// don't close subreaders, so ParallelReader will increment refcounts
|
||||
ParallelAtomicReader pr = new ParallelAtomicReader(false, ir1, ir2);
|
||||
// check RefCounts
|
||||
assertEquals(2, ir1.getRefCount());
|
||||
assertEquals(2, ir2.getRefCount());
|
||||
pr.close();
|
||||
assertEquals(1, ir1.getRefCount());
|
||||
assertEquals(1, ir2.getRefCount());
|
||||
ir1.close();
|
||||
ir2.close();
|
||||
assertEquals(0, ir1.getRefCount());
|
||||
assertEquals(0, ir2.getRefCount());
|
||||
dir1.close();
|
||||
dir2.close();
|
||||
}
|
||||
|
||||
public void testIncompatibleIndexes() throws IOException {
|
||||
// two documents:
|
||||
Directory dir1 = getDir1(random);
|
||||
|
@@ -97,17 +126,94 @@ public class TestParallelReader extends LuceneTestCase {
|
|||
w2.addDocument(d3);
|
||||
w2.close();
|
||||
|
||||
ParallelReader pr = new ParallelReader();
|
||||
pr.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1)));
|
||||
DirectoryReader ir = DirectoryReader.open(dir2);
|
||||
AtomicReader ir1 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1));
|
||||
AtomicReader ir2 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir2));
|
||||
|
||||
try {
|
||||
pr.add(SlowCompositeReaderWrapper.wrap(ir));
|
||||
new ParallelAtomicReader(ir1, ir2);
|
||||
fail("didn't get exptected exception: indexes don't have same number of documents");
|
||||
} catch (IllegalArgumentException e) {
|
||||
// expected exception
|
||||
}
|
||||
|
||||
try {
|
||||
new ParallelAtomicReader(random.nextBoolean(),
|
||||
new AtomicReader[] {ir1, ir2},
|
||||
new AtomicReader[] {ir1, ir2});
|
||||
fail("didn't get expected exception: indexes don't have same number of documents");
|
||||
} catch (IllegalArgumentException e) {
|
||||
// expected exception
|
||||
}
|
||||
// check RefCounts
|
||||
assertEquals(1, ir1.getRefCount());
|
||||
assertEquals(1, ir2.getRefCount());
|
||||
ir1.close();
|
||||
ir2.close();
|
||||
dir1.close();
|
||||
dir2.close();
|
||||
}
|
||||
|
||||
public void testIgnoreStoredFields() throws IOException {
|
||||
Directory dir1 = getDir1(random);
|
||||
Directory dir2 = getDir2(random);
|
||||
AtomicReader ir1 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1));
|
||||
AtomicReader ir2 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir2));
|
||||
|
||||
// with overlapping
|
||||
ParallelAtomicReader pr = new ParallelAtomicReader(false,
|
||||
new AtomicReader[] {ir1, ir2},
|
||||
new AtomicReader[] {ir1});
|
||||
assertEquals("v1", pr.document(0).get("f1"));
|
||||
assertEquals("v1", pr.document(0).get("f2"));
|
||||
assertNull(pr.document(0).get("f3"));
|
||||
assertNull(pr.document(0).get("f4"));
|
||||
// check that fields are there
|
||||
assertNotNull(pr.terms("f1"));
|
||||
assertNotNull(pr.terms("f2"));
|
||||
assertNotNull(pr.terms("f3"));
|
||||
assertNotNull(pr.terms("f4"));
|
||||
pr.close();
|
||||
ir.close();
|
||||
|
||||
// no stored fields at all
|
||||
pr = new ParallelAtomicReader(false,
|
||||
new AtomicReader[] {ir2},
|
||||
new AtomicReader[0]);
|
||||
assertNull(pr.document(0).get("f1"));
|
||||
assertNull(pr.document(0).get("f2"));
|
||||
assertNull(pr.document(0).get("f3"));
|
||||
assertNull(pr.document(0).get("f4"));
|
||||
// check that fields are there
|
||||
assertNull(pr.terms("f1"));
|
||||
assertNull(pr.terms("f2"));
|
||||
assertNotNull(pr.terms("f3"));
|
||||
assertNotNull(pr.terms("f4"));
|
||||
pr.close();
|
||||
|
||||
// without overlapping
|
||||
pr = new ParallelAtomicReader(true,
|
||||
new AtomicReader[] {ir2},
|
||||
new AtomicReader[] {ir1});
|
||||
assertEquals("v1", pr.document(0).get("f1"));
|
||||
assertEquals("v1", pr.document(0).get("f2"));
|
||||
assertNull(pr.document(0).get("f3"));
|
||||
assertNull(pr.document(0).get("f4"));
|
||||
// check that fields are there
|
||||
assertNull(pr.terms("f1"));
|
||||
assertNull(pr.terms("f2"));
|
||||
assertNotNull(pr.terms("f3"));
|
||||
assertNotNull(pr.terms("f4"));
|
||||
pr.close();
|
||||
|
||||
// no main readers
|
||||
try {
|
||||
new ParallelAtomicReader(true,
|
||||
new AtomicReader[0],
|
||||
new AtomicReader[] {ir1});
|
||||
fail("didn't get expected exception: need a non-empty main-reader array");
|
||||
} catch (IllegalArgumentException iae) {
|
||||
// pass
|
||||
}
|
||||
|
||||
dir1.close();
|
||||
dir2.close();
|
||||
}
|
||||
|
@@ -153,9 +259,9 @@ public class TestParallelReader extends LuceneTestCase {
|
|||
private IndexSearcher parallel(Random random) throws IOException {
|
||||
dir1 = getDir1(random);
|
||||
dir2 = getDir2(random);
|
||||
ParallelReader pr = new ParallelReader();
|
||||
pr.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1)));
|
||||
pr.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir2)));
|
||||
ParallelAtomicReader pr = new ParallelAtomicReader(
|
||||
SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1)),
|
||||
SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir2)));
|
||||
return newSearcher(pr);
|
||||
}
|
||||
|
|
@@ -0,0 +1,376 @@
|
|||
package org.apache.lucene.index;
|
||||
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.lucene.analysis.MockAnalyzer;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.TextField;
|
||||
import org.apache.lucene.search.BooleanClause.Occur;
|
||||
import org.apache.lucene.search.*;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
|
||||
public class TestParallelCompositeReader extends LuceneTestCase {
|
||||
|
||||
private IndexSearcher parallel, single;
|
||||
private Directory dir, dir1, dir2;
|
||||
|
||||
public void testQueries() throws Exception {
|
||||
single = single(random);
|
||||
parallel = parallel(random);
|
||||
|
||||
queryTest(new TermQuery(new Term("f1", "v1")));
|
||||
queryTest(new TermQuery(new Term("f1", "v2")));
|
||||
queryTest(new TermQuery(new Term("f2", "v1")));
|
||||
queryTest(new TermQuery(new Term("f2", "v2")));
|
||||
queryTest(new TermQuery(new Term("f3", "v1")));
|
||||
queryTest(new TermQuery(new Term("f3", "v2")));
|
||||
queryTest(new TermQuery(new Term("f4", "v1")));
|
||||
queryTest(new TermQuery(new Term("f4", "v2")));
|
||||
|
||||
BooleanQuery bq1 = new BooleanQuery();
|
||||
bq1.add(new TermQuery(new Term("f1", "v1")), Occur.MUST);
|
||||
bq1.add(new TermQuery(new Term("f4", "v1")), Occur.MUST);
|
||||
queryTest(bq1);
|
||||
|
||||
single.getIndexReader().close(); single = null;
|
||||
parallel.getIndexReader().close(); parallel = null;
|
||||
dir.close(); dir = null;
|
||||
dir1.close(); dir1 = null;
|
||||
dir2.close(); dir2 = null;
|
||||
}
|
||||
|
||||
public void testRefCounts1() throws IOException {
|
||||
Directory dir1 = getDir1(random);
|
||||
Directory dir2 = getDir2(random);
|
||||
DirectoryReader ir1, ir2;
|
||||
// close subreaders, ParallelReader will not change refCounts, but will close them on its own close()
|
||||
ParallelCompositeReader pr = new ParallelCompositeReader(ir1 = DirectoryReader.open(dir1),
|
||||
ir2 = DirectoryReader.open(dir2));
|
||||
// check RefCounts
|
||||
assertEquals(1, ir1.getRefCount());
|
||||
assertEquals(1, ir2.getRefCount());
|
||||
pr.close();
|
||||
assertEquals(0, ir1.getRefCount());
|
||||
assertEquals(0, ir2.getRefCount());
|
||||
dir1.close();
|
||||
dir2.close();
|
||||
}
|
||||
|
||||
public void testRefCounts2() throws IOException {
|
||||
Directory dir1 = getDir1(random);
|
||||
Directory dir2 = getDir2(random);
|
||||
DirectoryReader ir1 = DirectoryReader.open(dir1);
|
||||
DirectoryReader ir2 = DirectoryReader.open(dir2);
|
||||
|
||||
// don't close subreaders, so ParallelReader will increment refcounts
|
||||
ParallelCompositeReader pr = new ParallelCompositeReader(false, ir1, ir2);
|
||||
// check RefCounts
|
||||
assertEquals(2, ir1.getRefCount());
|
||||
assertEquals(2, ir2.getRefCount());
|
||||
pr.close();
|
||||
assertEquals(1, ir1.getRefCount());
|
||||
assertEquals(1, ir2.getRefCount());
|
||||
ir1.close();
|
||||
ir2.close();
|
||||
assertEquals(0, ir1.getRefCount());
|
||||
assertEquals(0, ir2.getRefCount());
|
||||
dir1.close();
|
||||
dir2.close();
|
||||
}
|
||||
|
||||
public void testIncompatibleIndexes1() throws IOException {
|
||||
// two documents:
|
||||
Directory dir1 = getDir1(random);
|
||||
|
||||
// one document only:
|
||||
Directory dir2 = newDirectory();
|
||||
IndexWriter w2 = new IndexWriter(dir2, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random)));
|
||||
Document d3 = new Document();
|
||||
|
||||
d3.add(newField("f3", "v1", TextField.TYPE_STORED));
|
||||
w2.addDocument(d3);
|
||||
w2.close();
|
||||
|
||||
DirectoryReader ir1 = DirectoryReader.open(dir1),
|
||||
ir2 = DirectoryReader.open(dir2);
|
||||
try {
|
||||
new ParallelCompositeReader(ir1, ir2);
|
||||
fail("didn't get expected exception: indexes don't have same number of documents");
|
||||
} catch (IllegalArgumentException e) {
|
||||
// expected exception
|
||||
}
|
||||
try {
|
||||
new ParallelCompositeReader(random.nextBoolean(), ir1, ir2);
|
||||
fail("didn't get expected exception: indexes don't have same number of documents");
|
||||
} catch (IllegalArgumentException e) {
|
||||
// expected exception
|
||||
}
|
||||
assertEquals(1, ir1.getRefCount());
|
||||
assertEquals(1, ir2.getRefCount());
|
||||
ir1.close();
|
||||
ir2.close();
|
||||
assertEquals(0, ir1.getRefCount());
|
||||
assertEquals(0, ir2.getRefCount());
|
||||
dir1.close();
|
||||
dir2.close();
|
||||
}
|
||||
|
||||
public void testIncompatibleIndexes2() throws IOException {
|
||||
Directory dir1 = getDir1(random);
|
||||
Directory dir2 = getInvalidStructuredDir2(random);
|
||||
|
||||
DirectoryReader ir1 = DirectoryReader.open(dir1),
|
||||
ir2 = DirectoryReader.open(dir2);
|
||||
CompositeReader[] readers = new CompositeReader[] {ir1, ir2};
|
||||
try {
|
||||
new ParallelCompositeReader(ir1, ir2);
|
||||
fail("didn't get expected exception: indexes don't have same subreader structure");
|
||||
} catch (IllegalArgumentException e) {
|
||||
// expected exception
|
||||
}
|
||||
try {
|
||||
new ParallelCompositeReader(random.nextBoolean(), readers, readers);
|
||||
fail("didn't get expected exception: indexes don't have same subreader structure");
|
||||
} catch (IllegalArgumentException e) {
|
||||
// expected exception
|
||||
}
|
||||
assertEquals(1, ir1.getRefCount());
|
||||
assertEquals(1, ir2.getRefCount());
|
||||
ir1.close();
|
||||
ir2.close();
|
||||
assertEquals(0, ir1.getRefCount());
|
||||
assertEquals(0, ir2.getRefCount());
|
||||
dir1.close();
|
||||
dir2.close();
|
||||
}
|
||||
|
||||
public void testIgnoreStoredFields() throws IOException {
|
||||
Directory dir1 = getDir1(random);
|
||||
Directory dir2 = getDir2(random);
|
||||
CompositeReader ir1 = DirectoryReader.open(dir1);
|
||||
CompositeReader ir2 = DirectoryReader.open(dir2);
|
||||
|
||||
// with overlapping
|
||||
ParallelCompositeReader pr = new ParallelCompositeReader(false,
|
||||
new CompositeReader[] {ir1, ir2},
|
||||
new CompositeReader[] {ir1});
|
||||
assertEquals("v1", pr.document(0).get("f1"));
|
||||
assertEquals("v1", pr.document(0).get("f2"));
|
||||
assertNull(pr.document(0).get("f3"));
|
||||
assertNull(pr.document(0).get("f4"));
|
||||
// check that fields are there
|
||||
AtomicReader slow = SlowCompositeReaderWrapper.wrap(pr);
|
||||
assertNotNull(slow.terms("f1"));
|
||||
assertNotNull(slow.terms("f2"));
|
||||
assertNotNull(slow.terms("f3"));
|
||||
assertNotNull(slow.terms("f4"));
|
||||
pr.close();
|
||||
|
||||
// no stored fields at all
|
||||
pr = new ParallelCompositeReader(false,
|
||||
new CompositeReader[] {ir2},
|
||||
new CompositeReader[0]);
|
||||
assertNull(pr.document(0).get("f1"));
|
||||
assertNull(pr.document(0).get("f2"));
|
||||
assertNull(pr.document(0).get("f3"));
|
||||
assertNull(pr.document(0).get("f4"));
|
||||
// check that fields are there
|
||||
slow = SlowCompositeReaderWrapper.wrap(pr);
|
||||
assertNull(slow.terms("f1"));
|
||||
assertNull(slow.terms("f2"));
|
||||
assertNotNull(slow.terms("f3"));
|
||||
assertNotNull(slow.terms("f4"));
|
||||
pr.close();
|
||||
|
||||
// without overlapping
|
||||
pr = new ParallelCompositeReader(true,
|
||||
new CompositeReader[] {ir2},
|
||||
new CompositeReader[] {ir1});
|
||||
assertEquals("v1", pr.document(0).get("f1"));
|
||||
assertEquals("v1", pr.document(0).get("f2"));
|
||||
assertNull(pr.document(0).get("f3"));
|
||||
assertNull(pr.document(0).get("f4"));
|
||||
// check that fields are there
|
||||
slow = SlowCompositeReaderWrapper.wrap(pr);
|
||||
assertNull(slow.terms("f1"));
|
||||
assertNull(slow.terms("f2"));
|
||||
assertNotNull(slow.terms("f3"));
|
||||
assertNotNull(slow.terms("f4"));
|
||||
pr.close();
|
||||
|
||||
// no main readers
|
||||
try {
|
||||
new ParallelCompositeReader(true,
|
||||
new CompositeReader[0],
|
||||
new CompositeReader[] {ir1});
|
||||
fail("didn't get expected exception: need a non-empty main-reader array");
|
||||
} catch (IllegalArgumentException iae) {
|
||||
// pass
|
||||
}
|
||||
|
||||
dir1.close();
|
||||
dir2.close();
|
||||
}
|
||||
|
||||
private void queryTest(Query query) throws IOException {
|
||||
ScoreDoc[] parallelHits = parallel.search(query, null, 1000).scoreDocs;
|
||||
ScoreDoc[] singleHits = single.search(query, null, 1000).scoreDocs;
|
||||
assertEquals(parallelHits.length, singleHits.length);
|
||||
for(int i = 0; i < parallelHits.length; i++) {
|
||||
assertEquals(parallelHits[i].score, singleHits[i].score, 0.001f);
|
||||
Document docParallel = parallel.doc(parallelHits[i].doc);
|
||||
Document docSingle = single.doc(singleHits[i].doc);
|
||||
assertEquals(docParallel.get("f1"), docSingle.get("f1"));
|
||||
assertEquals(docParallel.get("f2"), docSingle.get("f2"));
|
||||
assertEquals(docParallel.get("f3"), docSingle.get("f3"));
|
||||
assertEquals(docParallel.get("f4"), docSingle.get("f4"));
|
||||
}
|
||||
}
|
||||
|
||||
// Fields 1-4 indexed together:
|
||||
private IndexSearcher single(Random random) throws IOException {
|
||||
dir = newDirectory();
|
||||
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random)));
|
||||
Document d1 = new Document();
|
||||
d1.add(newField("f1", "v1", TextField.TYPE_STORED));
|
||||
d1.add(newField("f2", "v1", TextField.TYPE_STORED));
|
||||
d1.add(newField("f3", "v1", TextField.TYPE_STORED));
|
||||
d1.add(newField("f4", "v1", TextField.TYPE_STORED));
|
||||
w.addDocument(d1);
|
||||
Document d2 = new Document();
|
||||
d2.add(newField("f1", "v2", TextField.TYPE_STORED));
|
||||
d2.add(newField("f2", "v2", TextField.TYPE_STORED));
|
||||
d2.add(newField("f3", "v2", TextField.TYPE_STORED));
|
||||
d2.add(newField("f4", "v2", TextField.TYPE_STORED));
|
||||
w.addDocument(d2);
|
||||
Document d3 = new Document();
|
||||
d3.add(newField("f1", "v3", TextField.TYPE_STORED));
|
||||
d3.add(newField("f2", "v3", TextField.TYPE_STORED));
|
||||
d3.add(newField("f3", "v3", TextField.TYPE_STORED));
|
||||
d3.add(newField("f4", "v3", TextField.TYPE_STORED));
|
||||
w.addDocument(d3);
|
||||
Document d4 = new Document();
|
||||
d4.add(newField("f1", "v4", TextField.TYPE_STORED));
|
||||
d4.add(newField("f2", "v4", TextField.TYPE_STORED));
|
||||
d4.add(newField("f3", "v4", TextField.TYPE_STORED));
|
||||
d4.add(newField("f4", "v4", TextField.TYPE_STORED));
|
||||
w.addDocument(d4);
|
||||
w.close();
|
||||
|
||||
DirectoryReader ir = DirectoryReader.open(dir);
|
||||
return newSearcher(ir);
|
||||
}
|
||||
|
||||
// Fields 1 & 2 in one index, 3 & 4 in other, with ParallelReader:
|
||||
private IndexSearcher parallel(Random random) throws IOException {
|
||||
dir1 = getDir1(random);
|
||||
dir2 = getDir2(random);
|
||||
final DirectoryReader rd1 = DirectoryReader.open(dir1),
|
||||
rd2 = DirectoryReader.open(dir2);
|
||||
assertEquals(3, rd1.getSequentialSubReaders().length);
|
||||
assertEquals(3, rd2.getSequentialSubReaders().length);
|
||||
ParallelCompositeReader pr = new ParallelCompositeReader(rd1, rd2);
|
||||
return newSearcher(pr);
|
||||
}
|
||||
|
||||
// subreader structure: (1,2,1)
|
||||
private Directory getDir1(Random random) throws IOException {
|
||||
Directory dir1 = newDirectory();
|
||||
IndexWriter w1 = new IndexWriter(dir1, newIndexWriterConfig(TEST_VERSION_CURRENT,
|
||||
new MockAnalyzer(random)).setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES));
|
||||
Document d1 = new Document();
|
||||
d1.add(newField("f1", "v1", TextField.TYPE_STORED));
|
||||
d1.add(newField("f2", "v1", TextField.TYPE_STORED));
|
||||
w1.addDocument(d1);
|
||||
w1.commit();
|
||||
Document d2 = new Document();
|
||||
d2.add(newField("f1", "v2", TextField.TYPE_STORED));
|
||||
d2.add(newField("f2", "v2", TextField.TYPE_STORED));
|
||||
w1.addDocument(d2);
|
||||
Document d3 = new Document();
|
||||
d3.add(newField("f1", "v3", TextField.TYPE_STORED));
|
||||
d3.add(newField("f2", "v3", TextField.TYPE_STORED));
|
||||
w1.addDocument(d3);
|
||||
w1.commit();
|
||||
Document d4 = new Document();
|
||||
d4.add(newField("f1", "v4", TextField.TYPE_STORED));
|
||||
d4.add(newField("f2", "v4", TextField.TYPE_STORED));
|
||||
w1.addDocument(d4);
|
||||
w1.close();
|
||||
return dir1;
|
||||
}
|
||||
|
||||
// subreader structure: (1,2,1)
|
||||
private Directory getDir2(Random random) throws IOException {
|
||||
Directory dir2 = newDirectory();
|
||||
IndexWriter w2 = new IndexWriter(dir2, newIndexWriterConfig(TEST_VERSION_CURRENT,
|
||||
new MockAnalyzer(random)).setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES));
|
||||
Document d1 = new Document();
|
||||
d1.add(newField("f3", "v1", TextField.TYPE_STORED));
|
||||
d1.add(newField("f4", "v1", TextField.TYPE_STORED));
|
||||
w2.addDocument(d1);
|
||||
w2.commit();
|
||||
Document d2 = new Document();
|
||||
d2.add(newField("f3", "v2", TextField.TYPE_STORED));
|
||||
d2.add(newField("f4", "v2", TextField.TYPE_STORED));
|
||||
w2.addDocument(d2);
|
||||
Document d3 = new Document();
|
||||
d3.add(newField("f3", "v3", TextField.TYPE_STORED));
|
||||
d3.add(newField("f4", "v3", TextField.TYPE_STORED));
|
||||
w2.addDocument(d3);
|
||||
w2.commit();
|
||||
Document d4 = new Document();
|
||||
d4.add(newField("f3", "v4", TextField.TYPE_STORED));
|
||||
d4.add(newField("f4", "v4", TextField.TYPE_STORED));
|
||||
w2.addDocument(d4);
|
||||
w2.close();
|
||||
return dir2;
|
||||
}
|
||||
|
||||
// this dir has a different subreader structure (1,1,2);
|
||||
private Directory getInvalidStructuredDir2(Random random) throws IOException {
|
||||
Directory dir2 = newDirectory();
|
||||
IndexWriter w2 = new IndexWriter(dir2, newIndexWriterConfig(TEST_VERSION_CURRENT,
|
||||
new MockAnalyzer(random)).setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES));
|
||||
Document d1 = new Document();
|
||||
d1.add(newField("f3", "v1", TextField.TYPE_STORED));
|
||||
d1.add(newField("f4", "v1", TextField.TYPE_STORED));
|
||||
w2.addDocument(d1);
|
||||
w2.commit();
|
||||
Document d2 = new Document();
|
||||
d2.add(newField("f3", "v2", TextField.TYPE_STORED));
|
||||
d2.add(newField("f4", "v2", TextField.TYPE_STORED));
|
||||
w2.addDocument(d2);
|
||||
w2.commit();
|
||||
Document d3 = new Document();
|
||||
d3.add(newField("f3", "v3", TextField.TYPE_STORED));
|
||||
d3.add(newField("f4", "v3", TextField.TYPE_STORED));
|
||||
w2.addDocument(d3);
|
||||
Document d4 = new Document();
|
||||
d4.add(newField("f3", "v4", TextField.TYPE_STORED));
|
||||
d4.add(newField("f4", "v4", TextField.TYPE_STORED));
|
||||
w2.addDocument(d4);
|
||||
w2.close();
|
||||
return dir2;
|
||||
}
|
||||
|
||||
}
|
|
@@ -30,7 +30,7 @@ import org.apache.lucene.document.TextField;
|
|||
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
|
||||
|
||||
/**
|
||||
* Some tests for {@link ParallelReader}s with empty indexes
|
||||
* Some tests for {@link ParallelAtomicReader}s with empty indexes
|
||||
*
|
||||
* @author Christian Kohlschuetter
|
||||
*/
|
||||
|
@@ -46,20 +46,37 @@ public class TestParallelReaderEmptyIndex extends LuceneTestCase {
|
|||
Directory rd1 = newDirectory();
|
||||
IndexWriter iw = new IndexWriter(rd1, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random)));
|
||||
iw.close();
|
||||
|
||||
// create a copy:
|
||||
Directory rd2 = newDirectory(rd1);
|
||||
|
||||
Directory rdOut = newDirectory();
|
||||
|
||||
IndexWriter iwOut = new IndexWriter(rdOut, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random)));
|
||||
ParallelReader pr = new ParallelReader();
|
||||
pr.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(rd1)));
|
||||
pr.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(rd2)));
|
||||
|
||||
|
||||
ParallelAtomicReader apr = new ParallelAtomicReader(
|
||||
SlowCompositeReaderWrapper.wrap(DirectoryReader.open(rd1)),
|
||||
SlowCompositeReaderWrapper.wrap(DirectoryReader.open(rd2)));
|
||||
|
||||
// When unpatched, Lucene crashes here with a NoSuchElementException (caused by ParallelTermEnum)
|
||||
iwOut.addIndexes(pr);
|
||||
|
||||
iwOut.addIndexes(apr);
|
||||
iwOut.forceMerge(1);
|
||||
|
||||
// 2nd try with a readerless parallel reader
|
||||
iwOut.addIndexes(new ParallelAtomicReader());
|
||||
iwOut.forceMerge(1);
|
||||
|
||||
ParallelCompositeReader cpr = new ParallelCompositeReader(
|
||||
DirectoryReader.open(rd1),
|
||||
DirectoryReader.open(rd2));
|
||||
|
||||
// When unpatched, Lucene crashes here with a NoSuchElementException (caused by ParallelTermEnum)
|
||||
iwOut.addIndexes(cpr);
|
||||
iwOut.forceMerge(1);
|
||||
|
||||
// 2nd try with a readerless parallel reader
|
||||
iwOut.addIndexes(new ParallelCompositeReader());
|
||||
iwOut.forceMerge(1);
|
||||
|
||||
iwOut.close();
|
||||
rdOut.close();
|
||||
rd1.close();
|
||||
|
@@ -115,15 +132,20 @@ public class TestParallelReaderEmptyIndex extends LuceneTestCase {
|
|||
Directory rdOut = newDirectory();
|
||||
|
||||
IndexWriter iwOut = new IndexWriter(rdOut, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random)));
|
||||
ParallelReader pr = new ParallelReader();
|
||||
pr.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(rd1)));
|
||||
pr.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(rd2)));
|
||||
final DirectoryReader reader1, reader2;
|
||||
ParallelAtomicReader pr = new ParallelAtomicReader(
|
||||
SlowCompositeReaderWrapper.wrap(reader1 = DirectoryReader.open(rd1)),
|
||||
SlowCompositeReaderWrapper.wrap(reader2 = DirectoryReader.open(rd2)));
|
||||
|
||||
// When unpatched, Lucene crashes here with an ArrayIndexOutOfBoundsException (caused by TermVectorsWriter)
|
||||
iwOut.addIndexes(pr);
|
||||
|
||||
// ParallelReader closes any IndexReader you added to it:
|
||||
pr.close();
|
||||
|
||||
// assert subreaders were closed
|
||||
assertEquals(0, reader1.getRefCount());
|
||||
assertEquals(0, reader2.getRefCount());
|
||||
|
||||
rd1.close();
|
||||
rd2.close();
|
||||
|
|
|
@@ -72,9 +72,7 @@ public class TestParallelTermEnum extends LuceneTestCase {
|
|||
}
|
||||
|
||||
public void test1() throws IOException {
|
||||
ParallelReader pr = new ParallelReader();
|
||||
pr.add(ir1);
|
||||
pr.add(ir2);
|
||||
ParallelAtomicReader pr = new ParallelAtomicReader(ir1, ir2);
|
||||
|
||||
Bits liveDocs = pr.getLiveDocs();
|
||||
|
||||
|
|
|
@@ -5,7 +5,7 @@ import java.util.List;
|
|||
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.MultiReader;
|
||||
import org.apache.lucene.index.ParallelReader;
|
||||
import org.apache.lucene.index.ParallelAtomicReader;
|
||||
import org.apache.lucene.index.SlowCompositeReaderWrapper;
|
||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.lucene.search.Query;
|
||||
|
@@ -68,8 +68,7 @@ public class TestFacetsAccumulatorWithComplement extends FacetTestBase {
|
|||
@Test
|
||||
public void testComplementsWithParallerReader() throws Exception {
|
||||
IndexReader origReader = indexReader;
|
||||
ParallelReader pr = new ParallelReader(true);
|
||||
pr.add(SlowCompositeReaderWrapper.wrap(origReader));
|
||||
ParallelAtomicReader pr = new ParallelAtomicReader(SlowCompositeReaderWrapper.wrap(origReader));
|
||||
indexReader = pr;
|
||||
try {
|
||||
doTestComplements();
|
||||
|
|