buffer up per-doc sortedbytes ordinals in packedints too

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/branches/lucene4547@1440377 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Muir 2013-01-30 12:46:17 +00:00
parent 3ecb5d8831
commit 7143779b04
2 changed files with 121 additions and 22 deletions

View File

@ -31,20 +31,19 @@ import org.apache.lucene.util.BytesRefHash.DirectBytesStartArray;
import org.apache.lucene.util.BytesRefHash;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.packed.AppendingLongBuffer;
/** Buffers up pending byte[] per doc, deref and sorting via
* int ord, then flushes when segment flushes. */
class SortedDocValuesWriter extends DocValuesWriter {
final BytesRefHash hash;
private int[] pending = new int[DEFAULT_PENDING_SIZE];
private int pendingIndex = 0;
private AppendingLongBuffer pending;
private final Counter iwBytesUsed;
private long bytesUsed;
private long bytesUsed; // this currently only tracks differences in 'pending'
private final FieldInfo fieldInfo;
private static final BytesRef EMPTY = new BytesRef(BytesRef.EMPTY_BYTES);
private static final int DEFAULT_PENDING_SIZE = 16;
public SortedDocValuesWriter(FieldInfo fieldInfo, Counter iwBytesUsed) {
this.fieldInfo = fieldInfo;
@ -54,11 +53,13 @@ class SortedDocValuesWriter extends DocValuesWriter {
new ByteBlockPool.DirectTrackingAllocator(iwBytesUsed)),
BytesRefHash.DEFAULT_CAPACITY,
new DirectBytesStartArray(BytesRefHash.DEFAULT_CAPACITY, iwBytesUsed));
iwBytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + RamUsageEstimator.NUM_BYTES_INT * DEFAULT_PENDING_SIZE);
pending = new AppendingLongBuffer();
bytesUsed = pending.ramBytesUsed();
iwBytesUsed.addAndGet(bytesUsed);
}
public void addValue(int docID, BytesRef value) {
if (docID < pendingIndex) {
if (docID < pending.size()) {
throw new IllegalArgumentException("DocValuesField \"" + fieldInfo.name + "\" appears more than once in this document (only one value is allowed per field)");
}
if (value == null) {
@ -69,7 +70,7 @@ class SortedDocValuesWriter extends DocValuesWriter {
}
// Fill in any holes:
while(pendingIndex < docID) {
while(pending.size() < docID) {
addOneValue(EMPTY);
}
@ -78,7 +79,8 @@ class SortedDocValuesWriter extends DocValuesWriter {
@Override
public void finish(int maxDoc) {
if (pendingIndex < maxDoc) {
// nocommit: WTF.. why is this not a while but an if?
if (pending.size() < maxDoc) {
addOneValue(EMPTY);
}
}
@ -89,12 +91,14 @@ class SortedDocValuesWriter extends DocValuesWriter {
ord = -ord-1;
}
if (pendingIndex <= pending.length) {
int pendingLen = pending.length;
pending = ArrayUtil.grow(pending, pendingIndex+1);
iwBytesUsed.addAndGet((pending.length - pendingLen) * RamUsageEstimator.NUM_BYTES_INT);
}
pending[pendingIndex++] = ord;
pending.add(ord);
updateBytesUsed();
}
private void updateBytesUsed() {
final long newBytesUsed = pending.ramBytesUsed();
iwBytesUsed.addAndGet(newBytesUsed - bytesUsed);
bytesUsed = newBytesUsed;
}
@Override
@ -102,7 +106,7 @@ class SortedDocValuesWriter extends DocValuesWriter {
final int maxDoc = state.segmentInfo.getDocCount();
final int emptyOrd;
if (pendingIndex < maxDoc) {
if (pending.size() < maxDoc) {
// Make sure we added EMPTY value before sorting:
int ord = hash.add(EMPTY);
if (ord < 0) {
@ -125,8 +129,6 @@ class SortedDocValuesWriter extends DocValuesWriter {
ordMap[sortedValues[ord]] = ord;
}
final int bufferedDocCount = pendingIndex;
dvConsumer.addSortedField(fieldInfo,
// ord -> value
@ -141,7 +143,7 @@ class SortedDocValuesWriter extends DocValuesWriter {
new Iterable<Number>() {
@Override
public Iterator<Number> iterator() {
return new OrdsIterator(ordMap, bufferedDocCount, maxDoc, emptyOrd);
return new OrdsIterator(ordMap, maxDoc, emptyOrd);
}
});
}
@ -185,15 +187,15 @@ class SortedDocValuesWriter extends DocValuesWriter {
// iterates over the ords for each doc we have in ram
private class OrdsIterator implements Iterator<Number> {
final AppendingLongBuffer.Iterator iter = pending.iterator();
final int ordMap[];
final int size;
final int size = pending.size();
final int maxDoc;
final int emptyOrd; // nocommit
int docUpto;
OrdsIterator(int ordMap[], int size, int maxDoc, int emptyOrd) {
OrdsIterator(int ordMap[], int maxDoc, int emptyOrd) {
this.ordMap = ordMap;
this.size = size;
this.maxDoc = maxDoc;
this.emptyOrd = emptyOrd;
}
@ -210,7 +212,7 @@ class SortedDocValuesWriter extends DocValuesWriter {
}
int ord;
if (docUpto < size) {
ord = pending[docUpto];
ord = (int) iter.next();
} else {
ord = emptyOrd;
}

View File

@ -0,0 +1,97 @@
package org.apache.lucene.index;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.SortedDocValuesField;
import org.apache.lucene.store.BaseDirectoryWrapper;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TimeUnits;
import org.apache.lucene.util._TestUtil;
import org.junit.Ignore;
import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
@TimeoutSuite(millis = 80 * TimeUnits.HOUR)
@Ignore("takes ?? minutes")
public class Test2BSortedDocValues extends LuceneTestCase {
// indexes Integer.MAX_VALUE docs with a fixed binary field
public void testFixedSorted() throws Exception {
BaseDirectoryWrapper dir = newFSDirectory(_TestUtil.getTempDir("2BFixedSorted"));
if (dir instanceof MockDirectoryWrapper) {
((MockDirectoryWrapper)dir).setThrottling(MockDirectoryWrapper.Throttling.NEVER);
}
IndexWriter w = new IndexWriter(dir,
new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()))
.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH)
.setRAMBufferSizeMB(256.0)
.setMergeScheduler(new ConcurrentMergeScheduler())
.setMergePolicy(newLogMergePolicy(false, 10))
.setOpenMode(IndexWriterConfig.OpenMode.CREATE));
Document doc = new Document();
byte bytes[] = new byte[2];
BytesRef data = new BytesRef(bytes);
SortedDocValuesField dvField = new SortedDocValuesField("dv", data);
doc.add(dvField);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
bytes[0] = (byte)(i >> 8);
bytes[1] = (byte) i;
w.addDocument(doc);
if (i % 100000 == 0) {
System.out.println("indexed: " + i);
System.out.flush();
}
}
w.forceMerge(1);
w.close();
System.out.println("verifying...");
System.out.flush();
DirectoryReader r = DirectoryReader.open(dir);
int expectedValue = 0;
for (AtomicReaderContext context : r.leaves()) {
AtomicReader reader = context.reader();
BytesRef scratch = new BytesRef();
BinaryDocValues dv = reader.getSortedDocValues("dv");
for (int i = 0; i < reader.maxDoc(); i++) {
bytes[0] = (byte)(expectedValue >> 8);
bytes[1] = (byte) expectedValue;
dv.get(i, scratch);
assertEquals(data, scratch);
expectedValue++;
}
}
r.close();
dir.close();
}
// TODO: variable, and also Test2BOrds
}