HBASE-17805 We should remove BoundedByteBufferPool because it is replaced by ByteBufferPool
This commit is contained in:
parent
9c8f02e4ef
commit
7bb0624bab
|
@ -1,194 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.io;
|
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.util.Queue;
|
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Like Hadoops' ByteBufferPool only you do not specify desired size when getting a ByteBuffer.
|
|
||||||
* This pool keeps an upper bound on the count of ByteBuffers in the pool and on the maximum size
|
|
||||||
* of ByteBuffer that it will retain (Hence the pool is 'bounded' as opposed to, say,
|
|
||||||
* Hadoop's ElasticByteBuffferPool).
|
|
||||||
* If a ByteBuffer is bigger than the configured threshold, we will just let the ByteBuffer go
|
|
||||||
* rather than add it to the pool. If more ByteBuffers than the configured maximum instances,
|
|
||||||
* we will not add the passed ByteBuffer to the pool; we will just drop it
|
|
||||||
* (we will log a WARN in this case that we are at capacity).
|
|
||||||
*
|
|
||||||
* <p>The intended use case is a reservoir of bytebuffers that an RPC can reuse; buffers tend to
|
|
||||||
* achieve a particular 'run' size over time give or take a few extremes. Set TRACE level on this
|
|
||||||
* class for a couple of seconds to get reporting on how it is running when deployed.
|
|
||||||
*
|
|
||||||
* <p>This pool returns off heap ByteBuffers.
|
|
||||||
*
|
|
||||||
* <p>This class is thread safe.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public class BoundedByteBufferPool {
|
|
||||||
private static final Log LOG = LogFactory.getLog(BoundedByteBufferPool.class);
|
|
||||||
|
|
||||||
private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
int getQueueSize() {
|
|
||||||
return buffers.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
private final int maxToCache;
|
|
||||||
|
|
||||||
// Maximum size of a ByteBuffer to retain in pool
|
|
||||||
private final int maxByteBufferSizeToCache;
|
|
||||||
|
|
||||||
// A running average only it only rises, it never recedes
|
|
||||||
private final AtomicInteger runningAverageRef;
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
int getRunningAverage() {
|
|
||||||
return runningAverageRef.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Count (lower 32bit) and total capacity (upper 32bit) of pooled bytebuffers.
|
|
||||||
// Both are non-negative. They are equal to or larger than those of the actual
|
|
||||||
// queued buffers in any transition.
|
|
||||||
private final AtomicLong stateRef = new AtomicLong();
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
static int toCountOfBuffers(long state) {
|
|
||||||
return (int)state;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
static int toTotalCapacity(long state) {
|
|
||||||
return (int)(state >>> 32);
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
static long toState(int countOfBuffers, int totalCapacity) {
|
|
||||||
return ((long)totalCapacity << 32) | countOfBuffers;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
static long subtractOneBufferFromState(long state, int capacity) {
|
|
||||||
return state - ((long)capacity << 32) - 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// For reporting, only used in the log
|
|
||||||
private final AtomicLong allocationsRef = new AtomicLong();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param maxByteBufferSizeToCache
|
|
||||||
* @param initialByteBufferSize
|
|
||||||
* @param maxToCache
|
|
||||||
*/
|
|
||||||
public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
|
|
||||||
final int maxToCache) {
|
|
||||||
this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
|
|
||||||
this.runningAverageRef = new AtomicInteger(initialByteBufferSize);
|
|
||||||
this.maxToCache = maxToCache;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ByteBuffer getBuffer() {
|
|
||||||
ByteBuffer bb = buffers.poll();
|
|
||||||
if (bb != null) {
|
|
||||||
long state;
|
|
||||||
while (true) {
|
|
||||||
long prevState = stateRef.get();
|
|
||||||
state = subtractOneBufferFromState(prevState, bb.capacity());
|
|
||||||
if (stateRef.compareAndSet(prevState, state)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Clear sets limit == capacity. Postion == 0.
|
|
||||||
bb.clear();
|
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
int countOfBuffers = toCountOfBuffers(state);
|
|
||||||
int totalCapacity = toTotalCapacity(state);
|
|
||||||
LOG.trace("totalCapacity=" + totalCapacity + ", count=" + countOfBuffers);
|
|
||||||
}
|
|
||||||
return bb;
|
|
||||||
}
|
|
||||||
|
|
||||||
int runningAverage = runningAverageRef.get();
|
|
||||||
bb = ByteBuffer.allocateDirect(runningAverage);
|
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
long allocations = allocationsRef.incrementAndGet();
|
|
||||||
LOG.trace("runningAverage=" + runningAverage + ", allocations=" + allocations);
|
|
||||||
}
|
|
||||||
return bb;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void putBuffer(ByteBuffer bb) {
|
|
||||||
// If buffer is larger than we want to keep around, just let it go.
|
|
||||||
if (bb.capacity() > maxByteBufferSizeToCache) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
int countOfBuffers;
|
|
||||||
int totalCapacity;
|
|
||||||
while (true) {
|
|
||||||
long prevState = stateRef.get();
|
|
||||||
countOfBuffers = toCountOfBuffers(prevState);
|
|
||||||
if (countOfBuffers >= maxToCache) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("At capacity: " + countOfBuffers);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
countOfBuffers++;
|
|
||||||
assert 0 < countOfBuffers && countOfBuffers <= maxToCache;
|
|
||||||
|
|
||||||
totalCapacity = toTotalCapacity(prevState) + bb.capacity();
|
|
||||||
if (totalCapacity < 0) {
|
|
||||||
if (LOG.isWarnEnabled()) {
|
|
||||||
LOG.warn("Overflowed total capacity.");
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
long state = toState(countOfBuffers, totalCapacity);
|
|
||||||
if (stateRef.compareAndSet(prevState, state)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ConcurrentLinkQueue#offer says "this method will never return false"
|
|
||||||
buffers.offer(bb);
|
|
||||||
|
|
||||||
int runningAverageUpdate = Math.min(
|
|
||||||
totalCapacity / countOfBuffers, // size will never be 0.
|
|
||||||
maxByteBufferSizeToCache);
|
|
||||||
while (true) {
|
|
||||||
int prev = runningAverageRef.get();
|
|
||||||
if (prev >= runningAverageUpdate || // only rises, never recedes
|
|
||||||
runningAverageRef.compareAndSet(prev, runningAverageUpdate)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,167 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.io;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.io.BoundedByteBufferPool.subtractOneBufferFromState;
|
|
||||||
import static org.apache.hadoop.hbase.io.BoundedByteBufferPool.toCountOfBuffers;
|
|
||||||
import static org.apache.hadoop.hbase.io.BoundedByteBufferPool.toState;
|
|
||||||
import static org.apache.hadoop.hbase.io.BoundedByteBufferPool.toTotalCapacity;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
|
|
||||||
@Category({ IOTests.class, SmallTests.class })
|
|
||||||
public class TestBoundedByteBufferPool {
|
|
||||||
final int maxByteBufferSizeToCache = 10;
|
|
||||||
final int initialByteBufferSize = 1;
|
|
||||||
final int maxToCache = 10;
|
|
||||||
BoundedByteBufferPool reservoir;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void before() {
|
|
||||||
this.reservoir =
|
|
||||||
new BoundedByteBufferPool(maxByteBufferSizeToCache, initialByteBufferSize, maxToCache);
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void after() {
|
|
||||||
this.reservoir = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testEquivalence() {
|
|
||||||
ByteBuffer bb = ByteBuffer.allocate(1);
|
|
||||||
this.reservoir.putBuffer(bb);
|
|
||||||
this.reservoir.putBuffer(bb);
|
|
||||||
this.reservoir.putBuffer(bb);
|
|
||||||
assertEquals(3, this.reservoir.getQueueSize());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetPut() {
|
|
||||||
ByteBuffer bb = this.reservoir.getBuffer();
|
|
||||||
assertEquals(initialByteBufferSize, bb.capacity());
|
|
||||||
assertEquals(0, this.reservoir.getQueueSize());
|
|
||||||
this.reservoir.putBuffer(bb);
|
|
||||||
assertEquals(1, this.reservoir.getQueueSize());
|
|
||||||
// Now remove a buffer and don't put it back so reservoir is empty.
|
|
||||||
this.reservoir.getBuffer();
|
|
||||||
assertEquals(0, this.reservoir.getQueueSize());
|
|
||||||
// Try adding in a buffer with a bigger-than-initial size and see if our runningAverage works.
|
|
||||||
// Need to add then remove, then get a new bytebuffer so reservoir internally is doing
|
|
||||||
// allocation
|
|
||||||
final int newCapacity = 2;
|
|
||||||
this.reservoir.putBuffer(ByteBuffer.allocate(newCapacity));
|
|
||||||
assertEquals(1, reservoir.getQueueSize());
|
|
||||||
this.reservoir.getBuffer();
|
|
||||||
assertEquals(0, this.reservoir.getQueueSize());
|
|
||||||
bb = this.reservoir.getBuffer();
|
|
||||||
assertEquals(newCapacity, bb.capacity());
|
|
||||||
// Assert that adding a too-big buffer won't happen
|
|
||||||
assertEquals(0, this.reservoir.getQueueSize());
|
|
||||||
this.reservoir.putBuffer(ByteBuffer.allocate(maxByteBufferSizeToCache * 2));
|
|
||||||
assertEquals(0, this.reservoir.getQueueSize());
|
|
||||||
// Assert we can't add more than max allowed instances.
|
|
||||||
for (int i = 0; i < maxToCache; i++) {
|
|
||||||
this.reservoir.putBuffer(ByteBuffer.allocate(initialByteBufferSize));
|
|
||||||
}
|
|
||||||
assertEquals(maxToCache, this.reservoir.getQueueSize());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBufferSizeGrowWithMultiThread() throws Exception {
|
|
||||||
final ConcurrentLinkedDeque<ByteBuffer> bufferQueue = new ConcurrentLinkedDeque<>();
|
|
||||||
int takeBufferThreadsCount = 30;
|
|
||||||
int putBufferThreadsCount = 1;
|
|
||||||
Thread takeBufferThreads[] = new Thread[takeBufferThreadsCount];
|
|
||||||
for (int i = 0; i < takeBufferThreadsCount; i++) {
|
|
||||||
takeBufferThreads[i] = new Thread(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
while (true) {
|
|
||||||
ByteBuffer buffer = reservoir.getBuffer();
|
|
||||||
try {
|
|
||||||
Thread.sleep(5);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
bufferQueue.offer(buffer);
|
|
||||||
if (Thread.currentThread().isInterrupted()) break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
Thread putBufferThread[] = new Thread[putBufferThreadsCount];
|
|
||||||
for (int i = 0; i < putBufferThreadsCount; i++) {
|
|
||||||
putBufferThread[i] = new Thread(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
while (true) {
|
|
||||||
ByteBuffer buffer = bufferQueue.poll();
|
|
||||||
if (buffer != null) {
|
|
||||||
reservoir.putBuffer(buffer);
|
|
||||||
}
|
|
||||||
if (Thread.currentThread().isInterrupted()) break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < takeBufferThreadsCount; i++) {
|
|
||||||
takeBufferThreads[i].start();
|
|
||||||
}
|
|
||||||
for (int i = 0; i < putBufferThreadsCount; i++) {
|
|
||||||
putBufferThread[i].start();
|
|
||||||
}
|
|
||||||
Thread.sleep(2 * 1000);// Let the threads run for 2 secs
|
|
||||||
for (int i = 0; i < takeBufferThreadsCount; i++) {
|
|
||||||
takeBufferThreads[i].interrupt();
|
|
||||||
takeBufferThreads[i].join();
|
|
||||||
}
|
|
||||||
for (int i = 0; i < putBufferThreadsCount; i++) {
|
|
||||||
putBufferThread[i].interrupt();
|
|
||||||
putBufferThread[i].join();
|
|
||||||
}
|
|
||||||
// None of the BBs we got from pool is growing while in use. So we should not change the
|
|
||||||
// runningAverage in pool
|
|
||||||
assertEquals(initialByteBufferSize, this.reservoir.getRunningAverage());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testStateConversionMethods() {
|
|
||||||
int countOfBuffers = 123;
|
|
||||||
int totalCapacity = 456;
|
|
||||||
|
|
||||||
long state = toState(countOfBuffers, totalCapacity);
|
|
||||||
assertEquals(countOfBuffers, toCountOfBuffers(state));
|
|
||||||
assertEquals(totalCapacity, toTotalCapacity(state));
|
|
||||||
|
|
||||||
long state2 = subtractOneBufferFromState(state, 7);
|
|
||||||
assertEquals(countOfBuffers - 1, toCountOfBuffers(state2));
|
|
||||||
assertEquals(totalCapacity - 7, toTotalCapacity(state2));
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue