HBASE-16195 Should not add chunk into chunkQueue if not using chunk pool in HeapMemStoreLAB
This commit is contained in:
parent
7c4c51f2c7
commit
95d141f382
|
@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.util.ByteRange;
|
import org.apache.hadoop.hbase.util.ByteRange;
|
||||||
|
@ -62,9 +64,11 @@ public class HeapMemStoreLAB implements MemStoreLAB {
|
||||||
static final int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through
|
static final int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through
|
||||||
// allocator
|
// allocator
|
||||||
|
|
||||||
|
static final Log LOG = LogFactory.getLog(HeapMemStoreLAB.class);
|
||||||
|
|
||||||
private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
|
private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
|
||||||
// A queue of chunks contained by this memstore
|
// A queue of chunks contained by this memstore, used with chunk pool
|
||||||
private BlockingQueue<Chunk> chunkQueue = new LinkedBlockingQueue<Chunk>();
|
private BlockingQueue<Chunk> chunkQueue = null;
|
||||||
final int chunkSize;
|
final int chunkSize;
|
||||||
final int maxAlloc;
|
final int maxAlloc;
|
||||||
private final MemStoreChunkPool chunkPool;
|
private final MemStoreChunkPool chunkPool;
|
||||||
|
@ -87,6 +91,12 @@ public class HeapMemStoreLAB implements MemStoreLAB {
|
||||||
chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
|
chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
|
||||||
maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
|
maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
|
||||||
this.chunkPool = MemStoreChunkPool.getPool(conf);
|
this.chunkPool = MemStoreChunkPool.getPool(conf);
|
||||||
|
// currently chunkQueue is only used for chunkPool
|
||||||
|
if (this.chunkPool != null) {
|
||||||
|
// set queue length to chunk pool max count to avoid keeping reference of
|
||||||
|
// too many non-reclaimable chunks
|
||||||
|
chunkQueue = new LinkedBlockingQueue<Chunk>(chunkPool.getMaxCount());
|
||||||
|
}
|
||||||
|
|
||||||
// if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
|
// if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
|
||||||
Preconditions.checkArgument(
|
Preconditions.checkArgument(
|
||||||
|
@ -166,6 +176,8 @@ public class HeapMemStoreLAB implements MemStoreLAB {
|
||||||
* Try to retire the current chunk if it is still
|
* Try to retire the current chunk if it is still
|
||||||
* <code>c</code>. Postcondition is that curChunk.get()
|
* <code>c</code>. Postcondition is that curChunk.get()
|
||||||
* != c
|
* != c
|
||||||
|
* @param c the chunk to retire
|
||||||
|
* @return true if we won the race to retire the chunk
|
||||||
*/
|
*/
|
||||||
private void tryRetireChunk(Chunk c) {
|
private void tryRetireChunk(Chunk c) {
|
||||||
curChunk.compareAndSet(c, null);
|
curChunk.compareAndSet(c, null);
|
||||||
|
@ -197,7 +209,12 @@ public class HeapMemStoreLAB implements MemStoreLAB {
|
||||||
// we won race - now we need to actually do the expensive
|
// we won race - now we need to actually do the expensive
|
||||||
// allocation step
|
// allocation step
|
||||||
c.init();
|
c.init();
|
||||||
this.chunkQueue.add(c);
|
if (chunkQueue != null && !this.closed && !this.chunkQueue.offer(c)) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
|
||||||
|
+ chunkQueue.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
return c;
|
return c;
|
||||||
} else if (chunkPool != null) {
|
} else if (chunkPool != null) {
|
||||||
chunkPool.putbackChunk(c);
|
chunkPool.putbackChunk(c);
|
||||||
|
@ -212,6 +229,11 @@ public class HeapMemStoreLAB implements MemStoreLAB {
|
||||||
return this.curChunk.get();
|
return this.curChunk.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
BlockingQueue<Chunk> getChunkQueue() {
|
||||||
|
return this.chunkQueue;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A chunk of memory out of which allocations are sliced.
|
* A chunk of memory out of which allocations are sliced.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
|
||||||
import org.apache.hadoop.hbase.regionserver.HeapMemStoreLAB.Chunk;
|
import org.apache.hadoop.hbase.regionserver.HeapMemStoreLAB.Chunk;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -123,6 +124,13 @@ public class MemStoreChunkPool {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
chunks.drainTo(reclaimedChunks, maxNumToPutback);
|
chunks.drainTo(reclaimedChunks, maxNumToPutback);
|
||||||
|
// clear reference of any non-reclaimable chunks
|
||||||
|
if (chunks.size() > 0) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Left " + chunks.size() + " unreclaimable chunks, removing them from queue");
|
||||||
|
}
|
||||||
|
chunks.clear();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -217,4 +225,13 @@ public class MemStoreChunkPool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int getMaxCount() {
|
||||||
|
return this.maxCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static void clearDisableFlag() {
|
||||||
|
chunkPoolDisabled = false;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.MultithreadedTestUtil;
|
import org.apache.hadoop.hbase.MultithreadedTestUtil;
|
||||||
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
|
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
@ -36,6 +38,7 @@ import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.primitives.Ints;
|
import com.google.common.primitives.Ints;
|
||||||
|
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
@Category(SmallTests.class)
|
@Category(SmallTests.class)
|
||||||
|
@ -149,6 +152,77 @@ public class TestMemStoreLAB {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure
|
||||||
|
* there's no memory leak (HBASE-16195)
|
||||||
|
* @throws Exception if any error occurred
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testLABChunkQueue() throws Exception {
|
||||||
|
HeapMemStoreLAB mslab = new HeapMemStoreLAB();
|
||||||
|
// by default setting, there should be no chunk queue initialized
|
||||||
|
assertNull(mslab.getChunkQueue());
|
||||||
|
// reset mslab with chunk pool
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
conf.setDouble(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.1);
|
||||||
|
// set chunk size to default max alloc size, so we could easily trigger chunk retirement
|
||||||
|
conf.setLong(HeapMemStoreLAB.CHUNK_SIZE_KEY, HeapMemStoreLAB.MAX_ALLOC_DEFAULT);
|
||||||
|
// reconstruct mslab
|
||||||
|
MemStoreChunkPool.clearDisableFlag();
|
||||||
|
mslab = new HeapMemStoreLAB(conf);
|
||||||
|
// launch multiple threads to trigger frequent chunk retirement
|
||||||
|
List<Thread> threads = new ArrayList<Thread>();
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i));
|
||||||
|
}
|
||||||
|
for (Thread thread : threads) {
|
||||||
|
thread.start();
|
||||||
|
}
|
||||||
|
// let it run for some time
|
||||||
|
Thread.sleep(1000);
|
||||||
|
for (Thread thread : threads) {
|
||||||
|
thread.interrupt();
|
||||||
|
}
|
||||||
|
boolean threadsRunning = true;
|
||||||
|
while (threadsRunning) {
|
||||||
|
for (Thread thread : threads) {
|
||||||
|
if (thread.isAlive()) {
|
||||||
|
threadsRunning = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
threadsRunning = false;
|
||||||
|
}
|
||||||
|
// close the mslab
|
||||||
|
mslab.close();
|
||||||
|
// make sure all chunks reclaimed or removed from chunk queue
|
||||||
|
int queueLength = mslab.getChunkQueue().size();
|
||||||
|
assertTrue("All chunks in chunk queue should be reclaimed or removed"
|
||||||
|
+ " after mslab closed but actually: " + queueLength, queueLength == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Thread getChunkQueueTestThread(final HeapMemStoreLAB mslab, String threadName) {
|
||||||
|
Thread thread = new Thread() {
|
||||||
|
boolean stopped = false;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (!stopped) {
|
||||||
|
// keep triggering chunk retirement
|
||||||
|
mslab.allocateBytes(HeapMemStoreLAB.MAX_ALLOC_DEFAULT - 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void interrupt() {
|
||||||
|
this.stopped = true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
thread.setName(threadName);
|
||||||
|
thread.setDaemon(true);
|
||||||
|
return thread;
|
||||||
|
}
|
||||||
|
|
||||||
private static class AllocRecord implements Comparable<AllocRecord>{
|
private static class AllocRecord implements Comparable<AllocRecord>{
|
||||||
private final ByteRange alloc;
|
private final ByteRange alloc;
|
||||||
private final int size;
|
private final int size;
|
||||||
|
|
Loading…
Reference in New Issue