LUCENE-3186: Promote docvalues types during merge if ValueType or size differes across segments

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1181020 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Simon Willnauer 2011-10-10 15:28:07 +00:00
parent cbde4e03e8
commit 3088c013ae
14 changed files with 198 additions and 89 deletions

View File

@ -86,7 +86,7 @@ public final class FieldInfo {
public int getCodecId() {
return codecId;
}
@Override
public Object clone() {
FieldInfo clone = new FieldInfo(name, isIndexed, number, storeTermVector, storePositionWithTermVector,
@ -132,6 +132,12 @@ public final class FieldInfo {
}
}
public void resetDocValues(ValueType v) {
if (docValues != null) {
docValues = v;
}
}
public boolean hasDocValues() {
return docValues != null;
}

View File

@ -128,7 +128,7 @@ public class MultiPerDocValues extends PerDocValues {
if (docsUpto != start) {
type = values.type();
docValuesIndex.add(new MultiIndexDocValues.DocValuesIndex(
new MultiIndexDocValues.DummyDocValues(start, type), docsUpto, start
new MultiIndexDocValues.EmptyDocValues(start, type), docsUpto, start
- docsUpto));
}
docValuesIndex.add(new MultiIndexDocValues.DocValuesIndex(values, start,
@ -137,7 +137,7 @@ public class MultiPerDocValues extends PerDocValues {
} else if (i + 1 == subs.length && !docValuesIndex.isEmpty()) {
docValuesIndex.add(new MultiIndexDocValues.DocValuesIndex(
new MultiIndexDocValues.DummyDocValues(start, type), docsUpto, start
new MultiIndexDocValues.EmptyDocValues(start, type), docsUpto, start
- docsUpto));
}
}

View File

@ -33,7 +33,6 @@ import org.apache.lucene.index.codecs.FieldsReader;
import org.apache.lucene.index.codecs.FieldsWriter;
import org.apache.lucene.index.codecs.MergeState;
import org.apache.lucene.index.codecs.PerDocConsumer;
import org.apache.lucene.index.codecs.PerDocValues;
import org.apache.lucene.store.CompoundFileDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
@ -141,6 +140,8 @@ final class SegmentMerger {
if (fieldInfos.hasVectors()) {
mergeVectors();
}
// write FIS once merge is done. IDV might change types or drops fields
fieldInfos.write(directory, segment + "." + IndexFileNames.FIELD_INFOS_EXTENSION);
return mergedDocs;
}
@ -254,7 +255,6 @@ final class SegmentMerger {
}
}
final SegmentCodecs codecInfo = fieldInfos.buildSegmentCodecs(false);
fieldInfos.write(directory, segment + "." + IndexFileNames.FIELD_INFOS_EXTENSION);
int docCount = 0;
@ -584,28 +584,11 @@ final class SegmentMerger {
}
private void mergePerDoc() throws IOException {
final List<PerDocValues> perDocProducers = new ArrayList<PerDocValues>();
final List<ReaderUtil.Slice> perDocSlices = new ArrayList<ReaderUtil.Slice>();
int docBase = 0;
for (MergeState.IndexReaderAndLiveDocs r : readers) {
final int maxDoc = r.reader.maxDoc();
final PerDocValues producer = r.reader.perDocValues();
if (producer != null) {
perDocSlices.add(new ReaderUtil.Slice(docBase, maxDoc, perDocProducers
.size()));
perDocProducers.add(producer);
}
docBase += maxDoc;
}
if (!perDocSlices.isEmpty()) {
final PerDocConsumer docsConsumer = codec
.docsConsumer(new PerDocWriteState(segmentWriteState));
boolean success = false;
try {
final MultiPerDocValues multiPerDocValues = new MultiPerDocValues(
perDocProducers.toArray(PerDocValues.EMPTY_ARRAY),
perDocSlices.toArray(ReaderUtil.Slice.EMPTY_ARRAY));
docsConsumer.merge(mergeState, multiPerDocValues);
docsConsumer.merge(mergeState);
success = true;
} finally {
if (success) {
@ -614,11 +597,8 @@ final class SegmentMerger {
IOUtils.closeWhileHandlingException(docsConsumer);
}
}
}
/* don't close the perDocProducers here since they are private segment producers
* and will be closed once the SegmentReader goes out of scope */
}
private MergeState mergeState;
public boolean getAnyNonBulkMerges() {

View File

@ -92,48 +92,38 @@ public abstract class DocValuesConsumer {
*
* @param mergeState
* the state to merge
* @param values
* the docValues to merge in
* @param docValues docValues array containing one instance per reader (
* {@link MergeState#readers}) or <code>null</code> if the reader has
* no {@link IndexDocValues} instance.
* @throws IOException
* if an {@link IOException} occurs
*/
public void merge(org.apache.lucene.index.codecs.MergeState mergeState,
IndexDocValues values) throws IOException {
public void merge(MergeState mergeState, IndexDocValues[] docValues) throws IOException {
assert mergeState != null;
// TODO we need some kind of compatibility notation for values such
// that two slightly different segments can be merged eg. fixed vs.
// variable byte len or float32 vs. float64
boolean merged = false;
/*
* We ignore the given DocValues here and merge from the subReaders directly
* to support bulk copies on the DocValues Writer level. if this gets merged
* with MultiDocValues the writer can not optimize for bulk-copyable data
*/
boolean hasMerged = false;
for(int readerIDX=0;readerIDX<mergeState.readers.size();readerIDX++) {
final org.apache.lucene.index.codecs.MergeState.IndexReaderAndLiveDocs reader = mergeState.readers.get(readerIDX);
final IndexDocValues r = reader.reader.docValues(mergeState.fieldInfo.name);
if (r != null) {
merged = true;
merge(new Writer.MergeState(r, mergeState.docBase[readerIDX], reader.reader.maxDoc(),
if (docValues[readerIDX] != null) {
hasMerged = true;
merge(new Writer.SingleSubMergeState(docValues[readerIDX], mergeState.docBase[readerIDX], reader.reader.maxDoc(),
reader.liveDocs));
}
}
if (merged) {
// only finish if no exception is thrown!
if (hasMerged) {
finish(mergeState.mergedDocCount);
}
}
/**
* Merges the given {@link MergeState} into this {@link DocValuesConsumer}.
* {@link MergeState#docBase} must always be increasing. Merging segments out
* of order is not supported.
* Merges the given {@link SingleSubMergeState} into this {@link DocValuesConsumer}.
*
* @param mergeState
* the {@link MergeState} to merge
* the {@link SingleSubMergeState} to merge
* @throws IOException
* if an {@link IOException} occurs
*/
protected abstract void merge(MergeState mergeState) throws IOException;
protected abstract void merge(SingleSubMergeState mergeState) throws IOException;
/**
* Specialized auxiliary MergeState is necessary since we don't want to
@ -141,7 +131,7 @@ public abstract class DocValuesConsumer {
* created for each merged low level {@link IndexReader} we are merging to
* support low level bulk copies.
*/
public static class MergeState {
public static class SingleSubMergeState {
/**
* the source reader for this MergeState - merged values should be read from
* this instance
@ -154,7 +144,7 @@ public abstract class DocValuesConsumer {
/** the not deleted bits for this MergeState */
public final Bits liveDocs;
public MergeState(IndexDocValues reader, int docBase, int docCount, Bits liveDocs) {
public SingleSubMergeState(IndexDocValues reader, int docBase, int docCount, Bits liveDocs) {
assert reader != null;
this.reader = reader;
this.docBase = docBase;

View File

@ -19,7 +19,10 @@ import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.values.IndexDocValues;
import org.apache.lucene.index.values.TypePromoter;
import org.apache.lucene.index.values.ValueType;
/**
* Abstract API that consumes per document values. Concrete implementations of
@ -40,28 +43,75 @@ public abstract class PerDocConsumer implements Closeable{
* Consumes and merges the given {@link PerDocValues} producer
* into this consumers format.
*/
public void merge(MergeState mergeState, PerDocValues producer)
public void merge(MergeState mergeState)
throws IOException {
Iterable<String> fields = producer.fields();
for (String field : fields) {
mergeState.fieldInfo = mergeState.fieldInfos.fieldInfo(field);
assert mergeState.fieldInfo != null : "FieldInfo for field is null: "
+ field;
if (mergeState.fieldInfo.hasDocValues()) {
final IndexDocValues docValues = producer.docValues(field);
if (docValues == null) {
/*
* It is actually possible that a fieldInfo has a values type but no
* values are actually available. this can happen if there are already
* segments without values around.
*/
final FieldInfos fieldInfos = mergeState.fieldInfos;
final IndexDocValues[] docValues = new IndexDocValues[mergeState.readers.size()];
final PerDocValues[] perDocValues = new PerDocValues[mergeState.readers.size()];
// pull all PerDocValues
for (int i = 0; i < perDocValues.length; i++) {
perDocValues[i] = mergeState.readers.get(i).reader.perDocValues();
}
for (FieldInfo fieldInfo : fieldInfos) {
mergeState.fieldInfo = fieldInfo;
TypePromoter currentPromoter = TypePromoter.getIdentityPromoter();
if (fieldInfo.hasDocValues()) {
for (int i = 0; i < perDocValues.length; i++) {
if (perDocValues[i] != null) { // get all IDV to merge
docValues[i] = perDocValues[i].docValues(fieldInfo.name);
if (docValues[i] != null) {
currentPromoter = promoteValueType(fieldInfo, docValues[i], currentPromoter);
if (currentPromoter == null) {
break;
}
}
}
}
if (currentPromoter == null) {
fieldInfo.resetDocValues(null);
continue;
}
assert currentPromoter != TypePromoter.getIdentityPromoter();
if (fieldInfo.getDocValues() != currentPromoter.type()) {
// reset the type if we got promoted
fieldInfo.resetDocValues(currentPromoter.type());
}
final DocValuesConsumer docValuesConsumer = addValuesField(mergeState.fieldInfo);
assert docValuesConsumer != null;
docValuesConsumer.merge(mergeState, docValues);
}
}
/* NOTE: don't close the perDocProducers here since they are private segment producers
* and will be closed once the SegmentReader goes out of scope */
}
protected TypePromoter promoteValueType(final FieldInfo fieldInfo, final IndexDocValues docValues,
TypePromoter currentPromoter) {
assert currentPromoter != null;
final TypePromoter incomingPromoter = TypePromoter.create(docValues.type(), docValues.getValueSize());
assert incomingPromoter != null;
final TypePromoter newPromoter = currentPromoter.promote(incomingPromoter);
return newPromoter == null ? handleIncompatibleValueType(fieldInfo, incomingPromoter, currentPromoter) : newPromoter;
}
/**
* Resolves a conflicts of incompatible {@link TypePromoter}s. The default
* implementation promotes incompatible types to
* {@link ValueType#BYTES_VAR_STRAIGHT} and preserves all values. If this
* method returns <code>null</code> all docvalues for the given
* {@link FieldInfo} are dropped and all values are lost.
*
* @param incomingPromoter
* the incompatible incoming promoter
* @param currentPromoter
* the current promoter
* @return a promoted {@link TypePromoter} or <code>null</code> iff this index
* docvalues should be dropped for this field.
*/
protected TypePromoter handleIncompatibleValueType(FieldInfo fieldInfo, TypePromoter incomingPromoter, TypePromoter currentPromoter) {
return TypePromoter.create(ValueType.BYTES_VAR_STRAIGHT, TypePromoter.VAR_TYPE_VALUE_SIZE);
}
}

View File

@ -86,6 +86,12 @@ class FixedDerefBytesImpl {
throws IOException {
return new DirectFixedDerefSource(cloneData(), cloneIndex(), size, type());
}
@Override
public int getValueSize() {
return size;
}
}
static final class FixedDerefSource extends BytesSourceBase {

View File

@ -104,6 +104,11 @@ class FixedSortedBytesImpl {
return new DirectFixedSortedSource(cloneData(), cloneIndex(), size,
valueCount, comparator, type);
}
@Override
public int getValueSize() {
return size;
}
}
static final class FixedSortedSource extends BytesSortedSourceBase {

View File

@ -59,6 +59,7 @@ class FixedStraightBytesImpl {
int version, Counter bytesUsed, IOContext context) throws IOException {
super(dir, id, codecName, version, bytesUsed, context);
pool = new ByteBlockPool(new DirectTrackingAllocator(bytesUsed));
pool.nextBuffer();
}
@Override
@ -70,7 +71,6 @@ class FixedStraightBytesImpl {
throw new IllegalArgumentException("bytes arrays > " + Short.MAX_VALUE + " are not supported");
}
size = bytes.length;
pool.nextBuffer();
} else if (bytes.length != size) {
throw new IllegalArgumentException("expected bytes size=" + size
+ " but got " + bytes.length);
@ -120,7 +120,7 @@ class FixedStraightBytesImpl {
}
static class Writer extends FixedBytesWriterBase {
private boolean merge;
private boolean hasMerged;
private IndexOutput datOut;
public Writer(Directory dir, String id, Counter bytesUsed, IOContext context) throws IOException {
@ -133,12 +133,15 @@ class FixedStraightBytesImpl {
@Override
protected void merge(MergeState state) throws IOException {
merge = true;
protected void merge(SingleSubMergeState state) throws IOException {
datOut = getOrCreateDataOut();
boolean success = false;
try {
if (state.liveDocs == null && state.reader instanceof FixedStraightReader ) {
if (!hasMerged && size != -1) {
datOut.writeInt(size);
}
if (state.liveDocs == null && tryBulkMerge(state.reader)) {
FixedStraightReader reader = (FixedStraightReader) state.reader;
final int maxDocs = reader.maxDoc;
if (maxDocs == 0) {
@ -172,24 +175,33 @@ class FixedStraightBytesImpl {
if (!success) {
IOUtils.closeWhileHandlingException(datOut);
}
hasMerged = true;
}
}
protected boolean tryBulkMerge(IndexDocValues docValues) {
return docValues instanceof FixedStraightReader;
}
@Override
protected void mergeDoc(int docID, int sourceDoc) throws IOException {
assert lastDocID < docID;
currentMergeSource.getBytes(sourceDoc, bytesRef);
setMergeBytes(sourceDoc);
if (size == -1) {
size = bytesRef.length;
datOut.writeInt(size);
}
assert size == bytesRef.length;
assert size == bytesRef.length : "size: " + size + " ref: " + bytesRef.length;
if (lastDocID+1 < docID) {
fill(datOut, docID);
}
datOut.writeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length);
lastDocID = docID;
}
protected void setMergeBytes(int sourceDoc) {
currentMergeSource.getBytes(sourceDoc, bytesRef);
}
@ -203,7 +215,7 @@ class FixedStraightBytesImpl {
public void finish(int docCount) throws IOException {
boolean success = false;
try {
if (!merge) {
if (!hasMerged) {
// indexing path - no disk IO until here
assert datOut == null;
datOut = getOrCreateDataOut();
@ -267,6 +279,11 @@ class FixedStraightBytesImpl {
public Source getDirectSource() throws IOException {
return new DirectFixedStraightSource(cloneData(), size, type());
}
@Override
public int getValueSize() {
return size;
}
}
// specialized version for single bytes

View File

@ -85,6 +85,18 @@ public class Floats {
public void add(int docID, PerDocFieldValues docValues) throws IOException {
add(docID, docValues.getFloat());
}
@Override
protected boolean tryBulkMerge(IndexDocValues docValues) {
// only bulk merge if value type is the same otherwise size differs
return super.tryBulkMerge(docValues) && docValues.type() == template.type();
}
@Override
protected void setMergeBytes(int sourceDoc) {
final double value = currentMergeSource.getFloat(sourceDoc);
template.toBytes(value, bytesRef);
}
}
final static class FloatsReader extends FixedStraightBytesImpl.FixedStraightReader {

View File

@ -106,6 +106,17 @@ public abstract class IndexDocValues implements Closeable {
cache.close(this);
}
/**
* Returns the size per value in bytes or <code>-1</code> iff size per value
* is variable.
*
* @return the size per value in bytes or <code>-1</code> iff size per value
* is variable.
*/
public int getValueSize() {
return -1;
}
/**
* Sets the {@link SourceCache} used by this {@link IndexDocValues} instance. This
* method should be called before {@link #load()} is called. All {@link Source} instances in the currently used cache will be closed

View File

@ -93,9 +93,9 @@ public final class Ints {
protected IntsWriter(Directory dir, String id, String codecName,
int version, Counter bytesUsed, IOContext context, ValueType valueType) throws IOException {
super(dir, id, codecName, version, bytesUsed, context);
final int expectedSize = typeToSize(valueType);
this.bytesRef = new BytesRef(expectedSize);
bytesRef.length = expectedSize;
size = typeToSize(valueType);
this.bytesRef = new BytesRef(size);
bytesRef.length = size;
template = IndexDocValuesArray.TEMPLATES.get(valueType);
}
@ -109,6 +109,18 @@ public final class Ints {
public void add(int docID, PerDocFieldValues docValues) throws IOException {
add(docID, docValues.getInt());
}
@Override
protected void setMergeBytes(int sourceDoc) {
final long value = currentMergeSource.getInt(sourceDoc);
template.toBytes(value, bytesRef);
}
@Override
protected boolean tryBulkMerge(IndexDocValues docValues) {
// only bulk merge if value type is the same otherwise size differs
return super.tryBulkMerge(docValues) && docValues.type() == template.type();
}
}
final static class IntsReader extends FixedStraightBytesImpl.FixedStraightReader {

View File

@ -46,6 +46,8 @@ public class MultiIndexDocValues extends IndexDocValues {
private DocValuesIndex[] docValuesIdx;
private int[] starts;
private ValueType type;
private int valueSize;
public MultiIndexDocValues() {
starts = new int[0];
@ -62,38 +64,51 @@ public class MultiIndexDocValues extends IndexDocValues {
}
public IndexDocValues reset(DocValuesIndex[] docValuesIdx) {
int[] start = new int[docValuesIdx.length];
final int[] start = new int[docValuesIdx.length];
TypePromoter promoter = TypePromoter.getIdentityPromoter();
for (int i = 0; i < docValuesIdx.length; i++) {
start[i] = docValuesIdx[i].start;
if (!(docValuesIdx[i].docValues instanceof EmptyDocValues)) {
// only promote if not a dummy
final TypePromoter incomingPromoter = TypePromoter.create(
docValuesIdx[i].docValues.type(),
docValuesIdx[i].docValues.getValueSize());
promoter = promoter.promote(incomingPromoter);
if (promoter == null) {
throw new IllegalStateException("Can not promote " + incomingPromoter);
}
}
}
this.type = promoter.type();
this.valueSize = promoter.getValueSize();
this.starts = start;
this.docValuesIdx = docValuesIdx;
return this;
}
public static class DummyDocValues extends IndexDocValues {
public static class EmptyDocValues extends IndexDocValues {
final int maxDoc;
final Source emptySoruce;
final Source emptySource;
public DummyDocValues(int maxDoc, ValueType type) {
public EmptyDocValues(int maxDoc, ValueType type) {
this.maxDoc = maxDoc;
this.emptySoruce = new EmptySource(type);
this.emptySource = new EmptySource(type);
}
@Override
public Source load() throws IOException {
return emptySoruce;
return emptySource;
}
@Override
public ValueType type() {
return emptySoruce.type();
return emptySource.type();
}
@Override
public Source getDirectSource() throws IOException {
return emptySoruce;
return emptySource;
}
}
@ -180,7 +195,12 @@ public class MultiIndexDocValues extends IndexDocValues {
@Override
public ValueType type() {
return this.docValuesIdx[0].docValues.type();
return type;
}
@Override
public int getValueSize() {
return valueSize;
}
@Override

View File

@ -93,7 +93,7 @@ class VarStraightBytesImpl {
}
@Override
protected void merge(MergeState state) throws IOException {
protected void merge(SingleSubMergeState state) throws IOException {
merge = true;
datOut = getOrCreateDataOut();
boolean success = false;

View File

@ -138,7 +138,7 @@ public abstract class Writer extends DocValuesConsumer {
public abstract void finish(int docCount) throws IOException;
@Override
protected void merge(MergeState state) throws IOException {
protected void merge(SingleSubMergeState state) throws IOException {
// This enables bulk copies in subclasses per MergeState, subclasses can
// simply override this and decide if they want to merge
// segments using this generic implementation or if a bulk merge is possible