LUCENE-1426: next steps towards flexible indexing: use the same API when writing a new segment

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@707836 13f79535-47bb-0310-9956-ffa450edef68
Michael McCandless 2008-10-25 10:40:00 +00:00
parent f2c988ae2b
commit fa79b04042
27 changed files with 695 additions and 356 deletions

DefaultSkipListWriter.java

@@ -54,6 +54,14 @@ class DefaultSkipListWriter extends MultiLevelSkipListWriter {
     lastSkipProxPointer = new long[numberOfSkipLevels];
   }
 
+  void setFreqOutput(IndexOutput freqOutput) {
+    this.freqOutput = freqOutput;
+  }
+
+  void setProxOutput(IndexOutput proxOutput) {
+    this.proxOutput = proxOutput;
+  }
+
   /**
    * Sets the values for the current skip data.
    */

DocConsumer.java

@@ -22,8 +22,8 @@ import java.util.Collection;
 abstract class DocConsumer {
   abstract DocConsumerPerThread addThread(DocumentsWriterThreadState perThread) throws IOException;
-  abstract void flush(final Collection threads, final DocumentsWriter.FlushState state) throws IOException;
-  abstract void closeDocStore(final DocumentsWriter.FlushState state) throws IOException;
+  abstract void flush(final Collection threads, final SegmentWriteState state) throws IOException;
+  abstract void closeDocStore(final SegmentWriteState state) throws IOException;
   abstract void abort();
   abstract boolean freeRAM();
 }

DocFieldConsumer.java

@@ -26,11 +26,11 @@ abstract class DocFieldConsumer {
 
   /** Called when DocumentsWriter decides to create a new
    *  segment */
-  abstract void flush(Map threadsAndFields, DocumentsWriter.FlushState state) throws IOException;
+  abstract void flush(Map threadsAndFields, SegmentWriteState state) throws IOException;
 
   /** Called when DocumentsWriter decides to close the doc
    *  stores */
-  abstract void closeDocStore(DocumentsWriter.FlushState state) throws IOException;
+  abstract void closeDocStore(SegmentWriteState state) throws IOException;
 
   /** Called when an aborting exception is hit */
   abstract void abort();

DocFieldConsumers.java

@@ -44,7 +44,7 @@ final class DocFieldConsumers extends DocFieldConsumer {
     two.setFieldInfos(fieldInfos);
   }
 
-  public void flush(Map threadsAndFields, DocumentsWriter.FlushState state) throws IOException {
+  public void flush(Map threadsAndFields, SegmentWriteState state) throws IOException {
 
     Map oneThreadsAndFields = new HashMap();
     Map twoThreadsAndFields = new HashMap();
@@ -76,7 +76,7 @@ final class DocFieldConsumers extends DocFieldConsumer {
     two.flush(twoThreadsAndFields, state);
   }
 
-  public void closeDocStore(DocumentsWriter.FlushState state) throws IOException {
+  public void closeDocStore(SegmentWriteState state) throws IOException {
     try {
       one.closeDocStore(state);
     } finally {

DocFieldProcessor.java

@@ -43,11 +43,11 @@ final class DocFieldProcessor extends DocConsumer {
     consumer.setFieldInfos(fieldInfos);
   }
 
-  public void closeDocStore(DocumentsWriter.FlushState state) throws IOException {
+  public void closeDocStore(SegmentWriteState state) throws IOException {
     consumer.closeDocStore(state);
   }
 
-  public void flush(Collection threads, DocumentsWriter.FlushState state) throws IOException {
+  public void flush(Collection threads, SegmentWriteState state) throws IOException {
 
     Map childThreadsAndFields = new HashMap();
     Iterator it = threads.iterator();
@@ -63,7 +63,9 @@ final class DocFieldProcessor extends DocConsumer {
     // consumer can alter the FieldInfo* if necessary.  EG,
     // FreqProxTermsWriter does this with
     // FieldInfo.storePayload.
-    fieldInfos.write(state.directory, state.segmentName + ".fnm");
+    final String fileName = state.segmentFileName(IndexFileNames.FIELD_INFOS_EXTENSION);
+    fieldInfos.write(state.directory, fileName);
+    state.flushedFiles.add(fileName);
   }
 
   public void abort() {

DocFieldProcessorPerThread.java

@@ -87,7 +87,7 @@ final class DocFieldProcessorPerThread extends DocConsumerPerThread {
 
   /** If there are fields we've seen but did not see again
    *  in the last run, then free them up. */
-  void trimFields(DocumentsWriter.FlushState state) {
+  void trimFields(SegmentWriteState state) {
 
     for(int i=0;i<fieldHash.length;i++) {
       DocFieldProcessorPerField perField = fieldHash[i];

DocInverter.java

@@ -44,7 +44,7 @@ final class DocInverter extends DocFieldConsumer {
     endConsumer.setFieldInfos(fieldInfos);
   }
 
-  void flush(Map threadsAndFields, DocumentsWriter.FlushState state) throws IOException {
+  void flush(Map threadsAndFields, SegmentWriteState state) throws IOException {
 
     Map childThreadsAndFields = new HashMap();
     Map endChildThreadsAndFields = new HashMap();
@@ -75,7 +75,7 @@ final class DocInverter extends DocFieldConsumer {
     endConsumer.flush(endChildThreadsAndFields, state);
   }
 
-  public void closeDocStore(DocumentsWriter.FlushState state) throws IOException {
+  public void closeDocStore(SegmentWriteState state) throws IOException {
     consumer.closeDocStore(state);
     endConsumer.closeDocStore(state);
   }

DocumentsWriter.java

@@ -156,20 +156,6 @@ final class DocumentsWriter {
     }
   }
 
-  static class FlushState {
-    DocumentsWriter docWriter;
-    Directory directory;
-    String segmentName;
-    String docStoreSegmentName;
-    int numDocsInRAM;
-    int numDocsInStore;
-    Collection flushedFiles;
-
-    public String segmentFileName(String ext) {
-      return segmentName + "." + ext;
-    }
-  }
-
   /** Consumer returns this on each doc.  This holds any
    *  state that must be flushed synchronized "in docID
    *  order".  We gather these and flush them in order. */
@@ -402,7 +388,7 @@ final class DocumentsWriter {
   private Collection abortedFiles;               // List of files that were written before last abort()
-  private FlushState flushState;
+  private SegmentWriteState flushState;
 
   Collection abortedFiles() {
     return abortedFiles;
@@ -545,18 +531,7 @@ final class DocumentsWriter {
 
   synchronized private void initFlushState(boolean onlyDocStore) {
     initSegmentName(onlyDocStore);
-
-    if (flushState == null) {
-      flushState = new FlushState();
-      flushState.directory = directory;
-      flushState.docWriter = this;
-    }
-
-    flushState.docStoreSegmentName = docStoreSegment;
-    flushState.segmentName = segment;
-    flushState.numDocsInRAM = numDocsInRAM;
-    flushState.numDocsInStore = numDocsInStore;
-    flushState.flushedFiles = new HashSet();
+    flushState = new SegmentWriteState(this, directory, segment, docStoreSegment, numDocsInRAM, numDocsInStore, writer.getTermIndexInterval());
   }
 
   /** Flush all pending docs to a new segment */
@@ -602,7 +577,7 @@ final class DocumentsWriter {
       message(message);
     }
 
-    flushedDocCount += flushState.numDocsInRAM;
+    flushedDocCount += flushState.numDocs;
 
     doAfterFlush();
@@ -616,7 +591,7 @@ final class DocumentsWriter {
 
     assert waitQueue.waitingBytes == 0;
 
-    return flushState.numDocsInRAM;
+    return flushState.numDocs;
   }
 
   /** Build compound file for the segment we just flushed */

FormatPostingsDocsConsumer.java

@@ -0,0 +1,34 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
/**
* NOTE: this API is experimental and will likely change
*/
abstract class FormatPostingsDocsConsumer {
/** Adds a new doc in this term. If this returns null
* then we just skip consuming positions/payloads. */
abstract FormatPostingsPositionsConsumer addDoc(int docID, int termDocFreq) throws IOException;
/** Called when we are done adding docs to this term */
abstract void finish() throws IOException;
}

FormatPostingsDocsWriter.java

@@ -0,0 +1,127 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** Consumes doc & freq, writing them using the current
* index file format */
import java.io.IOException;
import org.apache.lucene.util.UnicodeUtil;
import org.apache.lucene.store.IndexOutput;
final class FormatPostingsDocsWriter extends FormatPostingsDocsConsumer {
final IndexOutput out;
final FormatPostingsTermsWriter parent;
final FormatPostingsPositionsWriter posWriter;
final DefaultSkipListWriter skipListWriter;
final int skipInterval;
final int totalNumDocs;
boolean omitTF;
boolean storePayloads;
long freqStart;
FieldInfo fieldInfo;
FormatPostingsDocsWriter(SegmentWriteState state, FormatPostingsTermsWriter parent) throws IOException {
super();
this.parent = parent;
final String fileName = IndexFileNames.segmentFileName(parent.parent.segment, IndexFileNames.FREQ_EXTENSION);
state.flushedFiles.add(fileName);
out = parent.parent.dir.createOutput(fileName);
totalNumDocs = parent.parent.totalNumDocs;
// TODO: abstraction violation
skipInterval = parent.parent.termsOut.skipInterval;
skipListWriter = parent.parent.skipListWriter;
skipListWriter.setFreqOutput(out);
posWriter = new FormatPostingsPositionsWriter(state, this);
}
void setField(FieldInfo fieldInfo) {
this.fieldInfo = fieldInfo;
omitTF = fieldInfo.omitTf;
storePayloads = fieldInfo.storePayloads;
posWriter.setField(fieldInfo);
}
int lastDocID;
int df;
/** Adds a new doc in this term. If this returns null
* then we just skip consuming positions/payloads. */
FormatPostingsPositionsConsumer addDoc(int docID, int termDocFreq) throws IOException {
final int delta = docID - lastDocID;
if (docID < 0 || (df > 0 && delta <= 0))
throw new CorruptIndexException("docs out of order (" + docID + " <= " + lastDocID + " )");
if ((++df % skipInterval) == 0) {
// TODO: abstraction violation
skipListWriter.setSkipData(lastDocID, storePayloads, posWriter.lastPayloadLength);
skipListWriter.bufferSkip(df);
}
assert docID < totalNumDocs: "docID=" + docID + " totalNumDocs=" + totalNumDocs;
lastDocID = docID;
if (omitTF)
out.writeVInt(delta);
else if (1 == termDocFreq)
out.writeVInt((delta<<1) | 1);
else {
out.writeVInt(delta<<1);
out.writeVInt(termDocFreq);
}
return posWriter;
}
private final TermInfo termInfo = new TermInfo(); // minimize consing
final UnicodeUtil.UTF8Result utf8 = new UnicodeUtil.UTF8Result();
/** Called when we are done adding docs to this term */
void finish() throws IOException {
long skipPointer = skipListWriter.writeSkip(out);
// TODO: this is abstraction violation -- we should not
// peek up into parents terms encoding format
termInfo.set(df, parent.freqStart, parent.proxStart, (int) (skipPointer - parent.freqStart));
// TODO: we could do this incrementally
UnicodeUtil.UTF16toUTF8(parent.currentTerm, parent.currentTermStart, utf8);
if (df > 0) {
parent.termsOut.add(fieldInfo.number,
utf8.result,
utf8.length,
termInfo);
}
lastDocID = 0;
df = 0;
}
void close() throws IOException {
out.close();
posWriter.close();
}
}
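
The doc/freq encoding that addDoc implements can be seen in isolation below. This is a hedged sketch, not part of the patch (the demo class and its encode method are hypothetical): the doc-ID delta is shifted left one bit and the low bit flags the common termDocFreq == 1 case so the frequency vInt can be omitted; with omitTF only the raw delta is written.

class DocDeltaEncodingDemo {
  /** Returns the vInts that addDoc above would emit for one posting. */
  static int[] encode(int lastDocID, int docID, int termDocFreq, boolean omitTF) {
    final int delta = docID - lastDocID;
    if (omitTF)
      return new int[] { delta };                   // only the raw doc delta is written
    else if (termDocFreq == 1)
      return new int[] { (delta << 1) | 1 };        // low bit set: freq == 1 is implied
    else
      return new int[] { delta << 1, termDocFreq }; // low bit clear: freq follows as its own vInt
  }

  public static void main(String[] args) {
    // doc 5 after doc 0 with freq 1, then doc 9 with freq 3:
    System.out.println(java.util.Arrays.toString(encode(0, 5, 1, false))); // [11]
    System.out.println(java.util.Arrays.toString(encode(5, 9, 3, false))); // [8, 3]
  }
}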

FormatPostingsFieldsConsumer.java

@@ -0,0 +1,36 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
/** Abstract API that consumes terms, doc, freq, prox and
* payloads postings. Concrete implementations of this
* actually do "something" with the postings (write it into
* the index in a specific format).
*
* NOTE: this API is experimental and will likely change
*/
abstract class FormatPostingsFieldsConsumer {
/** Add a new field */
abstract FormatPostingsTermsConsumer addField(FieldInfo field) throws IOException;
/** Called when we are done adding everything. */
abstract void finish() throws IOException;
}
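
To make the shape of this API concrete, the following is a minimal sketch of the call sequence a segment writer follows; it is not part of the commit (the helper class and its hard-coded term, docID and position are illustrative only) and simply mirrors what FreqProxTermsWriter.appendPostings and SegmentMerger.mergeTermInfos do further down in this diff.

package org.apache.lucene.index;

import java.io.IOException;

// Hypothetical helper showing the consumer chain: fields -> terms -> docs -> positions.
class FormatPostingsChainSketch {
  static void writeOnePosting(SegmentWriteState state, FieldInfos fieldInfos, FieldInfo fieldInfo) throws IOException {
    FormatPostingsFieldsConsumer consumer = new FormatPostingsFieldsWriter(state, fieldInfos);

    FormatPostingsTermsConsumer termsConsumer = consumer.addField(fieldInfo);   // one call per field
    FormatPostingsDocsConsumer docsConsumer = termsConsumer.addTerm("lucene");  // one call per term
    FormatPostingsPositionsConsumer posConsumer = docsConsumer.addDoc(42, 1);   // docID 42 (< state.numDocs), freq 1
    if (posConsumer != null) {                 // null means: skip positions/payloads for this doc
      posConsumer.addPosition(7, null, 0, 0);  // position 7, no payload
      posConsumer.finish();                    // done with this doc's positions
    }
    docsConsumer.finish();                     // done with this term's docs
    termsConsumer.finish();                    // done with this field's terms
    consumer.finish();                         // done with the segment; closes the underlying outputs
  }
}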

FormatPostingsFieldsWriter.java

@@ -0,0 +1,73 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.store.Directory;
final class FormatPostingsFieldsWriter extends FormatPostingsFieldsConsumer {
final Directory dir;
final String segment;
final TermInfosWriter termsOut;
final FieldInfos fieldInfos;
final FormatPostingsTermsWriter termsWriter;
final DefaultSkipListWriter skipListWriter;
final int totalNumDocs;
public FormatPostingsFieldsWriter(SegmentWriteState state, FieldInfos fieldInfos) throws IOException {
super();
dir = state.directory;
segment = state.segmentName;
totalNumDocs = state.numDocs;
this.fieldInfos = fieldInfos;
termsOut = new TermInfosWriter(dir,
segment,
fieldInfos,
state.termIndexInterval);
// TODO: this is a nasty abstraction violation (that we
// peek down to find freqOut/proxOut) -- we need a
// better abstraction here whereby these child consumers
// can provide skip data or not
skipListWriter = new DefaultSkipListWriter(termsOut.skipInterval,
termsOut.maxSkipLevels,
totalNumDocs,
null,
null);
state.flushedFiles.add(state.segmentFileName(IndexFileNames.TERMS_EXTENSION));
state.flushedFiles.add(state.segmentFileName(IndexFileNames.TERMS_INDEX_EXTENSION));
termsWriter = new FormatPostingsTermsWriter(state, this);
}
/** Add a new field */
FormatPostingsTermsConsumer addField(FieldInfo field) {
termsWriter.setField(field);
return termsWriter;
}
/** Called when we are done adding everything. */
void finish() throws IOException {
termsOut.close();
termsWriter.close();
}
}

FormatPostingsPositionsConsumer.java

@@ -0,0 +1,32 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.store.IndexInput;
abstract class FormatPostingsPositionsConsumer {
/** Add a new position & payload. If payloadLength > 0
* you must read those bytes from the IndexInput. */
abstract void addPosition(int position, byte[] payload, int payloadOffset, int payloadLength) throws IOException;
/** Called when we are done adding positions & payloads */
abstract void finish() throws IOException;
}

FormatPostingsPositionsWriter.java

@@ -0,0 +1,87 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.IndexInput;
import java.io.IOException;
final class FormatPostingsPositionsWriter extends FormatPostingsPositionsConsumer {
final FormatPostingsDocsWriter parent;
final IndexOutput out;
boolean omitTF;
boolean storePayloads;
int lastPayloadLength = -1;
FormatPostingsPositionsWriter(SegmentWriteState state, FormatPostingsDocsWriter parent) throws IOException {
this.parent = parent;
omitTF = parent.omitTF;
if (parent.parent.parent.fieldInfos.hasProx()) {
// At least one field does not omit TF, so create the
// prox file
final String fileName = IndexFileNames.segmentFileName(parent.parent.parent.segment, IndexFileNames.PROX_EXTENSION);
state.flushedFiles.add(fileName);
out = parent.parent.parent.dir.createOutput(fileName);
parent.skipListWriter.setProxOutput(out);
} else
// Every field omits TF so we will write no prox file
out = null;
}
int lastPosition;
/** Add a new position & payload */
void addPosition(int position, byte[] payload, int payloadOffset, int payloadLength) throws IOException {
assert !omitTF: "omitTF is true";
assert out != null;
final int delta = position - lastPosition;
lastPosition = position;
if (storePayloads) {
if (payloadLength != lastPayloadLength) {
lastPayloadLength = payloadLength;
out.writeVInt((delta<<1)|1);
out.writeVInt(payloadLength);
} else
out.writeVInt(delta << 1);
if (payloadLength > 0)
out.writeBytes(payload, payloadLength);
} else
out.writeVInt(delta);
}
void setField(FieldInfo fieldInfo) {
omitTF = fieldInfo.omitTf;
storePayloads = omitTF ? false : fieldInfo.storePayloads;
}
/** Called when we are done adding positions & payloads */
void finish() {
lastPosition = 0;
lastPayloadLength = -1;
}
void close() throws IOException {
if (out != null)
out.close();
}
}
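
The position/payload encoding that addPosition implements can likewise be shown standalone. The sketch below is hypothetical, not from the patch: when payloads are stored, the position delta is shifted left and the low bit records whether the payload length changed (in which case the new length follows as a vInt, then the payload bytes); without payloads only the raw delta is written.

class PositionDeltaEncodingDemo {
  /** Returns the vInts addPosition above would emit for one position;
   *  the payload bytes, if any, follow these vInts in the prox file. */
  static int[] encode(int lastPosition, int position, boolean storePayloads,
                      int payloadLength, int lastPayloadLength) {
    final int delta = position - lastPosition;
    if (!storePayloads)
      return new int[] { delta };                             // plain position delta
    if (payloadLength != lastPayloadLength)
      return new int[] { (delta << 1) | 1, payloadLength };   // low bit set: new length follows
    return new int[] { delta << 1 };                          // low bit clear: reuse last length
  }

  public static void main(String[] args) {
    System.out.println(java.util.Arrays.toString(encode(0, 3, true, 4, -1)));  // [7, 4]
    System.out.println(java.util.Arrays.toString(encode(3, 10, true, 4, 4)));  // [14]
    System.out.println(java.util.Arrays.toString(encode(0, 5, false, 0, -1))); // [5]
  }
}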

FormatPostingsTermsConsumer.java

@@ -0,0 +1,46 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.util.ArrayUtil;
/**
* NOTE: this API is experimental and will likely change
*/
abstract class FormatPostingsTermsConsumer {
/** Adds a new term in this field; term ends with U+FFFF
* char */
abstract FormatPostingsDocsConsumer addTerm(char[] text, int start) throws IOException;
char[] termBuffer;
FormatPostingsDocsConsumer addTerm(String text) throws IOException {
final int len = text.length();
if (termBuffer == null || termBuffer.length < 1+len)
termBuffer = new char[ArrayUtil.getNextSize(1+len)];
text.getChars(0, len, termBuffer, 0);
termBuffer[len] = 0xffff;
return addTerm(termBuffer, 0);
}
/** Called when we are done adding terms to this field */
abstract void finish() throws IOException;
}

FormatPostingsTermsWriter.java

@@ -0,0 +1,71 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
final class FormatPostingsTermsWriter extends FormatPostingsTermsConsumer {
final FormatPostingsFieldsWriter parent;
final FormatPostingsDocsWriter docsWriter;
final TermInfosWriter termsOut;
FieldInfo fieldInfo;
FormatPostingsTermsWriter(SegmentWriteState state, FormatPostingsFieldsWriter parent) throws IOException {
super();
this.parent = parent;
termsOut = parent.termsOut;
docsWriter = new FormatPostingsDocsWriter(state, this);
}
void setField(FieldInfo fieldInfo) {
this.fieldInfo = fieldInfo;
docsWriter.setField(fieldInfo);
}
char[] currentTerm;
int currentTermStart;
long freqStart;
long proxStart;
/** Adds a new term in this field */
FormatPostingsDocsConsumer addTerm(char[] text, int start) {
currentTerm = text;
currentTermStart = start;
// TODO: this is abstraction violation -- ideally this
// terms writer is not so "invasive", looking for file
// pointers in its child consumers.
freqStart = docsWriter.out.getFilePointer();
if (docsWriter.posWriter.out != null)
proxStart = docsWriter.posWriter.out.getFilePointer();
parent.skipListWriter.resetSkip();
return docsWriter;
}
/** Called when we are done adding terms to this field */
void finish() {
}
void close() throws IOException {
docsWriter.close();
}
}

FreqProxTermsWriter.java

@@ -57,7 +57,7 @@ final class FreqProxTermsWriter extends TermsHashConsumer {
     }
   }
 
-  void closeDocStore(DocumentsWriter.FlushState state) {}
+  void closeDocStore(SegmentWriteState state) {}
   void abort() {}
 
@@ -66,7 +66,7 @@ final class FreqProxTermsWriter extends TermsHashConsumer {
   // under the same FieldInfo together, up into TermsHash*.
   // Other writers would presumably share alot of this...
 
-  public void flush(Map threadsAndFields, final DocumentsWriter.FlushState state) throws IOException {
+  public void flush(Map threadsAndFields, final SegmentWriteState state) throws IOException {
 
     // Gather all FieldData's that have postings, across all
     // ThreadStates
@@ -92,22 +92,19 @@ final class FreqProxTermsWriter extends TermsHashConsumer {
     Collections.sort(allFields);
     final int numAllFields = allFields.size();
 
-    final TermInfosWriter termsOut = new TermInfosWriter(state.directory,
-                                                         state.segmentName,
-                                                         fieldInfos,
-                                                         state.docWriter.writer.getTermIndexInterval());
-
-    final IndexOutput freqOut = state.directory.createOutput(state.segmentFileName(IndexFileNames.FREQ_EXTENSION));
-    final IndexOutput proxOut;
-
-    if (fieldInfos.hasProx())
-      proxOut = state.directory.createOutput(state.segmentFileName(IndexFileNames.PROX_EXTENSION));
-    else
-      proxOut = null;
-
-    final DefaultSkipListWriter skipListWriter = new DefaultSkipListWriter(termsOut.skipInterval,
-                                                                           termsOut.maxSkipLevels,
-                                                                           state.numDocsInRAM, freqOut, proxOut);
+    // TODO: allow Lucene user to customize this consumer:
+    final FormatPostingsFieldsConsumer consumer = new FormatPostingsFieldsWriter(state, fieldInfos);
+    /*
+    Current writer chain:
+      FormatPostingsFieldsConsumer
+        -> IMPL: FormatPostingsFieldsWriter
+          -> FormatPostingsTermsConsumer
+            -> IMPL: FormatPostingsTermsWriter
+              -> FormatPostingsDocConsumer
+                -> IMPL: FormatPostingsDocWriter
+                  -> FormatPostingsPositionsConsumer
+                    -> IMPL: FormatPostingsPositionsWriter
+    */
 
     int start = 0;
     while(start < numAllFields) {
@@ -129,7 +126,7 @@ final class FreqProxTermsWriter extends TermsHashConsumer {
 
       // If this field has postings then add them to the
       // segment
-      appendPostings(state, fields, termsOut, freqOut, proxOut, skipListWriter);
+      appendPostings(fields, consumer);
 
       for(int i=0;i<fields.length;i++) {
         TermsHashPerField perField = fields[i].termsHashPerField;
@@ -149,51 +146,18 @@ final class FreqProxTermsWriter extends TermsHashConsumer {
       perThread.termsHashPerThread.reset(true);
     }
 
-    freqOut.close();
-    if (proxOut != null) {
-      state.flushedFiles.add(state.segmentFileName(IndexFileNames.PROX_EXTENSION));
-      proxOut.close();
-    }
-    termsOut.close();
-
-    // Record all files we have flushed
-    state.flushedFiles.add(state.segmentFileName(IndexFileNames.FIELD_INFOS_EXTENSION));
-    state.flushedFiles.add(state.segmentFileName(IndexFileNames.FREQ_EXTENSION));
-    state.flushedFiles.add(state.segmentFileName(IndexFileNames.TERMS_EXTENSION));
-    state.flushedFiles.add(state.segmentFileName(IndexFileNames.TERMS_INDEX_EXTENSION));
+    consumer.finish();
   }
 
-  final byte[] copyByteBuffer = new byte[4096];
-
-  /** Copy numBytes from srcIn to destIn */
-  void copyBytes(IndexInput srcIn, IndexOutput destIn, long numBytes) throws IOException {
-    // TODO: we could do this more efficiently (save a copy)
-    // because it's always from a ByteSliceReader ->
-    // IndexOutput
-    while(numBytes > 0) {
-      final int chunk;
-      if (numBytes > 4096)
-        chunk = 4096;
-      else
-        chunk = (int) numBytes;
-      srcIn.readBytes(copyByteBuffer, 0, chunk);
-      destIn.writeBytes(copyByteBuffer, 0, chunk);
-      numBytes -= chunk;
-    }
-  }
+  private byte[] payloadBuffer;
 
   /* Walk through all unique text tokens (Posting
    * instances) found in this field and serialize them
    * into a single RAM segment. */
-  void appendPostings(final DocumentsWriter.FlushState flushState,
-                      FreqProxTermsWriterPerField[] fields,
-                      TermInfosWriter termsOut,
-                      IndexOutput freqOut,
-                      IndexOutput proxOut,
-                      DefaultSkipListWriter skipListWriter)
+  void appendPostings(FreqProxTermsWriterPerField[] fields,
+                      FormatPostingsFieldsConsumer consumer)
    throws CorruptIndexException, IOException {
 
-    final int fieldNumber = fields[0].fieldInfo.number;
     int numFields = fields.length;
 
    final FreqProxFieldMergeState[] mergeStates = new FreqProxFieldMergeState[numFields];
@@ -208,15 +172,12 @@ final class FreqProxTermsWriter extends TermsHashConsumer {
      assert result;
    }
 
-    final int skipInterval = termsOut.skipInterval;
-    final boolean currentFieldOmitTf = fields[0].fieldInfo.omitTf;
-
-    // If current field omits tf then it cannot store
-    // payloads.  We silently drop the payloads in this case:
-    final boolean currentFieldStorePayloads = currentFieldOmitTf ? false : fields[0].fieldInfo.storePayloads;
+    final FormatPostingsTermsConsumer termsConsumer = consumer.addField(fields[0].fieldInfo);
 
    FreqProxFieldMergeState[] termStates = new FreqProxFieldMergeState[numFields];
 
+    final boolean currentFieldOmitTf = fields[0].fieldInfo.omitTf;
+
    while(numFields > 0) {
 
      // Get the next term to merge
@@ -235,43 +196,21 @@ final class FreqProxTermsWriter extends TermsHashConsumer {
          termStates[numToMerge++] = mergeStates[i];
      }
 
-      int df = 0;
-      int lastPayloadLength = -1;
-
-      int lastDoc = 0;
-
-      final char[] text = termStates[0].text;
-      final int start = termStates[0].textOffset;
-
-      final long freqPointer = freqOut.getFilePointer();
-      final long proxPointer;
-      if (proxOut != null)
-        proxPointer = proxOut.getFilePointer();
-      else
-        proxPointer = 0;
-
-      skipListWriter.resetSkip();
+      final FormatPostingsDocsConsumer docConsumer = termsConsumer.addTerm(termStates[0].text, termStates[0].textOffset);
 
      // Now termStates has numToMerge FieldMergeStates
      // which all share the same term.  Now we must
      // interleave the docID streams.
      while(numToMerge > 0) {
 
-        if ((++df % skipInterval) == 0) {
-          skipListWriter.setSkipData(lastDoc, currentFieldStorePayloads, lastPayloadLength);
-          skipListWriter.bufferSkip(df);
-        }
-
        FreqProxFieldMergeState minState = termStates[0];
        for(int i=1;i<numToMerge;i++)
          if (termStates[i].docID < minState.docID)
            minState = termStates[i];
 
-        final int doc = minState.docID;
        final int termDocFreq = minState.termFreq;
 
-        assert doc < flushState.numDocsInRAM;
-        assert doc > lastDoc || df == 1;
+        final FormatPostingsPositionsConsumer posConsumer = docConsumer.addDoc(minState.docID, termDocFreq);
 
        final ByteSliceReader prox = minState.prox;
@@ -279,46 +218,31 @@ final class FreqProxTermsWriter extends TermsHashConsumer {
        // changing the format to match Lucene's segment
        // format.
 
        if (!currentFieldOmitTf) {
-          // omitTf == false so we do write positions & payload
-          assert proxOut != null;
+          // omitTf == false so we do write positions &
+          // payload
+          int position = 0;
          for(int j=0;j<termDocFreq;j++) {
            final int code = prox.readVInt();
-            if (currentFieldStorePayloads) {
-              final int payloadLength;
-              if ((code & 1) != 0) {
-                // This position has a payload
-                payloadLength = prox.readVInt();
-              } else
-                payloadLength = 0;
-              if (payloadLength != lastPayloadLength) {
-                proxOut.writeVInt(code|1);
-                proxOut.writeVInt(payloadLength);
-                lastPayloadLength = payloadLength;
-              } else
-                proxOut.writeVInt(code & (~1));
-              if (payloadLength > 0)
-                copyBytes(prox, proxOut, payloadLength);
-            } else {
-              assert 0 == (code & 1);
-              proxOut.writeVInt(code>>1);
-            }
+            position += code >> 1;
+
+            final int payloadLength;
+            if ((code & 1) != 0) {
+              // This position has a payload
+              payloadLength = prox.readVInt();
+
+              if (payloadBuffer == null || payloadBuffer.length < payloadLength)
+                payloadBuffer = new byte[payloadLength];
+
+              prox.readBytes(payloadBuffer, 0, payloadLength);
+
+            } else
+              payloadLength = 0;
+
+            posConsumer.addPosition(position, payloadBuffer, 0, payloadLength);
          } //End for
 
-          final int newDocCode = (doc-lastDoc)<<1;
-
-          if (1 == termDocFreq) {
-            freqOut.writeVInt(newDocCode|1);
-          } else {
-            freqOut.writeVInt(newDocCode);
-            freqOut.writeVInt(termDocFreq);
-          }
-        } else {
-          // omitTf==true: we store only the docs, without
-          // term freq, positions, payloads
-          freqOut.writeVInt(doc-lastDoc);
+          posConsumer.finish();
        }
 
-        lastDoc = doc;
-
        if (!minState.nextDoc()) {
@@ -345,26 +269,10 @@ final class FreqProxTermsWriter extends TermsHashConsumer {
        }
      }
 
-      assert df > 0;
-
-      // Done merging this term
-
-      long skipPointer = skipListWriter.writeSkip(freqOut);
-
-      // Write term
-      termInfo.set(df, freqPointer, proxPointer, (int) (skipPointer - freqPointer));
-
-      // TODO: we could do this incrementally
-      UnicodeUtil.UTF16toUTF8(text, start, termsUTF8);
-
-      // TODO: we could save O(n) re-scan of the term by
-      // computing the shared prefix with the last term
-      // while during the UTF8 encoding
-      termsOut.add(fieldNumber,
-                   termsUTF8.result,
-                   termsUTF8.length,
-                   termInfo);
+      docConsumer.finish();
    }
+
+    termsConsumer.finish();
  }
 
  private final TermInfo termInfo = new TermInfo(); // minimize consing

IndexFileNames.java

@@ -195,4 +195,8 @@ final class IndexFileNames {
       return true;
     return false;
   }
+
+  static String segmentFileName(String segmentName, String ext) {
+    return segmentName + "." + ext;
+  }
 }

InvertedDocConsumer.java

@@ -29,10 +29,10 @@ abstract class InvertedDocConsumer {
   abstract void abort();
 
   /** Flush a new segment */
-  abstract void flush(Map threadsAndFields, DocumentsWriter.FlushState state) throws IOException;
+  abstract void flush(Map threadsAndFields, SegmentWriteState state) throws IOException;
 
   /** Close doc stores */
-  abstract void closeDocStore(DocumentsWriter.FlushState state) throws IOException;
+  abstract void closeDocStore(SegmentWriteState state) throws IOException;
 
   /** Attempt to free RAM, returning true if any RAM was
    *  freed */

InvertedDocEndConsumer.java

@@ -22,8 +22,8 @@ import java.io.IOException;
 abstract class InvertedDocEndConsumer {
   abstract InvertedDocEndConsumerPerThread addThread(DocInverterPerThread docInverterPerThread);
-  abstract void flush(Map threadsAndFields, DocumentsWriter.FlushState state) throws IOException;
-  abstract void closeDocStore(DocumentsWriter.FlushState state) throws IOException;
+  abstract void flush(Map threadsAndFields, SegmentWriteState state) throws IOException;
+  abstract void closeDocStore(SegmentWriteState state) throws IOException;
   abstract void abort();
   abstract void setFieldInfos(FieldInfos fieldInfos);
 }

NormsWriter.java

@@ -54,7 +54,7 @@ final class NormsWriter extends InvertedDocEndConsumer {
 
   /** Produce _X.nrm if any document had a field with norms
    *  not disabled */
-  public void flush(Map threadsAndFields, DocumentsWriter.FlushState state) throws IOException {
+  public void flush(Map threadsAndFields, SegmentWriteState state) throws IOException {
 
     final Map byField = new HashMap();
@@ -133,7 +133,7 @@ final class NormsWriter extends InvertedDocEndConsumer {
             }
           }
 
-          assert minDocID < state.numDocsInRAM;
+          assert minDocID < state.numDocs;
 
           // Fill hole
           for(;upto<minDocID;upto++)
@@ -154,16 +154,16 @@ final class NormsWriter extends InvertedDocEndConsumer {
           }
 
           // Fill final hole with defaultNorm
-          for(;upto<state.numDocsInRAM;upto++)
+          for(;upto<state.numDocs;upto++)
             normsOut.writeByte(defaultNorm);
         } else if (fieldInfo.isIndexed && !fieldInfo.omitNorms) {
           normCount++;
           // Fill entire field with default norm:
-          for(;upto<state.numDocsInRAM;upto++)
+          for(;upto<state.numDocs;upto++)
             normsOut.writeByte(defaultNorm);
         }
 
-        assert 4+normCount*state.numDocsInRAM == normsOut.getFilePointer() : ".nrm file size mismatch: expected=" + (4+normCount*state.numDocsInRAM) + " actual=" + normsOut.getFilePointer();
+        assert 4+normCount*state.numDocs == normsOut.getFilePointer() : ".nrm file size mismatch: expected=" + (4+normCount*state.numDocs) + " actual=" + normsOut.getFilePointer();
       }
 
     } finally {
@@ -171,5 +171,5 @@ final class NormsWriter extends InvertedDocEndConsumer {
     }
   }
 
-  void closeDocStore(DocumentsWriter.FlushState state) {}
+  void closeDocStore(SegmentWriteState state) {}
 }

SegmentMerger.java

@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.HashSet;
 import java.util.List;
 
 import org.apache.lucene.document.Document;
@@ -476,38 +477,28 @@ final class SegmentMerger {
       throw new RuntimeException("mergeVectors produced an invalid result: mergedDocs is " + mergedDocs + " but tvx size is " + tvxSize + "; now aborting this merge to prevent index corruption");
   }
 
-  private IndexOutput freqOutput = null;
-  private IndexOutput proxOutput = null;
-  private TermInfosWriter termInfosWriter = null;
-  private int skipInterval;
-  private int maxSkipLevels;
   private SegmentMergeQueue queue = null;
-  private DefaultSkipListWriter skipListWriter = null;
 
   private final void mergeTerms() throws CorruptIndexException, IOException {
+
+    SegmentWriteState state = new SegmentWriteState(null, directory, segment, null, mergedDocs, 0, termIndexInterval);
+
+    final FormatPostingsFieldsConsumer consumer = new FormatPostingsFieldsWriter(state, fieldInfos);
+
     try {
-      freqOutput = directory.createOutput(segment + ".frq");
-      if (hasProx())
-        proxOutput = directory.createOutput(segment + ".prx");
-      termInfosWriter =
-              new TermInfosWriter(directory, segment, fieldInfos,
-                                  termIndexInterval);
-      skipInterval = termInfosWriter.skipInterval;
-      maxSkipLevels = termInfosWriter.maxSkipLevels;
-      skipListWriter = new DefaultSkipListWriter(skipInterval, maxSkipLevels, mergedDocs, freqOutput, proxOutput);
       queue = new SegmentMergeQueue(readers.size());
 
-      mergeTermInfos();
+      mergeTermInfos(consumer);
 
     } finally {
-      if (freqOutput != null) freqOutput.close();
-      if (proxOutput != null) proxOutput.close();
-      if (termInfosWriter != null) termInfosWriter.close();
+      consumer.finish();
       if (queue != null) queue.close();
     }
   }
 
-  private final void mergeTermInfos() throws CorruptIndexException, IOException {
+  boolean omitTF;
+
+  private final void mergeTermInfos(final FormatPostingsFieldsConsumer consumer) throws CorruptIndexException, IOException {
     int base = 0;
     final int readerCount = readers.size();
     for (int i = 0; i < readerCount; i++) {
@@ -533,6 +524,9 @@ final class SegmentMerger {
 
     SegmentMergeInfo[] match = new SegmentMergeInfo[readers.size()];
 
+    String currentField = null;
+    FormatPostingsTermsConsumer termsConsumer = null;
+
     while (queue.size() > 0) {
       int matchSize = 0;   // pop matching terms
       match[matchSize++] = (SegmentMergeInfo) queue.pop();
@@ -544,7 +538,16 @@ final class SegmentMerger {
         top = (SegmentMergeInfo) queue.top();
       }
 
-      final int df = mergeTermInfo(match, matchSize);       // add new TermInfo
+      if (currentField != term.field) {
+        currentField = term.field;
+        if (termsConsumer != null)
+          termsConsumer.finish();
+        final FieldInfo fieldInfo = fieldInfos.fieldInfo(currentField);
+        termsConsumer = consumer.addField(fieldInfo);
+        omitTF = fieldInfo.omitTf;
+      }
+
+      int df = appendPostings(termsConsumer, match, matchSize);       // add new TermInfo
 
       if (checkAbort != null)
         checkAbort.work(df/3.0);
@@ -559,44 +562,6 @@ final class SegmentMerger {
     }
   }
 
-  private final TermInfo termInfo = new TermInfo(); // minimize consing
-
-  /** Merge one term found in one or more segments. The array <code>smis</code>
-   *  contains segments that are positioned at the same term. <code>N</code>
-   *  is the number of cells in the array actually occupied.
-   *
-   * @param smis array of segments
-   * @param n number of cells in the array actually occupied
-   * @throws CorruptIndexException if the index is corrupt
-   * @throws IOException if there is a low-level IO error
-   */
-  private final int mergeTermInfo(SegmentMergeInfo[] smis, int n)
-        throws CorruptIndexException, IOException {
-    final long freqPointer = freqOutput.getFilePointer();
-    final long proxPointer;
-    if (proxOutput != null)
-      proxPointer = proxOutput.getFilePointer();
-    else
-      proxPointer = 0;
-
-    int df;
-    if (fieldInfos.fieldInfo(smis[0].term.field).omitTf) { // append posting data
-      df = appendPostingsNoTf(smis, n);
-    } else{
-      df = appendPostings(smis, n);
-    }
-
-    long skipPointer = skipListWriter.writeSkip(freqOutput);
-
-    if (df > 0) {
-      // add an entry to the dictionary with pointers to prox and freq files
-      termInfo.set(df, freqPointer, proxPointer, (int) (skipPointer - freqPointer));
-      termInfosWriter.add(smis[0].term, termInfo);
-    }
-
-    return df;
-  }
-
   private byte[] payloadBuffer;
   private int[][] docMaps;
   int[][] getDocMaps() {
@@ -617,13 +582,11 @@ final class SegmentMerger {
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
    */
-  private final int appendPostings(SegmentMergeInfo[] smis, int n)
+  private final int appendPostings(final FormatPostingsTermsConsumer termsConsumer, SegmentMergeInfo[] smis, int n)
        throws CorruptIndexException, IOException {
-    int lastDoc = 0;
-    int df = 0;           // number of docs w/ term
-    skipListWriter.resetSkip();
-    boolean storePayloads = fieldInfos.fieldInfo(smis[0].term.field).storePayloads;
-    int lastPayloadLength = -1;   // ensures that we write the first length
+
+    final FormatPostingsDocsConsumer docConsumer = termsConsumer.addTerm(smis[0].term.text);
+    int df = 0;
    for (int i = 0; i < n; i++) {
      SegmentMergeInfo smi = smis[i];
      TermPositions postings = smi.getPositions();
@@ -631,111 +594,34 @@ final class SegmentMerger {
      int base = smi.base;
      int[] docMap = smi.getDocMap();
      postings.seek(smi.termEnum);
+
      while (postings.next()) {
+        df++;
        int doc = postings.doc();
        if (docMap != null)
          doc = docMap[doc];                      // map around deletions
        doc += base;                              // convert to merged space
 
-        if (doc < 0 || (df > 0 && doc <= lastDoc))
-          throw new CorruptIndexException("docs out of order (" + doc +
-              " <= " + lastDoc + " )");
-
-        df++;
-
-        if ((df % skipInterval) == 0) {
-          skipListWriter.setSkipData(lastDoc, storePayloads, lastPayloadLength);
-          skipListWriter.bufferSkip(df);
-        }
-
-        int docCode = (doc - lastDoc) << 1;       // use low bit to flag freq=1
-        lastDoc = doc;
-
-        int freq = postings.freq();
-        if (freq == 1) {
-          freqOutput.writeVInt(docCode | 1);      // write doc & freq=1
-        } else {
-          freqOutput.writeVInt(docCode);          // write doc
-          freqOutput.writeVInt(freq);             // write frequency in doc
-        }
-
-        /** See {@link DocumentWriter#writePostings(Posting[], String)} for
-         *  documentation about the encoding of positions and payloads
-         */
-        int lastPosition = 0;                     // write position deltas
-        for (int j = 0; j < freq; j++) {
-          int position = postings.nextPosition();
-          int delta = position - lastPosition;
-          if (storePayloads) {
-            int payloadLength = postings.getPayloadLength();
-            if (payloadLength == lastPayloadLength) {
-              proxOutput.writeVInt(delta * 2);
-            } else {
-              proxOutput.writeVInt(delta * 2 + 1);
-              proxOutput.writeVInt(payloadLength);
-              lastPayloadLength = payloadLength;
-            }
-            if (payloadLength > 0) {
-              if (payloadBuffer == null || payloadBuffer.length < payloadLength) {
-                payloadBuffer = new byte[payloadLength];
-              }
-              postings.getPayload(payloadBuffer, 0);
-              proxOutput.writeBytes(payloadBuffer, 0, payloadLength);
-            }
-          } else {
-            proxOutput.writeVInt(delta);
-          }
-          lastPosition = position;
-        }
-      }
-    }
-    return df;
-  }
-
-  /** Process postings from multiple segments without tf, all positioned on the
-   *  same term. Writes out merged entries only into freqOutput, proxOut is not written.
-   *
-   * @param smis array of segments
-   * @param n number of cells in the array actually occupied
-   * @return number of documents across all segments where this term was found
-   * @throws CorruptIndexException if the index is corrupt
-   * @throws IOException if there is a low-level IO error
-   */
-  private final int appendPostingsNoTf(SegmentMergeInfo[] smis, int n)
-        throws CorruptIndexException, IOException {
-    int lastDoc = 0;
-    int df = 0;           // number of docs w/ term
-    skipListWriter.resetSkip();
-    int lastPayloadLength = -1;   // ensures that we write the first length
-    for (int i = 0; i < n; i++) {
-      SegmentMergeInfo smi = smis[i];
-      TermPositions postings = smi.getPositions();
-      assert postings != null;
-      int base = smi.base;
-      int[] docMap = smi.getDocMap();
-      postings.seek(smi.termEnum);
-      while (postings.next()) {
-        int doc = postings.doc();
-        if (docMap != null)
-          doc = docMap[doc];                      // map around deletions
-        doc += base;                              // convert to merged space
-
-        if (doc < 0 || (df > 0 && doc <= lastDoc))
-          throw new CorruptIndexException("docs out of order (" + doc +
-              " <= " + lastDoc + " )");
-
-        df++;
-
-        if ((df % skipInterval) == 0) {
-          skipListWriter.setSkipData(lastDoc, false, lastPayloadLength);
-          skipListWriter.bufferSkip(df);
-        }
+        final int freq = postings.freq();
+        final FormatPostingsPositionsConsumer posConsumer = docConsumer.addDoc(doc, freq);
 
-        int docCode = (doc - lastDoc);
-        lastDoc = doc;
-        freqOutput.writeVInt(docCode);            // write doc & freq=1
+        if (!omitTF) {
+          for (int j = 0; j < freq; j++) {
+            final int position = postings.nextPosition();
+            final int payloadLength = postings.getPayloadLength();
+            if (payloadLength > 0) {
+              if (payloadBuffer == null || payloadBuffer.length < payloadLength)
+                payloadBuffer = new byte[payloadLength];
+              postings.getPayload(payloadBuffer, 0);
+            }
+            posConsumer.addPosition(position, payloadBuffer, 0, payloadLength);
+          }
+          posConsumer.finish();
+        }
      }
    }
+
+    docConsumer.finish();
+
    return df;
  }

SegmentWriteState.java

@@ -0,0 +1,50 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.HashSet;
import java.util.Collection;
import org.apache.lucene.store.Directory;
class SegmentWriteState {
DocumentsWriter docWriter;
Directory directory;
String segmentName;
String docStoreSegmentName;
int numDocs;
int termIndexInterval;
int numDocsInStore;
Collection flushedFiles;
public SegmentWriteState(DocumentsWriter docWriter, Directory directory, String segmentName, String docStoreSegmentName, int numDocs,
int numDocsInStore, int termIndexInterval) {
this.docWriter = docWriter;
this.directory = directory;
this.segmentName = segmentName;
this.docStoreSegmentName = docStoreSegmentName;
this.numDocs = numDocs;
this.numDocsInStore = numDocsInStore;
this.termIndexInterval = termIndexInterval;
flushedFiles = new HashSet();
}
public String segmentFileName(String ext) {
return segmentName + "." + ext;
}
}
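
For reference, a sketch of the two call sites in this commit that construct a SegmentWriteState, assembled from the DocumentsWriter and SegmentMerger hunks elsewhere in this diff (the surrounding fields are assumed to be in scope): DocumentsWriter passes itself plus its in-RAM doc counts, while SegmentMerger has no DocumentsWriter and no doc store, so it passes null and 0.

// Flush path (DocumentsWriter.initFlushState):
flushState = new SegmentWriteState(this, directory, segment, docStoreSegment,
                                   numDocsInRAM, numDocsInStore, writer.getTermIndexInterval());

// Merge path (SegmentMerger.mergeTerms): no DocumentsWriter and no doc store segment
SegmentWriteState state = new SegmentWriteState(null, directory, segment, null,
                                                mergedDocs, 0, termIndexInterval);

// Consumers record every file they create so the caller knows what was flushed:
final String fieldInfosFile = state.segmentFileName(IndexFileNames.FIELD_INFOS_EXTENSION);
state.flushedFiles.add(fieldInfosFile);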

StoredFieldsWriter.java

@@ -40,7 +40,7 @@ final class StoredFieldsWriter extends DocFieldConsumer {
     return new StoredFieldsWriterPerThread(docFieldProcessorPerThread, this);
   }
 
-  synchronized public void flush(Map threadsAndFields, DocumentsWriter.FlushState state) throws IOException {
+  synchronized public void flush(Map threadsAndFields, SegmentWriteState state) throws IOException {
 
     if (state.numDocsInStore > 0) {
       // It's possible that all documents seen in this segment
@@ -72,7 +72,7 @@ final class StoredFieldsWriter extends DocFieldConsumer {
     }
   }
 
-  synchronized public void closeDocStore(DocumentsWriter.FlushState state) throws IOException {
+  synchronized public void closeDocStore(SegmentWriteState state) throws IOException {
     final int inc = state.numDocsInStore - lastDocID;
     if (inc > 0) {
       initFieldsWriter();

TermVectorsTermsWriter.java

@@ -51,7 +51,7 @@ final class TermVectorsTermsWriter extends TermsHashConsumer {
       postings[i] = new PostingList();
   }
 
-  synchronized void flush(Map threadsAndFields, final DocumentsWriter.FlushState state) throws IOException {
+  synchronized void flush(Map threadsAndFields, final SegmentWriteState state) throws IOException {
 
     if (tvx != null) {
 
@@ -80,7 +80,7 @@ final class TermVectorsTermsWriter extends TermsHashConsumer {
     }
   }
 
-  synchronized void closeDocStore(final DocumentsWriter.FlushState state) throws IOException {
+  synchronized void closeDocStore(final SegmentWriteState state) throws IOException {
     if (tvx != null) {
       // At least one doc in this run had term vectors
       // enabled

TermsHash.java

@@ -85,7 +85,7 @@ final class TermsHash extends InvertedDocConsumer {
       nextTermsHash.abort();
   }
 
-  void shrinkFreePostings(Map threadsAndFields, DocumentsWriter.FlushState state) {
+  void shrinkFreePostings(Map threadsAndFields, SegmentWriteState state) {
 
     assert postingsFreeCount == postingsAllocCount: Thread.currentThread().getName() + ": postingsFreeCount=" + postingsFreeCount + " postingsAllocCount=" + postingsAllocCount + " consumer=" + consumer;
 
@@ -97,13 +97,13 @@ final class TermsHash extends InvertedDocConsumer {
     }
   }
 
-  synchronized void closeDocStore(DocumentsWriter.FlushState state) throws IOException {
+  synchronized void closeDocStore(SegmentWriteState state) throws IOException {
     consumer.closeDocStore(state);
     if (nextTermsHash != null)
       nextTermsHash.closeDocStore(state);
   }
 
-  synchronized void flush(Map threadsAndFields, final DocumentsWriter.FlushState state) throws IOException {
+  synchronized void flush(Map threadsAndFields, final SegmentWriteState state) throws IOException {
     Map childThreadsAndFields = new HashMap();
     Map nextThreadsAndFields;

TermsHashConsumer.java

@@ -24,9 +24,9 @@ abstract class TermsHashConsumer {
   abstract int bytesPerPosting();
   abstract void createPostings(RawPostingList[] postings, int start, int count);
   abstract TermsHashConsumerPerThread addThread(TermsHashPerThread perThread);
-  abstract void flush(Map threadsAndFields, final DocumentsWriter.FlushState state) throws IOException;
+  abstract void flush(Map threadsAndFields, final SegmentWriteState state) throws IOException;
   abstract void abort();
-  abstract void closeDocStore(DocumentsWriter.FlushState state) throws IOException;
+  abstract void closeDocStore(SegmentWriteState state) throws IOException;
 
   FieldInfos fieldInfos;