mirror of https://github.com/apache/lucene.git
LUCENE-3507: Improve Memory Consumption for merging DocValues SortedBytes variants
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1182240 13f79535-47bb-0310-9956-ffa450edef68
parent b6479fc838
commit e97ac531f1
Bytes.java

@@ -393,6 +393,7 @@ public final class Bytes {
     protected int lastDocId = -1;
     protected int[] docToEntry;
     protected final BytesRefHash hash;
+    protected long maxBytes = 0;
 
     protected DerefBytesWriterBase(Directory dir, String id, String codecName,
         int codecVersion, Counter bytesUsed, IOContext context)
@@ -433,8 +434,11 @@ public final class Bytes {
       int ord = hash.add(bytes);
       if (ord < 0) {
         ord = (-ord) - 1;
+      } else {
+        maxBytes += bytes.length;
       }
 
       docToEntry[docID] = ord;
       lastDocId = docID;
     }
@@ -554,6 +558,8 @@ public final class Bytes {
     private final PagedBytes pagedBytes;
 
     protected final PackedInts.Reader docToOrdIndex;
+    protected final PackedInts.Reader ordToOffsetIndex;
 
     protected final IndexInput datIn;
     protected final IndexInput idxIn;
     protected final BytesRef defaultValue = new BytesRef();
@@ -561,12 +567,12 @@ public final class Bytes {
     protected final PagedBytes.Reader data;
 
     protected BytesSortedSourceBase(IndexInput datIn, IndexInput idxIn,
-        Comparator<BytesRef> comp, long bytesToRead, ValueType type) throws IOException {
-      this(datIn, idxIn, comp, new PagedBytes(PAGED_BYTES_BITS), bytesToRead, type);
+        Comparator<BytesRef> comp, long bytesToRead, ValueType type, boolean hasOffsets) throws IOException {
+      this(datIn, idxIn, comp, new PagedBytes(PAGED_BYTES_BITS), bytesToRead, type, hasOffsets);
     }
 
     protected BytesSortedSourceBase(IndexInput datIn, IndexInput idxIn,
-        Comparator<BytesRef> comp, PagedBytes pagedBytes, long bytesToRead,ValueType type
+        Comparator<BytesRef> comp, PagedBytes pagedBytes, long bytesToRead, ValueType type, boolean hasOffsets
         throws IOException {
       super(type, comp);
       assert bytesToRead <= datIn.length() : " file size is less than the expected size diff: "
@@ -576,24 +582,19 @@ public final class Bytes {
       this.pagedBytes.copy(datIn, bytesToRead);
       data = pagedBytes.freeze(true);
       this.idxIn = idxIn;
+      ordToOffsetIndex = hasOffsets ? PackedInts.getReader(idxIn) : null;
       docToOrdIndex = PackedInts.getReader(idxIn);
 
     }
 
     @Override
     public int ord(int docID) {
+      assert docToOrdIndex.get(docID) < getValueCount();
       return (int) docToOrdIndex.get(docID);
     }
 
     protected void closeIndexInput() throws IOException {
       IOUtils.close(datIn, idxIn);
     }
-
-    /**
-     * Returns the largest doc id + 1 in this doc values source
-     */
-    public int maxDoc() {
-      return docToOrdIndex.size();
-    }
   }
 }
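Note: the @@ -433 hunk above relies on the BytesRefHash.add contract: a negative return encodes the ord of an already-seen value, so maxBytes grows only the first time a value is inserted. A minimal standalone sketch of that dedup-and-account pattern, substituting a plain HashMap for BytesRefHash (all names below are illustrative, not from the commit):

import java.util.HashMap;
import java.util.Map;

// Sketch of the dedup-and-account pattern: add() mimics the
// BytesRefHash.add contract, returning the new ord for a fresh value
// and -(ord + 1) for a duplicate, so bytes are counted once per value.
class DedupAccounting {
  private final Map<String, Integer> ords = new HashMap<>();
  long maxBytes = 0;

  int add(String value) {
    Integer existing = ords.get(value);
    if (existing != null) {
      return -(existing + 1);     // duplicate: encode the existing ord
    }
    int ord = ords.size();
    ords.put(value, ord);
    return ord;
  }

  void record(String value) {
    int ord = add(value);
    if (ord < 0) {
      ord = (-ord) - 1;           // decode the duplicate's ord
    } else {
      maxBytes += value.length(); // first sighting: account its length
    }
  }
}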
FixedSortedBytesImpl.java

@@ -19,17 +19,22 @@ package org.apache.lucene.index.values;
 
 import java.io.IOException;
 import java.util.Comparator;
+import java.util.List;
 
-import org.apache.lucene.index.values.Bytes.BytesSortedSourceBase;
+import org.apache.lucene.index.codecs.MergeState;
 import org.apache.lucene.index.values.Bytes.BytesReaderBase;
+import org.apache.lucene.index.values.Bytes.BytesSortedSourceBase;
 import org.apache.lucene.index.values.Bytes.DerefBytesWriterBase;
 import org.apache.lucene.index.values.IndexDocValues.SortedSource;
+import org.apache.lucene.index.values.SortedBytesMergeUtils.MergeContext;
+import org.apache.lucene.index.values.SortedBytesMergeUtils.SortedSourceSlice;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.Counter;
+import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.packed.PackedInts;
 
 // Stores fixed-length byte[] by deref, ie when two docs
@@ -53,6 +58,37 @@ class FixedSortedBytesImpl {
       this.comp = comp;
     }
 
+    @Override
+    public void merge(MergeState mergeState, IndexDocValues[] docValues)
+        throws IOException {
+      boolean success = false;
+      try {
+        final MergeContext ctx = SortedBytesMergeUtils.init(ValueType.BYTES_FIXED_SORTED, docValues, comp, mergeState);
+        List<SortedSourceSlice> slices = SortedBytesMergeUtils.buildSlices(mergeState, docValues, ctx);
+        final IndexOutput datOut = getOrCreateDataOut();
+        datOut.writeInt(ctx.sizePerValues);
+        final int maxOrd = SortedBytesMergeUtils.mergeRecords(ctx, datOut, slices);
+
+        final IndexOutput idxOut = getOrCreateIndexOut();
+        idxOut.writeInt(maxOrd);
+        final PackedInts.Writer ordsWriter = PackedInts.getWriter(idxOut, ctx.docToEntry.length,
+            PackedInts.bitsRequired(maxOrd));
+        for (SortedSourceSlice slice : slices) {
+          slice.writeOrds(ordsWriter);
+        }
+        ordsWriter.finish();
+        success = true;
+      } finally {
+        releaseResources();
+        if (success) {
+          IOUtils.close(getIndexOut(), getDataOut());
+        } else {
+          IOUtils.closeWhileHandlingException(getIndexOut(), getDataOut());
+        }
+
+      }
+    }
+
     // Important that we get docCount, in case there were
     // some last docs that we didn't see
     @Override
@@ -60,15 +96,15 @@ class FixedSortedBytesImpl {
       fillDefault(docCount);
       final IndexOutput datOut = getOrCreateDataOut();
       final int count = hash.size();
-      final int[] address = new int[count]; // addr 0 is default values
+      final int[] address = new int[count];
       datOut.writeInt(size);
       if (size != -1) {
         final int[] sortedEntries = hash.sort(comp);
         // first dump bytes data, recording address as we go
-        final BytesRef bytesRef = new BytesRef(size);
+        final BytesRef spare = new BytesRef(size);
         for (int i = 0; i < count; i++) {
           final int e = sortedEntries[i];
-          final BytesRef bytes = hash.get(e, bytesRef);
+          final BytesRef bytes = hash.get(e, spare);
           assert bytes.length == size;
           datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
           address[e] = i;
@@ -95,8 +131,8 @@ class FixedSortedBytesImpl {
 
     @Override
     public Source load() throws IOException {
-      return new FixedSortedSource(cloneData(), cloneIndex(), size,
-          valueCount, comparator);
+      return new FixedSortedSource(cloneData(), cloneIndex(), size, valueCount,
+          comparator);
     }
 
     @Override
@@ -117,7 +153,8 @@ class FixedSortedBytesImpl {
 
     FixedSortedSource(IndexInput datIn, IndexInput idxIn, int size,
         int numValues, Comparator<BytesRef> comp) throws IOException {
-      super(datIn, idxIn, comp, size * numValues, ValueType.BYTES_FIXED_SORTED);
+      super(datIn, idxIn, comp, size * numValues, ValueType.BYTES_FIXED_SORTED,
+          false);
       this.size = size;
       this.valueCount = numValues;
       closeIndexInput();
@@ -165,9 +202,7 @@ class FixedSortedBytesImpl {
     public BytesRef getByOrd(int ord, BytesRef bytesRef) {
       try {
         datIn.seek(basePointer + size * ord);
-        if (bytesRef.bytes.length < size) {
          bytesRef.grow(size);
-        }
         datIn.readBytes(bytesRef.bytes, 0, size);
         bytesRef.length = size;
         bytesRef.offset = 0;
@@ -182,4 +217,5 @@ class FixedSortedBytesImpl {
       return valueCount;
     }
   }
+
 }
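Note: the new merge() above writes the doc-to-ord map with PackedInts.getWriter(idxOut, ctx.docToEntry.length, PackedInts.bitsRequired(maxOrd)), so each entry costs only as many bits as the largest ord needs. A rough standalone sketch of that saving, with a local bitsRequired mirroring the PackedInts one (numbers illustrative):

// Sketch: with maxOrd as the largest ordinal, a doc->ord entry needs
// only ceil(log2(maxOrd + 1)) bits instead of a full 32-bit int.
class PackedOrdSizing {
  static int bitsRequired(long maxValue) {
    return Math.max(1, 64 - Long.numberOfLeadingZeros(maxValue));
  }

  public static void main(String[] args) {
    int numDocs = 1_000_000;
    int maxOrd = 1_000;                  // ~1000 distinct values -> 10 bits
    long packedBits = (long) numDocs * bitsRequired(maxOrd);
    long intBits = (long) numDocs * 32;  // naive int[] encoding
    System.out.println(packedBits + " bits packed vs " + intBits + " bits as ints");
  }
}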
IndexDocValues.java

@@ -278,7 +278,7 @@ public abstract class IndexDocValues implements Closeable {
       return binarySearch(value, spare, 0, getValueCount() - 1);
     }
 
-    protected int binarySearch(BytesRef b, BytesRef bytesRef, int low,
+    private int binarySearch(BytesRef b, BytesRef bytesRef, int low,
         int high) {
       int mid = 0;
       while (low <= high) {
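Note: the method narrowed to private here is the standard value-to-ord binary search over the comparator-sorted value space, returning -(insertionPoint) - 1 on a miss. A self-contained sketch of the same contract over a sorted array (example data hypothetical):

// Self-contained sketch: a sorted String[] stands in for the
// ord-ordered value space; returns the matching ord, or
// -(insertionPoint) - 1 when the key is absent.
class OrdSearch {
  static int binarySearch(String[] sortedValues, String key) {
    int low = 0, high = sortedValues.length - 1;
    while (low <= high) {
      final int mid = (low + high) >>> 1;
      final int cmp = sortedValues[mid].compareTo(key);
      if (cmp < 0) {
        low = mid + 1;
      } else if (cmp > 0) {
        high = mid - 1;
      } else {
        return mid;
      }
    }
    return -(low + 1);
  }

  public static void main(String[] args) {
    String[] values = {"apple", "kiwi", "pear"};
    System.out.println(binarySearch(values, "kiwi"));  // 1
    System.out.println(binarySearch(values, "mango")); // -3: would insert at 2
  }
}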
SortedBytesMergeUtils.java (new file)

@@ -0,0 +1,332 @@
+package org.apache.lucene.index.values;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.lucene.index.codecs.MergeState;
+import org.apache.lucene.index.codecs.MergeState.IndexReaderAndLiveDocs;
+import org.apache.lucene.index.values.IndexDocValues.SortedSource;
+import org.apache.lucene.index.values.IndexDocValues.Source;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.PriorityQueue;
+import org.apache.lucene.util.packed.PackedInts;
+
+/**
+ * @lucene.internal
+ */
+final class SortedBytesMergeUtils {
+
+  private SortedBytesMergeUtils() {
+    // no instance
+  }
+
+  static MergeContext init(ValueType type, IndexDocValues[] docValues,
+      Comparator<BytesRef> comp, MergeState mergeState) {
+    int size = -1;
+    if (type == ValueType.BYTES_FIXED_SORTED) {
+      for (IndexDocValues indexDocValues : docValues) {
+        if (indexDocValues != null) {
+          size = indexDocValues.getValueSize();
+          break;
+        }
+      }
+      assert size >= 0;
+    }
+    return new MergeContext(comp, mergeState, size, type);
+  }
+
+  public static final class MergeContext {
+    private final Comparator<BytesRef> comp;
+    private final BytesRef missingValue = new BytesRef();
+    final int sizePerValues; // -1 if var length
+    final ValueType type;
+    final int[] docToEntry;
+    long[] offsets; // if non-null #mergeRecords collects byte offsets here
+
+    public MergeContext(Comparator<BytesRef> comp, MergeState mergeState,
+        int size, ValueType type) {
+      assert type == ValueType.BYTES_FIXED_SORTED || type == ValueType.BYTES_VAR_SORTED;
+      this.comp = comp;
+      this.sizePerValues = size;
+      this.type = type;
+      if (size > 0) {
+        missingValue.grow(size);
+        missingValue.length = size;
+      }
+      docToEntry = new int[mergeState.mergedDocCount];
+    }
+  }
+
+  static List<SortedSourceSlice> buildSlices(MergeState mergeState,
+      IndexDocValues[] docValues, MergeContext ctx) throws IOException {
+    final List<SortedSourceSlice> slices = new ArrayList<SortedSourceSlice>();
+    for (int i = 0; i < docValues.length; i++) {
+      final SortedSourceSlice nextSlice;
+      final Source directSource;
+      if (docValues[i] != null
+          && (directSource = docValues[i].getDirectSource()) != null) {
+        final SortedSourceSlice slice = new SortedSourceSlice(i, directSource
+            .asSortedSource(), mergeState, ctx.docToEntry);
+        nextSlice = slice;
+      } else {
+        nextSlice = new SortedSourceSlice(i, new MissingValueSource(ctx),
+            mergeState, ctx.docToEntry);
+      }
+      createOrdMapping(mergeState, nextSlice);
+      slices.add(nextSlice);
+    }
+    return Collections.unmodifiableList(slices);
+  }
+
+  /*
+   * In order to merge we need to map the ords used in each segment to the new
+   * global ords in the new segment. Additionally we need to drop values that
+   * are not referenced anymore due to deleted documents. This method walks all
+   * live documents and fetches their current ordinal. We store this ordinal per
+   * slice (SortedSourceSlice#ordMapping) and remember the doc to ord
+   * mapping in docIDToRelativeOrd. After the merge SortedSourceSlice#ordMapping
+   * contains the new global ordinals for the relative index.
+   */
+  private static void createOrdMapping(MergeState mergeState,
+      SortedSourceSlice currentSlice) {
+    final int readerIdx = currentSlice.readerIdx;
+    final int[] currentDocMap = mergeState.docMaps[readerIdx];
+    final int docBase = currentSlice.docToOrdStart;
+    assert docBase == mergeState.docBase[readerIdx];
+    if (currentDocMap != null) { // we have deletes
+      for (int i = 0; i < currentDocMap.length; i++) {
+        final int doc = currentDocMap[i];
+        if (doc != -1) { // not deleted
+          final int ord = currentSlice.source.ord(i); // collect ords strictly
+                                                      // increasing
+          currentSlice.docIDToRelativeOrd[docBase + doc] = ord;
+          // use ord + 1 to identify unreferenced values (ie. == 0)
+          currentSlice.ordMapping[ord] = ord + 1;
+        }
+      }
+    } else { // no deletes
+      final IndexReaderAndLiveDocs indexReaderAndLiveDocs = mergeState.readers
+          .get(readerIdx);
+      final int numDocs = indexReaderAndLiveDocs.reader.numDocs();
+      assert indexReaderAndLiveDocs.liveDocs == null;
+      assert currentSlice.docToOrdEnd - currentSlice.docToOrdStart == numDocs;
+      for (int doc = 0; doc < numDocs; doc++) {
+        final int ord = currentSlice.source.ord(doc);
+        currentSlice.docIDToRelativeOrd[docBase + doc] = ord;
+        // use ord + 1 to identify unreferenced values (ie. == 0)
+        currentSlice.ordMapping[ord] = ord + 1;
+      }
+    }
+  }
+
+  static int mergeRecords(MergeContext ctx, IndexOutput datOut,
+      List<SortedSourceSlice> slices) throws IOException {
+    final RecordMerger merger = new RecordMerger(new MergeQueue(slices.size(),
+        ctx.comp), slices.toArray(new SortedSourceSlice[0]));
+    long[] offsets = ctx.offsets;
+    final boolean recordOffsets = offsets != null;
+    long offset = 0;
+    BytesRef currentMergedBytes;
+    merger.pushTop();
+    while (merger.queue.size() > 0) {
+      merger.pullTop();
+      currentMergedBytes = merger.current;
+      assert ctx.sizePerValues == -1 || ctx.sizePerValues == currentMergedBytes.length : "size: "
+          + ctx.sizePerValues + " spare: " + currentMergedBytes.length;
+
+      if (recordOffsets) {
+        offset += currentMergedBytes.length;
+        if (merger.currentOrd >= offsets.length) {
+          offsets = ArrayUtil.grow(offsets, merger.currentOrd + 1);
+        }
+        offsets[merger.currentOrd] = offset;
+      }
+      datOut.writeBytes(currentMergedBytes.bytes, currentMergedBytes.offset,
+          currentMergedBytes.length);
+      merger.pushTop();
+    }
+    ctx.offsets = offsets;
+    assert offsets == null || offsets[merger.currentOrd - 1] == offset;
+    return merger.currentOrd;
+  }
+
+  private static final class RecordMerger {
+    private final MergeQueue queue;
+    private final SortedSourceSlice[] top;
+    private int numTop;
+    BytesRef current;
+    int currentOrd = -1;
+
+    RecordMerger(MergeQueue queue, SortedSourceSlice[] top) {
+      super();
+      this.queue = queue;
+      this.top = top;
+      this.numTop = top.length;
+    }
+
+    private void pullTop() {
+      // extract all subs from the queue that have the same
+      // top record
+      assert numTop == 0;
+      assert currentOrd >= 0;
+      while (true) {
+        final SortedSourceSlice popped = top[numTop++] = queue.pop();
+        // use ord + 1 to identify unreferenced values (ie. == 0)
+        popped.ordMapping[popped.relativeOrd] = currentOrd + 1;
+        if (queue.size() == 0
+            || !(queue.top()).current.bytesEquals(top[0].current)) {
+          break;
+        }
+      }
+      current = top[0].current;
+    }
+
+    private void pushTop() throws IOException {
+      // call next() on each top, and put back into queue
+      for (int i = 0; i < numTop; i++) {
+        top[i].current = top[i].next();
+        if (top[i].current != null) {
+          queue.add(top[i]);
+        }
+      }
+      currentOrd++;
+      numTop = 0;
+    }
+  }
+
+  static class SortedSourceSlice {
+    final SortedSource source;
+    final int readerIdx;
+    /* global array indexed by docID containing the relative ord for the doc */
+    final int[] docIDToRelativeOrd;
+    /*
+     * maps relative ords to merged global ords - index is relative ord value,
+     * value is the new global ord. this map gets updated as we merge ords.
+     * later we use the docIDToRelativeOrd to get the previous relative ord to
+     * get the new ord from the relative ord map.
+     */
+    final int[] ordMapping;
+
+    /* start index into docIDToRelativeOrd */
+    final int docToOrdStart;
+    /* end index into docIDToRelativeOrd */
+    final int docToOrdEnd;
+    BytesRef current = new BytesRef();
+    /* the currently merged relative ordinal */
+    int relativeOrd = -1;
+
+    SortedSourceSlice(int readerIdx, SortedSource source, MergeState state,
+        int[] docToOrd) {
+      super();
+      this.readerIdx = readerIdx;
+      this.source = source;
+      this.docIDToRelativeOrd = docToOrd;
+      this.ordMapping = new int[source.getValueCount()];
+      this.docToOrdStart = state.docBase[readerIdx];
+      this.docToOrdEnd = this.docToOrdStart + numDocs(state, readerIdx);
+    }
+
+    private static int numDocs(MergeState state, int readerIndex) {
+      if (readerIndex == state.docBase.length - 1) {
+        return state.mergedDocCount - state.docBase[readerIndex];
+      }
+      return state.docBase[readerIndex + 1] - state.docBase[readerIndex];
+    }
+
+    BytesRef next() {
+      for (int i = relativeOrd + 1; i < ordMapping.length; i++) {
+        if (ordMapping[i] != 0) { // skip ords that are not referenced anymore
+          source.getByOrd(i, current);
+          relativeOrd = i;
+          return current;
+        }
+      }
+      return null;
+    }
+
+    void writeOrds(PackedInts.Writer writer) throws IOException {
+      for (int i = docToOrdStart; i < docToOrdEnd; i++) {
+        final int mappedOrd = docIDToRelativeOrd[i];
+        assert mappedOrd < ordMapping.length;
+        assert ordMapping[mappedOrd] > 0 : "illegal mapping ord maps to an unreferenced value";
+        writer.add(ordMapping[mappedOrd] - 1);
+      }
+    }
+  }
+
+  /*
+   * if a segment has no values at all we use this source to fill in the missing
+   * value in the right place (depending on the comparator used)
+   */
+  private static final class MissingValueSource extends SortedSource {
+
+    private BytesRef missingValue;
+
+    public MissingValueSource(MergeContext ctx) {
+      super(ctx.type, ctx.comp);
+      this.missingValue = ctx.missingValue;
+    }
+
+    @Override
+    public int ord(int docID) {
+      return 0;
+    }
+
+    @Override
+    public BytesRef getByOrd(int ord, BytesRef bytesRef) {
+      bytesRef.copy(missingValue);
+      return bytesRef;
+    }
+
+    @Override
+    public int getValueCount() {
+      return 1;
+    }
+
+  }
+
+  /*
+   * merge queue
+   */
+  private static final class MergeQueue extends
+      PriorityQueue<SortedSourceSlice> {
+    final Comparator<BytesRef> comp;
+
+    public MergeQueue(int maxSize, Comparator<BytesRef> comp) {
+      super(maxSize);
+      this.comp = comp;
+    }
+
+    @Override
+    protected boolean lessThan(SortedSourceSlice a, SortedSourceSlice b) {
+      int cmp = comp.compare(a.current, b.current);
+      if (cmp != 0) {
+        return cmp < 0;
+      } else { // just a tie-breaker
+        return a.docToOrdStart < b.docToOrdStart;
+      }
+    }
+
+  }
+}
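Note: the new file above is the heart of the change. A self-contained sketch of its merge strategy follows, simplified (strings instead of BytesRef, no deletes, no missing-value source): k sorted, deduplicated per-segment value lists are merged through a priority queue, equal heads across segments collapse to one shared global ord, and each slice records oldOrd -> globalOrd (stored as globalOrd + 1 so 0 marks "unreferenced") in its own map, mirroring SortedSourceSlice.ordMapping.

import java.util.PriorityQueue;

// Standalone sketch of the SortedBytesMergeUtils merge strategy.
class KWayOrdMerge {
  static final class Slice {
    final String[] values;  // segment's values, sorted and unique
    final int[] ordMapping; // old ord -> global ord + 1 (0 = unseen)
    int pos = 0;
    Slice(String[] values) {
      this.values = values;
      this.ordMapping = new int[values.length];
    }
    String current() { return pos < values.length ? values[pos] : null; }
  }

  static int merge(Slice[] slices) {
    PriorityQueue<Slice> queue =
        new PriorityQueue<>((a, b) -> a.current().compareTo(b.current()));
    for (Slice s : slices) {
      if (s.current() != null) queue.add(s);
    }
    int globalOrd = 0;
    while (!queue.isEmpty()) {
      String top = queue.peek().current();
      // pull every slice whose head equals the merged value; all of them
      // map their current ord to the same global ord
      while (!queue.isEmpty() && queue.peek().current().equals(top)) {
        Slice s = queue.poll();
        s.ordMapping[s.pos] = globalOrd + 1; // +1 so 0 marks "unreferenced"
        s.pos++;
        if (s.current() != null) queue.add(s);
      }
      globalOrd++; // one merged record written per distinct value
    }
    return globalOrd; // == number of distinct values across segments
  }

  public static void main(String[] args) {
    Slice a = new Slice(new String[] {"ant", "bee"});
    Slice b = new Slice(new String[] {"bee", "cat"});
    System.out.println(merge(new Slice[] {a, b}));            // 3 distinct values
    System.out.println(a.ordMapping[1] + " " + b.ordMapping[0]); // both 2: shared ord for "bee"
  }
}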
VarSortedBytesImpl.java

@@ -19,17 +19,22 @@ package org.apache.lucene.index.values;
 
 import java.io.IOException;
 import java.util.Comparator;
+import java.util.List;
 
+import org.apache.lucene.index.codecs.MergeState;
 import org.apache.lucene.index.values.Bytes.BytesSortedSourceBase;
 import org.apache.lucene.index.values.Bytes.BytesReaderBase;
 import org.apache.lucene.index.values.Bytes.DerefBytesWriterBase;
 import org.apache.lucene.index.values.IndexDocValues.SortedSource;
+import org.apache.lucene.index.values.SortedBytesMergeUtils.MergeContext;
+import org.apache.lucene.index.values.SortedBytesMergeUtils.SortedSourceSlice;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.Counter;
+import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.packed.PackedInts;
 
 // Stores variable-length byte[] by deref, ie when two docs
@@ -54,6 +59,47 @@ final class VarSortedBytesImpl {
       this.comp = comp;
       size = 0;
     }
+
+    @Override
+    public void merge(MergeState mergeState, IndexDocValues[] docValues)
+        throws IOException {
+      boolean success = false;
+      try {
+        MergeContext ctx = SortedBytesMergeUtils.init(ValueType.BYTES_VAR_SORTED, docValues, comp, mergeState);
+        final List<SortedSourceSlice> slices = SortedBytesMergeUtils.buildSlices(mergeState, docValues, ctx);
+        IndexOutput datOut = getOrCreateDataOut();
+
+        ctx.offsets = new long[1];
+        final int maxOrd = SortedBytesMergeUtils.mergeRecords(ctx, datOut, slices);
+        final long[] offsets = ctx.offsets;
+        maxBytes = offsets[maxOrd-1];
+        final IndexOutput idxOut = getOrCreateIndexOut();
+
+        idxOut.writeLong(maxBytes);
+        final PackedInts.Writer offsetWriter = PackedInts.getWriter(idxOut, maxOrd+1,
+            PackedInts.bitsRequired(maxBytes));
+        offsetWriter.add(0);
+        for (int i = 0; i < maxOrd; i++) {
+          offsetWriter.add(offsets[i]);
+        }
+        offsetWriter.finish();
+
+        final PackedInts.Writer ordsWriter = PackedInts.getWriter(idxOut, ctx.docToEntry.length,
+            PackedInts.bitsRequired(maxOrd-1));
+        for (SortedSourceSlice slice : slices) {
+          slice.writeOrds(ordsWriter);
+        }
+        ordsWriter.finish();
+        success = true;
+      } finally {
+        releaseResources();
+        if (success) {
+          IOUtils.close(getIndexOut(), getDataOut());
+        } else {
+          IOUtils.closeWhileHandlingException(getIndexOut(), getDataOut());
+        }
+
+      }
+    }
 
     @Override
     protected void checkSize(BytesRef bytes) {
@@ -67,35 +113,31 @@ final class VarSortedBytesImpl {
       fillDefault(docCount);
       final int count = hash.size();
       final IndexOutput datOut = getOrCreateDataOut();
+      final IndexOutput idxOut = getOrCreateIndexOut();
       long offset = 0;
       final int[] index = new int[count];
-      final long[] offsets = new long[count];
       final int[] sortedEntries = hash.sort(comp);
-      // first dump bytes data, recording index & offset as
+      // total bytes of data
+      idxOut.writeLong(maxBytes);
+      PackedInts.Writer offsetWriter = PackedInts.getWriter(idxOut, count+1,
+          PackedInts.bitsRequired(maxBytes));
+      // first dump bytes data, recording index & write offset as
       // we go
+      final BytesRef spare = new BytesRef();
       for (int i = 0; i < count; i++) {
        final int e = sortedEntries[i];
-        offsets[i] = offset;
+        offsetWriter.add(offset);
         index[e] = i;
-        final BytesRef bytes = hash.get(e, new BytesRef());
+        final BytesRef bytes = hash.get(e, spare);
         // TODO: we could prefix code...
         datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
         offset += bytes.length;
       }
-      final IndexOutput idxOut = getOrCreateIndexOut();
-      // total bytes of data
-      idxOut.writeLong(offset);
-      // write index
-      writeIndex(idxOut, docCount, count, index, docToEntry);
-      // next ord (0-based) -> offset
-      PackedInts.Writer offsetWriter = PackedInts.getWriter(idxOut, count+1,
-          PackedInts.bitsRequired(offset));
-      for (int i = 0; i < count; i++) {
-        offsetWriter.add(offsets[i]);
-      }
       offsetWriter.add(offset);
       offsetWriter.finish();
+      // write index
+      writeIndex(idxOut, docCount, count, index, docToEntry);
 
     }
   }
@@ -123,13 +165,11 @@ final class VarSortedBytesImpl {
 
     }
     private static final class VarSortedSource extends BytesSortedSourceBase {
-      private final PackedInts.Reader ordToOffsetIndex; // 0-based
       private final int valueCount;
 
       VarSortedSource(IndexInput datIn, IndexInput idxIn,
          Comparator<BytesRef> comp) throws IOException {
-        super(datIn, idxIn, comp, idxIn.readLong(), ValueType.BYTES_VAR_SORTED);
-        ordToOffsetIndex = PackedInts.getReader(idxIn);
+        super(datIn, idxIn, comp, idxIn.readLong(), ValueType.BYTES_VAR_SORTED, true);
         valueCount = ordToOffsetIndex.size()-1; // the last value here is just a dummy value to get the length of the last value
         closeIndexInput();
       }
@@ -149,7 +189,7 @@ final class VarSortedBytesImpl {
     }
 
     private static final class DirectSortedSource extends SortedSource {
-      private final PackedInts.Reader docToOrdIndex;
+      private final PackedInts.RandomAccessReaderIterator docToOrdIndex;
       private final PackedInts.RandomAccessReaderIterator ordToOffsetIndex;
       private final IndexInput datIn;
       private final long basePointer;
@@ -159,16 +199,22 @@ final class VarSortedBytesImpl {
         Comparator<BytesRef> comparator, ValueType type) throws IOException {
       super(type, comparator);
       idxIn.readLong();
-      docToOrdIndex = PackedInts.getReader(idxIn); // read the ords in to prevent too many random disk seeks
       ordToOffsetIndex = PackedInts.getRandomAccessReaderIterator(idxIn);
       valueCount = ordToOffsetIndex.size()-1; // the last value here is just a dummy value to get the length of the last value
+      // advance this iterator to the end and clone the stream once it points to the docToOrdIndex header
+      ordToOffsetIndex.advance(valueCount);
+      docToOrdIndex = PackedInts.getRandomAccessReaderIterator((IndexInput) idxIn.clone()); // read the ords in to prevent too many random disk seeks
       basePointer = datIn.getFilePointer();
       this.datIn = datIn;
     }
 
     @Override
     public int ord(int docID) {
+      try {
       return (int) docToOrdIndex.get(docID);
+      } catch (IOException ex) {
+        throw new IllegalStateException("failed", ex);
+      }
     }
 
     @Override
@@ -178,9 +224,7 @@ final class VarSortedBytesImpl {
       final long nextOffset = ordToOffsetIndex.next();
       datIn.seek(basePointer + offset);
       final int length = (int) (nextOffset - offset);
-      if (bytesRef.bytes.length < length) {
        bytesRef.grow(length);
-      }
       datIn.readBytes(bytesRef.bytes, 0, length);
       bytesRef.length = length;
       bytesRef.offset = 0;
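Note: the variable-length writer stores count+1 packed offsets so a value's length never needs separate storage: value i occupies bytes [offsets[i], offsets[i+1]), and the final entry is the total byte count (the "dummy" value the reader comments mention). A small self-contained sketch of that addressing scheme (example data hypothetical):

// Sketch: variable-length values concatenated back to back, addressed by
// count+1 offsets; offsets[i+1] - offsets[i] recovers value i's length.
class OffsetAddressing {
  public static void main(String[] args) {
    String[] values = {"a", "longer", "mid"};
    long[] offsets = new long[values.length + 1];
    StringBuilder data = new StringBuilder();
    for (int i = 0; i < values.length; i++) {
      offsets[i] = data.length();
      data.append(values[i]);
    }
    offsets[values.length] = data.length(); // end sentinel ("dummy" entry)

    int ord = 1; // read value 1 back without any stored length field
    int start = (int) offsets[ord];
    int end = (int) offsets[ord + 1];
    System.out.println(data.substring(start, end)); // prints "longer"
  }
}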