HBASE-16195 Should not add chunk into chunkQueue if not using chunk pool in HeapMemStoreLAB

This commit is contained in:
Yu Li 2016-07-13 09:33:24 +08:00
parent 911706a873
commit 3b3c3dc02d
3 changed files with 117 additions and 4 deletions

View File

@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ByteRange;
@ -62,9 +64,11 @@ public class HeapMemStoreLAB implements MemStoreLAB {
static final int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through
// allocator
static final Log LOG = LogFactory.getLog(HeapMemStoreLAB.class);
private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
// A queue of chunks contained by this memstore
private BlockingQueue<Chunk> chunkQueue = new LinkedBlockingQueue<Chunk>();
// A queue of chunks contained by this memstore, used with chunk pool
private BlockingQueue<Chunk> chunkQueue = null;
final int chunkSize;
final int maxAlloc;
private final MemStoreChunkPool chunkPool;
@ -87,6 +91,12 @@ public class HeapMemStoreLAB implements MemStoreLAB {
chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
this.chunkPool = MemStoreChunkPool.getPool(conf);
// currently chunkQueue is only used for chunkPool
if (this.chunkPool != null) {
// set queue length to chunk pool max count to avoid keeping reference of
// too many non-reclaimable chunks
chunkQueue = new LinkedBlockingQueue<Chunk>(chunkPool.getMaxCount());
}
// if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
Preconditions.checkArgument(
@ -166,6 +176,8 @@ public class HeapMemStoreLAB implements MemStoreLAB {
* Try to retire the current chunk if it is still
* <code>c</code>. Postcondition is that curChunk.get()
* != c
* @param c the chunk to retire
* @return true if we won the race to retire the chunk
*/
private void tryRetireChunk(Chunk c) {
curChunk.compareAndSet(c, null);
@ -197,7 +209,12 @@ public class HeapMemStoreLAB implements MemStoreLAB {
// we won race - now we need to actually do the expensive
// allocation step
c.init();
this.chunkQueue.add(c);
if (chunkQueue != null && !this.closed && !this.chunkQueue.offer(c)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
+ chunkQueue.size());
}
}
return c;
} else if (chunkPool != null) {
chunkPool.putbackChunk(c);
@ -212,6 +229,11 @@ public class HeapMemStoreLAB implements MemStoreLAB {
return this.curChunk.get();
}
@VisibleForTesting
BlockingQueue<Chunk> getChunkQueue() {
return this.chunkQueue;
}
/**
* A chunk of memory out of which allocations are sliced.
*/

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.HeapMemStoreLAB.Chunk;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@ -123,6 +124,13 @@ public class MemStoreChunkPool {
return;
}
chunks.drainTo(reclaimedChunks, maxNumToPutback);
// clear reference of any non-reclaimable chunks
if (chunks.size() > 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("Left " + chunks.size() + " unreclaimable chunks, removing them from queue");
}
chunks.clear();
}
}
/**
@ -217,4 +225,13 @@ public class MemStoreChunkPool {
}
}
int getMaxCount() {
return this.maxCount;
}
@VisibleForTesting
static void clearDisableFlag() {
chunkPoolDisabled = false;
}
}

View File

@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -37,6 +39,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import org.junit.experimental.categories.Category;
@Category({RegionServerTests.class, SmallTests.class})
@ -150,6 +153,77 @@ public class TestMemStoreLAB {
}
/**
* Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure
* there's no memory leak (HBASE-16195)
* @throws Exception if any error occurred
*/
@Test
public void testLABChunkQueue() throws Exception {
HeapMemStoreLAB mslab = new HeapMemStoreLAB();
// by default setting, there should be no chunk queue initialized
assertNull(mslab.getChunkQueue());
// reset mslab with chunk pool
Configuration conf = HBaseConfiguration.create();
conf.setDouble(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.1);
// set chunk size to default max alloc size, so we could easily trigger chunk retirement
conf.setLong(HeapMemStoreLAB.CHUNK_SIZE_KEY, HeapMemStoreLAB.MAX_ALLOC_DEFAULT);
// reconstruct mslab
MemStoreChunkPool.clearDisableFlag();
mslab = new HeapMemStoreLAB(conf);
// launch multiple threads to trigger frequent chunk retirement
List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i < 10; i++) {
threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i));
}
for (Thread thread : threads) {
thread.start();
}
// let it run for some time
Thread.sleep(1000);
for (Thread thread : threads) {
thread.interrupt();
}
boolean threadsRunning = true;
while (threadsRunning) {
for (Thread thread : threads) {
if (thread.isAlive()) {
threadsRunning = true;
break;
}
}
threadsRunning = false;
}
// close the mslab
mslab.close();
// make sure all chunks reclaimed or removed from chunk queue
int queueLength = mslab.getChunkQueue().size();
assertTrue("All chunks in chunk queue should be reclaimed or removed"
+ " after mslab closed but actually: " + queueLength, queueLength == 0);
}
private Thread getChunkQueueTestThread(final HeapMemStoreLAB mslab, String threadName) {
Thread thread = new Thread() {
boolean stopped = false;
@Override
public void run() {
while (!stopped) {
// keep triggering chunk retirement
mslab.allocateBytes(HeapMemStoreLAB.MAX_ALLOC_DEFAULT - 1);
}
}
@Override
public void interrupt() {
this.stopped = true;
}
};
thread.setName(threadName);
thread.setDaemon(true);
return thread;
}
private static class AllocRecord implements Comparable<AllocRecord>{
private final ByteRange alloc;
private final int size;