mirror of https://github.com/apache/lucene.git
LUCENE-1043: speed up merging of stored fields by bulk-copying bytes for contiguous ranges of non-deleted docs
git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@593131 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent 658aebd28a
commit a98a30818f
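In outline, LUCENE-1043 replaces the per-document decode/re-encode of stored fields during segment merging with one raw byte copy per run of consecutive live (non-deleted) documents. A rough sketch of the idea, using a hypothetical copyRawStoredFields helper in place of the real FieldsReader/FieldsWriter calls shown in the hunks below:

  // Sketch only: how a merger can walk a segment and bulk-copy runs of live docs.
  // copyRawStoredFields is a stand-in for the rawDocs/addRawDocuments pair below.
  int j = 0;
  while (j < maxDoc) {
    if (reader.isDeleted(j)) { j++; continue; }     // deleted docs break a run
    int start = j;
    while (j < maxDoc && !reader.isDeleted(j)) j++;
    copyRawStoredFields(start, j - start);          // one raw byte copy for the whole run
  }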
@@ -214,6 +214,10 @@ Optimizations
     first), by adding symbolic constant DISABLE_AUTO_FLUSH to disable
     one of the flush triggers. (Ning Li via Mike McCandless)

 12. LUCENE-1043: Speed up merging of stored fields by bulk-copying the
     raw bytes for each contiguous range of non-deleted documents.
     (Robert Engels via Mike McCandless)

Documentation

Build
@@ -48,6 +48,7 @@ final class FieldsReader {
  private final IndexInput fieldsStream;

  private final IndexInput indexStream;
  private int numTotalDocs;
  private int size;
  private boolean closed;
@@ -88,6 +89,7 @@ final class FieldsReader {
        this.size = (int) (indexStream.length() >> 3);
      }

      numTotalDocs = (int) (indexStream.length() >> 3);
      success = true;
    } finally {
      // With lock-less commits, it's entirely possible (and
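Because the fields index stores one 8-byte pointer per document, dividing its length by 8 (the >> 3 above) gives the total document count; rawDocs below uses this numTotalDocs to know when to substitute the fields-file length for the pointer past the last document.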
@@ -186,6 +188,32 @@ final class FieldsReader {
    return doc;
  }

  /** Returns the length in bytes of each raw document in a
   *  contiguous range of length numDocs starting with
   *  startDocID.  Returns the IndexInput (the fieldStream),
   *  already seeked to the starting point for startDocID.*/
  final IndexInput rawDocs(int[] lengths, int startDocID, int numDocs) throws IOException {
    indexStream.seek(startDocID * 8L);
    long startOffset = indexStream.readLong();
    long lastOffset = startOffset;
    int count = 0;
    while (count < numDocs) {
      final long offset;
      final int docID = startDocID + count + 1;
      assert docID <= numTotalDocs;
      if (docID < numTotalDocs)
        offset = indexStream.readLong();
      else
        offset = fieldsStream.length();
      lengths[count++] = (int) (offset-lastOffset);
      lastOffset = offset;
    }

    fieldsStream.seek(startOffset);

    return fieldsStream;
  }

  /**
   * Skip the field.  We still have to read some of the information about the field, but can skip past the actual content.
   * This will have the most payoff on large fields.
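The per-document lengths fall out of simple pointer differencing: the fields index file (.fdx) holds an 8-byte file pointer into the fields data file (.fdt) for each document, and the data-file length stands in for the pointer past the last document. A small worked example with assumed offsets:

  // Assumed pointer table for a 3-document segment (illustrative numbers only):
  //   .fdx: pointer(0) = 0, pointer(1) = 100, pointer(2) = 140;  .fdt length = 200
  // rawDocs(lengths, 0, 3) seeks the index stream to 0*8 and fills:
  //   lengths[0] = 100 - 0   = 100
  //   lengths[1] = 140 - 100 =  40
  //   lengths[2] = 200 - 140 =  60   // last doc uses fieldsStream.length()
  // and returns fieldsStream positioned at offset 0, ready for a sequential copy.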
@@ -26,6 +26,7 @@ import org.apache.lucene.document.Fieldable;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMOutputStream;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.IndexInput;

final class FieldsWriter
{
@@ -127,6 +128,21 @@ final class FieldsWriter
        }
    }

    /** Bulk write a contiguous series of documents.  The
     *  lengths array is the length (in bytes) of each raw
     *  document.  The stream IndexInput is the
     *  fieldsStream from which we should bulk-copy all
     *  bytes. */
    final void addRawDocuments(IndexInput stream, int[] lengths, int numDocs) throws IOException {
      long position = fieldsStream.getFilePointer();
      long start = position;
      for(int i=0;i<numDocs;i++) {
        indexStream.writeLong(position);
        position += lengths[i];
      }
      fieldsStream.copyBytes(stream, position-start);
    }

    final void addDocument(Document doc) throws IOException {
      indexStream.writeLong(fieldsStream.getFilePointer());
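The two new methods are meant to be used as a pair during merging: rawDocs positions the source segment's fields stream and reports per-document byte lengths, and addRawDocuments regenerates the destination index pointers from those same lengths while the bytes themselves are copied verbatim. A minimal sketch of that pairing, where fieldsReader, fieldsWriter, startDocID and numDocs are assumed to be set up by the surrounding merge code:

  // Sketch: bulk-copy numDocs consecutive live documents from one segment into
  // the merged segment.  Only the two calls below come from this commit; the
  // variable names are illustrative.
  int[] lengths = new int[numDocs];
  IndexInput src = fieldsReader.rawDocs(lengths, startDocID, numDocs); // seeks the source fields data
  fieldsWriter.addRawDocuments(src, lengths, numDocs);                 // writes index entries + raw bytes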
@@ -2135,11 +2135,6 @@ public class IndexWriter {
      message("flush at addIndexesNoOptimize");
    flush();

    /* new merge policy
    if (startUpperBound == 0)
      startUpperBound = 10;
    */

    boolean success = false;

    startTransaction();
@@ -26,6 +26,7 @@ import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.document.FieldSelectorResult;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.IndexInput;

/**
 * The SegmentMerger class combines two or more Segments, represented by an IndexReader ({@link #add},
@@ -58,6 +59,10 @@ final class SegmentMerger {
  // to merge the doc stores.
  private boolean mergeDocStores;

  /** Maximum number of contiguous documents to bulk-copy
      when merging stored fields */
  private final static int MAX_RAW_MERGE_DOCS = 16384;

  /** This ctor used only by test code.
   *
   * @param dir The Directory to merge the other segments into
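The 16384 cap exists only to bound the scratch array used for bulk reads: rawDocLengths below is sized to MAX_RAW_MERGE_DOCS ints, roughly 64 KB per merge, and a run of live documents longer than that is simply copied in several consecutive chunks.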
@@ -210,24 +215,53 @@ final class SegmentMerger {
      fieldInfos = new FieldInfos(); // merge field names
    }

    int docCount = 0;
    for (int i = 0; i < readers.size(); i++) {
      IndexReader reader = (IndexReader) readers.elementAt(i);
      addIndexed(reader, fieldInfos, reader.getFieldNames(IndexReader.FieldOption.TERMVECTOR_WITH_POSITION_OFFSET), true, true, true, false);
      addIndexed(reader, fieldInfos, reader.getFieldNames(IndexReader.FieldOption.TERMVECTOR_WITH_POSITION), true, true, false, false);
      addIndexed(reader, fieldInfos, reader.getFieldNames(IndexReader.FieldOption.TERMVECTOR_WITH_OFFSET), true, false, true, false);
      addIndexed(reader, fieldInfos, reader.getFieldNames(IndexReader.FieldOption.TERMVECTOR), true, false, false, false);
      addIndexed(reader, fieldInfos, reader.getFieldNames(IndexReader.FieldOption.STORES_PAYLOADS), false, false, false, true);
      addIndexed(reader, fieldInfos, reader.getFieldNames(IndexReader.FieldOption.INDEXED), false, false, false, false);
      fieldInfos.add(reader.getFieldNames(IndexReader.FieldOption.UNINDEXED), false);
      if (reader instanceof SegmentReader) {
        SegmentReader segmentReader = (SegmentReader) reader;
        for (int j = 0; j < segmentReader.getFieldInfos().size(); j++) {
          FieldInfo fi = segmentReader.getFieldInfos().fieldInfo(j);
          fieldInfos.add(fi.name, fi.isIndexed, fi.storeTermVector, fi.storePositionWithTermVector, fi.storeOffsetWithTermVector, !reader.hasNorms(fi.name));
        }
      } else {
        addIndexed(reader, fieldInfos, reader.getFieldNames(IndexReader.FieldOption.TERMVECTOR_WITH_POSITION_OFFSET), true, true, true, false);
        addIndexed(reader, fieldInfos, reader.getFieldNames(IndexReader.FieldOption.TERMVECTOR_WITH_POSITION), true, true, false, false);
        addIndexed(reader, fieldInfos, reader.getFieldNames(IndexReader.FieldOption.TERMVECTOR_WITH_OFFSET), true, false, true, false);
        addIndexed(reader, fieldInfos, reader.getFieldNames(IndexReader.FieldOption.TERMVECTOR), true, false, false, false);
        addIndexed(reader, fieldInfos, reader.getFieldNames(IndexReader.FieldOption.STORES_PAYLOADS), false, false, false, true);
        addIndexed(reader, fieldInfos, reader.getFieldNames(IndexReader.FieldOption.INDEXED), false, false, false, false);
        fieldInfos.add(reader.getFieldNames(IndexReader.FieldOption.UNINDEXED), false);
      }
    }
    fieldInfos.write(directory, segment + ".fnm");

    int docCount = 0;

    if (mergeDocStores) {

      FieldsWriter fieldsWriter = // merge field values
              new FieldsWriter(directory, segment, fieldInfos);

      // If the i'th reader is a SegmentReader and has
      // identical fieldName -> number mapping, then this
      // array will be non-null at position i:
      SegmentReader[] matchingSegmentReaders = new SegmentReader[readers.size()];

      for (int i = 0; i < readers.size(); i++) {
        IndexReader reader = (IndexReader) readers.elementAt(i);
        boolean same = reader.getFieldNames(IndexReader.FieldOption.ALL).size() == fieldInfos.size() && reader instanceof SegmentReader;
        if (same) {
          SegmentReader segmentReader = (SegmentReader) reader;
          for (int j = 0; same && j < fieldInfos.size(); j++)
            same = fieldInfos.fieldName(j).equals(segmentReader.getFieldInfos().fieldName(j));
          if (same)
            matchingSegmentReaders[i] = segmentReader;
        }
      }

      // Used for bulk-reading raw bytes for stored fields
      final int[] rawDocLengths = new int[MAX_RAW_MERGE_DOCS];

      // merge field values
      final FieldsWriter fieldsWriter = new FieldsWriter(directory, segment, fieldInfos);

      // for merging we don't want to compress/uncompress the data, so to tell the FieldsReader that we're
      // in merge mode, we use this FieldSelector
      FieldSelector fieldSelectorMerge = new FieldSelector() {
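The matchingSegmentReaders check matters because stored-field records are not self-describing by field name. A rough reminder of the on-disk layout this relies on (simplified, and stated from memory rather than from this diff):

  // Per document in the fields data file (.fdt), roughly:
  //   VInt fieldCount, then for each stored field:
  //     VInt fieldNumber, byte bits, field value (text or binary)
  // Because fieldNumber is baked into the raw bytes, those bytes can only be
  // copied verbatim into the merged segment when the merged FieldInfos assigns
  // the same number to the same field name, which is exactly what the
  // matchingSegmentReaders[] test above verifies.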
@@ -238,13 +272,38 @@ final class SegmentMerger {

      try {
        for (int i = 0; i < readers.size(); i++) {
          IndexReader reader = (IndexReader) readers.elementAt(i);
          int maxDoc = reader.maxDoc();
          for (int j = 0; j < maxDoc; j++)
            if (!reader.isDeleted(j)) { // skip deleted docs
              fieldsWriter.addDocument(reader.document(j, fieldSelectorMerge));
              docCount++;
            }
          final IndexReader reader = (IndexReader) readers.elementAt(i);
          final SegmentReader matchingSegmentReader = matchingSegmentReaders[i];
          final FieldsReader matchingFieldsReader;
          if (matchingSegmentReader != null)
            matchingFieldsReader = matchingSegmentReader.getFieldsReader();
          else
            matchingFieldsReader = null;
          final int maxDoc = reader.maxDoc();
          for (int j = 0; j < maxDoc;) {
            if (!reader.isDeleted(j)) { // skip deleted docs
              if (matchingSegmentReader != null) {
                // We can optimize this case (doing a bulk
                // byte copy) since the field numbers are
                // identical
                int start = j;
                int numDocs = 0;
                do {
                  j++;
                  numDocs++;
                } while(j < maxDoc && !matchingSegmentReader.isDeleted(j) && numDocs < MAX_RAW_MERGE_DOCS);

                IndexInput stream = matchingFieldsReader.rawDocs(rawDocLengths, start, numDocs);
                fieldsWriter.addRawDocuments(stream, rawDocLengths, numDocs);
                docCount += numDocs;
              } else {
                fieldsWriter.addDocument(reader.document(j, fieldSelectorMerge));
                j++;
                docCount++;
              }
            } else
              j++;
          }
        }
      } finally {
        fieldsWriter.close();
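In the bulk path a run ends at the first deleted document, at maxDoc, or after MAX_RAW_MERGE_DOCS documents, so each run maps to exactly one rawDocs/addRawDocuments pair; readers whose field numbering does not match (matchingSegmentReader == null) keep the original one-document-at-a-time addDocument path.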
@@ -314,6 +314,10 @@ class SegmentReader extends DirectoryIndexReader {
    undeleteAll = false;
  }

  FieldsReader getFieldsReader() {
    return fieldsReader;
  }

  protected void doClose() throws IOException {
    if (fieldsReader != null) {
      fieldsReader.close();
@@ -388,6 +392,10 @@ class SegmentReader extends DirectoryIndexReader {
    return tis.terms(t);
  }

  FieldInfos getFieldInfos() {
    return fieldInfos;
  }

  /**
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
@@ -149,6 +149,25 @@ public abstract class IndexOutput {
      }
  }

  private static int COPY_BUFFER_SIZE = 16384;
  private byte[] copyBuffer;

  /** Copy numBytes bytes from input to ourself. */
  public void copyBytes(IndexInput input, long numBytes) throws IOException {
    long left = numBytes;
    if (copyBuffer == null)
      copyBuffer = new byte[COPY_BUFFER_SIZE];
    while(left > 0) {
      final int toCopy;
      if (left > COPY_BUFFER_SIZE)
        toCopy = COPY_BUFFER_SIZE;
      else
        toCopy = (int) left;
      input.readBytes(copyBuffer, 0, toCopy);
      writeBytes(copyBuffer, 0, toCopy);
      left -= toCopy;
    }
  }

  /** Forces any buffered output to be written. */
  public abstract void flush() throws IOException;
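copyBytes is a general helper on IndexOutput rather than anything merge-specific; it streams the requested number of bytes through a lazily allocated 16 KB buffer. A small usage sketch, with the file names and byte count purely illustrative:

  // Sketch: append the first n bytes of one file in a Directory to a new file.
  IndexInput in = dir.openInput("source.fdt");
  IndexOutput out = dir.createOutput("dest.fdt");
  try {
    out.copyBytes(in, n);   // repeated 16 KB reads/writes until n bytes are moved
  } finally {
    out.close();
    in.close();
  }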