Cleanup TermsHashPerField (#1573)

Several classes within the IndexWriter indexing chain haven't been touched for
several years. Most of these classes expose their internals through public
members and are difficult to construct in tests since they depend on many other
classes. This change tries to clean up TermsHashPerField and adds a dedicated
standalone test for it, making the class more accessible to other developers
because it becomes simpler to understand. It also attempts to improve the
documentation as part of this refactoring.
This commit is contained in:
Simon Willnauer 2020-06-16 14:45:45 +02:00 committed by GitHub
parent a7792b129b
commit c083e5414e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 421 additions and 200 deletions

View File

@ -26,7 +26,6 @@ import org.apache.lucene.util.ByteBlockPool;
* byte[]. This is used by DocumentsWriter to hold the
* posting list for many terms in RAM.
*/
final class ByteSliceWriter extends DataOutput {
private byte[] slice;

View File

@ -929,7 +929,7 @@ final class DefaultIndexingChain extends DocConsumer {
// corrupt and should not be flushed to a
// new segment:
try {
termsHashPerField.add();
termsHashPerField.add(invertState.termAttribute.getBytesRef(), docState.docID);
} catch (MaxBytesLengthExceededException e) {
byte[] prefix = new byte[30];
BytesRef bigTerm = invertState.termAttribute.getBytesRef();

View File

@ -39,7 +39,7 @@ class FreqProxFields extends Fields {
public FreqProxFields(List<FreqProxTermsWriterPerField> fieldList) {
// NOTE: fields are already sorted by field name
for(FreqProxTermsWriterPerField field : fieldList) {
fields.put(field.fieldInfo.name, field);
fields.put(field.getFieldName(), field);
}
}
@ -55,7 +55,6 @@ class FreqProxFields extends Fields {
@Override
public int size() {
//return fields.size();
throw new UnsupportedOperationException();
}
@ -75,31 +74,27 @@ class FreqProxFields extends Fields {
@Override
public long size() {
//return terms.termsHashPerField.bytesHash.size();
throw new UnsupportedOperationException();
}
@Override
public long getSumTotalTermFreq() {
//return terms.sumTotalTermFreq;
throw new UnsupportedOperationException();
}
@Override
public long getSumDocFreq() {
//return terms.sumDocFreq;
throw new UnsupportedOperationException();
}
@Override
public int getDocCount() {
//return terms.docCount;
throw new UnsupportedOperationException();
}
@Override
public boolean hasFreqs() {
return terms.fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS) >= 0;
return terms.indexOptions.compareTo(IndexOptions.DOCS_AND_FREQS) >= 0;
}
@Override
@ -107,7 +102,7 @@ class FreqProxFields extends Fields {
// NOTE: the in-memory buffer may have indexed offsets
// because that's what FieldInfo said when we started,
// but during indexing this may have been downgraded:
return terms.fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) >= 0;
return terms.indexOptions.compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) >= 0;
}
@Override
@ -115,7 +110,7 @@ class FreqProxFields extends Fields {
// NOTE: the in-memory buffer may have indexed positions
// because that's what FieldInfo said when we started,
// but during indexing this may have been downgraded:
return terms.fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0;
return terms.indexOptions.compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0;
}
@Override
@ -132,10 +127,10 @@ class FreqProxFields extends Fields {
final int numTerms;
int ord;
public FreqProxTermsEnum(FreqProxTermsWriterPerField terms) {
FreqProxTermsEnum(FreqProxTermsWriterPerField terms) {
this.terms = terms;
this.numTerms = terms.bytesHash.size();
sortedTermIDs = terms.sortedTermIDs;
this.numTerms = terms.getNumTerms();
sortedTermIDs = terms.getSortedTermIDs();
assert sortedTermIDs != null;
postingsArray = (FreqProxPostingsArray) terms.postingsArray;
}

View File

@ -75,9 +75,9 @@ final class FreqProxTermsWriter extends TermsHash {
for (TermsHashPerField f : fieldsToFlush.values()) {
final FreqProxTermsWriterPerField perField = (FreqProxTermsWriterPerField) f;
if (perField.bytesHash.size() > 0) {
perField.sortPostings();
assert perField.fieldInfo.getIndexOptions() != IndexOptions.NONE;
if (perField.getNumTerms() > 0) {
perField.sortTerms();
assert perField.indexOptions != IndexOptions.NONE;
allFields.add(perField);
}
}

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
import org.apache.lucene.analysis.tokenattributes.TermFrequencyAttribute;
import org.apache.lucene.util.BytesRef;
// TODO: break into separate freq and prox writers as
@ -28,26 +29,25 @@ import org.apache.lucene.util.BytesRef;
final class FreqProxTermsWriterPerField extends TermsHashPerField {
private FreqProxPostingsArray freqProxPostingsArray;
private final FieldInvertState fieldState;
private final FieldInfo fieldInfo;
final boolean hasFreq;
final boolean hasProx;
final boolean hasOffsets;
PayloadAttribute payloadAttribute;
OffsetAttribute offsetAttribute;
long sumTotalTermFreq;
long sumDocFreq;
// How many docs have this field:
int docCount;
TermFrequencyAttribute termFreqAtt;
/** Set to true if any token had a payload in the current
* segment. */
boolean sawPayloads;
public FreqProxTermsWriterPerField(FieldInvertState invertState, TermsHash termsHash, FieldInfo fieldInfo, TermsHashPerField nextPerField) {
super(fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0 ? 2 : 1, invertState, termsHash, nextPerField, fieldInfo);
IndexOptions indexOptions = fieldInfo.getIndexOptions();
assert indexOptions != IndexOptions.NONE;
FreqProxTermsWriterPerField(FieldInvertState invertState, TermsHash termsHash, FieldInfo fieldInfo, TermsHashPerField nextPerField) {
super(fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0 ? 2 : 1,
termsHash.intPool, termsHash.bytePool, termsHash.termBytePool, termsHash.bytesUsed, nextPerField, fieldInfo.name, fieldInfo.getIndexOptions());
this.fieldState = invertState;
this.fieldInfo = fieldInfo;
hasFreq = indexOptions.compareTo(IndexOptions.DOCS_AND_FREQS) >= 0;
hasProx = indexOptions.compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0;
hasOffsets = indexOptions.compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) >= 0;
@ -56,12 +56,6 @@ final class FreqProxTermsWriterPerField extends TermsHashPerField {
@Override
void finish() throws IOException {
super.finish();
sumDocFreq += fieldState.uniqueTermCount;
sumTotalTermFreq += fieldState.length;
if (fieldState.length > 0) {
docCount++;
}
if (sawPayloads) {
fieldInfo.setStorePayloads();
}
@ -70,6 +64,7 @@ final class FreqProxTermsWriterPerField extends TermsHashPerField {
@Override
boolean start(IndexableField f, boolean first) {
super.start(f, first);
termFreqAtt = fieldState.termFreqAttribute;
payloadAttribute = fieldState.payloadAttribute;
offsetAttribute = fieldState.offsetAttribute;
return true;
@ -104,18 +99,18 @@ final class FreqProxTermsWriterPerField extends TermsHashPerField {
}
@Override
void newTerm(final int termID) {
void newTerm(final int termID, final int docID) {
// First time we're seeing this term since the last
// flush
final FreqProxPostingsArray postings = freqProxPostingsArray;
postings.lastDocIDs[termID] = docState.docID;
postings.lastDocIDs[termID] = docID;
if (!hasFreq) {
assert postings.termFreqs == null;
postings.lastDocCodes[termID] = docState.docID;
postings.lastDocCodes[termID] = docID;
fieldState.maxTermFrequency = Math.max(1, fieldState.maxTermFrequency);
} else {
postings.lastDocCodes[termID] = docState.docID << 1;
postings.lastDocCodes[termID] = docID << 1;
postings.termFreqs[termID] = getTermFreq();
if (hasProx) {
writeProx(termID, fieldState.position);
@ -131,25 +126,25 @@ final class FreqProxTermsWriterPerField extends TermsHashPerField {
}
@Override
void addTerm(final int termID) {
void addTerm(final int termID, final int docID) {
final FreqProxPostingsArray postings = freqProxPostingsArray;
assert !hasFreq || postings.termFreqs[termID] > 0;
if (!hasFreq) {
assert postings.termFreqs == null;
if (termFreqAtt.getTermFrequency() != 1) {
throw new IllegalStateException("field \"" + fieldInfo.name + "\": must index term freq while using custom TermFrequencyAttribute");
throw new IllegalStateException("field \"" + getFieldName() + "\": must index term freq while using custom TermFrequencyAttribute");
}
if (docState.docID != postings.lastDocIDs[termID]) {
if (docID != postings.lastDocIDs[termID]) {
// New document; now encode docCode for previous doc:
assert docState.docID > postings.lastDocIDs[termID];
assert docID > postings.lastDocIDs[termID];
writeVInt(0, postings.lastDocCodes[termID]);
postings.lastDocCodes[termID] = docState.docID - postings.lastDocIDs[termID];
postings.lastDocIDs[termID] = docState.docID;
postings.lastDocCodes[termID] = docID - postings.lastDocIDs[termID];
postings.lastDocIDs[termID] = docID;
fieldState.uniqueTermCount++;
}
} else if (docState.docID != postings.lastDocIDs[termID]) {
assert docState.docID > postings.lastDocIDs[termID]:"id: "+docState.docID + " postings ID: "+ postings.lastDocIDs[termID] + " termID: "+termID;
} else if (docID != postings.lastDocIDs[termID]) {
assert docID > postings.lastDocIDs[termID]:"id: "+docID + " postings ID: "+ postings.lastDocIDs[termID] + " termID: "+termID;
// Term not yet seen in the current doc but previously
// seen in other doc(s) since the last flush
@ -165,8 +160,8 @@ final class FreqProxTermsWriterPerField extends TermsHashPerField {
// Init freq for the current document
postings.termFreqs[termID] = getTermFreq();
fieldState.maxTermFrequency = Math.max(postings.termFreqs[termID], fieldState.maxTermFrequency);
postings.lastDocCodes[termID] = (docState.docID - postings.lastDocIDs[termID]) << 1;
postings.lastDocIDs[termID] = docState.docID;
postings.lastDocCodes[termID] = (docID - postings.lastDocIDs[termID]) << 1;
postings.lastDocIDs[termID] = docID;
if (hasProx) {
writeProx(termID, fieldState.position);
if (hasOffsets) {
@ -193,7 +188,7 @@ final class FreqProxTermsWriterPerField extends TermsHashPerField {
int freq = termFreqAtt.getTermFrequency();
if (freq != 1) {
if (hasProx) {
throw new IllegalStateException("field \"" + fieldInfo.name + "\": cannot index positions while using custom TermFrequencyAttribute");
throw new IllegalStateException("field \"" + getFieldName() + "\": cannot index positions while using custom TermFrequencyAttribute");
}
}
@ -207,8 +202,6 @@ final class FreqProxTermsWriterPerField extends TermsHashPerField {
@Override
ParallelPostingsArray createPostingsArray(int size) {
IndexOptions indexOptions = fieldInfo.getIndexOptions();
assert indexOptions != IndexOptions.NONE;
boolean hasFreq = indexOptions.compareTo(IndexOptions.DOCS_AND_FREQS) >= 0;
boolean hasProx = indexOptions.compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0;
boolean hasOffsets = indexOptions.compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) >= 0;

View File

@ -22,14 +22,14 @@ class ParallelPostingsArray {
final static int BYTES_PER_POSTING = 3 * Integer.BYTES;
final int size;
final int[] textStarts;
final int[] intStarts;
final int[] byteStarts;
final int[] textStarts; // maps term ID to the term's text start in the bytesHash
final int[] addressOffset; // maps term ID to current stream address
final int[] byteStarts; // maps term ID to stream start offset in the byte pool
ParallelPostingsArray(final int size) {
this.size = size;
textStarts = new int[size];
intStarts = new int[size];
addressOffset = new int[size];
byteStarts = new int[size];
}
@ -50,7 +50,7 @@ class ParallelPostingsArray {
void copyTo(ParallelPostingsArray toArray, int numToCopy) {
System.arraycopy(textStarts, 0, toArray.textStarts, 0, numToCopy);
System.arraycopy(intStarts, 0, toArray.intStarts, 0, numToCopy);
System.arraycopy(addressOffset, 0, toArray.addressOffset, 0, numToCopy);
System.arraycopy(byteStarts, 0, toArray.byteStarts, 0, numToCopy);
}
}

View File

@ -44,11 +44,11 @@ class TermVectorsConsumer extends TermsHash {
final ByteSliceReader vectorSliceReaderOff = new ByteSliceReader();
boolean hasVectors;
int numVectorFields;
private int numVectorFields;
int lastDocID;
private TermVectorsConsumerPerField[] perFields = new TermVectorsConsumerPerField[1];
public TermVectorsConsumer(DocumentsWriterPerThread docWriter) {
TermVectorsConsumer(DocumentsWriterPerThread docWriter) {
super(docWriter, false, null);
this.docWriter = docWriter;
}

View File

@ -20,27 +20,37 @@ import java.io.IOException;
import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
import org.apache.lucene.analysis.tokenattributes.TermFrequencyAttribute;
import org.apache.lucene.codecs.TermVectorsWriter;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.BytesRef;
final class TermVectorsConsumerPerField extends TermsHashPerField {
private TermVectorsPostingsArray termVectorsPostingsArray;
final TermVectorsConsumer termsWriter;
private final TermVectorsConsumer termsWriter;
private final FieldInvertState fieldState;
private final FieldInfo fieldInfo;
boolean doVectors;
boolean doVectorPositions;
boolean doVectorOffsets;
boolean doVectorPayloads;
private boolean doVectors;
private boolean doVectorPositions;
private boolean doVectorOffsets;
private boolean doVectorPayloads;
OffsetAttribute offsetAttribute;
PayloadAttribute payloadAttribute;
boolean hasPayloads; // if enabled, and we actually saw any for this field
private OffsetAttribute offsetAttribute;
private PayloadAttribute payloadAttribute;
private TermFrequencyAttribute termFreqAtt;
private final ByteBlockPool termBytePool;
public TermVectorsConsumerPerField(FieldInvertState invertState, TermVectorsConsumer termsWriter, FieldInfo fieldInfo) {
super(2, invertState, termsWriter, null, fieldInfo);
this.termsWriter = termsWriter;
private boolean hasPayloads; // if enabled, and we actually saw any for this field
TermVectorsConsumerPerField(FieldInvertState invertState, TermVectorsConsumer termsHash, FieldInfo fieldInfo) {
super(2, termsHash.intPool, termsHash.bytePool, termsHash.termBytePool, termsHash.bytesUsed, null, fieldInfo.name, fieldInfo.getIndexOptions());
this.termsWriter = termsHash;
this.fieldInfo = fieldInfo;
this.fieldState = invertState;
termBytePool = termsHash.termBytePool;
}
/** Called once per field per document if term vectors
@ -48,7 +58,7 @@ final class TermVectorsConsumerPerField extends TermsHashPerField {
* RAMOutputStream, which is then quickly flushed to
* the real term vectors files in the Directory. */ @Override
void finish() {
if (!doVectors || bytesHash.size() == 0) {
if (!doVectors || getNumTerms() == 0) {
return;
}
termsWriter.addFieldToFlush(this);
@ -61,7 +71,7 @@ final class TermVectorsConsumerPerField extends TermsHashPerField {
doVectors = false;
final int numPostings = bytesHash.size();
final int numPostings = getNumTerms();
final BytesRef flushTerm = termsWriter.flushTerm;
@ -74,7 +84,8 @@ final class TermVectorsConsumerPerField extends TermsHashPerField {
TermVectorsPostingsArray postings = termVectorsPostingsArray;
final TermVectorsWriter tv = termsWriter.writer;
final int[] termIDs = sortPostings();
sortTerms();
final int[] termIDs = getSortedTermIDs();
tv.startField(fieldInfo, numPostings, doVectorPositions, doVectorOffsets, hasPayloads);
@ -110,18 +121,19 @@ final class TermVectorsConsumerPerField extends TermsHashPerField {
@Override
boolean start(IndexableField field, boolean first) {
super.start(field, first);
termFreqAtt = fieldState.termFreqAttribute;
assert field.fieldType().indexOptions() != IndexOptions.NONE;
if (first) {
if (bytesHash.size() != 0) {
if (getNumTerms() != 0) {
// Only necessary if previous doc hit a
// non-aborting exception while writing vectors in
// this field:
reset();
}
bytesHash.reinit();
reinitHash();
hasPayloads = false;
@ -222,7 +234,7 @@ final class TermVectorsConsumerPerField extends TermsHashPerField {
}
@Override
void newTerm(final int termID) {
void newTerm(final int termID, final int docID) {
TermVectorsPostingsArray postings = termVectorsPostingsArray;
postings.freqs[termID] = getTermFreq();
@ -233,7 +245,7 @@ final class TermVectorsConsumerPerField extends TermsHashPerField {
}
@Override
void addTerm(final int termID) {
void addTerm(final int termID, final int docID) {
TermVectorsPostingsArray postings = termVectorsPostingsArray;
postings.freqs[termID] += getTermFreq();
@ -245,10 +257,10 @@ final class TermVectorsConsumerPerField extends TermsHashPerField {
int freq = termFreqAtt.getTermFrequency();
if (freq != 1) {
if (doVectorPositions) {
throw new IllegalArgumentException("field \"" + fieldInfo.name + "\": cannot index term vector positions while using custom TermFrequencyAttribute");
throw new IllegalArgumentException("field \"" + getFieldName() + "\": cannot index term vector positions while using custom TermFrequencyAttribute");
}
if (doVectorOffsets) {
throw new IllegalArgumentException("field \"" + fieldInfo.name + "\": cannot index term vector offsets while using custom TermFrequencyAttribute");
throw new IllegalArgumentException("field \"" + getFieldName() + "\": cannot index term vector offsets while using custom TermFrequencyAttribute");
}
}
@ -266,7 +278,7 @@ final class TermVectorsConsumerPerField extends TermsHashPerField {
}
static final class TermVectorsPostingsArray extends ParallelPostingsArray {
public TermVectorsPostingsArray(int size) {
TermVectorsPostingsArray(int size) {
super(size);
freqs = new int[size];
lastOffsets = new int[size];

View File

@ -82,7 +82,7 @@ abstract class TermsHash {
if (nextTermsHash != null) {
Map<String,TermsHashPerField> nextChildFields = new HashMap<>();
for (final Map.Entry<String,TermsHashPerField> entry : fieldsToFlush.entrySet()) {
nextChildFields.put(entry.getKey(), entry.getValue().nextPerField);
nextChildFields.put(entry.getKey(), entry.getValue().getNextPerField());
}
nextTermsHash.flush(nextChildFields, state, sortMap, norms);
}

View File

@ -19,182 +19,186 @@ package org.apache.lucene.index;
import java.io.IOException;
import org.apache.lucene.analysis.tokenattributes.TermFrequencyAttribute;
import org.apache.lucene.analysis.tokenattributes.TermToBytesRefAttribute;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefHash.BytesStartArray;
import org.apache.lucene.util.BytesRefHash;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.IntBlockPool;
/**
* This class stores streams of information per term without knowing
* the size of the stream ahead of time. Each stream typically encodes one level
* of information like term frequency per document or term proximity. Internally
* this class allocates a linked list of slices that can be read by a {@link ByteSliceReader}
* for each term. Terms are first deduplicated in a {@link BytesRefHash}; once this is done,
* internal data structures point to the current offset of each stream that can be written to.
*/
abstract class TermsHashPerField implements Comparable<TermsHashPerField> {
private static final int HASH_INIT_SIZE = 4;
final TermsHash termsHash;
final TermsHashPerField nextPerField;
protected final DocumentsWriterPerThread.DocState docState;
protected final FieldInvertState fieldState;
TermToBytesRefAttribute termAtt;
protected TermFrequencyAttribute termFreqAtt;
// Copied from our perThread
final IntBlockPool intPool;
private final TermsHashPerField nextPerField;
private final IntBlockPool intPool;
final ByteBlockPool bytePool;
final ByteBlockPool termBytePool;
final int streamCount;
final int numPostingInt;
protected final FieldInfo fieldInfo;
final BytesRefHash bytesHash;
// For each term we store an integer per stream that points into the bytePool above.
// The address is updated once data is written to the stream to point to the next free offset
// in the term's stream. The start address for the stream is stored in postingsArray.byteStarts[termId].
// This is initialized in the #add methods, either to a brand new per term stream if the term is new or
// to the addresses where the term stream was written to when we saw it the last time.
private int[] termStreamAddressBuffer;
private int streamAddressOffset;
private final int streamCount;
private final String fieldName;
final IndexOptions indexOptions;
/* This stores the actual term bytes for postings and offsets into the parent hash in the case that this
* TermsHashPerField is hashing term vectors.*/
private final BytesRefHash bytesHash;
ParallelPostingsArray postingsArray;
private final Counter bytesUsed;
private int lastDocID; // only with assert
/** streamCount: how many streams this field stores per term.
* E.g. doc(+freq) is 1 stream, prox+offset is a second. */
public TermsHashPerField(int streamCount, FieldInvertState fieldState, TermsHash termsHash, TermsHashPerField nextPerField, FieldInfo fieldInfo) {
intPool = termsHash.intPool;
bytePool = termsHash.bytePool;
termBytePool = termsHash.termBytePool;
docState = termsHash.docState;
this.termsHash = termsHash;
bytesUsed = termsHash.bytesUsed;
this.fieldState = fieldState;
TermsHashPerField(int streamCount, IntBlockPool intPool, ByteBlockPool bytePool, ByteBlockPool termBytePool,
Counter bytesUsed, TermsHashPerField nextPerField, String fieldName, IndexOptions indexOptions) {
this.intPool = intPool;
this.bytePool = bytePool;
this.streamCount = streamCount;
numPostingInt = 2*streamCount;
this.fieldInfo = fieldInfo;
this.fieldName = fieldName;
this.nextPerField = nextPerField;
assert indexOptions != IndexOptions.NONE;
this.indexOptions = indexOptions;
PostingsBytesStartArray byteStarts = new PostingsBytesStartArray(this, bytesUsed);
bytesHash = new BytesRefHash(termBytePool, HASH_INIT_SIZE, byteStarts);
}
void reset() {
bytesHash.clear(false);
sortedTermIDs = null;
if (nextPerField != null) {
nextPerField.reset();
}
}
public void initReader(ByteSliceReader reader, int termID, int stream) {
final void initReader(ByteSliceReader reader, int termID, int stream) {
assert stream < streamCount;
int intStart = postingsArray.intStarts[termID];
final int[] ints = intPool.buffers[intStart >> IntBlockPool.INT_BLOCK_SHIFT];
final int upto = intStart & IntBlockPool.INT_BLOCK_MASK;
int streamStartOffset = postingsArray.addressOffset[termID];
final int[] streamAddressBuffer = intPool.buffers[streamStartOffset >> IntBlockPool.INT_BLOCK_SHIFT];
final int offsetInAddressBuffer = streamStartOffset & IntBlockPool.INT_BLOCK_MASK;
reader.init(bytePool,
postingsArray.byteStarts[termID]+stream*ByteBlockPool.FIRST_LEVEL_SIZE,
ints[upto+stream]);
streamAddressBuffer[offsetInAddressBuffer+stream]);
}
int[] sortedTermIDs;
private int[] sortedTermIDs;
/** Collapse the hash table and sort in-place; also sets
* this.sortedTermIDs to the results */
public int[] sortPostings() {
* this.sortedTermIDs to the results
* This method must not be called twice unless {@link #reset()}
* or {@link #reinitHash()} was called. */
final void sortTerms() {
assert sortedTermIDs == null;
sortedTermIDs = bytesHash.sort();
}
/**
* Returns the sorted term IDs. {@link #sortTerms()} must be called first.
*/
final int[] getSortedTermIDs() {
assert sortedTermIDs != null;
return sortedTermIDs;
}
final void reinitHash() {
sortedTermIDs = null;
bytesHash.reinit();
}
private boolean doNextCall;
// Secondary entry point (for 2nd & subsequent TermsHash),
// because token text has already been "interned" into
// textStart, so we hash by textStart. term vectors use
// this API.
public void add(int textStart) throws IOException {
private void add(int textStart, final int docID) throws IOException {
int termID = bytesHash.addByPoolOffset(textStart);
if (termID >= 0) { // New posting
// First time we are seeing this token since we last
// flushed the hash.
initStreamSlices(termID, docID);
} else {
positionStreamSlice(termID, docID);
}
}
private void initStreamSlices(int termID, int docID) throws IOException {
// Init stream slices
if (numPostingInt + intPool.intUpto > IntBlockPool.INT_BLOCK_SIZE) {
// TODO: figure out why this is 2*streamCount here. streamCount should be enough?
if ((2*streamCount) + intPool.intUpto > IntBlockPool.INT_BLOCK_SIZE) {
// can we fit all the streams in the current buffer?
intPool.nextBuffer();
}
if (ByteBlockPool.BYTE_BLOCK_SIZE - bytePool.byteUpto < numPostingInt*ByteBlockPool.FIRST_LEVEL_SIZE) {
if (ByteBlockPool.BYTE_BLOCK_SIZE - bytePool.byteUpto < (2*streamCount) * ByteBlockPool.FIRST_LEVEL_SIZE) {
// can we fit at least one byte per stream in the current buffer, if not allocate a new one
bytePool.nextBuffer();
}
intUptos = intPool.buffer;
intUptoStart = intPool.intUpto;
intPool.intUpto += streamCount;
termStreamAddressBuffer = intPool.buffer;
streamAddressOffset = intPool.intUpto;
intPool.intUpto += streamCount; // advance the pool to reserve the N streams for this term
postingsArray.intStarts[termID] = intUptoStart + intPool.intOffset;
postingsArray.addressOffset[termID] = streamAddressOffset + intPool.intOffset;
for(int i=0;i<streamCount;i++) {
for (int i = 0; i < streamCount; i++) {
// initialize each stream with a slice; we start with ByteBlockPool.FIRST_LEVEL_SIZE
// and grow as we need more space, see ByteBlockPool.LEVEL_SIZE_ARRAY
final int upto = bytePool.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE);
intUptos[intUptoStart+i] = upto + bytePool.byteOffset;
termStreamAddressBuffer[streamAddressOffset + i] = upto + bytePool.byteOffset;
}
postingsArray.byteStarts[termID] = intUptos[intUptoStart];
newTerm(termID);
} else {
termID = (-termID)-1;
int intStart = postingsArray.intStarts[termID];
intUptos = intPool.buffers[intStart >> IntBlockPool.INT_BLOCK_SHIFT];
intUptoStart = intStart & IntBlockPool.INT_BLOCK_MASK;
addTerm(termID);
postingsArray.byteStarts[termID] = termStreamAddressBuffer[streamAddressOffset];
newTerm(termID, docID);
}
private boolean assertDocId(int docId) {
assert docId >= lastDocID : "docID must be >= " + lastDocID + " but was: " + docId;
lastDocID = docId;
return true;
}
/** Called once per inverted token. This is the primary
* entry point (for first TermsHash); postings use this
* API. */
void add() throws IOException {
void add(BytesRef termBytes, final int docID) throws IOException {
assert assertDocId(docID);
// We are first in the chain so we must "intern" the
// term text into textStart address
// Get the text & hash of this term.
int termID = bytesHash.add(termAtt.getBytesRef());
int termID = bytesHash.add(termBytes);
//System.out.println("add term=" + termBytesRef.utf8ToString() + " doc=" + docState.docID + " termID=" + termID);
if (termID >= 0) {// New posting
bytesHash.byteStart(termID);
if (termID >= 0) { // New posting
// Init stream slices
if (numPostingInt + intPool.intUpto > IntBlockPool.INT_BLOCK_SIZE) {
intPool.nextBuffer();
}
if (ByteBlockPool.BYTE_BLOCK_SIZE - bytePool.byteUpto < numPostingInt*ByteBlockPool.FIRST_LEVEL_SIZE) {
bytePool.nextBuffer();
}
intUptos = intPool.buffer;
intUptoStart = intPool.intUpto;
intPool.intUpto += streamCount;
postingsArray.intStarts[termID] = intUptoStart + intPool.intOffset;
for(int i=0;i<streamCount;i++) {
final int upto = bytePool.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE);
intUptos[intUptoStart+i] = upto + bytePool.byteOffset;
}
postingsArray.byteStarts[termID] = intUptos[intUptoStart];
newTerm(termID);
initStreamSlices(termID, docID);
} else {
termID = (-termID)-1;
int intStart = postingsArray.intStarts[termID];
intUptos = intPool.buffers[intStart >> IntBlockPool.INT_BLOCK_SHIFT];
intUptoStart = intStart & IntBlockPool.INT_BLOCK_MASK;
addTerm(termID);
termID = positionStreamSlice(termID, docID);
}
if (doNextCall) {
nextPerField.add(postingsArray.textStarts[termID]);
nextPerField.add(postingsArray.textStarts[termID], docID);
}
}
int[] intUptos;
int intUptoStart;
private int positionStreamSlice(int termID, final int docID) throws IOException {
termID = (-termID) - 1;
int intStart = postingsArray.addressOffset[termID];
termStreamAddressBuffer = intPool.buffers[intStart >> IntBlockPool.INT_BLOCK_SHIFT];
streamAddressOffset = intStart & IntBlockPool.INT_BLOCK_MASK;
addTerm(termID, docID);
return termID;
}
void writeByte(int stream, byte b) {
int upto = intUptos[intUptoStart+stream];
final void writeByte(int stream, byte b) {
int streamAddress = streamAddressOffset + stream;
int upto = termStreamAddressBuffer[streamAddress];
byte[] bytes = bytePool.buffers[upto >> ByteBlockPool.BYTE_BLOCK_SHIFT];
assert bytes != null;
int offset = upto & ByteBlockPool.BYTE_BLOCK_MASK;
@ -202,20 +206,20 @@ abstract class TermsHashPerField implements Comparable<TermsHashPerField> {
// End of slice; allocate a new one
offset = bytePool.allocSlice(bytes, offset);
bytes = bytePool.buffer;
intUptos[intUptoStart+stream] = offset + bytePool.byteOffset;
termStreamAddressBuffer[streamAddress] = offset + bytePool.byteOffset;
}
bytes[offset] = b;
(intUptos[intUptoStart+stream])++;
(termStreamAddressBuffer[streamAddress])++;
}
public void writeBytes(int stream, byte[] b, int offset, int len) {
final void writeBytes(int stream, byte[] b, int offset, int len) {
// TODO: optimize
final int end = offset + len;
for(int i=offset;i<end;i++)
writeByte(stream, b[i]);
}
void writeVInt(int stream, int i) {
final void writeVInt(int stream, int i) {
assert stream < streamCount;
while ((i & ~0x7F) != 0) {
writeByte(stream, (byte)((i & 0x7f) | 0x80));
@ -224,6 +228,14 @@ abstract class TermsHashPerField implements Comparable<TermsHashPerField> {
writeByte(stream, (byte) i);
}
final TermsHashPerField getNextPerField() {
return nextPerField;
}
final String getFieldName() {
return fieldName;
}
private static final class PostingsBytesStartArray extends BytesStartArray {
private final TermsHashPerField perField;
@ -272,8 +284,8 @@ abstract class TermsHashPerField implements Comparable<TermsHashPerField> {
}
@Override
public int compareTo(TermsHashPerField other) {
return fieldInfo.name.compareTo(other.fieldInfo.name);
public final int compareTo(TermsHashPerField other) {
return fieldName.compareTo(other.fieldName);
}
/** Finish adding all instances of this field to the
@ -284,24 +296,25 @@ abstract class TermsHashPerField implements Comparable<TermsHashPerField> {
}
}
final int getNumTerms() {
return bytesHash.size();
}
/** Start adding a new field instance; first is true if
* this is the first time this field name was seen in the
* document. */
boolean start(IndexableField field, boolean first) {
termAtt = fieldState.termAttribute;
termFreqAtt = fieldState.termFreqAttribute;
if (nextPerField != null) {
doNextCall = nextPerField.start(field, first);
}
return true;
}
/** Called when a term is seen for the first time. */
abstract void newTerm(int termID) throws IOException;
abstract void newTerm(int termID, final int docID) throws IOException;
/** Called when a previously seen term is seen again. */
abstract void addTerm(int termID) throws IOException;
abstract void addTerm(int termID, final int docID) throws IOException;
/** Called when the postings array is initialized or
* resized. */

View File

@ -175,7 +175,7 @@ public final class IntBlockPool {
return upto;
}
private static final boolean assertSliceBuffer(int[] buffer) {
private static boolean assertSliceBuffer(int[] buffer) {
int count = 0;
for (int i = 0; i < buffer.length; i++) {
count += buffer[i]; // for slices the buffer must only have 0 values

View File

@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.IntBlockPool;
import org.apache.lucene.util.LuceneTestCase;
/**
 * Standalone tests for {@link TermsHashPerField}.
 *
 * <p>The tests drive a small anonymous subclass that records, per term, the last document ID and
 * the term frequency in a {@link FreqProxTermsWriterPerField.FreqProxPostingsArray} and writes
 * completed (doc, freq) pairs as vInts into stream 0. The written stream is then read back with a
 * {@link ByteSliceReader} and compared against the expected postings.
 */
public class TestTermsHashPerField extends LuceneTestCase {

  /**
   * Creates a {@link TermsHashPerField} over fresh block pools for the field {@code "testfield"}.
   *
   * @param newCalled incremented every time {@code newTerm} is invoked
   * @param addCalled incremented every time {@code addTerm} is invoked
   * @return the hash under test
   */
  private static TermsHashPerField createNewHash(AtomicInteger newCalled, AtomicInteger addCalled) {
    IntBlockPool intBlockPool = new IntBlockPool();
    ByteBlockPool byteBlockPool = new ByteBlockPool(new ByteBlockPool.DirectAllocator());
    ByteBlockPool termBlockPool = new ByteBlockPool(new ByteBlockPool.DirectAllocator());
    // NOTE(review): the leading 1 is presumably the stream count and the null the next per-field
    // instance in the chain -- confirm against the TermsHashPerField constructor.
    return new TermsHashPerField(1, intBlockPool, byteBlockPool, termBlockPool, Counter.newCounter(),
        null, "testfield", IndexOptions.DOCS_AND_FREQS) {

      // Typed view of postingsArray; refreshed in newPostingsArray().
      private FreqProxTermsWriterPerField.FreqProxPostingsArray freqProxPostingsArray;

      @Override
      void newTerm(int termID, int docID) {
        newCalled.incrementAndGet();
        FreqProxTermsWriterPerField.FreqProxPostingsArray postings = freqProxPostingsArray;
        // First occurrence: nothing is written yet. The doc code keeps the low bit free so it can
        // later flag "frequency is 1" when the entry is flushed in addTerm.
        postings.lastDocIDs[termID] = docID;
        postings.lastDocCodes[termID] = docID << 1;
        postings.termFreqs[termID] = 1;
      }

      @Override
      void addTerm(int termID, int docID) {
        addCalled.incrementAndGet();
        FreqProxTermsWriterPerField.FreqProxPostingsArray postings = freqProxPostingsArray;
        if (docID != postings.lastDocIDs[termID]) {
          // The term moved on to a new document: flush the previous document's entry to stream 0.
          if (1 == postings.termFreqs[termID]) {
            // Frequency 1 is folded into the low bit of the doc code.
            writeVInt(0, postings.lastDocCodes[termID] | 1);
          } else {
            writeVInt(0, postings.lastDocCodes[termID]);
            writeVInt(0, postings.termFreqs[termID]);
          }
          // Start the pending entry for the new document; its code is the delta to the last doc.
          postings.termFreqs[termID] = 1;
          postings.lastDocCodes[termID] = (docID - postings.lastDocIDs[termID]) << 1;
          postings.lastDocIDs[termID] = docID;
        } else {
          // Same document again: only the frequency changes (overflow-checked).
          postings.termFreqs[termID] = Math.addExact(postings.termFreqs[termID], 1);
        }
      }

      @Override
      void newPostingsArray() {
        freqProxPostingsArray = (FreqProxTermsWriterPerField.FreqProxPostingsArray) postingsArray;
      }

      @Override
      ParallelPostingsArray createPostingsArray(int size) {
        // NOTE(review): the flags are presumably (writeFreqs, writeProx, writeOffsets) -- confirm.
        return new FreqProxTermsWriterPerField.FreqProxPostingsArray(size, true, false, false);
      }
    };
  }

  /**
   * Reads the next (doc, frequency) pair of a term and asserts that it matches the expectation.
   *
   * <p>While the reader still has bytes, the pair is decoded from the stream: a vInt doc code whose
   * upper bits are added to {@code prevDoc} and whose low bit, when set, means frequency 1;
   * otherwise the frequency follows as a second vInt. Once the reader is exhausted, the pair is
   * taken from the postings array, which holds the entry that has not been written to the stream.
   *
   * @param reader reader positioned on the term's stream
   * @param postingsArray postings array holding the pending entry per term
   * @param prevDoc document ID the decoded doc code is added to
   * @param termId ID of the term being checked
   * @param doc expected document ID
   * @param frequency expected term frequency in {@code doc}
   * @return {@code true} if the reader was already at EOF, i.e. the pair came from the postings array
   * @throws IOException if reading from the slice reader fails
   */
  boolean assertDocAndFreq(ByteSliceReader reader, FreqProxTermsWriterPerField.FreqProxPostingsArray postingsArray, int prevDoc, int termId, int doc, int frequency) throws IOException {
    int docId = prevDoc;
    int freq;
    boolean eof = reader.eof();
    if (eof) {
      docId = postingsArray.lastDocIDs[termId];
      freq = postingsArray.termFreqs[termId];
    } else {
      int code = reader.readVInt();
      docId += code >>> 1;
      if ((code & 1) != 0) {
        freq = 1;
      } else {
        freq = reader.readVInt();
      }
    }
    assertEquals("docID mismatch eof: " + eof, doc, docId);
    assertEquals("freq mismatch eof: " + eof, frequency, freq);
    return eof;
  }

  /** Adds a fixed sequence of terms over four documents and verifies every term's postings. */
  public void testAddAndUpdateTerm() throws IOException {
    AtomicInteger newCalled = new AtomicInteger(0);
    AtomicInteger addCalled = new AtomicInteger(0);
    TermsHashPerField hash = createNewHash(newCalled, addCalled);
    hash.start(null, true);

    // doc 0
    hash.add(new BytesRef("start"), 0); // tid = 0;
    hash.add(new BytesRef("foo"), 0); // tid = 1;
    hash.add(new BytesRef("bar"), 0); // tid = 2;
    hash.finish();
    // doc 1
    hash.add(new BytesRef("bar"), 1);
    hash.add(new BytesRef("foobar"), 1); // tid = 3;
    hash.add(new BytesRef("bar"), 1);
    hash.add(new BytesRef("bar"), 1);
    hash.add(new BytesRef("foobar"), 1);
    hash.add(new BytesRef("verylongfoobarbaz"), 1); // tid = 4;
    hash.finish();
    // doc 2
    hash.add(new BytesRef("verylongfoobarbaz"), 2);
    hash.add(new BytesRef("boom"), 2); // tid = 5;
    hash.finish();
    // doc 3
    hash.add(new BytesRef("verylongfoobarbaz"), 3);
    hash.add(new BytesRef("end"), 3); // tid = 6;
    hash.finish();

    // 7 distinct terms, 13 adds in total -> 6 repeated occurrences.
    assertEquals(7, newCalled.get());
    assertEquals(6, addCalled.get());

    // No more terms are added from here on, so the postings array can be captured once.
    final FreqProxTermsWriterPerField.FreqProxPostingsArray postings =
        (FreqProxTermsWriterPerField.FreqProxPostingsArray) hash.postingsArray;
    final ByteSliceReader reader = new ByteSliceReader();

    hash.initReader(reader, 0, 0);
    assertTrue(assertDocAndFreq(reader, postings, 0, 0, 0, 1));

    hash.initReader(reader, 1, 0);
    assertTrue(assertDocAndFreq(reader, postings, 0, 1, 0, 1));

    hash.initReader(reader, 2, 0);
    assertFalse(assertDocAndFreq(reader, postings, 0, 2, 0, 1));
    assertTrue(assertDocAndFreq(reader, postings, 2, 2, 1, 3));

    hash.initReader(reader, 3, 0);
    assertTrue(assertDocAndFreq(reader, postings, 0, 3, 1, 2));

    hash.initReader(reader, 4, 0);
    assertFalse(assertDocAndFreq(reader, postings, 0, 4, 1, 1));
    assertFalse(assertDocAndFreq(reader, postings, 1, 4, 2, 1));
    assertTrue(assertDocAndFreq(reader, postings, 2, 4, 3, 1));

    hash.initReader(reader, 5, 0);
    assertTrue(assertDocAndFreq(reader, postings, 0, 5, 2, 1));

    hash.initReader(reader, 6, 0);
    assertTrue(assertDocAndFreq(reader, postings, 0, 6, 3, 1));
  }

  /**
   * Adds random terms to a random number of documents while tracking the expected postings in a
   * side map, then verifies that the hash returns exactly those postings for every term.
   */
  public void testAddAndUpdateRandom() throws IOException {
    AtomicInteger newCalled = new AtomicInteger(0);
    AtomicInteger addCalled = new AtomicInteger(0);
    TermsHashPerField hash = createNewHash(newCalled, addCalled);
    hash.start(null, true);

    // Expected state of a single term: its ID (-1 until first added) and doc -> frequency,
    // ordered by doc so it can be compared against the stream in order.
    class Posting {
      int termId = -1;
      final TreeMap<Integer, Integer> docAndFreq = new TreeMap<>();
    }

    Map<BytesRef, Posting> postingMap = new HashMap<>();
    int numStrings = 1 + random().nextInt(200);
    for (int i = 0; i < numStrings; i++) {
      String randomString = RandomStrings.randomRealisticUnicodeOfCodepointLengthBetween(random(), 1, 10);
      postingMap.putIfAbsent(new BytesRef(randomString), new Posting());
    }
    List<BytesRef> bytesRefs = Arrays.asList(postingMap.keySet().toArray(new BytesRef[0]));
    // Presumably sorted so that the random picks below do not depend on HashMap iteration order.
    Collections.sort(bytesRefs);

    int numDocs = 1 + random().nextInt(200);
    int termOrd = 0;
    for (int doc = 0; doc < numDocs; doc++) {
      int numTerms = 1 + random().nextInt(200);
      // The term loop must run on its own index: advancing the document index here would put all
      // terms into a single document and cut the document loop short.
      for (int j = 0; j < numTerms; j++) {
        BytesRef ref = RandomPicks.randomFrom(random(), bytesRefs);
        Posting posting = postingMap.get(ref);
        if (posting.termId == -1) {
          // Term IDs are expected in first-seen order.
          posting.termId = termOrd++;
        }
        posting.docAndFreq.merge(doc, 1, Integer::sum);
        hash.add(ref, doc);
      }
      hash.finish();
    }

    // Only terms that were actually picked have postings to verify.
    List<Posting> values = postingMap.values().stream().filter(x -> x.termId != -1)
        .collect(Collectors.toList());
    Collections.shuffle(values, random()); // term order doesn't matter

    // No more terms are added from here on, so the postings array can be captured once.
    final FreqProxTermsWriterPerField.FreqProxPostingsArray postings =
        (FreqProxTermsWriterPerField.FreqProxPostingsArray) hash.postingsArray;
    final ByteSliceReader reader = new ByteSliceReader();
    for (Posting p : values) {
      hash.initReader(reader, p.termId, 0);
      boolean eof = false;
      int prevDoc = 0;
      for (Map.Entry<Integer, Integer> entry : p.docAndFreq.entrySet()) {
        assertFalse("the reader must not be EOF here", eof);
        eof = assertDocAndFreq(reader, postings, prevDoc, p.termId, entry.getKey(), entry.getValue());
        prevDoc = entry.getKey();
      }
      assertTrue("the last posting must be EOF on the reader", eof);
    }
  }
}