diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java index 60f8c792a74..068afe27aee 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java @@ -44,14 +44,15 @@ import com.google.common.annotations.VisibleForTesting; * reading/writing data from this large buffer with a position and offset */ @InterfaceAudience.Private -public final class ByteBufferArray { +public class ByteBufferArray { private static final Log LOG = LogFactory.getLog(ByteBufferArray.class); public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; @VisibleForTesting ByteBuffer buffers[]; private int bufferSize; - private int bufferCount; + @VisibleForTesting + int bufferCount; /** * We allocate a number of byte buffers as the capacity. In order not to out @@ -75,12 +76,13 @@ public final class ByteBufferArray { createBuffers(directByteBuffer, allocator); } - private void createBuffers(boolean directByteBuffer, ByteBufferAllocator allocator) + @VisibleForTesting + void createBuffers(boolean directByteBuffer, ByteBufferAllocator allocator) throws IOException { - int threadCount = Runtime.getRuntime().availableProcessors(); + int threadCount = getThreadCount(); ExecutorService service = new ThreadPoolExecutor(threadCount, threadCount, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); - int perThreadCount = Math.round((float) (bufferCount) / threadCount); + int perThreadCount = (int)Math.floor((double) (bufferCount) / threadCount); int lastThreadCount = bufferCount - (perThreadCount * (threadCount - 1)); Future[] futures = new Future[threadCount]; try { @@ -109,6 +111,11 @@ public final class ByteBufferArray { this.buffers[bufferCount] = ByteBuffer.allocate(0); } + @VisibleForTesting + int getThreadCount() { + return Runtime.getRuntime().availableProcessors(); + } + /** * A callable that creates buffers of the specified length either onheap/offheap using the * {@link ByteBufferAllocator} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java index c71b86c8ba0..70776436c93 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java @@ -79,4 +79,43 @@ public class TestByteBufferArray { } } } + + @Test + public void testByteBufferCreation1() throws Exception { + ByteBufferAllocator allocator = new ByteBufferAllocator() { + @Override + public ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException { + if (directByteBuffer) { + return ByteBuffer.allocateDirect((int) size); + } else { + return ByteBuffer.allocate((int) size); + } + } + }; + ByteBufferArray array = new DummyByteBufferArray(7 * 1024 * 1024, false, allocator); + // overwrite + array.bufferCount = 25; + array.buffers = new ByteBuffer[array.bufferCount + 1]; + array.createBuffers(true, allocator); + for (int i = 0; i < array.buffers.length; i++) { + if (i == array.buffers.length - 1) { + assertEquals(array.buffers[i].capacity(), 0); + } else { + assertEquals(array.buffers[i].capacity(), 458752); + } + } + } + + private static class DummyByteBufferArray extends ByteBufferArray { + + public DummyByteBufferArray(long capacity, boolean directByteBuffer, + ByteBufferAllocator allocator) throws IOException { + super(capacity, directByteBuffer, allocator); + } + + @Override + int getThreadCount() { + return 16; + } + } }