LUCENE-3216: keep doc values in memory during indexing while merging directly to the target file

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1141100 13f79535-47bb-0310-9956-ffa450edef68
Simon Willnauer 2011-06-29 13:39:32 +00:00
parent f85c4e7c88
commit 904e3f73bf
11 changed files with 474 additions and 129 deletions
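
Editor's summary of the change: during indexing the doc values writers no longer touch their .dat/.idx outputs at all; values are buffered in RAM (a ByteBlockPool, or a BytesRefHash backed by one) and the files are created lazily and written in finish(). Merges skip the buffer and stream straight to the target output. Below is a rough, editor-added sketch of that split; the class name, field id and extension are made up, and the error handling is simplified compared to the real code in the diff.

import java.io.IOException;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.BytesRef;

// Sketch only: not code from this commit.
class SketchDocValuesWriter {
  private final Directory dir;
  private final String id;
  private IndexOutput datOut; // created lazily, mirroring getDataOut() below
  private final ByteBlockPool pool =
      new ByteBlockPool(new ByteBlockPool.DirectAllocator());

  SketchDocValuesWriter(Directory dir, String id) {
    this.dir = dir;
    this.id = id;
    pool.nextBuffer();
  }

  // indexing path: no disk IO, just buffer the value in RAM
  void add(int docID, BytesRef bytes) {
    pool.copy(bytes);
  }

  // merge path: open the output once and stream the source segment directly
  void merge(IndexInput sourceData, long numBytes) throws IOException {
    getDataOut().copyBytes(sourceData, numBytes);
  }

  // finish: flush whatever the indexing path buffered, then close
  void finish() throws IOException {
    IndexOutput out = getDataOut();
    pool.writePool(out); // writes nothing on the merge path, which buffered nothing
    out.close();
  }

  private IndexOutput getDataOut() throws IOException {
    if (datOut == null) {
      datOut = dir.createOutput(IndexFileNames.segmentFileName(id, "", "dat"));
    }
    return datOut;
  }
}
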

View File: Bytes.java

@ -31,7 +31,6 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.AttributeSource;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CodecUtil;
import org.apache.lucene.util.IOUtils;
@ -116,7 +115,7 @@ public final class Bytes {
if (fixedSize) {
if (mode == Mode.STRAIGHT) {
return new FixedStraightBytesImpl.Writer(dir, id);
return new FixedStraightBytesImpl.Writer(dir, id, bytesUsed);
} else if (mode == Mode.DEREF) {
return new FixedDerefBytesImpl.Writer(dir, id, bytesUsed);
} else if (mode == Mode.SORTED) {
@ -337,37 +336,56 @@ public final class Bytes {
// TODO: open up this API?!
static abstract class BytesWriterBase extends Writer {
private final String id;
protected IndexOutput idxOut;
protected IndexOutput datOut;
private IndexOutput idxOut;
private IndexOutput datOut;
protected BytesRef bytesRef;
protected final ByteBlockPool pool;
private final Directory dir;
private final String codecName;
private final int version;
protected BytesWriterBase(Directory dir, String id, String codecName,
int version, boolean initIndex, ByteBlockPool pool,
int version,
AtomicLong bytesUsed) throws IOException {
super(bytesUsed);
this.id = id;
this.pool = pool;
datOut = dir.createOutput(IndexFileNames.segmentFileName(id, "",
DATA_EXTENSION));
this.dir = dir;
this.codecName = codecName;
this.version = version;
}
protected IndexOutput getDataOut() throws IOException {
if (datOut == null) {
boolean success = false;
try {
datOut = dir.createOutput(IndexFileNames.segmentFileName(id, "",
DATA_EXTENSION));
CodecUtil.writeHeader(datOut, codecName, version);
success = true;
} finally {
if (!success) {
IOUtils.closeSafely(true, datOut);
}
}
}
return datOut;
}
protected IndexOutput getIndexOut() throws IOException {
boolean success = false;
try {
CodecUtil.writeHeader(datOut, codecName, version);
if (initIndex) {
if (idxOut == null) {
idxOut = dir.createOutput(IndexFileNames.segmentFileName(id, "",
INDEX_EXTENSION));
CodecUtil.writeHeader(idxOut, codecName, version);
} else {
idxOut = null;
}
success = true;
} finally {
if (!success) {
IOUtils.closeSafely(true, datOut, idxOut);
IOUtils.closeSafely(true, idxOut);
}
}
return idxOut;
}
/**
* Must be called only with increasing docIDs. It's OK for some docIDs to be
* skipped; they will be filled with 0 bytes.
@ -376,15 +394,7 @@ public final class Bytes {
public abstract void add(int docID, BytesRef bytes) throws IOException;
@Override
public void finish(int docCount) throws IOException {
try {
IOUtils.closeSafely(false, datOut, idxOut);
} finally {
if (pool != null) {
pool.reset();
}
}
}
public abstract void finish(int docCount) throws IOException;
@Override
protected void mergeDoc(int docID) throws IOException {
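
Editor's aside on the add() contract documented above (strictly increasing docIDs, skipped docs filled with zero bytes) — a hedged usage sketch; the field name, value size and doc count are hypothetical, and imports, directory setup and exception handling are omitted:

// Illustration only: docIDs must increase; skipped docs become zero bytes.
// Assumes the Writer returned by Bytes.getWriter exposes add(int, BytesRef)
// as declared above, and a fixed-size straight layout of 4 bytes per doc.
Writer w = Bytes.getWriter(dir, "f", Bytes.Mode.STRAIGHT, null, true, new AtomicLong());
w.add(0, new BytesRef(new byte[] {1, 2, 3, 4}));
w.add(3, new BytesRef(new byte[] {5, 6, 7, 8})); // docs 1 and 2 are skipped
w.finish(5); // docs 1, 2 and 4 are stored as four 0x00 bytes each
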

View File: FixedDerefBytesImpl.java

@ -25,11 +25,13 @@ import org.apache.lucene.index.values.Bytes.BytesReaderBase;
import org.apache.lucene.index.values.Bytes.BytesWriterBase;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.AttributeSource;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefHash;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.PagedBytes;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.ByteBlockPool.Allocator;
@ -51,9 +53,7 @@ class FixedDerefBytesImpl {
static class Writer extends BytesWriterBase {
private int size = -1;
private int[] docToID;
private final BytesRefHash hash = new BytesRefHash(pool,
BytesRefHash.DEFAULT_CAPACITY, new TrackingDirectBytesStartArray(
BytesRefHash.DEFAULT_CAPACITY, bytesUsed));
private final BytesRefHash hash;
public Writer(Directory dir, String id, AtomicLong bytesUsed)
throws IOException {
this(dir, id, new DirectTrackingAllocator(ByteBlockPool.BYTE_BLOCK_SIZE, bytesUsed),
@ -62,11 +62,12 @@ class FixedDerefBytesImpl {
public Writer(Directory dir, String id, Allocator allocator,
AtomicLong bytesUsed) throws IOException {
super(dir, id, CODEC_NAME, VERSION_CURRENT, true,
new ByteBlockPool(allocator), bytesUsed);
super(dir, id, CODEC_NAME, VERSION_CURRENT, bytesUsed);
hash = new BytesRefHash(new ByteBlockPool(allocator),
BytesRefHash.DEFAULT_CAPACITY, new TrackingDirectBytesStartArray(
BytesRefHash.DEFAULT_CAPACITY, bytesUsed));
docToID = new int[1];
bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT); // TODO BytesRefHash
// uses bytes too!
bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT);
}
@Override
@ -75,20 +76,14 @@ class FixedDerefBytesImpl {
return;
if (size == -1) {
size = bytes.length;
datOut.writeInt(size);
} else if (bytes.length != size) {
throw new IllegalArgumentException("expected bytes size=" + size
+ " but got " + bytes.length);
}
int ord = hash.add(bytes);
if (ord >= 0) {
// new added entry
datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
} else {
if (ord < 0) {
ord = (-ord) - 1;
}
if (docID >= docToID.length) {
final int size = docToID.length;
docToID = ArrayUtil.grow(docToID, 1 + docID);
@ -102,11 +97,27 @@ class FixedDerefBytesImpl {
// some last docs that we didn't see
@Override
public void finish(int docCount) throws IOException {
boolean success = false;
final int numValues = hash.size();
final IndexOutput datOut = getDataOut();
try {
if (size == -1) {
datOut.writeInt(size);
datOut.writeInt(size);
if (size != -1) {
final BytesRef bytesRef = new BytesRef(size);
for (int i = 0; i < numValues; i++) {
hash.get(i, bytesRef);
datOut.writeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length);
}
}
final int count = 1 + hash.size();
success = true;
} finally {
IOUtils.closeSafely(!success, datOut);
hash.close();
}
success = false;
final IndexOutput idxOut = getIndexOut();
try {
final int count = 1 + numValues;
idxOut.writeInt(count - 1);
// write index
final PackedInts.Writer w = PackedInts.getWriter(idxOut, docCount,
@ -120,9 +131,9 @@ class FixedDerefBytesImpl {
w.add(0);
}
w.finish();
success = true;
} finally {
hash.close();
super.finish(docCount);
IOUtils.closeSafely(!success, idxOut);
bytesUsed
.addAndGet((-docToID.length) * RamUsageEstimator.NUM_BYTES_INT);
docToID = null;
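
Editor's note: the deref writers lean on BytesRefHash.add(), which returns a fresh ord for an unseen value and -(ord + 1) for a duplicate; finish() then writes each unique value once and maps documents to 1-based ords, with 0 meaning "no value". A stand-alone sketch of the ord convention, assuming BytesRefHash's no-argument constructor; not code from the commit:

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefHash;

public class DerefOrdSketch {
  public static void main(String[] args) {
    BytesRefHash hash = new BytesRefHash();
    int a = hash.add(new BytesRef("lucene")); // new value -> ord 0
    int b = hash.add(new BytesRef("solr"));   // new value -> ord 1
    int c = hash.add(new BytesRef("lucene")); // duplicate -> -(0 + 1) = -1
    int ord = c >= 0 ? c : (-c) - 1;          // same decoding as in add(int, BytesRef) above
    System.out.println(a + " " + b + " " + c + " -> ord " + ord); // 0 1 -1 -> ord 0
  }
}
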

View File: FixedSortedBytesImpl.java

@ -27,12 +27,14 @@ import org.apache.lucene.index.values.Bytes.BytesWriterBase;
import org.apache.lucene.index.values.FixedDerefBytesImpl.Reader.DerefBytesEnum;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.AttributeSource;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefHash;
import org.apache.lucene.util.CodecUtil;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.PagedBytes;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.ByteBlockPool.Allocator;
@ -56,10 +58,7 @@ class FixedSortedBytesImpl {
private int size = -1;
private int[] docToEntry;
private final Comparator<BytesRef> comp;
private final BytesRefHash hash = new BytesRefHash(pool,
BytesRefHash.DEFAULT_CAPACITY, new TrackingDirectBytesStartArray(
BytesRefHash.DEFAULT_CAPACITY, bytesUsed));
private final BytesRefHash hash;
public Writer(Directory dir, String id, Comparator<BytesRef> comp,
AtomicLong bytesUsed) throws IOException {
@ -69,10 +68,12 @@ class FixedSortedBytesImpl {
public Writer(Directory dir, String id, Comparator<BytesRef> comp,
Allocator allocator, AtomicLong bytesUsed) throws IOException {
super(dir, id, CODEC_NAME, VERSION_CURRENT, true,
new ByteBlockPool(allocator), bytesUsed);
super(dir, id, CODEC_NAME, VERSION_CURRENT, bytesUsed);
ByteBlockPool pool = new ByteBlockPool(allocator);
hash = new BytesRefHash(pool, BytesRefHash.DEFAULT_CAPACITY,
new TrackingDirectBytesStartArray(BytesRefHash.DEFAULT_CAPACITY,
bytesUsed));
docToEntry = new int[1];
// docToEntry[0] = -1;
bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT);
this.comp = comp;
}
@ -83,7 +84,6 @@ class FixedSortedBytesImpl {
return; // default - skip it
if (size == -1) {
size = bytes.length;
datOut.writeInt(size);
} else if (bytes.length != size) {
throw new IllegalArgumentException("expected bytes size=" + size
+ " but got " + bytes.length);
@ -104,26 +104,36 @@ class FixedSortedBytesImpl {
// some last docs that we didn't see
@Override
public void finish(int docCount) throws IOException {
final IndexOutput datOut = getDataOut();
boolean success = false;
final int count = hash.size();
final int[] address = new int[count];
try {
if (size == -1) {// no data added
datOut.writeInt(size);
datOut.writeInt(size);
if (size != -1) {
final int[] sortedEntries = hash.sort(comp);
// first dump bytes data, recording address as we go
final BytesRef bytesRef = new BytesRef(size);
for (int i = 0; i < count; i++) {
final int e = sortedEntries[i];
final BytesRef bytes = hash.get(e, bytesRef);
assert bytes.length == size;
datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
address[e] = 1 + i;
}
}
final int[] sortedEntries = hash.sort(comp);
final int count = hash.size();
int[] address = new int[count];
// first dump bytes data, recording address as we go
for (int i = 0; i < count; i++) {
final int e = sortedEntries[i];
final BytesRef bytes = hash.get(e, new BytesRef());
assert bytes.length == size;
datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
address[e] = 1 + i;
}
success = true;
} finally {
IOUtils.closeSafely(!success, datOut);
hash.close();
}
final IndexOutput idxOut = getIndexOut();
success = false;
try {
idxOut.writeInt(count);
// next write index
PackedInts.Writer w = PackedInts.getWriter(idxOut, docCount,
final PackedInts.Writer w = PackedInts.getWriter(idxOut, docCount,
PackedInts.bitsRequired(count));
final int limit;
if (docCount > docToEntry.length) {
@ -148,11 +158,10 @@ class FixedSortedBytesImpl {
}
w.finish();
} finally {
super.finish(docCount);
IOUtils.closeSafely(!success, idxOut);
bytesUsed.addAndGet((-docToEntry.length)
* RamUsageEstimator.NUM_BYTES_INT);
docToEntry = null;
hash.close();
}
}
}
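
Editor's note on the remapping above: finish() writes the unique values in comparator order and records, for each hash ord e, its 1-based position in that order (address[e] = 1 + i, 0 being reserved for documents without a value); docToEntry is then translated through address[] before being packed. A tiny sketch with made-up data:

// Sketch of the ord -> sorted-position remapping; data is made up.
int[] sortedEntries = {2, 0, 1}; // hash ord 2 sorts first, then ord 0, then ord 1
int[] address = new int[sortedEntries.length];
for (int i = 0; i < sortedEntries.length; i++) {
  address[sortedEntries[i]] = 1 + i; // 1-based; 0 means "doc has no value"
}
// address == {2, 3, 1}: a doc whose hash ord is 2 is encoded as 1 (the smallest value)
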

View File: FixedStraightBytesImpl.java

@ -17,14 +17,20 @@ package org.apache.lucene.index.values;
* limitations under the License.
*/
import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.index.values.Bytes.BytesBaseSource;
import org.apache.lucene.index.values.Bytes.BytesReaderBase;
import org.apache.lucene.index.values.Bytes.BytesWriterBase;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.AttributeSource;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.ByteBlockPool.DirectTrackingAllocator;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.PagedBytes;
@ -44,30 +50,59 @@ class FixedStraightBytesImpl {
private int size = -1;
// start at -1 if the first added value is > 0
private int lastDocID = -1;
private byte[] oneRecord;
private final ByteBlockPool pool;
private boolean merge;
private final int byteBlockSize;
private IndexOutput datOut;
public Writer(Directory dir, String id) throws IOException {
super(dir, id, CODEC_NAME, VERSION_CURRENT, false, null, null);
public Writer(Directory dir, String id, AtomicLong bytesUsed) throws IOException {
super(dir, id, CODEC_NAME, VERSION_CURRENT, bytesUsed);
pool = new ByteBlockPool(new DirectTrackingAllocator(bytesUsed));
byteBlockSize = BYTE_BLOCK_SIZE;
}
@Override
public void add(int docID, BytesRef bytes) throws IOException {
assert lastDocID < docID;
assert !merge;
if (size == -1) {
if (bytes.length > BYTE_BLOCK_SIZE) {
throw new IllegalArgumentException("bytes arrays > " + Short.MAX_VALUE + " are not supported");
}
size = bytes.length;
datOut.writeInt(size);
oneRecord = new byte[size];
pool.nextBuffer();
} else if (bytes.length != size) {
throw new IllegalArgumentException("expected bytes size=" + size
+ " but got " + bytes.length);
}
fill(docID);
assert bytes.bytes.length >= bytes.length;
datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
if (lastDocID+1 < docID) {
advancePool(docID);
}
pool.copy(bytes);
lastDocID = docID;
}
private final void advancePool(int docID) {
assert !merge;
long numBytes = (docID - (lastDocID+1))*size;
while(numBytes > 0) {
if (numBytes + pool.byteUpto < byteBlockSize) {
pool.byteUpto += numBytes;
numBytes = 0;
} else {
numBytes -= byteBlockSize - pool.byteUpto;
pool.nextBuffer();
}
}
assert numBytes == 0;
}
@Override
protected void merge(MergeState state) throws IOException {
merge = true;
datOut = getDataOut();
boolean success = false;
try {
if (state.bits == null && state.reader instanceof Reader) {
Reader reader = (Reader) state.reader;
final int maxDocs = reader.maxDoc;
@ -77,48 +112,92 @@ class FixedStraightBytesImpl {
if (size == -1) {
size = reader.size;
datOut.writeInt(size);
oneRecord = new byte[size];
}
fill(state.docBase);
if (lastDocID+1 < state.docBase) {
fill(datOut, state.docBase);
lastDocID = state.docBase-1;
}
// TODO should we add a transfer to API to each reader?
final IndexInput cloneData = reader.cloneData();
try {
datOut.copyBytes(cloneData, size * maxDocs);
} finally {
cloneData.close();
IOUtils.closeSafely(true, cloneData);
}
lastDocID += maxDocs - 1;
lastDocID += maxDocs;
} else {
super.merge(state);
}
success = true;
} finally {
if (!success) {
IOUtils.closeSafely(!success, datOut);
}
}
}
@Override
protected void mergeDoc(int docID) throws IOException {
assert lastDocID < docID;
if (size == -1) {
size = bytesRef.length;
datOut.writeInt(size);
}
assert size == bytesRef.length;
if (lastDocID+1 < docID) {
fill(datOut, docID);
}
datOut.writeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length);
lastDocID = docID;
}
// Fills up to but not including this docID
private void fill(int docID) throws IOException {
private void fill(IndexOutput datOut, int docID) throws IOException {
assert size >= 0;
for (int i = lastDocID + 1; i < docID; i++) {
datOut.writeBytes(oneRecord, size);
final long numBytes = (docID - (lastDocID+1))*size;
final byte zero = 0;
for (long i = 0; i < numBytes; i++) {
datOut.writeByte(zero);
}
lastDocID = docID;
}
@Override
public void finish(int docCount) throws IOException {
boolean success = false;
try {
if (size == -1) {// no data added
datOut.writeInt(0);
if (!merge) {
// indexing path - no disk IO until here
assert datOut == null;
datOut = getDataOut();
if (size == -1) {
datOut.writeInt(0);
} else {
datOut.writeInt(size);
pool.writePool(datOut);
}
if (lastDocID + 1 < docCount) {
fill(datOut, docCount);
}
} else {
fill(docCount);
// merge path - datOut should be initialized
assert datOut != null;
if (size == -1) {// no data added
datOut.writeInt(0);
} else {
fill(datOut, docCount);
}
}
success = true;
} finally {
super.finish(docCount);
pool.dropBuffersAndReset();
IOUtils.closeSafely(!success, datOut);
}
}
public long ramBytesUsed() {
return oneRecord == null ? 0 : oneRecord.length;
}
}
public static class Reader extends BytesReaderBase {
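
Editor's note on the gap handling above: while indexing, skipped documents only move the pool's write position forward by (docID - (lastDocID + 1)) * size bytes (advancePool), crossing buffers as needed; actual zero bytes are written only on the merge/finish paths (fill). A toy model of that buffer walk, independent of the Lucene classes:

// Toy model of advancePool(): skip gapBytes positions across fixed-size
// buffers without writing anything; block size and numbers are made up.
int blockSize = 8;   // stand-in for ByteBlockPool.BYTE_BLOCK_SIZE
int byteUpto = 5;    // current position inside the active buffer
int buffersUsed = 1;
long gapBytes = 10;  // e.g. 2 skipped docs * 5 bytes each
while (gapBytes > 0) {
  if (gapBytes + byteUpto < blockSize) {
    byteUpto += gapBytes;             // gap fits in the current buffer
    gapBytes = 0;
  } else {
    gapBytes -= blockSize - byteUpto; // consume the rest of this buffer
    byteUpto = 0;
    buffersUsed++;                    // nextBuffer() in the real code
  }
}
// ends with buffersUsed == 2 and byteUpto == 7
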

View File: VarDerefBytesImpl.java

@ -27,12 +27,14 @@ import org.apache.lucene.index.values.FixedDerefBytesImpl.Reader.DerefBytesEnum;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.AttributeSource;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefHash;
import org.apache.lucene.util.CodecUtil;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.PagedBytes;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.ByteBlockPool.Allocator;
@ -113,7 +115,7 @@ class VarDerefBytesImpl {
private final AddressByteStartArray array = new AddressByteStartArray(1,
bytesUsed);
private final BytesRefHash hash = new BytesRefHash(pool, 16, array);
private final BytesRefHash hash;
public Writer(Directory dir, String id, AtomicLong bytesUsed)
throws IOException {
@ -123,8 +125,8 @@ class VarDerefBytesImpl {
public Writer(Directory dir, String id, Allocator allocator,
AtomicLong bytesUsed) throws IOException {
super(dir, id, CODEC_NAME, VERSION_CURRENT, true,
new ByteBlockPool(allocator), bytesUsed);
super(dir, id, CODEC_NAME, VERSION_CURRENT, bytesUsed);
hash = new BytesRefHash(new ByteBlockPool(allocator), 16, array);
docToAddress = new int[1];
bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT);
}
@ -144,8 +146,7 @@ class VarDerefBytesImpl {
final int docAddress;
if (e >= 0) {
docAddress = array.address[e] = address;
address += writePrefixLength(datOut, bytes);
datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
address += bytes.length < 128 ? 1 : 2;
address += bytes.length;
} else {
docAddress = array.address[(-e) - 1];
@ -169,6 +170,24 @@ class VarDerefBytesImpl {
// some last docs that we didn't see
@Override
public void finish(int docCount) throws IOException {
final IndexOutput datOut = getDataOut();
boolean success = false;
try {
final int size = hash.size();
final BytesRef bytesRef = new BytesRef();
for (int i = 0; i < size; i++) {
hash.get(i, bytesRef);
writePrefixLength(datOut, bytesRef);
datOut.writeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length);
}
success = true;
} finally {
hash.close();
IOUtils.closeSafely(!success, datOut);
}
final IndexOutput idxOut = getIndexOut();
success = false;
try {
idxOut.writeInt(address - 1);
// write index
@ -189,9 +208,9 @@ class VarDerefBytesImpl {
w.add(0);
}
w.finish();
success = true;
} finally {
hash.close();
super.finish(docCount);
IOUtils.closeSafely(!success, idxOut);
bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT
* (-docToAddress.length));
docToAddress = null;
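
Editor's note: the address accounting above charges bytes.length < 128 ? 1 : 2 bytes for the length prefix, so writePrefixLength() in finish() must emit exactly that. The method itself is not shown in this hunk; the following is a plausible reconstruction of such a 1-or-2-byte prefix (high bit marking the two-byte form), not necessarily the commit's exact bit layout:

// Reconstruction for illustration only; assumes lengths fit in 15 bits.
static int writePrefixLength(org.apache.lucene.store.DataOutput out,
                             org.apache.lucene.util.BytesRef bytes)
    throws java.io.IOException {
  if (bytes.length < 128) {
    out.writeByte((byte) bytes.length);                  // 1 byte
    return 1;
  } else {
    out.writeByte((byte) (0x80 | (bytes.length >>> 8))); // 2 bytes, high bit set
    out.writeByte((byte) (bytes.length & 0xff));
    return 2;
  }
}
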

View File: VarSortedBytesImpl.java

@ -27,11 +27,13 @@ import org.apache.lucene.index.values.Bytes.BytesReaderBase;
import org.apache.lucene.index.values.Bytes.BytesWriterBase;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.AttributeSource;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefHash;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.PagedBytes;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.ByteBlockPool.Allocator;
@ -56,9 +58,7 @@ class VarSortedBytesImpl {
private int[] docToEntry;
private final Comparator<BytesRef> comp;
private final BytesRefHash hash = new BytesRefHash(pool,
BytesRefHash.DEFAULT_CAPACITY, new TrackingDirectBytesStartArray(
BytesRefHash.DEFAULT_CAPACITY, bytesUsed));
private final BytesRefHash hash;
public Writer(Directory dir, String id, Comparator<BytesRef> comp,
AtomicLong bytesUsed) throws IOException {
@ -68,13 +68,14 @@ class VarSortedBytesImpl {
public Writer(Directory dir, String id, Comparator<BytesRef> comp,
Allocator allocator, AtomicLong bytesUsed) throws IOException {
super(dir, id, CODEC_NAME, VERSION_CURRENT, true,
new ByteBlockPool(allocator), bytesUsed);
super(dir, id, CODEC_NAME, VERSION_CURRENT, bytesUsed);
this.hash = new BytesRefHash(new ByteBlockPool(allocator),
BytesRefHash.DEFAULT_CAPACITY, new TrackingDirectBytesStartArray(
BytesRefHash.DEFAULT_CAPACITY, bytesUsed));
this.comp = comp;
docToEntry = new int[1];
docToEntry[0] = -1;
bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT);
}
@Override
@ -99,14 +100,16 @@ class VarSortedBytesImpl {
@Override
public void finish(int docCount) throws IOException {
final int count = hash.size();
final IndexOutput datOut = getDataOut();
long offset = 0;
long lastOffset = 0;
final int[] index = new int[count];
final long[] offsets = new long[count];
boolean success = false;
try {
final int[] sortedEntries = hash.sort(comp);
// first dump bytes data, recording index & offset as
// we go
long offset = 0;
long lastOffset = 0;
final int[] index = new int[count];
final long[] offsets = new long[count];
for (int i = 0; i < count; i++) {
final int e = sortedEntries[i];
offsets[i] = offset;
@ -118,7 +121,14 @@ class VarSortedBytesImpl {
lastOffset = offset;
offset += bytes.length;
}
success = true;
} finally {
IOUtils.closeSafely(!success, datOut);
hash.close();
}
final IndexOutput idxOut = getIndexOut();
success = false;
try {
// total bytes of data
idxOut.writeLong(offset);
@ -145,11 +155,12 @@ class VarSortedBytesImpl {
offsetWriter.add(offsets[i]);
}
offsetWriter.finish();
success = true;
} finally {
super.finish(docCount);
bytesUsed.addAndGet((-docToEntry.length)
* RamUsageEstimator.NUM_BYTES_INT);
hash.close();
docToEntry = null;
IOUtils.closeSafely(!success, idxOut);
}
}
}
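
Editor's note on the index layout above: finish() records a running start offset per sorted entry and writes the grand total first, so a reader can recover each value's length from consecutive offsets (and from the total for the last one). A tiny worked example with made-up value lengths:

// Made-up sorted values with lengths 3, 1 and 4.
int[] lengths = {3, 1, 4};
long[] offsets = new long[lengths.length];
long offset = 0;
for (int i = 0; i < lengths.length; i++) {
  offsets[i] = offset;   // start offset of the i-th sorted value
  offset += lengths[i];
}
// offsets == {0, 3, 4}, total == 8; length(i) = next start (or the total) minus offsets[i]
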

View File: VarStraightBytesImpl.java

@ -25,12 +25,17 @@ import org.apache.lucene.index.values.Bytes.BytesReaderBase;
import org.apache.lucene.index.values.Bytes.BytesWriterBase;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.AttributeSource;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.PagedBytes;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.ByteBlockPool.DirectTrackingAllocator;
import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.PackedInts.ReaderIterator;
// Variable length byte[] per document, no sharing
@ -48,11 +53,15 @@ class VarStraightBytesImpl {
// start at -1 if the first added value is > 0
private int lastDocID = -1;
private long[] docToAddress;
private final ByteBlockPool pool;
private IndexOutput datOut;
private boolean merge = false;
public Writer(Directory dir, String id, AtomicLong bytesUsed)
throws IOException {
super(dir, id, CODEC_NAME, VERSION_CURRENT, true, null, bytesUsed);
super(dir, id, CODEC_NAME, VERSION_CURRENT, bytesUsed);
pool = new ByteBlockPool(new DirectTrackingAllocator(bytesUsed));
docToAddress = new long[1];
pool.nextBuffer(); // init
bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT);
}
@ -67,21 +76,109 @@ class VarStraightBytesImpl {
for (int i = lastDocID + 1; i < docID; i++) {
docToAddress[i] = address;
}
lastDocID = docID;
}
@Override
public void add(int docID, BytesRef bytes) throws IOException {
if (bytes.length == 0)
assert !merge;
if (bytes.length == 0) {
return; // default
}
fill(docID);
docToAddress[docID] = address;
datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
pool.copy(bytes);
address += bytes.length;
lastDocID = docID;
}
@Override
protected void merge(MergeState state) throws IOException {
merge = true;
datOut = getDataOut();
boolean success = false;
try {
if (state.bits == null && state.reader instanceof Reader) {
// bulk merge since we don't have any deletes
Reader reader = (Reader) state.reader;
final int maxDocs = reader.maxDoc;
if (maxDocs == 0) {
return;
}
if (lastDocID+1 < state.docBase) {
fill(state.docBase);
lastDocID = state.docBase-1;
}
final long numDataBytes;
final IndexInput cloneIdx = reader.cloneIndex();
try {
numDataBytes = cloneIdx.readVLong();
final ReaderIterator iter = PackedInts.getReaderIterator(cloneIdx);
for (int i = 0; i < maxDocs; i++) {
long offset = iter.next();
++lastDocID;
if (lastDocID >= docToAddress.length) {
int oldSize = docToAddress.length;
docToAddress = ArrayUtil.grow(docToAddress, 1 + lastDocID);
bytesUsed.addAndGet((docToAddress.length - oldSize)
* RamUsageEstimator.NUM_BYTES_INT);
}
docToAddress[lastDocID] = address + offset;
}
address += numDataBytes; // this is the address after all addr pointers are updated
iter.close();
} finally {
IOUtils.closeSafely(true, cloneIdx);
}
final IndexInput cloneData = reader.cloneData();
try {
datOut.copyBytes(cloneData, numDataBytes);
} finally {
IOUtils.closeSafely(true, cloneData);
}
} else {
super.merge(state);
}
success = true;
} finally {
if (!success) {
IOUtils.closeSafely(!success, datOut);
}
}
}
@Override
protected void mergeDoc(int docID) throws IOException {
assert merge;
assert lastDocID < docID;
if (bytesRef.length == 0) {
return; // default
}
fill(docID);
datOut.writeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length);
docToAddress[docID] = address;
address += bytesRef.length;
lastDocID = docID;
}
@Override
public void finish(int docCount) throws IOException {
boolean success = false;
assert (!merge && datOut == null) || (merge && datOut != null);
final IndexOutput datOut = getDataOut();
try {
if (!merge) {
// header is already written in getDataOut()
pool.writePool(datOut);
}
success = true;
} finally {
IOUtils.closeSafely(!success, datOut);
pool.dropBuffersAndReset();
}
success = false;
final IndexOutput idxOut = getIndexOut();
try {
if (lastDocID == -1) {
idxOut.writeVLong(0);
@ -101,11 +198,12 @@ class VarStraightBytesImpl {
}
w.finish();
}
success = true;
} finally {
bytesUsed.addAndGet(-(docToAddress.length)
* RamUsageEstimator.NUM_BYTES_INT);
docToAddress = null;
super.finish(docCount);
IOUtils.closeSafely(!success, idxOut);
}
}
@ -179,21 +277,23 @@ class VarStraightBytesImpl {
}
private class VarStraightBytesEnum extends ValuesEnum {
private final PackedInts.Reader addresses;
private final PackedInts.ReaderIterator addresses;
private final IndexInput datIn;
private final IndexInput idxIn;
private final long fp;
private final long totBytes;
private int pos = -1;
private long nextAddress;
protected VarStraightBytesEnum(AttributeSource source, IndexInput datIn,
IndexInput idxIn) throws IOException {
super(source, ValueType.BYTES_VAR_STRAIGHT);
totBytes = idxIn.readVLong();
fp = datIn.getFilePointer();
addresses = PackedInts.getReader(idxIn);
addresses = PackedInts.getReaderIterator(idxIn);
this.datIn = datIn;
this.idxIn = idxIn;
nextAddress = addresses.next();
}
@Override
@ -207,7 +307,7 @@ class VarStraightBytesImpl {
if (target >= maxDoc) {
return pos = NO_MORE_DOCS;
}
final long addr = addresses.get(target);
final long addr = pos+1 == target ? nextAddress : addresses.advance(target);
if (addr == totBytes) { // empty values at the end
bytesRef.length = 0;
bytesRef.offset = 0;
@ -215,7 +315,7 @@ class VarStraightBytesImpl {
}
datIn.seek(fp + addr);
final int size = (int) (target == maxDoc - 1 ? totBytes - addr
: addresses.get(target + 1) - addr);
: (nextAddress = addresses.next()) - addr);
if (bytesRef.bytes.length < size) {
bytesRef.grow(size);
}

View File: ByteBlockPool.java

@ -16,10 +16,13 @@ package org.apache.lucene.util;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.store.DataOutput;
import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
/**
@ -241,5 +244,42 @@ public final class ByteBlockPool {
assert term.length >= 0;
return term;
}
/**
* Copies the given {@link BytesRef} at the current position
* ({@link #byteUpto}), crossing buffer boundaries if necessary.
*/
public final void copy(final BytesRef bytes) {
int length = bytes.length;
int offset = bytes.offset;
int overflow = (length + byteUpto) - BYTE_BLOCK_SIZE;
do {
if (overflow <= 0) {
System.arraycopy(bytes.bytes, offset, buffer, byteUpto, length);
byteUpto += length;
break;
} else {
final int bytesToCopy = length-overflow;
System.arraycopy(bytes.bytes, offset, buffer, byteUpto, bytesToCopy);
offset += bytesToCopy;
length -= bytesToCopy;
nextBuffer();
overflow = overflow - BYTE_BLOCK_SIZE;
}
} while(true);
}
/**
* Writes the pool's content to the given {@link DataOutput}.
*/
public final void writePool(final DataOutput out) throws IOException {
int bytesOffset = byteOffset;
int block = 0;
while (bytesOffset > 0) {
out.writeBytes(buffers[block++], BYTE_BLOCK_SIZE);
bytesOffset -= BYTE_BLOCK_SIZE;
}
out.writeBytes(buffers[block], byteUpto);
}
}

View File: TestDocValues.java

@ -64,7 +64,7 @@ public class TestDocValues extends LuceneTestCase {
Writer w = Bytes.getWriter(dir, "test", mode, comp, fixedSize, trackBytes);
int maxDoc = 220;
final String[] values = new String[maxDoc];
final int fixedLength = 3 + random.nextInt(7);
final int fixedLength = 1 + atLeast(50);
for (int i = 0; i < 100; i++) {
final String s;
if (i > 0 && random.nextInt(5) <= 2) {

View File: TestDocValuesIndexing.java

@ -329,8 +329,7 @@ public class TestDocValuesIndexing extends LuceneTestCase {
final int numValues = 50 + atLeast(10);
for (ValueType byteIndexValue : byteVariantList) {
List<Closeable> closeables = new ArrayList<Closeable>();
int bytesSize = 1 + atLeast(10);
final int bytesSize = 1 + atLeast(50);
OpenBitSet deleted = indexValues(w, numValues, byteIndexValue,
byteVariantList, withDeletions, bytesSize);
final IndexReader r = IndexReader.open(w, withDeletions);
@ -357,7 +356,7 @@ public class TestDocValuesIndexing extends LuceneTestCase {
assertNotNull("expected none null - " + msg, br);
if (br.length != 0) {
assertEquals("expected zero bytes of length " + bytesSize + " - "
+ msg, bytesSize, br.length);
+ msg + br.utf8ToString(), bytesSize, br.length);
for (int j = 0; j < br.length; j++) {
assertEquals("Byte at index " + j + " doesn't match - " + msg, 0,
br.bytes[br.offset + j]);
@ -391,12 +390,12 @@ public class TestDocValuesIndexing extends LuceneTestCase {
while (withDeletions && deleted.get(v++)) {
upto += bytesSize;
}
BytesRef br = bytes.getBytes(i, new BytesRef());
if (bytesEnum.docID() != i) {
assertEquals("seek failed for index " + i + " " + msg, i, bytesEnum
.advance(i));
}
assertTrue(msg, br.length > 0);
for (int j = 0; j < br.length; j++, upto++) {
assertTrue(" enumRef not initialized " + msg,
enumRef.bytes.length > 0);

View File: TestByteBlockPool.java

@ -0,0 +1,67 @@
package org.apache.lucene.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RAMDirectory;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
public class TestByteBlockPool extends LuceneTestCase {
public void testCopyRefAndWrite() throws IOException {
List<String> list = new ArrayList<String>();
int maxLength = atLeast(500);
ByteBlockPool pool = new ByteBlockPool(new ByteBlockPool.DirectAllocator());
pool.nextBuffer();
final int numValues = atLeast(100);
BytesRef ref = new BytesRef();
for (int i = 0; i < numValues; i++) {
final String value = _TestUtil.randomRealisticUnicodeString(random,
maxLength);
list.add(value);
ref.copy(value);
pool.copy(ref);
}
RAMDirectory dir = new RAMDirectory();
IndexOutput stream = dir.createOutput("foo.txt");
pool.writePool(stream);
stream.flush();
stream.close();
IndexInput input = dir.openInput("foo.txt");
assertEquals(pool.byteOffset + pool.byteUpto, stream.length());
BytesRef expected = new BytesRef();
BytesRef actual = new BytesRef();
for (String string : list) {
expected.copy(string);
actual.grow(expected.length);
actual.length = expected.length;
input.readBytes(actual.bytes, 0, actual.length);
assertEquals(expected, actual);
}
try {
input.readByte();
fail("must be EOF");
} catch (IOException e) {
// expected - read past EOF
}
dir.close();
}
}