HBASE-17738 BucketCache startup is slow (Ram)
This commit is contained in:
parent
f10f8198af
commit
d0e4a643a0
|
@ -20,6 +20,13 @@ package org.apache.hadoop.hbase.util;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -29,6 +36,8 @@ import org.apache.hadoop.hbase.nio.MultiByteBuff;
|
|||
import org.apache.hadoop.hbase.nio.SingleByteBuff;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* This class manages an array of ByteBuffers with a default size 4MB. These
|
||||
* buffers are sequential and could be considered as a large buffer.It supports
|
||||
|
@ -39,7 +48,8 @@ public final class ByteBufferArray {
|
|||
private static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
|
||||
|
||||
public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
|
||||
private ByteBuffer buffers[];
|
||||
@VisibleForTesting
|
||||
ByteBuffer buffers[];
|
||||
private int bufferSize;
|
||||
private int bufferCount;
|
||||
|
||||
|
@ -62,13 +72,68 @@ public final class ByteBufferArray {
|
|||
+ ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
|
||||
+ bufferCount + ", direct=" + directByteBuffer);
|
||||
buffers = new ByteBuffer[bufferCount + 1];
|
||||
for (int i = 0; i <= bufferCount; i++) {
|
||||
if (i < bufferCount) {
|
||||
buffers[i] = allocator.allocate(bufferSize, directByteBuffer);
|
||||
} else {
|
||||
// always create on heap
|
||||
buffers[i] = ByteBuffer.allocate(0);
|
||||
createBuffers(directByteBuffer, allocator);
|
||||
}
|
||||
|
||||
private void createBuffers(boolean directByteBuffer, ByteBufferAllocator allocator)
|
||||
throws IOException {
|
||||
int threadCount = Runtime.getRuntime().availableProcessors();
|
||||
ExecutorService service = new ThreadPoolExecutor(threadCount, threadCount, 0L,
|
||||
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
|
||||
int perThreadCount = Math.round((float) (bufferCount) / threadCount);
|
||||
int lastThreadCount = bufferCount - (perThreadCount * (threadCount - 1));
|
||||
Future<ByteBuffer[]>[] futures = new Future[threadCount];
|
||||
try {
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
// Last thread will have to deal with a different number of buffers
|
||||
int buffersToCreate = (i == threadCount - 1) ? lastThreadCount : perThreadCount;
|
||||
futures[i] = service.submit(
|
||||
new BufferCreatorCallable(bufferSize, directByteBuffer, buffersToCreate, allocator));
|
||||
}
|
||||
int bufferIndex = 0;
|
||||
for (Future<ByteBuffer[]> future : futures) {
|
||||
try {
|
||||
ByteBuffer[] buffers = future.get();
|
||||
for (ByteBuffer buffer : buffers) {
|
||||
this.buffers[bufferIndex++] = buffer;
|
||||
}
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
LOG.error("Buffer creation interrupted", e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
service.shutdownNow();
|
||||
}
|
||||
// always create on heap empty dummy buffer at last
|
||||
this.buffers[bufferCount] = ByteBuffer.allocate(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* A callable that creates buffers of the specified length either onheap/offheap using the
|
||||
* {@link ByteBufferAllocator}
|
||||
*/
|
||||
private static class BufferCreatorCallable implements Callable<ByteBuffer[]> {
|
||||
private final int bufferCapacity;
|
||||
private final boolean directByteBuffer;
|
||||
private final int bufferCount;
|
||||
private final ByteBufferAllocator allocator;
|
||||
|
||||
BufferCreatorCallable(int bufferCapacity, boolean directByteBuffer, int bufferCount,
|
||||
ByteBufferAllocator allocator) {
|
||||
this.bufferCapacity = bufferCapacity;
|
||||
this.directByteBuffer = directByteBuffer;
|
||||
this.bufferCount = bufferCount;
|
||||
this.allocator = allocator;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] call() throws Exception {
|
||||
ByteBuffer[] buffers = new ByteBuffer[this.bufferCount];
|
||||
for (int i = 0; i < this.bufferCount; i++) {
|
||||
buffers[i] = allocator.allocate(this.bufferCapacity, this.directByteBuffer);
|
||||
}
|
||||
return buffers;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
@ -54,4 +55,28 @@ public class TestByteBufferArray {
|
|||
subBuf.get();
|
||||
assertFalse(subBuf.hasRemaining());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testByteBufferCreation() throws Exception {
|
||||
int capacity = 470 * 1021 * 1023;
|
||||
ByteBufferAllocator allocator = new ByteBufferAllocator() {
|
||||
@Override
|
||||
public ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException {
|
||||
if (directByteBuffer) {
|
||||
return ByteBuffer.allocateDirect((int) size);
|
||||
} else {
|
||||
return ByteBuffer.allocate((int) size);
|
||||
}
|
||||
}
|
||||
};
|
||||
ByteBufferArray array = new ByteBufferArray(capacity, false, allocator);
|
||||
assertEquals(119, array.buffers.length);
|
||||
for (int i = 0; i < array.buffers.length; i++) {
|
||||
if (i == array.buffers.length - 1) {
|
||||
assertEquals(array.buffers[i].capacity(), 0);
|
||||
} else {
|
||||
assertEquals(array.buffers[i].capacity(), ByteBufferArray.DEFAULT_BUFFER_SIZE);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue