From f762953aab5be90052ee8bbfe6cbc9f1535356a6 Mon Sep 17 00:00:00 2001 From: Dawid Weiss Date: Tue, 28 Aug 2018 15:02:17 +0200 Subject: [PATCH] LUCENE-8468: A ByteBuffer based Directory implementation (and associated classes). --- lucene/CHANGES.txt | 2 + .../lucene/store/ByteArrayIndexInput.java | 107 +++- .../lucene/store/ByteBuffersDataInput.java | 323 +++++++++++ .../lucene/store/ByteBuffersDataOutput.java | 541 ++++++++++++++++++ .../lucene/store/ByteBuffersDirectory.java | 275 +++++++++ .../lucene/store/ByteBuffersIndexInput.java | 200 +++++++ .../lucene/store/ByteBuffersIndexOutput.java | 171 ++++++ .../lucene/store/BaseDataOutputTestCase.java | 181 ++++++ .../store/TestByteBuffersDataInput.java | 206 +++++++ .../store/TestByteBuffersDataOutput.java | 157 +++++ .../store/TestByteBuffersDirectory.java | 86 +++ 11 files changed, 2219 insertions(+), 30 deletions(-) create mode 100644 lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java create mode 100644 lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataOutput.java create mode 100644 lucene/core/src/java/org/apache/lucene/store/ByteBuffersDirectory.java create mode 100644 lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexInput.java create mode 100644 lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexOutput.java create mode 100644 lucene/core/src/test/org/apache/lucene/store/BaseDataOutputTestCase.java create mode 100644 lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java create mode 100644 lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataOutput.java create mode 100644 lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDirectory.java diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 2dec765355b..413dda56cc1 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -237,6 +237,8 @@ Changes in Runtime Behavior: Improvements +* LUCENE-8468: A ByteBuffer based Directory implementation. (Dawid Weiss) + * LUCENE-8447: Add DISJOINT and WITHIN support to LatLonShape queries. (Nick Knize) * LUCENE-8440: Add support for indexing and searching Line and Point shapes using LatLonShape encoding (Nick Knize) diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteArrayIndexInput.java b/lucene/core/src/java/org/apache/lucene/store/ByteArrayIndexInput.java index 6ad6125704e..9bf5ab2bf08 100644 --- a/lucene/core/src/java/org/apache/lucene/store/ByteArrayIndexInput.java +++ b/lucene/core/src/java/org/apache/lucene/store/ByteArrayIndexInput.java @@ -14,55 +14,56 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.lucene.store; +import java.io.EOFException; import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Locale; -/** - * DataInput backed by a byte array. - * WARNING: This class omits all low-level checks. - * @lucene.experimental +/** + * A {@link IndexInput} backed by a byte array. 
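+ * <p>A minimal usage sketch (illustrative only; the description string and byte values are arbitrary):
+ * <pre>{@code
+ * byte[] data = new byte[] {0x12, 0x34, 0x56, 0x78};
+ * ByteArrayIndexInput in = new ByteArrayIndexInput("example", data);
+ * int whole = in.readInt(0);   // random-access read at an absolute position
+ * in.seek(2);
+ * short tail = in.readShort(); // sequential read from the current position
+ * }</pre>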
+ * + * @lucene.experimental */ -public final class ByteArrayIndexInput extends IndexInput { - +public final class ByteArrayIndexInput extends IndexInput implements RandomAccessInput { private byte[] bytes; + private final int offset; + private final int length; + private int pos; - private int limit; public ByteArrayIndexInput(String description, byte[] bytes) { + this(description, bytes, 0, bytes.length); + } + + public ByteArrayIndexInput(String description, byte[] bytes, int offs, int length) { super(description); + this.offset = offs; this.bytes = bytes; - this.limit = bytes.length; + this.length = length; + this.pos = offs; } public long getFilePointer() { - return pos; + return pos - offset; } - public void seek(long pos) { - this.pos = (int) pos; - } - - public void reset(byte[] bytes, int offset, int len) { - this.bytes = bytes; - pos = offset; - limit = offset + len; + public void seek(long pos) throws EOFException { + int newPos = Math.toIntExact(pos + offset); + try { + if (pos < 0 || pos > length) { + throw new EOFException(); + } + } finally { + this.pos = newPos; + } } @Override public long length() { - return limit; - } - - public boolean eof() { - return pos == limit; - } - - @Override - public void skipBytes(long count) { - pos += count; + return length; } @Override @@ -153,9 +154,55 @@ public final class ByteArrayIndexInput extends IndexInput { @Override public void close() { + bytes = null; } - public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { - throw new UnsupportedOperationException(); + @Override + public IndexInput clone() { + ByteArrayIndexInput slice = slice("(cloned)" + toString(), 0, length()); + try { + slice.seek(getFilePointer()); + } catch (EOFException e) { + throw new UncheckedIOException(e); + } + return slice; + } + + public ByteArrayIndexInput slice(String sliceDescription, long offset, long length) { + if (offset < 0 || length < 0 || offset + length > this.length) { + throw new IllegalArgumentException(String.format(Locale.ROOT, + "slice(offset=%s, length=%s) is out of bounds: %s", + offset, length, this)); + } + + return new ByteArrayIndexInput(sliceDescription, this.bytes, Math.toIntExact(this.offset + offset), + Math.toIntExact(length)); + } + + @Override + public byte readByte(long pos) throws IOException { + return bytes[Math.toIntExact(offset + pos)]; + } + + @Override + public short readShort(long pos) throws IOException { + int i = Math.toIntExact(offset + pos); + return (short) (((bytes[i] & 0xFF) << 8) | + (bytes[i + 1] & 0xFF)); + } + + @Override + public int readInt(long pos) throws IOException { + int i = Math.toIntExact(offset + pos); + return ((bytes[i] & 0xFF) << 24) | + ((bytes[i + 1] & 0xFF) << 16) | + ((bytes[i + 2] & 0xFF) << 8) | + (bytes[i + 3] & 0xFF); + } + + @Override + public long readLong(long pos) throws IOException { + return (((long) readInt(pos)) << 32) | + (readInt(pos + 4) & 0xFFFFFFFFL); } } diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java new file mode 100644 index 00000000000..e8418ed0146 --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.store; + +import java.io.EOFException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.stream.Collectors; + +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.RandomAccessInput; +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.RamUsageEstimator; + +/** + * A {@link DataInput} implementing {@link RandomAccessInput} and reading data from a + * list of {@link ByteBuffer}s. + */ +public final class ByteBuffersDataInput extends DataInput implements Accountable, RandomAccessInput { + private final ByteBuffer[] blocks; + private final int blockBits; + private final int blockMask; + private final long size; + private final long offset; + + private long pos; + + /** + * Read data from a set of contiguous buffers. All data buffers except for the last one + * must have an identical remaining number of bytes in the buffer (that is a power of two). The last + * buffer can be of an arbitrary remaining length. + */ + public ByteBuffersDataInput(List buffers) { + ensureAssumptions(buffers); + + this.blocks = buffers.stream().map(buf -> buf.asReadOnlyBuffer()).toArray(ByteBuffer[]::new); + + if (blocks.length == 1) { + this.blockBits = 32; + this.blockMask = ~0; + } else { + final int blockBytes = determineBlockPage(buffers); + this.blockBits = Integer.numberOfTrailingZeros(blockBytes); + this.blockMask = (1 << blockBits) - 1; + } + + this.size = Arrays.stream(blocks).mapToLong(block -> block.remaining()).sum(); + + // The initial "position" of this stream is shifted by the position of the first block. + this.offset = blocks[0].position(); + this.pos = offset; + } + + public long size() { + return size; + } + + @Override + public long ramBytesUsed() { + // Return a rough estimation for allocated blocks. Note that we do not make + // any special distinction for what the type of buffer is (direct vs. heap-based). + return RamUsageEstimator.NUM_BYTES_OBJECT_REF * blocks.length + + Arrays.stream(blocks).mapToLong(buf -> buf.capacity()).sum(); + } + + @Override + public byte readByte() throws EOFException { + try { + ByteBuffer block = blocks[blockIndex(pos)]; + byte v = block.get(blockOffset(pos)); + pos++; + return v; + } catch (IndexOutOfBoundsException e) { + if (pos >= size()) { + throw new EOFException(); + } else { + throw e; // Something is wrong. + } + } + } + + /** + * Reads exactly {@code len} bytes into the given buffer. The buffer must have + * enough remaining limit. + * + * If there are fewer than {@code len} bytes in the input, {@link EOFException} + * is thrown. 
+ */ + public void readBytes(ByteBuffer buffer, int len) throws EOFException { + try { + while (len > 0) { + ByteBuffer block = blocks[blockIndex(pos)].duplicate(); + int blockOffset = blockOffset(pos); + block.position(blockOffset); + int chunk = Math.min(len, block.remaining()); + if (chunk == 0) { + throw new EOFException(); + } + + // Update pos early on for EOF detection on output buffer, then try to get buffer content. + pos += chunk; + block.limit(blockOffset + chunk); + buffer.put(block); + + len -= chunk; + } + } catch (BufferUnderflowException | ArrayIndexOutOfBoundsException e) { + if (pos >= size()) { + throw new EOFException(); + } else { + throw e; // Something is wrong. + } + } + } + + @Override + public void readBytes(byte[] arr, int off, int len) throws EOFException { + try { + while (len > 0) { + ByteBuffer block = blocks[blockIndex(pos)].duplicate(); + block.position(blockOffset(pos)); + int chunk = Math.min(len, block.remaining()); + if (chunk == 0) { + throw new EOFException(); + } + + // Update pos early on for EOF detection, then try to get buffer content. + pos += chunk; + block.get(arr, off, chunk); + + len -= chunk; + off += chunk; + } + } catch (BufferUnderflowException | ArrayIndexOutOfBoundsException e) { + if (pos >= size()) { + throw new EOFException(); + } else { + throw e; // Something is wrong. + } + } + } + + @Override + public byte readByte(long pos) { + pos += offset; + return blocks[blockIndex(pos)].get(blockOffset(pos)); + } + + @Override + public short readShort(long pos) { + long absPos = offset + pos; + int blockOffset = blockOffset(absPos); + if (blockOffset + Short.BYTES <= blockMask) { + return blocks[blockIndex(absPos)].getShort(blockOffset); + } else { + return (short) ((readByte(pos ) & 0xFF) << 8 | + (readByte(pos + 1) & 0xFF)); + } + } + + @Override + public int readInt(long pos) { + long absPos = offset + pos; + int blockOffset = blockOffset(absPos); + if (blockOffset + Integer.BYTES <= blockMask) { + return blocks[blockIndex(absPos)].getInt(blockOffset); + } else { + return ((readByte(pos ) ) << 24 | + (readByte(pos + 1) & 0xFF) << 16 | + (readByte(pos + 2) & 0xFF) << 8 | + (readByte(pos + 3) & 0xFF)); + } + } + + @Override + public long readLong(long pos) { + long absPos = offset + pos; + int blockOffset = blockOffset(absPos); + if (blockOffset + Long.BYTES <= blockMask) { + return blocks[blockIndex(absPos)].getLong(blockOffset); + } else { + return (((long) readInt(pos)) << 32) | (readInt(pos + 4) & 0xFFFFFFFFL); + } + } + + public long position() { + return pos - offset; + } + + public void seek(long position) throws EOFException { + this.pos = position + offset; + if (position > size()) { + this.pos = size(); + throw new EOFException(); + } + } + + public ByteBuffersDataInput slice(long offset, long length) { + if (offset < 0 || length < 0 || offset + length > this.size) { + throw new IllegalArgumentException(String.format(Locale.ROOT, + "slice(offset=%s, length=%s) is out of bounds: %s", + offset, length, this)); + } + + return new ByteBuffersDataInput(sliceBufferList(Arrays.asList(this.blocks), offset, length)); + } + + @Override + public String toString() { + return String.format(Locale.ROOT, + "%,d bytes, block size: %,d, blocks: %,d, position: %,d%s", + size(), + blockSize(), + blocks.length, + position(), + offset == 0 ? 
"" : String.format(Locale.ROOT, " [offset: %,d]", offset)); + } + + private final int blockIndex(long pos) { + return Math.toIntExact(pos >> blockBits); + } + + private final int blockOffset(long pos) { + return (int) pos & blockMask; + } + + private int blockSize() { + return 1 << blockBits; + } + + private static final boolean isPowerOfTwo(int v) { + return (v & (v - 1)) == 0; + } + + private static void ensureAssumptions(List buffers) { + if (buffers.isEmpty()) { + throw new IllegalArgumentException("Buffer list must not be empty."); + } + + if (buffers.size() == 1) { + // Special case of just a single buffer, conditions don't apply. + } else { + final int blockPage = determineBlockPage(buffers); + + // First buffer decides on block page length. + if (!isPowerOfTwo(blockPage)) { + throw new IllegalArgumentException("The first buffer must have power-of-two position() + remaining(): 0x" + + Integer.toHexString(blockPage)); + } + + // Any block from 2..last-1 should have the same page size. + for (int i = 1, last = buffers.size() - 1; i < last; i++) { + ByteBuffer buffer = buffers.get(i); + if (buffer.position() != 0) { + throw new IllegalArgumentException("All buffers except for the first one must have position() == 0: " + buffer); + } + if (i != last && buffer.remaining() != blockPage) { + throw new IllegalArgumentException("Intermediate buffers must share an identical remaining() power-of-two block size: 0x" + + Integer.toHexString(blockPage)); + } + } + } + } + + static int determineBlockPage(List buffers) { + ByteBuffer first = buffers.get(0); + final int blockPage = Math.toIntExact((long) first.position() + first.remaining()); + return blockPage; + } + + private static List sliceBufferList(List buffers, long offset, long length) { + ensureAssumptions(buffers); + + if (buffers.size() == 1) { + ByteBuffer cloned = buffers.get(0).asReadOnlyBuffer(); + cloned.position(Math.toIntExact(cloned.position() + offset)); + cloned.limit(Math.toIntExact(length + cloned.position())); + return Arrays.asList(cloned); + } else { + long absStart = buffers.get(0).position() + offset; + long absEnd = Math.toIntExact(absStart + length); + + int blockBytes = ByteBuffersDataInput.determineBlockPage(buffers); + int blockBits = Integer.numberOfTrailingZeros(blockBytes); + int blockMask = (1 << blockBits) - 1; + + int endOffset = (int) absEnd & blockMask; + + ArrayList cloned = + buffers.subList(Math.toIntExact(absStart / blockBytes), + Math.toIntExact(absEnd / blockBytes + (endOffset == 0 ? 0 : 1))) + .stream() + .map(buf -> buf.asReadOnlyBuffer()) + .collect(Collectors.toCollection(ArrayList::new)); + + if (endOffset == 0) { + cloned.add(ByteBuffer.allocate(0)); + } + + cloned.get(0).position((int) absStart & blockMask); + cloned.get(cloned.size() - 1).limit(endOffset); + return cloned; + } + } +} diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataOutput.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataOutput.java new file mode 100644 index 00000000000..8840f213b53 --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataOutput.java @@ -0,0 +1,541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.store; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.IntConsumer; +import java.util.function.IntFunction; + +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.BitUtil; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.UnicodeUtil; + +/** + * A {@link DataOutput} storing data in a list of {@link ByteBuffer}s. + */ +public final class ByteBuffersDataOutput extends DataOutput implements Accountable { + private final static ByteBuffer EMPTY = ByteBuffer.allocate(0); + private final static byte [] EMPTY_BYTE_ARRAY = {}; + + public final static IntFunction ALLOCATE_BB_ON_HEAP = ByteBuffer::allocate; + + /** + * A singleton instance of "no-reuse" buffer strategy. + */ + public final static Consumer NO_REUSE = (bb) -> { + throw new RuntimeException("reset() is not allowed on this buffer."); + }; + + /** + * An implementation of a {@link ByteBuffer} allocation and recycling policy. + * The blocks are recycled if exactly the same size is requested, otherwise + * they're released to be GCed. + */ + public final static class ByteBufferRecycler { + private final ArrayDeque reuse = new ArrayDeque<>(); + private final IntFunction delegate; + + public ByteBufferRecycler(IntFunction delegate) { + this.delegate = Objects.requireNonNull(delegate); + } + + public ByteBuffer allocate(int size) { + while (!reuse.isEmpty()) { + ByteBuffer bb = reuse.removeFirst(); + // If we don't have a buffer of exactly the requested size, discard it. + if (bb.remaining() == size) { + return bb; + } + } + + return delegate.apply(size); + } + + public void reuse(ByteBuffer buffer) { + buffer.rewind(); + reuse.addLast(buffer); + } + } + + public final static int DEFAULT_MIN_BITS_PER_BLOCK = 10; // 1024 B + public final static int DEFAULT_MAX_BITS_PER_BLOCK = 26; // 64 MB + + /** + * Maximum number of blocks at the current {@link #blockBits} block size + * before we increase the block size (and thus decrease the number of blocks). + */ + final static int MAX_BLOCKS_BEFORE_BLOCK_EXPANSION = 100; + + /** + * Maximum block size: {@code 2^bits}. + */ + private final int maxBitsPerBlock; + + /** + * {@link ByteBuffer} supplier. + */ + private final IntFunction blockAllocate; + + /** + * {@link ByteBuffer} recycler on {@link #reset}. + */ + private final Consumer blockReuse; + + /** + * Current block size: {@code 2^bits}. + */ + private int blockBits; + + /** + * Blocks storing data. + */ + private final ArrayDeque blocks = new ArrayDeque<>(); + + /** + * The current-or-next write block. 
+ */ + private ByteBuffer currentBlock = EMPTY; + + public ByteBuffersDataOutput(long expectedSize) { + this(computeBlockSizeBitsFor(expectedSize), DEFAULT_MAX_BITS_PER_BLOCK, ALLOCATE_BB_ON_HEAP, NO_REUSE); + } + + public ByteBuffersDataOutput() { + this(DEFAULT_MIN_BITS_PER_BLOCK, DEFAULT_MAX_BITS_PER_BLOCK, ALLOCATE_BB_ON_HEAP, NO_REUSE); + } + + public ByteBuffersDataOutput(int minBitsPerBlock, + int maxBitsPerBlock, + IntFunction blockAllocate, + Consumer blockReuse) { + if (minBitsPerBlock < 10 || + minBitsPerBlock > maxBitsPerBlock || + maxBitsPerBlock > 31) { + throw new IllegalArgumentException(String.format(Locale.ROOT, + "Invalid arguments: %s %s", + minBitsPerBlock, + maxBitsPerBlock)); + } + this.maxBitsPerBlock = maxBitsPerBlock; + this.blockBits = minBitsPerBlock; + this.blockAllocate = Objects.requireNonNull(blockAllocate, "Block allocator must not be null."); + this.blockReuse = Objects.requireNonNull(blockReuse, "Block reuse must not be null."); + } + + @Override + public void writeByte(byte b) { + if (!currentBlock.hasRemaining()) { + appendBlock(); + } + currentBlock.put(b); + } + + @Override + public void writeBytes(byte[] src, int offset, int length) { + assert length >= 0; + while (length > 0) { + if (!currentBlock.hasRemaining()) { + appendBlock(); + } + + int chunk = Math.min(currentBlock.remaining(), length); + currentBlock.put(src, offset, chunk); + length -= chunk; + offset += chunk; + } + } + + @Override + public void writeBytes(byte[] b, int length) { + writeBytes(b, 0, length); + } + + public void writeBytes(byte[] b) { + writeBytes(b, 0, b.length); + } + + public void writeBytes(ByteBuffer buffer) { + buffer = buffer.duplicate(); + int length = buffer.remaining(); + while (length > 0) { + if (!currentBlock.hasRemaining()) { + appendBlock(); + } + + int chunk = Math.min(currentBlock.remaining(), length); + buffer.limit(buffer.position() + chunk); + currentBlock.put(buffer); + + length -= chunk; + } + } + + /** + * Return a list of read-only view of {@link ByteBuffer} blocks over the + * current content written to the output. + */ + public ArrayList toBufferList() { + ArrayList result = new ArrayList<>(Math.max(blocks.size(), 1)); + if (blocks.isEmpty()) { + result.add(EMPTY); + } else { + for (ByteBuffer bb : blocks) { + bb = (ByteBuffer) bb.asReadOnlyBuffer().flip(); // cast for jdk8 (covariant in jdk9+) + result.add(bb); + } + } + return result; + } + + /** + * Returns a list of writeable blocks over the (source) content buffers. + * + * This method returns the raw content of source buffers that may change over the lifetime + * of this object (blocks can be recycled or discarded, for example). Most applications + * should favor calling {@link #toBufferList()} which returns a read-only view over + * the content of the source buffers. + * + * The difference between {@link #toBufferList()} and {@link #toWriteableBufferList()} is that + * read-only view of source buffers will always return {@code false} from {@link ByteBuffer#hasArray()} + * (which sometimes may be required to avoid double copying). + */ + public ArrayList toWriteableBufferList() { + ArrayList result = new ArrayList<>(Math.max(blocks.size(), 1)); + if (blocks.isEmpty()) { + result.add(EMPTY); + } else { + for (ByteBuffer bb : blocks) { + bb = (ByteBuffer) bb.duplicate().flip(); // cast for jdk8 (covariant in jdk9+) + result.add(bb); + } + } + return result; + } + + /** + * Return a {@link ByteBuffersDataInput} for the set of current buffers ({@link #toBufferList()}). 
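+ * <p>A minimal round-trip sketch (illustrative; it only uses methods defined on this class,
+ * {@link DataOutput} and {@link ByteBuffersDataInput}):
+ * <pre>{@code
+ * ByteBuffersDataOutput out = new ByteBuffersDataOutput();
+ * out.writeVInt(42);
+ * out.writeString("hello");
+ * ByteBuffersDataInput in = out.toDataInput();
+ * int v = in.readVInt();      // 42
+ * String s = in.readString(); // "hello"
+ * }</pre>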
+ */ + public ByteBuffersDataInput toDataInput() { + return new ByteBuffersDataInput(toBufferList()); + } + + /** + * Return a contiguous array with the current content written to the output. The returned + * array is always a copy (can be mutated). + */ + public byte[] toArrayCopy() { + if (blocks.size() == 0) { + return EMPTY_BYTE_ARRAY; + } + + // We could try to detect single-block, array-based ByteBuffer here + // and use Arrays.copyOfRange, but I don't think it's worth the extra + // instance checks. + + byte [] arr = new byte[Math.toIntExact(size())]; + int offset = 0; + for (ByteBuffer bb : toBufferList()) { + int len = bb.remaining(); + bb.get(arr, offset, len); + offset += len; + } + return arr; + } + + /** + * Copy the current content of this object into another {@link DataOutput}. + */ + public void copyTo(DataOutput output) throws IOException { + for (ByteBuffer bb : toBufferList()) { + if (bb.hasArray()) { + output.writeBytes(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()); + } else { + output.copyBytes(new ByteBuffersDataInput(Arrays.asList(bb)), bb.remaining()); + } + } + } + + /** + * @return The number of bytes written to this output so far. + */ + public long size() { + long size = 0; + int blockCount = blocks.size(); + if (blockCount >= 1) { + int fullBlockSize = (blockCount - 1) * blockSize(); + int lastBlockSize = blocks.getLast().position(); + size = fullBlockSize + lastBlockSize; + } + return size; + } + + @Override + public String toString() { + return String.format(Locale.ROOT, + "%,d bytes, block size: %,d, blocks: %,d", + size(), + blockSize(), + blocks.size()); + } + + // Specialized versions of writeXXX methods that break execution into + // fast/ slow path if the result would fall on the current block's + // boundary. + // + // We also remove the IOException from methods because it (theoretically) + // cannot be thrown from byte buffers. 
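+ // For example, writeLong() below calls ByteBuffer.putLong() directly when the current block
+ // has at least Long.BYTES remaining, and otherwise falls back to DataOutput's byte-by-byte
+ // implementation, which transparently spills over into a newly appended block.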
+ + @Override + public void writeShort(short v) { + try { + if (currentBlock.remaining() >= Short.BYTES) { + currentBlock.putShort(v); + } else { + super.writeShort(v); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void writeInt(int v) { + try { + if (currentBlock.remaining() >= Integer.BYTES) { + currentBlock.putInt(v); + } else { + super.writeInt(v); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void writeLong(long v) { + try { + if (currentBlock.remaining() >= Long.BYTES) { + currentBlock.putLong(v); + } else { + super.writeLong(v); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void writeString(String v) { + try { + final int MAX_CHARS_PER_WINDOW = 1024; + if (v.length() <= MAX_CHARS_PER_WINDOW) { + final BytesRef utf8 = new BytesRef(v); + writeVInt(utf8.length); + writeBytes(utf8.bytes, utf8.offset, utf8.length); + } else { + writeVInt(UnicodeUtil.calcUTF16toUTF8Length(v, 0, v.length())); + final byte [] buf = new byte [UnicodeUtil.MAX_UTF8_BYTES_PER_CHAR * MAX_CHARS_PER_WINDOW]; + UTF16toUTF8(v, 0, v.length(), buf, (len) -> { + writeBytes(buf, 0, len); + }); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void writeMapOfStrings(Map map) { + try { + super.writeMapOfStrings(map); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void writeSetOfStrings(Set set) { + try { + super.writeSetOfStrings(set); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public long ramBytesUsed() { + // Return a rough estimation for allocated blocks. Note that we do not make + // any special distinction for direct memory buffers. + return RamUsageEstimator.NUM_BYTES_OBJECT_REF * blocks.size() + + blocks.stream().mapToLong(buf -> buf.capacity()).sum(); + } + + /** + * This method resets this object to a clean (zero-size) state and + * publishes any currently allocated buffers for reuse to the reuse strategy + * provided in the constructor. + * + * Sharing byte buffers for reads and writes is dangerous and will very likely + * lead to hard-to-debug issues, use with great care. + */ + public void reset() { + blocks.stream().forEach(blockReuse); + blocks.clear(); + currentBlock = EMPTY; + } + + /** + * @return Returns a new {@link ByteBuffersDataOutput} with the {@link #reset()} capability. + */ + // TODO: perhaps we can move it out to an utility class (as a supplier of preconfigured instances?) 
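+ // A hypothetical reuse cycle: an instance obtained from newResettableInstance() is filled,
+ // copied out (for example with toArrayCopy() or copyTo(...)), and then reset(); reset() hands
+ // the allocated blocks back to the internal ByteBufferRecycler so subsequent writes can reuse them.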
+ public static ByteBuffersDataOutput newResettableInstance() { + ByteBuffersDataOutput.ByteBufferRecycler reuser = new ByteBuffersDataOutput.ByteBufferRecycler( + ByteBuffersDataOutput.ALLOCATE_BB_ON_HEAP); + return new ByteBuffersDataOutput( + ByteBuffersDataOutput.DEFAULT_MIN_BITS_PER_BLOCK, + ByteBuffersDataOutput.DEFAULT_MAX_BITS_PER_BLOCK, + reuser::allocate, + reuser::reuse); + } + + private int blockSize() { + return 1 << blockBits; + } + + private void appendBlock() { + if (blocks.size() >= MAX_BLOCKS_BEFORE_BLOCK_EXPANSION && blockBits < maxBitsPerBlock) { + rewriteToBlockSize(blockBits + 1); + if (blocks.getLast().hasRemaining()) { + return; + } + } + + final int requiredBlockSize = 1 << blockBits; + currentBlock = blockAllocate.apply(requiredBlockSize); + assert currentBlock.capacity() == requiredBlockSize; + blocks.add(currentBlock); + } + + private void rewriteToBlockSize(int targetBlockBits) { + assert targetBlockBits <= maxBitsPerBlock; + + // We copy over data blocks to an output with one-larger block bit size. + // We also discard references to blocks as we're copying to allow GC to + // clean up partial results in case of memory pressure. + ByteBuffersDataOutput cloned = new ByteBuffersDataOutput(targetBlockBits, targetBlockBits, blockAllocate, NO_REUSE); + ByteBuffer block; + while ((block = blocks.pollFirst()) != null) { + block.flip(); + cloned.writeBytes(block); + if (blockReuse != NO_REUSE) { + blockReuse.accept(block); + } + } + + assert blocks.isEmpty(); + this.blockBits = targetBlockBits; + blocks.addAll(cloned.blocks); + } + + private static int computeBlockSizeBitsFor(long bytes) { + long powerOfTwo = BitUtil.nextHighestPowerOfTwo(bytes / MAX_BLOCKS_BEFORE_BLOCK_EXPANSION); + if (powerOfTwo == 0) { + return DEFAULT_MIN_BITS_PER_BLOCK; + } + + int blockBits = Long.numberOfTrailingZeros(powerOfTwo); + blockBits = Math.min(blockBits, DEFAULT_MAX_BITS_PER_BLOCK); + blockBits = Math.max(blockBits, DEFAULT_MIN_BITS_PER_BLOCK); + return blockBits; + } + + // TODO: move this block-based conversion to UnicodeUtil. + + private static final long HALF_SHIFT = 10; + private static final int SURROGATE_OFFSET = + Character.MIN_SUPPLEMENTARY_CODE_POINT - + (UnicodeUtil.UNI_SUR_HIGH_START << HALF_SHIFT) - UnicodeUtil.UNI_SUR_LOW_START; + + /** + * A consumer-based UTF16-UTF8 encoder (writes the input string in smaller buffers.). + */ + private static int UTF16toUTF8(final CharSequence s, + final int offset, + final int length, + byte[] buf, + IntConsumer bufferFlusher) { + int utf8Len = 0; + int j = 0; + for (int i = offset, end = offset + length; i < end; i++) { + final int chr = (int) s.charAt(i); + + if (j + 4 >= buf.length) { + bufferFlusher.accept(j); + utf8Len += j; + j = 0; + } + + if (chr < 0x80) + buf[j++] = (byte) chr; + else if (chr < 0x800) { + buf[j++] = (byte) (0xC0 | (chr >> 6)); + buf[j++] = (byte) (0x80 | (chr & 0x3F)); + } else if (chr < 0xD800 || chr > 0xDFFF) { + buf[j++] = (byte) (0xE0 | (chr >> 12)); + buf[j++] = (byte) (0x80 | ((chr >> 6) & 0x3F)); + buf[j++] = (byte) (0x80 | (chr & 0x3F)); + } else { + // A surrogate pair. Confirm valid high surrogate. + if (chr < 0xDC00 && (i < end - 1)) { + int utf32 = (int) s.charAt(i + 1); + // Confirm valid low surrogate and write pair. 
+ if (utf32 >= 0xDC00 && utf32 <= 0xDFFF) { + utf32 = (chr << 10) + utf32 + SURROGATE_OFFSET; + i++; + buf[j++] = (byte) (0xF0 | (utf32 >> 18)); + buf[j++] = (byte) (0x80 | ((utf32 >> 12) & 0x3F)); + buf[j++] = (byte) (0x80 | ((utf32 >> 6) & 0x3F)); + buf[j++] = (byte) (0x80 | (utf32 & 0x3F)); + continue; + } + } + // Replace unpaired surrogate or out-of-order low surrogate + // with substitution character. + buf[j++] = (byte) 0xEF; + buf[j++] = (byte) 0xBF; + buf[j++] = (byte) 0xBD; + } + } + + bufferFlusher.accept(j); + utf8Len += j; + + return utf8Len; + } +} diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDirectory.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDirectory.java new file mode 100644 index 00000000000..acff5cf14a6 --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDirectory.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.store; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.AccessDeniedException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.NoSuchFileException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.zip.CRC32; + +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.util.BitUtil; + +/** + * A {@link ByteBuffer}-based {@link Directory} implementation that + * can be used to store index files on the heap. + * + *
Important: Note that {@link MMapDirectory} is nearly always a better choice as + * it uses OS caches more effectively (through memory-mapped buffers). + * A heap-based directory like this one can have the advantage in case of ephemeral, small, + * short-lived indexes when disk syncs provide an additional overhead.
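+ *
+ * <p>A short usage sketch (illustrative; assumes the default constructor below and the standard
+ * {@code IndexWriter}/{@code IndexWriterConfig} APIs from {@code org.apache.lucene.index}):
+ * <pre>{@code
+ * try (Directory dir = new ByteBuffersDirectory();
+ *      IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
+ *   // add documents here; readers opened on the same directory see the committed index
+ * }
+ * }</pre>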
+ * + * @lucene.experimental + */ +public final class ByteBuffersDirectory extends BaseDirectory { + public static final BiFunction OUTPUT_AS_MANY_BUFFERS = + (fileName, output) -> { + ByteBuffersDataInput dataInput = output.toDataInput(); + String inputName = String.format(Locale.ROOT, "%s (file=%s, buffers=%s)", + ByteBuffersIndexInput.class.getSimpleName(), + fileName, + dataInput.toString()); + return new ByteBuffersIndexInput(dataInput, inputName); + }; + + public static final BiFunction OUTPUT_AS_ONE_BUFFER = + (fileName, output) -> { + ByteBuffersDataInput dataInput = new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(output.toArrayCopy()))); + String inputName = String.format(Locale.ROOT, "%s (file=%s, buffers=%s)", + ByteBuffersIndexInput.class.getSimpleName(), + fileName, + dataInput.toString()); + return new ByteBuffersIndexInput(dataInput, inputName); + }; + + public static final BiFunction OUTPUT_AS_BYTE_ARRAY = + (fileName, output) -> { + byte[] array = output.toArrayCopy(); + String inputName = String.format(Locale.ROOT, "%s (file=%s, length=%s)", + ByteArrayIndexInput.class.getSimpleName(), + fileName, + array.length); + return new ByteArrayIndexInput(inputName, array, 0, array.length); + }; + + public static final BiFunction OUTPUT_AS_MANY_BUFFERS_LUCENE = + (fileName, output) -> { + List bufferList = output.toBufferList(); + int chunkSizePower; + bufferList.add(ByteBuffer.allocate(0)); + int blockSize = ByteBuffersDataInput.determineBlockPage(bufferList); + if (blockSize == 0) { + chunkSizePower = 30; + } else { + chunkSizePower = Integer.numberOfTrailingZeros(BitUtil.nextHighestPowerOfTwo(blockSize)); + } + + String inputName = String.format(Locale.ROOT, "%s (file=%s)", + ByteBuffersDirectory.class.getSimpleName(), + fileName); + + ByteBufferGuard guard = new ByteBufferGuard("none", (String resourceDescription, ByteBuffer b) -> {}); + return ByteBufferIndexInput.newInstance(inputName, + bufferList.toArray(new ByteBuffer [bufferList.size()]), + output.size(), chunkSizePower, guard); + }; + + private final Function tempFileName = new Function() { + private final AtomicLong counter = new AtomicLong(); + + @Override + public String apply(String suffix) { + return suffix + "_" + Long.toString(counter.getAndIncrement(), Character.MAX_RADIX); + } + }; + + private final ConcurrentHashMap files = new ConcurrentHashMap<>(); + + /** + * Conversion between a buffered index output and the corresponding index input + * for a given file. + */ + private final BiFunction outputToInput; + + /** + * A supplier of {@link ByteBuffersDataOutput} instances used to buffer up + * the content of written files. 
+ */ + private final Supplier bbOutputSupplier; + + public ByteBuffersDirectory() { + this(new SingleInstanceLockFactory()); + } + + public ByteBuffersDirectory(LockFactory lockFactory) { + this(lockFactory, ByteBuffersDataOutput::new, OUTPUT_AS_MANY_BUFFERS); + } + + public ByteBuffersDirectory(LockFactory factory, + Supplier bbOutputSupplier, + BiFunction outputToInput) { + super(factory); + this.outputToInput = Objects.requireNonNull(outputToInput); + this.bbOutputSupplier = Objects.requireNonNull(bbOutputSupplier); + } + + @Override + public String[] listAll() throws IOException { + ensureOpen(); + return files.keySet().stream().sorted().toArray(String[]::new); + } + + @Override + public void deleteFile(String name) throws IOException { + ensureOpen(); + FileEntry removed = files.remove(name); + if (removed == null) { + throw new FileNotFoundException(name); + } + } + + @Override + public long fileLength(String name) throws IOException { + ensureOpen(); + FileEntry file = files.get(name); + if (file == null) { + throw new FileNotFoundException(name); + } + return file.length(); + } + + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + ensureOpen(); + FileEntry e = new FileEntry(name); + if (files.putIfAbsent(name, e) != null) { + throw new FileAlreadyExistsException("File already exists: " + name); + } + return e.createOutput(outputToInput); + } + + @Override + public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException { + ensureOpen(); + while (true) { + String name = IndexFileNames.segmentFileName(prefix, tempFileName.apply(suffix), "tmp"); + FileEntry e = new FileEntry(name); + if (files.putIfAbsent(name, e) == null) { + return e.createOutput(outputToInput); + } + } + } + + @Override + public void rename(String source, String dest) throws IOException { + ensureOpen(); + + FileEntry file = files.get(source); + if (file == null) { + throw new FileNotFoundException(source); + } + if (files.putIfAbsent(dest, file) != null) { + throw new FileAlreadyExistsException(dest); + } + if (!files.remove(source, file)) { + throw new IllegalStateException("File was unexpectedly replaced: " + source); + } + files.remove(source); + } + + @Override + public void sync(Collection names) throws IOException { + ensureOpen(); + } + + @Override + public void syncMetaData() throws IOException { + ensureOpen(); + } + + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + ensureOpen(); + FileEntry e = files.get(name); + if (e == null) { + throw new NoSuchFileException(name); + } else { + return e.openInput(); + } + } + + @Override + public void close() throws IOException { + isOpen = false; + files.clear(); + } + + private final class FileEntry { + private final String fileName; + + private volatile IndexInput content; + private volatile long cachedLength; + + public FileEntry(String name) { + this.fileName = name; + } + + public long length() { + // We return 0 length until the IndexOutput is closed and flushed. 
+ return cachedLength; + } + + public IndexInput openInput() throws IOException { + IndexInput local = this.content; + if (local == null) { + throw new AccessDeniedException("Can't open a file still open for writing: " + fileName); + } + + return local.clone(); + } + + final IndexOutput createOutput(BiFunction outputToInput) throws IOException { + if (content != null) { + throw new IOException("Can only write to a file once: " + fileName); + } + + String clazzName = ByteBuffersDirectory.class.getSimpleName(); + String outputName = String.format(Locale.ROOT, "%s output (file=%s)", clazzName, fileName); + + return new ByteBuffersIndexOutput( + bbOutputSupplier.get(), outputName, fileName, + new CRC32(), + (output) -> { + content = outputToInput.apply(fileName, output); + cachedLength = output.size(); + }); + } + } +} diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexInput.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexInput.java new file mode 100644 index 00000000000..7c87d2489dd --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexInput.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.store; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Map; +import java.util.Set; + +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.RandomAccessInput; + +/** + * An {@link IndexInput} implementing {@link RandomAccessInput} and backed + * by a {@link ByteBuffersDataInput}. 
+ */ +public final class ByteBuffersIndexInput extends IndexInput implements RandomAccessInput { + private ByteBuffersDataInput in; + + public ByteBuffersIndexInput(ByteBuffersDataInput in, String resourceDescription) { + super(resourceDescription); + this.in = in; + } + + @Override + public void close() throws IOException { + in = null; + } + + @Override + public long getFilePointer() { + ensureOpen(); + return in.position(); + } + + @Override + public void seek(long pos) throws IOException { + ensureOpen(); + in.seek(pos); + } + + @Override + public long length() { + ensureOpen(); + return in.size(); + } + + @Override + public ByteBuffersIndexInput slice(String sliceDescription, long offset, long length) throws IOException { + ensureOpen(); + return new ByteBuffersIndexInput(in.slice(offset, length), + "(sliced) offset=" + offset + ", length=" + length + " " + toString()); + } + + @Override + public byte readByte() throws IOException { + ensureOpen(); + return in.readByte(); + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + ensureOpen(); + in.readBytes(b, offset, len); + } + + @Override + public RandomAccessInput randomAccessSlice(long offset, long length) throws IOException { + ensureOpen(); + return slice("", offset, length); + } + + @Override + public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException { + ensureOpen(); + in.readBytes(b, offset, len, useBuffer); + } + + @Override + public short readShort() throws IOException { + ensureOpen(); + return in.readShort(); + } + + @Override + public int readInt() throws IOException { + ensureOpen(); + return in.readInt(); + } + + @Override + public int readVInt() throws IOException { + ensureOpen(); + return in.readVInt(); + } + + @Override + public int readZInt() throws IOException { + ensureOpen(); + return in.readZInt(); + } + + @Override + public long readLong() throws IOException { + ensureOpen(); + return in.readLong(); + } + + @Override + public long readVLong() throws IOException { + ensureOpen(); + return in.readVLong(); + } + + @Override + public long readZLong() throws IOException { + ensureOpen(); + return in.readZLong(); + } + + @Override + public String readString() throws IOException { + ensureOpen(); + return in.readString(); + } + + @Override + public Map readMapOfStrings() throws IOException { + ensureOpen(); + return in.readMapOfStrings(); + } + + @Override + public Set readSetOfStrings() throws IOException { + ensureOpen(); + return in.readSetOfStrings(); + } + + @Override + public void skipBytes(long numBytes) throws IOException { + ensureOpen(); + super.skipBytes(numBytes); + } + + @Override + public byte readByte(long pos) throws IOException { + ensureOpen(); + return in.readByte(pos); + } + + @Override + public short readShort(long pos) throws IOException { + ensureOpen(); + return in.readShort(pos); + } + + @Override + public int readInt(long pos) throws IOException { + ensureOpen(); + return in.readInt(pos); + } + + @Override + public long readLong(long pos) throws IOException { + ensureOpen(); + return in.readLong(pos); + } + + @Override + public IndexInput clone() { + ensureOpen(); + ByteBuffersIndexInput cloned = new ByteBuffersIndexInput(in.slice(0, in.size()), "(clone of) " + toString()); + try { + cloned.seek(getFilePointer()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return cloned; + } + + private void ensureOpen() { + if (in == null) { + throw new AlreadyClosedException("Already closed."); + } + } +} 
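Taken together, the conversion functions above make the directory's on-close file representation pluggable: the BiFunction passed at construction time decides whether a finished file is exposed as the original list of buffers, as a single contiguous byte array, or through the existing ByteBufferIndexInput machinery. A minimal sketch (illustrative; it only uses constructors and constants introduced in this patch) of choosing the single-array representation:

    Directory dir = new ByteBuffersDirectory(
        new SingleInstanceLockFactory(),
        ByteBuffersDataOutput::new,                 // supplier of write buffers for new files
        ByteBuffersDirectory.OUTPUT_AS_BYTE_ARRAY); // materialize each closed file as one byte[]

The default OUTPUT_AS_MANY_BUFFERS avoids the extra copy that OUTPUT_AS_BYTE_ARRAY performs through toArrayCopy(), at the price of reading through a list of block buffers.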
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexOutput.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexOutput.java new file mode 100644 index 00000000000..19dc4004853 --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexOutput.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.store; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.zip.CRC32; +import java.util.zip.Checksum; + +/** + * An {@link IndexOutput} writing to a {@link ByteBuffersDataOutput}. + */ +public final class ByteBuffersIndexOutput extends IndexOutput { + private final Consumer onClose; + + private final Checksum checksum; + private long lastChecksumPosition; + private long lastChecksum; + + private ByteBuffersDataOutput delegate; + + public ByteBuffersIndexOutput(ByteBuffersDataOutput delegate, String resourceDescription, String name) { + this(delegate, resourceDescription, name, new CRC32(), null); + } + + public ByteBuffersIndexOutput(ByteBuffersDataOutput delegate, String resourceDescription, String name, + Checksum checksum, + Consumer onClose) { + super(resourceDescription, name); + this.delegate = delegate; + this.checksum = checksum; + this.onClose = onClose; + } + + @Override + public void close() throws IOException { + // No special effort to be thread-safe here since IndexOutputs are not required to be thread-safe. + ByteBuffersDataOutput local = delegate; + delegate = null; + if (local != null && onClose != null) { + onClose.accept(local); + } + } + + @Override + public long getFilePointer() { + ensureOpen(); + return delegate.size(); + } + + @Override + public long getChecksum() throws IOException { + ensureOpen(); + + if (checksum == null) { + throw new IOException("This index output has no checksum computing ability: " + toString()); + } + + // Compute checksum on the current content of the delegate. + // + // This way we can override more methods and pass them directly to the delegate for efficiency of writing, + // while allowing the checksum to be correctly computed on the current content of the output buffer (IndexOutput + // is per-thread, so no concurrent changes). 
+ if (lastChecksumPosition != delegate.size()) { + lastChecksumPosition = delegate.size(); + checksum.reset(); + byte [] buffer = null; + for (ByteBuffer bb : delegate.toBufferList()) { + if (bb.hasArray()) { + checksum.update(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()); + } else { + if (buffer == null) buffer = new byte [1024 * 4]; + + bb = bb.asReadOnlyBuffer(); + int remaining = bb.remaining(); + while (remaining > 0) { + int len = Math.min(remaining, buffer.length); + bb.get(buffer, 0, len); + checksum.update(buffer, 0, len); + remaining -= len; + } + } + } + lastChecksum = checksum.getValue(); + } + return lastChecksum; + } + + @Override + public void writeByte(byte b) throws IOException { + ensureOpen(); + delegate.writeByte(b); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + ensureOpen(); + delegate.writeBytes(b, offset, length); + } + + @Override + public void writeBytes(byte[] b, int length) throws IOException { + ensureOpen(); + delegate.writeBytes(b, length); + } + + @Override + public void writeInt(int i) throws IOException { + ensureOpen(); + delegate.writeInt(i); + } + + @Override + public void writeShort(short i) throws IOException { + ensureOpen(); + delegate.writeShort(i); + } + + @Override + public void writeLong(long i) throws IOException { + ensureOpen(); + delegate.writeLong(i); + } + + @Override + public void writeString(String s) throws IOException { + ensureOpen(); + delegate.writeString(s); + } + + @Override + public void copyBytes(DataInput input, long numBytes) throws IOException { + ensureOpen(); + delegate.copyBytes(input, numBytes); + } + + @Override + public void writeMapOfStrings(Map map) throws IOException { + ensureOpen(); + delegate.writeMapOfStrings(map); + } + + @Override + public void writeSetOfStrings(Set set) throws IOException { + ensureOpen(); + delegate.writeSetOfStrings(set); + } + + private void ensureOpen() { + if (delegate == null) { + throw new AlreadyClosedException("Already closed."); + } + } +} diff --git a/lucene/core/src/test/org/apache/lucene/store/BaseDataOutputTestCase.java b/lucene/core/src/test/org/apache/lucene/store/BaseDataOutputTestCase.java new file mode 100644 index 00000000000..4578a4f36e6 --- /dev/null +++ b/lucene/core/src/test/org/apache/lucene/store/BaseDataOutputTestCase.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.lucene.store; + +import static org.junit.Assert.*; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.IOUtils.IOConsumer; +import org.junit.Test; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import com.carrotsearch.randomizedtesting.Xoroshiro128PlusRandom; +import com.carrotsearch.randomizedtesting.generators.RandomBytes; +import com.carrotsearch.randomizedtesting.generators.RandomNumbers; +import com.carrotsearch.randomizedtesting.generators.RandomPicks; +import com.carrotsearch.randomizedtesting.generators.RandomStrings; + +public abstract class BaseDataOutputTestCase extends RandomizedTest { + protected abstract T newInstance(); + protected abstract byte[] toBytes(T instance); + + @FunctionalInterface + private interface ThrowingBiFunction { + R apply(T t, U u) throws Exception; + } + + @Test + public void testRandomizedWrites() throws IOException { + T dst = newInstance(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutput ref = new OutputStreamDataOutput(baos); + + long seed = randomLong(); + int max = 50_000; + addRandomData(dst, new Xoroshiro128PlusRandom(seed), max); + addRandomData(ref, new Xoroshiro128PlusRandom(seed), max); + assertArrayEquals(baos.toByteArray(), toBytes(dst)); + } + + protected static List> addRandomData(DataOutput dst, Random rnd, int maxAddCalls) throws IOException { + try { + List> reply = new ArrayList<>(); + for (int i = 0; i < maxAddCalls; i++) { + reply.add(RandomPicks.randomFrom(rnd, GENERATORS).apply(dst, rnd)); + } + return reply; + } catch (Exception e) { + throw new IOException(e); + } + } + + private static List>> GENERATORS; + static { + GENERATORS = new ArrayList<>(); + + // writeByte/ readByte + GENERATORS.add((dst, rnd) -> { + byte value = (byte) rnd.nextInt(); + dst.writeByte(value); + return (src) -> assertEquals("readByte()", value, src.readByte()); + }); + + // writeBytes/ readBytes (array and buffer version). + GENERATORS.add((dst, rnd) -> { + byte[] bytes = RandomBytes.randomBytesOfLengthBetween(rnd, 0, 100); + ByteBuffersDataOutput rdo = dst instanceof ByteBuffersDataOutput ? (ByteBuffersDataOutput) dst : null; + + if (rnd.nextBoolean() && rdo != null) { + rdo.writeBytes(ByteBuffer.wrap(bytes)); + } else { + dst.writeBytes(bytes, bytes.length); + } + + boolean useBuffersForRead = rnd.nextBoolean(); + return (src) -> { + byte [] read = new byte [bytes.length]; + if (useBuffersForRead && src instanceof ByteBuffersDataInput) { + ((ByteBuffersDataInput) src).readBytes(ByteBuffer.wrap(read), read.length); + assertArrayEquals("readBytes(ByteBuffer)", bytes, read); + } else { + src.readBytes(read, 0, read.length); + assertArrayEquals("readBytes(byte[])", bytes, read); + } + }; + } + ); + + // writeBytes/ readBytes (array + offset). 
+ GENERATORS.add((dst, rnd) -> { + byte[] bytes = RandomBytes.randomBytesOfLengthBetween(rnd, 0, 100); + int off = RandomNumbers.randomIntBetween(rnd, 0, bytes.length); + int len = RandomNumbers.randomIntBetween(rnd, 0, bytes.length - off); + dst.writeBytes(bytes, off, len); + + return (src) -> { + byte [] read = new byte [bytes.length + off]; + src.readBytes(read, off, len); + assertArrayEquals( + "readBytes(byte[], off)", + ArrayUtil.copyOfSubArray(bytes, off, len + off), + ArrayUtil.copyOfSubArray(read, off, len + off)); + }; + } + ); + + GENERATORS.add((dst, rnd) -> { + int v = rnd.nextInt(); + dst.writeInt(v); + return (src) -> assertEquals("readInt()", v, src.readInt()); + }); + + GENERATORS.add((dst, rnd) -> { + long v = rnd.nextLong(); + dst.writeLong(v); + return (src) -> assertEquals("readLong()", v, src.readLong()); + }); + + GENERATORS.add((dst, rnd) -> { + short v = (short) rnd.nextInt(); + dst.writeShort(v); + return (src) -> assertEquals("readShort()", v, src.readShort()); + }); + + GENERATORS.add((dst, rnd) -> { + int v = rnd.nextInt(); + dst.writeVInt(v); + return (src) -> assertEquals("readVInt()", v, src.readVInt()); + }); + + GENERATORS.add((dst, rnd) -> { + int v = rnd.nextInt(); + dst.writeZInt(v); + return (src) -> assertEquals("readZInt()", v, src.readZInt()); + }); + + GENERATORS.add((dst, rnd) -> { + long v = rnd.nextLong() & (-1L >>> 1); + dst.writeVLong(v); + return (src) -> assertEquals("readVLong()", v, src.readVLong()); + }); + + GENERATORS.add((dst, rnd) -> { + long v = rnd.nextLong(); + dst.writeZLong(v); + return (src) -> assertEquals("readZLong()", v, src.readZLong()); + }); + + GENERATORS.add((dst, rnd) -> { + String v; + if (rnd.nextInt(50) == 0) { + // Occasionally a large blob. + v = RandomStrings.randomUnicodeOfLength(rnd, RandomNumbers.randomIntBetween(rnd, 2048, 4096)); + } else { + v = RandomStrings.randomUnicodeOfLength(rnd, RandomNumbers.randomIntBetween(rnd, 0, 10)); + } + dst.writeString(v); + return (src) -> assertEquals("readString()", v, src.readString()); + }); + } +} diff --git a/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java new file mode 100644 index 00000000000..5d3d7f60dcd --- /dev/null +++ b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.lucene.store; + +import static org.junit.Assert.*; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.IOUtils.IOConsumer; +import org.apache.lucene.util.LuceneTestCase; +import org.junit.Test; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import com.carrotsearch.randomizedtesting.Xoroshiro128PlusRandom; +import com.carrotsearch.randomizedtesting.annotations.Timeout; + +public final class TestByteBuffersDataInput extends RandomizedTest { + @Test + public void testSanity() throws IOException { + ByteBuffersDataOutput out = new ByteBuffersDataOutput(); + ByteBuffersDataInput o1 = out.toDataInput(); + assertEquals(0, o1.size()); + LuceneTestCase.expectThrows(EOFException.class, () -> { + o1.readByte(); + }); + + out.writeByte((byte) 1); + + ByteBuffersDataInput o2 = out.toDataInput(); + assertEquals(1, o2.size()); + assertEquals(0, o2.position()); + assertEquals(0, o1.size()); + + assertTrue(o2.ramBytesUsed() > 0); + assertEquals(1, o2.readByte()); + assertEquals(1, o2.position()); + assertEquals(1, o2.readByte(0)); + + LuceneTestCase.expectThrows(EOFException.class, () -> { + o2.readByte(); + }); + + assertEquals(1, o2.position()); + } + + @Test + public void testRandomReads() throws Exception { + ByteBuffersDataOutput dst = new ByteBuffersDataOutput(); + + long seed = randomLong(); + int max = 1_000_000; + List> reply = + TestByteBuffersDataOutput.addRandomData(dst, new Xoroshiro128PlusRandom(seed), max); + + ByteBuffersDataInput src = dst.toDataInput(); + for (IOConsumer c : reply) { + c.accept(src); + } + + LuceneTestCase.expectThrows(EOFException.class, () -> { + src.readByte(); + }); + } + + @Test + public void testRandomReadsOnSlices() throws Exception { + for (int reps = randomIntBetween(1, 200); --reps > 0;) { + ByteBuffersDataOutput dst = new ByteBuffersDataOutput(); + + byte [] prefix = new byte [randomIntBetween(0, 1024 * 8)]; + dst.writeBytes(prefix); + + long seed = randomLong(); + int max = 10_000; + List> reply = + TestByteBuffersDataOutput.addRandomData(dst, new Xoroshiro128PlusRandom(seed), max); + + byte [] suffix = new byte [randomIntBetween(0, 1024 * 8)]; + dst.writeBytes(suffix); + + ByteBuffersDataInput src = dst.toDataInput().slice(prefix.length, dst.size() - prefix.length - suffix.length); + + assertEquals(0, src.position()); + assertEquals(dst.size() - prefix.length - suffix.length, src.size()); + for (IOConsumer c : reply) { + c.accept(src); + } + + LuceneTestCase.expectThrows(EOFException.class, () -> { + src.readByte(); + }); + } + } + + @Test + public void testSeekEmpty() throws Exception { + ByteBuffersDataOutput dst = new ByteBuffersDataOutput(); + ByteBuffersDataInput in = dst.toDataInput(); + in.seek(0); + + LuceneTestCase.expectThrows(EOFException.class, () -> { + in.seek(1); + }); + + in.seek(0); + LuceneTestCase.expectThrows(EOFException.class, () -> { + in.readByte(); + }); + } + + @Test + public void testSeek() throws Exception { + for (int reps = randomIntBetween(1, 200); --reps > 0;) { + ByteBuffersDataOutput dst = new ByteBuffersDataOutput(); + + byte [] prefix = {}; + if (randomBoolean()) { + prefix = new byte [randomIntBetween(1, 1024 * 8)]; + dst.writeBytes(prefix); + } + + long seed = randomLong(); + int max = 1000; + List> reply = + TestByteBuffersDataOutput.addRandomData(dst, new Xoroshiro128PlusRandom(seed), max); + + ByteBuffersDataInput in = 
dst.toDataInput().slice(prefix.length, dst.size() - prefix.length); + + in.seek(0); + for (IOConsumer c : reply) { + c.accept(in); + } + + in.seek(0); + for (IOConsumer c : reply) { + c.accept(in); + } + + byte [] array = dst.toArrayCopy(); + array = ArrayUtil.copyOfSubArray(array, prefix.length, array.length); + for (int i = 0; i < 1000; i++) { + int offs = randomIntBetween(0, array.length - 1); + in.seek(offs); + assertEquals(offs, in.position()); + assertEquals(array[offs], in.readByte()); + } + in.seek(in.size()); + assertEquals(in.size(), in.position()); + LuceneTestCase.expectThrows(EOFException.class, () -> { + in.readByte(); + }); + } + } + + @Test + public void testSlicingWindow() throws Exception { + ByteBuffersDataOutput dst = new ByteBuffersDataOutput(); + assertEquals(0, dst.toDataInput().slice(0, 0).size());; + + dst.writeBytes(randomBytesOfLength(1024 * 8)); + ByteBuffersDataInput in = dst.toDataInput(); + for (int offset = 0, max = (int) dst.size(); offset < max; offset++) { + assertEquals(0, in.slice(offset, 0).size()); + assertEquals(1, in.slice(offset, 1).size()); + + int window = Math.min(max - offset, 1024); + assertEquals(window, in.slice(offset, window).size()); + } + assertEquals(0, in.slice((int) dst.size(), 0).size()); + } + + @Test + @Timeout(millis = 5000) + public void testEofOnArrayReadPastBufferSize() throws Exception { + ByteBuffersDataOutput dst = new ByteBuffersDataOutput(); + dst.writeBytes(new byte [10]); + + LuceneTestCase.expectThrows(EOFException.class, () -> { + ByteBuffersDataInput in = dst.toDataInput(); + in.readBytes(new byte [100], 0, 100); + }); + + LuceneTestCase.expectThrows(EOFException.class, () -> { + ByteBuffersDataInput in = dst.toDataInput(); + in.readBytes(ByteBuffer.allocate(100), 100); + }); + } +} diff --git a/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataOutput.java b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataOutput.java new file mode 100644 index 00000000000..893aa37e498 --- /dev/null +++ b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataOutput.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.lucene.store; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.lucene.util.ArrayUtil; +import org.junit.Assert; +import org.junit.Test; + +public final class TestByteBuffersDataOutput extends BaseDataOutputTestCase { + @Override + protected ByteBuffersDataOutput newInstance() { + return new ByteBuffersDataOutput(); + } + + @Override + protected byte[] toBytes(ByteBuffersDataOutput instance) { + return instance.toArrayCopy(); + } + + @Test + public void testReuse() throws IOException { + AtomicInteger allocations = new AtomicInteger(0); + ByteBuffersDataOutput.ByteBufferRecycler reuser = new ByteBuffersDataOutput.ByteBufferRecycler( + (size) -> { + allocations.incrementAndGet(); + return ByteBuffer.allocate(size); + }); + + ByteBuffersDataOutput o = new ByteBuffersDataOutput( + ByteBuffersDataOutput.DEFAULT_MIN_BITS_PER_BLOCK, + ByteBuffersDataOutput.DEFAULT_MAX_BITS_PER_BLOCK, + reuser::allocate, + reuser::reuse); + + // Add some random data first. + long genSeed = randomLong(); + int addCount = randomIntBetween(1000, 5000); + addRandomData(o, new Random(genSeed), addCount); + byte[] data = o.toArrayCopy(); + + // Use the same sequence over reused instance. + final int expectedAllocationCount = allocations.get(); + o.reset(); + addRandomData(o, new Random(genSeed), addCount); + + assertEquals(expectedAllocationCount, allocations.get()); + assertArrayEquals(data, o.toArrayCopy()); + } + + @Test + public void testConstructorWithExpectedSize() { + { + ByteBuffersDataOutput o = new ByteBuffersDataOutput(0); + o.writeByte((byte) 0); + assertEquals(1 << ByteBuffersDataOutput.DEFAULT_MIN_BITS_PER_BLOCK, o.toBufferList().get(0).capacity()); + } + + { + long MB = 1024 * 1024; + long expectedSize = randomLongBetween(MB, MB * 1024); + ByteBuffersDataOutput o = new ByteBuffersDataOutput(expectedSize); + o.writeByte((byte) 0); + int cap = o.toBufferList().get(0).capacity(); + assertTrue((cap >> 1) * ByteBuffersDataOutput.MAX_BLOCKS_BEFORE_BLOCK_EXPANSION < expectedSize); + assertTrue("cap=" + cap + ", exp=" + expectedSize, + (cap) * ByteBuffersDataOutput.MAX_BLOCKS_BEFORE_BLOCK_EXPANSION >= expectedSize); + } + } + + @Test + public void testSanity() { + ByteBuffersDataOutput o = newInstance(); + assertEquals(0, o.size()); + assertEquals(0, o.toArrayCopy().length); + assertEquals(0, o.ramBytesUsed()); + + o.writeByte((byte) 1); + assertEquals(1, o.size()); + assertTrue(o.ramBytesUsed() > 0); + assertArrayEquals(new byte [] { 1 }, o.toArrayCopy()); + + o.writeBytes(new byte [] {2, 3, 4}, 3); + assertEquals(4, o.size()); + assertArrayEquals(new byte [] { 1, 2, 3, 4 }, o.toArrayCopy()); + } + + @Test + public void testWriteByteBuffer() { + ByteBuffersDataOutput o = new ByteBuffersDataOutput(); + byte[] bytes = randomBytesOfLength(1024 * 8 + 10); + ByteBuffer src = ByteBuffer.wrap(bytes); + int offset = randomIntBetween(0, 100); + int len = bytes.length - offset; + src.position(offset); + src.limit(offset + len); + o.writeBytes(src); + assertEquals(len, o.size()); + Assert.assertArrayEquals(ArrayUtil.copyOfSubArray(bytes, offset, offset + len), o.toArrayCopy()); + } + + @Test + public void testLargeArrayAdd() { + ByteBuffersDataOutput o = new ByteBuffersDataOutput(); + int MB = 1024 * 1024; + byte [] bytes = randomBytesOfLength(5 * MB, 15 * MB); + int offset = randomIntBetween(0, 100); + int len = bytes.length - offset; + 
o.writeBytes(bytes, offset, len); + assertEquals(len, o.size()); + Assert.assertArrayEquals(ArrayUtil.copyOfSubArray(bytes, offset, offset + len), o.toArrayCopy()); + } + + @Test + public void testToBufferListReturnsReadOnlyBuffers() throws Exception { + ByteBuffersDataOutput dst = new ByteBuffersDataOutput(); + dst.writeBytes(new byte [100]); + for (ByteBuffer bb : dst.toBufferList()) { + assertTrue(bb.isReadOnly()); + } + } + + @Test + public void testToWriteableBufferListReturnsOriginalBuffers() throws Exception { + ByteBuffersDataOutput dst = new ByteBuffersDataOutput(); + for (ByteBuffer bb : dst.toWriteableBufferList()) { + assertTrue(!bb.isReadOnly()); + assertTrue(bb.hasArray()); // even the empty buffer should have a backing array. + } + + dst.writeBytes(new byte [100]); + for (ByteBuffer bb : dst.toWriteableBufferList()) { + assertTrue(!bb.isReadOnly()); + assertTrue(bb.hasArray()); // heap-based by default, so array should be there. + } + } +} diff --git a/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDirectory.java b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDirectory.java new file mode 100644 index 00000000000..5f2d447c240 --- /dev/null +++ b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDirectory.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.lucene.store; + + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.function.Supplier; + +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.IndexWriterConfig.OpenMode; +import org.apache.lucene.util.English; +import org.junit.Test; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +public class TestByteBuffersDirectory extends BaseDirectoryTestCase { + private Supplier implSupplier; + + public TestByteBuffersDirectory(Supplier implSupplier, String name) { + this.implSupplier = implSupplier; + } + + @Override + protected Directory getDirectory(Path path) throws IOException { + return implSupplier.get(); + } + + @Test + public void testBuildIndex() throws IOException { + try (Directory dir = getDirectory(null); + IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig( + new MockAnalyzer(random())).setOpenMode(OpenMode.CREATE))) { + int docs = RandomizedTest.randomIntBetween(0, 10); + for (int i = docs; i > 0; i--) { + Document doc = new Document(); + doc.add(newStringField("content", English.intToEnglish(i).trim(), Field.Store.YES)); + writer.addDocument(doc); + } + writer.commit(); + assertEquals(docs, writer.numDocs()); + } + } + + @ParametersFactory(argumentFormatting = "impl=%2$s") + public static Iterable parametersWithCustomName() { + return Arrays.asList(new Object [][] { + {(Supplier) () -> new ByteBuffersDirectory( + new SingleInstanceLockFactory(), + ByteBuffersDataOutput::new, + ByteBuffersDirectory.OUTPUT_AS_MANY_BUFFERS), "many buffers (heap)"}, + {(Supplier) () -> new ByteBuffersDirectory( + new SingleInstanceLockFactory(), + ByteBuffersDataOutput::new, + ByteBuffersDirectory.OUTPUT_AS_ONE_BUFFER), "one buffer (heap)"}, + {(Supplier) () -> new ByteBuffersDirectory( + new SingleInstanceLockFactory(), + ByteBuffersDataOutput::new, + ByteBuffersDirectory.OUTPUT_AS_MANY_BUFFERS_LUCENE), "lucene's buffers (heap)"}, + {(Supplier) () -> new ByteBuffersDirectory( + new SingleInstanceLockFactory(), + ByteBuffersDataOutput::new, + ByteBuffersDirectory.OUTPUT_AS_BYTE_ARRAY), "byte array (heap)"}, + }); + } +}
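
For reference, a minimal usage sketch of the new directory outside the test framework (not part of the patch above). The example class, field name and analyzer choice are illustrative assumptions; only the ByteBuffersDirectory constructor arguments are taken from the "many buffers (heap)" parameter of TestByteBuffersDirectory, and StandardAnalyzer is assumed to be available on the classpath.

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.TextField;
    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.ByteBuffersDataOutput;
    import org.apache.lucene.store.ByteBuffersDirectory;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.SingleInstanceLockFactory;

    public class ByteBuffersDirectoryUsageSketch {
      public static void main(String[] args) throws Exception {
        // Heap-resident directory; constructor arguments mirror the
        // "many buffers (heap)" case from TestByteBuffersDirectory.
        try (Directory dir = new ByteBuffersDirectory(
                 new SingleInstanceLockFactory(),
                 ByteBuffersDataOutput::new,
                 ByteBuffersDirectory.OUTPUT_AS_MANY_BUFFERS);
             IndexWriter writer = new IndexWriter(dir,
                 new IndexWriterConfig(new StandardAnalyzer()))) {
          Document doc = new Document();
          doc.add(new TextField("content", "byte buffers directory example", Field.Store.YES));
          writer.addDocument(doc);
          writer.commit();

          // Read back from the same in-memory directory.
          try (DirectoryReader reader = DirectoryReader.open(dir)) {
            System.out.println("docs: " + reader.numDocs());
          }
        }
      }
    }

The other OUTPUT_AS_* constants exercised by the test parameters (one buffer, Lucene's buffers, plain byte array) appear to differ only in how a completed output is materialized for reading; the write path through ByteBuffersDataOutput is the same.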