diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java index 2bb820ed81a..60f8c792a74 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java @@ -20,6 +20,13 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,6 +36,8 @@ import org.apache.hadoop.hbase.nio.MultiByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.util.StringUtils; +import com.google.common.annotations.VisibleForTesting; + /** * This class manages an array of ByteBuffers with a default size 4MB. These * buffers are sequential and could be considered as a large buffer.It supports @@ -39,7 +48,8 @@ public final class ByteBufferArray { private static final Log LOG = LogFactory.getLog(ByteBufferArray.class); public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; - private ByteBuffer buffers[]; + @VisibleForTesting + ByteBuffer buffers[]; private int bufferSize; private int bufferCount; @@ -62,13 +72,68 @@ public final class ByteBufferArray { + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count=" + bufferCount + ", direct=" + directByteBuffer); buffers = new ByteBuffer[bufferCount + 1]; - for (int i = 0; i <= bufferCount; i++) { - if (i < bufferCount) { - buffers[i] = allocator.allocate(bufferSize, directByteBuffer); - } else { - // always create on heap - buffers[i] = ByteBuffer.allocate(0); + createBuffers(directByteBuffer, allocator); + } + + private void createBuffers(boolean directByteBuffer, ByteBufferAllocator allocator) + throws IOException { + int threadCount = Runtime.getRuntime().availableProcessors(); + ExecutorService service = new ThreadPoolExecutor(threadCount, threadCount, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); + int perThreadCount = Math.round((float) (bufferCount) / threadCount); + int lastThreadCount = bufferCount - (perThreadCount * (threadCount - 1)); + Future[] futures = new Future[threadCount]; + try { + for (int i = 0; i < threadCount; i++) { + // Last thread will have to deal with a different number of buffers + int buffersToCreate = (i == threadCount - 1) ? lastThreadCount : perThreadCount; + futures[i] = service.submit( + new BufferCreatorCallable(bufferSize, directByteBuffer, buffersToCreate, allocator)); } + int bufferIndex = 0; + for (Future future : futures) { + try { + ByteBuffer[] buffers = future.get(); + for (ByteBuffer buffer : buffers) { + this.buffers[bufferIndex++] = buffer; + } + } catch (InterruptedException | ExecutionException e) { + LOG.error("Buffer creation interrupted", e); + throw new IOException(e); + } + } + } finally { + service.shutdownNow(); + } + // always create on heap empty dummy buffer at last + this.buffers[bufferCount] = ByteBuffer.allocate(0); + } + + /** + * A callable that creates buffers of the specified length either onheap/offheap using the + * {@link ByteBufferAllocator} + */ + private static class BufferCreatorCallable implements Callable { + private final int bufferCapacity; + private final boolean directByteBuffer; + private final int bufferCount; + private final ByteBufferAllocator allocator; + + BufferCreatorCallable(int bufferCapacity, boolean directByteBuffer, int bufferCount, + ByteBufferAllocator allocator) { + this.bufferCapacity = bufferCapacity; + this.directByteBuffer = directByteBuffer; + this.bufferCount = bufferCount; + this.allocator = allocator; + } + + @Override + public ByteBuffer[] call() throws Exception { + ByteBuffer[] buffers = new ByteBuffer[this.bufferCount]; + for (int i = 0; i < this.bufferCount; i++) { + buffers[i] = allocator.allocate(this.bufferCapacity, this.directByteBuffer); + } + return buffers; } } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java index f2c85498506..c71b86c8ba0 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.util; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -54,4 +55,28 @@ public class TestByteBufferArray { subBuf.get(); assertFalse(subBuf.hasRemaining()); } + + @Test + public void testByteBufferCreation() throws Exception { + int capacity = 470 * 1021 * 1023; + ByteBufferAllocator allocator = new ByteBufferAllocator() { + @Override + public ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException { + if (directByteBuffer) { + return ByteBuffer.allocateDirect((int) size); + } else { + return ByteBuffer.allocate((int) size); + } + } + }; + ByteBufferArray array = new ByteBufferArray(capacity, false, allocator); + assertEquals(119, array.buffers.length); + for (int i = 0; i < array.buffers.length; i++) { + if (i == array.buffers.length - 1) { + assertEquals(array.buffers[i].capacity(), 0); + } else { + assertEquals(array.buffers[i].capacity(), ByteBufferArray.DEFAULT_BUFFER_SIZE); + } + } + } }