HBASE-14860 Improve BoundedByteBufferPool; make lockless (Hiroshi Ikeda)

This commit is contained in:
stack 2015-11-21 20:32:13 -08:00
parent 52edd83baf
commit d8f2ac3e6e
2 changed files with 111 additions and 54 deletions

View File

@ -19,13 +19,13 @@ package org.apache.hadoop.hbase.io;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.BoundedArrayQueue;
import com.google.common.annotations.VisibleForTesting;
@ -51,23 +51,49 @@ import com.google.common.annotations.VisibleForTesting;
public class BoundedByteBufferPool {
private static final Log LOG = LogFactory.getLog(BoundedByteBufferPool.class);
private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<ByteBuffer>();
@VisibleForTesting
final Queue<ByteBuffer> buffers;
int getQueueSize() {
return buffers.size();
}
private final int maxToCache;
// Maximum size of a ByteBuffer to retain in pool
private final int maxByteBufferSizeToCache;
// A running average only it only rises, it never recedes
private final AtomicInteger runningAverageRef;
@VisibleForTesting
volatile int runningAverage;
int getRunningAverage() {
return runningAverageRef.get();
}
// Scratch that keeps rough total size of pooled bytebuffers
private volatile int totalReservoirCapacity;
// Count (lower 32bit) and total capacity (upper 32bit) of pooled bytebuffers.
// Both are non-negative. They are equal to or larger than those of the actual
// queued buffers in any transition.
private final AtomicLong stateRef = new AtomicLong();
// For reporting
private AtomicLong allocations = new AtomicLong(0);
private static int toCountOfBuffers(long state) {
return (int)state;
}
private ReentrantLock lock = new ReentrantLock();
private static int toTotalCapacity(long state) {
return (int)(state >>> 32);
}
private static long toState(int countOfBuffers, int totalCapacity) {
return ((long)totalCapacity << 32) | totalCapacity;
}
private static long subtractOneBufferFromState(long state, int capacity) {
return state - ((long)capacity << 32) - 1;
}
// For reporting, only used in the log
private final AtomicLong allocationsRef = new AtomicLong();
/**
* @param maxByteBufferSizeToCache
@ -77,56 +103,87 @@ public class BoundedByteBufferPool {
public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
final int maxToCache) {
this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
this.runningAverage = initialByteBufferSize;
this.buffers = new BoundedArrayQueue<ByteBuffer>(maxToCache);
this.runningAverageRef = new AtomicInteger(initialByteBufferSize);
this.maxToCache = maxToCache;
}
public ByteBuffer getBuffer() {
ByteBuffer bb = null;
lock.lock();
try {
bb = this.buffers.poll();
if (bb != null) {
this.totalReservoirCapacity -= bb.capacity();
}
} finally {
lock.unlock();
}
ByteBuffer bb = buffers.poll();
if (bb != null) {
long state;
while (true) {
long prevState = stateRef.get();
state = subtractOneBufferFromState(prevState, bb.capacity());
if (stateRef.compareAndSet(prevState, state)) {
break;
}
}
// Clear sets limit == capacity. Postion == 0.
bb.clear();
} else {
bb = ByteBuffer.allocateDirect(this.runningAverage);
this.allocations.incrementAndGet();
if (LOG.isTraceEnabled()) {
int countOfBuffers = toCountOfBuffers(state);
int totalCapacity = toTotalCapacity(state);
LOG.trace("totalCapacity=" + totalCapacity + ", count=" + countOfBuffers);
}
return bb;
}
int runningAverage = runningAverageRef.get();
bb = ByteBuffer.allocateDirect(runningAverage);
if (LOG.isTraceEnabled()) {
LOG.trace("runningAverage=" + this.runningAverage +
", totalCapacity=" + this.totalReservoirCapacity + ", count=" + this.buffers.size() +
", alloctions=" + this.allocations.get());
long allocations = allocationsRef.incrementAndGet();
LOG.trace("runningAverage=" + runningAverage + ", alloctions=" + allocations);
}
return bb;
}
public void putBuffer(ByteBuffer bb) {
// If buffer is larger than we want to keep around, just let it go.
if (bb.capacity() > this.maxByteBufferSizeToCache) return;
boolean success = false;
int average = 0;
lock.lock();
try {
success = this.buffers.offer(bb);
if (success) {
this.totalReservoirCapacity += bb.capacity();
average = this.totalReservoirCapacity / this.buffers.size(); // size will never be 0.
}
} finally {
lock.unlock();
if (bb.capacity() > maxByteBufferSizeToCache) {
return;
}
if (!success) {
LOG.warn("At capacity: " + this.buffers.size());
} else {
if (average > this.runningAverage && average < this.maxByteBufferSizeToCache) {
this.runningAverage = average;
int countOfBuffers;
int totalCapacity;
while (true) {
long prevState = stateRef.get();
countOfBuffers = toCountOfBuffers(prevState);
if (countOfBuffers >= maxToCache) {
if (LOG.isWarnEnabled()) {
LOG.warn("At capacity: " + countOfBuffers);
}
return;
}
countOfBuffers++;
assert 0 < countOfBuffers && countOfBuffers <= maxToCache;
totalCapacity = toTotalCapacity(prevState) + bb.capacity();
if (totalCapacity < 0) {
if (LOG.isWarnEnabled()) {
LOG.warn("Overflowed total capacity.");
}
return;
}
long state = toState(countOfBuffers, totalCapacity);
if (stateRef.compareAndSet(prevState, state)) {
break;
}
}
// ConcurrentLinkQueue#offer says "this method will never return false"
buffers.offer(bb);
int runningAverageUpdate = Math.min(
totalCapacity / countOfBuffers, // size will never be 0.
maxByteBufferSizeToCache);
while (true) {
int prev = runningAverageRef.get();
if (prev >= runningAverageUpdate || // only rises, never recedes
runningAverageRef.compareAndSet(prev, runningAverageUpdate)) {
break;
}
}
}

View File

@ -53,38 +53,38 @@ public class TestBoundedByteBufferPool {
this.reservoir.putBuffer(bb);
this.reservoir.putBuffer(bb);
this.reservoir.putBuffer(bb);
assertEquals(3, this.reservoir.buffers.size());
assertEquals(3, this.reservoir.getQueueSize());
}
@Test
public void testGetPut() {
ByteBuffer bb = this.reservoir.getBuffer();
assertEquals(initialByteBufferSize, bb.capacity());
assertEquals(0, this.reservoir.buffers.size());
assertEquals(0, this.reservoir.getQueueSize());
this.reservoir.putBuffer(bb);
assertEquals(1, this.reservoir.buffers.size());
assertEquals(1, this.reservoir.getQueueSize());
// Now remove a buffer and don't put it back so reservoir is empty.
this.reservoir.getBuffer();
assertEquals(0, this.reservoir.buffers.size());
assertEquals(0, this.reservoir.getQueueSize());
// Try adding in a buffer with a bigger-than-initial size and see if our runningAverage works.
// Need to add then remove, then get a new bytebuffer so reservoir internally is doing
// allocation
final int newCapacity = 2;
this.reservoir.putBuffer(ByteBuffer.allocate(newCapacity));
assertEquals(1, reservoir.buffers.size());
assertEquals(1, reservoir.getQueueSize());
this.reservoir.getBuffer();
assertEquals(0, this.reservoir.buffers.size());
assertEquals(0, this.reservoir.getQueueSize());
bb = this.reservoir.getBuffer();
assertEquals(newCapacity, bb.capacity());
// Assert that adding a too-big buffer won't happen
assertEquals(0, this.reservoir.buffers.size());
assertEquals(0, this.reservoir.getQueueSize());
this.reservoir.putBuffer(ByteBuffer.allocate(maxByteBufferSizeToCache * 2));
assertEquals(0, this.reservoir.buffers.size());
assertEquals(0, this.reservoir.getQueueSize());
// Assert we can't add more than max allowed instances.
for (int i = 0; i < maxToCache; i++) {
this.reservoir.putBuffer(ByteBuffer.allocate(initialByteBufferSize));
}
assertEquals(maxToCache, this.reservoir.buffers.size());
assertEquals(maxToCache, this.reservoir.getQueueSize());
}
@Test
@ -144,6 +144,6 @@ public class TestBoundedByteBufferPool {
}
// None of the BBs we got from pool is growing while in use. So we should not change the
// runningAverage in pool
assertEquals(initialByteBufferSize, this.reservoir.runningAverage);
assertEquals(initialByteBufferSize, this.reservoir.getRunningAverage());
}
}