From 7bb0624bab68d7dd136d0cd54a8f0c74790aca31 Mon Sep 17 00:00:00 2001 From: CHIA-PING TSAI Date: Mon, 20 Mar 2017 09:11:53 +0800 Subject: [PATCH] HBASE-17805 We should remove BoundedByteBufferPool because it is replaced by ByteBufferPool --- .../hbase/io/BoundedByteBufferPool.java | 194 ------------------ .../hbase/io/TestBoundedByteBufferPool.java | 167 --------------- 2 files changed, 361 deletions(-) delete mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java delete mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java deleted file mode 100644 index 7bce0e5a25c..00000000000 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io; - -import java.nio.ByteBuffer; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -import com.google.common.annotations.VisibleForTesting; - -/** - * Like Hadoops' ByteBufferPool only you do not specify desired size when getting a ByteBuffer. - * This pool keeps an upper bound on the count of ByteBuffers in the pool and on the maximum size - * of ByteBuffer that it will retain (Hence the pool is 'bounded' as opposed to, say, - * Hadoop's ElasticByteBuffferPool). - * If a ByteBuffer is bigger than the configured threshold, we will just let the ByteBuffer go - * rather than add it to the pool. If more ByteBuffers than the configured maximum instances, - * we will not add the passed ByteBuffer to the pool; we will just drop it - * (we will log a WARN in this case that we are at capacity). - * - *

The intended use case is a reservoir of bytebuffers that an RPC can reuse; buffers tend to - * achieve a particular 'run' size over time give or take a few extremes. Set TRACE level on this - * class for a couple of seconds to get reporting on how it is running when deployed. - * - *

This pool returns off heap ByteBuffers. - * - *

This class is thread safe. - */ -@InterfaceAudience.Private -public class BoundedByteBufferPool { - private static final Log LOG = LogFactory.getLog(BoundedByteBufferPool.class); - - private final Queue buffers = new ConcurrentLinkedQueue<>(); - - @VisibleForTesting - int getQueueSize() { - return buffers.size(); - } - - private final int maxToCache; - - // Maximum size of a ByteBuffer to retain in pool - private final int maxByteBufferSizeToCache; - - // A running average only it only rises, it never recedes - private final AtomicInteger runningAverageRef; - - @VisibleForTesting - int getRunningAverage() { - return runningAverageRef.get(); - } - - // Count (lower 32bit) and total capacity (upper 32bit) of pooled bytebuffers. - // Both are non-negative. They are equal to or larger than those of the actual - // queued buffers in any transition. - private final AtomicLong stateRef = new AtomicLong(); - - @VisibleForTesting - static int toCountOfBuffers(long state) { - return (int)state; - } - - @VisibleForTesting - static int toTotalCapacity(long state) { - return (int)(state >>> 32); - } - - @VisibleForTesting - static long toState(int countOfBuffers, int totalCapacity) { - return ((long)totalCapacity << 32) | countOfBuffers; - } - - @VisibleForTesting - static long subtractOneBufferFromState(long state, int capacity) { - return state - ((long)capacity << 32) - 1; - } - - // For reporting, only used in the log - private final AtomicLong allocationsRef = new AtomicLong(); - - /** - * @param maxByteBufferSizeToCache - * @param initialByteBufferSize - * @param maxToCache - */ - public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize, - final int maxToCache) { - this.maxByteBufferSizeToCache = maxByteBufferSizeToCache; - this.runningAverageRef = new AtomicInteger(initialByteBufferSize); - this.maxToCache = maxToCache; - } - - public ByteBuffer getBuffer() { - ByteBuffer bb = buffers.poll(); - if (bb != null) { - long state; - while (true) { - long prevState = stateRef.get(); - state = subtractOneBufferFromState(prevState, bb.capacity()); - if (stateRef.compareAndSet(prevState, state)) { - break; - } - } - // Clear sets limit == capacity. Postion == 0. - bb.clear(); - - if (LOG.isTraceEnabled()) { - int countOfBuffers = toCountOfBuffers(state); - int totalCapacity = toTotalCapacity(state); - LOG.trace("totalCapacity=" + totalCapacity + ", count=" + countOfBuffers); - } - return bb; - } - - int runningAverage = runningAverageRef.get(); - bb = ByteBuffer.allocateDirect(runningAverage); - - if (LOG.isTraceEnabled()) { - long allocations = allocationsRef.incrementAndGet(); - LOG.trace("runningAverage=" + runningAverage + ", allocations=" + allocations); - } - return bb; - } - - public void putBuffer(ByteBuffer bb) { - // If buffer is larger than we want to keep around, just let it go. - if (bb.capacity() > maxByteBufferSizeToCache) { - return; - } - - int countOfBuffers; - int totalCapacity; - while (true) { - long prevState = stateRef.get(); - countOfBuffers = toCountOfBuffers(prevState); - if (countOfBuffers >= maxToCache) { - if (LOG.isDebugEnabled()) { - LOG.debug("At capacity: " + countOfBuffers); - } - return; - } - countOfBuffers++; - assert 0 < countOfBuffers && countOfBuffers <= maxToCache; - - totalCapacity = toTotalCapacity(prevState) + bb.capacity(); - if (totalCapacity < 0) { - if (LOG.isWarnEnabled()) { - LOG.warn("Overflowed total capacity."); - } - return; - } - - long state = toState(countOfBuffers, totalCapacity); - if (stateRef.compareAndSet(prevState, state)) { - break; - } - } - - // ConcurrentLinkQueue#offer says "this method will never return false" - buffers.offer(bb); - - int runningAverageUpdate = Math.min( - totalCapacity / countOfBuffers, // size will never be 0. - maxByteBufferSizeToCache); - while (true) { - int prev = runningAverageRef.get(); - if (prev >= runningAverageUpdate || // only rises, never recedes - runningAverageRef.compareAndSet(prev, runningAverageUpdate)) { - break; - } - } - } -} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java deleted file mode 100644 index eca7712dca7..00000000000 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io; - -import static org.apache.hadoop.hbase.io.BoundedByteBufferPool.subtractOneBufferFromState; -import static org.apache.hadoop.hbase.io.BoundedByteBufferPool.toCountOfBuffers; -import static org.apache.hadoop.hbase.io.BoundedByteBufferPool.toState; -import static org.apache.hadoop.hbase.io.BoundedByteBufferPool.toTotalCapacity; -import static org.junit.Assert.assertEquals; - -import java.nio.ByteBuffer; -import java.util.concurrent.ConcurrentLinkedDeque; - -import org.apache.hadoop.hbase.testclassification.IOTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ IOTests.class, SmallTests.class }) -public class TestBoundedByteBufferPool { - final int maxByteBufferSizeToCache = 10; - final int initialByteBufferSize = 1; - final int maxToCache = 10; - BoundedByteBufferPool reservoir; - - @Before - public void before() { - this.reservoir = - new BoundedByteBufferPool(maxByteBufferSizeToCache, initialByteBufferSize, maxToCache); - } - - @After - public void after() { - this.reservoir = null; - } - - @Test - public void testEquivalence() { - ByteBuffer bb = ByteBuffer.allocate(1); - this.reservoir.putBuffer(bb); - this.reservoir.putBuffer(bb); - this.reservoir.putBuffer(bb); - assertEquals(3, this.reservoir.getQueueSize()); - } - - @Test - public void testGetPut() { - ByteBuffer bb = this.reservoir.getBuffer(); - assertEquals(initialByteBufferSize, bb.capacity()); - assertEquals(0, this.reservoir.getQueueSize()); - this.reservoir.putBuffer(bb); - assertEquals(1, this.reservoir.getQueueSize()); - // Now remove a buffer and don't put it back so reservoir is empty. - this.reservoir.getBuffer(); - assertEquals(0, this.reservoir.getQueueSize()); - // Try adding in a buffer with a bigger-than-initial size and see if our runningAverage works. - // Need to add then remove, then get a new bytebuffer so reservoir internally is doing - // allocation - final int newCapacity = 2; - this.reservoir.putBuffer(ByteBuffer.allocate(newCapacity)); - assertEquals(1, reservoir.getQueueSize()); - this.reservoir.getBuffer(); - assertEquals(0, this.reservoir.getQueueSize()); - bb = this.reservoir.getBuffer(); - assertEquals(newCapacity, bb.capacity()); - // Assert that adding a too-big buffer won't happen - assertEquals(0, this.reservoir.getQueueSize()); - this.reservoir.putBuffer(ByteBuffer.allocate(maxByteBufferSizeToCache * 2)); - assertEquals(0, this.reservoir.getQueueSize()); - // Assert we can't add more than max allowed instances. - for (int i = 0; i < maxToCache; i++) { - this.reservoir.putBuffer(ByteBuffer.allocate(initialByteBufferSize)); - } - assertEquals(maxToCache, this.reservoir.getQueueSize()); - } - - @Test - public void testBufferSizeGrowWithMultiThread() throws Exception { - final ConcurrentLinkedDeque bufferQueue = new ConcurrentLinkedDeque<>(); - int takeBufferThreadsCount = 30; - int putBufferThreadsCount = 1; - Thread takeBufferThreads[] = new Thread[takeBufferThreadsCount]; - for (int i = 0; i < takeBufferThreadsCount; i++) { - takeBufferThreads[i] = new Thread(new Runnable() { - @Override - public void run() { - while (true) { - ByteBuffer buffer = reservoir.getBuffer(); - try { - Thread.sleep(5); - } catch (InterruptedException e) { - break; - } - bufferQueue.offer(buffer); - if (Thread.currentThread().isInterrupted()) break; - } - } - }); - } - - Thread putBufferThread[] = new Thread[putBufferThreadsCount]; - for (int i = 0; i < putBufferThreadsCount; i++) { - putBufferThread[i] = new Thread(new Runnable() { - @Override - public void run() { - while (true) { - ByteBuffer buffer = bufferQueue.poll(); - if (buffer != null) { - reservoir.putBuffer(buffer); - } - if (Thread.currentThread().isInterrupted()) break; - } - } - }); - } - - for (int i = 0; i < takeBufferThreadsCount; i++) { - takeBufferThreads[i].start(); - } - for (int i = 0; i < putBufferThreadsCount; i++) { - putBufferThread[i].start(); - } - Thread.sleep(2 * 1000);// Let the threads run for 2 secs - for (int i = 0; i < takeBufferThreadsCount; i++) { - takeBufferThreads[i].interrupt(); - takeBufferThreads[i].join(); - } - for (int i = 0; i < putBufferThreadsCount; i++) { - putBufferThread[i].interrupt(); - putBufferThread[i].join(); - } - // None of the BBs we got from pool is growing while in use. So we should not change the - // runningAverage in pool - assertEquals(initialByteBufferSize, this.reservoir.getRunningAverage()); - } - - @Test - public void testStateConversionMethods() { - int countOfBuffers = 123; - int totalCapacity = 456; - - long state = toState(countOfBuffers, totalCapacity); - assertEquals(countOfBuffers, toCountOfBuffers(state)); - assertEquals(totalCapacity, toTotalCapacity(state)); - - long state2 = subtractOneBufferFromState(state, 7); - assertEquals(countOfBuffers - 1, toCountOfBuffers(state2)); - assertEquals(totalCapacity - 7, toTotalCapacity(state2)); - } -}