LUCENE-8468: A ByteBuffer based Directory implementation (and associated classes).

This commit is contained in:
Dawid Weiss 2018-08-28 15:02:17 +02:00
parent a452dd9ce3
commit f762953aab
11 changed files with 2219 additions and 30 deletions

View File

@ -237,6 +237,8 @@ Changes in Runtime Behavior:
Improvements
* LUCENE-8468: A ByteBuffer based Directory implementation. (Dawid Weiss)
* LUCENE-8447: Add DISJOINT and WITHIN support to LatLonShape queries. (Nick Knize)
* LUCENE-8440: Add support for indexing and searching Line and Point shapes using LatLonShape encoding (Nick Knize)

View File

@ -14,55 +14,56 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.store;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Locale;
/**
* DataInput backed by a byte array.
* <b>WARNING:</b> This class omits all low-level checks.
* @lucene.experimental
/**
* A {@link IndexInput} backed by a byte array.
*
* @lucene.experimental
*/
public final class ByteArrayIndexInput extends IndexInput {
public final class ByteArrayIndexInput extends IndexInput implements RandomAccessInput {
private byte[] bytes;
private final int offset;
private final int length;
private int pos;
private int limit;
public ByteArrayIndexInput(String description, byte[] bytes) {
this(description, bytes, 0, bytes.length);
}
public ByteArrayIndexInput(String description, byte[] bytes, int offs, int length) {
super(description);
this.offset = offs;
this.bytes = bytes;
this.limit = bytes.length;
this.length = length;
this.pos = offs;
}
public long getFilePointer() {
return pos;
return pos - offset;
}
public void seek(long pos) {
this.pos = (int) pos;
}
public void reset(byte[] bytes, int offset, int len) {
this.bytes = bytes;
pos = offset;
limit = offset + len;
public void seek(long pos) throws EOFException {
int newPos = Math.toIntExact(pos + offset);
try {
if (pos < 0 || pos > length) {
throw new EOFException();
}
} finally {
this.pos = newPos;
}
}
@Override
public long length() {
return limit;
}
public boolean eof() {
return pos == limit;
}
@Override
public void skipBytes(long count) {
pos += count;
return length;
}
@Override
@ -153,9 +154,55 @@ public final class ByteArrayIndexInput extends IndexInput {
@Override
public void close() {
bytes = null;
}
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
throw new UnsupportedOperationException();
@Override
public IndexInput clone() {
ByteArrayIndexInput slice = slice("(cloned)" + toString(), 0, length());
try {
slice.seek(getFilePointer());
} catch (EOFException e) {
throw new UncheckedIOException(e);
}
return slice;
}
public ByteArrayIndexInput slice(String sliceDescription, long offset, long length) {
if (offset < 0 || length < 0 || offset + length > this.length) {
throw new IllegalArgumentException(String.format(Locale.ROOT,
"slice(offset=%s, length=%s) is out of bounds: %s",
offset, length, this));
}
return new ByteArrayIndexInput(sliceDescription, this.bytes, Math.toIntExact(this.offset + offset),
Math.toIntExact(length));
}
@Override
public byte readByte(long pos) throws IOException {
return bytes[Math.toIntExact(offset + pos)];
}
@Override
public short readShort(long pos) throws IOException {
int i = Math.toIntExact(offset + pos);
return (short) (((bytes[i] & 0xFF) << 8) |
(bytes[i + 1] & 0xFF));
}
@Override
public int readInt(long pos) throws IOException {
int i = Math.toIntExact(offset + pos);
return ((bytes[i] & 0xFF) << 24) |
((bytes[i + 1] & 0xFF) << 16) |
((bytes[i + 2] & 0xFF) << 8) |
(bytes[i + 3] & 0xFF);
}
@Override
public long readLong(long pos) throws IOException {
return (((long) readInt(pos)) << 32) |
(readInt(pos + 4) & 0xFFFFFFFFL);
}
}

View File

@ -0,0 +1,323 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.store;
import java.io.EOFException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.RandomAccessInput;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
/**
* A {@link DataInput} implementing {@link RandomAccessInput} and reading data from a
* list of {@link ByteBuffer}s.
*/
public final class ByteBuffersDataInput extends DataInput implements Accountable, RandomAccessInput {
private final ByteBuffer[] blocks;
private final int blockBits;
private final int blockMask;
private final long size;
private final long offset;
private long pos;
/**
* Read data from a set of contiguous buffers. All data buffers except for the last one
* must have an identical remaining number of bytes in the buffer (that is a power of two). The last
* buffer can be of an arbitrary remaining length.
*/
public ByteBuffersDataInput(List<ByteBuffer> buffers) {
ensureAssumptions(buffers);
this.blocks = buffers.stream().map(buf -> buf.asReadOnlyBuffer()).toArray(ByteBuffer[]::new);
if (blocks.length == 1) {
this.blockBits = 32;
this.blockMask = ~0;
} else {
final int blockBytes = determineBlockPage(buffers);
this.blockBits = Integer.numberOfTrailingZeros(blockBytes);
this.blockMask = (1 << blockBits) - 1;
}
this.size = Arrays.stream(blocks).mapToLong(block -> block.remaining()).sum();
// The initial "position" of this stream is shifted by the position of the first block.
this.offset = blocks[0].position();
this.pos = offset;
}
public long size() {
return size;
}
@Override
public long ramBytesUsed() {
// Return a rough estimation for allocated blocks. Note that we do not make
// any special distinction for what the type of buffer is (direct vs. heap-based).
return RamUsageEstimator.NUM_BYTES_OBJECT_REF * blocks.length +
Arrays.stream(blocks).mapToLong(buf -> buf.capacity()).sum();
}
@Override
public byte readByte() throws EOFException {
try {
ByteBuffer block = blocks[blockIndex(pos)];
byte v = block.get(blockOffset(pos));
pos++;
return v;
} catch (IndexOutOfBoundsException e) {
if (pos >= size()) {
throw new EOFException();
} else {
throw e; // Something is wrong.
}
}
}
/**
* Reads exactly {@code len} bytes into the given buffer. The buffer must have
* enough remaining limit.
*
* If there are fewer than {@code len} bytes in the input, {@link EOFException}
* is thrown.
*/
public void readBytes(ByteBuffer buffer, int len) throws EOFException {
try {
while (len > 0) {
ByteBuffer block = blocks[blockIndex(pos)].duplicate();
int blockOffset = blockOffset(pos);
block.position(blockOffset);
int chunk = Math.min(len, block.remaining());
if (chunk == 0) {
throw new EOFException();
}
// Update pos early on for EOF detection on output buffer, then try to get buffer content.
pos += chunk;
block.limit(blockOffset + chunk);
buffer.put(block);
len -= chunk;
}
} catch (BufferUnderflowException | ArrayIndexOutOfBoundsException e) {
if (pos >= size()) {
throw new EOFException();
} else {
throw e; // Something is wrong.
}
}
}
@Override
public void readBytes(byte[] arr, int off, int len) throws EOFException {
try {
while (len > 0) {
ByteBuffer block = blocks[blockIndex(pos)].duplicate();
block.position(blockOffset(pos));
int chunk = Math.min(len, block.remaining());
if (chunk == 0) {
throw new EOFException();
}
// Update pos early on for EOF detection, then try to get buffer content.
pos += chunk;
block.get(arr, off, chunk);
len -= chunk;
off += chunk;
}
} catch (BufferUnderflowException | ArrayIndexOutOfBoundsException e) {
if (pos >= size()) {
throw new EOFException();
} else {
throw e; // Something is wrong.
}
}
}
@Override
public byte readByte(long pos) {
pos += offset;
return blocks[blockIndex(pos)].get(blockOffset(pos));
}
@Override
public short readShort(long pos) {
long absPos = offset + pos;
int blockOffset = blockOffset(absPos);
if (blockOffset + Short.BYTES <= blockMask) {
return blocks[blockIndex(absPos)].getShort(blockOffset);
} else {
return (short) ((readByte(pos ) & 0xFF) << 8 |
(readByte(pos + 1) & 0xFF));
}
}
@Override
public int readInt(long pos) {
long absPos = offset + pos;
int blockOffset = blockOffset(absPos);
if (blockOffset + Integer.BYTES <= blockMask) {
return blocks[blockIndex(absPos)].getInt(blockOffset);
} else {
return ((readByte(pos ) ) << 24 |
(readByte(pos + 1) & 0xFF) << 16 |
(readByte(pos + 2) & 0xFF) << 8 |
(readByte(pos + 3) & 0xFF));
}
}
@Override
public long readLong(long pos) {
long absPos = offset + pos;
int blockOffset = blockOffset(absPos);
if (blockOffset + Long.BYTES <= blockMask) {
return blocks[blockIndex(absPos)].getLong(blockOffset);
} else {
return (((long) readInt(pos)) << 32) | (readInt(pos + 4) & 0xFFFFFFFFL);
}
}
public long position() {
return pos - offset;
}
public void seek(long position) throws EOFException {
this.pos = position + offset;
if (position > size()) {
this.pos = size();
throw new EOFException();
}
}
public ByteBuffersDataInput slice(long offset, long length) {
if (offset < 0 || length < 0 || offset + length > this.size) {
throw new IllegalArgumentException(String.format(Locale.ROOT,
"slice(offset=%s, length=%s) is out of bounds: %s",
offset, length, this));
}
return new ByteBuffersDataInput(sliceBufferList(Arrays.asList(this.blocks), offset, length));
}
@Override
public String toString() {
return String.format(Locale.ROOT,
"%,d bytes, block size: %,d, blocks: %,d, position: %,d%s",
size(),
blockSize(),
blocks.length,
position(),
offset == 0 ? "" : String.format(Locale.ROOT, " [offset: %,d]", offset));
}
private final int blockIndex(long pos) {
return Math.toIntExact(pos >> blockBits);
}
private final int blockOffset(long pos) {
return (int) pos & blockMask;
}
private int blockSize() {
return 1 << blockBits;
}
private static final boolean isPowerOfTwo(int v) {
return (v & (v - 1)) == 0;
}
private static void ensureAssumptions(List<ByteBuffer> buffers) {
if (buffers.isEmpty()) {
throw new IllegalArgumentException("Buffer list must not be empty.");
}
if (buffers.size() == 1) {
// Special case of just a single buffer, conditions don't apply.
} else {
final int blockPage = determineBlockPage(buffers);
// First buffer decides on block page length.
if (!isPowerOfTwo(blockPage)) {
throw new IllegalArgumentException("The first buffer must have power-of-two position() + remaining(): 0x"
+ Integer.toHexString(blockPage));
}
// Any block from 2..last-1 should have the same page size.
for (int i = 1, last = buffers.size() - 1; i < last; i++) {
ByteBuffer buffer = buffers.get(i);
if (buffer.position() != 0) {
throw new IllegalArgumentException("All buffers except for the first one must have position() == 0: " + buffer);
}
if (i != last && buffer.remaining() != blockPage) {
throw new IllegalArgumentException("Intermediate buffers must share an identical remaining() power-of-two block size: 0x"
+ Integer.toHexString(blockPage));
}
}
}
}
static int determineBlockPage(List<ByteBuffer> buffers) {
ByteBuffer first = buffers.get(0);
final int blockPage = Math.toIntExact((long) first.position() + first.remaining());
return blockPage;
}
private static List<ByteBuffer> sliceBufferList(List<ByteBuffer> buffers, long offset, long length) {
ensureAssumptions(buffers);
if (buffers.size() == 1) {
ByteBuffer cloned = buffers.get(0).asReadOnlyBuffer();
cloned.position(Math.toIntExact(cloned.position() + offset));
cloned.limit(Math.toIntExact(length + cloned.position()));
return Arrays.asList(cloned);
} else {
long absStart = buffers.get(0).position() + offset;
long absEnd = Math.toIntExact(absStart + length);
int blockBytes = ByteBuffersDataInput.determineBlockPage(buffers);
int blockBits = Integer.numberOfTrailingZeros(blockBytes);
int blockMask = (1 << blockBits) - 1;
int endOffset = (int) absEnd & blockMask;
ArrayList<ByteBuffer> cloned =
buffers.subList(Math.toIntExact(absStart / blockBytes),
Math.toIntExact(absEnd / blockBytes + (endOffset == 0 ? 0 : 1)))
.stream()
.map(buf -> buf.asReadOnlyBuffer())
.collect(Collectors.toCollection(ArrayList::new));
if (endOffset == 0) {
cloned.add(ByteBuffer.allocate(0));
}
cloned.get(0).position((int) absStart & blockMask);
cloned.get(cloned.size() - 1).limit(endOffset);
return cloned;
}
}
}

View File

@ -0,0 +1,541 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.store;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.UnicodeUtil;
/**
* A {@link DataOutput} storing data in a list of {@link ByteBuffer}s.
*/
public final class ByteBuffersDataOutput extends DataOutput implements Accountable {
private final static ByteBuffer EMPTY = ByteBuffer.allocate(0);
private final static byte [] EMPTY_BYTE_ARRAY = {};
public final static IntFunction<ByteBuffer> ALLOCATE_BB_ON_HEAP = ByteBuffer::allocate;
/**
* A singleton instance of "no-reuse" buffer strategy.
*/
public final static Consumer<ByteBuffer> NO_REUSE = (bb) -> {
throw new RuntimeException("reset() is not allowed on this buffer.");
};
/**
* An implementation of a {@link ByteBuffer} allocation and recycling policy.
* The blocks are recycled if exactly the same size is requested, otherwise
* they're released to be GCed.
*/
public final static class ByteBufferRecycler {
private final ArrayDeque<ByteBuffer> reuse = new ArrayDeque<>();
private final IntFunction<ByteBuffer> delegate;
public ByteBufferRecycler(IntFunction<ByteBuffer> delegate) {
this.delegate = Objects.requireNonNull(delegate);
}
public ByteBuffer allocate(int size) {
while (!reuse.isEmpty()) {
ByteBuffer bb = reuse.removeFirst();
// If we don't have a buffer of exactly the requested size, discard it.
if (bb.remaining() == size) {
return bb;
}
}
return delegate.apply(size);
}
public void reuse(ByteBuffer buffer) {
buffer.rewind();
reuse.addLast(buffer);
}
}
public final static int DEFAULT_MIN_BITS_PER_BLOCK = 10; // 1024 B
public final static int DEFAULT_MAX_BITS_PER_BLOCK = 26; // 64 MB
/**
* Maximum number of blocks at the current {@link #blockBits} block size
* before we increase the block size (and thus decrease the number of blocks).
*/
final static int MAX_BLOCKS_BEFORE_BLOCK_EXPANSION = 100;
/**
* Maximum block size: {@code 2^bits}.
*/
private final int maxBitsPerBlock;
/**
* {@link ByteBuffer} supplier.
*/
private final IntFunction<ByteBuffer> blockAllocate;
/**
* {@link ByteBuffer} recycler on {@link #reset}.
*/
private final Consumer<ByteBuffer> blockReuse;
/**
* Current block size: {@code 2^bits}.
*/
private int blockBits;
/**
* Blocks storing data.
*/
private final ArrayDeque<ByteBuffer> blocks = new ArrayDeque<>();
/**
* The current-or-next write block.
*/
private ByteBuffer currentBlock = EMPTY;
public ByteBuffersDataOutput(long expectedSize) {
this(computeBlockSizeBitsFor(expectedSize), DEFAULT_MAX_BITS_PER_BLOCK, ALLOCATE_BB_ON_HEAP, NO_REUSE);
}
public ByteBuffersDataOutput() {
this(DEFAULT_MIN_BITS_PER_BLOCK, DEFAULT_MAX_BITS_PER_BLOCK, ALLOCATE_BB_ON_HEAP, NO_REUSE);
}
public ByteBuffersDataOutput(int minBitsPerBlock,
int maxBitsPerBlock,
IntFunction<ByteBuffer> blockAllocate,
Consumer<ByteBuffer> blockReuse) {
if (minBitsPerBlock < 10 ||
minBitsPerBlock > maxBitsPerBlock ||
maxBitsPerBlock > 31) {
throw new IllegalArgumentException(String.format(Locale.ROOT,
"Invalid arguments: %s %s",
minBitsPerBlock,
maxBitsPerBlock));
}
this.maxBitsPerBlock = maxBitsPerBlock;
this.blockBits = minBitsPerBlock;
this.blockAllocate = Objects.requireNonNull(blockAllocate, "Block allocator must not be null.");
this.blockReuse = Objects.requireNonNull(blockReuse, "Block reuse must not be null.");
}
@Override
public void writeByte(byte b) {
if (!currentBlock.hasRemaining()) {
appendBlock();
}
currentBlock.put(b);
}
@Override
public void writeBytes(byte[] src, int offset, int length) {
assert length >= 0;
while (length > 0) {
if (!currentBlock.hasRemaining()) {
appendBlock();
}
int chunk = Math.min(currentBlock.remaining(), length);
currentBlock.put(src, offset, chunk);
length -= chunk;
offset += chunk;
}
}
@Override
public void writeBytes(byte[] b, int length) {
writeBytes(b, 0, length);
}
public void writeBytes(byte[] b) {
writeBytes(b, 0, b.length);
}
public void writeBytes(ByteBuffer buffer) {
buffer = buffer.duplicate();
int length = buffer.remaining();
while (length > 0) {
if (!currentBlock.hasRemaining()) {
appendBlock();
}
int chunk = Math.min(currentBlock.remaining(), length);
buffer.limit(buffer.position() + chunk);
currentBlock.put(buffer);
length -= chunk;
}
}
/**
* Return a list of read-only view of {@link ByteBuffer} blocks over the
* current content written to the output.
*/
public ArrayList<ByteBuffer> toBufferList() {
ArrayList<ByteBuffer> result = new ArrayList<>(Math.max(blocks.size(), 1));
if (blocks.isEmpty()) {
result.add(EMPTY);
} else {
for (ByteBuffer bb : blocks) {
bb = (ByteBuffer) bb.asReadOnlyBuffer().flip(); // cast for jdk8 (covariant in jdk9+)
result.add(bb);
}
}
return result;
}
/**
* Returns a list of writeable blocks over the (source) content buffers.
*
* This method returns the raw content of source buffers that may change over the lifetime
* of this object (blocks can be recycled or discarded, for example). Most applications
* should favor calling {@link #toBufferList()} which returns a read-only <i>view</i> over
* the content of the source buffers.
*
* The difference between {@link #toBufferList()} and {@link #toWriteableBufferList()} is that
* read-only view of source buffers will always return {@code false} from {@link ByteBuffer#hasArray()}
* (which sometimes may be required to avoid double copying).
*/
public ArrayList<ByteBuffer> toWriteableBufferList() {
ArrayList<ByteBuffer> result = new ArrayList<>(Math.max(blocks.size(), 1));
if (blocks.isEmpty()) {
result.add(EMPTY);
} else {
for (ByteBuffer bb : blocks) {
bb = (ByteBuffer) bb.duplicate().flip(); // cast for jdk8 (covariant in jdk9+)
result.add(bb);
}
}
return result;
}
/**
* Return a {@link ByteBuffersDataInput} for the set of current buffers ({@link #toBufferList()}).
*/
public ByteBuffersDataInput toDataInput() {
return new ByteBuffersDataInput(toBufferList());
}
/**
* Return a contiguous array with the current content written to the output. The returned
* array is always a copy (can be mutated).
*/
public byte[] toArrayCopy() {
if (blocks.size() == 0) {
return EMPTY_BYTE_ARRAY;
}
// We could try to detect single-block, array-based ByteBuffer here
// and use Arrays.copyOfRange, but I don't think it's worth the extra
// instance checks.
byte [] arr = new byte[Math.toIntExact(size())];
int offset = 0;
for (ByteBuffer bb : toBufferList()) {
int len = bb.remaining();
bb.get(arr, offset, len);
offset += len;
}
return arr;
}
/**
* Copy the current content of this object into another {@link DataOutput}.
*/
public void copyTo(DataOutput output) throws IOException {
for (ByteBuffer bb : toBufferList()) {
if (bb.hasArray()) {
output.writeBytes(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
} else {
output.copyBytes(new ByteBuffersDataInput(Arrays.asList(bb)), bb.remaining());
}
}
}
/**
* @return The number of bytes written to this output so far.
*/
public long size() {
long size = 0;
int blockCount = blocks.size();
if (blockCount >= 1) {
int fullBlockSize = (blockCount - 1) * blockSize();
int lastBlockSize = blocks.getLast().position();
size = fullBlockSize + lastBlockSize;
}
return size;
}
@Override
public String toString() {
return String.format(Locale.ROOT,
"%,d bytes, block size: %,d, blocks: %,d",
size(),
blockSize(),
blocks.size());
}
// Specialized versions of writeXXX methods that break execution into
// fast/ slow path if the result would fall on the current block's
// boundary.
//
// We also remove the IOException from methods because it (theoretically)
// cannot be thrown from byte buffers.
@Override
public void writeShort(short v) {
try {
if (currentBlock.remaining() >= Short.BYTES) {
currentBlock.putShort(v);
} else {
super.writeShort(v);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public void writeInt(int v) {
try {
if (currentBlock.remaining() >= Integer.BYTES) {
currentBlock.putInt(v);
} else {
super.writeInt(v);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public void writeLong(long v) {
try {
if (currentBlock.remaining() >= Long.BYTES) {
currentBlock.putLong(v);
} else {
super.writeLong(v);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public void writeString(String v) {
try {
final int MAX_CHARS_PER_WINDOW = 1024;
if (v.length() <= MAX_CHARS_PER_WINDOW) {
final BytesRef utf8 = new BytesRef(v);
writeVInt(utf8.length);
writeBytes(utf8.bytes, utf8.offset, utf8.length);
} else {
writeVInt(UnicodeUtil.calcUTF16toUTF8Length(v, 0, v.length()));
final byte [] buf = new byte [UnicodeUtil.MAX_UTF8_BYTES_PER_CHAR * MAX_CHARS_PER_WINDOW];
UTF16toUTF8(v, 0, v.length(), buf, (len) -> {
writeBytes(buf, 0, len);
});
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public void writeMapOfStrings(Map<String, String> map) {
try {
super.writeMapOfStrings(map);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public void writeSetOfStrings(Set<String> set) {
try {
super.writeSetOfStrings(set);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public long ramBytesUsed() {
// Return a rough estimation for allocated blocks. Note that we do not make
// any special distinction for direct memory buffers.
return RamUsageEstimator.NUM_BYTES_OBJECT_REF * blocks.size() +
blocks.stream().mapToLong(buf -> buf.capacity()).sum();
}
/**
* This method resets this object to a clean (zero-size) state and
* publishes any currently allocated buffers for reuse to the reuse strategy
* provided in the constructor.
*
* Sharing byte buffers for reads and writes is dangerous and will very likely
* lead to hard-to-debug issues, use with great care.
*/
public void reset() {
blocks.stream().forEach(blockReuse);
blocks.clear();
currentBlock = EMPTY;
}
/**
* @return Returns a new {@link ByteBuffersDataOutput} with the {@link #reset()} capability.
*/
// TODO: perhaps we can move it out to an utility class (as a supplier of preconfigured instances?)
public static ByteBuffersDataOutput newResettableInstance() {
ByteBuffersDataOutput.ByteBufferRecycler reuser = new ByteBuffersDataOutput.ByteBufferRecycler(
ByteBuffersDataOutput.ALLOCATE_BB_ON_HEAP);
return new ByteBuffersDataOutput(
ByteBuffersDataOutput.DEFAULT_MIN_BITS_PER_BLOCK,
ByteBuffersDataOutput.DEFAULT_MAX_BITS_PER_BLOCK,
reuser::allocate,
reuser::reuse);
}
private int blockSize() {
return 1 << blockBits;
}
private void appendBlock() {
if (blocks.size() >= MAX_BLOCKS_BEFORE_BLOCK_EXPANSION && blockBits < maxBitsPerBlock) {
rewriteToBlockSize(blockBits + 1);
if (blocks.getLast().hasRemaining()) {
return;
}
}
final int requiredBlockSize = 1 << blockBits;
currentBlock = blockAllocate.apply(requiredBlockSize);
assert currentBlock.capacity() == requiredBlockSize;
blocks.add(currentBlock);
}
private void rewriteToBlockSize(int targetBlockBits) {
assert targetBlockBits <= maxBitsPerBlock;
// We copy over data blocks to an output with one-larger block bit size.
// We also discard references to blocks as we're copying to allow GC to
// clean up partial results in case of memory pressure.
ByteBuffersDataOutput cloned = new ByteBuffersDataOutput(targetBlockBits, targetBlockBits, blockAllocate, NO_REUSE);
ByteBuffer block;
while ((block = blocks.pollFirst()) != null) {
block.flip();
cloned.writeBytes(block);
if (blockReuse != NO_REUSE) {
blockReuse.accept(block);
}
}
assert blocks.isEmpty();
this.blockBits = targetBlockBits;
blocks.addAll(cloned.blocks);
}
private static int computeBlockSizeBitsFor(long bytes) {
long powerOfTwo = BitUtil.nextHighestPowerOfTwo(bytes / MAX_BLOCKS_BEFORE_BLOCK_EXPANSION);
if (powerOfTwo == 0) {
return DEFAULT_MIN_BITS_PER_BLOCK;
}
int blockBits = Long.numberOfTrailingZeros(powerOfTwo);
blockBits = Math.min(blockBits, DEFAULT_MAX_BITS_PER_BLOCK);
blockBits = Math.max(blockBits, DEFAULT_MIN_BITS_PER_BLOCK);
return blockBits;
}
// TODO: move this block-based conversion to UnicodeUtil.
private static final long HALF_SHIFT = 10;
private static final int SURROGATE_OFFSET =
Character.MIN_SUPPLEMENTARY_CODE_POINT -
(UnicodeUtil.UNI_SUR_HIGH_START << HALF_SHIFT) - UnicodeUtil.UNI_SUR_LOW_START;
/**
* A consumer-based UTF16-UTF8 encoder (writes the input string in smaller buffers.).
*/
private static int UTF16toUTF8(final CharSequence s,
final int offset,
final int length,
byte[] buf,
IntConsumer bufferFlusher) {
int utf8Len = 0;
int j = 0;
for (int i = offset, end = offset + length; i < end; i++) {
final int chr = (int) s.charAt(i);
if (j + 4 >= buf.length) {
bufferFlusher.accept(j);
utf8Len += j;
j = 0;
}
if (chr < 0x80)
buf[j++] = (byte) chr;
else if (chr < 0x800) {
buf[j++] = (byte) (0xC0 | (chr >> 6));
buf[j++] = (byte) (0x80 | (chr & 0x3F));
} else if (chr < 0xD800 || chr > 0xDFFF) {
buf[j++] = (byte) (0xE0 | (chr >> 12));
buf[j++] = (byte) (0x80 | ((chr >> 6) & 0x3F));
buf[j++] = (byte) (0x80 | (chr & 0x3F));
} else {
// A surrogate pair. Confirm valid high surrogate.
if (chr < 0xDC00 && (i < end - 1)) {
int utf32 = (int) s.charAt(i + 1);
// Confirm valid low surrogate and write pair.
if (utf32 >= 0xDC00 && utf32 <= 0xDFFF) {
utf32 = (chr << 10) + utf32 + SURROGATE_OFFSET;
i++;
buf[j++] = (byte) (0xF0 | (utf32 >> 18));
buf[j++] = (byte) (0x80 | ((utf32 >> 12) & 0x3F));
buf[j++] = (byte) (0x80 | ((utf32 >> 6) & 0x3F));
buf[j++] = (byte) (0x80 | (utf32 & 0x3F));
continue;
}
}
// Replace unpaired surrogate or out-of-order low surrogate
// with substitution character.
buf[j++] = (byte) 0xEF;
buf[j++] = (byte) 0xBF;
buf[j++] = (byte) 0xBD;
}
}
bufferFlusher.accept(j);
utf8Len += j;
return utf8Len;
}
}

View File

@ -0,0 +1,275 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.store;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.AccessDeniedException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.zip.CRC32;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.util.BitUtil;
/**
* A {@link ByteBuffer}-based {@link Directory} implementation that
* can be used to store index files on the heap.
*
* <p>Important: Note that {@link MMapDirectory} is nearly always a better choice as
* it uses OS caches more effectively (through memory-mapped buffers).
* A heap-based directory like this one can have the advantage in case of ephemeral, small,
* short-lived indexes when disk syncs provide an additional overhead.</p>
*
* @lucene.experimental
*/
public final class ByteBuffersDirectory extends BaseDirectory {
public static final BiFunction<String, ByteBuffersDataOutput, IndexInput> OUTPUT_AS_MANY_BUFFERS =
(fileName, output) -> {
ByteBuffersDataInput dataInput = output.toDataInput();
String inputName = String.format(Locale.ROOT, "%s (file=%s, buffers=%s)",
ByteBuffersIndexInput.class.getSimpleName(),
fileName,
dataInput.toString());
return new ByteBuffersIndexInput(dataInput, inputName);
};
public static final BiFunction<String, ByteBuffersDataOutput, IndexInput> OUTPUT_AS_ONE_BUFFER =
(fileName, output) -> {
ByteBuffersDataInput dataInput = new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(output.toArrayCopy())));
String inputName = String.format(Locale.ROOT, "%s (file=%s, buffers=%s)",
ByteBuffersIndexInput.class.getSimpleName(),
fileName,
dataInput.toString());
return new ByteBuffersIndexInput(dataInput, inputName);
};
public static final BiFunction<String, ByteBuffersDataOutput, IndexInput> OUTPUT_AS_BYTE_ARRAY =
(fileName, output) -> {
byte[] array = output.toArrayCopy();
String inputName = String.format(Locale.ROOT, "%s (file=%s, length=%s)",
ByteArrayIndexInput.class.getSimpleName(),
fileName,
array.length);
return new ByteArrayIndexInput(inputName, array, 0, array.length);
};
public static final BiFunction<String, ByteBuffersDataOutput, IndexInput> OUTPUT_AS_MANY_BUFFERS_LUCENE =
(fileName, output) -> {
List<ByteBuffer> bufferList = output.toBufferList();
int chunkSizePower;
bufferList.add(ByteBuffer.allocate(0));
int blockSize = ByteBuffersDataInput.determineBlockPage(bufferList);
if (blockSize == 0) {
chunkSizePower = 30;
} else {
chunkSizePower = Integer.numberOfTrailingZeros(BitUtil.nextHighestPowerOfTwo(blockSize));
}
String inputName = String.format(Locale.ROOT, "%s (file=%s)",
ByteBuffersDirectory.class.getSimpleName(),
fileName);
ByteBufferGuard guard = new ByteBufferGuard("none", (String resourceDescription, ByteBuffer b) -> {});
return ByteBufferIndexInput.newInstance(inputName,
bufferList.toArray(new ByteBuffer [bufferList.size()]),
output.size(), chunkSizePower, guard);
};
private final Function<String, String> tempFileName = new Function<String, String>() {
private final AtomicLong counter = new AtomicLong();
@Override
public String apply(String suffix) {
return suffix + "_" + Long.toString(counter.getAndIncrement(), Character.MAX_RADIX);
}
};
private final ConcurrentHashMap<String, FileEntry> files = new ConcurrentHashMap<>();
/**
* Conversion between a buffered index output and the corresponding index input
* for a given file.
*/
private final BiFunction<String, ByteBuffersDataOutput, IndexInput> outputToInput;
/**
* A supplier of {@link ByteBuffersDataOutput} instances used to buffer up
* the content of written files.
*/
private final Supplier<ByteBuffersDataOutput> bbOutputSupplier;
public ByteBuffersDirectory() {
this(new SingleInstanceLockFactory());
}
public ByteBuffersDirectory(LockFactory lockFactory) {
this(lockFactory, ByteBuffersDataOutput::new, OUTPUT_AS_MANY_BUFFERS);
}
public ByteBuffersDirectory(LockFactory factory,
Supplier<ByteBuffersDataOutput> bbOutputSupplier,
BiFunction<String, ByteBuffersDataOutput, IndexInput> outputToInput) {
super(factory);
this.outputToInput = Objects.requireNonNull(outputToInput);
this.bbOutputSupplier = Objects.requireNonNull(bbOutputSupplier);
}
@Override
public String[] listAll() throws IOException {
ensureOpen();
return files.keySet().stream().sorted().toArray(String[]::new);
}
@Override
public void deleteFile(String name) throws IOException {
ensureOpen();
FileEntry removed = files.remove(name);
if (removed == null) {
throw new FileNotFoundException(name);
}
}
@Override
public long fileLength(String name) throws IOException {
ensureOpen();
FileEntry file = files.get(name);
if (file == null) {
throw new FileNotFoundException(name);
}
return file.length();
}
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
ensureOpen();
FileEntry e = new FileEntry(name);
if (files.putIfAbsent(name, e) != null) {
throw new FileAlreadyExistsException("File already exists: " + name);
}
return e.createOutput(outputToInput);
}
@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
ensureOpen();
while (true) {
String name = IndexFileNames.segmentFileName(prefix, tempFileName.apply(suffix), "tmp");
FileEntry e = new FileEntry(name);
if (files.putIfAbsent(name, e) == null) {
return e.createOutput(outputToInput);
}
}
}
@Override
public void rename(String source, String dest) throws IOException {
ensureOpen();
FileEntry file = files.get(source);
if (file == null) {
throw new FileNotFoundException(source);
}
if (files.putIfAbsent(dest, file) != null) {
throw new FileAlreadyExistsException(dest);
}
if (!files.remove(source, file)) {
throw new IllegalStateException("File was unexpectedly replaced: " + source);
}
files.remove(source);
}
@Override
public void sync(Collection<String> names) throws IOException {
ensureOpen();
}
@Override
public void syncMetaData() throws IOException {
ensureOpen();
}
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
FileEntry e = files.get(name);
if (e == null) {
throw new NoSuchFileException(name);
} else {
return e.openInput();
}
}
@Override
public void close() throws IOException {
isOpen = false;
files.clear();
}
private final class FileEntry {
private final String fileName;
private volatile IndexInput content;
private volatile long cachedLength;
public FileEntry(String name) {
this.fileName = name;
}
public long length() {
// We return 0 length until the IndexOutput is closed and flushed.
return cachedLength;
}
public IndexInput openInput() throws IOException {
IndexInput local = this.content;
if (local == null) {
throw new AccessDeniedException("Can't open a file still open for writing: " + fileName);
}
return local.clone();
}
final IndexOutput createOutput(BiFunction<String, ByteBuffersDataOutput, IndexInput> outputToInput) throws IOException {
if (content != null) {
throw new IOException("Can only write to a file once: " + fileName);
}
String clazzName = ByteBuffersDirectory.class.getSimpleName();
String outputName = String.format(Locale.ROOT, "%s output (file=%s)", clazzName, fileName);
return new ByteBuffersIndexOutput(
bbOutputSupplier.get(), outputName, fileName,
new CRC32(),
(output) -> {
content = outputToInput.apply(fileName, output);
cachedLength = output.size();
});
}
}
}

View File

@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.store;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Set;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.RandomAccessInput;
/**
* An {@link IndexInput} implementing {@link RandomAccessInput} and backed
* by a {@link ByteBuffersDataInput}.
*/
public final class ByteBuffersIndexInput extends IndexInput implements RandomAccessInput {
private ByteBuffersDataInput in;
public ByteBuffersIndexInput(ByteBuffersDataInput in, String resourceDescription) {
super(resourceDescription);
this.in = in;
}
@Override
public void close() throws IOException {
in = null;
}
@Override
public long getFilePointer() {
ensureOpen();
return in.position();
}
@Override
public void seek(long pos) throws IOException {
ensureOpen();
in.seek(pos);
}
@Override
public long length() {
ensureOpen();
return in.size();
}
@Override
public ByteBuffersIndexInput slice(String sliceDescription, long offset, long length) throws IOException {
ensureOpen();
return new ByteBuffersIndexInput(in.slice(offset, length),
"(sliced) offset=" + offset + ", length=" + length + " " + toString());
}
@Override
public byte readByte() throws IOException {
ensureOpen();
return in.readByte();
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
ensureOpen();
in.readBytes(b, offset, len);
}
@Override
public RandomAccessInput randomAccessSlice(long offset, long length) throws IOException {
ensureOpen();
return slice("", offset, length);
}
@Override
public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
ensureOpen();
in.readBytes(b, offset, len, useBuffer);
}
@Override
public short readShort() throws IOException {
ensureOpen();
return in.readShort();
}
@Override
public int readInt() throws IOException {
ensureOpen();
return in.readInt();
}
@Override
public int readVInt() throws IOException {
ensureOpen();
return in.readVInt();
}
@Override
public int readZInt() throws IOException {
ensureOpen();
return in.readZInt();
}
@Override
public long readLong() throws IOException {
ensureOpen();
return in.readLong();
}
@Override
public long readVLong() throws IOException {
ensureOpen();
return in.readVLong();
}
@Override
public long readZLong() throws IOException {
ensureOpen();
return in.readZLong();
}
@Override
public String readString() throws IOException {
ensureOpen();
return in.readString();
}
@Override
public Map<String, String> readMapOfStrings() throws IOException {
ensureOpen();
return in.readMapOfStrings();
}
@Override
public Set<String> readSetOfStrings() throws IOException {
ensureOpen();
return in.readSetOfStrings();
}
@Override
public void skipBytes(long numBytes) throws IOException {
ensureOpen();
super.skipBytes(numBytes);
}
@Override
public byte readByte(long pos) throws IOException {
ensureOpen();
return in.readByte(pos);
}
@Override
public short readShort(long pos) throws IOException {
ensureOpen();
return in.readShort(pos);
}
@Override
public int readInt(long pos) throws IOException {
ensureOpen();
return in.readInt(pos);
}
@Override
public long readLong(long pos) throws IOException {
ensureOpen();
return in.readLong(pos);
}
@Override
public IndexInput clone() {
ensureOpen();
ByteBuffersIndexInput cloned = new ByteBuffersIndexInput(in.slice(0, in.size()), "(clone of) " + toString());
try {
cloned.seek(getFilePointer());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return cloned;
}
private void ensureOpen() {
if (in == null) {
throw new AlreadyClosedException("Already closed.");
}
}
}

View File

@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.store;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
/**
* An {@link IndexOutput} writing to a {@link ByteBuffersDataOutput}.
*/
public final class ByteBuffersIndexOutput extends IndexOutput {
private final Consumer<ByteBuffersDataOutput> onClose;
private final Checksum checksum;
private long lastChecksumPosition;
private long lastChecksum;
private ByteBuffersDataOutput delegate;
public ByteBuffersIndexOutput(ByteBuffersDataOutput delegate, String resourceDescription, String name) {
this(delegate, resourceDescription, name, new CRC32(), null);
}
public ByteBuffersIndexOutput(ByteBuffersDataOutput delegate, String resourceDescription, String name,
Checksum checksum,
Consumer<ByteBuffersDataOutput> onClose) {
super(resourceDescription, name);
this.delegate = delegate;
this.checksum = checksum;
this.onClose = onClose;
}
@Override
public void close() throws IOException {
// No special effort to be thread-safe here since IndexOutputs are not required to be thread-safe.
ByteBuffersDataOutput local = delegate;
delegate = null;
if (local != null && onClose != null) {
onClose.accept(local);
}
}
@Override
public long getFilePointer() {
ensureOpen();
return delegate.size();
}
@Override
public long getChecksum() throws IOException {
ensureOpen();
if (checksum == null) {
throw new IOException("This index output has no checksum computing ability: " + toString());
}
// Compute checksum on the current content of the delegate.
//
// This way we can override more methods and pass them directly to the delegate for efficiency of writing,
// while allowing the checksum to be correctly computed on the current content of the output buffer (IndexOutput
// is per-thread, so no concurrent changes).
if (lastChecksumPosition != delegate.size()) {
lastChecksumPosition = delegate.size();
checksum.reset();
byte [] buffer = null;
for (ByteBuffer bb : delegate.toBufferList()) {
if (bb.hasArray()) {
checksum.update(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
} else {
if (buffer == null) buffer = new byte [1024 * 4];
bb = bb.asReadOnlyBuffer();
int remaining = bb.remaining();
while (remaining > 0) {
int len = Math.min(remaining, buffer.length);
bb.get(buffer, 0, len);
checksum.update(buffer, 0, len);
remaining -= len;
}
}
}
lastChecksum = checksum.getValue();
}
return lastChecksum;
}
@Override
public void writeByte(byte b) throws IOException {
ensureOpen();
delegate.writeByte(b);
}
@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
ensureOpen();
delegate.writeBytes(b, offset, length);
}
@Override
public void writeBytes(byte[] b, int length) throws IOException {
ensureOpen();
delegate.writeBytes(b, length);
}
@Override
public void writeInt(int i) throws IOException {
ensureOpen();
delegate.writeInt(i);
}
@Override
public void writeShort(short i) throws IOException {
ensureOpen();
delegate.writeShort(i);
}
@Override
public void writeLong(long i) throws IOException {
ensureOpen();
delegate.writeLong(i);
}
@Override
public void writeString(String s) throws IOException {
ensureOpen();
delegate.writeString(s);
}
@Override
public void copyBytes(DataInput input, long numBytes) throws IOException {
ensureOpen();
delegate.copyBytes(input, numBytes);
}
@Override
public void writeMapOfStrings(Map<String, String> map) throws IOException {
ensureOpen();
delegate.writeMapOfStrings(map);
}
@Override
public void writeSetOfStrings(Set<String> set) throws IOException {
ensureOpen();
delegate.writeSetOfStrings(set);
}
private void ensureOpen() {
if (delegate == null) {
throw new AlreadyClosedException("Already closed.");
}
}
}

View File

@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.store;
import static org.junit.Assert.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IOUtils.IOConsumer;
import org.junit.Test;
import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.carrotsearch.randomizedtesting.Xoroshiro128PlusRandom;
import com.carrotsearch.randomizedtesting.generators.RandomBytes;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
public abstract class BaseDataOutputTestCase<T extends DataOutput> extends RandomizedTest {
protected abstract T newInstance();
protected abstract byte[] toBytes(T instance);
@FunctionalInterface
private interface ThrowingBiFunction<T, U, R> {
R apply(T t, U u) throws Exception;
}
@Test
public void testRandomizedWrites() throws IOException {
T dst = newInstance();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutput ref = new OutputStreamDataOutput(baos);
long seed = randomLong();
int max = 50_000;
addRandomData(dst, new Xoroshiro128PlusRandom(seed), max);
addRandomData(ref, new Xoroshiro128PlusRandom(seed), max);
assertArrayEquals(baos.toByteArray(), toBytes(dst));
}
protected static List<IOConsumer<DataInput>> addRandomData(DataOutput dst, Random rnd, int maxAddCalls) throws IOException {
try {
List<IOConsumer<DataInput>> reply = new ArrayList<>();
for (int i = 0; i < maxAddCalls; i++) {
reply.add(RandomPicks.randomFrom(rnd, GENERATORS).apply(dst, rnd));
}
return reply;
} catch (Exception e) {
throw new IOException(e);
}
}
private static List<ThrowingBiFunction<DataOutput, Random, IOConsumer<DataInput>>> GENERATORS;
static {
GENERATORS = new ArrayList<>();
// writeByte/ readByte
GENERATORS.add((dst, rnd) -> {
byte value = (byte) rnd.nextInt();
dst.writeByte(value);
return (src) -> assertEquals("readByte()", value, src.readByte());
});
// writeBytes/ readBytes (array and buffer version).
GENERATORS.add((dst, rnd) -> {
byte[] bytes = RandomBytes.randomBytesOfLengthBetween(rnd, 0, 100);
ByteBuffersDataOutput rdo = dst instanceof ByteBuffersDataOutput ? (ByteBuffersDataOutput) dst : null;
if (rnd.nextBoolean() && rdo != null) {
rdo.writeBytes(ByteBuffer.wrap(bytes));
} else {
dst.writeBytes(bytes, bytes.length);
}
boolean useBuffersForRead = rnd.nextBoolean();
return (src) -> {
byte [] read = new byte [bytes.length];
if (useBuffersForRead && src instanceof ByteBuffersDataInput) {
((ByteBuffersDataInput) src).readBytes(ByteBuffer.wrap(read), read.length);
assertArrayEquals("readBytes(ByteBuffer)", bytes, read);
} else {
src.readBytes(read, 0, read.length);
assertArrayEquals("readBytes(byte[])", bytes, read);
}
};
}
);
// writeBytes/ readBytes (array + offset).
GENERATORS.add((dst, rnd) -> {
byte[] bytes = RandomBytes.randomBytesOfLengthBetween(rnd, 0, 100);
int off = RandomNumbers.randomIntBetween(rnd, 0, bytes.length);
int len = RandomNumbers.randomIntBetween(rnd, 0, bytes.length - off);
dst.writeBytes(bytes, off, len);
return (src) -> {
byte [] read = new byte [bytes.length + off];
src.readBytes(read, off, len);
assertArrayEquals(
"readBytes(byte[], off)",
ArrayUtil.copyOfSubArray(bytes, off, len + off),
ArrayUtil.copyOfSubArray(read, off, len + off));
};
}
);
GENERATORS.add((dst, rnd) -> {
int v = rnd.nextInt();
dst.writeInt(v);
return (src) -> assertEquals("readInt()", v, src.readInt());
});
GENERATORS.add((dst, rnd) -> {
long v = rnd.nextLong();
dst.writeLong(v);
return (src) -> assertEquals("readLong()", v, src.readLong());
});
GENERATORS.add((dst, rnd) -> {
short v = (short) rnd.nextInt();
dst.writeShort(v);
return (src) -> assertEquals("readShort()", v, src.readShort());
});
GENERATORS.add((dst, rnd) -> {
int v = rnd.nextInt();
dst.writeVInt(v);
return (src) -> assertEquals("readVInt()", v, src.readVInt());
});
GENERATORS.add((dst, rnd) -> {
int v = rnd.nextInt();
dst.writeZInt(v);
return (src) -> assertEquals("readZInt()", v, src.readZInt());
});
GENERATORS.add((dst, rnd) -> {
long v = rnd.nextLong() & (-1L >>> 1);
dst.writeVLong(v);
return (src) -> assertEquals("readVLong()", v, src.readVLong());
});
GENERATORS.add((dst, rnd) -> {
long v = rnd.nextLong();
dst.writeZLong(v);
return (src) -> assertEquals("readZLong()", v, src.readZLong());
});
GENERATORS.add((dst, rnd) -> {
String v;
if (rnd.nextInt(50) == 0) {
// Occasionally a large blob.
v = RandomStrings.randomUnicodeOfLength(rnd, RandomNumbers.randomIntBetween(rnd, 2048, 4096));
} else {
v = RandomStrings.randomUnicodeOfLength(rnd, RandomNumbers.randomIntBetween(rnd, 0, 10));
}
dst.writeString(v);
return (src) -> assertEquals("readString()", v, src.readString());
});
}
}

View File

@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.store;
import static org.junit.Assert.*;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IOUtils.IOConsumer;
import org.apache.lucene.util.LuceneTestCase;
import org.junit.Test;
import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.carrotsearch.randomizedtesting.Xoroshiro128PlusRandom;
import com.carrotsearch.randomizedtesting.annotations.Timeout;
public final class TestByteBuffersDataInput extends RandomizedTest {
@Test
public void testSanity() throws IOException {
ByteBuffersDataOutput out = new ByteBuffersDataOutput();
ByteBuffersDataInput o1 = out.toDataInput();
assertEquals(0, o1.size());
LuceneTestCase.expectThrows(EOFException.class, () -> {
o1.readByte();
});
out.writeByte((byte) 1);
ByteBuffersDataInput o2 = out.toDataInput();
assertEquals(1, o2.size());
assertEquals(0, o2.position());
assertEquals(0, o1.size());
assertTrue(o2.ramBytesUsed() > 0);
assertEquals(1, o2.readByte());
assertEquals(1, o2.position());
assertEquals(1, o2.readByte(0));
LuceneTestCase.expectThrows(EOFException.class, () -> {
o2.readByte();
});
assertEquals(1, o2.position());
}
@Test
public void testRandomReads() throws Exception {
ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
long seed = randomLong();
int max = 1_000_000;
List<IOConsumer<DataInput>> reply =
TestByteBuffersDataOutput.addRandomData(dst, new Xoroshiro128PlusRandom(seed), max);
ByteBuffersDataInput src = dst.toDataInput();
for (IOConsumer<DataInput> c : reply) {
c.accept(src);
}
LuceneTestCase.expectThrows(EOFException.class, () -> {
src.readByte();
});
}
@Test
public void testRandomReadsOnSlices() throws Exception {
for (int reps = randomIntBetween(1, 200); --reps > 0;) {
ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
byte [] prefix = new byte [randomIntBetween(0, 1024 * 8)];
dst.writeBytes(prefix);
long seed = randomLong();
int max = 10_000;
List<IOConsumer<DataInput>> reply =
TestByteBuffersDataOutput.addRandomData(dst, new Xoroshiro128PlusRandom(seed), max);
byte [] suffix = new byte [randomIntBetween(0, 1024 * 8)];
dst.writeBytes(suffix);
ByteBuffersDataInput src = dst.toDataInput().slice(prefix.length, dst.size() - prefix.length - suffix.length);
assertEquals(0, src.position());
assertEquals(dst.size() - prefix.length - suffix.length, src.size());
for (IOConsumer<DataInput> c : reply) {
c.accept(src);
}
LuceneTestCase.expectThrows(EOFException.class, () -> {
src.readByte();
});
}
}
@Test
public void testSeekEmpty() throws Exception {
ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
ByteBuffersDataInput in = dst.toDataInput();
in.seek(0);
LuceneTestCase.expectThrows(EOFException.class, () -> {
in.seek(1);
});
in.seek(0);
LuceneTestCase.expectThrows(EOFException.class, () -> {
in.readByte();
});
}
@Test
public void testSeek() throws Exception {
for (int reps = randomIntBetween(1, 200); --reps > 0;) {
ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
byte [] prefix = {};
if (randomBoolean()) {
prefix = new byte [randomIntBetween(1, 1024 * 8)];
dst.writeBytes(prefix);
}
long seed = randomLong();
int max = 1000;
List<IOConsumer<DataInput>> reply =
TestByteBuffersDataOutput.addRandomData(dst, new Xoroshiro128PlusRandom(seed), max);
ByteBuffersDataInput in = dst.toDataInput().slice(prefix.length, dst.size() - prefix.length);
in.seek(0);
for (IOConsumer<DataInput> c : reply) {
c.accept(in);
}
in.seek(0);
for (IOConsumer<DataInput> c : reply) {
c.accept(in);
}
byte [] array = dst.toArrayCopy();
array = ArrayUtil.copyOfSubArray(array, prefix.length, array.length);
for (int i = 0; i < 1000; i++) {
int offs = randomIntBetween(0, array.length - 1);
in.seek(offs);
assertEquals(offs, in.position());
assertEquals(array[offs], in.readByte());
}
in.seek(in.size());
assertEquals(in.size(), in.position());
LuceneTestCase.expectThrows(EOFException.class, () -> {
in.readByte();
});
}
}
@Test
public void testSlicingWindow() throws Exception {
ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
assertEquals(0, dst.toDataInput().slice(0, 0).size());;
dst.writeBytes(randomBytesOfLength(1024 * 8));
ByteBuffersDataInput in = dst.toDataInput();
for (int offset = 0, max = (int) dst.size(); offset < max; offset++) {
assertEquals(0, in.slice(offset, 0).size());
assertEquals(1, in.slice(offset, 1).size());
int window = Math.min(max - offset, 1024);
assertEquals(window, in.slice(offset, window).size());
}
assertEquals(0, in.slice((int) dst.size(), 0).size());
}
@Test
@Timeout(millis = 5000)
public void testEofOnArrayReadPastBufferSize() throws Exception {
ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
dst.writeBytes(new byte [10]);
LuceneTestCase.expectThrows(EOFException.class, () -> {
ByteBuffersDataInput in = dst.toDataInput();
in.readBytes(new byte [100], 0, 100);
});
LuceneTestCase.expectThrows(EOFException.class, () -> {
ByteBuffersDataInput in = dst.toDataInput();
in.readBytes(ByteBuffer.allocate(100), 100);
});
}
}

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.store;
import static org.junit.Assert.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.ArrayUtil;
import org.junit.Assert;
import org.junit.Test;
public final class TestByteBuffersDataOutput extends BaseDataOutputTestCase<ByteBuffersDataOutput> {
@Override
protected ByteBuffersDataOutput newInstance() {
return new ByteBuffersDataOutput();
}
@Override
protected byte[] toBytes(ByteBuffersDataOutput instance) {
return instance.toArrayCopy();
}
@Test
public void testReuse() throws IOException {
AtomicInteger allocations = new AtomicInteger(0);
ByteBuffersDataOutput.ByteBufferRecycler reuser = new ByteBuffersDataOutput.ByteBufferRecycler(
(size) -> {
allocations.incrementAndGet();
return ByteBuffer.allocate(size);
});
ByteBuffersDataOutput o = new ByteBuffersDataOutput(
ByteBuffersDataOutput.DEFAULT_MIN_BITS_PER_BLOCK,
ByteBuffersDataOutput.DEFAULT_MAX_BITS_PER_BLOCK,
reuser::allocate,
reuser::reuse);
// Add some random data first.
long genSeed = randomLong();
int addCount = randomIntBetween(1000, 5000);
addRandomData(o, new Random(genSeed), addCount);
byte[] data = o.toArrayCopy();
// Use the same sequence over reused instance.
final int expectedAllocationCount = allocations.get();
o.reset();
addRandomData(o, new Random(genSeed), addCount);
assertEquals(expectedAllocationCount, allocations.get());
assertArrayEquals(data, o.toArrayCopy());
}
@Test
public void testConstructorWithExpectedSize() {
{
ByteBuffersDataOutput o = new ByteBuffersDataOutput(0);
o.writeByte((byte) 0);
assertEquals(1 << ByteBuffersDataOutput.DEFAULT_MIN_BITS_PER_BLOCK, o.toBufferList().get(0).capacity());
}
{
long MB = 1024 * 1024;
long expectedSize = randomLongBetween(MB, MB * 1024);
ByteBuffersDataOutput o = new ByteBuffersDataOutput(expectedSize);
o.writeByte((byte) 0);
int cap = o.toBufferList().get(0).capacity();
assertTrue((cap >> 1) * ByteBuffersDataOutput.MAX_BLOCKS_BEFORE_BLOCK_EXPANSION < expectedSize);
assertTrue("cap=" + cap + ", exp=" + expectedSize,
(cap) * ByteBuffersDataOutput.MAX_BLOCKS_BEFORE_BLOCK_EXPANSION >= expectedSize);
}
}
@Test
public void testSanity() {
ByteBuffersDataOutput o = newInstance();
assertEquals(0, o.size());
assertEquals(0, o.toArrayCopy().length);
assertEquals(0, o.ramBytesUsed());
o.writeByte((byte) 1);
assertEquals(1, o.size());
assertTrue(o.ramBytesUsed() > 0);
assertArrayEquals(new byte [] { 1 }, o.toArrayCopy());
o.writeBytes(new byte [] {2, 3, 4}, 3);
assertEquals(4, o.size());
assertArrayEquals(new byte [] { 1, 2, 3, 4 }, o.toArrayCopy());
}
@Test
public void testWriteByteBuffer() {
ByteBuffersDataOutput o = new ByteBuffersDataOutput();
byte[] bytes = randomBytesOfLength(1024 * 8 + 10);
ByteBuffer src = ByteBuffer.wrap(bytes);
int offset = randomIntBetween(0, 100);
int len = bytes.length - offset;
src.position(offset);
src.limit(offset + len);
o.writeBytes(src);
assertEquals(len, o.size());
Assert.assertArrayEquals(ArrayUtil.copyOfSubArray(bytes, offset, offset + len), o.toArrayCopy());
}
@Test
public void testLargeArrayAdd() {
ByteBuffersDataOutput o = new ByteBuffersDataOutput();
int MB = 1024 * 1024;
byte [] bytes = randomBytesOfLength(5 * MB, 15 * MB);
int offset = randomIntBetween(0, 100);
int len = bytes.length - offset;
o.writeBytes(bytes, offset, len);
assertEquals(len, o.size());
Assert.assertArrayEquals(ArrayUtil.copyOfSubArray(bytes, offset, offset + len), o.toArrayCopy());
}
@Test
public void testToBufferListReturnsReadOnlyBuffers() throws Exception {
ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
dst.writeBytes(new byte [100]);
for (ByteBuffer bb : dst.toBufferList()) {
assertTrue(bb.isReadOnly());
}
}
@Test
public void testToWriteableBufferListReturnsOriginalBuffers() throws Exception {
ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
for (ByteBuffer bb : dst.toWriteableBufferList()) {
assertTrue(!bb.isReadOnly());
assertTrue(bb.hasArray()); // even the empty buffer should have a backing array.
}
dst.writeBytes(new byte [100]);
for (ByteBuffer bb : dst.toWriteableBufferList()) {
assertTrue(!bb.isReadOnly());
assertTrue(bb.hasArray()); // heap-based by default, so array should be there.
}
}
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.store;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.function.Supplier;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.util.English;
import org.junit.Test;
import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
public class TestByteBuffersDirectory extends BaseDirectoryTestCase {
private Supplier<ByteBuffersDirectory> implSupplier;
public TestByteBuffersDirectory(Supplier<ByteBuffersDirectory> implSupplier, String name) {
this.implSupplier = implSupplier;
}
@Override
protected Directory getDirectory(Path path) throws IOException {
return implSupplier.get();
}
@Test
public void testBuildIndex() throws IOException {
try (Directory dir = getDirectory(null);
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(
new MockAnalyzer(random())).setOpenMode(OpenMode.CREATE))) {
int docs = RandomizedTest.randomIntBetween(0, 10);
for (int i = docs; i > 0; i--) {
Document doc = new Document();
doc.add(newStringField("content", English.intToEnglish(i).trim(), Field.Store.YES));
writer.addDocument(doc);
}
writer.commit();
assertEquals(docs, writer.numDocs());
}
}
@ParametersFactory(argumentFormatting = "impl=%2$s")
public static Iterable<Object[]> parametersWithCustomName() {
return Arrays.asList(new Object [][] {
{(Supplier<ByteBuffersDirectory>) () -> new ByteBuffersDirectory(
new SingleInstanceLockFactory(),
ByteBuffersDataOutput::new,
ByteBuffersDirectory.OUTPUT_AS_MANY_BUFFERS), "many buffers (heap)"},
{(Supplier<ByteBuffersDirectory>) () -> new ByteBuffersDirectory(
new SingleInstanceLockFactory(),
ByteBuffersDataOutput::new,
ByteBuffersDirectory.OUTPUT_AS_ONE_BUFFER), "one buffer (heap)"},
{(Supplier<ByteBuffersDirectory>) () -> new ByteBuffersDirectory(
new SingleInstanceLockFactory(),
ByteBuffersDataOutput::new,
ByteBuffersDirectory.OUTPUT_AS_MANY_BUFFERS_LUCENE), "lucene's buffers (heap)"},
{(Supplier<ByteBuffersDirectory>) () -> new ByteBuffersDirectory(
new SingleInstanceLockFactory(),
ByteBuffersDataOutput::new,
ByteBuffersDirectory.OUTPUT_AS_BYTE_ARRAY), "byte array (heap)"},
});
}
}