HBASE-13778 BoundedByteBufferPool incorrectly increasing runningAverage buffer length.

This commit is contained in:
anoopsjohn 2015-05-28 11:53:45 +05:30
parent 499c40697b
commit 2133241849
4 changed files with 233 additions and 14 deletions

View File

@ -19,12 +19,13 @@ package org.apache.hadoop.hbase.io;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.BoundedArrayQueue;
import com.google.common.annotations.VisibleForTesting;
@ -55,7 +56,8 @@ public class BoundedByteBufferPool {
private final int maxByteBufferSizeToCache;
// A running average only it only rises, it never recedes
private volatile int runningAverage;
@VisibleForTesting
volatile int runningAverage;
// Scratch that keeps rough total size of pooled bytebuffers
private volatile int totalReservoirCapacity;
@ -63,6 +65,8 @@ public class BoundedByteBufferPool {
// For reporting
private AtomicLong allocations = new AtomicLong(0);
private ReentrantLock lock = new ReentrantLock();
/**
* @param maxByteBufferSizeToCache
* @param initialByteBufferSize
@ -72,15 +76,23 @@ public class BoundedByteBufferPool {
final int maxToCache) {
this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
this.runningAverage = initialByteBufferSize;
this.buffers = new ArrayBlockingQueue<ByteBuffer>(maxToCache, true);
this.buffers = new BoundedArrayQueue<ByteBuffer>(maxToCache);
}
public ByteBuffer getBuffer() {
ByteBuffer bb = this.buffers.poll();
ByteBuffer bb = null;
lock.lock();
try {
bb = this.buffers.poll();
if (bb != null) {
this.totalReservoirCapacity -= bb.capacity();
}
} finally {
lock.unlock();
}
if (bb != null) {
// Clear sets limit == capacity. Postion == 0.
// Clear sets limit == capacity. Postion == 0.
bb.clear();
this.totalReservoirCapacity -= bb.capacity();
} else {
bb = ByteBuffer.allocate(this.runningAverage);
this.allocations.incrementAndGet();
@ -96,15 +108,21 @@ public class BoundedByteBufferPool {
public void putBuffer(ByteBuffer bb) {
// If buffer is larger than we want to keep around, just let it go.
if (bb.capacity() > this.maxByteBufferSizeToCache) return;
if (!this.buffers.offer(bb)) {
boolean success = false;
int average = 0;
lock.lock();
try {
success = this.buffers.offer(bb);
if (success) {
this.totalReservoirCapacity += bb.capacity();
average = this.totalReservoirCapacity / this.buffers.size(); // size will never be 0.
}
} finally {
lock.unlock();
}
if (!success) {
LOG.warn("At capacity: " + this.buffers.size());
} else {
int size = this.buffers.size(); // This size may be inexact.
this.totalReservoirCapacity += bb.capacity();
int average = 0;
if (size != 0) {
average = this.totalReservoirCapacity / size;
}
if (average > this.runningAverage && average < this.maxByteBufferSizeToCache) {
this.runningAverage = average;
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.util.AbstractQueue;
import java.util.Iterator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* A bounded non-thread safe implementation of {@link java.util.Queue}.
*/
@InterfaceAudience.Private
public class BoundedArrayQueue<E> extends AbstractQueue<E> {
private Object[] items;
private int takeIndex, putIndex;
private int count;
public BoundedArrayQueue(int maxElements) {
items = new Object[maxElements];
}
@Override
public int size() {
return count;
}
/**
* Not implemented and will throw {@link UnsupportedOperationException}
*/
@Override
public Iterator<E> iterator() {
// We don't need this. Leaving it as not implemented.
throw new UnsupportedOperationException();
}
@Override
public boolean offer(E e) {
if (count == items.length) return false;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
return true;
}
@Override
public E poll() {
return (count == 0) ? null : dequeue();
}
@SuppressWarnings("unchecked")
private E dequeue() {
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
return x;
}
@SuppressWarnings("unchecked")
@Override
public E peek() {
return (E) items[takeIndex];
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io;
import static org.junit.Assert.assertEquals;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
@ -84,4 +85,64 @@ public class TestBoundedByteBufferPool {
}
assertEquals(maxToCache, this.reservoir.buffers.size());
}
}
@Test
public void testBufferSizeGrowWithMultiThread() throws Exception {
final ConcurrentLinkedDeque<ByteBuffer> bufferQueue = new ConcurrentLinkedDeque<ByteBuffer>();
int takeBufferThreadsCount = 30;
int putBufferThreadsCount = 1;
Thread takeBufferThreads[] = new Thread[takeBufferThreadsCount];
for (int i = 0; i < takeBufferThreadsCount; i++) {
takeBufferThreads[i] = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
ByteBuffer buffer = reservoir.getBuffer();
try {
Thread.sleep(5);
} catch (InterruptedException e) {
break;
}
bufferQueue.offer(buffer);
if (Thread.currentThread().isInterrupted()) break;
}
}
});
}
Thread putBufferThread[] = new Thread[putBufferThreadsCount];
for (int i = 0; i < putBufferThreadsCount; i++) {
putBufferThread[i] = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
ByteBuffer buffer = bufferQueue.poll();
if (buffer != null) {
reservoir.putBuffer(buffer);
}
if (Thread.currentThread().isInterrupted()) break;
}
}
});
}
for (int i = 0; i < takeBufferThreadsCount; i++) {
takeBufferThreads[i].start();
}
for (int i = 0; i < putBufferThreadsCount; i++) {
putBufferThread[i].start();
}
Thread.sleep(2 * 1000);// Let the threads run for 2 secs
for (int i = 0; i < takeBufferThreadsCount; i++) {
takeBufferThreads[i].interrupt();
takeBufferThreads[i].join();
}
for (int i = 0; i < putBufferThreadsCount; i++) {
putBufferThread[i].interrupt();
putBufferThread[i].join();
}
// None of the BBs we got from pool is growing while in use. So we should not change the
// runningAverage in pool
assertEquals(initialByteBufferSize, this.reservoir.runningAverage);
}
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestBoundedArrayQueue {
private int qMaxElements = 5;
private BoundedArrayQueue<Integer> queue = new BoundedArrayQueue<Integer>(qMaxElements);
@Test
public void testBoundedArrayQueueOperations() throws Exception {
assertEquals(0, queue.size());
assertNull(queue.poll());
assertNull(queue.peek());
for(int i=0;i<qMaxElements;i++){
assertTrue(queue.offer(i));
}
assertEquals(qMaxElements, queue.size());
assertFalse(queue.offer(0));
assertEquals(0, queue.peek().intValue());
assertEquals(0, queue.peek().intValue());
for (int i = 0; i < qMaxElements; i++) {
assertEquals(i, queue.poll().intValue());
}
assertEquals(0, queue.size());
assertNull(queue.poll());
// Write after one cycle is over
assertTrue(queue.offer(100));
assertTrue(queue.offer(1000));
assertEquals(100, queue.peek().intValue());
assertEquals(100, queue.poll().intValue());
assertEquals(1000, queue.poll().intValue());
}
}