From 904e3f73bffd594faa72b002426c32a0e3bcb06e Mon Sep 17 00:00:00 2001
From: Simon Willnauer
Date: Wed, 29 Jun 2011 13:39:32 +0000
Subject: [PATCH] LUCENE-3216: keep doc values in memory during indexing and
 merge directly to the target file

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1141100 13f79535-47bb-0310-9956-ffa450edef68
---
 .../org/apache/lucene/index/values/Bytes.java |  58 ++++----
 .../index/values/FixedDerefBytesImpl.java     |  49 ++++---
 .../index/values/FixedSortedBytesImpl.java    |  61 +++++----
 .../index/values/FixedStraightBytesImpl.java  | 129 ++++++++++++++----
 .../index/values/VarDerefBytesImpl.java       |  33 ++++-
 .../index/values/VarSortedBytesImpl.java      |  37 +++--
 .../index/values/VarStraightBytesImpl.java    | 120 ++++++++++++++--
 .../org/apache/lucene/util/ByteBlockPool.java |  40 ++++++
 .../lucene/index/values/TestDocValues.java    |   2 +-
 .../index/values/TestDocValuesIndexing.java   |   7 +-
 .../apache/lucene/util/TestByteBlockPool.java |  67 +++++++++
 11 files changed, 474 insertions(+), 129 deletions(-)
 create mode 100644 lucene/src/test/org/apache/lucene/util/TestByteBlockPool.java

diff --git a/lucene/src/java/org/apache/lucene/index/values/Bytes.java b/lucene/src/java/org/apache/lucene/index/values/Bytes.java
index d566d96653c..4cd44751d35 100644
--- a/lucene/src/java/org/apache/lucene/index/values/Bytes.java
+++ b/lucene/src/java/org/apache/lucene/index/values/Bytes.java
@@ -31,7 +31,6 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.AttributeSource;
-import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CodecUtil;
 import org.apache.lucene.util.IOUtils;
@@ -116,7 +115,7 @@ public final class Bytes {
     if (fixedSize) {
       if (mode == Mode.STRAIGHT) {
-        return new FixedStraightBytesImpl.Writer(dir, id);
+        return new FixedStraightBytesImpl.Writer(dir, id, bytesUsed);
       } else if (mode == Mode.DEREF) {
         return new FixedDerefBytesImpl.Writer(dir, id, bytesUsed);
       } else if (mode == Mode.SORTED) {
@@ -337,37 +336,56 @@ public final class Bytes {
   // TODO: open up this API?!
   static abstract class BytesWriterBase extends Writer {
     private final String id;
-    protected IndexOutput idxOut;
-    protected IndexOutput datOut;
+    private IndexOutput idxOut;
+    private IndexOutput datOut;
     protected BytesRef bytesRef;
-    protected final ByteBlockPool pool;
+    private final Directory dir;
+    private final String codecName;
+    private final int version;

     protected BytesWriterBase(Directory dir, String id, String codecName,
-        int version, boolean initIndex, ByteBlockPool pool,
+        int version,
         AtomicLong bytesUsed) throws IOException {
       super(bytesUsed);
       this.id = id;
-      this.pool = pool;
-      datOut = dir.createOutput(IndexFileNames.segmentFileName(id, "",
-          DATA_EXTENSION));
+      this.dir = dir;
+      this.codecName = codecName;
+      this.version = version;
+    }
+
+    protected IndexOutput getDataOut() throws IOException {
+      if (datOut == null) {
+        boolean success = false;
+        try {
+          datOut = dir.createOutput(IndexFileNames.segmentFileName(id, "",
+              DATA_EXTENSION));
+          CodecUtil.writeHeader(datOut, codecName, version);
+          success = true;
+        } finally {
+          if (!success) {
+            IOUtils.closeSafely(true, datOut);
+          }
+        }
+      }
+      return datOut;
+    }
+
+    protected IndexOutput getIndexOut() throws IOException {
       boolean success = false;
       try {
-        CodecUtil.writeHeader(datOut, codecName, version);
-        if (initIndex) {
+        if (idxOut == null) {
           idxOut = dir.createOutput(IndexFileNames.segmentFileName(id, "",
               INDEX_EXTENSION));
           CodecUtil.writeHeader(idxOut, codecName, version);
-        } else {
-          idxOut = null;
         }
         success = true;
       } finally {
         if (!success) {
-          IOUtils.closeSafely(true, datOut, idxOut);
+          IOUtils.closeSafely(true, idxOut);
         }
       }
+      return idxOut;
     }
-
     /**
      * Must be called only with increasing docIDs. It's OK for some docIDs to be
      * skipped; they will be filled with 0 bytes.
@@ -376,15 +394,7 @@ public final class Bytes {
     public abstract void add(int docID, BytesRef bytes) throws IOException;

     @Override
-    public void finish(int docCount) throws IOException {
-      try {
-        IOUtils.closeSafely(false, datOut, idxOut);
-      } finally {
-        if (pool != null) {
-          pool.reset();
-        }
-      }
-    }
+    public abstract void finish(int docCount) throws IOException;

     @Override
     protected void mergeDoc(int docID) throws IOException {
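Note: the heart of the Bytes.java hunk above is that BytesWriterBase no longer opens its .dat/.idx files in the constructor. getDataOut()/getIndexOut() create them lazily and write the codec header on first access, so a writer that only buffers in RAM does no disk I/O until finish() or a merge. Below is a minimal standalone sketch of the same lazy, fail-safe pattern; it substitutes plain java.io for Directory/IndexOutput/CodecUtil, and the LazyOutput name is illustrative only, not part of the patch.

import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

// Sketch of the lazy, fail-safe output pattern used by getDataOut()/getIndexOut():
// the stream is created on first request, the header is written immediately, and
// the stream is closed again (not leaked) if the header write fails.
final class LazyOutput {
  private final String path;
  private final String codecName;
  private final int version;
  private DataOutputStream out; // stays null until first use

  LazyOutput(String path, String codecName, int version) {
    this.path = path;
    this.codecName = codecName;
    this.version = version;
  }

  DataOutputStream get() throws IOException {
    if (out == null) {
      boolean success = false;
      try {
        out = new DataOutputStream(new FileOutputStream(path));
        out.writeUTF(codecName); // stand-in for CodecUtil.writeHeader
        out.writeInt(version);
        success = true;
      } finally {
        if (!success && out != null) {
          out.close(); // don't leak the file handle on a failed header write
          out = null;
        }
      }
    }
    return out;
  }
}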
diff --git a/lucene/src/java/org/apache/lucene/index/values/FixedDerefBytesImpl.java b/lucene/src/java/org/apache/lucene/index/values/FixedDerefBytesImpl.java
index 56493a20e63..f11186f6edb 100644
--- a/lucene/src/java/org/apache/lucene/index/values/FixedDerefBytesImpl.java
+++ b/lucene/src/java/org/apache/lucene/index/values/FixedDerefBytesImpl.java
@@ -25,11 +25,13 @@ import org.apache.lucene.index.values.Bytes.BytesReaderBase;
 import org.apache.lucene.index.values.Bytes.BytesWriterBase;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.AttributeSource;
 import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefHash;
+import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.PagedBytes;
 import org.apache.lucene.util.RamUsageEstimator;
 import org.apache.lucene.util.ByteBlockPool.Allocator;
@@ -51,9 +53,7 @@ class FixedDerefBytesImpl {
   static class Writer extends BytesWriterBase {
     private int size = -1;
     private int[] docToID;
-    private final BytesRefHash hash = new BytesRefHash(pool,
-        BytesRefHash.DEFAULT_CAPACITY, new TrackingDirectBytesStartArray(
-            BytesRefHash.DEFAULT_CAPACITY, bytesUsed));
+    private final BytesRefHash hash;

     public Writer(Directory dir, String id, AtomicLong bytesUsed)
         throws IOException {
       this(dir, id, new DirectTrackingAllocator(ByteBlockPool.BYTE_BLOCK_SIZE, bytesUsed),
@@ -62,11 +62,12 @@ class FixedDerefBytesImpl {

     public Writer(Directory dir, String id, Allocator allocator,
         AtomicLong bytesUsed) throws IOException {
-      super(dir, id, CODEC_NAME, VERSION_CURRENT, true,
-          new ByteBlockPool(allocator), bytesUsed);
+      super(dir, id, CODEC_NAME, VERSION_CURRENT, bytesUsed);
+      hash = new BytesRefHash(new ByteBlockPool(allocator),
+          BytesRefHash.DEFAULT_CAPACITY, new TrackingDirectBytesStartArray(
+              BytesRefHash.DEFAULT_CAPACITY, bytesUsed));
       docToID = new int[1];
-      bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT); // TODO BytesRefHash
-      // uses bytes too!
+      bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT);
     }

     @Override
@@ -75,20 +76,14 @@ class FixedDerefBytesImpl {
         return;
       if (size == -1) {
         size = bytes.length;
-        datOut.writeInt(size);
       } else if (bytes.length != size) {
         throw new IllegalArgumentException("expected bytes size=" + size
             + " but got " + bytes.length);
       }
       int ord = hash.add(bytes);
-
-      if (ord >= 0) {
-        // new added entry
-        datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
-      } else {
+      if (ord < 0) {
         ord = (-ord) - 1;
       }
-
       if (docID >= docToID.length) {
         final int size = docToID.length;
         docToID = ArrayUtil.grow(docToID, 1 + docID);
@@ -102,11 +97,27 @@ class FixedDerefBytesImpl {
     // some last docs that we didn't see
     @Override
     public void finish(int docCount) throws IOException {
+      boolean success = false;
+      final int numValues = hash.size();
+      final IndexOutput datOut = getDataOut();
       try {
-        if (size == -1) {
-          datOut.writeInt(size);
+        datOut.writeInt(size);
+        if (size != -1) {
+          final BytesRef bytesRef = new BytesRef(size);
+          for (int i = 0; i < numValues; i++) {
+            hash.get(i, bytesRef);
+            datOut.writeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length);
+          }
         }
-        final int count = 1 + hash.size();
+        success = true;
+      } finally {
+        IOUtils.closeSafely(!success, datOut);
+        hash.close();
+      }
+      success = false;
+      final IndexOutput idxOut = getIndexOut();
+      try {
+        final int count = 1 + numValues;
         idxOut.writeInt(count - 1);
         // write index
         final PackedInts.Writer w = PackedInts.getWriter(idxOut, docCount,
@@ -120,9 +131,9 @@ class FixedDerefBytesImpl {
           w.add(0);
         }
         w.finish();
+        success = true;
       } finally {
-        hash.close();
-        super.finish(docCount);
+        IOUtils.closeSafely(!success, idxOut);
         bytesUsed
             .addAndGet((-docToID.length) * RamUsageEstimator.NUM_BYTES_INT);
         docToID = null;
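Note: with the ByteBlockPool now owned by the BytesRefHash, the deref writer buffers each distinct fixed-size value in memory during add() and only streams the unique values, plus a packed doc-to-ord index, in finish(); each document stores ord+1, with 0 reserved for "no value". A sketch of that layout under those assumptions, using plain collections in place of BytesRefHash and PackedInts (all names here are illustrative):

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the deref layout: values are deduplicated in memory during add()
// (LinkedHashMap standing in for BytesRefHash) and streamed out once in
// finish(); each document records ord+1, 0 meaning "no value set".
final class DerefSketch {
  private final Map<ByteBuffer, Integer> ords = new LinkedHashMap<>();
  private final List<byte[]> unique = new ArrayList<>();
  private int[] docToOrd = new int[1];

  void add(int docID, byte[] value) {
    Integer ord = ords.get(ByteBuffer.wrap(value));
    if (ord == null) {
      ord = ords.size();                 // new entry: assign the next ord
      ords.put(ByteBuffer.wrap(value.clone()), ord);
      unique.add(value.clone());
    }
    if (docID >= docToOrd.length) {
      docToOrd = java.util.Arrays.copyOf(docToOrd, 1 + docID);
    }
    docToOrd[docID] = 1 + ord;           // 0 stays reserved for skipped docs
  }

  // Data file: all unique values back to back; the index file would hold
  // docToOrd in packed form. The first "disk" write happens here, not in add().
  byte[] dataFileBytes() {
    ByteArrayOutputStream dat = new ByteArrayOutputStream();
    for (byte[] v : unique) {
      dat.write(v, 0, v.length);
    }
    return dat.toByteArray();
  }
}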
diff --git a/lucene/src/java/org/apache/lucene/index/values/FixedSortedBytesImpl.java b/lucene/src/java/org/apache/lucene/index/values/FixedSortedBytesImpl.java
index 3a32f9892c3..d1dc0f83c83 100644
--- a/lucene/src/java/org/apache/lucene/index/values/FixedSortedBytesImpl.java
+++ b/lucene/src/java/org/apache/lucene/index/values/FixedSortedBytesImpl.java
@@ -27,12 +27,14 @@ import org.apache.lucene.index.values.Bytes.BytesWriterBase;
 import org.apache.lucene.index.values.FixedDerefBytesImpl.Reader.DerefBytesEnum;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.AttributeSource;
 import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefHash;
 import org.apache.lucene.util.CodecUtil;
+import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.PagedBytes;
 import org.apache.lucene.util.RamUsageEstimator;
 import org.apache.lucene.util.ByteBlockPool.Allocator;
@@ -56,10 +58,7 @@ class FixedSortedBytesImpl {
     private int size = -1;
     private int[] docToEntry;
     private final Comparator<BytesRef> comp;
-
-    private final BytesRefHash hash = new BytesRefHash(pool,
-        BytesRefHash.DEFAULT_CAPACITY, new TrackingDirectBytesStartArray(
-            BytesRefHash.DEFAULT_CAPACITY, bytesUsed));
+    private final BytesRefHash hash;

     public Writer(Directory dir, String id, Comparator<BytesRef> comp,
         AtomicLong bytesUsed) throws IOException {
@@ -69,10 +68,12 @@ class FixedSortedBytesImpl {

     public Writer(Directory dir, String id, Comparator<BytesRef> comp,
         Allocator allocator, AtomicLong bytesUsed) throws IOException {
-      super(dir, id, CODEC_NAME, VERSION_CURRENT, true,
-          new ByteBlockPool(allocator), bytesUsed);
+      super(dir, id, CODEC_NAME, VERSION_CURRENT, bytesUsed);
+      ByteBlockPool pool = new ByteBlockPool(allocator);
+      hash = new BytesRefHash(pool, BytesRefHash.DEFAULT_CAPACITY,
+          new TrackingDirectBytesStartArray(BytesRefHash.DEFAULT_CAPACITY,
+              bytesUsed));
       docToEntry = new int[1];
-      // docToEntry[0] = -1;
       bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT);
       this.comp = comp;
     }
@@ -83,7 +84,6 @@ class FixedSortedBytesImpl {
         return; // default - skip it
       if (size == -1) {
         size = bytes.length;
-        datOut.writeInt(size);
       } else if (bytes.length != size) {
         throw new IllegalArgumentException("expected bytes size=" + size
             + " but got " + bytes.length);
@@ -104,26 +104,36 @@ class FixedSortedBytesImpl {
     // some last docs that we didn't see
     @Override
     public void finish(int docCount) throws IOException {
+      final IndexOutput datOut = getDataOut();
+      boolean success = false;
+      final int count = hash.size();
+      final int[] address = new int[count];
       try {
-        if (size == -1) {// no data added
-          datOut.writeInt(size);
+        datOut.writeInt(size);
+        if (size != -1) {
+          final int[] sortedEntries = hash.sort(comp);
+          // first dump bytes data, recording address as we go
+          final BytesRef bytesRef = new BytesRef(size);
+          for (int i = 0; i < count; i++) {
+            final int e = sortedEntries[i];
+            final BytesRef bytes = hash.get(e, bytesRef);
+            assert bytes.length == size;
+            datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
+            address[e] = 1 + i;
+          }
         }
-        final int[] sortedEntries = hash.sort(comp);
-        final int count = hash.size();
-        int[] address = new int[count];
-        // first dump bytes data, recording address as we go
-        for (int i = 0; i < count; i++) {
-          final int e = sortedEntries[i];
-          final BytesRef bytes = hash.get(e, new BytesRef());
-          assert bytes.length == size;
-          datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
-          address[e] = 1 + i;
-        }
-
+        success = true;
+      } finally {
+        IOUtils.closeSafely(!success, datOut);
+        hash.close();
+      }
+      final IndexOutput idxOut = getIndexOut();
+      success = false;
+      try {
         idxOut.writeInt(count);
-        // next write index
-        PackedInts.Writer w = PackedInts.getWriter(idxOut, docCount,
+        final PackedInts.Writer w = PackedInts.getWriter(idxOut, docCount,
             PackedInts.bitsRequired(count));
         final int limit;
         if (docCount > docToEntry.length) {
@@ -148,11 +158,10 @@ class FixedSortedBytesImpl {
         }
         w.finish();
+        success = true;
       } finally {
-        super.finish(docCount);
+        IOUtils.closeSafely(!success, idxOut);
         bytesUsed.addAndGet((-docToEntry.length)
             * RamUsageEstimator.NUM_BYTES_INT);
         docToEntry = null;
-        hash.close();
       }
     }
   }
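Note: FixedSortedBytesImpl now defers sorting to finish(): the hash's entries are sorted with the comparator, values are dumped in sorted order, and address[entry] records the entry's 1-based sorted position so the buffered per-document entries can be remapped before being packed. A compact sketch of that remapping step, with plain arrays standing in for BytesRefHash.sort():

import java.util.Arrays;
import java.util.Comparator;

// Sketch of the finish()-time remapping: sort the unique values, then record
// address[originalEntry] = 1 + sortedPosition so buffered per-document entry
// ids can be rewritten as sorted ords; 0 stays reserved for "unset".
final class SortedSketch {
  static int[] sortedAddresses(byte[][] uniqueValues, Comparator<byte[]> comp) {
    Integer[] order = new Integer[uniqueValues.length];
    for (int i = 0; i < order.length; i++) {
      order[i] = i;
    }
    Arrays.sort(order, (a, b) -> comp.compare(uniqueValues[a], uniqueValues[b]));
    int[] address = new int[uniqueValues.length];
    for (int i = 0; i < order.length; i++) {
      address[order[i]] = 1 + i; // entry order[i] holds the i-th smallest value
    }
    return address;
  }
}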
diff --git a/lucene/src/java/org/apache/lucene/index/values/FixedStraightBytesImpl.java b/lucene/src/java/org/apache/lucene/index/values/FixedStraightBytesImpl.java
index d24a83a56a5..5e56cda08ca 100644
--- a/lucene/src/java/org/apache/lucene/index/values/FixedStraightBytesImpl.java
+++ b/lucene/src/java/org/apache/lucene/index/values/FixedStraightBytesImpl.java
@@ -17,14 +17,20 @@ package org.apache.lucene.index.values;
  * limitations under the License.
  */

+import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
+
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;

 import org.apache.lucene.index.values.Bytes.BytesBaseSource;
 import org.apache.lucene.index.values.Bytes.BytesReaderBase;
 import org.apache.lucene.index.values.Bytes.BytesWriterBase;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.AttributeSource;
+import org.apache.lucene.util.ByteBlockPool;
+import org.apache.lucene.util.ByteBlockPool.DirectTrackingAllocator;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.PagedBytes;
@@ -44,30 +50,59 @@ class FixedStraightBytesImpl {
     private int size = -1;
     // start at -1 if the first added value is > 0
     private int lastDocID = -1;
-    private byte[] oneRecord;
+    private final ByteBlockPool pool;
+    private boolean merge;
+    private final int byteBlockSize;
+    private IndexOutput datOut;

-    public Writer(Directory dir, String id) throws IOException {
-      super(dir, id, CODEC_NAME, VERSION_CURRENT, false, null, null);
+    public Writer(Directory dir, String id, AtomicLong bytesUsed) throws IOException {
+      super(dir, id, CODEC_NAME, VERSION_CURRENT, bytesUsed);
+      pool = new ByteBlockPool(new DirectTrackingAllocator(bytesUsed));
+      byteBlockSize = BYTE_BLOCK_SIZE;
     }
-
     @Override
     public void add(int docID, BytesRef bytes) throws IOException {
+      assert lastDocID < docID;
+      assert !merge;
       if (size == -1) {
+        if (bytes.length > BYTE_BLOCK_SIZE) {
+          throw new IllegalArgumentException("bytes arrays > " + BYTE_BLOCK_SIZE + " are not supported");
+        }
         size = bytes.length;
-        datOut.writeInt(size);
-        oneRecord = new byte[size];
+        pool.nextBuffer();
       } else if (bytes.length != size) {
         throw new IllegalArgumentException("expected bytes size=" + size
             + " but got " + bytes.length);
       }
-      fill(docID);
-      assert bytes.bytes.length >= bytes.length;
-      datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
+      if (lastDocID+1 < docID) {
+        advancePool(docID);
+      }
+      pool.copy(bytes);
+      lastDocID = docID;
+    }
+
+    private final void advancePool(int docID) {
+      assert !merge;
+      long numBytes = (docID - (lastDocID+1))*size;
+      while(numBytes > 0) {
+        if (numBytes + pool.byteUpto < byteBlockSize) {
+          pool.byteUpto += numBytes;
+          numBytes = 0;
+        } else {
+          numBytes -= byteBlockSize - pool.byteUpto;
+          pool.nextBuffer();
+        }
+      }
+      assert numBytes == 0;
     }

     @Override
     protected void merge(MergeState state) throws IOException {
+      merge = true;
+      datOut = getDataOut();
+      boolean success = false;
+      try {
       if (state.bits == null && state.reader instanceof Reader) {
         Reader reader = (Reader) state.reader;
         final int maxDocs = reader.maxDoc;
@@ -77,48 +112,92 @@ class FixedStraightBytesImpl {
         if (size == -1) {
           size = reader.size;
           datOut.writeInt(size);
-          oneRecord = new byte[size];
         }
-        fill(state.docBase);
+        if (lastDocID+1 < state.docBase) {
+          fill(datOut, state.docBase);
+          lastDocID = state.docBase-1;
+        }
         // TODO should we add a transfer to API to each reader?
         final IndexInput cloneData = reader.cloneData();
         try {
           datOut.copyBytes(cloneData, size * maxDocs);
         } finally {
-          cloneData.close();
+          IOUtils.closeSafely(true, cloneData);
         }
-        lastDocID += maxDocs - 1;
+        lastDocID += maxDocs;
       } else {
         super.merge(state);
       }
+      success = true;
+      } finally {
+        if (!success) {
+          IOUtils.closeSafely(!success, datOut);
+        }
+      }
     }

+    @Override
+    protected void mergeDoc(int docID) throws IOException {
+      assert lastDocID < docID;
+      if (size == -1) {
+        size = bytesRef.length;
+        datOut.writeInt(size);
+      }
+      assert size == bytesRef.length;
+      if (lastDocID+1 < docID) {
+        fill(datOut, docID);
+      }
+      datOut.writeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length);
+      lastDocID = docID;
     }

     // Fills up to but not including this docID
-    private void fill(int docID) throws IOException {
+    private void fill(IndexOutput datOut, int docID) throws IOException {
       assert size >= 0;
-      for (int i = lastDocID + 1; i < docID; i++) {
-        datOut.writeBytes(oneRecord, size);
+      final long numBytes = (docID - (lastDocID+1))*size;
+      final byte zero = 0;
+      for (long i = 0; i < numBytes; i++) {
+        datOut.writeByte(zero);
       }
-      lastDocID = docID;
     }

     @Override
     public void finish(int docCount) throws IOException {
+      boolean success = false;
       try {
-        if (size == -1) {// no data added
-          datOut.writeInt(0);
+        if (!merge) {
+          // indexing path - no disk IO until here
+          assert datOut == null;
+          datOut = getDataOut();
+          if (size == -1) {
+            datOut.writeInt(0);
+          } else {
+            datOut.writeInt(size);
+            pool.writePool(datOut);
+          }
+          if (lastDocID + 1 < docCount) {
+            fill(datOut, docCount);
+          }
         } else {
-          fill(docCount);
+          // merge path - datOut should be initialized
+          assert datOut != null;
+          if (size == -1) {// no data added
+            datOut.writeInt(0);
+          } else {
+            fill(datOut, docCount);
+          }
         }
+        success = true;
       } finally {
-        super.finish(docCount);
+        pool.dropBuffersAndReset();
+        IOUtils.closeSafely(!success, datOut);
       }
     }
-
-    public long ramBytesUsed() {
-      return oneRecord == null ? 0 : oneRecord.length;
-    }
   }

   public static class Reader extends BytesReaderBase {
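Note: the straight fixed-size writer now appends records into a ByteBlockPool and handles skipped docIDs by advancing byteUpto across block boundaries instead of writing zero records to disk; freshly allocated blocks are zero-filled, so a skipped range reads back as zeros when the pool is streamed in finish(). A sketch of the gap-advancing arithmetic from advancePool() follows; the block size and field names are illustrative stand-ins for the pool's internals:

// Sketch of advancePool(): skip (gap * size) zero bytes by bumping the write
// offset, allocating a new block whenever the gap crosses a block boundary.
// Blocks are assumed zero-initialized on allocation, as in ByteBlockPool.
final class GapSketch {
  static final int BLOCK_SIZE = 32768; // mirrors ByteBlockPool.BYTE_BLOCK_SIZE
  int byteUpto = BLOCK_SIZE;           // forces an allocation on first use
  int blocksAllocated;

  void nextBuffer() {
    blocksAllocated++;
    byteUpto = 0;
  }

  void advance(long numBytes) {
    while (numBytes > 0) {
      if (numBytes + byteUpto < BLOCK_SIZE) {
        byteUpto += numBytes; // the remaining gap fits in the current block
        numBytes = 0;
      } else {
        numBytes -= BLOCK_SIZE - byteUpto; // consume the rest of this block
        nextBuffer();
      }
    }
  }
}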
diff --git a/lucene/src/java/org/apache/lucene/index/values/VarDerefBytesImpl.java b/lucene/src/java/org/apache/lucene/index/values/VarDerefBytesImpl.java
index 215acd469ea..ced6ebea999 100644
--- a/lucene/src/java/org/apache/lucene/index/values/VarDerefBytesImpl.java
+++ b/lucene/src/java/org/apache/lucene/index/values/VarDerefBytesImpl.java
@@ -27,12 +27,14 @@ import org.apache.lucene.index.values.FixedDerefBytesImpl.Reader.DerefBytesEnum;
 import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.AttributeSource;
 import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefHash;
 import org.apache.lucene.util.CodecUtil;
+import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.PagedBytes;
 import org.apache.lucene.util.RamUsageEstimator;
 import org.apache.lucene.util.ByteBlockPool.Allocator;
@@ -113,7 +115,7 @@ class VarDerefBytesImpl {

     private final AddressByteStartArray array = new AddressByteStartArray(1,
         bytesUsed);
-    private final BytesRefHash hash = new BytesRefHash(pool, 16, array);
+    private final BytesRefHash hash;

     public Writer(Directory dir, String id, AtomicLong bytesUsed)
         throws IOException {
@@ -123,8 +125,8 @@ class VarDerefBytesImpl {

     public Writer(Directory dir, String id, Allocator allocator,
         AtomicLong bytesUsed) throws IOException {
-      super(dir, id, CODEC_NAME, VERSION_CURRENT, true,
-          new ByteBlockPool(allocator), bytesUsed);
+      super(dir, id, CODEC_NAME, VERSION_CURRENT, bytesUsed);
+      hash = new BytesRefHash(new ByteBlockPool(allocator), 16, array);
       docToAddress = new int[1];
       bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT);
     }
@@ -144,8 +146,7 @@ class VarDerefBytesImpl {
       final int docAddress;
       if (e >= 0) {
         docAddress = array.address[e] = address;
-        address += writePrefixLength(datOut, bytes);
-        datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
+        address += bytes.length < 128 ? 1 : 2;
         address += bytes.length;
       } else {
         docAddress = array.address[(-e) - 1];
@@ -169,6 +170,24 @@ class VarDerefBytesImpl {
     // some last docs that we didn't see
     @Override
     public void finish(int docCount) throws IOException {
+      final IndexOutput datOut = getDataOut();
+      boolean success = false;
+      try {
+        final int size = hash.size();
+        final BytesRef bytesRef = new BytesRef();
+        for (int i = 0; i < size; i++) {
+          hash.get(i, bytesRef);
+          writePrefixLength(datOut, bytesRef);
+          datOut.writeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length);
+        }
+        success = true;
+      } finally {
+        hash.close();
+        IOUtils.closeSafely(!success, datOut);
+      }
+
+      final IndexOutput idxOut = getIndexOut();
+      success = false;
       try {
         idxOut.writeInt(address - 1);
         // write index
@@ -189,9 +208,9 @@ class VarDerefBytesImpl {
           w.add(0);
         }
         w.finish();
+        success = true;
       } finally {
-        hash.close();
-        super.finish(docCount);
+        IOUtils.closeSafely(!success, idxOut);
         bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT
             * (-docToAddress.length));
         docToAddress = null;
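Note: during add() the var-deref writer only needs to know how many header bytes a value will occupy on disk, which is what "address += bytes.length < 128 ? 1 : 2" accounts for; the actual prefix is written later in finish() by writePrefixLength(). The sketch below shows the 1-or-2-byte length prefix this arithmetic implies (high bit of the first byte marking the two-byte form, supporting lengths up to 2^15-1); the exact encoding is an assumption inferred from the size accounting, not code copied from the patch:

import java.io.ByteArrayOutputStream;

// Sketch of the 1-or-2-byte length prefix: lengths < 128 fit in one byte;
// longer values set the high bit of the first byte and put the low 8 bits of
// the length in a second byte. Returns the number of header bytes written.
final class PrefixSketch {
  static int writePrefixLength(ByteArrayOutputStream out, int length) {
    if (length < 128) {
      out.write(length);
      return 1;
    } else {
      out.write(0x80 | (length >>> 8)); // high bit flags the two-byte form
      out.write(length & 0xFF);
      return 2;
    }
  }
}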
diff --git a/lucene/src/java/org/apache/lucene/index/values/VarSortedBytesImpl.java b/lucene/src/java/org/apache/lucene/index/values/VarSortedBytesImpl.java
index 89d4b7b1bf6..905cf9f7862 100644
--- a/lucene/src/java/org/apache/lucene/index/values/VarSortedBytesImpl.java
+++ b/lucene/src/java/org/apache/lucene/index/values/VarSortedBytesImpl.java
@@ -27,11 +27,13 @@ import org.apache.lucene.index.values.Bytes.BytesReaderBase;
 import org.apache.lucene.index.values.Bytes.BytesWriterBase;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.AttributeSource;
 import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefHash;
+import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.PagedBytes;
 import org.apache.lucene.util.RamUsageEstimator;
 import org.apache.lucene.util.ByteBlockPool.Allocator;
@@ -56,9 +58,7 @@ class VarSortedBytesImpl {
     private int[] docToEntry;
     private final Comparator<BytesRef> comp;

-    private final BytesRefHash hash = new BytesRefHash(pool,
-        BytesRefHash.DEFAULT_CAPACITY, new TrackingDirectBytesStartArray(
-            BytesRefHash.DEFAULT_CAPACITY, bytesUsed));
+    private final BytesRefHash hash;

     public Writer(Directory dir, String id, Comparator<BytesRef> comp,
         AtomicLong bytesUsed) throws IOException {
@@ -68,13 +68,14 @@ class VarSortedBytesImpl {

     public Writer(Directory dir, String id, Comparator<BytesRef> comp,
         Allocator allocator, AtomicLong bytesUsed) throws IOException {
-      super(dir, id, CODEC_NAME, VERSION_CURRENT, true,
-          new ByteBlockPool(allocator), bytesUsed);
+      super(dir, id, CODEC_NAME, VERSION_CURRENT, bytesUsed);
+      this.hash = new BytesRefHash(new ByteBlockPool(allocator),
+          BytesRefHash.DEFAULT_CAPACITY, new TrackingDirectBytesStartArray(
+              BytesRefHash.DEFAULT_CAPACITY, bytesUsed));
       this.comp = comp;
       docToEntry = new int[1];
       docToEntry[0] = -1;
       bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT);
-
     }

     @Override
@@ -99,14 +100,16 @@ class VarSortedBytesImpl {
     @Override
     public void finish(int docCount) throws IOException {
       final int count = hash.size();
+      final IndexOutput datOut = getDataOut();
+      long offset = 0;
+      long lastOffset = 0;
+      final int[] index = new int[count];
+      final long[] offsets = new long[count];
+      boolean success = false;
       try {
         final int[] sortedEntries = hash.sort(comp);
         // first dump bytes data, recording index & offset as
         // we go
-        long offset = 0;
-        long lastOffset = 0;
-        final int[] index = new int[count];
-        final long[] offsets = new long[count];
         for (int i = 0; i < count; i++) {
           final int e = sortedEntries[i];
           offsets[i] = offset;
@@ -118,7 +121,14 @@ class VarSortedBytesImpl {
           lastOffset = offset;
           offset += bytes.length;
         }
-
+        success = true;
+      } finally {
+        IOUtils.closeSafely(!success, datOut);
+        hash.close();
+      }
+      final IndexOutput idxOut = getIndexOut();
+      success = false;
+      try {
         // total bytes of data
         idxOut.writeLong(offset);
@@ -145,11 +155,12 @@ class VarSortedBytesImpl {
           offsetWriter.add(offsets[i]);
         }
         offsetWriter.finish();
+        success = true;
       } finally {
-        super.finish(docCount);
         bytesUsed.addAndGet((-docToEntry.length)
             * RamUsageEstimator.NUM_BYTES_INT);
-        hash.close();
+        docToEntry = null;
+        IOUtils.closeSafely(!success, idxOut);
       }
     }
   }
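Note: VarSortedBytesImpl.finish() now makes two passes: first it streams the values in sorted order into the data file while recording cumulative offsets, then it writes the total length, the offsets, and the per-document ords into the index file. A sketch of the first pass's offset bookkeeping, with a ByteArrayOutputStream standing in for the data IndexOutput:

import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.Comparator;

// Sketch of the first pass: dump values in sorted order while recording
// offsets[i] = start of the i-th value, so a value's length can later be
// recovered as offsets[i + 1] - offsets[i] (or total - offsets[i] for the last).
final class VarSortedSketch {
  static long[] dumpSorted(byte[][] values, Comparator<byte[]> comp,
      ByteArrayOutputStream dat) {
    byte[][] sorted = values.clone();
    Arrays.sort(sorted, comp);
    long[] offsets = new long[sorted.length];
    long offset = 0;
    for (int i = 0; i < sorted.length; i++) {
      offsets[i] = offset;
      dat.write(sorted[i], 0, sorted[i].length);
      offset += sorted[i].length; // running total, written as the index header
    }
    return offsets;
  }
}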
diff --git a/lucene/src/java/org/apache/lucene/index/values/VarStraightBytesImpl.java b/lucene/src/java/org/apache/lucene/index/values/VarStraightBytesImpl.java
index f736ae7eafa..88e68a56f59 100644
--- a/lucene/src/java/org/apache/lucene/index/values/VarStraightBytesImpl.java
+++ b/lucene/src/java/org/apache/lucene/index/values/VarStraightBytesImpl.java
@@ -25,12 +25,17 @@ import org.apache.lucene.index.values.Bytes.BytesReaderBase;
 import org.apache.lucene.index.values.Bytes.BytesWriterBase;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.AttributeSource;
+import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.PagedBytes;
 import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.ByteBlockPool.DirectTrackingAllocator;
 import org.apache.lucene.util.packed.PackedInts;
+import org.apache.lucene.util.packed.PackedInts.ReaderIterator;

 // Variable length byte[] per document, no sharing

@@ -48,11 +53,15 @@ class VarStraightBytesImpl {
     // start at -1 if the first added value is > 0
     private int lastDocID = -1;
     private long[] docToAddress;
-
+    private final ByteBlockPool pool;
+    private IndexOutput datOut;
+    private boolean merge = false;
     public Writer(Directory dir, String id, AtomicLong bytesUsed)
         throws IOException {
-      super(dir, id, CODEC_NAME, VERSION_CURRENT, true, null, bytesUsed);
+      super(dir, id, CODEC_NAME, VERSION_CURRENT, bytesUsed);
+      pool = new ByteBlockPool(new DirectTrackingAllocator(bytesUsed));
       docToAddress = new long[1];
+      pool.nextBuffer(); // init
       bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT);
     }

@@ -67,21 +76,109 @@ class VarStraightBytesImpl {
       for (int i = lastDocID + 1; i < docID; i++) {
         docToAddress[i] = address;
       }
-      lastDocID = docID;
     }

     @Override
     public void add(int docID, BytesRef bytes) throws IOException {
-      if (bytes.length == 0)
+      assert !merge;
+      if (bytes.length == 0) {
         return; // default
+      }
       fill(docID);
       docToAddress[docID] = address;
-      datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
+      pool.copy(bytes);
       address += bytes.length;
+      lastDocID = docID;
     }
+
+    @Override
+    protected void merge(MergeState state) throws IOException {
+      merge = true;
+      datOut = getDataOut();
+      boolean success = false;
+      try {
+        if (state.bits == null && state.reader instanceof Reader) {
+          // bulk merge since we don't have any deletes
+          Reader reader = (Reader) state.reader;
+          final int maxDocs = reader.maxDoc;
+          if (maxDocs == 0) {
+            return;
+          }
+          if (lastDocID+1 < state.docBase) {
+            fill(state.docBase);
+            lastDocID = state.docBase-1;
+          }
+          final long numDataBytes;
+          final IndexInput cloneIdx = reader.cloneIndex();
+          try {
+            numDataBytes = cloneIdx.readVLong();
+            final ReaderIterator iter = PackedInts.getReaderIterator(cloneIdx);
+            for (int i = 0; i < maxDocs; i++) {
+              long offset = iter.next();
+              ++lastDocID;
+              if (lastDocID >= docToAddress.length) {
+                int oldSize = docToAddress.length;
+                docToAddress = ArrayUtil.grow(docToAddress, 1 + lastDocID);
+                bytesUsed.addAndGet((docToAddress.length - oldSize)
+                    * RamUsageEstimator.NUM_BYTES_INT);
+              }
+              docToAddress[lastDocID] = address + offset;
+            }
+            address += numDataBytes; // this is the address after all addr pointers are updated
+            iter.close();
+          } finally {
+            IOUtils.closeSafely(true, cloneIdx);
+          }
+          final IndexInput cloneData = reader.cloneData();
+          try {
+            datOut.copyBytes(cloneData, numDataBytes);
+          } finally {
+            IOUtils.closeSafely(true, cloneData);
+          }
+        } else {
+          super.merge(state);
+        }
+        success = true;
+      } finally {
+        if (!success) {
+          IOUtils.closeSafely(!success, datOut);
+        }
+      }
+    }
+
+    @Override
+    protected void mergeDoc(int docID) throws IOException {
+      assert merge;
+      assert lastDocID < docID;
+      if (bytesRef.length == 0) {
+        return; // default
+      }
+      fill(docID);
+      datOut.writeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length);
+      docToAddress[docID] = address;
+      address += bytesRef.length;
+      lastDocID = docID;
+    }
+
     @Override
     public void finish(int docCount) throws IOException {
+      boolean success = false;
+      assert (!merge && datOut == null) || (merge && datOut != null);
+      final IndexOutput datOut = getDataOut();
+      try {
+        if (!merge) {
+          // header is already written in getDataOut()
+          pool.writePool(datOut);
+        }
+        success = true;
+      } finally {
+        IOUtils.closeSafely(!success, datOut);
+        pool.dropBuffersAndReset();
+      }
+
+      success = false;
+      final IndexOutput idxOut = getIndexOut();
       try {
         if (lastDocID == -1) {
           idxOut.writeVLong(0);
@@ -101,11 +198,12 @@ class VarStraightBytesImpl {
           }
           w.finish();
         }
+        success = true;
       } finally {
         bytesUsed.addAndGet(-(docToAddress.length)
             * RamUsageEstimator.NUM_BYTES_INT);
         docToAddress = null;
-        super.finish(docCount);
+        IOUtils.closeSafely(!success, idxOut);
       }
     }
@@ -179,21 +277,23 @@ class VarStraightBytesImpl {
     }

     private class VarStraightBytesEnum extends ValuesEnum {
-      private final PackedInts.Reader addresses;
+      private final PackedInts.ReaderIterator addresses;
       private final IndexInput datIn;
       private final IndexInput idxIn;
       private final long fp;
       private final long totBytes;
       private int pos = -1;
+      private long nextAddress;

       protected VarStraightBytesEnum(AttributeSource source,
           IndexInput datIn, IndexInput idxIn) throws IOException {
         super(source, ValueType.BYTES_VAR_STRAIGHT);
         totBytes = idxIn.readVLong();
         fp = datIn.getFilePointer();
-        addresses = PackedInts.getReader(idxIn);
+        addresses = PackedInts.getReaderIterator(idxIn);
         this.datIn = datIn;
         this.idxIn = idxIn;
+        nextAddress = addresses.next();
       }

       @Override
@@ -207,7 +307,7 @@ class VarStraightBytesImpl {
         if (target >= maxDoc) {
           return pos = NO_MORE_DOCS;
         }
-        final long addr = addresses.get(target);
+        final long addr = pos+1 == target ? nextAddress : addresses.advance(target);
         if (addr == totBytes) { // empty values at the end
           bytesRef.length = 0;
           bytesRef.offset = 0;
@@ -215,7 +315,7 @@ class VarStraightBytesImpl {
         }
         datIn.seek(fp + addr);
         final int size = (int) (target == maxDoc - 1 ? totBytes - addr
-            : addresses.get(target + 1) - addr);
+            : (nextAddress = addresses.next()) - addr);
         if (bytesRef.bytes.length < size) {
           bytesRef.grow(size);
         }
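Note: the interesting part of the var-straight writer is its bulk-merge path: it reads the source segment's packed per-document addresses, rebases each one by the running global address, and then block-copies the raw payload with a single copyBytes call. A sketch of the rebasing step, with plain long[] arrays standing in for PackedInts readers and the on-disk address index:

// Sketch of the bulk merge's address rebasing: per-document start addresses
// from the source segment are shifted by the running global address before the
// raw bytes are block-copied after them. Field and method names are illustrative.
final class RebaseSketch {
  long address;                       // bytes written so far for the merged segment
  long[] docToAddress = new long[0];  // per-document start addresses, global

  void bulkMerge(long[] sourceAddresses, long sourceNumDataBytes, int docBase) {
    long[] grown = new long[docBase + sourceAddresses.length];
    System.arraycopy(docToAddress, 0, grown, 0,
        Math.min(docToAddress.length, docBase));
    docToAddress = grown;
    for (int i = 0; i < sourceAddresses.length; i++) {
      docToAddress[docBase + i] = address + sourceAddresses[i]; // rebase
    }
    address += sourceNumDataBytes; // account for the block-copied payload
  }
}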
diff --git a/lucene/src/java/org/apache/lucene/util/ByteBlockPool.java b/lucene/src/java/org/apache/lucene/util/ByteBlockPool.java
index 58e3b93824f..24530d5e02d 100644
--- a/lucene/src/java/org/apache/lucene/util/ByteBlockPool.java
+++ b/lucene/src/java/org/apache/lucene/util/ByteBlockPool.java
@@ -16,10 +16,13 @@ package org.apache.lucene.util;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;

+import org.apache.lucene.store.DataOutput;
+
 import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_REF;

 /**
@@ -241,5 +244,42 @@ public final class ByteBlockPool {
     assert term.length >= 0;
     return term;
   }
+
+  /**
+   * Copies the given {@link BytesRef} at the current position
+   * ({@link #byteUpto}), across buffer boundaries if necessary.
+   */
+  public final void copy(final BytesRef bytes) {
+    int length = bytes.length;
+    int offset = bytes.offset;
+    int overflow = (length + byteUpto) - BYTE_BLOCK_SIZE;
+    do {
+      if (overflow <= 0) {
+        System.arraycopy(bytes.bytes, offset, buffer, byteUpto, length);
+        byteUpto += length;
+        break;
+      } else {
+        final int bytesToCopy = length-overflow;
+        System.arraycopy(bytes.bytes, offset, buffer, byteUpto, bytesToCopy);
+        offset += bytesToCopy;
+        length -= bytesToCopy;
+        nextBuffer();
+        overflow = overflow - BYTE_BLOCK_SIZE;
+      }
+    } while(true);
+  }
+
+  /**
+   * Writes the pool's content to the given {@link DataOutput}.
+   */
+  public final void writePool(final DataOutput out) throws IOException {
+    int bytesOffset = byteOffset;
+    int block = 0;
+    while (bytesOffset > 0) {
+      out.writeBytes(buffers[block++], BYTE_BLOCK_SIZE);
+      bytesOffset -= BYTE_BLOCK_SIZE;
+    }
+    out.writeBytes(buffers[block], byteUpto);
+  }
 }
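Note: copy() and writePool() are the two ByteBlockPool additions the writers above depend on: copy() spills a value across block boundaries, and writePool() streams every full block followed by the used prefix of the current one. A self-contained sketch of the pair over a plain byte[][], with a tiny illustrative block size so the boundary spill is easy to trigger:

import java.io.ByteArrayOutputStream;

// Sketch of copy()/writePool() over plain byte[][] blocks: copy() splits the
// input across block boundaries, writePool() emits all full blocks followed by
// the used prefix of the last one.
final class PoolSketch {
  static final int BLOCK = 8; // tiny block size; ByteBlockPool uses 32768
  byte[][] blocks = new byte[1][BLOCK];
  int block = 0, upto = 0;

  void copy(byte[] bytes) {
    int offset = 0, length = bytes.length;
    while (length > 0) {
      int chunk = Math.min(BLOCK - upto, length); // what fits in this block
      System.arraycopy(bytes, offset, blocks[block], upto, chunk);
      upto += chunk;
      offset += chunk;
      length -= chunk;
      if (upto == BLOCK) { // current block exhausted: allocate the next one
        blocks = java.util.Arrays.copyOf(blocks, blocks.length + 1);
        blocks[++block] = new byte[BLOCK];
        upto = 0;
      }
    }
  }

  void writePool(ByteArrayOutputStream out) {
    for (int b = 0; b < block; b++) {
      out.write(blocks[b], 0, BLOCK); // full blocks
    }
    out.write(blocks[block], 0, upto); // used prefix of the current block
  }
}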
diff --git a/lucene/src/test/org/apache/lucene/index/values/TestDocValues.java b/lucene/src/test/org/apache/lucene/index/values/TestDocValues.java
index 22ff9da69f3..a72a28b9820 100644
--- a/lucene/src/test/org/apache/lucene/index/values/TestDocValues.java
+++ b/lucene/src/test/org/apache/lucene/index/values/TestDocValues.java
@@ -64,7 +64,7 @@ public class TestDocValues extends LuceneTestCase {
     Writer w = Bytes.getWriter(dir, "test", mode, comp, fixedSize, trackBytes);
     int maxDoc = 220;
     final String[] values = new String[maxDoc];
-    final int fixedLength = 3 + random.nextInt(7);
+    final int fixedLength = 1 + atLeast(50);
     for (int i = 0; i < 100; i++) {
       final String s;
       if (i > 0 && random.nextInt(5) <= 2) {
diff --git a/lucene/src/test/org/apache/lucene/index/values/TestDocValuesIndexing.java b/lucene/src/test/org/apache/lucene/index/values/TestDocValuesIndexing.java
index f5588b9d4c2..9be727e3167 100644
--- a/lucene/src/test/org/apache/lucene/index/values/TestDocValuesIndexing.java
+++ b/lucene/src/test/org/apache/lucene/index/values/TestDocValuesIndexing.java
@@ -329,8 +329,7 @@ public class TestDocValuesIndexing extends LuceneTestCase {
     final int numValues = 50 + atLeast(10);
     for (ValueType byteIndexValue : byteVariantList) {
       List<Closeable> closeables = new ArrayList<Closeable>();
-
-      int bytesSize = 1 + atLeast(10);
+      final int bytesSize = 1 + atLeast(50);
       OpenBitSet deleted = indexValues(w, numValues, byteIndexValue,
           byteVariantList, withDeletions, bytesSize);
       final IndexReader r = IndexReader.open(w, withDeletions);
@@ -357,7 +356,7 @@ public class TestDocValuesIndexing extends LuceneTestCase {
           assertNotNull("expected none null - " + msg, br);
           if (br.length != 0) {
             assertEquals("expected zero bytes of length " + bytesSize + " - "
-                + msg, bytesSize, br.length);
+                + msg + br.utf8ToString(), bytesSize, br.length);
             for (int j = 0; j < br.length; j++) {
               assertEquals("Byte at index " + j + " doesn't match - " + msg, 0,
                   br.bytes[br.offset + j]);
@@ -391,12 +390,12 @@ public class TestDocValuesIndexing extends LuceneTestCase {
           while (withDeletions && deleted.get(v++)) {
             upto += bytesSize;
           }
-
           BytesRef br = bytes.getBytes(i, new BytesRef());
           if (bytesEnum.docID() != i) {
             assertEquals("seek failed for index " + i + " " + msg, i, bytesEnum
                 .advance(i));
           }
+          assertTrue(msg, br.length > 0);
           for (int j = 0; j < br.length; j++, upto++) {
             assertTrue(" enumRef not initialized " + msg,
                 enumRef.bytes.length > 0);
diff --git a/lucene/src/test/org/apache/lucene/util/TestByteBlockPool.java b/lucene/src/test/org/apache/lucene/util/TestByteBlockPool.java
new file mode 100644
index 00000000000..ef12523f06c
--- /dev/null
+++ b/lucene/src/test/org/apache/lucene/util/TestByteBlockPool.java
@@ -0,0 +1,67 @@
+package org.apache.lucene.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.RAMDirectory;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+public class TestByteBlockPool extends LuceneTestCase {
+
+  public void testCopyRefAndWrite() throws IOException {
+    List<String> list = new ArrayList<String>();
+    int maxLength = atLeast(500);
+    ByteBlockPool pool = new ByteBlockPool(new ByteBlockPool.DirectAllocator());
+    pool.nextBuffer();
+    final int numValues = atLeast(100);
+    BytesRef ref = new BytesRef();
+    for (int i = 0; i < numValues; i++) {
+      final String value = _TestUtil.randomRealisticUnicodeString(random,
+          maxLength);
+      list.add(value);
+      ref.copy(value);
+      pool.copy(ref);
+    }
+    RAMDirectory dir = new RAMDirectory();
+    IndexOutput stream = dir.createOutput("foo.txt");
+    pool.writePool(stream);
+    stream.flush();
+    stream.close();
+    IndexInput input = dir.openInput("foo.txt");
+    assertEquals(pool.byteOffset + pool.byteUpto, stream.length());
+    BytesRef expected = new BytesRef();
+    BytesRef actual = new BytesRef();
+    for (String string : list) {
+      expected.copy(string);
+      actual.grow(expected.length);
+      actual.length = expected.length;
+      input.readBytes(actual.bytes, 0, actual.length);
+      assertEquals(expected, actual);
+    }
+    try {
+      input.readByte();
+      fail("must be EOF");
+    } catch (IOException e) {
+      // expected - read past EOF
+    }
+    dir.close();
+  }
+}