LUCENE-3736: Refactor ParallelReader to ParallelAtomicReader and ParallelCompositeReader

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1241470 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Uwe Schindler 2012-02-07 14:24:40 +00:00
parent 36ff785606
commit fb27a1f4e6
8 changed files with 896 additions and 343 deletions

View File

@ -0,0 +1,321 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.lucene.util.Bits;
/** An {@link AtomicReader} which reads multiple, parallel indexes. Each index added
* must have the same number of documents, but typically each contains
* different fields. Each document contains the union of the fields of all
* documents with the same document number. When searching, matches for a
* query term are from the first index added that has the field.
*
* <p>This is useful, e.g., with collections that have large fields which
* change rarely and small fields that change more frequently. The smaller
* fields may be re-indexed in a new index and both indexes may be searched
* together.
*
* <p>To create instances of {@code ParallelAtomicReader}, use the provided
* {@link ParallelAtomicReader.Builder}.
*
* <p><strong>Warning:</strong> It is up to you to make sure all indexes
* are created and modified the same way. For example, if you add
* documents to one index, you need to add the same documents in the
* same order to the other indexes. <em>Failure to do so will result in
* undefined behavior</em>.
*/
public final class ParallelAtomicReader extends AtomicReader {
private final FieldInfos fieldInfos = new FieldInfos();
private final ParallelFields fields = new ParallelFields();
private final AtomicReader[] parallelReaders, storedFieldReaders;
private final boolean closeSubReaders;
private final int maxDoc, numDocs;
private final boolean hasDeletions;
final SortedMap<String,AtomicReader> fieldToReader = new TreeMap<String,AtomicReader>();
// only called from builder!!!
ParallelAtomicReader(boolean closeSubReaders, AtomicReader[] readers, AtomicReader[] storedFieldReaders) throws IOException {
this.closeSubReaders = closeSubReaders;
assert readers.length >= storedFieldReaders.length;
this.parallelReaders = readers;
this.storedFieldReaders = storedFieldReaders;
this.numDocs = (readers.length > 0) ? readers[0].numDocs() : 0;
this.maxDoc = (readers.length > 0) ? readers[0].maxDoc() : 0;
this.hasDeletions = (readers.length > 0) ? readers[0].hasDeletions() : false;
for (final AtomicReader reader : readers) {
final FieldInfos readerFieldInfos = reader.getFieldInfos();
for(FieldInfo fieldInfo : readerFieldInfos) { // update fieldToReader map
// NOTE: first reader having a given field "wins":
if (fieldToReader.get(fieldInfo.name) == null) {
fieldInfos.add(fieldInfo);
fieldToReader.put(fieldInfo.name, reader);
this.fields.addField(fieldInfo.name, reader.terms(fieldInfo.name));
}
}
if (!closeSubReaders) {
reader.incRef();
}
}
}
@Override
public String toString() {
final StringBuilder buffer = new StringBuilder("ParallelAtomicReader(");
for (final Iterator<AtomicReader> iter = Arrays.asList(parallelReaders).iterator(); iter.hasNext();) {
buffer.append(iter.next());
if (iter.hasNext()) buffer.append(", ");
}
return buffer.append(')').toString();
}
private final class ParallelFieldsEnum extends FieldsEnum {
private String currentField;
private final Iterator<String> keys;
private final Fields fields;
ParallelFieldsEnum(Fields fields) {
this.fields = fields;
keys = fieldToReader.keySet().iterator();
}
@Override
public String next() throws IOException {
if (keys.hasNext()) {
currentField = keys.next();
} else {
currentField = null;
}
return currentField;
}
@Override
public Terms terms() throws IOException {
return fields.terms(currentField);
}
}
// Single instance of this, per ParallelReader instance
private final class ParallelFields extends Fields {
final HashMap<String,Terms> fields = new HashMap<String,Terms>();
ParallelFields() {
}
void addField(String fieldName, Terms terms) throws IOException {
fields.put(fieldName, terms);
}
@Override
public FieldsEnum iterator() throws IOException {
return new ParallelFieldsEnum(this);
}
@Override
public Terms terms(String field) throws IOException {
return fields.get(field);
}
@Override
public int getUniqueFieldCount() throws IOException {
return fields.size();
}
}
@Override
public FieldInfos getFieldInfos() {
return fieldInfos;
}
@Override
public Bits getLiveDocs() {
ensureOpen();
return hasDeletions ? parallelReaders[0].getLiveDocs() : null;
}
@Override
public Fields fields() {
ensureOpen();
return fields;
}
@Override
public int numDocs() {
// Don't call ensureOpen() here (it could affect performance)
return numDocs;
}
@Override
public int maxDoc() {
// Don't call ensureOpen() here (it could affect performance)
return maxDoc;
}
@Override
public boolean hasDeletions() {
ensureOpen();
return hasDeletions;
}
@Override
public void document(int docID, StoredFieldVisitor visitor) throws CorruptIndexException, IOException {
ensureOpen();
for (final AtomicReader reader: storedFieldReaders) {
reader.document(docID, visitor);
}
}
// get all vectors
@Override
public Fields getTermVectors(int docID) throws IOException {
ensureOpen();
ParallelFields fields = new ParallelFields();
for (Map.Entry<String,AtomicReader> ent : fieldToReader.entrySet()) {
String fieldName = ent.getKey();
Terms vector = ent.getValue().getTermVector(docID, fieldName);
if (vector != null) {
fields.addField(fieldName, vector);
}
}
return fields;
}
@Override
public boolean hasNorms(String field) throws IOException {
ensureOpen();
AtomicReader reader = fieldToReader.get(field);
return reader==null ? false : reader.hasNorms(field);
}
@Override
protected synchronized void doClose() throws IOException {
IOException ioe = null;
for (AtomicReader reader : parallelReaders) {
try {
if (closeSubReaders) {
reader.close();
} else {
reader.decRef();
}
} catch (IOException e) {
if (ioe == null) ioe = e;
}
}
// throw the first exception
if (ioe != null) throw ioe;
}
@Override
public DocValues docValues(String field) throws IOException {
ensureOpen();
AtomicReader reader = fieldToReader.get(field);
return reader == null ? null : reader.docValues(field);
}
@Override
public DocValues normValues(String field) throws IOException {
ensureOpen();
AtomicReader reader = fieldToReader.get(field);
return reader == null ? null : reader.normValues(field);
}
/**
* Builder implementation to create instances of {@link ParallelAtomicReader}.
*/
public static final class Builder {
private final boolean closeSubReaders;
private final List<AtomicReader> parallelReaders = new ArrayList<AtomicReader>();
private final List<AtomicReader> storedFieldReaders = new ArrayList<AtomicReader>();
private int maxDoc, numDocs;
/**
* Create a new builder instance that automatically enables closing of all subreader
* once the build reader is closed.
*/
public Builder() {
this(true);
}
/**
* Create a new builder instance.
*/
public Builder(boolean closeSubReaders) {
this.closeSubReaders = closeSubReaders;
}
/** Add an AtomicReader.
* @throws IOException if there is a low-level IO error
*/
public Builder add(AtomicReader reader) throws IOException {
return add(reader, false);
}
/** Add an AtomicReader whose stored fields will not be returned. This can
* accelerate search when stored fields are only needed from a subset of
* the IndexReaders.
*
* @throws IllegalArgumentException if not all indexes contain the same number
* of documents
* @throws IllegalArgumentException if not all indexes have the same value
* of {@link AtomicReader#maxDoc()}
* @throws IOException if there is a low-level IO error
*/
public Builder add(AtomicReader reader, boolean ignoreStoredFields) throws IOException {
if (parallelReaders.isEmpty()) {
this.maxDoc = reader.maxDoc();
this.numDocs = reader.numDocs();
} else {
// check compatibility
if (reader.maxDoc() != maxDoc)
throw new IllegalArgumentException("All readers must have same maxDoc: "+maxDoc+"!="+reader.maxDoc());
if (reader.numDocs() != numDocs)
throw new IllegalArgumentException("All readers must have same numDocs: "+numDocs+"!="+reader.numDocs());
}
if (!ignoreStoredFields)
storedFieldReaders.add(reader); // add to storedFieldReaders
parallelReaders.add(reader);
return this;
}
/**
* Build the {@link ParallelAtomicReader} instance from the settings.
*/
public ParallelAtomicReader build() throws IOException {
return new ParallelAtomicReader(
closeSubReaders,
parallelReaders.toArray(new AtomicReader[parallelReaders.size()]),
storedFieldReaders.toArray(new AtomicReader[storedFieldReaders.size()])
);
}
}
}

View File

@ -0,0 +1,223 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
/** An {@link CompositeReader} which reads multiple, parallel indexes. Each index added
* must have the same number of documents, and exactly the same hierarchical subreader structure,
* but typically each contains different fields. Each document contains the
* union of the fields of all
* documents with the same document number. When searching, matches for a
* query term are from the first index added that has the field.
*
* <p>This is useful, e.g., with collections that have large fields which
* change rarely and small fields that change more frequently. The smaller
* fields may be re-indexed in a new index and both indexes may be searched
* together.
*
* <p>To create instances of {@code ParallelCompositeReader}, use the provided
* {@link ParallelCompositeReader.Builder}.
*
* <p><strong>Warning:</strong> It is up to you to make sure all indexes
* are created and modified the same way. For example, if you add
* documents to one index, you need to add the same documents in the
* same order to the other indexes. <em>Failure to do so will result in
* undefined behavior</em>.
* A good strategy to create suitable indexes with {@link IndexWriter} is to use
* {@link LogDocMergePolicy}, as this one does not reorder documents
* during merging (like {@code TieredMergePolicy}) and triggers merges
* by number of documents per segment. If you use different {@link MergePolicy}s
* it might happen that the segment structure of your index is no longer predictable.
* {@link ParallelCompositeReader.Builder} will throw exceptions if the structure
* of the underlying segments do not match for each parallel reader.
*/
public final class ParallelCompositeReader extends BaseMultiReader<IndexReader> {
private final boolean closeSubReaders;
private final CompositeReader parallelReaders[];
// only called from builder!!!
ParallelCompositeReader(boolean closeSubReaders, List<CompositeReader> parallelReaders, BitSet ignoreStoredFieldsSet) throws IOException {
super(prepareSubReaders(parallelReaders, ignoreStoredFieldsSet));
this.closeSubReaders = closeSubReaders;
this.parallelReaders = parallelReaders.toArray(new CompositeReader[parallelReaders.size()]);
if (!closeSubReaders) {
for (CompositeReader reader : this.parallelReaders) {
reader.incRef();
}
}
}
private static IndexReader[] prepareSubReaders(List<CompositeReader> parallelReaders, BitSet ignoreStoredFieldsSet) throws IOException {
if (parallelReaders.isEmpty()) {
return new IndexReader[0];
} else {
// hierarchically build the same subreader structure as the first CompositeReader with Parallel*Readers:
final IndexReader[]
firstSubReaders = parallelReaders.get(0).getSequentialSubReaders(),
subReaders = new IndexReader[firstSubReaders.length];
for (int i = 0; i < subReaders.length; i++) {
if (firstSubReaders[i] instanceof AtomicReader) {
// we simply enable closing of subReaders, to prevent incRefs on subReaders
// -> for synthetic subReaders, close() is never called by our doClose()
final ParallelAtomicReader.Builder builder = new ParallelAtomicReader.Builder(true);
for (int j = 0, c = parallelReaders.size(); j < c; j++) {
builder.add((AtomicReader) parallelReaders.get(j).getSequentialSubReaders()[i], ignoreStoredFieldsSet.get(j));
}
subReaders[i] = builder.build();
} else {
assert firstSubReaders[i] instanceof CompositeReader;
// we simply enable closing of subReaders, to prevent incRefs on subReaders
// -> for synthetic subReaders, close() is never called by our doClose()
final ParallelCompositeReader.Builder builder = new ParallelCompositeReader.Builder(true);
for (int j = 0, c = parallelReaders.size(); j < c; j++) {
builder.add((CompositeReader) parallelReaders.get(j).getSequentialSubReaders()[i], ignoreStoredFieldsSet.get(j));
}
subReaders[i] = builder.build();
}
}
return subReaders;
}
}
@Override
public String toString() {
final StringBuilder buffer = new StringBuilder("ParallelCompositeReader(");
for (final Iterator<CompositeReader> iter = Arrays.asList(parallelReaders).iterator(); iter.hasNext();) {
buffer.append(iter.next());
if (iter.hasNext()) buffer.append(", ");
}
return buffer.append(')').toString();
}
@Override
protected synchronized void doClose() throws IOException {
IOException ioe = null;
for (final CompositeReader reader : parallelReaders) {
try {
if (closeSubReaders) {
reader.close();
} else {
reader.decRef();
}
} catch (IOException e) {
if (ioe == null) ioe = e;
}
}
// throw the first exception
if (ioe != null) throw ioe;
}
/**
* Builder implementation to create instances of {@link ParallelCompositeReader}.
*/
public static final class Builder {
private final boolean closeSubReaders;
private final List<CompositeReader> readers = new ArrayList<CompositeReader>();
private final BitSet ignoreStoredFieldsSet = new BitSet();
private int[] leaveSizes, childSizes;
private int maxDoc, numDocs;
/**
* Create a new builder instance that automatically enables closing of all subreader
* once the build reader is closed.
*/
public Builder() {
this(true);
}
/**
* Create a new builder instance.
*/
public Builder(boolean closeSubReaders) {
this.closeSubReaders = closeSubReaders;
}
/** Add an CompositeReader.
* @throws IOException if there is a low-level IO error
*/
public Builder add(CompositeReader reader) throws IOException {
return add(reader, false);
}
/** Add an CompositeReader whose stored fields will not be returned. This can
* accelerate search when stored fields are only needed from a subset of
* the IndexReaders.
*
* @throws IllegalArgumentException if not all indexes contain the same number
* of documents
* @throws IllegalArgumentException if not all indexes have the same value
* of {@link AtomicReader#maxDoc()}
* @throws IOException if there is a low-level IO error
*/
public Builder add(CompositeReader reader, boolean ignoreStoredFields) throws IOException {
final IndexReader[] subs = reader.getSequentialSubReaders();
if (readers.isEmpty()) {
this.maxDoc = reader.maxDoc();
this.numDocs = reader.numDocs();
childSizes = new int[subs.length];
for (int i = 0; i < subs.length; i++) {
childSizes[i] = subs[i].maxDoc();
}
final AtomicReaderContext[] leaves = reader.getTopReaderContext().leaves();
leaveSizes = new int[leaves.length];
for (int i = 0; i < leaves.length; i++) {
leaveSizes[i] = leaves[i].reader().maxDoc();
}
} else {
// check compatibility
if (reader.maxDoc() != maxDoc)
throw new IllegalArgumentException("All readers must have same maxDoc: "+maxDoc+"!="+reader.maxDoc());
if (reader.numDocs() != numDocs)
throw new IllegalArgumentException("All readers must have same numDocs: "+numDocs+"!="+reader.numDocs());
if (subs.length != childSizes.length)
throw new IllegalArgumentException("All readers must have same number of subReaders");
for (int i = 0; i < subs.length; i++) {
if (subs[i].maxDoc() != childSizes[i])
throw new IllegalArgumentException("All readers must have same subReader maxDoc");
}
// the following checks are only to detect errors early, otherwise a wrong leaf
// structure would only cause errors on build(). These checks are still incomplete...
final AtomicReaderContext[] leaves = reader.getTopReaderContext().leaves();
if (leaves.length != leaveSizes.length)
throw new IllegalArgumentException("All readers must have same number of atomic leaves");
for (int i = 0; i < leaves.length; i++) {
if (leaves[i].reader().maxDoc() != leaveSizes[i])
throw new IllegalArgumentException("All readers must have atomic leaves with same maxDoc");
}
}
ignoreStoredFieldsSet.set(readers.size(), ignoreStoredFields);
readers.add(reader);
return this;
}
/**
* Build the {@link ParallelCompositeReader} instance from the settings.
*/
public ParallelCompositeReader build() throws IOException {
return new ParallelCompositeReader(closeSubReaders, readers, ignoreStoredFieldsSet);
}
}
}

View File

@ -1,298 +0,0 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.*;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
/** An AtomicIndexReader which reads multiple, parallel indexes. Each index added
* must have the same number of documents, but typically each contains
* different fields. Each document contains the union of the fields of all
* documents with the same document number. When searching, matches for a
* query term are from the first index added that has the field.
*
* <p>This is useful, e.g., with collections that have large fields which
* change rarely and small fields that change more frequently. The smaller
* fields may be re-indexed in a new index and both indexes may be searched
* together.
*
* <p><strong>Warning:</strong> It is up to you to make sure all indexes
* are created and modified the same way. For example, if you add
* documents to one index, you need to add the same documents in the
* same order to the other indexes. <em>Failure to do so will result in
* undefined behavior</em>.
*/
public class ParallelReader extends AtomicReader {
private List<AtomicReader> readers = new ArrayList<AtomicReader>();
private List<Boolean> decrefOnClose = new ArrayList<Boolean>(); // remember which subreaders to decRef on close
boolean incRefReaders = false;
private SortedMap<String,AtomicReader> fieldToReader = new TreeMap<String,AtomicReader>();
private Map<AtomicReader,Collection<String>> readerToFields = new HashMap<AtomicReader,Collection<String>>();
private List<AtomicReader> storedFieldReaders = new ArrayList<AtomicReader>();
private Map<String, DocValues> normsCache = new HashMap<String,DocValues>();
private int maxDoc;
private int numDocs;
private boolean hasDeletions;
private final FieldInfos fieldInfos;
private final ParallelFields fields = new ParallelFields();
/** Construct a ParallelReader.
* <p>Note that all subreaders are closed if this ParallelReader is closed.</p>
*/
public ParallelReader() throws IOException { this(true); }
/** Construct a ParallelReader.
* @param closeSubReaders indicates whether the subreaders should be closed
* when this ParallelReader is closed
*/
public ParallelReader(boolean closeSubReaders) throws IOException {
super();
this.incRefReaders = !closeSubReaders;
fieldInfos = new FieldInfos();
}
/** {@inheritDoc} */
@Override
public String toString() {
final StringBuilder buffer = new StringBuilder("ParallelReader(");
final Iterator<AtomicReader> iter = readers.iterator();
if (iter.hasNext()) {
buffer.append(iter.next());
}
while (iter.hasNext()) {
buffer.append(", ").append(iter.next());
}
buffer.append(')');
return buffer.toString();
}
/** Add an AtomicIndexReader.
* @throws IOException if there is a low-level IO error
*/
public void add(AtomicReader reader) throws IOException {
ensureOpen();
add(reader, false);
}
/** Add an AtomicIndexReader whose stored fields will not be returned. This can
* accelerate search when stored fields are only needed from a subset of
* the IndexReaders.
*
* @throws IllegalArgumentException if not all indexes contain the same number
* of documents
* @throws IllegalArgumentException if not all indexes have the same value
* of {@link AtomicReader#maxDoc()}
* @throws IOException if there is a low-level IO error
*/
public void add(AtomicReader reader, boolean ignoreStoredFields)
throws IOException {
ensureOpen();
if (readers.size() == 0) {
this.maxDoc = reader.maxDoc();
this.numDocs = reader.numDocs();
this.hasDeletions = reader.hasDeletions();
}
if (reader.maxDoc() != maxDoc) // check compatibility
throw new IllegalArgumentException
("All readers must have same maxDoc: "+maxDoc+"!="+reader.maxDoc());
if (reader.numDocs() != numDocs)
throw new IllegalArgumentException
("All readers must have same numDocs: "+numDocs+"!="+reader.numDocs());
final FieldInfos readerFieldInfos = MultiFields.getMergedFieldInfos(reader);
for(FieldInfo fieldInfo : readerFieldInfos) { // update fieldToReader map
// NOTE: first reader having a given field "wins":
if (fieldToReader.get(fieldInfo.name) == null) {
fieldInfos.add(fieldInfo);
fieldToReader.put(fieldInfo.name, reader);
this.fields.addField(fieldInfo.name, reader.terms(fieldInfo.name));
}
}
if (!ignoreStoredFields)
storedFieldReaders.add(reader); // add to storedFieldReaders
readers.add(reader);
if (incRefReaders) {
reader.incRef();
}
decrefOnClose.add(Boolean.valueOf(incRefReaders));
synchronized(normsCache) {
normsCache.clear(); // TODO: don't need to clear this for all fields really?
}
}
private class ParallelFieldsEnum extends FieldsEnum {
String currentField;
Iterator<String> keys;
private final Fields fields;
ParallelFieldsEnum(Fields fields) {
this.fields = fields;
keys = fieldToReader.keySet().iterator();
}
@Override
public String next() throws IOException {
if (keys.hasNext()) {
currentField = keys.next();
} else {
currentField = null;
}
return currentField;
}
@Override
public Terms terms() throws IOException {
return fields.terms(currentField);
}
}
// Single instance of this, per ParallelReader instance
private class ParallelFields extends Fields {
final HashMap<String,Terms> fields = new HashMap<String,Terms>();
public void addField(String fieldName, Terms terms) throws IOException {
fields.put(fieldName, terms);
}
@Override
public FieldsEnum iterator() throws IOException {
return new ParallelFieldsEnum(this);
}
@Override
public Terms terms(String field) throws IOException {
return fields.get(field);
}
@Override
public int getUniqueFieldCount() throws IOException {
return fields.size();
}
}
@Override
public FieldInfos getFieldInfos() {
return fieldInfos;
}
@Override
public Bits getLiveDocs() {
ensureOpen();
return readers.get(0).getLiveDocs();
}
@Override
public Fields fields() {
ensureOpen();
return fields;
}
@Override
public int numDocs() {
// Don't call ensureOpen() here (it could affect performance)
return numDocs;
}
@Override
public int maxDoc() {
// Don't call ensureOpen() here (it could affect performance)
return maxDoc;
}
@Override
public boolean hasDeletions() {
ensureOpen();
return hasDeletions;
}
@Override
public void document(int docID, StoredFieldVisitor visitor) throws CorruptIndexException, IOException {
ensureOpen();
for (final AtomicReader reader: storedFieldReaders) {
reader.document(docID, visitor);
}
}
// get all vectors
@Override
public Fields getTermVectors(int docID) throws IOException {
ensureOpen();
ParallelFields fields = new ParallelFields();
for (Map.Entry<String,AtomicReader> ent : fieldToReader.entrySet()) {
String fieldName = ent.getKey();
Terms vector = ent.getValue().getTermVector(docID, fieldName);
if (vector != null) {
fields.addField(fieldName, vector);
}
}
return fields;
}
@Override
public boolean hasNorms(String field) throws IOException {
ensureOpen();
AtomicReader reader = fieldToReader.get(field);
return reader==null ? false : reader.hasNorms(field);
}
// for testing
AtomicReader[] getSubReaders() {
return readers.toArray(new AtomicReader[readers.size()]);
}
@Override
protected synchronized void doClose() throws IOException {
for (int i = 0; i < readers.size(); i++) {
if (decrefOnClose.get(i).booleanValue()) {
readers.get(i).decRef();
} else {
readers.get(i).close();
}
}
}
// TODO: I suspect this is completely untested!!!!!
@Override
public DocValues docValues(String field) throws IOException {
AtomicReader reader = fieldToReader.get(field);
return reader == null ? null : reader.docValues(field);
}
// TODO: I suspect this is completely untested!!!!!
@Override
public synchronized DocValues normValues(String field) throws IOException {
DocValues values = normsCache.get(field);
if (values == null) {
AtomicReader reader = fieldToReader.get(field);
values = reader == null ? null : reader.normValues(field);
normsCache.put(field, values);
}
return values;
}
}

View File

@ -28,30 +28,15 @@ import org.apache.lucene.search.*;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
public class TestParallelReader extends LuceneTestCase { public class TestParallelAtomicReader extends LuceneTestCase {
private IndexSearcher parallel; private IndexSearcher parallel, single;
private IndexSearcher single;
private Directory dir, dir1, dir2; private Directory dir, dir1, dir2;
@Override public void testQueries() throws Exception {
public void setUp() throws Exception {
super.setUp();
single = single(random); single = single(random);
parallel = parallel(random); parallel = parallel(random);
}
@Override
public void tearDown() throws Exception {
single.getIndexReader().close();
parallel.getIndexReader().close();
dir.close();
dir1.close();
dir2.close();
super.tearDown();
}
public void testQueries() throws Exception {
queryTest(new TermQuery(new Term("f1", "v1"))); queryTest(new TermQuery(new Term("f1", "v1")));
queryTest(new TermQuery(new Term("f1", "v2"))); queryTest(new TermQuery(new Term("f1", "v2")));
queryTest(new TermQuery(new Term("f2", "v1"))); queryTest(new TermQuery(new Term("f2", "v1")));
@ -65,14 +50,21 @@ public class TestParallelReader extends LuceneTestCase {
bq1.add(new TermQuery(new Term("f1", "v1")), Occur.MUST); bq1.add(new TermQuery(new Term("f1", "v1")), Occur.MUST);
bq1.add(new TermQuery(new Term("f4", "v1")), Occur.MUST); bq1.add(new TermQuery(new Term("f4", "v1")), Occur.MUST);
queryTest(bq1); queryTest(bq1);
single.getIndexReader().close(); single = null;
parallel.getIndexReader().close(); parallel = null;
dir.close(); dir = null;
dir1.close(); dir1 = null;
dir2.close(); dir2 = null;
} }
public void testFieldNames() throws Exception { public void testFieldNames() throws Exception {
Directory dir1 = getDir1(random); Directory dir1 = getDir1(random);
Directory dir2 = getDir2(random); Directory dir2 = getDir2(random);
ParallelReader pr = new ParallelReader(); ParallelAtomicReader pr = new ParallelAtomicReader.Builder()
pr.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1))); .add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1)))
pr.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir2))); .add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir2)))
.build();
FieldInfos fieldInfos = pr.getFieldInfos(); FieldInfos fieldInfos = pr.getFieldInfos();
assertEquals(4, fieldInfos.size()); assertEquals(4, fieldInfos.size());
assertNotNull(fieldInfos.fieldInfo("f1")); assertNotNull(fieldInfos.fieldInfo("f1"));
@ -84,6 +76,48 @@ public class TestParallelReader extends LuceneTestCase {
dir2.close(); dir2.close();
} }
public void testRefCounts1() throws IOException {
Directory dir1 = getDir1(random);
Directory dir2 = getDir2(random);
AtomicReader ir1, ir2;
// close subreaders, ParallelReader will not change refCounts, but close on its own close
ParallelAtomicReader pr = new ParallelAtomicReader.Builder(true)
.add(ir1 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1)))
.add(ir2 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir2)))
.build();
// check RefCounts
assertEquals(1, ir1.getRefCount());
assertEquals(1, ir2.getRefCount());
pr.close();
assertEquals(0, ir1.getRefCount());
assertEquals(0, ir2.getRefCount());
dir1.close();
dir2.close();
}
public void testRefCounts2() throws IOException {
Directory dir1 = getDir1(random);
Directory dir2 = getDir2(random);
AtomicReader ir1, ir2;
// don't close subreaders, so ParallelReader will increment refcounts
ParallelAtomicReader pr = new ParallelAtomicReader.Builder(false)
.add(ir1 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1)))
.add(ir2 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir2)))
.build();
// check RefCounts
assertEquals(2, ir1.getRefCount());
assertEquals(2, ir2.getRefCount());
pr.close();
assertEquals(1, ir1.getRefCount());
assertEquals(1, ir2.getRefCount());
ir1.close();
ir2.close();
assertEquals(0, ir1.getRefCount());
assertEquals(0, ir2.getRefCount());
dir1.close();
dir2.close();
}
public void testIncompatibleIndexes() throws IOException { public void testIncompatibleIndexes() throws IOException {
// two documents: // two documents:
Directory dir1 = getDir1(random); Directory dir1 = getDir1(random);
@ -97,17 +131,43 @@ public class TestParallelReader extends LuceneTestCase {
w2.addDocument(d3); w2.addDocument(d3);
w2.close(); w2.close();
ParallelReader pr = new ParallelReader(); AtomicReader ir1 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1)),
pr.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1))); ir2 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir2));;
DirectoryReader ir = DirectoryReader.open(dir2); ParallelAtomicReader.Builder builder = new ParallelAtomicReader.Builder(false).add(ir1);
try { try {
pr.add(SlowCompositeReaderWrapper.wrap(ir)); builder.add(ir2);
fail("didn't get exptected exception: indexes don't have same number of documents"); fail("didn't get exptected exception: indexes don't have same number of documents");
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
// expected exception // expected exception
} }
ParallelAtomicReader pr = builder.build();
// check RefCounts
assertEquals(2, ir1.getRefCount());
assertEquals(1, ir2.getRefCount());
pr.close();
assertEquals(1, ir1.getRefCount());
assertEquals(1, ir2.getRefCount());
ir1.close();
ir2.close();
assertEquals(0, ir1.getRefCount());
assertEquals(0, ir2.getRefCount());
dir1.close();
dir2.close();
}
public void testignoreStoredFields() throws IOException {
Directory dir1 = getDir1(random);
Directory dir2 = getDir2(random);
ParallelAtomicReader pr = new ParallelAtomicReader.Builder()
.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1)), false)
.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir2)), true)
.build();
assertEquals("v1", pr.document(0).get("f1"));
assertEquals("v1", pr.document(0).get("f2"));
assertNull(pr.document(0).get("f3"));
assertNull(pr.document(0).get("f4"));
pr.close(); pr.close();
ir.close();
dir1.close(); dir1.close();
dir2.close(); dir2.close();
} }
@ -153,9 +213,10 @@ public class TestParallelReader extends LuceneTestCase {
private IndexSearcher parallel(Random random) throws IOException { private IndexSearcher parallel(Random random) throws IOException {
dir1 = getDir1(random); dir1 = getDir1(random);
dir2 = getDir2(random); dir2 = getDir2(random);
ParallelReader pr = new ParallelReader(); ParallelAtomicReader pr = new ParallelAtomicReader.Builder()
pr.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1))); .add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1)))
pr.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir2))); .add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir2)))
.build();
return newSearcher(pr); return newSearcher(pr);
} }

View File

@ -0,0 +1,241 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.Random;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.TextField;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.*;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase;
public class TestParallelCompositeReader extends LuceneTestCase {
private IndexSearcher parallel, single;
private Directory dir, dir1, dir2;
public void testQueries() throws Exception {
single = single(random);
parallel = parallel(random);
queryTest(new TermQuery(new Term("f1", "v1")));
queryTest(new TermQuery(new Term("f1", "v2")));
queryTest(new TermQuery(new Term("f2", "v1")));
queryTest(new TermQuery(new Term("f2", "v2")));
queryTest(new TermQuery(new Term("f3", "v1")));
queryTest(new TermQuery(new Term("f3", "v2")));
queryTest(new TermQuery(new Term("f4", "v1")));
queryTest(new TermQuery(new Term("f4", "v2")));
BooleanQuery bq1 = new BooleanQuery();
bq1.add(new TermQuery(new Term("f1", "v1")), Occur.MUST);
bq1.add(new TermQuery(new Term("f4", "v1")), Occur.MUST);
queryTest(bq1);
single.getIndexReader().close(); single = null;
parallel.getIndexReader().close(); parallel = null;
dir.close(); dir = null;
dir1.close(); dir1 = null;
dir2.close(); dir2 = null;
}
public void testRefCounts1() throws IOException {
Directory dir1 = getDir1(random);
Directory dir2 = getDir2(random);
DirectoryReader ir1, ir2;
// close subreaders, ParallelReader will not change refCounts, but close on its own close
ParallelCompositeReader pr = new ParallelCompositeReader.Builder(true)
.add(ir1 = DirectoryReader.open(dir1))
.add(ir2 = DirectoryReader.open(dir2))
.build();
// check RefCounts
assertEquals(1, ir1.getRefCount());
assertEquals(1, ir2.getRefCount());
pr.close();
assertEquals(0, ir1.getRefCount());
assertEquals(0, ir2.getRefCount());
dir1.close();
dir2.close();
}
public void testRefCounts2() throws IOException {
Directory dir1 = getDir1(random);
Directory dir2 = getDir2(random);
DirectoryReader ir1, ir2;
// don't close subreaders, so ParallelReader will increment refcounts
ParallelCompositeReader pr = new ParallelCompositeReader.Builder(false)
.add(ir1 = DirectoryReader.open(dir1))
.add(ir2 = DirectoryReader.open(dir2))
.build();
// check RefCounts
assertEquals(2, ir1.getRefCount());
assertEquals(2, ir2.getRefCount());
pr.close();
assertEquals(1, ir1.getRefCount());
assertEquals(1, ir2.getRefCount());
ir1.close();
ir2.close();
assertEquals(0, ir1.getRefCount());
assertEquals(0, ir2.getRefCount());
dir1.close();
dir2.close();
}
public void testIncompatibleIndexes() throws IOException {
// two documents:
Directory dir1 = getDir1(random);
// one document only:
Directory dir2 = newDirectory();
IndexWriter w2 = new IndexWriter(dir2, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random)));
Document d3 = new Document();
d3.add(newField("f3", "v1", TextField.TYPE_STORED));
w2.addDocument(d3);
w2.close();
DirectoryReader ir1 = DirectoryReader.open(dir1),
ir2 = DirectoryReader.open(dir2);
ParallelCompositeReader.Builder builder = new ParallelCompositeReader.Builder(false).add(ir1);
try {
builder.add(ir2);
fail("didn't get exptected exception: indexes don't have same number of documents");
} catch (IllegalArgumentException e) {
// expected exception
}
ParallelCompositeReader pr = builder.build();
// check RefCounts
assertEquals(2, ir1.getRefCount());
assertEquals(1, ir2.getRefCount());
pr.close();
assertEquals(1, ir1.getRefCount());
assertEquals(1, ir2.getRefCount());
ir1.close();
ir2.close();
assertEquals(0, ir1.getRefCount());
assertEquals(0, ir2.getRefCount());
dir1.close();
dir2.close();
}
public void testignoreStoredFields() throws IOException {
Directory dir1 = getDir1(random);
Directory dir2 = getDir2(random);
ParallelCompositeReader pr = new ParallelCompositeReader.Builder()
.add(DirectoryReader.open(dir1), false)
.add(DirectoryReader.open(dir2), true)
.build();
assertEquals("v1", pr.document(0).get("f1"));
assertEquals("v1", pr.document(0).get("f2"));
assertNull(pr.document(0).get("f3"));
assertNull(pr.document(0).get("f4"));
pr.close();
dir1.close();
dir2.close();
}
private void queryTest(Query query) throws IOException {
ScoreDoc[] parallelHits = parallel.search(query, null, 1000).scoreDocs;
ScoreDoc[] singleHits = single.search(query, null, 1000).scoreDocs;
assertEquals(parallelHits.length, singleHits.length);
for(int i = 0; i < parallelHits.length; i++) {
assertEquals(parallelHits[i].score, singleHits[i].score, 0.001f);
Document docParallel = parallel.doc(parallelHits[i].doc);
Document docSingle = single.doc(singleHits[i].doc);
assertEquals(docParallel.get("f1"), docSingle.get("f1"));
assertEquals(docParallel.get("f2"), docSingle.get("f2"));
assertEquals(docParallel.get("f3"), docSingle.get("f3"));
assertEquals(docParallel.get("f4"), docSingle.get("f4"));
}
}
// Fields 1-4 indexed together:
private IndexSearcher single(Random random) throws IOException {
dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random)));
Document d1 = new Document();
d1.add(newField("f1", "v1", TextField.TYPE_STORED));
d1.add(newField("f2", "v1", TextField.TYPE_STORED));
d1.add(newField("f3", "v1", TextField.TYPE_STORED));
d1.add(newField("f4", "v1", TextField.TYPE_STORED));
w.addDocument(d1);
Document d2 = new Document();
d2.add(newField("f1", "v2", TextField.TYPE_STORED));
d2.add(newField("f2", "v2", TextField.TYPE_STORED));
d2.add(newField("f3", "v2", TextField.TYPE_STORED));
d2.add(newField("f4", "v2", TextField.TYPE_STORED));
w.addDocument(d2);
w.close();
DirectoryReader ir = DirectoryReader.open(dir);
return newSearcher(ir);
}
// Fields 1 & 2 in one index, 3 & 4 in other, with ParallelReader:
private IndexSearcher parallel(Random random) throws IOException {
dir1 = getDir1(random);
dir2 = getDir2(random);
final DirectoryReader rd1 = DirectoryReader.open(dir1),
rd2 = DirectoryReader.open(dir2);
assertEquals(2, rd1.getSequentialSubReaders().length);
assertEquals(2, rd2.getSequentialSubReaders().length);
ParallelCompositeReader pr = new ParallelCompositeReader.Builder()
.add(rd1).add(rd2).build();
return newSearcher(pr);
}
private Directory getDir1(Random random) throws IOException {
Directory dir1 = newDirectory();
IndexWriter w1 = new IndexWriter(dir1, newIndexWriterConfig(TEST_VERSION_CURRENT,
new MockAnalyzer(random)).setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES));
Document d1 = new Document();
d1.add(newField("f1", "v1", TextField.TYPE_STORED));
d1.add(newField("f2", "v1", TextField.TYPE_STORED));
w1.addDocument(d1);
w1.commit();
Document d2 = new Document();
d2.add(newField("f1", "v2", TextField.TYPE_STORED));
d2.add(newField("f2", "v2", TextField.TYPE_STORED));
w1.addDocument(d2);
w1.close();
return dir1;
}
private Directory getDir2(Random random) throws IOException {
Directory dir2 = newDirectory();
IndexWriter w2 = new IndexWriter(dir2, newIndexWriterConfig(TEST_VERSION_CURRENT,
new MockAnalyzer(random)).setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES));
Document d3 = new Document();
d3.add(newField("f3", "v1", TextField.TYPE_STORED));
d3.add(newField("f4", "v1", TextField.TYPE_STORED));
w2.addDocument(d3);
w2.commit();
Document d4 = new Document();
d4.add(newField("f3", "v2", TextField.TYPE_STORED));
d4.add(newField("f4", "v2", TextField.TYPE_STORED));
w2.addDocument(d4);
w2.close();
return dir2;
}
}

View File

@ -30,7 +30,7 @@ import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriterConfig.OpenMode; import org.apache.lucene.index.IndexWriterConfig.OpenMode;
/** /**
* Some tests for {@link ParallelReader}s with empty indexes * Some tests for {@link ParallelAtomicReader}s with empty indexes
* *
* @author Christian Kohlschuetter * @author Christian Kohlschuetter
*/ */
@ -52,9 +52,10 @@ public class TestParallelReaderEmptyIndex extends LuceneTestCase {
Directory rdOut = newDirectory(); Directory rdOut = newDirectory();
IndexWriter iwOut = new IndexWriter(rdOut, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random))); IndexWriter iwOut = new IndexWriter(rdOut, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random)));
ParallelReader pr = new ParallelReader(); ParallelAtomicReader pr = new ParallelAtomicReader.Builder()
pr.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(rd1))); .add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(rd1)))
pr.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(rd2))); .add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(rd2)))
.build();
// When unpatched, Lucene crashes here with a NoSuchElementException (caused by ParallelTermEnum) // When unpatched, Lucene crashes here with a NoSuchElementException (caused by ParallelTermEnum)
iwOut.addIndexes(pr); iwOut.addIndexes(pr);
@ -115,9 +116,11 @@ public class TestParallelReaderEmptyIndex extends LuceneTestCase {
Directory rdOut = newDirectory(); Directory rdOut = newDirectory();
IndexWriter iwOut = new IndexWriter(rdOut, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random))); IndexWriter iwOut = new IndexWriter(rdOut, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random)));
ParallelReader pr = new ParallelReader(); final DirectoryReader reader1, reader2;
pr.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(rd1))); ParallelAtomicReader pr = new ParallelAtomicReader.Builder()
pr.add(SlowCompositeReaderWrapper.wrap(DirectoryReader.open(rd2))); .add(SlowCompositeReaderWrapper.wrap(reader1 = DirectoryReader.open(rd1)))
.add(SlowCompositeReaderWrapper.wrap(reader2 = DirectoryReader.open(rd2)))
.build();
// When unpatched, Lucene crashes here with an ArrayIndexOutOfBoundsException (caused by TermVectorsWriter) // When unpatched, Lucene crashes here with an ArrayIndexOutOfBoundsException (caused by TermVectorsWriter)
iwOut.addIndexes(pr); iwOut.addIndexes(pr);
@ -125,6 +128,10 @@ public class TestParallelReaderEmptyIndex extends LuceneTestCase {
// ParallelReader closes any IndexReader you added to it: // ParallelReader closes any IndexReader you added to it:
pr.close(); pr.close();
// assert subreaders were closed
assertEquals(0, reader1.getRefCount());
assertEquals(0, reader2.getRefCount());
rd1.close(); rd1.close();
rd2.close(); rd2.close();

View File

@ -72,9 +72,7 @@ public class TestParallelTermEnum extends LuceneTestCase {
} }
public void test1() throws IOException { public void test1() throws IOException {
ParallelReader pr = new ParallelReader(); ParallelAtomicReader pr = new ParallelAtomicReader.Builder().add(ir1).add(ir2).build();
pr.add(ir1);
pr.add(ir2);
Bits liveDocs = pr.getLiveDocs(); Bits liveDocs = pr.getLiveDocs();

View File

@ -5,7 +5,7 @@ import java.util.List;
import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader; import org.apache.lucene.index.MultiReader;
import org.apache.lucene.index.ParallelReader; import org.apache.lucene.index.ParallelAtomicReader;
import org.apache.lucene.index.SlowCompositeReaderWrapper; import org.apache.lucene.index.SlowCompositeReaderWrapper;
import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
@ -68,8 +68,8 @@ public class TestFacetsAccumulatorWithComplement extends FacetTestBase {
@Test @Test
public void testComplementsWithParallerReader() throws Exception { public void testComplementsWithParallerReader() throws Exception {
IndexReader origReader = indexReader; IndexReader origReader = indexReader;
ParallelReader pr = new ParallelReader(true); ParallelAtomicReader pr = new ParallelAtomicReader.Builder(true)
pr.add(SlowCompositeReaderWrapper.wrap(origReader)); .add(SlowCompositeReaderWrapper.wrap(origReader)).build();
indexReader = pr; indexReader = pr;
try { try {
doTestComplements(); doTestComplements();