diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java index 984d46d575e..51de22a54b9 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java @@ -29,7 +29,6 @@ import sun.nio.ch.DirectBuffer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.nio.ByteBuff; -import org.apache.hadoop.hbase.nio.MultiByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -175,7 +174,7 @@ public class ByteBuffAllocator { return allocateOnHeap(this.bufSize); } - private SingleByteBuff allocateOnHeap(int size) { + private static SingleByteBuff allocateOnHeap(int size) { return new SingleByteBuff(NONE, ByteBuffer.allocate(size)); } @@ -213,7 +212,7 @@ public class ByteBuffAllocator { // just allocate the ByteBuffer from on-heap. bbs.add(ByteBuffer.allocate(remain)); } - ByteBuff bb = wrap(bbs, () -> { + ByteBuff bb = ByteBuff.wrap(bbs, () -> { for (int i = 0; i < lenFromReservoir; i++) { this.putbackBuffer(bbs.get(i)); } @@ -238,30 +237,6 @@ public class ByteBuffAllocator { } } - public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) { - if (buffers == null || buffers.length == 0) { - throw new IllegalArgumentException("buffers shouldn't be null or empty"); - } - return buffers.length == 1 ? new SingleByteBuff(recycler, buffers[0]) - : new MultiByteBuff(recycler, buffers); - } - - public static ByteBuff wrap(ByteBuffer[] buffers) { - return wrap(buffers, NONE); - } - - public static ByteBuff wrap(List buffers, Recycler recycler) { - if (buffers == null || buffers.size() == 0) { - throw new IllegalArgumentException("buffers shouldn't be null or empty"); - } - return buffers.size() == 1 ? new SingleByteBuff(recycler, buffers.get(0)) - : new MultiByteBuff(recycler, buffers.toArray(new ByteBuffer[0])); - } - - public static ByteBuff wrap(List buffers) { - return wrap(buffers, NONE); - } - /** * @return One free DirectByteBuffer from the pool. If no free ByteBuffer and we have not reached * the maximum pool size, it will create a new one and return. In case of max pool size diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java index 1ee36077ece..9339f4377b8 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java @@ -20,7 +20,10 @@ package org.apache.hadoop.hbase.nio; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import java.util.List; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ObjectIntPair; @@ -557,4 +560,34 @@ public abstract class ByteBuff implements ReferenceCounted { return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() + ", cap= " + capacity() + "]"; } + + /********************************* ByteBuff wrapper methods ***********************************/ + + public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) { + if (buffers == null || buffers.length == 0) { + throw new IllegalArgumentException("buffers shouldn't be null or empty"); + } + return buffers.length == 1 ? new SingleByteBuff(recycler, buffers[0]) + : new MultiByteBuff(recycler, buffers); + } + + public static ByteBuff wrap(ByteBuffer[] buffers) { + return wrap(buffers, ByteBuffAllocator.NONE); + } + + public static ByteBuff wrap(List buffers, Recycler recycler) { + if (buffers == null || buffers.size() == 0) { + throw new IllegalArgumentException("buffers shouldn't be null or empty"); + } + return buffers.size() == 1 ? new SingleByteBuff(recycler, buffers.get(0)) + : new MultiByteBuff(recycler, buffers.toArray(new ByteBuffer[0])); + } + + public static ByteBuff wrap(List buffers) { + return wrap(buffers, ByteBuffAllocator.NONE); + } + + public static ByteBuff wrap(ByteBuffer buffer) { + return new SingleByteBuff(ByteBuffAllocator.NONE, buffer); + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java index d023339fd48..e5a0b134969 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java @@ -20,15 +20,14 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; -import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -38,279 +37,248 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** - * This class manages an array of ByteBuffers with a default size 4MB. These - * buffers are sequential and could be considered as a large buffer.It supports - * reading/writing data from this large buffer with a position and offset + * This class manages an array of ByteBuffers with a default size 4MB. These buffers are sequential + * and could be considered as a large buffer.It supports reading/writing data from this large buffer + * with a position and offset */ @InterfaceAudience.Private public class ByteBufferArray { private static final Logger LOG = LoggerFactory.getLogger(ByteBufferArray.class); public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; - @VisibleForTesting - ByteBuffer buffers[]; - private int bufferSize; - @VisibleForTesting - int bufferCount; + private final int bufferSize; + private final int bufferCount; + final ByteBuffer[] buffers; /** - * We allocate a number of byte buffers as the capacity. In order not to out - * of the array bounds for the last byte(see {@link ByteBufferArray#multiple}), - * we will allocate one additional buffer with capacity 0; + * We allocate a number of byte buffers as the capacity. * @param capacity total size of the byte buffer array * @param allocator the ByteBufferAllocator that will create the buffers * @throws IOException throws IOException if there is an exception thrown by the allocator */ - public ByteBufferArray(long capacity, ByteBufferAllocator allocator) - throws IOException { - this.bufferSize = DEFAULT_BUFFER_SIZE; - if (this.bufferSize > (capacity / 16)) - this.bufferSize = (int) roundUp(capacity / 16, 32768); - this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize); - LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity) - + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count=" - + bufferCount); - buffers = new ByteBuffer[bufferCount + 1]; - createBuffers(allocator); + public ByteBufferArray(long capacity, ByteBufferAllocator allocator) throws IOException { + this(getBufferSize(capacity), getBufferCount(capacity), + Runtime.getRuntime().availableProcessors(), capacity, allocator); } @VisibleForTesting - void createBuffers(ByteBufferAllocator allocator) - throws IOException { - int threadCount = getThreadCount(); - ExecutorService service = new ThreadPoolExecutor(threadCount, threadCount, 0L, - TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); - int perThreadCount = (int)Math.floor((double) (bufferCount) / threadCount); - int lastThreadCount = bufferCount - (perThreadCount * (threadCount - 1)); - Future[] futures = new Future[threadCount]; + ByteBufferArray(int bufferSize, int bufferCount, int threadCount, long capacity, + ByteBufferAllocator alloc) throws IOException { + this.bufferSize = bufferSize; + this.bufferCount = bufferCount; + LOG.info("Allocating buffers total={}, sizePerBuffer={}, count={}", + StringUtils.byteDesc(capacity), StringUtils.byteDesc(bufferSize), bufferCount); + this.buffers = new ByteBuffer[bufferCount]; + createBuffers(threadCount, alloc); + } + + private void createBuffers(int threadCount, ByteBufferAllocator alloc) throws IOException { + ExecutorService pool = Executors.newFixedThreadPool(threadCount); + int perThreadCount = bufferCount / threadCount; + int reminder = bufferCount % threadCount; try { + List> futures = new ArrayList<>(threadCount); + // Dispatch the creation task to each thread. for (int i = 0; i < threadCount; i++) { - // Last thread will have to deal with a different number of buffers - int buffersToCreate = (i == threadCount - 1) ? lastThreadCount : perThreadCount; - futures[i] = service.submit( - new BufferCreatorCallable(bufferSize, buffersToCreate, allocator)); - } - int bufferIndex = 0; - for (Future future : futures) { - try { - ByteBuffer[] buffers = future.get(); - for (ByteBuffer buffer : buffers) { - this.buffers[bufferIndex++] = buffer; + final int chunkSize = perThreadCount + ((i == threadCount - 1) ? reminder : 0); + futures.add(pool.submit(() -> { + ByteBuffer[] chunk = new ByteBuffer[chunkSize]; + for (int k = 0; k < chunkSize; k++) { + chunk[k] = alloc.allocate(bufferSize); + } + return chunk; + })); + } + // Append the buffers created by each thread. + int bufferIndex = 0; + try { + for (Future f : futures) { + for (ByteBuffer b : f.get()) { + this.buffers[bufferIndex++] = b; } - } catch (InterruptedException | ExecutionException e) { - LOG.error("Buffer creation interrupted", e); - throw new IOException(e); } + assert bufferIndex == bufferCount; + } catch (Exception e) { + LOG.error("Buffer creation interrupted", e); + throw new IOException(e); } } finally { - service.shutdownNow(); + pool.shutdownNow(); } - // always create on heap empty dummy buffer at last - this.buffers[bufferCount] = ByteBuffer.allocate(0); } @VisibleForTesting - int getThreadCount() { - return Runtime.getRuntime().availableProcessors(); + static int getBufferSize(long capacity) { + int bufferSize = DEFAULT_BUFFER_SIZE; + if (bufferSize > (capacity / 16)) { + bufferSize = (int) roundUp(capacity / 16, 32768); + } + return bufferSize; } - /** - * A callable that creates buffers of the specified length either onheap/offheap using the - * {@link ByteBufferAllocator} - */ - private static class BufferCreatorCallable implements Callable { - private final int bufferCapacity; - private final int bufferCount; - private final ByteBufferAllocator allocator; - - BufferCreatorCallable(int bufferCapacity, int bufferCount, ByteBufferAllocator allocator) { - this.bufferCapacity = bufferCapacity; - this.bufferCount = bufferCount; - this.allocator = allocator; - } - - @Override - public ByteBuffer[] call() throws Exception { - ByteBuffer[] buffers = new ByteBuffer[this.bufferCount]; - for (int i = 0; i < this.bufferCount; i++) { - buffers[i] = allocator.allocate(this.bufferCapacity); - } - return buffers; - } + private static int getBufferCount(long capacity) { + int bufferSize = getBufferSize(capacity); + return (int) (roundUp(capacity, bufferSize) / bufferSize); } - private long roundUp(long n, long to) { + private static long roundUp(long n, long to) { return ((n + to - 1) / to) * to; } /** - * Transfers bytes from this buffer array into the given destination array - * @param start start position in the ByteBufferArray - * @param len The maximum number of bytes to be written to the given array - * @param dstArray The array into which bytes are to be written + * Transfers bytes from this buffers array into the given destination {@link ByteBuff} + * @param offset start position in this big logical array. + * @param dst the destination ByteBuff. Notice that its position will be advanced. * @return number of bytes read */ - public int getMultiple(long start, int len, byte[] dstArray) { - return getMultiple(start, len, dstArray, 0); + public int read(long offset, ByteBuff dst) { + return internalTransfer(offset, dst, READER); } /** - * Transfers bytes from this buffer array into the given destination array - * @param start start offset of this buffer array - * @param len The maximum number of bytes to be written to the given array - * @param dstArray The array into which bytes are to be written - * @param dstOffset The offset within the given array of the first byte to be - * written - * @return number of bytes read + * Transfers bytes from the given source {@link ByteBuff} into this buffer array + * @param offset start offset of this big logical array. + * @param src the source ByteBuff. Notice that its position will be advanced. + * @return number of bytes write */ - public int getMultiple(long start, int len, byte[] dstArray, int dstOffset) { - multiple(start, len, dstArray, dstOffset, GET_MULTIPLE_VISTOR); - return len; + public int write(long offset, ByteBuff src) { + return internalTransfer(offset, src, WRITER); } - private final static Visitor GET_MULTIPLE_VISTOR = new Visitor() { - @Override - public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) { - ByteBufferUtils.copyFromBufferToArray(array, bb, pos, arrayIdx, len); - } + /** + * Transfer bytes from source {@link ByteBuff} to destination {@link ByteBuffer}. Position of both + * source and destination will be advanced. + */ + private static final BiConsumer WRITER = (dst, src) -> { + int off = src.position(), len = dst.remaining(); + src.get(dst, off, len); + src.position(off + len); }; /** - * Transfers bytes from the given source array into this buffer array - * @param start start offset of this buffer array - * @param len The maximum number of bytes to be read from the given array - * @param srcArray The array from which bytes are to be read + * Transfer bytes from source {@link ByteBuffer} to destination {@link ByteBuff}, Position of both + * source and destination will be advanced. */ - public void putMultiple(long start, int len, byte[] srcArray) { - putMultiple(start, len, srcArray, 0); - } - - /** - * Transfers bytes from the given source array into this buffer array - * @param start start offset of this buffer array - * @param len The maximum number of bytes to be read from the given array - * @param srcArray The array from which bytes are to be read - * @param srcOffset The offset within the given array of the first byte to be - * read - */ - public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) { - multiple(start, len, srcArray, srcOffset, PUT_MULTIPLE_VISITOR); - } - - private final static Visitor PUT_MULTIPLE_VISITOR = new Visitor() { - @Override - public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) { - ByteBufferUtils.copyFromArrayToBuffer(bb, pos, array, arrayIdx, len); - } + private static final BiConsumer READER = (src, dst) -> { + int off = dst.position(), len = src.remaining(), srcOff = src.position(); + dst.put(off, ByteBuff.wrap(src), srcOff, len); + src.position(srcOff + len); + dst.position(off + len); }; - private interface Visitor { - /** - * Visit the given byte buffer, if it is a read action, we will transfer the - * bytes from the buffer to the destination array, else if it is a write - * action, we will transfer the bytes from the source array to the buffer - * @param bb byte buffer - * @param pos Start position in ByteBuffer - * @param array a source or destination byte array - * @param arrayOffset offset of the byte array - * @param len read/write length - */ - void visit(ByteBuffer bb, int pos, byte[] array, int arrayOffset, int len); - } - /** - * Access(read or write) this buffer array with a position and length as the - * given array. Here we will only lock one buffer even if it may be need visit - * several buffers. The consistency is guaranteed by the caller. - * @param start start offset of this buffer array - * @param len The maximum number of bytes to be accessed - * @param array The array from/to which bytes are to be read/written - * @param arrayOffset The offset within the given array of the first byte to - * be read or written - * @param visitor implement of how to visit the byte buffer + * Transferring all remaining bytes from b to the buffers array starting at offset, or + * transferring bytes from the buffers array at offset to b until b is filled. Notice that + * position of ByteBuff b will be advanced. + * @param offset where we start in the big logical array. + * @param b the ByteBuff to transfer from or to + * @param transfer the transfer interface. + * @return the length of bytes we transferred. */ - void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) { - assert len >= 0; - long end = start + len; - int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize); - int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize); - assert array.length >= len + arrayOffset; - assert startBuffer >= 0 && startBuffer < bufferCount; - assert (endBuffer >= 0 && endBuffer < bufferCount) - || (endBuffer == bufferCount && endOffset == 0); - if (startBuffer >= buffers.length || startBuffer < 0) { - String msg = "Failed multiple, start=" + start + ",startBuffer=" - + startBuffer + ",bufferSize=" + bufferSize; - LOG.error(msg); - throw new RuntimeException(msg); + private int internalTransfer(long offset, ByteBuff b, BiConsumer transfer) { + int expectedTransferLen = b.remaining(); + if (expectedTransferLen == 0) { + return 0; } - int srcIndex = 0, cnt = -1; - for (int i = startBuffer; i <= endBuffer; ++i) { - ByteBuffer bb = buffers[i].duplicate(); - int pos = 0; - if (i == startBuffer) { - cnt = bufferSize - startOffset; - if (cnt > len) cnt = len; - pos = startOffset; - } else if (i == endBuffer) { - cnt = endOffset; - } else { - cnt = bufferSize; - } - visitor.visit(bb, pos, array, srcIndex + arrayOffset, cnt); - srcIndex += cnt; + BufferIterator it = new BufferIterator(offset, expectedTransferLen); + while (it.hasNext()) { + ByteBuffer a = it.next(); + transfer.accept(a, b); + assert !a.hasRemaining(); } - assert srcIndex == len; + assert expectedTransferLen == it.getSum() : "Expected transfer length (=" + expectedTransferLen + + ") don't match the actual transfer length(=" + it.getSum() + ")"; + return expectedTransferLen; } /** - * Creates a ByteBuff from a given array of ByteBuffers from the given offset to the - * length specified. For eg, if there are 4 buffers forming an array each with length 10 and - * if we call asSubBuffer(5, 10) then we will create an MBB consisting of two BBs - * and the first one be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from - * 'position' 0 to 'length' 5. - * @param offset - * @param len + * Creates a ByteBuff from a given array of ByteBuffers from the given offset to the length + * specified. For eg, if there are 4 buffers forming an array each with length 10 and if we call + * asSubBuffer(5, 10) then we will create an MBB consisting of two BBs and the first one be a BB + * from 'position' 5 to a 'length' 5 and the 2nd BB will be from 'position' 0 to 'length' 5. + * @param offset the position in the whole array which is composited by multiple byte buffers. + * @param len the length of bytes * @return a ByteBuff formed from the underlying ByteBuffers */ - public ByteBuff asSubByteBuff(long offset, int len) { - assert len >= 0; - long end = offset + len; - int startBuffer = (int) (offset / bufferSize), startBufferOffset = (int) (offset % bufferSize); - int endBuffer = (int) (end / bufferSize), endBufferOffset = (int) (end % bufferSize); - // Last buffer in the array is a dummy one with 0 capacity. Avoid sending back that - if (endBuffer == this.bufferCount) { - endBuffer--; - endBufferOffset = bufferSize; + public ByteBuff asSubByteBuff(long offset, final int len) { + BufferIterator it = new BufferIterator(offset, len); + ByteBuffer[] mbb = new ByteBuffer[it.getBufferCount()]; + for (int i = 0; i < mbb.length; i++) { + assert it.hasNext(); + mbb[i] = it.next(); } - assert startBuffer >= 0 && startBuffer < bufferCount; - assert (endBuffer >= 0 && endBuffer < bufferCount) - || (endBuffer == bufferCount && endBufferOffset == 0); - if (startBuffer >= buffers.length || startBuffer < 0) { - String msg = "Failed subArray, start=" + offset + ",startBuffer=" + startBuffer - + ",bufferSize=" + bufferSize; - LOG.error(msg); - throw new RuntimeException(msg); + assert it.getSum() == len; + return ByteBuff.wrap(mbb); + } + + /** + * Iterator to fetch ByteBuffers from offset with given length in this big logical array. + */ + private class BufferIterator implements Iterator { + private final int len; + private int startBuffer, startOffset, endBuffer, endOffset; + private int curIndex, sum = 0; + + private int index(long pos) { + return (int) (pos / bufferSize); } - int srcIndex = 0, cnt = -1; - ByteBuffer[] mbb = new ByteBuffer[endBuffer - startBuffer + 1]; - for (int i = startBuffer, j = 0; i <= endBuffer; ++i, j++) { - ByteBuffer bb = buffers[i].duplicate(); - if (i == startBuffer) { - cnt = bufferSize - startBufferOffset; - if (cnt > len) cnt = len; - bb.limit(startBufferOffset + cnt).position(startBufferOffset); - } else if (i == endBuffer) { - cnt = endBufferOffset; - bb.position(0).limit(cnt); - } else { - cnt = bufferSize; - bb.position(0).limit(cnt); + + private int offset(long pos) { + return (int) (pos % bufferSize); + } + + public BufferIterator(long offset, int len) { + assert len >= 0 && offset >= 0; + this.len = len; + + this.startBuffer = index(offset); + this.startOffset = offset(offset); + + this.endBuffer = index(offset + len); + this.endOffset = offset(offset + len); + if (startBuffer < endBuffer && endOffset == 0) { + endBuffer--; + endOffset = bufferSize; } - mbb[j] = bb.slice(); - srcIndex += cnt; + assert startBuffer >= 0 && startBuffer < bufferCount; + assert endBuffer >= 0 && endBuffer < bufferCount; + + // initialize the index to the first buffer index. + this.curIndex = startBuffer; + } + + @Override + public boolean hasNext() { + return this.curIndex <= endBuffer; + } + + /** + * The returned ByteBuffer is an sliced one, it won't affect the position or limit of the + * original one. + */ + @Override + public ByteBuffer next() { + ByteBuffer bb = buffers[curIndex].duplicate(); + if (curIndex == startBuffer) { + bb.position(startOffset).limit(Math.min(bufferSize, startOffset + len)); + } else if (curIndex == endBuffer) { + bb.position(0).limit(endOffset); + } else { + bb.position(0).limit(bufferSize); + } + curIndex++; + sum += bb.remaining(); + // Make sure that its pos is zero, it's important because MBB will count from zero for all nio + // ByteBuffers. + return bb.slice(); + } + + int getSum() { + return sum; + } + + int getBufferCount() { + return this.endBuffer - this.startBuffer + 1; } - assert srcIndex == len; - return ByteBuffAllocator.wrap(mbb); } } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java index 3fc1c230f5a..05349242e66 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java @@ -20,34 +20,37 @@ package org.apache.hadoop.hbase.util; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Random; + import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.MultiByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category({MiscTests.class, SmallTests.class}) +@Category({ MiscTests.class, SmallTests.class }) public class TestByteBufferArray { + private static final Random RANDOM = new Random(System.currentTimeMillis()); + @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestByteBufferArray.class); + private static final ByteBufferAllocator ALLOC = (size) -> ByteBuffer.allocateDirect((int) size); + @Test public void testAsSubBufferWhenEndOffsetLandInLastBuffer() throws Exception { int capacity = 4 * 1024 * 1024; - ByteBufferAllocator allocator = new ByteBufferAllocator() { - @Override - public ByteBuffer allocate(long size) throws IOException { - return ByteBuffer.allocateDirect((int) size); - } - }; - ByteBufferArray array = new ByteBufferArray(capacity, allocator); + ByteBufferArray array = new ByteBufferArray(capacity, ALLOC); ByteBuff subBuf = array.asSubByteBuff(0, capacity); subBuf.position(capacity - 1);// Position to the last byte assertTrue(subBuf.hasRemaining()); @@ -59,54 +62,148 @@ public class TestByteBufferArray { @Test public void testByteBufferCreation() throws Exception { int capacity = 470 * 1021 * 1023; - ByteBufferAllocator allocator = new ByteBufferAllocator() { - @Override - public ByteBuffer allocate(long size) throws IOException { - return ByteBuffer.allocateDirect((int) size); - } - }; - ByteBufferArray array = new ByteBufferArray(capacity, allocator); - assertEquals(119, array.buffers.length); + ByteBufferArray array = new ByteBufferArray(capacity, ALLOC); + assertEquals(118, array.buffers.length); for (int i = 0; i < array.buffers.length; i++) { - if (i == array.buffers.length - 1) { - assertEquals(0, array.buffers[i].capacity()); - } else { - assertEquals(ByteBufferArray.DEFAULT_BUFFER_SIZE, array.buffers[i].capacity()); - } + assertEquals(ByteBufferArray.DEFAULT_BUFFER_SIZE, array.buffers[i].capacity()); } } @Test public void testByteBufferCreation1() throws Exception { - ByteBufferAllocator allocator = new ByteBufferAllocator() { - @Override - public ByteBuffer allocate(long size) throws IOException { - return ByteBuffer.allocateDirect((int) size); - } - }; - ByteBufferArray array = new DummyByteBufferArray(7 * 1024 * 1024, allocator); - // overwrite - array.bufferCount = 25; - array.buffers = new ByteBuffer[array.bufferCount + 1]; - array.createBuffers(allocator); + long cap = 7 * 1024L * 1024L; + int bufferSize = ByteBufferArray.getBufferSize(cap), bufferCount = 25; + ByteBufferArray array = new ByteBufferArray(bufferSize, bufferCount, 16, cap, ALLOC); for (int i = 0; i < array.buffers.length; i++) { - if (i == array.buffers.length - 1) { - assertEquals(0, array.buffers[i].capacity()); - } else { - assertEquals(458752, array.buffers[i].capacity()); - } + assertEquals(458752, array.buffers[i].capacity()); } } - private static class DummyByteBufferArray extends ByteBufferArray { + private static void fill(ByteBuff buf, byte val) { + for (int i = buf.position(); i < buf.limit(); i++) { + buf.put(i, val); + } + } - public DummyByteBufferArray(long capacity, ByteBufferAllocator allocator) throws IOException { - super(capacity, allocator); + private ByteBuff createByteBuff(int len) { + assert len >= 0; + int pos = len == 0 ? 0 : RANDOM.nextInt(len); + ByteBuff b = ByteBuff.wrap(ByteBuffer.allocate(2 * len)); + b.position(pos).limit(pos + len); + return b; + } + + private interface Call { + void run() throws IOException; + } + + private void expectedAssert(Call r) throws IOException { + try { + r.run(); + fail(); + } catch (AssertionError e) { + // Ignore + } + } + + + @Test + public void testArrayIO() throws IOException { + int cap = 9 * 1024 * 1024, bufferSize = ByteBufferArray.getBufferSize(cap); + ByteBufferArray array = new ByteBufferArray(cap, ALLOC); + testReadAndWrite(array, 0, 512, (byte) 2); + testReadAndWrite(array, cap - 512, 512, (byte) 3); + testReadAndWrite(array, 4 * 1024 * 1024, 5 * 1024 * 1024, (byte) 4); + testReadAndWrite(array, 256, 256, (byte) 5); + testReadAndWrite(array, 257, 513, (byte) 6); + testReadAndWrite(array, 0, cap, (byte) 7); + testReadAndWrite(array, cap, 0, (byte) 8); + testReadAndWrite(array, cap - 1, 1, (byte) 9); + testReadAndWrite(array, cap - 2, 2, (byte) 10); + + expectedAssert(() -> testReadAndWrite(array, cap - 2, 3, (byte) 11)); + expectedAssert(() -> testReadAndWrite(array, cap + 1, 0, (byte) 12)); + expectedAssert(() -> testReadAndWrite(array, 0, cap + 1, (byte) 12)); + expectedAssert(() -> testReadAndWrite(array, -1, 0, (byte) 13)); + expectedAssert(() -> testReadAndWrite(array, 0, -23, (byte) 14)); + expectedAssert(() -> testReadAndWrite(array, 0, 0, (byte) 15)); + expectedAssert(() -> testReadAndWrite(array, 4096, cap - 4096 + 1, (byte) 16)); + + testAsSubByteBuff(array, 0, cap, true); + testAsSubByteBuff(array, 0, 0, false); + testAsSubByteBuff(array, 0, 1, false); + testAsSubByteBuff(array, 0, bufferSize - 1, false); + testAsSubByteBuff(array, 0, bufferSize, false); + testAsSubByteBuff(array, 0, bufferSize + 1, true); + testAsSubByteBuff(array, 0, 2 * bufferSize, true); + testAsSubByteBuff(array, 0, 5 * bufferSize, true); + testAsSubByteBuff(array, cap - bufferSize - 1, bufferSize, true); + testAsSubByteBuff(array, cap - bufferSize, bufferSize, false); + testAsSubByteBuff(array, cap - bufferSize, 0, false); + testAsSubByteBuff(array, cap - bufferSize, 1, false); + testAsSubByteBuff(array, cap - bufferSize, bufferSize - 1, false); + testAsSubByteBuff(array, cap - 2 * bufferSize, 2 * bufferSize, true); + testAsSubByteBuff(array, cap - 2 * bufferSize, bufferSize + 1, true); + testAsSubByteBuff(array, cap - 2 * bufferSize, bufferSize - 1, false); + testAsSubByteBuff(array, cap - 2 * bufferSize, 0, false); + + expectedAssert(() -> testAsSubByteBuff(array, 0, cap + 1, false)); + expectedAssert(() -> testAsSubByteBuff(array, 0, -1, false)); + expectedAssert(() -> testAsSubByteBuff(array, -1, -1, false)); + expectedAssert(() -> testAsSubByteBuff(array, cap - bufferSize, bufferSize + 1, false)); + expectedAssert(() -> testAsSubByteBuff(array, 2 * bufferSize, cap - 2 * bufferSize + 1, false)); + } + + private void testReadAndWrite(ByteBufferArray array, int off, int dataSize, byte val) { + ByteBuff src = createByteBuff(dataSize); + int pos = src.position(), lim = src.limit(); + fill(src, val); + assertEquals(src.remaining(), dataSize); + try { + assertEquals(dataSize, array.write(off, src)); + assertEquals(0, src.remaining()); + } finally { + src.position(pos).limit(lim); } - @Override - int getThreadCount() { - return 16; + ByteBuff dst = createByteBuff(dataSize); + pos = dst.position(); + lim = dst.limit(); + try { + assertEquals(dataSize, array.read(off, dst)); + assertEquals(0, dst.remaining()); + } finally { + dst.position(pos).limit(lim); + } + assertByteBuffEquals(src, dst); + } + + private void testAsSubByteBuff(ByteBufferArray array, int off, int len, boolean isMulti) { + ByteBuff ret = array.asSubByteBuff(off, len); + if (isMulti) { + assertTrue(ret instanceof MultiByteBuff); + } else { + assertTrue(ret instanceof SingleByteBuff); + } + assertTrue(!ret.hasArray()); + assertEquals(len, ret.remaining()); + + ByteBuff tmp = createByteBuff(len); + int pos = tmp.position(), lim = tmp.limit(); + try { + assertEquals(len, array.read(off, tmp)); + assertEquals(0, tmp.remaining()); + } finally { + tmp.position(pos).limit(lim); + } + + assertByteBuffEquals(ret, tmp); + } + + private void assertByteBuffEquals(ByteBuff a, ByteBuff b) { + assertEquals(a.remaining(), b.remaining()); + for (int i = a.position(), j = b.position(); i < a.limit(); i++, j++) { + assertEquals(a.get(i), b.get(j)); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java index 3b832fe3974..fa8b1848cab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java @@ -30,39 +30,37 @@ import org.apache.hadoop.hbase.util.ByteBufferAllocator; import org.apache.hadoop.hbase.util.ByteBufferArray; /** - * IO engine that stores data in memory using an array of ByteBuffers - * {@link ByteBufferArray}. - * - *

How it Works

- * First, see {@link ByteBufferArray} and how it gives a view across multiple ByteBuffers managed - * by it internally. This class does the physical BB create and the write and read to the - * underlying BBs. So we will create N BBs based on the total BC capacity specified on create - * of the ByteBufferArray. So say we have 10 GB of off heap BucketCache, we will create 2560 such - * BBs inside our ByteBufferArray. - * - *

Now the way BucketCache works is that the entire 10 GB is split into diff sized buckets: by - * default from 5 KB to 513 KB. Within each bucket of a particular size, there are - * usually more than one bucket 'block'. The way it is calculate in bucketcache is that the total - * bucketcache size is divided by 4 (hard-coded currently) * max size option. So using defaults, - * buckets will be is 4 * 513kb (the biggest default value) = 2052kb. A bucket of 2052kb at offset - * zero will serve out bucket 'blocks' of 5kb, the next bucket will do the next size up and so on - * up to the maximum (default) of 513kb). - * - *

When we write blocks to the bucketcache, we will see which bucket size group it best fits. - * So a 4 KB block size goes to the 5 KB size group. Each of the block writes, writes within its - * appropriate bucket. Though the bucket is '4kb' in size, it will occupy one of the - * 5 KB bucket 'blocks' (even if actual size of the bucket is less). Bucket 'blocks' will not span - * buckets. - * - *

But you can see the physical memory under the bucket 'blocks' can be split across the - * underlying backing BBs from ByteBufferArray. All is split into 4 MB sized BBs. - * - *

Each Bucket knows its offset in the entire space of BC and when block is written the offset + * IO engine that stores data in memory using an array of ByteBuffers {@link ByteBufferArray}. + *

+ *

How it Works

First, see {@link ByteBufferArray} and how it gives a view across multiple + * ByteBuffers managed by it internally. This class does the physical BB create and the write and + * read to the underlying BBs. So we will create N BBs based on the total BC capacity specified on + * create of the ByteBufferArray. So say we have 10 GB of off heap BucketCache, we will create 2560 + * such BBs inside our ByteBufferArray.
+ *

+ * Now the way BucketCache works is that the entire 10 GB is split into diff sized buckets: by + * default from 5 KB to 513 KB. Within each bucket of a particular size, there are usually more than + * one bucket 'block'. The way it is calculate in bucketcache is that the total bucketcache size is + * divided by 4 (hard-coded currently) * max size option. So using defaults, buckets will be is 4 * + * 513kb (the biggest default value) = 2052kb. A bucket of 2052kb at offset zero will serve out + * bucket 'blocks' of 5kb, the next bucket will do the next size up and so on up to the maximum + * (default) of 513kb).
+ *

+ * When we write blocks to the bucketcache, we will see which bucket size group it best fits. So a 4 + * KB block size goes to the 5 KB size group. Each of the block writes, writes within its + * appropriate bucket. Though the bucket is '4kb' in size, it will occupy one of the 5 KB bucket + * 'blocks' (even if actual size of the bucket is less). Bucket 'blocks' will not span buckets.
+ *

+ * But you can see the physical memory under the bucket 'blocks' can be split across the underlying + * backing BBs from ByteBufferArray. All is split into 4 MB sized BBs.
+ *

+ * Each Bucket knows its offset in the entire space of BC and when block is written the offset * arrives at ByteBufferArray and it figures which BB to write to. It may so happen that the entire * block to be written does not fit a particular backing ByteBufferArray so the remainder goes to - * another BB. See {@link ByteBufferArray#putMultiple(long, int, byte[])}. - -So said all these, when we read a block it may be possible that the bytes of that blocks is physically placed in 2 adjucent BBs. In such case also, we avoid any copy need by having the MBB... + * another BB. See {@link ByteBufferArray#write(long, ByteBuff)}.
+ * So said all these, when we read a block it may be possible that the bytes of that blocks is + * physically placed in 2 adjucent BBs. In such case also, we avoid any copy need by having the + * MBB... */ @InterfaceAudience.Private public class ByteBufferIOEngine implements IOEngine { @@ -74,15 +72,9 @@ public class ByteBufferIOEngine implements IOEngine { * @param capacity * @throws IOException ideally here no exception to be thrown from the allocator */ - public ByteBufferIOEngine(long capacity) - throws IOException { + public ByteBufferIOEngine(long capacity) throws IOException { this.capacity = capacity; - ByteBufferAllocator allocator = new ByteBufferAllocator() { - @Override - public ByteBuffer allocate(long size) throws IOException { - return ByteBuffer.allocateDirect((int) size); - } - }; + ByteBufferAllocator allocator = (size) -> ByteBuffer.allocateDirect((int) size); bufferArray = new ByteBufferArray(capacity, allocator); } @@ -121,27 +113,29 @@ public class ByteBufferIOEngine implements IOEngine { } /** - * Transfers data from the given byte buffer to the buffer array - * @param srcBuffer the given byte buffer from which bytes are to be read - * @param offset The offset in the ByteBufferArray of the first byte to be - * written + * Transfers data from the given {@link ByteBuffer} to the buffer array. Position of source will + * be advanced by the {@link ByteBuffer#remaining()}. + * @param src the given byte buffer from which bytes are to be read. + * @param offset The offset in the ByteBufferArray of the first byte to be written * @throws IOException throws IOException if writing to the array throws exception */ @Override - public void write(ByteBuffer srcBuffer, long offset) throws IOException { - assert srcBuffer.hasArray(); - bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(), - srcBuffer.arrayOffset()); + public void write(ByteBuffer src, long offset) throws IOException { + bufferArray.write(offset, ByteBuff.wrap(src)); } + /** + * Transfers data from the given {@link ByteBuff} to the buffer array. Position of source will be + * advanced by the {@link ByteBuffer#remaining()}. + * @param src the given byte buffer from which bytes are to be read. + * @param offset The offset in the ByteBufferArray of the first byte to be written + * @throws IOException throws IOException if writing to the array throws exception + */ @Override - public void write(ByteBuff srcBuffer, long offset) throws IOException { - // When caching block into BucketCache there will be single buffer backing for this HFileBlock. - // This will work for now. But from the DFS itself if we get DBB then this may not hold true. - assert srcBuffer.hasArray(); - bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(), - srcBuffer.arrayOffset()); + public void write(ByteBuff src, long offset) throws IOException { + bufferArray.write(offset, src); } + /** * No operation for the sync in the memory IO engine */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java index 8b024f00d22..b8e29c62757 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java @@ -16,16 +16,15 @@ */ package org.apache.hadoop.hbase.io.hfile.bucket; +import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP; + import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; -import org.apache.hadoop.hbase.nio.SingleByteBuff; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * IO engine that stores data to a file on the local block device using memory mapping @@ -33,7 +32,6 @@ import org.slf4j.LoggerFactory; */ @InterfaceAudience.Private public class ExclusiveMemoryMmapIOEngine extends FileMmapIOEngine { - static final Logger LOG = LoggerFactory.getLogger(ExclusiveMemoryMmapIOEngine.class); public ExclusiveMemoryMmapIOEngine(String filePath, long capacity) throws IOException { super(filePath, capacity); @@ -42,9 +40,8 @@ public class ExclusiveMemoryMmapIOEngine extends FileMmapIOEngine { @Override public Cacheable read(long offset, int length, CacheableDeserializer deserializer) throws IOException { - byte[] dst = new byte[length]; - bufferArray.getMultiple(offset, length, dst); - return deserializer.deserialize(new SingleByteBuff(ByteBuffer.wrap(dst)), true, - MemoryType.EXCLUSIVE); + ByteBuff dst = HEAP.allocate(length); + bufferArray.read(offset, dst); + return deserializer.deserialize(dst.position(0).limit(length), true, MemoryType.EXCLUSIVE); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java index 0710d26a22e..f6e49cf44ee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java @@ -143,6 +143,7 @@ public class FileIOEngine implements IOEngine { + " expected"); } } + dstBuffer.rewind(); return deserializer.deserialize(new SingleByteBuff(dstBuffer), true, MemoryType.EXCLUSIVE); } @@ -210,10 +211,8 @@ public class FileIOEngine implements IOEngine { @Override public void write(ByteBuff srcBuffer, long offset) throws IOException { - // When caching block into BucketCache there will be single buffer backing for this HFileBlock. - assert srcBuffer.hasArray(); - write(ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(), - srcBuffer.remaining()), offset); + ByteBuffer dup = srcBuffer.asSubByteBuffer(srcBuffer.remaining()).duplicate(); + write(dup, offset); } private void accessFile(FileAccessor accessor, ByteBuffer buffer, @@ -229,8 +228,7 @@ public class FileIOEngine implements IOEngine { int accessLen = 0; if (endFileNum > accessFileNum) { // short the limit; - buffer.limit((int) (buffer.limit() - remainingAccessDataLen - + sizePerFile - accessOffset)); + buffer.limit((int) (buffer.limit() - remainingAccessDataLen + sizePerFile - accessOffset)); } try { accessLen = accessor.access(fileChannel, buffer, accessOffset); @@ -307,7 +305,7 @@ public class FileIOEngine implements IOEngine { } } - private static interface FileAccessor { + private interface FileAccessor { int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java index 9580efe35f0..bd17fd52a33 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java @@ -112,17 +112,12 @@ public abstract class FileMmapIOEngine implements IOEngine { */ @Override public void write(ByteBuffer srcBuffer, long offset) throws IOException { - assert srcBuffer.hasArray(); - bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(), - srcBuffer.arrayOffset()); + bufferArray.write(offset, ByteBuff.wrap(srcBuffer)); } @Override public void write(ByteBuff srcBuffer, long offset) throws IOException { - // This singleByteBuff can be considered to be array backed - assert srcBuffer.hasArray(); - bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(), - srcBuffer.arrayOffset()); + bufferArray.write(offset, srcBuffer); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java index bb58b4e9d30..a06d86d7639 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.io.hfile.bucket; -import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -28,6 +26,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -56,12 +55,10 @@ public class TestByteBufferIOEngine { if (blockSize == 0) { blockSize = 1; } - byte[] byteArray = new byte[blockSize]; - for (int j = 0; j < byteArray.length; ++j) { - byteArray[j] = val; - } - ByteBuffer srcBuffer = ByteBuffer.wrap(byteArray); - int offset = 0; + + ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0); + int pos = src.position(), lim = src.limit(); + int offset; if (testOffsetAtStartNum > 0) { testOffsetAtStartNum--; offset = 0; @@ -71,13 +68,16 @@ public class TestByteBufferIOEngine { } else { offset = (int) (Math.random() * (capacity - maxBlockSize)); } - ioEngine.write(srcBuffer, offset); + ioEngine.write(src, offset); + src.position(pos).limit(lim); + BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer(); ioEngine.read(offset, blockSize, deserializer); - ByteBuff dstBuffer = deserializer.buf; - for (int j = 0; j < byteArray.length; ++j) { - assertTrue(byteArray[j] == dstBuffer.get(j)); - } + ByteBuff dst = deserializer.buf; + Assert.assertEquals(src.remaining(), blockSize); + Assert.assertEquals(dst.remaining(), blockSize); + Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst, + dst.position(), dst.remaining())); } assert testOffsetAtStartNum == 0; assert testOffsetAtEndNum == 0; @@ -112,6 +112,16 @@ public class TestByteBufferIOEngine { } } + static ByteBuff createByteBuffer(int len, int val, boolean useHeap) { + ByteBuffer b = useHeap ? ByteBuffer.allocate(2 * len) : ByteBuffer.allocateDirect(2 * len); + int pos = (int) (Math.random() * len); + b.position(pos).limit(pos + len); + for (int i = pos; i < pos + len; i++) { + b.put(i, (byte) val); + } + return ByteBuff.wrap(b); + } + @Test public void testByteBufferIOEngineWithMBB() throws Exception { int capacity = 32 * 1024 * 1024; // 32 MB @@ -126,12 +136,9 @@ public class TestByteBufferIOEngine { if (blockSize == 0) { blockSize = 1; } - byte[] byteArray = new byte[blockSize]; - for (int j = 0; j < byteArray.length; ++j) { - byteArray[j] = val; - } - ByteBuffer srcBuffer = ByteBuffer.wrap(byteArray); - int offset = 0; + ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0); + int pos = src.position(), lim = src.limit(); + int offset; if (testOffsetAtStartNum > 0) { testOffsetAtStartNum--; offset = 0; @@ -141,13 +148,16 @@ public class TestByteBufferIOEngine { } else { offset = (int) (Math.random() * (capacity - maxBlockSize)); } - ioEngine.write(srcBuffer, offset); + ioEngine.write(src, offset); + src.position(pos).limit(lim); + BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer(); ioEngine.read(offset, blockSize, deserializer); - ByteBuff dstBuffer = deserializer.buf; - for (int j = 0; j < byteArray.length; ++j) { - assertTrue(srcBuffer.get(j) == dstBuffer.get(j)); - } + ByteBuff dst = deserializer.buf; + Assert.assertEquals(src.remaining(), blockSize); + Assert.assertEquals(dst.remaining(), blockSize); + Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst, + dst.position(), dst.remaining())); } assert testOffsetAtStartNum == 0; assert testOffsetAtEndNum == 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestExclusiveMemoryMmapEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestExclusiveMemoryMmapEngine.java index d0d8c8a7dda..79d58f0a994 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestExclusiveMemoryMmapEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestExclusiveMemoryMmapEngine.java @@ -17,16 +17,14 @@ */ package org.apache.hadoop.hbase.io.hfile.bucket; -import static org.junit.Assert.assertTrue; - import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.BufferGrabbingDeserializer; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -34,7 +32,7 @@ import org.junit.experimental.categories.Category; /** * Basic test for {@link ExclusiveMemoryMmapIOEngine} */ -@Category({IOTests.class, SmallTests.class}) +@Category({ IOTests.class, SmallTests.class }) public class TestExclusiveMemoryMmapEngine { @ClassRule @@ -50,17 +48,23 @@ public class TestExclusiveMemoryMmapEngine { for (int i = 0; i < 50; i++) { int len = (int) Math.floor(Math.random() * 100); long offset = (long) Math.floor(Math.random() * size % (size - len)); - byte[] data1 = new byte[len]; - for (int j = 0; j < data1.length; ++j) { - data1[j] = (byte) (Math.random() * 255); - } - fileMmapEngine.write(ByteBuffer.wrap(data1), offset); + int val = (int) (Math.random() * 255); + + // write + ByteBuff src = TestByteBufferIOEngine.createByteBuffer(len, val, i % 2 == 0); + int pos = src.position(), lim = src.limit(); + fileMmapEngine.write(src, offset); + src.position(pos).limit(lim); + + // read BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer(); fileMmapEngine.read(offset, len, deserializer); - ByteBuff data2 = deserializer.getDeserializedByteBuff(); - for (int j = 0; j < data1.length; ++j) { - assertTrue(data1[j] == data2.get(j)); - } + ByteBuff dst = deserializer.getDeserializedByteBuff(); + + Assert.assertEquals(src.remaining(), len); + Assert.assertEquals(dst.remaining(), len); + Assert.assertEquals(0, + ByteBuff.compareTo(src, pos, len, dst, dst.position(), dst.remaining())); } } finally { File file = new File(filePath); @@ -68,6 +72,5 @@ public class TestExclusiveMemoryMmapEngine { file.delete(); } } - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java index efb8145d060..6b0d603abcb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -132,15 +133,22 @@ public class TestFileIOEngine { fileIOEngine.closeFileChannels(); int len = 5; long offset = 0L; - byte[] data1 = new byte[len]; - for (int j = 0; j < data1.length; ++j) { - data1[j] = (byte) (Math.random() * 255); + int val = (int) (Math.random() * 255); + for (int i = 0; i < 2; i++) { + ByteBuff src = TestByteBufferIOEngine.createByteBuffer(len, val, i % 2 == 0); + int pos = src.position(), lim = src.limit(); + fileIOEngine.write(src, offset); + src.position(pos).limit(lim); + + BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer(); + fileIOEngine.read(offset, len, deserializer); + ByteBuff dst = deserializer.getDeserializedByteBuff(); + + Assert.assertEquals(src.remaining(), len); + Assert.assertEquals(dst.remaining(), len); + Assert.assertEquals(0, + ByteBuff.compareTo(src, pos, len, dst, dst.position(), dst.remaining())); } - fileIOEngine.write(ByteBuffer.wrap(data1), offset); - BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer(); - fileIOEngine.read(offset, len, deserializer); - ByteBuff data2 = deserializer.getDeserializedByteBuff(); - assertArrayEquals(data1, data2.array()); } @Test