HBASE-22159 ByteBufferIOEngine should support write off-heap ByteBuff to the bufferArray

This commit is contained in:
huzheng 2019-04-03 22:29:31 +08:00
parent d1eb6171f9
commit 773c0d6635
11 changed files with 490 additions and 412 deletions

View File

@@ -29,7 +29,6 @@ import sun.nio.ch.DirectBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -175,7 +174,7 @@ public class ByteBuffAllocator {
return allocateOnHeap(this.bufSize);
}
private SingleByteBuff allocateOnHeap(int size) {
private static SingleByteBuff allocateOnHeap(int size) {
return new SingleByteBuff(NONE, ByteBuffer.allocate(size));
}
@@ -213,7 +212,7 @@
// just allocate the ByteBuffer from on-heap.
bbs.add(ByteBuffer.allocate(remain));
}
ByteBuff bb = wrap(bbs, () -> {
ByteBuff bb = ByteBuff.wrap(bbs, () -> {
for (int i = 0; i < lenFromReservoir; i++) {
this.putbackBuffer(bbs.get(i));
}
@@ -238,30 +237,6 @@
}
}
public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
if (buffers == null || buffers.length == 0) {
throw new IllegalArgumentException("buffers shouldn't be null or empty");
}
return buffers.length == 1 ? new SingleByteBuff(recycler, buffers[0])
: new MultiByteBuff(recycler, buffers);
}
public static ByteBuff wrap(ByteBuffer[] buffers) {
return wrap(buffers, NONE);
}
public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) {
if (buffers == null || buffers.size() == 0) {
throw new IllegalArgumentException("buffers shouldn't be null or empty");
}
return buffers.size() == 1 ? new SingleByteBuff(recycler, buffers.get(0))
: new MultiByteBuff(recycler, buffers.toArray(new ByteBuffer[0]));
}
public static ByteBuff wrap(List<ByteBuffer> buffers) {
return wrap(buffers, NONE);
}
/**
* @return One free DirectByteBuffer from the pool. If no free ByteBuffer and we have not reached
* the maximum pool size, it will create a new one and return. In case of max pool size

View File

@@ -20,7 +20,10 @@ package org.apache.hadoop.hbase.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.List;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ObjectIntPair;
@@ -557,4 +560,34 @@ public abstract class ByteBuff implements ReferenceCounted {
return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() +
", cap= " + capacity() + "]";
}
/********************************* ByteBuff wrapper methods ***********************************/
public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
if (buffers == null || buffers.length == 0) {
throw new IllegalArgumentException("buffers shouldn't be null or empty");
}
return buffers.length == 1 ? new SingleByteBuff(recycler, buffers[0])
: new MultiByteBuff(recycler, buffers);
}
public static ByteBuff wrap(ByteBuffer[] buffers) {
return wrap(buffers, ByteBuffAllocator.NONE);
}
public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) {
if (buffers == null || buffers.size() == 0) {
throw new IllegalArgumentException("buffers shouldn't be null or empty");
}
return buffers.size() == 1 ? new SingleByteBuff(recycler, buffers.get(0))
: new MultiByteBuff(recycler, buffers.toArray(new ByteBuffer[0]));
}
public static ByteBuff wrap(List<ByteBuffer> buffers) {
return wrap(buffers, ByteBuffAllocator.NONE);
}
public static ByteBuff wrap(ByteBuffer buffer) {
return new SingleByteBuff(ByteBuffAllocator.NONE, buffer);
}
}
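For orientation, the wrap() helpers moved into ByteBuff choose the implementation from the buffer count: a single ByteBuffer yields a SingleByteBuff, several yield a MultiByteBuff. A minimal usage sketch, not part of this commit, assuming only the hbase-common classes shown above:

import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;

public class WrapSketch {
  public static void main(String[] args) {
    ByteBuff single = ByteBuff.wrap(ByteBuffer.allocate(1024));
    ByteBuff multi = ByteBuff.wrap(
        new ByteBuffer[] { ByteBuffer.allocate(1024), ByteBuffer.allocateDirect(1024) });
    assert single instanceof SingleByteBuff; // one buffer -> SingleByteBuff
    assert multi instanceof MultiByteBuff;   // several buffers -> MultiByteBuff
  }
}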

View File

@@ -20,15 +20,14 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -38,279 +37,248 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* This class manages an array of ByteBuffers with a default size 4MB. These
* buffers are sequential and could be considered as a large buffer. It supports
* reading/writing data from this large buffer with a position and offset
* This class manages an array of ByteBuffers with a default size 4MB. These buffers are sequential
* and could be considered as a large buffer. It supports reading/writing data from this large
* buffer with a position and offset.
*/
@InterfaceAudience.Private
public class ByteBufferArray {
private static final Logger LOG = LoggerFactory.getLogger(ByteBufferArray.class);
public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
@VisibleForTesting
ByteBuffer buffers[];
private int bufferSize;
@VisibleForTesting
int bufferCount;
private final int bufferSize;
private final int bufferCount;
final ByteBuffer[] buffers;
/**
* We allocate a number of byte buffers as the capacity. In order not to go out
* of the array bounds for the last byte (see {@link ByteBufferArray#multiple}),
* we will allocate one additional buffer with capacity 0;
* We allocate a number of byte buffers as the capacity.
* @param capacity total size of the byte buffer array
* @param allocator the ByteBufferAllocator that will create the buffers
* @throws IOException throws IOException if there is an exception thrown by the allocator
*/
public ByteBufferArray(long capacity, ByteBufferAllocator allocator)
throws IOException {
this.bufferSize = DEFAULT_BUFFER_SIZE;
if (this.bufferSize > (capacity / 16))
this.bufferSize = (int) roundUp(capacity / 16, 32768);
this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
+ ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
+ bufferCount);
buffers = new ByteBuffer[bufferCount + 1];
createBuffers(allocator);
public ByteBufferArray(long capacity, ByteBufferAllocator allocator) throws IOException {
this(getBufferSize(capacity), getBufferCount(capacity),
Runtime.getRuntime().availableProcessors(), capacity, allocator);
}
@VisibleForTesting
void createBuffers(ByteBufferAllocator allocator)
throws IOException {
int threadCount = getThreadCount();
ExecutorService service = new ThreadPoolExecutor(threadCount, threadCount, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
int perThreadCount = (int)Math.floor((double) (bufferCount) / threadCount);
int lastThreadCount = bufferCount - (perThreadCount * (threadCount - 1));
Future<ByteBuffer[]>[] futures = new Future[threadCount];
ByteBufferArray(int bufferSize, int bufferCount, int threadCount, long capacity,
ByteBufferAllocator alloc) throws IOException {
this.bufferSize = bufferSize;
this.bufferCount = bufferCount;
LOG.info("Allocating buffers total={}, sizePerBuffer={}, count={}",
StringUtils.byteDesc(capacity), StringUtils.byteDesc(bufferSize), bufferCount);
this.buffers = new ByteBuffer[bufferCount];
createBuffers(threadCount, alloc);
}
private void createBuffers(int threadCount, ByteBufferAllocator alloc) throws IOException {
ExecutorService pool = Executors.newFixedThreadPool(threadCount);
int perThreadCount = bufferCount / threadCount;
int remainder = bufferCount % threadCount;
try {
List<Future<ByteBuffer[]>> futures = new ArrayList<>(threadCount);
// Dispatch the creation task to each thread.
for (int i = 0; i < threadCount; i++) {
// Last thread will have to deal with a different number of buffers
int buffersToCreate = (i == threadCount - 1) ? lastThreadCount : perThreadCount;
futures[i] = service.submit(
new BufferCreatorCallable(bufferSize, buffersToCreate, allocator));
}
int bufferIndex = 0;
for (Future<ByteBuffer[]> future : futures) {
try {
ByteBuffer[] buffers = future.get();
for (ByteBuffer buffer : buffers) {
this.buffers[bufferIndex++] = buffer;
final int chunkSize = perThreadCount + ((i == threadCount - 1) ? remainder : 0);
futures.add(pool.submit(() -> {
ByteBuffer[] chunk = new ByteBuffer[chunkSize];
for (int k = 0; k < chunkSize; k++) {
chunk[k] = alloc.allocate(bufferSize);
}
return chunk;
}));
}
// Append the buffers created by each thread.
int bufferIndex = 0;
try {
for (Future<ByteBuffer[]> f : futures) {
for (ByteBuffer b : f.get()) {
this.buffers[bufferIndex++] = b;
}
} catch (InterruptedException | ExecutionException e) {
LOG.error("Buffer creation interrupted", e);
throw new IOException(e);
}
assert bufferIndex == bufferCount;
} catch (Exception e) {
LOG.error("Buffer creation interrupted", e);
throw new IOException(e);
}
} finally {
service.shutdownNow();
pool.shutdownNow();
}
// always create an on-heap empty dummy buffer at the end
this.buffers[bufferCount] = ByteBuffer.allocate(0);
}
@VisibleForTesting
int getThreadCount() {
return Runtime.getRuntime().availableProcessors();
static int getBufferSize(long capacity) {
int bufferSize = DEFAULT_BUFFER_SIZE;
if (bufferSize > (capacity / 16)) {
bufferSize = (int) roundUp(capacity / 16, 32768);
}
return bufferSize;
}
/**
* A callable that creates buffers of the specified length either onheap/offheap using the
* {@link ByteBufferAllocator}
*/
private static class BufferCreatorCallable implements Callable<ByteBuffer[]> {
private final int bufferCapacity;
private final int bufferCount;
private final ByteBufferAllocator allocator;
BufferCreatorCallable(int bufferCapacity, int bufferCount, ByteBufferAllocator allocator) {
this.bufferCapacity = bufferCapacity;
this.bufferCount = bufferCount;
this.allocator = allocator;
}
@Override
public ByteBuffer[] call() throws Exception {
ByteBuffer[] buffers = new ByteBuffer[this.bufferCount];
for (int i = 0; i < this.bufferCount; i++) {
buffers[i] = allocator.allocate(this.bufferCapacity);
}
return buffers;
}
private static int getBufferCount(long capacity) {
int bufferSize = getBufferSize(capacity);
return (int) (roundUp(capacity, bufferSize) / bufferSize);
}
private long roundUp(long n, long to) {
private static long roundUp(long n, long to) {
return ((n + to - 1) / to) * to;
}
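To make the sizing math concrete, here is a standalone sketch mirroring getBufferSize()/getBufferCount()/roundUp() above (plain Java, no HBase dependency; the 7 MB capacity is illustrative). 7 MB / 16 = 458752, already a multiple of 32768, so each buffer is 458752 bytes and 16 of them cover the capacity, the same 458752 bytes the test below asserts:

public class SizingSketch {
  static long roundUp(long n, long to) {
    return ((n + to - 1) / to) * to;
  }
  public static void main(String[] args) {
    long capacity = 7 * 1024L * 1024L; // 7 MB
    int bufferSize = 4 * 1024 * 1024;  // DEFAULT_BUFFER_SIZE
    if (bufferSize > capacity / 16) {
      bufferSize = (int) roundUp(capacity / 16, 32768);
    }
    int bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
    System.out.println(bufferSize + " bytes x " + bufferCount); // 458752 bytes x 16
  }
}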
/**
* Transfers bytes from this buffer array into the given destination array
* @param start start position in the ByteBufferArray
* @param len The maximum number of bytes to be written to the given array
* @param dstArray The array into which bytes are to be written
* Transfers bytes from this buffer array into the given destination {@link ByteBuff}
* @param offset start position in this big logical array.
* @param dst the destination ByteBuff. Notice that its position will be advanced.
* @return number of bytes read
*/
public int getMultiple(long start, int len, byte[] dstArray) {
return getMultiple(start, len, dstArray, 0);
public int read(long offset, ByteBuff dst) {
return internalTransfer(offset, dst, READER);
}
/**
* Transfers bytes from this buffer array into the given destination array
* @param start start offset of this buffer array
* @param len The maximum number of bytes to be written to the given array
* @param dstArray The array into which bytes are to be written
* @param dstOffset The offset within the given array of the first byte to be
* written
* @return number of bytes read
* Transfers bytes from the given source {@link ByteBuff} into this buffer array
* @param offset start offset of this big logical array.
* @param src the source ByteBuff. Notice that its position will be advanced.
* @return number of bytes written
*/
public int getMultiple(long start, int len, byte[] dstArray, int dstOffset) {
multiple(start, len, dstArray, dstOffset, GET_MULTIPLE_VISITOR);
return len;
public int write(long offset, ByteBuff src) {
return internalTransfer(offset, src, WRITER);
}
private final static Visitor GET_MULTIPLE_VISITOR = new Visitor() {
@Override
public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) {
ByteBufferUtils.copyFromBufferToArray(array, bb, pos, arrayIdx, len);
}
/**
* Transfer bytes from source {@link ByteBuff} to destination {@link ByteBuffer}. Position of both
* source and destination will be advanced.
*/
private static final BiConsumer<ByteBuffer, ByteBuff> WRITER = (dst, src) -> {
int off = src.position(), len = dst.remaining();
src.get(dst, off, len);
src.position(off + len);
};
/**
* Transfers bytes from the given source array into this buffer array
* @param start start offset of this buffer array
* @param len The maximum number of bytes to be read from the given array
* @param srcArray The array from which bytes are to be read
* Transfer bytes from source {@link ByteBuffer} to destination {@link ByteBuff}, Position of both
* source and destination will be advanced.
*/
public void putMultiple(long start, int len, byte[] srcArray) {
putMultiple(start, len, srcArray, 0);
}
/**
* Transfers bytes from the given source array into this buffer array
* @param start start offset of this buffer array
* @param len The maximum number of bytes to be read from the given array
* @param srcArray The array from which bytes are to be read
* @param srcOffset The offset within the given array of the first byte to be
* read
*/
public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) {
multiple(start, len, srcArray, srcOffset, PUT_MULTIPLE_VISITOR);
}
private final static Visitor PUT_MULTIPLE_VISITOR = new Visitor() {
@Override
public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) {
ByteBufferUtils.copyFromArrayToBuffer(bb, pos, array, arrayIdx, len);
}
private static final BiConsumer<ByteBuffer, ByteBuff> READER = (src, dst) -> {
int off = dst.position(), len = src.remaining(), srcOff = src.position();
dst.put(off, ByteBuff.wrap(src), srcOff, len);
src.position(srcOff + len);
dst.position(off + len);
};
private interface Visitor {
/**
* Visit the given byte buffer, if it is a read action, we will transfer the
* bytes from the buffer to the destination array, else if it is a write
* action, we will transfer the bytes from the source array to the buffer
* @param bb byte buffer
* @param pos Start position in ByteBuffer
* @param array a source or destination byte array
* @param arrayOffset offset of the byte array
* @param len read/write length
*/
void visit(ByteBuffer bb, int pos, byte[] array, int arrayOffset, int len);
}
/**
* Access(read or write) this buffer array with a position and length as the
* given array. Here we will only lock one buffer even if it may be need visit
* several buffers. The consistency is guaranteed by the caller.
* @param start start offset of this buffer array
* @param len The maximum number of bytes to be accessed
* @param array The array from/to which bytes are to be read/written
* @param arrayOffset The offset within the given array of the first byte to
* be read or written
* @param visitor implement of how to visit the byte buffer
* Transfer all remaining bytes from b to the buffer array starting at offset, or transfer
* bytes from the buffer array at offset to b until b is filled. Notice that the position of
* ByteBuff b will be advanced.
* @param offset where we start in the big logical array.
* @param b the ByteBuff to transfer from or to
* @param transfer the transfer interface.
* @return the length of bytes we transferred.
*/
void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) {
assert len >= 0;
long end = start + len;
int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize);
int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize);
assert array.length >= len + arrayOffset;
assert startBuffer >= 0 && startBuffer < bufferCount;
assert (endBuffer >= 0 && endBuffer < bufferCount)
|| (endBuffer == bufferCount && endOffset == 0);
if (startBuffer >= buffers.length || startBuffer < 0) {
String msg = "Failed multiple, start=" + start + ",startBuffer="
+ startBuffer + ",bufferSize=" + bufferSize;
LOG.error(msg);
throw new RuntimeException(msg);
private int internalTransfer(long offset, ByteBuff b, BiConsumer<ByteBuffer, ByteBuff> transfer) {
int expectedTransferLen = b.remaining();
if (expectedTransferLen == 0) {
return 0;
}
int srcIndex = 0, cnt = -1;
for (int i = startBuffer; i <= endBuffer; ++i) {
ByteBuffer bb = buffers[i].duplicate();
int pos = 0;
if (i == startBuffer) {
cnt = bufferSize - startOffset;
if (cnt > len) cnt = len;
pos = startOffset;
} else if (i == endBuffer) {
cnt = endOffset;
} else {
cnt = bufferSize;
}
visitor.visit(bb, pos, array, srcIndex + arrayOffset, cnt);
srcIndex += cnt;
BufferIterator it = new BufferIterator(offset, expectedTransferLen);
while (it.hasNext()) {
ByteBuffer a = it.next();
transfer.accept(a, b);
assert !a.hasRemaining();
}
assert srcIndex == len;
assert expectedTransferLen == it.getSum() : "Expected transfer length (=" + expectedTransferLen
+ ") don't match the actual transfer length(=" + it.getSum() + ")";
return expectedTransferLen;
}
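A hedged usage sketch of the resulting API (the on-heap allocator lambda and the 1 MB capacity are illustrative, not from the commit). Note how both calls advance the passed ByteBuff's position, as the javadocs above state:

import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferArray;

public class TransferSketch {
  public static void main(String[] args) throws Exception {
    ByteBufferArray array =
        new ByteBufferArray(1024 * 1024, size -> ByteBuffer.allocate((int) size));
    ByteBuff src = ByteBuff.wrap(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }));
    array.write(0, src);  // copies src.remaining() bytes; src is fully consumed afterwards
    ByteBuff dst = ByteBuff.wrap(ByteBuffer.allocate(4));
    array.read(0, dst);   // fills dst; its position is advanced too
    dst.position(0);      // rewind before consuming the bytes
  }
}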
/**
* Creates a ByteBuff from a given array of ByteBuffers from the given offset to the
* length specified. E.g., if there are 4 buffers forming an array each with length 10 and
* if we call asSubBuffer(5, 10) then we will create an MBB consisting of two BBs
* and the first one will be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from
* 'position' 0 to 'length' 5.
* @param offset
* @param len
* Creates a ByteBuff from a given array of ByteBuffers from the given offset to the length
* specified. E.g., if there are 4 buffers forming an array each with length 10 and if we call
* asSubBuffer(5, 10) then we will create an MBB consisting of two BBs and the first one will be
* a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from 'position' 0 to 'length' 5.
* @param offset the position in the whole array which is composited by multiple byte buffers.
* @param len the length of bytes
* @return a ByteBuff formed from the underlying ByteBuffers
*/
public ByteBuff asSubByteBuff(long offset, int len) {
assert len >= 0;
long end = offset + len;
int startBuffer = (int) (offset / bufferSize), startBufferOffset = (int) (offset % bufferSize);
int endBuffer = (int) (end / bufferSize), endBufferOffset = (int) (end % bufferSize);
// Last buffer in the array is a dummy one with 0 capacity. Avoid sending back that
if (endBuffer == this.bufferCount) {
endBuffer--;
endBufferOffset = bufferSize;
public ByteBuff asSubByteBuff(long offset, final int len) {
BufferIterator it = new BufferIterator(offset, len);
ByteBuffer[] mbb = new ByteBuffer[it.getBufferCount()];
for (int i = 0; i < mbb.length; i++) {
assert it.hasNext();
mbb[i] = it.next();
}
assert startBuffer >= 0 && startBuffer < bufferCount;
assert (endBuffer >= 0 && endBuffer < bufferCount)
|| (endBuffer == bufferCount && endBufferOffset == 0);
if (startBuffer >= buffers.length || startBuffer < 0) {
String msg = "Failed subArray, start=" + offset + ",startBuffer=" + startBuffer
+ ",bufferSize=" + bufferSize;
LOG.error(msg);
throw new RuntimeException(msg);
assert it.getSum() == len;
return ByteBuff.wrap(mbb);
}
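Continuing the previous sketch's setup, a short illustration of what asSubByteBuff() hands back at buffer boundaries (with a 1 MB capacity the sizing math above yields 16 buffers of 65536 bytes; the offsets are illustrative):

ByteBuff inside = array.asSubByteBuff(0, 100);    // lies within buffer 0: a SingleByteBuff
ByteBuff across = array.asSubByteBuff(65535, 2);  // straddles buffers 0 and 1: a MultiByteBuff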
/**
* Iterator to fetch ByteBuffers from offset with given length in this big logical array.
*/
private class BufferIterator implements Iterator<ByteBuffer> {
private final int len;
private int startBuffer, startOffset, endBuffer, endOffset;
private int curIndex, sum = 0;
private int index(long pos) {
return (int) (pos / bufferSize);
}
int srcIndex = 0, cnt = -1;
ByteBuffer[] mbb = new ByteBuffer[endBuffer - startBuffer + 1];
for (int i = startBuffer, j = 0; i <= endBuffer; ++i, j++) {
ByteBuffer bb = buffers[i].duplicate();
if (i == startBuffer) {
cnt = bufferSize - startBufferOffset;
if (cnt > len) cnt = len;
bb.limit(startBufferOffset + cnt).position(startBufferOffset);
} else if (i == endBuffer) {
cnt = endBufferOffset;
bb.position(0).limit(cnt);
} else {
cnt = bufferSize;
bb.position(0).limit(cnt);
private int offset(long pos) {
return (int) (pos % bufferSize);
}
public BufferIterator(long offset, int len) {
assert len >= 0 && offset >= 0;
this.len = len;
this.startBuffer = index(offset);
this.startOffset = offset(offset);
this.endBuffer = index(offset + len);
this.endOffset = offset(offset + len);
if (startBuffer < endBuffer && endOffset == 0) {
endBuffer--;
endOffset = bufferSize;
}
mbb[j] = bb.slice();
srcIndex += cnt;
assert startBuffer >= 0 && startBuffer < bufferCount;
assert endBuffer >= 0 && endBuffer < bufferCount;
// initialize the index to the first buffer index.
this.curIndex = startBuffer;
}
@Override
public boolean hasNext() {
return this.curIndex <= endBuffer;
}
/**
* The returned ByteBuffer is a sliced one; it won't affect the position or limit of the
* original one.
*/
@Override
public ByteBuffer next() {
ByteBuffer bb = buffers[curIndex].duplicate();
if (curIndex == startBuffer) {
bb.position(startOffset).limit(Math.min(bufferSize, startOffset + len));
} else if (curIndex == endBuffer) {
bb.position(0).limit(endOffset);
} else {
bb.position(0).limit(bufferSize);
}
curIndex++;
sum += bb.remaining();
// Make sure that its position is zero; that's important because MBB counts from zero for all
// nio ByteBuffers.
return bb.slice();
}
int getSum() {
return sum;
}
int getBufferCount() {
return this.endBuffer - this.startBuffer + 1;
}
assert srcIndex == len;
return ByteBuffAllocator.wrap(mbb);
}
}
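The endOffset == 0 adjustment in the BufferIterator constructor is what lets this rewrite drop the old zero-capacity dummy buffer: a range ending exactly on a buffer boundary is pulled back into the previous buffer instead of touching one past the end. A worked example of the index()/offset() math, assuming the default 4 MB buffers:

int bufferSize = 4 * 1024 * 1024;
long offset = bufferSize - 512;  // start 512 bytes before the first boundary
int len = 1024;
int startBuffer = (int) (offset / bufferSize);        // 0
int startOffset = (int) (offset % bufferSize);        // bufferSize - 512
int endBuffer = (int) ((offset + len) / bufferSize);  // 1
int endOffset = (int) ((offset + len) % bufferSize);  // 512, so iterate buffers 0..1
// For offset = 0, len = bufferSize: endBuffer would be 1 with endOffset 0, so the
// constructor rewinds to endBuffer = 0, endOffset = bufferSize: one buffer only.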

View File

@@ -20,34 +20,37 @@ package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({MiscTests.class, SmallTests.class})
@Category({ MiscTests.class, SmallTests.class })
public class TestByteBufferArray {
private static final Random RANDOM = new Random(System.currentTimeMillis());
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestByteBufferArray.class);
private static final ByteBufferAllocator ALLOC = (size) -> ByteBuffer.allocateDirect((int) size);
@Test
public void testAsSubBufferWhenEndOffsetLandInLastBuffer() throws Exception {
int capacity = 4 * 1024 * 1024;
ByteBufferAllocator allocator = new ByteBufferAllocator() {
@Override
public ByteBuffer allocate(long size) throws IOException {
return ByteBuffer.allocateDirect((int) size);
}
};
ByteBufferArray array = new ByteBufferArray(capacity, allocator);
ByteBufferArray array = new ByteBufferArray(capacity, ALLOC);
ByteBuff subBuf = array.asSubByteBuff(0, capacity);
subBuf.position(capacity - 1);// Position to the last byte
assertTrue(subBuf.hasRemaining());
@@ -59,54 +62,148 @@ public class TestByteBufferArray {
@Test
public void testByteBufferCreation() throws Exception {
int capacity = 470 * 1021 * 1023;
ByteBufferAllocator allocator = new ByteBufferAllocator() {
@Override
public ByteBuffer allocate(long size) throws IOException {
return ByteBuffer.allocateDirect((int) size);
}
};
ByteBufferArray array = new ByteBufferArray(capacity, allocator);
assertEquals(119, array.buffers.length);
ByteBufferArray array = new ByteBufferArray(capacity, ALLOC);
assertEquals(118, array.buffers.length);
for (int i = 0; i < array.buffers.length; i++) {
if (i == array.buffers.length - 1) {
assertEquals(0, array.buffers[i].capacity());
} else {
assertEquals(ByteBufferArray.DEFAULT_BUFFER_SIZE, array.buffers[i].capacity());
}
assertEquals(ByteBufferArray.DEFAULT_BUFFER_SIZE, array.buffers[i].capacity());
}
}
@Test
public void testByteBufferCreation1() throws Exception {
ByteBufferAllocator allocator = new ByteBufferAllocator() {
@Override
public ByteBuffer allocate(long size) throws IOException {
return ByteBuffer.allocateDirect((int) size);
}
};
ByteBufferArray array = new DummyByteBufferArray(7 * 1024 * 1024, allocator);
// overwrite
array.bufferCount = 25;
array.buffers = new ByteBuffer[array.bufferCount + 1];
array.createBuffers(allocator);
long cap = 7 * 1024L * 1024L;
int bufferSize = ByteBufferArray.getBufferSize(cap), bufferCount = 25;
ByteBufferArray array = new ByteBufferArray(bufferSize, bufferCount, 16, cap, ALLOC);
for (int i = 0; i < array.buffers.length; i++) {
if (i == array.buffers.length - 1) {
assertEquals(0, array.buffers[i].capacity());
} else {
assertEquals(458752, array.buffers[i].capacity());
}
assertEquals(458752, array.buffers[i].capacity());
}
}
private static class DummyByteBufferArray extends ByteBufferArray {
private static void fill(ByteBuff buf, byte val) {
for (int i = buf.position(); i < buf.limit(); i++) {
buf.put(i, val);
}
}
public DummyByteBufferArray(long capacity, ByteBufferAllocator allocator) throws IOException {
super(capacity, allocator);
private ByteBuff createByteBuff(int len) {
assert len >= 0;
int pos = len == 0 ? 0 : RANDOM.nextInt(len);
ByteBuff b = ByteBuff.wrap(ByteBuffer.allocate(2 * len));
b.position(pos).limit(pos + len);
return b;
}
private interface Call {
void run() throws IOException;
}
private void expectedAssert(Call r) throws IOException {
try {
r.run();
fail();
} catch (AssertionError e) {
// Ignore
}
}
@Test
public void testArrayIO() throws IOException {
int cap = 9 * 1024 * 1024, bufferSize = ByteBufferArray.getBufferSize(cap);
ByteBufferArray array = new ByteBufferArray(cap, ALLOC);
testReadAndWrite(array, 0, 512, (byte) 2);
testReadAndWrite(array, cap - 512, 512, (byte) 3);
testReadAndWrite(array, 4 * 1024 * 1024, 5 * 1024 * 1024, (byte) 4);
testReadAndWrite(array, 256, 256, (byte) 5);
testReadAndWrite(array, 257, 513, (byte) 6);
testReadAndWrite(array, 0, cap, (byte) 7);
testReadAndWrite(array, cap, 0, (byte) 8);
testReadAndWrite(array, cap - 1, 1, (byte) 9);
testReadAndWrite(array, cap - 2, 2, (byte) 10);
expectedAssert(() -> testReadAndWrite(array, cap - 2, 3, (byte) 11));
expectedAssert(() -> testReadAndWrite(array, cap + 1, 0, (byte) 12));
expectedAssert(() -> testReadAndWrite(array, 0, cap + 1, (byte) 12));
expectedAssert(() -> testReadAndWrite(array, -1, 0, (byte) 13));
expectedAssert(() -> testReadAndWrite(array, 0, -23, (byte) 14));
expectedAssert(() -> testReadAndWrite(array, 0, 0, (byte) 15));
expectedAssert(() -> testReadAndWrite(array, 4096, cap - 4096 + 1, (byte) 16));
testAsSubByteBuff(array, 0, cap, true);
testAsSubByteBuff(array, 0, 0, false);
testAsSubByteBuff(array, 0, 1, false);
testAsSubByteBuff(array, 0, bufferSize - 1, false);
testAsSubByteBuff(array, 0, bufferSize, false);
testAsSubByteBuff(array, 0, bufferSize + 1, true);
testAsSubByteBuff(array, 0, 2 * bufferSize, true);
testAsSubByteBuff(array, 0, 5 * bufferSize, true);
testAsSubByteBuff(array, cap - bufferSize - 1, bufferSize, true);
testAsSubByteBuff(array, cap - bufferSize, bufferSize, false);
testAsSubByteBuff(array, cap - bufferSize, 0, false);
testAsSubByteBuff(array, cap - bufferSize, 1, false);
testAsSubByteBuff(array, cap - bufferSize, bufferSize - 1, false);
testAsSubByteBuff(array, cap - 2 * bufferSize, 2 * bufferSize, true);
testAsSubByteBuff(array, cap - 2 * bufferSize, bufferSize + 1, true);
testAsSubByteBuff(array, cap - 2 * bufferSize, bufferSize - 1, false);
testAsSubByteBuff(array, cap - 2 * bufferSize, 0, false);
expectedAssert(() -> testAsSubByteBuff(array, 0, cap + 1, false));
expectedAssert(() -> testAsSubByteBuff(array, 0, -1, false));
expectedAssert(() -> testAsSubByteBuff(array, -1, -1, false));
expectedAssert(() -> testAsSubByteBuff(array, cap - bufferSize, bufferSize + 1, false));
expectedAssert(() -> testAsSubByteBuff(array, 2 * bufferSize, cap - 2 * bufferSize + 1, false));
}
private void testReadAndWrite(ByteBufferArray array, int off, int dataSize, byte val) {
ByteBuff src = createByteBuff(dataSize);
int pos = src.position(), lim = src.limit();
fill(src, val);
assertEquals(src.remaining(), dataSize);
try {
assertEquals(dataSize, array.write(off, src));
assertEquals(0, src.remaining());
} finally {
src.position(pos).limit(lim);
}
@Override
int getThreadCount() {
return 16;
ByteBuff dst = createByteBuff(dataSize);
pos = dst.position();
lim = dst.limit();
try {
assertEquals(dataSize, array.read(off, dst));
assertEquals(0, dst.remaining());
} finally {
dst.position(pos).limit(lim);
}
assertByteBuffEquals(src, dst);
}
private void testAsSubByteBuff(ByteBufferArray array, int off, int len, boolean isMulti) {
ByteBuff ret = array.asSubByteBuff(off, len);
if (isMulti) {
assertTrue(ret instanceof MultiByteBuff);
} else {
assertTrue(ret instanceof SingleByteBuff);
}
assertTrue(!ret.hasArray());
assertEquals(len, ret.remaining());
ByteBuff tmp = createByteBuff(len);
int pos = tmp.position(), lim = tmp.limit();
try {
assertEquals(len, array.read(off, tmp));
assertEquals(0, tmp.remaining());
} finally {
tmp.position(pos).limit(lim);
}
assertByteBuffEquals(ret, tmp);
}
private void assertByteBuffEquals(ByteBuff a, ByteBuff b) {
assertEquals(a.remaining(), b.remaining());
for (int i = a.position(), j = b.position(); i < a.limit(); i++, j++) {
assertEquals(a.get(i), b.get(j));
}
}
}

View File

@@ -30,39 +30,37 @@ import org.apache.hadoop.hbase.util.ByteBufferAllocator;
import org.apache.hadoop.hbase.util.ByteBufferArray;
/**
* IO engine that stores data in memory using an array of ByteBuffers
* {@link ByteBufferArray}.
*
*<h2>How it Works</h2>
* First, see {@link ByteBufferArray} and how it gives a view across multiple ByteBuffers managed
* by it internally. This class does the physical BB create and the write and read to the
* underlying BBs. So we will create N BBs based on the total BC capacity specified on create
* of the ByteBufferArray. So say we have 10 GB of off heap BucketCache, we will create 2560 such
* BBs inside our ByteBufferArray.
*
* <p>Now the way BucketCache works is that the entire 10 GB is split into diff sized buckets: by
* default from 5 KB to 513 KB. Within each bucket of a particular size, there are
* usually more than one bucket 'block'. The way it is calculated in bucketcache is that the total
* bucketcache size is divided by 4 (hard-coded currently) * max size option. So using defaults,
* buckets will be 4 * 513kb (the biggest default value) = 2052kb. A bucket of 2052kb at offset
* zero will serve out bucket 'blocks' of 5kb, the next bucket will do the next size up and so on
* up to the maximum (default) of 513kb.
*
* <p>When we write blocks to the bucketcache, we will see which bucket size group it best fits.
* So a 4 KB block size goes to the 5 KB size group. Each of the block writes, writes within its
* appropriate bucket. Though the block is '4kb' in size, it will occupy one of the
* 5 KB bucket 'blocks' (even if the actual size of the block is less). Bucket 'blocks' will not span
* buckets.
*
* <p>But you can see the physical memory under the bucket 'blocks' can be split across the
* underlying backing BBs from ByteBufferArray. All is split into 4 MB sized BBs.
*
* <p>Each Bucket knows its offset in the entire space of BC and when block is written the offset
* IO engine that stores data in memory using an array of ByteBuffers {@link ByteBufferArray}.
* <p>
* <h2>How it Works</h2> First, see {@link ByteBufferArray} and how it gives a view across multiple
* ByteBuffers managed by it internally. This class does the physical BB create and the write and
* read to the underlying BBs. So we will create N BBs based on the total BC capacity specified on
* create of the ByteBufferArray. So say we have 10 GB of off heap BucketCache, we will create 2560
* such BBs inside our ByteBufferArray. <br>
* <p>
* Now the way BucketCache works is that the entire 10 GB is split into diff sized buckets: by
* default from 5 KB to 513 KB. Within each bucket of a particular size, there are usually more than
* one bucket 'block'. The way it is calculated in bucketcache is that the total bucketcache size is
* divided by 4 (hard-coded currently) * max size option. So using defaults, buckets will be 4 *
* 513kb (the biggest default value) = 2052kb. A bucket of 2052kb at offset zero will serve out
* bucket 'blocks' of 5kb, the next bucket will do the next size up and so on up to the maximum
* (default) of 513kb. <br>
* <p>
* When we write blocks to the bucketcache, we will see which bucket size group it best fits. So a 4
* KB block size goes to the 5 KB size group. Each of the block writes, writes within its
* appropriate bucket. Though the block is '4kb' in size, it will occupy one of the 5 KB bucket
* 'blocks' (even if the actual size of the block is less). Bucket 'blocks' will not span buckets. <br>
* <p>
* But you can see the physical memory under the bucket 'blocks' can be split across the underlying
* backing BBs from ByteBufferArray. All is split into 4 MB sized BBs. <br>
* <p>
* Each Bucket knows its offset in the entire space of BC and when block is written the offset
* arrives at ByteBufferArray and it figures which BB to write to. It may so happen that the entire
* block to be written does not fit a particular backing ByteBuffer so the remainder goes to
* another BB. See {@link ByteBufferArray#putMultiple(long, int, byte[])}.
* All this said, when we read a block it may be possible that the bytes of that block are
* physically placed in 2 adjacent BBs. In such a case too, we avoid any copy by having the
* MBB...
* another BB. See {@link ByteBufferArray#write(long, ByteBuff)}. <br>
* All this said, when we read a block it may be possible that the bytes of that block are
* physically placed in 2 adjacent BBs. In such a case too, we avoid any copy by having the
* MBB...
*/
@InterfaceAudience.Private
public class ByteBufferIOEngine implements IOEngine {
@@ -74,15 +72,9 @@ public class ByteBufferIOEngine implements IOEngine {
* @param capacity total capacity of the IO engine in bytes
* @throws IOException ideally here no exception to be thrown from the allocator
*/
public ByteBufferIOEngine(long capacity)
throws IOException {
public ByteBufferIOEngine(long capacity) throws IOException {
this.capacity = capacity;
ByteBufferAllocator allocator = new ByteBufferAllocator() {
@Override
public ByteBuffer allocate(long size) throws IOException {
return ByteBuffer.allocateDirect((int) size);
}
};
ByteBufferAllocator allocator = (size) -> ByteBuffer.allocateDirect((int) size);
bufferArray = new ByteBufferArray(capacity, allocator);
}
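A quick check of the arithmetic quoted in the class comment above, using plain numbers: 10 GB of off-heap BucketCache over 4 MB backing buffers is indeed 2560 ByteBuffers, and the biggest default bucket is 4 * 513 KB = 2052 KB.

long tenGB = 10L * 1024 * 1024 * 1024;
int bufferSize = 4 * 1024 * 1024;
System.out.println(tenGB / bufferSize); // 2560 backing ByteBuffers
System.out.println(4 * 513);            // 2052 (KB), the largest default bucket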
@@ -121,27 +113,29 @@ }
}
/**
* Transfers data from the given byte buffer to the buffer array
* @param srcBuffer the given byte buffer from which bytes are to be read
* @param offset The offset in the ByteBufferArray of the first byte to be
* written
* Transfers data from the given {@link ByteBuffer} to the buffer array. Position of the source
* will be advanced by {@link ByteBuffer#remaining()} bytes.
* @param src the given byte buffer from which bytes are to be read.
* @param offset The offset in the ByteBufferArray of the first byte to be written
* @throws IOException throws IOException if writing to the array throws exception
*/
@Override
public void write(ByteBuffer srcBuffer, long offset) throws IOException {
assert srcBuffer.hasArray();
bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
srcBuffer.arrayOffset());
public void write(ByteBuffer src, long offset) throws IOException {
bufferArray.write(offset, ByteBuff.wrap(src));
}
/**
* Transfers data from the given {@link ByteBuff} to the buffer array. Position of the source
* will be advanced by {@link ByteBuff#remaining()} bytes.
* @param src the given byte buffer from which bytes are to be read.
* @param offset The offset in the ByteBufferArray of the first byte to be written
* @throws IOException throws IOException if writing to the array throws exception
*/
@Override
public void write(ByteBuff srcBuffer, long offset) throws IOException {
// When caching a block into BucketCache there will be a single buffer backing this HFileBlock.
// This will work for now. But if we get a DBB from the DFS itself then this may not hold true.
assert srcBuffer.hasArray();
bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
srcBuffer.arrayOffset());
public void write(ByteBuff src, long offset) throws IOException {
bufferArray.write(offset, src);
}
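This method is the heart of the change named in the commit title: write(ByteBuff, long) no longer asserts hasArray(), so an off-heap (direct-buffer backed) ByteBuff can now be written into the bufferArray. A minimal sketch; the capacity is illustrative:

ByteBufferIOEngine engine = new ByteBufferIOEngine(32 * 1024 * 1024);
ByteBuff offHeap = ByteBuff.wrap(ByteBuffer.allocateDirect(4096)); // no backing array
engine.write(offHeap, 0); // would have tripped the old assert srcBuffer.hasArray()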
/**
* No operation for the sync in the memory IO engine
*/

View File

@@ -16,16 +16,15 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* IO engine that stores data to a file on the local block device using memory mapping
@@ -33,7 +32,6 @@ import org.slf4j.LoggerFactory;
*/
@InterfaceAudience.Private
public class ExclusiveMemoryMmapIOEngine extends FileMmapIOEngine {
static final Logger LOG = LoggerFactory.getLogger(ExclusiveMemoryMmapIOEngine.class);
public ExclusiveMemoryMmapIOEngine(String filePath, long capacity) throws IOException {
super(filePath, capacity);
@@ -42,9 +40,8 @@ public class ExclusiveMemoryMmapIOEngine extends FileMmapIOEngine {
@Override
public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
throws IOException {
byte[] dst = new byte[length];
bufferArray.getMultiple(offset, length, dst);
return deserializer.deserialize(new SingleByteBuff(ByteBuffer.wrap(dst)), true,
MemoryType.EXCLUSIVE);
ByteBuff dst = HEAP.allocate(length);
bufferArray.read(offset, dst);
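// read() advances dst's position as it copies bytes in, so rewind before handing it to the deserializer.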
return deserializer.deserialize(dst.position(0).limit(length), true, MemoryType.EXCLUSIVE);
}
}

View File

@@ -143,6 +143,7 @@ public class FileIOEngine implements IOEngine {
+ " expected");
}
}
dstBuffer.rewind();
return deserializer.deserialize(new SingleByteBuff(dstBuffer), true, MemoryType.EXCLUSIVE);
}
@@ -210,10 +211,8 @@
@Override
public void write(ByteBuff srcBuffer, long offset) throws IOException {
// When caching a block into BucketCache there will be a single buffer backing this HFileBlock.
assert srcBuffer.hasArray();
write(ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(),
srcBuffer.remaining()), offset);
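// duplicate() gives the file write below its own position/limit, so the caller's view of the
// shared bytes is not disturbed.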
ByteBuffer dup = srcBuffer.asSubByteBuffer(srcBuffer.remaining()).duplicate();
write(dup, offset);
}
private void accessFile(FileAccessor accessor, ByteBuffer buffer,
@@ -229,8 +228,7 @@
int accessLen = 0;
if (endFileNum > accessFileNum) {
// short the limit;
buffer.limit((int) (buffer.limit() - remainingAccessDataLen
+ sizePerFile - accessOffset));
buffer.limit((int) (buffer.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
}
try {
accessLen = accessor.access(fileChannel, buffer, accessOffset);
@@ -307,7 +305,7 @@
}
}
private static interface FileAccessor {
private interface FileAccessor {
int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
throws IOException;
}

View File

@@ -112,17 +112,12 @@
*/
@Override
public void write(ByteBuffer srcBuffer, long offset) throws IOException {
assert srcBuffer.hasArray();
bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
srcBuffer.arrayOffset());
bufferArray.write(offset, ByteBuff.wrap(srcBuffer));
}
@Override
public void write(ByteBuff srcBuffer, long offset) throws IOException {
// This singleByteBuff can be considered to be array backed
assert srcBuffer.hasArray();
bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
srcBuffer.arrayOffset());
bufferArray.write(offset, srcBuffer);
}
/**

View File

@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -28,6 +26,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -56,12 +55,10 @@ public class TestByteBufferIOEngine {
if (blockSize == 0) {
blockSize = 1;
}
byte[] byteArray = new byte[blockSize];
for (int j = 0; j < byteArray.length; ++j) {
byteArray[j] = val;
}
ByteBuffer srcBuffer = ByteBuffer.wrap(byteArray);
int offset = 0;
ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0);
int pos = src.position(), lim = src.limit();
int offset;
if (testOffsetAtStartNum > 0) {
testOffsetAtStartNum--;
offset = 0;
@@ -71,13 +68,16 @@ } else {
} else {
offset = (int) (Math.random() * (capacity - maxBlockSize));
}
ioEngine.write(srcBuffer, offset);
ioEngine.write(src, offset);
src.position(pos).limit(lim);
BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
ioEngine.read(offset, blockSize, deserializer);
ByteBuff dstBuffer = deserializer.buf;
for (int j = 0; j < byteArray.length; ++j) {
assertTrue(byteArray[j] == dstBuffer.get(j));
}
ByteBuff dst = deserializer.buf;
Assert.assertEquals(src.remaining(), blockSize);
Assert.assertEquals(dst.remaining(), blockSize);
Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst,
dst.position(), dst.remaining()));
}
assert testOffsetAtStartNum == 0;
assert testOffsetAtEndNum == 0;
@@ -112,6 +112,16 @@ }
}
}
static ByteBuff createByteBuffer(int len, int val, boolean useHeap) {
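// Allocate twice the needed room and start at a random position so that callers exercise
// non-zero offsets and limits.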
ByteBuffer b = useHeap ? ByteBuffer.allocate(2 * len) : ByteBuffer.allocateDirect(2 * len);
int pos = (int) (Math.random() * len);
b.position(pos).limit(pos + len);
for (int i = pos; i < pos + len; i++) {
b.put(i, (byte) val);
}
return ByteBuff.wrap(b);
}
@Test
public void testByteBufferIOEngineWithMBB() throws Exception {
int capacity = 32 * 1024 * 1024; // 32 MB
@@ -126,12 +136,9 @@
if (blockSize == 0) {
blockSize = 1;
}
byte[] byteArray = new byte[blockSize];
for (int j = 0; j < byteArray.length; ++j) {
byteArray[j] = val;
}
ByteBuffer srcBuffer = ByteBuffer.wrap(byteArray);
int offset = 0;
ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0);
int pos = src.position(), lim = src.limit();
int offset;
if (testOffsetAtStartNum > 0) {
testOffsetAtStartNum--;
offset = 0;
@@ -141,13 +148,16 @@ } else {
} else {
offset = (int) (Math.random() * (capacity - maxBlockSize));
}
ioEngine.write(srcBuffer, offset);
ioEngine.write(src, offset);
src.position(pos).limit(lim);
BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
ioEngine.read(offset, blockSize, deserializer);
ByteBuff dstBuffer = deserializer.buf;
for (int j = 0; j < byteArray.length; ++j) {
assertTrue(srcBuffer.get(j) == dstBuffer.get(j));
}
ByteBuff dst = deserializer.buf;
Assert.assertEquals(src.remaining(), blockSize);
Assert.assertEquals(dst.remaining(), blockSize);
Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst,
dst.position(), dst.remaining()));
}
assert testOffsetAtStartNum == 0;
assert testOffsetAtEndNum == 0;

View File

@@ -17,16 +17,14 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.BufferGrabbingDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -34,7 +32,7 @@ import org.junit.experimental.categories.Category;
/**
* Basic test for {@link ExclusiveMemoryMmapIOEngine}
*/
@Category({IOTests.class, SmallTests.class})
@Category({ IOTests.class, SmallTests.class })
public class TestExclusiveMemoryMmapEngine {
@ClassRule
@@ -50,17 +48,23 @@ public class TestExclusiveMemoryMmapEngine {
for (int i = 0; i < 50; i++) {
int len = (int) Math.floor(Math.random() * 100);
long offset = (long) Math.floor(Math.random() * size % (size - len));
byte[] data1 = new byte[len];
for (int j = 0; j < data1.length; ++j) {
data1[j] = (byte) (Math.random() * 255);
}
fileMmapEngine.write(ByteBuffer.wrap(data1), offset);
int val = (int) (Math.random() * 255);
// write
ByteBuff src = TestByteBufferIOEngine.createByteBuffer(len, val, i % 2 == 0);
int pos = src.position(), lim = src.limit();
fileMmapEngine.write(src, offset);
src.position(pos).limit(lim);
// read
BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
fileMmapEngine.read(offset, len, deserializer);
ByteBuff data2 = deserializer.getDeserializedByteBuff();
for (int j = 0; j < data1.length; ++j) {
assertTrue(data1[j] == data2.get(j));
}
ByteBuff dst = deserializer.getDeserializedByteBuff();
Assert.assertEquals(src.remaining(), len);
Assert.assertEquals(dst.remaining(), len);
Assert.assertEquals(0,
ByteBuff.compareTo(src, pos, len, dst, dst.position(), dst.remaining()));
}
} finally {
File file = new File(filePath);
@@ -68,6 +72,5 @@
file.delete();
}
}
}
}

View File

@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -132,15 +133,22 @@ public class TestFileIOEngine {
fileIOEngine.closeFileChannels();
int len = 5;
long offset = 0L;
byte[] data1 = new byte[len];
for (int j = 0; j < data1.length; ++j) {
data1[j] = (byte) (Math.random() * 255);
int val = (int) (Math.random() * 255);
for (int i = 0; i < 2; i++) {
ByteBuff src = TestByteBufferIOEngine.createByteBuffer(len, val, i % 2 == 0);
int pos = src.position(), lim = src.limit();
fileIOEngine.write(src, offset);
src.position(pos).limit(lim);
BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
fileIOEngine.read(offset, len, deserializer);
ByteBuff dst = deserializer.getDeserializedByteBuff();
Assert.assertEquals(src.remaining(), len);
Assert.assertEquals(dst.remaining(), len);
Assert.assertEquals(0,
ByteBuff.compareTo(src, pos, len, dst, dst.position(), dst.remaining()));
}
fileIOEngine.write(ByteBuffer.wrap(data1), offset);
BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
fileIOEngine.read(offset, len, deserializer);
ByteBuff data2 = deserializer.getDeserializedByteBuff();
assertArrayEquals(data1, data2.array());
}
@Test