From 21332418499469e4affd48731f31c55c3d2cf2a7 Mon Sep 17 00:00:00 2001 From: anoopsjohn Date: Thu, 28 May 2015 11:53:45 +0530 Subject: [PATCH] HBASE-13778 BoundedByteBufferPool incorrectly increasing runningAverage buffer length. --- .../hbase/io/BoundedByteBufferPool.java | 44 +++++++--- .../hadoop/hbase/util/BoundedArrayQueue.java | 81 +++++++++++++++++++ .../hbase/io/TestBoundedByteBufferPool.java | 63 ++++++++++++++- .../hbase/util/TestBoundedArrayQueue.java | 59 ++++++++++++++ 4 files changed, 233 insertions(+), 14 deletions(-) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedArrayQueue.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBoundedArrayQueue.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java index 132abf54f4f..c685a925001 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java @@ -19,12 +19,13 @@ package org.apache.hadoop.hbase.io; import java.nio.ByteBuffer; import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.BoundedArrayQueue; import com.google.common.annotations.VisibleForTesting; @@ -55,7 +56,8 @@ public class BoundedByteBufferPool { private final int maxByteBufferSizeToCache; // A running average only it only rises, it never recedes - private volatile int runningAverage; + @VisibleForTesting + volatile int runningAverage; // Scratch that keeps rough total size of pooled bytebuffers private volatile int totalReservoirCapacity; @@ -63,6 +65,8 @@ public class BoundedByteBufferPool { // For reporting private AtomicLong allocations = new AtomicLong(0); + private ReentrantLock lock = new ReentrantLock(); + /** * @param maxByteBufferSizeToCache * @param initialByteBufferSize @@ -72,15 +76,23 @@ public class BoundedByteBufferPool { final int maxToCache) { this.maxByteBufferSizeToCache = maxByteBufferSizeToCache; this.runningAverage = initialByteBufferSize; - this.buffers = new ArrayBlockingQueue(maxToCache, true); + this.buffers = new BoundedArrayQueue(maxToCache); } public ByteBuffer getBuffer() { - ByteBuffer bb = this.buffers.poll(); + ByteBuffer bb = null; + lock.lock(); + try { + bb = this.buffers.poll(); + if (bb != null) { + this.totalReservoirCapacity -= bb.capacity(); + } + } finally { + lock.unlock(); + } if (bb != null) { - // Clear sets limit == capacity. Postion == 0. + // Clear sets limit == capacity. Postion == 0. bb.clear(); - this.totalReservoirCapacity -= bb.capacity(); } else { bb = ByteBuffer.allocate(this.runningAverage); this.allocations.incrementAndGet(); @@ -96,15 +108,21 @@ public class BoundedByteBufferPool { public void putBuffer(ByteBuffer bb) { // If buffer is larger than we want to keep around, just let it go. if (bb.capacity() > this.maxByteBufferSizeToCache) return; - if (!this.buffers.offer(bb)) { + boolean success = false; + int average = 0; + lock.lock(); + try { + success = this.buffers.offer(bb); + if (success) { + this.totalReservoirCapacity += bb.capacity(); + average = this.totalReservoirCapacity / this.buffers.size(); // size will never be 0. + } + } finally { + lock.unlock(); + } + if (!success) { LOG.warn("At capacity: " + this.buffers.size()); } else { - int size = this.buffers.size(); // This size may be inexact. - this.totalReservoirCapacity += bb.capacity(); - int average = 0; - if (size != 0) { - average = this.totalReservoirCapacity / size; - } if (average > this.runningAverage && average < this.maxByteBufferSizeToCache) { this.runningAverage = average; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedArrayQueue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedArrayQueue.java new file mode 100644 index 00000000000..9db4c5c5022 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedArrayQueue.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.AbstractQueue; +import java.util.Iterator; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A bounded non-thread safe implementation of {@link java.util.Queue}. + */ +@InterfaceAudience.Private +public class BoundedArrayQueue extends AbstractQueue { + + private Object[] items; + private int takeIndex, putIndex; + private int count; + + public BoundedArrayQueue(int maxElements) { + items = new Object[maxElements]; + } + + @Override + public int size() { + return count; + } + + /** + * Not implemented and will throw {@link UnsupportedOperationException} + */ + @Override + public Iterator iterator() { + // We don't need this. Leaving it as not implemented. + throw new UnsupportedOperationException(); + } + + @Override + public boolean offer(E e) { + if (count == items.length) return false; + items[putIndex] = e; + if (++putIndex == items.length) putIndex = 0; + count++; + return true; + } + + @Override + public E poll() { + return (count == 0) ? null : dequeue(); + } + + @SuppressWarnings("unchecked") + private E dequeue() { + E x = (E) items[takeIndex]; + items[takeIndex] = null; + if (++takeIndex == items.length) takeIndex = 0; + count--; + return x; + } + + @SuppressWarnings("unchecked") + @Override + public E peek() { + return (E) items[takeIndex]; + } +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java index 79b9e6837d0..5074b4cdf67 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io; import static org.junit.Assert.assertEquals; import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentLinkedDeque; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.After; @@ -84,4 +85,64 @@ public class TestBoundedByteBufferPool { } assertEquals(maxToCache, this.reservoir.buffers.size()); } -} + + @Test + public void testBufferSizeGrowWithMultiThread() throws Exception { + final ConcurrentLinkedDeque bufferQueue = new ConcurrentLinkedDeque(); + int takeBufferThreadsCount = 30; + int putBufferThreadsCount = 1; + Thread takeBufferThreads[] = new Thread[takeBufferThreadsCount]; + for (int i = 0; i < takeBufferThreadsCount; i++) { + takeBufferThreads[i] = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + ByteBuffer buffer = reservoir.getBuffer(); + try { + Thread.sleep(5); + } catch (InterruptedException e) { + break; + } + bufferQueue.offer(buffer); + if (Thread.currentThread().isInterrupted()) break; + } + } + }); + } + + Thread putBufferThread[] = new Thread[putBufferThreadsCount]; + for (int i = 0; i < putBufferThreadsCount; i++) { + putBufferThread[i] = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + ByteBuffer buffer = bufferQueue.poll(); + if (buffer != null) { + reservoir.putBuffer(buffer); + } + if (Thread.currentThread().isInterrupted()) break; + } + } + }); + } + + for (int i = 0; i < takeBufferThreadsCount; i++) { + takeBufferThreads[i].start(); + } + for (int i = 0; i < putBufferThreadsCount; i++) { + putBufferThread[i].start(); + } + Thread.sleep(2 * 1000);// Let the threads run for 2 secs + for (int i = 0; i < takeBufferThreadsCount; i++) { + takeBufferThreads[i].interrupt(); + takeBufferThreads[i].join(); + } + for (int i = 0; i < putBufferThreadsCount; i++) { + putBufferThread[i].interrupt(); + putBufferThread[i].join(); + } + // None of the BBs we got from pool is growing while in use. So we should not change the + // runningAverage in pool + assertEquals(initialByteBufferSize, this.reservoir.runningAverage); + } +} \ No newline at end of file diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBoundedArrayQueue.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBoundedArrayQueue.java new file mode 100644 index 00000000000..957000904af --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBoundedArrayQueue.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestBoundedArrayQueue { + + private int qMaxElements = 5; + private BoundedArrayQueue queue = new BoundedArrayQueue(qMaxElements); + + @Test + public void testBoundedArrayQueueOperations() throws Exception { + assertEquals(0, queue.size()); + assertNull(queue.poll()); + assertNull(queue.peek()); + for(int i=0;i