HBASE-13819 Make RPC layer CellBlock buffer a DirectByteBuffer.
This commit is contained in:
parent
411683c22b
commit
5dba2c71f7
|
@ -67,16 +67,20 @@ public class BoundedByteBufferPool {
|
||||||
|
|
||||||
private ReentrantLock lock = new ReentrantLock();
|
private ReentrantLock lock = new ReentrantLock();
|
||||||
|
|
||||||
|
private boolean createDirectByteBuffer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param maxByteBufferSizeToCache
|
* @param maxByteBufferSizeToCache
|
||||||
* @param initialByteBufferSize
|
* @param initialByteBufferSize
|
||||||
* @param maxToCache
|
* @param maxToCache
|
||||||
|
* @param createDirectByteBuffer whether the buffers created by this pool to be off heap
|
||||||
*/
|
*/
|
||||||
public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
|
public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
|
||||||
final int maxToCache) {
|
final int maxToCache, final boolean createDirectByteBuffer) {
|
||||||
this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
|
this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
|
||||||
this.runningAverage = initialByteBufferSize;
|
this.runningAverage = initialByteBufferSize;
|
||||||
this.buffers = new BoundedArrayQueue<ByteBuffer>(maxToCache);
|
this.buffers = new BoundedArrayQueue<ByteBuffer>(maxToCache);
|
||||||
|
this.createDirectByteBuffer = createDirectByteBuffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ByteBuffer getBuffer() {
|
public ByteBuffer getBuffer() {
|
||||||
|
@ -94,7 +98,8 @@ public class BoundedByteBufferPool {
|
||||||
// Clear sets limit == capacity. Postion == 0.
|
// Clear sets limit == capacity. Postion == 0.
|
||||||
bb.clear();
|
bb.clear();
|
||||||
} else {
|
} else {
|
||||||
bb = ByteBuffer.allocate(this.runningAverage);
|
bb = this.createDirectByteBuffer ? ByteBuffer.allocateDirect(this.runningAverage)
|
||||||
|
: ByteBuffer.allocate(this.runningAverage);
|
||||||
this.allocations.incrementAndGet();
|
this.allocations.incrementAndGet();
|
||||||
}
|
}
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
|
|
|
@ -37,8 +37,8 @@ public class TestBoundedByteBufferPool {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void before() {
|
public void before() {
|
||||||
this.reservoir =
|
this.reservoir = new BoundedByteBufferPool(maxByteBufferSizeToCache, initialByteBufferSize,
|
||||||
new BoundedByteBufferPool(maxByteBufferSizeToCache, initialByteBufferSize, maxToCache);
|
maxToCache, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
|
|
@ -1967,7 +1967,9 @@ public class RpcServer implements RpcServerInterface {
|
||||||
// Make the max twice the number of handlers to be safe.
|
// Make the max twice the number of handlers to be safe.
|
||||||
conf.getInt("hbase.ipc.server.reservoir.initial.max",
|
conf.getInt("hbase.ipc.server.reservoir.initial.max",
|
||||||
conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
|
conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
|
||||||
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
|
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2),
|
||||||
|
// By default make direct byte buffers from the buffer pool.
|
||||||
|
conf.getBoolean("hbase.ipc.server.reservoir.direct.buffer", true));
|
||||||
this.server = server;
|
this.server = server;
|
||||||
this.services = services;
|
this.services = services;
|
||||||
this.bindAddress = bindAddress;
|
this.bindAddress = bindAddress;
|
||||||
|
|
Loading…
Reference in New Issue