Replace DWPT.DocState with simple method parameters (#1594)

DWPT.DocState had some historical value, but with today's somewhat cleaner DWPT and
IndexingChain there is little to no value in keeping this class. It also required
explicit cleanup, which is no longer necessary.
Simon Willnauer 2020-06-18 20:02:10 +02:00 committed by GitHub
parent 4db1e3895f
commit 56febf05c3
6 changed files with 72 additions and 99 deletions
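
At a high level, the change replaces a shared, mutable per-document holder with plain method parameters threaded through the indexing chain, and sources Analyzer, Similarity, and InfoStream from the LiveIndexWriterConfig instead. Below is a minimal, self-contained sketch of that pattern; DocStateRefactorSketch, OldConsumer, NewConsumer, and the List<String> document are simplified stand-ins for illustration, not the actual Lucene types in the diff that follows.

import java.util.List;

/** Hypothetical, simplified sketch of the refactor in this commit: a mutable
 *  per-thread DocState holder vs. passing the document and its id as plain
 *  method parameters. Names are stand-ins, not the actual Lucene classes. */
public class DocStateRefactorSketch {

  // Before: shared mutable state, filled in for each document and cleared afterwards.
  static class DocState {
    int docID;
    List<String> doc;            // stands in for Iterable<? extends IndexableField>
    void clear() { doc = null; } // explicit cleanup so a large document can be GC'd
  }

  interface OldConsumer {
    void processDocument(DocState state); // reads state.docID / state.doc
  }

  // After: no shared holder; everything the consumer needs is an argument.
  interface NewConsumer {
    void processDocument(int docID, List<String> document);
  }

  public static void main(String[] args) {
    // Old style: populate the holder, process, and remember to clear it.
    OldConsumer oldConsumer = state ->
        System.out.println("doc " + state.docID + ": " + state.doc.size() + " fields");
    DocState docState = new DocState();
    docState.docID = 0;
    docState.doc = List.of("title", "body");
    try {
      oldConsumer.processDocument(docState);
    } finally {
      docState.clear(); // the cleanup step this commit makes unnecessary
    }

    // New style: mirrors consumer.processDocument(numDocsInRAM++, doc) in the diff below.
    NewConsumer newConsumer = (docID, document) ->
        System.out.println("doc " + docID + ": " + document.size() + " fields");
    int numDocsInRAM = 1;
    newConsumer.processDocument(numDocsInRAM++, List.of("title"));
  }
}

In the actual diff, DefaultIndexingChain.PerField now receives Similarity, InfoStream, and Analyzer from docWriter.getIndexWriterConfig(), DocConsumer.processDocument gains the (int docID, Iterable<? extends IndexableField> document) parameters, and DocumentsWriterPerThread simply calls consumer.processDocument(numDocsInRAM++, doc), so DocState and its clear() call go away.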

lucene/core/src/java/org/apache/lucene/index/DefaultIndexingChain.java

@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesFormat;
@ -45,13 +46,13 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefHash.MaxBytesLengthExceededException;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.RamUsageEstimator;
/** Default general purpose indexing chain, which handles
* indexing all types of fields. */
final class DefaultIndexingChain extends DocConsumer {
final Counter bytesUsed;
final DocumentsWriterPerThread.DocState docState;
final DocumentsWriterPerThread docWriter;
final FieldInfos.Builder fieldInfos;
@ -71,12 +72,13 @@ final class DefaultIndexingChain extends DocConsumer {
// Holds fields seen in each document
private PerField[] fields = new PerField[1];
private final InfoStream infoStream;
public DefaultIndexingChain(DocumentsWriterPerThread docWriter) {
this.docWriter = docWriter;
this.fieldInfos = docWriter.getFieldInfosBuilder();
this.docState = docWriter.docState;
this.bytesUsed = docWriter.bytesUsed;
this.infoStream = docWriter.getIndexWriterConfig().getInfoStream();
final TermsHash termVectorsWriter;
if (docWriter.getSegmentInfo().getIndexSort() == null) {
@ -92,7 +94,7 @@ final class DefaultIndexingChain extends DocConsumer {
private LeafReader getDocValuesLeafReader() {
return new DocValuesLeafReader() {
@Override
public NumericDocValues getNumericDocValues(String field) throws IOException {
public NumericDocValues getNumericDocValues(String field) {
PerField pf = getPerField(field);
if (pf == null) {
return null;
@ -104,7 +106,7 @@ final class DefaultIndexingChain extends DocConsumer {
}
@Override
public BinaryDocValues getBinaryDocValues(String field) throws IOException {
public BinaryDocValues getBinaryDocValues(String field) {
PerField pf = getPerField(field);
if (pf == null) {
return null;
@ -190,29 +192,29 @@ final class DefaultIndexingChain extends DocConsumer {
int maxDoc = state.segmentInfo.maxDoc();
long t0 = System.nanoTime();
writeNorms(state, sortMap);
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write norms");
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write norms");
}
SegmentReadState readState = new SegmentReadState(state.directory, state.segmentInfo, state.fieldInfos, IOContext.READ, state.segmentSuffix);
t0 = System.nanoTime();
writeDocValues(state, sortMap);
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write docValues");
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write docValues");
}
t0 = System.nanoTime();
writePoints(state, sortMap);
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write points");
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write points");
}
// it's possible all docs hit non-aborting exceptions...
t0 = System.nanoTime();
storedFieldsConsumer.finish(maxDoc);
storedFieldsConsumer.flush(state, sortMap);
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to finish stored fields");
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to finish stored fields");
}
t0 = System.nanoTime();
@ -237,8 +239,8 @@ final class DefaultIndexingChain extends DocConsumer {
}
termsHash.flush(fieldsToFlush, state, sortMap, normsMergeInstance);
}
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write postings and finish vectors");
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write postings and finish vectors");
}
// Important to save after asking consumer to flush so
@ -247,8 +249,8 @@ final class DefaultIndexingChain extends DocConsumer {
// FieldInfo.storePayload.
t0 = System.nanoTime();
docWriter.codec.fieldInfosFormat().write(state.directory, state.segmentInfo, "", state.fieldInfos, IOContext.DEFAULT);
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write fieldInfos");
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write fieldInfos");
}
return sortMap;
@ -440,7 +442,7 @@ final class DefaultIndexingChain extends DocConsumer {
}
@Override
public void processDocument() throws IOException {
public void processDocument(int docID, Iterable<? extends IndexableField> document) throws IOException {
// How many indexed field names we've seen (collapses
// multiple field instances by the same name):
@ -457,23 +459,23 @@ final class DefaultIndexingChain extends DocConsumer {
termsHash.startDocument();
startStoredFields(docState.docID);
startStoredFields(docID);
try {
for (IndexableField field : docState.doc) {
fieldCount = processField(field, fieldGen, fieldCount);
for (IndexableField field : document) {
fieldCount = processField(docID, field, fieldGen, fieldCount);
}
} finally {
if (docWriter.hasHitAbortingException() == false) {
// Finish each indexed field name seen in the document:
for (int i=0;i<fieldCount;i++) {
fields[i].finish();
fields[i].finish(docID);
}
finishStoredFields();
}
}
try {
termsHash.finishDocument();
termsHash.finishDocument(docID);
} catch (Throwable th) {
// Must abort, on the possibility that on-disk term
// vectors are now corrupt:
@ -482,7 +484,7 @@ final class DefaultIndexingChain extends DocConsumer {
}
}
private int processField(IndexableField field, long fieldGen, int fieldCount) throws IOException {
private int processField(int docID, IndexableField field, long fieldGen, int fieldCount) throws IOException {
String fieldName = field.name();
IndexableFieldType fieldType = field.fieldType();
@ -496,7 +498,7 @@ final class DefaultIndexingChain extends DocConsumer {
if (fieldType.indexOptions() != IndexOptions.NONE) {
fp = getOrAddField(fieldName, fieldType, true);
boolean first = fp.fieldGen != fieldGen;
fp.invert(field, first);
fp.invert(docID, field, first);
if (first) {
fields[fieldCount++] = fp;
@ -533,13 +535,13 @@ final class DefaultIndexingChain extends DocConsumer {
if (fp == null) {
fp = getOrAddField(fieldName, fieldType, false);
}
indexDocValue(fp, dvType, field);
indexDocValue(docID, fp, dvType, field);
}
if (fieldType.pointDimensionCount() != 0) {
if (fp == null) {
fp = getOrAddField(fieldName, fieldType, false);
}
indexPoint(fp, field);
indexPoint(docID, fp, field);
}
return fieldCount;
@ -565,7 +567,7 @@ final class DefaultIndexingChain extends DocConsumer {
}
/** Called from processDocument to index one field's point */
private void indexPoint(PerField fp, IndexableField field) throws IOException {
private void indexPoint(int docID, PerField fp, IndexableField field) {
int pointDimensionCount = field.fieldType().pointDimensionCount();
int pointIndexDimensionCount = field.fieldType().pointIndexDimensionCount();
@ -582,7 +584,7 @@ final class DefaultIndexingChain extends DocConsumer {
if (fp.pointValuesWriter == null) {
fp.pointValuesWriter = new PointValuesWriter(docWriter, fp.fieldInfo);
}
fp.pointValuesWriter.addPackedValue(docState.docID, field.binaryValue());
fp.pointValuesWriter.addPackedValue(docID, field.binaryValue());
}
private void validateIndexSortDVType(Sort indexSort, String fieldToValidate, DocValuesType dvType) throws IOException {
@ -641,7 +643,7 @@ final class DefaultIndexingChain extends DocConsumer {
}
/** Called from processDocument to index one field's doc value */
private void indexDocValue(PerField fp, DocValuesType dvType, IndexableField field) throws IOException {
private void indexDocValue(int docID, PerField fp, DocValuesType dvType, IndexableField field) throws IOException {
if (fp.fieldInfo.getDocValuesType() == DocValuesType.NONE) {
// This is the first time we are seeing this field indexed with doc values, so we
@ -656,8 +658,6 @@ final class DefaultIndexingChain extends DocConsumer {
fp.fieldInfo.setDocValuesType(dvType);
int docID = docState.docID;
switch(dvType) {
case NUMERIC:
@ -737,7 +737,9 @@ final class DefaultIndexingChain extends DocConsumer {
attributes.forEach((k, v) -> fi.putAttribute(k, v));
}
fp = new PerField(docWriter.getIndexCreatedVersionMajor(), fi, invert);
LiveIndexWriterConfig indexWriterConfig = docWriter.getIndexWriterConfig();
fp = new PerField(docWriter.getIndexCreatedVersionMajor(), fi, invert,
indexWriterConfig.getSimilarity(), indexWriterConfig.getInfoStream(), indexWriterConfig.getAnalyzer());
fp.next = fieldHash[hashPos];
fieldHash[hashPos] = fp;
totalFieldCount++;
@ -801,11 +803,15 @@ final class DefaultIndexingChain extends DocConsumer {
// reused
TokenStream tokenStream;
private final InfoStream infoStream;
private final Analyzer analyzer;
public PerField(int indexCreatedVersionMajor, FieldInfo fieldInfo, boolean invert) {
PerField(int indexCreatedVersionMajor, FieldInfo fieldInfo, boolean invert, Similarity similarity, InfoStream infoStream, Analyzer analyzer) {
this.indexCreatedVersionMajor = indexCreatedVersionMajor;
this.fieldInfo = fieldInfo;
similarity = docState.similarity;
this.similarity = similarity;
this.infoStream = infoStream;
this.analyzer = analyzer;
if (invert) {
setInvertState();
}
@ -817,7 +823,7 @@ final class DefaultIndexingChain extends DocConsumer {
if (fieldInfo.omitsNorms() == false) {
assert norms == null;
// Even if no documents actually succeed in setting a norm, we still write norms for this segment:
norms = new NormValuesWriter(fieldInfo, docState.docWriter.bytesUsed);
norms = new NormValuesWriter(fieldInfo, bytesUsed);
}
}
@ -826,7 +832,7 @@ final class DefaultIndexingChain extends DocConsumer {
return this.fieldInfo.name.compareTo(other.fieldInfo.name);
}
public void finish() throws IOException {
public void finish(int docID) throws IOException {
if (fieldInfo.omitsNorms() == false) {
long normValue;
if (invertState.length == 0) {
@ -840,7 +846,7 @@ final class DefaultIndexingChain extends DocConsumer {
throw new IllegalStateException("Similarity " + similarity + " return 0 for non-empty field");
}
}
norms.addValue(docState.docID, normValue);
norms.addValue(docID, normValue);
}
termsHashPerField.finish();
@ -849,7 +855,7 @@ final class DefaultIndexingChain extends DocConsumer {
/** Inverts one field for one document; first is true
* if this is the first time we are seeing this field
* name in this document. */
public void invert(IndexableField field, boolean first) throws IOException {
public void invert(int docID, IndexableField field, boolean first) throws IOException {
if (first) {
// First time we're seeing this field (indexed) in
// this document:
@ -865,7 +871,7 @@ final class DefaultIndexingChain extends DocConsumer {
fieldInfo.setOmitsNorms();
}
final boolean analyzed = fieldType.tokenized() && docState.analyzer != null;
final boolean analyzed = fieldType.tokenized() && analyzer != null;
/*
* To assist people in tracking down problems in analysis components, we wish to write the field name to the infostream
@ -873,7 +879,7 @@ final class DefaultIndexingChain extends DocConsumer {
* but rather a finally that takes note of the problem.
*/
boolean succeededInProcessingField = false;
try (TokenStream stream = tokenStream = field.tokenStream(docState.analyzer, tokenStream)) {
try (TokenStream stream = tokenStream = field.tokenStream(analyzer, tokenStream)) {
// reset the TokenStream to the first token
stream.reset();
invertState.setAttributeSource(stream);
@ -929,14 +935,14 @@ final class DefaultIndexingChain extends DocConsumer {
// corrupt and should not be flushed to a
// new segment:
try {
termsHashPerField.add(invertState.termAttribute.getBytesRef(), docState.docID);
termsHashPerField.add(invertState.termAttribute.getBytesRef(), docID);
} catch (MaxBytesLengthExceededException e) {
byte[] prefix = new byte[30];
BytesRef bigTerm = invertState.termAttribute.getBytesRef();
System.arraycopy(bigTerm.bytes, bigTerm.offset, prefix, 0, 30);
String msg = "Document contains at least one immense term in field=\"" + fieldInfo.name + "\" (whose UTF8 encoding is longer than the max length " + DocumentsWriterPerThread.MAX_TERM_LENGTH_UTF8 + "), all of which were skipped. Please correct the analyzer to not produce such terms. The prefix of the first immense term is: '" + Arrays.toString(prefix) + "...', original message: " + e.getMessage();
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", "ERROR: " + msg);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "ERROR: " + msg);
}
// Document will be deleted above:
throw new IllegalArgumentException(msg, e);
@ -957,14 +963,14 @@ final class DefaultIndexingChain extends DocConsumer {
/* if there is an exception coming through, we won't set this to true here:*/
succeededInProcessingField = true;
} finally {
if (!succeededInProcessingField && docState.infoStream.isEnabled("DW")) {
docState.infoStream.message("DW", "An exception was thrown while processing field " + fieldInfo.name);
if (!succeededInProcessingField && infoStream.isEnabled("DW")) {
infoStream.message("DW", "An exception was thrown while processing field " + fieldInfo.name);
}
}
if (analyzed) {
invertState.position += docState.analyzer.getPositionIncrementGap(fieldInfo.name);
invertState.offset += docState.analyzer.getOffsetGap(fieldInfo.name);
invertState.position += analyzer.getPositionIncrementGap(fieldInfo.name);
invertState.offset += analyzer.getOffsetGap(fieldInfo.name);
}
}
}

lucene/core/src/java/org/apache/lucene/index/DocConsumer.java

@ -22,7 +22,7 @@ import java.io.IOException;
import org.apache.lucene.search.DocIdSetIterator;
abstract class DocConsumer {
abstract void processDocument() throws IOException;
abstract void processDocument(int docId, Iterable<? extends IndexableField> document) throws IOException;
abstract Sorter.DocMap flush(final SegmentWriteState state) throws IOException;
abstract void abort() throws IOException;

lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java

@ -137,7 +137,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap);
return new DocumentsWriterPerThread(indexCreatedVersionMajor,
segmentNameSupplier.get(), directoryOrig,
directory, config, infoStream, deleteQueue, infos,
directory, config, deleteQueue, infos,
pendingNumDocs, enableTestPoints);
});
this.pendingNumDocs = pendingNumDocs;

lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java

@ -26,11 +26,9 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext;
@ -52,6 +50,10 @@ import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
final class DocumentsWriterPerThread {
LiveIndexWriterConfig getIndexWriterConfig() {
return indexWriterConfig;
}
/**
* The IndexingChain must define the {@link #getChain(DocumentsWriterPerThread)} method
* which returns the DocConsumer that the DocumentsWriter calls to process the
@ -85,27 +87,6 @@ final class DocumentsWriterPerThread {
}
};
static class DocState {
final DocumentsWriterPerThread docWriter;
final Analyzer analyzer;
InfoStream infoStream;
Similarity similarity;
int docID;
Iterable<? extends IndexableField> doc;
DocState(DocumentsWriterPerThread docWriter, Analyzer analyzer, InfoStream infoStream) {
this.docWriter = docWriter;
this.infoStream = infoStream;
this.analyzer = analyzer;
}
public void clear() {
// don't hold onto doc nor analyzer, in case it is
// largish:
doc = null;
}
}
static final class FlushedSegment {
final SegmentCommitInfo segmentInfo;
final FieldInfos fieldInfos;
@ -150,7 +131,6 @@ final class DocumentsWriterPerThread {
private final static boolean INFO_VERBOSE = false;
final Codec codec;
final TrackingDirectoryWrapper directory;
final DocState docState;
private final DocConsumer consumer;
final Counter bytesUsed;
@ -179,15 +159,13 @@ final class DocumentsWriterPerThread {
private int numDeletedDocIds = 0;
DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, DocumentsWriterDeleteQueue deleteQueue,
FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) throws IOException {
this.directory = new TrackingDirectoryWrapper(directory);
this.fieldInfos = fieldInfos;
this.indexWriterConfig = indexWriterConfig;
this.infoStream = infoStream;
this.infoStream = indexWriterConfig.getInfoStream();
this.codec = indexWriterConfig.getCodec();
this.docState = new DocState(this, indexWriterConfig.getAnalyzer(), infoStream);
this.docState.similarity = indexWriterConfig.getSimilarity();
this.pendingNumDocs = pendingNumDocs;
bytesUsed = Counter.newCounter();
byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
@ -239,7 +217,7 @@ final class DocumentsWriterPerThread {
testPoint("DocumentsWriterPerThread addDocuments start");
assert hasHitAbortingException() == false: "DWPT has hit aborting exception but is still indexing";
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + deleteNode + " docID=" + docState.docID + " seg=" + segmentInfo.name);
infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + deleteNode + " docID=" + numDocsInRAM + " seg=" + segmentInfo.name);
}
final int docsInRamBefore = numDocsInRAM;
boolean allDocsIndexed = false;
@ -252,13 +230,7 @@ final class DocumentsWriterPerThread {
// it's very hard to fix (we can't easily distinguish aborting
// vs non-aborting exceptions):
reserveOneDoc();
docState.doc = doc;
docState.docID = numDocsInRAM;
try {
consumer.processDocument();
} finally {
numDocsInRAM++; // we count the doc anyway even in the case of an exception
}
consumer.processDocument(numDocsInRAM++, doc);
}
allDocsIndexed = true;
return finishDocuments(deleteNode, docsInRamBefore);
@ -268,7 +240,6 @@ final class DocumentsWriterPerThread {
// go and mark all docs from this block as deleted
deleteLastDocs(numDocsInRAM - docsInRamBefore);
}
docState.clear();
}
} finally {
maybeAbort("updateDocuments", flushNotifications);
@ -400,8 +371,8 @@ final class DocumentsWriterPerThread {
final Sorter.DocMap sortMap;
try {
DocIdSetIterator softDeletedDocs;
if (indexWriterConfig.getSoftDeletesField() != null) {
softDeletedDocs = consumer.getHasDocValues(indexWriterConfig.getSoftDeletesField());
if (getIndexWriterConfig().getSoftDeletesField() != null) {
softDeletedDocs = consumer.getHasDocValues(getIndexWriterConfig().getSoftDeletesField());
} else {
softDeletedDocs = null;
}
@ -509,7 +480,7 @@ final class DocumentsWriterPerThread {
boolean success = false;
try {
if (indexWriterConfig.getUseCompoundFile()) {
if (getIndexWriterConfig().getUseCompoundFile()) {
Set<String> originalFiles = newSegment.info.files();
// TODO: like addIndexes, we are relying on createCompoundFile to successfully cleanup...
IndexWriter.createCompoundFile(infoStream, new TrackingDirectoryWrapper(directory), newSegment.info, context, flushNotifications::deleteUnusedFiles);

lucene/core/src/java/org/apache/lucene/index/TermVectorsConsumer.java

@ -91,7 +91,7 @@ class TermVectorsConsumer extends TermsHash {
}
@Override
void finishDocument() throws IOException {
void finishDocument(int docID) throws IOException {
if (!hasVectors) {
return;
@ -102,7 +102,7 @@ class TermVectorsConsumer extends TermsHash {
initTermVectorsWriter();
fill(docState.docID);
fill(docID);
// Append term vectors to the real outputs:
writer.startDocument(numVectorFields);
@ -111,7 +111,7 @@ class TermVectorsConsumer extends TermsHash {
}
writer.finishDocument();
assert lastDocID == docState.docID: "lastDocID=" + lastDocID + " docState.docID=" + docState.docID;
assert lastDocID == docID: "lastDocID=" + lastDocID + " docID=" + docID;
lastDocID++;

lucene/core/src/java/org/apache/lucene/index/TermsHash.java

@ -40,14 +40,10 @@ abstract class TermsHash {
final ByteBlockPool bytePool;
ByteBlockPool termBytePool;
final Counter bytesUsed;
final DocumentsWriterPerThread.DocState docState;
final boolean trackAllocations;
TermsHash(final DocumentsWriterPerThread docWriter, boolean trackAllocations, TermsHash nextTermsHash) {
this.docState = docWriter.docState;
this.trackAllocations = trackAllocations;
this.trackAllocations = trackAllocations;
this.nextTermsHash = nextTermsHash;
this.bytesUsed = trackAllocations ? docWriter.bytesUsed : Counter.newCounter();
intPool = new IntBlockPool(docWriter.intBlockAllocator);
@ -90,9 +86,9 @@ abstract class TermsHash {
abstract TermsHashPerField addField(FieldInvertState fieldInvertState, FieldInfo fieldInfo);
void finishDocument() throws IOException {
void finishDocument(int docID) throws IOException {
if (nextTermsHash != null) {
nextTermsHash.finishDocument();
nextTermsHash.finishDocument(docID);
}
}