HBASE-17738 BucketCache startup is slow - addendum (Ram)

This commit is contained in:
Ramkrishna 2017-07-20 22:38:13 +05:30
parent 9300fbc9c1
commit e095d3964b
2 changed files with 51 additions and 5 deletions

View File

@ -44,14 +44,15 @@ import com.google.common.annotations.VisibleForTesting;
* reading/writing data from this large buffer with a position and offset * reading/writing data from this large buffer with a position and offset
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public final class ByteBufferArray { public class ByteBufferArray {
private static final Log LOG = LogFactory.getLog(ByteBufferArray.class); private static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
@VisibleForTesting @VisibleForTesting
ByteBuffer buffers[]; ByteBuffer buffers[];
private int bufferSize; private int bufferSize;
private int bufferCount; @VisibleForTesting
int bufferCount;
/** /**
* We allocate a number of byte buffers as the capacity. In order not to out * We allocate a number of byte buffers as the capacity. In order not to out
@ -75,12 +76,13 @@ public final class ByteBufferArray {
createBuffers(directByteBuffer, allocator); createBuffers(directByteBuffer, allocator);
} }
private void createBuffers(boolean directByteBuffer, ByteBufferAllocator allocator) @VisibleForTesting
void createBuffers(boolean directByteBuffer, ByteBufferAllocator allocator)
throws IOException { throws IOException {
int threadCount = Runtime.getRuntime().availableProcessors(); int threadCount = getThreadCount();
ExecutorService service = new ThreadPoolExecutor(threadCount, threadCount, 0L, ExecutorService service = new ThreadPoolExecutor(threadCount, threadCount, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
int perThreadCount = Math.round((float) (bufferCount) / threadCount); int perThreadCount = (int)Math.floor((double) (bufferCount) / threadCount);
int lastThreadCount = bufferCount - (perThreadCount * (threadCount - 1)); int lastThreadCount = bufferCount - (perThreadCount * (threadCount - 1));
Future<ByteBuffer[]>[] futures = new Future[threadCount]; Future<ByteBuffer[]>[] futures = new Future[threadCount];
try { try {
@ -109,6 +111,11 @@ public final class ByteBufferArray {
this.buffers[bufferCount] = ByteBuffer.allocate(0); this.buffers[bufferCount] = ByteBuffer.allocate(0);
} }
@VisibleForTesting
int getThreadCount() {
return Runtime.getRuntime().availableProcessors();
}
/** /**
* A callable that creates buffers of the specified length either onheap/offheap using the * A callable that creates buffers of the specified length either onheap/offheap using the
* {@link ByteBufferAllocator} * {@link ByteBufferAllocator}

View File

@ -79,4 +79,43 @@ public class TestByteBufferArray {
} }
} }
} }
@Test
public void testByteBufferCreation1() throws Exception {
ByteBufferAllocator allocator = new ByteBufferAllocator() {
@Override
public ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException {
if (directByteBuffer) {
return ByteBuffer.allocateDirect((int) size);
} else {
return ByteBuffer.allocate((int) size);
}
}
};
ByteBufferArray array = new DummyByteBufferArray(7 * 1024 * 1024, false, allocator);
// overwrite
array.bufferCount = 25;
array.buffers = new ByteBuffer[array.bufferCount + 1];
array.createBuffers(true, allocator);
for (int i = 0; i < array.buffers.length; i++) {
if (i == array.buffers.length - 1) {
assertEquals(array.buffers[i].capacity(), 0);
} else {
assertEquals(array.buffers[i].capacity(), 458752);
}
}
}
private static class DummyByteBufferArray extends ByteBufferArray {
public DummyByteBufferArray(long capacity, boolean directByteBuffer,
ByteBufferAllocator allocator) throws IOException {
super(capacity, directByteBuffer, allocator);
}
@Override
int getThreadCount() {
return 16;
}
}
} }