LUCENE-4670: Add finish* callbacks to StoredFieldsWriter and TermVectorsWriter.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1431283 13f79535-47bb-0310-9956-ffa450edef68
Adrien Grand 2013-01-10 11:24:44 +00:00
parent de8bfe6dd4
commit e38d91a8f5
10 changed files with 216 additions and 52 deletions

CHANGES.txt

@@ -106,7 +106,11 @@ Changes in backwards compatibility policy
* LUCENE-4659: Massive cleanup to CategoryPath API. Additionally, CategoryPath is
now immutable, so you don't need to clone() it. (Shai Erera)
* LUCENE-4670: StoredFieldsWriter and TermVectorsWriter have new finish* callbacks
which are called after a doc/field/term has been completely added.
(Adrien Grand, Robert Muir)
New Features
* LUCENE-4226: New experimental StoredFieldsFormat that compresses chunks of

StoredFieldsWriter.java

@@ -55,7 +55,10 @@ public abstract class StoredFieldsWriter implements Closeable {
* called even if the document has no stored fields, in
* this case <code>numStoredFields</code> will be zero. */
public abstract void startDocument(int numStoredFields) throws IOException;
/** Called when a document and all its fields have been added. */
public void finishDocument() throws IOException {}
/** Writes a single stored field. */
public abstract void writeField(FieldInfo info, StorableField field) throws IOException;
@@ -116,6 +119,8 @@ public abstract class StoredFieldsWriter implements Closeable {
for (StorableField field : doc) {
writeField(fieldInfos.fieldInfo(field.name()), field);
}
finishDocument();
}
@Override
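
(Not part of the patch.) For reference, a minimal sketch of the per-document call sequence a StoredFieldsWriter sees after this change, mirroring addDocument() above. The class and method names in the sketch are invented, and the import paths are assumed to match trunk at this time; since the base-class finishDocument() defaults to a no-op, existing codecs that keep no per-document state need not override it.

// Sketch only -- not part of this patch. Invented names; assumed import paths.
import java.io.IOException;

import org.apache.lucene.codecs.StoredFieldsWriter;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.StorableField;

class StoredFieldsCallSequenceSketch {
  /** Writes one document, mirroring StoredFieldsWriter.addDocument() above. */
  static void writeOneDocument(StoredFieldsWriter writer, FieldInfos fieldInfos,
                               Iterable<? extends StorableField> doc,
                               int numStoredFields) throws IOException {
    writer.startDocument(numStoredFields);    // called even when numStoredFields == 0
    for (StorableField field : doc) {
      writer.writeField(fieldInfos.fieldInfo(field.name()), field);
    }
    writer.finishDocument();                  // new LUCENE-4670 callback
  }
}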

TermVectorsWriter.java

@@ -71,18 +71,27 @@ public abstract class TermVectorsWriter implements Closeable {
* has no vector fields, in this case <code>numVectorFields</code>
* will be zero. */
public abstract void startDocument(int numVectorFields) throws IOException;
/** Called after a doc and all its fields have been added. */
public void finishDocument() throws IOException {};
/** Called before writing the terms of the field.
* {@link #startTerm(BytesRef, int)} will be called <code>numTerms</code> times. */
public abstract void startField(FieldInfo info, int numTerms, boolean positions, boolean offsets, boolean payloads) throws IOException;
/** Called after a field and all its terms have been added. */
public void finishField() throws IOException {};
/** Adds a term and its term frequency <code>freq</code>.
* If this field has positions and/or offsets enabled, then
* {@link #addPosition(int, int, int, BytesRef)} will be called
* <code>freq</code> times respectively.
*/
public abstract void startTerm(BytesRef term, int freq) throws IOException;
/** Called after a term and all its positions have been added. */
public void finishTerm() throws IOException {}
/** Adds a term position and offsets */
public abstract void addPosition(int position, int startOffset, int endOffset, BytesRef payload) throws IOException;
@@ -97,7 +106,7 @@ public abstract class TermVectorsWriter implements Closeable {
* check that this is the case to detect the JRE bug described
* in LUCENE-1282. */
public abstract void finish(FieldInfos fis, int numDocs) throws IOException;
/**
* Called by IndexWriter when writing new segments.
* <p>
@@ -197,6 +206,7 @@ public abstract class TermVectorsWriter implements Closeable {
protected final void addAllDocVectors(Fields vectors, MergeState mergeState) throws IOException {
if (vectors == null) {
startDocument(0);
finishDocument();
return;
}
@@ -275,10 +285,13 @@ public abstract class TermVectorsWriter implements Closeable {
addPosition(pos, startOffset, endOffset, payload);
}
}
finishTerm();
}
assert termCount == numTerms;
finishField();
}
assert fieldCount == numFields;
finishDocument();
}
/** Return the BytesRef Comparator used to sort terms
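
(Not part of the patch.) A minimal sketch of the full nested call sequence a TermVectorsWriter now sees for one document, as implied by the javadocs and by addAllDocVectors() above. All names in the sketch are invented, and the positions/offsets/payloads flags are hard-coded to false for brevity.

// Sketch only -- not part of this patch. Invented names; no positions/offsets/payloads.
import java.io.IOException;

import org.apache.lucene.codecs.TermVectorsWriter;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.util.BytesRef;

class TermVectorsCallSequenceSketch {
  /** Writes the vectors of one document, showing where the new callbacks fit. */
  static void writeOneDocument(TermVectorsWriter writer, FieldInfo[] fields,
                               BytesRef[][] terms, int[][] freqs) throws IOException {
    writer.startDocument(fields.length);                   // called even with zero fields
    for (int f = 0; f < fields.length; f++) {
      writer.startField(fields[f], terms[f].length, false, false, false);
      for (int t = 0; t < terms[f].length; t++) {
        writer.startTerm(terms[f][t], freqs[f][t]);
        // addPosition(...) would be called freq times here if positions/offsets were enabled
        writer.finishTerm();                               // new LUCENE-4670 callback
      }
      writer.finishField();                                // new LUCENE-4670 callback
    }
    writer.finishDocument();                               // new LUCENE-4670 callback
  }
}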

CompressingStoredFieldsWriter.java

@@ -136,19 +136,8 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
}
}
private void endWithPreviousDocument() throws IOException {
if (numBufferedDocs > 0) {
endOffsets[numBufferedDocs - 1] = bufferedDocs.length;
}
}
@Override
public void startDocument(int numStoredFields) throws IOException {
endWithPreviousDocument();
if (triggerFlush()) {
flush();
}
if (numBufferedDocs == this.numStoredFields.length) {
final int newLength = ArrayUtil.oversize(numBufferedDocs + 1, 4);
this.numStoredFields = Arrays.copyOf(this.numStoredFields, newLength);
@@ -158,6 +147,14 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
++numBufferedDocs;
}
@Override
public void finishDocument() throws IOException {
endOffsets[numBufferedDocs - 1] = bufferedDocs.length;
if (triggerFlush()) {
flush();
}
}
private static void saveInts(int[] values, int length, DataOutput out) throws IOException {
assert length > 0;
if (length == 1) {
@@ -295,9 +292,10 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
@Override
public void finish(FieldInfos fis, int numDocs) throws IOException {
endWithPreviousDocument();
if (numBufferedDocs > 0) {
flush();
} else {
assert bufferedDocs.length == 0;
}
if (docBase != numDocs) {
throw new RuntimeException("Wrote " + docBase + " docs, finish called with numDocs=" + numDocs);
@@ -351,17 +349,13 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
}
if (compressionMode == matchingFieldsReader.getCompressionMode() // same compression mode
&& (numBufferedDocs == 0 || triggerFlush()) // starting a new chunk
&& numBufferedDocs == 0 // starting a new chunk
&& startOffsets[it.chunkDocs - 1] < chunkSize // chunk is small enough
&& startOffsets[it.chunkDocs - 1] + it.lengths[it.chunkDocs - 1] >= chunkSize // chunk is large enough
&& nextDeletedDoc(it.docBase, liveDocs, it.docBase + it.chunkDocs) == it.docBase + it.chunkDocs) { // no deletion in the chunk
assert docID == it.docBase;
// no need to decompress, just copy data
endWithPreviousDocument();
if (triggerFlush()) {
flush();
}
indexWriter.writeIndex(it.chunkDocs, fieldsStream.getFilePointer());
writeHeader(this.docBase, it.chunkDocs, it.numStoredFields, it.lengths);
it.copyCompressedData(fieldsStream);
@@ -380,6 +374,7 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
final int diff = docID - it.docBase;
startDocument(it.numStoredFields[diff]);
bufferedDocs.writeBytes(it.bytes.bytes, it.bytes.offset + startOffsets[diff], it.lengths[diff]);
finishDocument();
++docCount;
mergeState.checkAbort.work(300);
}
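
(Not part of the patch.) The net effect in this file: a document's end offset is recorded and the chunk-flush check runs in finishDocument(), once the document is complete, instead of lazily in endWithPreviousDocument() when the next document starts; finish() then only has to flush a partially filled chunk, and the bulk-copy merge path can simply require numBufferedDocs == 0. Below is a simplified, self-contained sketch of that buffer-then-flush-on-finishDocument pattern, with invented names and an arbitrary threshold -- not the actual Lucene code.

// Simplified sketch -- not the actual Lucene code. Invented names, arbitrary threshold.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;

class ChunkedDocBufferSketch {
  private static final int CHUNK_SIZE = 1 << 14;                  // flush threshold
  private final ByteArrayOutputStream bufferedDocs = new ByteArrayOutputStream();
  private int[] endOffsets = new int[16];
  private int numBufferedDocs;

  void startDocument() {
    if (numBufferedDocs == endOffsets.length) {
      endOffsets = Arrays.copyOf(endOffsets, numBufferedDocs * 2);
    }
    ++numBufferedDocs;
  }

  void writeDocBytes(byte[] bytes) {                              // buffer document content
    bufferedDocs.write(bytes, 0, bytes.length);
  }

  void finishDocument() throws IOException {                      // doc is complete:
    endOffsets[numBufferedDocs - 1] = bufferedDocs.size();        // seal its end offset
    if (bufferedDocs.size() >= CHUNK_SIZE) {                      // and flush eagerly
      flushChunk();
    }
  }

  void finish() throws IOException {                              // end of segment:
    if (numBufferedDocs > 0) {                                    // only a partially
      flushChunk();                                               // filled chunk remains
    }
  }

  private void flushChunk() throws IOException {
    // compress and write bufferedDocs plus endOffsets[0..numBufferedDocs) ...
    bufferedDocs.reset();
    numBufferedDocs = 0;
  }
}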

Lucene40TermVectorsWriter.java

@@ -124,17 +124,16 @@ public final class Lucene40TermVectorsWriter extends TermVectorsWriter {
if (payloads)
bits |= Lucene40TermVectorsReader.STORE_PAYLOAD_WITH_TERMVECTOR;
tvf.writeByte(bits);
assert fieldCount <= numVectorFields;
if (fieldCount == numVectorFields) {
// last field of the document
// this is crazy because the file format is crazy!
for (int i = 1; i < fieldCount; i++) {
tvd.writeVLong(fps[i] - fps[i-1]);
}
}
}
@Override
public void finishDocument() throws IOException {
assert fieldCount == numVectorFields;
for (int i = 1; i < fieldCount; i++) {
tvd.writeVLong(fps[i] - fps[i-1]);
}
}
private final BytesRef lastTerm = new BytesRef(10);
// NOTE: we override addProx, so we don't need to buffer when indexing.
@@ -222,20 +221,6 @@ public final class Lucene40TermVectorsWriter extends TermVectorsWriter {
}
bufferedIndex++;
// dump buffer if we are done
if (bufferedIndex == bufferedFreq) {
if (payloads) {
tvf.writeBytes(payloadData.bytes, payloadData.offset, payloadData.length);
}
for (int i = 0; i < bufferedIndex; i++) {
if (offsets) {
tvf.writeVInt(offsetStartBuffer[i] - lastOffset);
tvf.writeVInt(offsetEndBuffer[i] - offsetStartBuffer[i]);
lastOffset = offsetEndBuffer[i];
}
}
}
} else if (positions) {
// write position delta
writePosition(position - lastPosition, payload);
@@ -248,6 +233,25 @@ public final class Lucene40TermVectorsWriter extends TermVectorsWriter {
}
}
@Override
public void finishTerm() throws IOException {
if (bufferedIndex > 0) {
// dump buffer
assert positions && (offsets || payloads);
assert bufferedIndex == bufferedFreq;
if (payloads) {
tvf.writeBytes(payloadData.bytes, payloadData.offset, payloadData.length);
}
for (int i = 0; i < bufferedIndex; i++) {
if (offsets) {
tvf.writeVInt(offsetStartBuffer[i] - lastOffset);
tvf.writeVInt(offsetEndBuffer[i] - offsetStartBuffer[i]);
lastOffset = offsetEndBuffer[i];
}
}
}
}
private void writePosition(int delta, BytesRef payload) throws IOException {
if (payloads) {
int payloadLength = payload == null ? 0 : payload.length;
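
(Not part of the patch.) In the Lucene40 format the per-term offset/payload buffer is now dumped from the new finishTerm() callback instead of from the last addPosition() call, and the tvd pointer deltas move from startField() into finishDocument(). A simplified sketch of that buffer-until-finishTerm pattern, with invented names -- not the actual Lucene40 code:

// Simplified sketch -- not the actual Lucene40 code. Invented names.
import java.io.IOException;
import java.util.Arrays;

import org.apache.lucene.store.DataOutput;

class TermOffsetBufferSketch {
  private int[] startOffsets = new int[8];
  private int[] endOffsets = new int[8];
  private int buffered;
  private int lastOffset;

  void addPosition(int startOffset, int endOffset) {            // buffer only, no I/O yet
    if (buffered == startOffsets.length) {
      startOffsets = Arrays.copyOf(startOffsets, buffered * 2);
      endOffsets = Arrays.copyOf(endOffsets, buffered * 2);
    }
    startOffsets[buffered] = startOffset;
    endOffsets[buffered] = endOffset;
    ++buffered;
  }

  void finishTerm(DataOutput tvf) throws IOException {          // term complete: dump buffer
    for (int i = 0; i < buffered; i++) {
      tvf.writeVInt(startOffsets[i] - lastOffset);              // delta-encode offsets
      tvf.writeVInt(endOffsets[i] - startOffsets[i]);
      lastOffset = endOffsets[i];
    }
    buffered = 0;
  }
}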

StoredFieldsConsumer.java

@@ -105,6 +105,7 @@ final class StoredFieldsConsumer {
while(lastDocID < docID) {
fieldsWriter.startDocument(0);
lastDocID++;
fieldsWriter.finishDocument();
}
}
@@ -119,6 +120,7 @@ final class StoredFieldsConsumer {
for (int i = 0; i < numStoredFields; i++) {
fieldsWriter.writeField(fieldInfos[i], storedFields[i]);
}
fieldsWriter.finishDocument();
lastDocID++;
}

TermVectorsConsumer.java

@@ -78,6 +78,7 @@ final class TermVectorsConsumer extends TermsHashConsumer {
void fill(int docID) throws IOException {
while(lastDocID < docID) {
writer.startDocument(0);
writer.finishDocument();
lastDocID++;
}
}
@@ -108,6 +109,7 @@ final class TermVectorsConsumer extends TermsHashConsumer {
for (int i = 0; i < numVectorFields; i++) {
perFields[i].finishDocument();
}
writer.finishDocument();
assert lastDocID == docState.docID: "lastDocID=" + lastDocID + " docState.docID=" + docState.docID;

TermVectorsConsumerPerField.java

@@ -182,7 +182,9 @@ final class TermVectorsConsumerPerField extends TermsHashConsumerPerField {
}
tv.addProx(freq, posReader, offReader);
}
tv.finishTerm();
}
tv.finishField();
termsHashPerField.reset();

AssertingStoredFieldsFormat.java

@@ -72,26 +72,43 @@ public class AssertingStoredFieldsFormat extends StoredFieldsFormat {
return new AssertingStoredFieldsReader(in.clone(), maxDoc);
}
}
enum Status {
UNDEFINED, STARTED, FINISHED;
}
static class AssertingStoredFieldsWriter extends StoredFieldsWriter {
private final StoredFieldsWriter in;
private int numWritten;
private int fieldCount;
private Status docStatus;
AssertingStoredFieldsWriter(StoredFieldsWriter in) {
this.in = in;
this.docStatus = Status.UNDEFINED;
}
@Override
public void startDocument(int numStoredFields) throws IOException {
assert docStatus != Status.STARTED;
in.startDocument(numStoredFields);
assert fieldCount == 0;
fieldCount = numStoredFields;
numWritten++;
docStatus = Status.STARTED;
}
@Override
public void finishDocument() throws IOException {
assert docStatus == Status.STARTED;
assert fieldCount == 0;
in.finishDocument();
docStatus = Status.FINISHED;
}
@Override
public void writeField(FieldInfo info, StorableField field) throws IOException {
assert docStatus == Status.STARTED;
in.writeField(info, field);
assert fieldCount > 0;
fieldCount--;
@@ -104,6 +121,7 @@ public class AssertingStoredFieldsFormat extends StoredFieldsFormat {
@Override
public void finish(FieldInfos fis, int numDocs) throws IOException {
assert docStatus == (numDocs > 0 ? Status.FINISHED : Status.UNDEFINED);
in.finish(fis, numDocs);
assert fieldCount == 0;
assert numDocs == numWritten;
@@ -112,6 +130,7 @@ public class AssertingStoredFieldsFormat extends StoredFieldsFormat {
@Override
public void close() throws IOException {
in.close();
assert docStatus != Status.STARTED;
}
}
}

AssertingTermVectorsFormat.java

@@ -18,17 +18,20 @@ package org.apache.lucene.codecs.asserting;
*/
import java.io.IOException;
import java.util.Comparator;
import org.apache.lucene.codecs.TermVectorsFormat;
import org.apache.lucene.codecs.TermVectorsReader;
import org.apache.lucene.codecs.TermVectorsWriter;
import org.apache.lucene.codecs.lucene40.Lucene40TermVectorsFormat;
import org.apache.lucene.index.AssertingAtomicReader;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.BytesRef;
/**
* Just like {@link Lucene40TermVectorsFormat} but with additional asserts.
@@ -43,16 +46,16 @@ public class AssertingTermVectorsFormat extends TermVectorsFormat {
@Override
public TermVectorsWriter vectorsWriter(Directory directory, SegmentInfo segmentInfo, IOContext context) throws IOException {
return in.vectorsWriter(directory, segmentInfo, context);
return new AssertingTermVectorsWriter(in.vectorsWriter(directory, segmentInfo, context));
}
static class AssertingTermVectorsReader extends TermVectorsReader {
private final TermVectorsReader in;
AssertingTermVectorsReader(TermVectorsReader in) {
this.in = in;
}
@Override
public void close() throws IOException {
in.close();
@@ -68,5 +71,120 @@ public class AssertingTermVectorsFormat extends TermVectorsFormat {
public TermVectorsReader clone() {
return new AssertingTermVectorsReader(in.clone());
}
}
}
enum Status {
UNDEFINED, STARTED, FINISHED;
}
static class AssertingTermVectorsWriter extends TermVectorsWriter {
private final TermVectorsWriter in;
private Status docStatus, fieldStatus, termStatus;
private int fieldCount, termCount, positionCount;
boolean hasPositions;
AssertingTermVectorsWriter(TermVectorsWriter in) {
this.in = in;
docStatus = Status.UNDEFINED;
fieldStatus = Status.UNDEFINED;
termStatus = Status.UNDEFINED;
fieldCount = termCount = positionCount = 0;
}
@Override
public void startDocument(int numVectorFields) throws IOException {
assert fieldCount == 0;
assert docStatus != Status.STARTED;
in.startDocument(numVectorFields);
docStatus = Status.STARTED;
fieldCount = numVectorFields;
}
@Override
public void finishDocument() throws IOException {
assert fieldCount == 0;
assert docStatus == Status.STARTED;
in.finishDocument();
docStatus = Status.FINISHED;
}
@Override
public void startField(FieldInfo info, int numTerms, boolean positions,
boolean offsets, boolean payloads) throws IOException {
assert termCount == 0;
assert docStatus == Status.STARTED;
assert fieldStatus != Status.STARTED;
in.startField(info, numTerms, positions, offsets, payloads);
fieldStatus = Status.STARTED;
termCount = numTerms;
hasPositions = positions || offsets || payloads;
}
@Override
public void finishField() throws IOException {
assert termCount == 0;
assert fieldStatus == Status.STARTED;
in.finishField();
fieldStatus = Status.FINISHED;
--fieldCount;
}
@Override
public void startTerm(BytesRef term, int freq) throws IOException {
assert docStatus == Status.STARTED;
assert fieldStatus == Status.STARTED;
assert termStatus != Status.STARTED;
in.startTerm(term, freq);
termStatus = Status.STARTED;
positionCount = hasPositions ? freq : 0;
}
@Override
public void finishTerm() throws IOException {
assert positionCount == 0;
assert docStatus == Status.STARTED;
assert fieldStatus == Status.STARTED;
assert termStatus == Status.STARTED;
in.finishTerm();
termStatus = Status.FINISHED;
--termCount;
}
@Override
public void addPosition(int position, int startOffset, int endOffset,
BytesRef payload) throws IOException {
assert docStatus == Status.STARTED;
assert fieldStatus == Status.STARTED;
assert termStatus == Status.STARTED;
in.addPosition(position, startOffset, endOffset, payload);
--positionCount;
}
@Override
public void abort() {
in.abort();
}
@Override
public void finish(FieldInfos fis, int numDocs) throws IOException {
assert docStatus == (numDocs > 0 ? Status.FINISHED : Status.UNDEFINED);
assert fieldStatus != Status.STARTED;
assert termStatus != Status.STARTED;
in.finish(fis, numDocs);
}
@Override
public Comparator<BytesRef> getComparator() throws IOException {
return in.getComparator();
}
@Override
public void close() throws IOException {
in.close();
assert docStatus != Status.STARTED;
assert fieldStatus != Status.STARTED;
assert termStatus != Status.STARTED;
}
}
}