HBASE-16407 Handle MemstoreChunkPool size when HeapMemoryManager tunes memory.

This commit is contained in:
anoopsamjohn 2016-08-29 10:22:27 +05:30
parent 7eaba369e7
commit 950d547dae
4 changed files with 78 additions and 17 deletions

View File

@ -1463,6 +1463,11 @@ public class HRegionServer extends HasThread implements
this, this.regionServerAccounting);
if (this.hMemManager != null) {
this.hMemManager.start(getChoreService());
MemStoreChunkPool chunkPool = MemStoreChunkPool.getPool(this.conf);
if (chunkPool != null) {
// Register it as HeapMemoryTuneObserver
this.hMemManager.registerTuneObserver(chunkPool);
}
}
}

View File

@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@ -87,6 +89,8 @@ public class HeapMemoryManager {
private MetricsHeapMemoryManager metricsHeapMemoryManager;
private List<HeapMemoryTuneObserver> tuneObservers = new ArrayList<HeapMemoryTuneObserver>();
public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher,
Server server, RegionServerAccounting regionServerAccounting) {
BlockCache blockCache = CacheConfig.instantiateBlockCache(conf);
@ -206,6 +210,10 @@ public class HeapMemoryManager {
this.heapMemTunerChore.cancel(true);
}
public void registerTuneObserver(HeapMemoryTuneObserver observer) {
this.tuneObservers.add(observer);
}
// Used by the test cases.
boolean isTunerOn() {
return this.tunerOn;
@ -351,6 +359,9 @@ public class HeapMemoryManager {
blockCache.setMaxSize(newBlockCacheSize);
globalMemStorePercent = memstoreSize;
memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize);
for (HeapMemoryTuneObserver observer : tuneObservers) {
observer.onHeapMemoryTune(newMemstoreSize, newBlockCacheSize);
}
}
} else {
metricsHeapMemoryManager.increaseTunerDoNothingCounter();
@ -489,4 +500,17 @@ public class HeapMemoryManager {
return needsTuning;
}
}
/**
* Every class that wants to observe heap memory tune actions must implement this interface.
*/
public static interface HeapMemoryTuneObserver {
/**
* This method would be called by HeapMemoryManger when a heap memory tune action took place.
* @param newMemstoreSize The newly calculated global memstore size
* @param newBlockCacheSize The newly calculated global blockcache size
*/
void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize);
}
}

View File

@ -31,13 +31,14 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A pool of {@link HeapMemStoreLAB.Chunk} instances.
* A pool of {@link Chunk} instances.
*
* MemStoreChunkPool caches a number of retired chunks for reusing, it could
* decrease allocating bytes when writing, thereby optimizing the garbage
@ -52,7 +53,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
*/
@SuppressWarnings("javadoc")
@InterfaceAudience.Private
public class MemStoreChunkPool {
public class MemStoreChunkPool implements HeapMemoryTuneObserver {
private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class);
final static String CHUNK_POOL_MAXSIZE_KEY = "hbase.hregion.memstore.chunkpool.maxsize";
final static String CHUNK_POOL_INITIALSIZE_KEY = "hbase.hregion.memstore.chunkpool.initialsize";
@ -64,30 +65,32 @@ public class MemStoreChunkPool {
/** Boolean whether we have disabled the memstore chunk pool entirely. */
static boolean chunkPoolDisabled = false;
private final int maxCount;
private int maxCount;
// A queue of reclaimed chunks
private final BlockingQueue<PooledChunk> reclaimedChunks;
private final int chunkSize;
private final float poolSizePercentage;
/** Statistics thread schedule pool */
private final ScheduledExecutorService scheduleThreadPool;
/** Statistics thread */
private static final int statThreadPeriod = 60 * 5;
private final AtomicLong createdChunkCount = new AtomicLong();
private final AtomicLong chunkCount = new AtomicLong();
private final AtomicLong reusedChunkCount = new AtomicLong();
MemStoreChunkPool(Configuration conf, int chunkSize, int maxCount,
int initialCount) {
int initialCount, float poolSizePercentage) {
this.maxCount = maxCount;
this.chunkSize = chunkSize;
this.poolSizePercentage = poolSizePercentage;
this.reclaimedChunks = new LinkedBlockingQueue<PooledChunk>();
for (int i = 0; i < initialCount; i++) {
PooledChunk chunk = new PooledChunk(chunkSize);
chunk.init();
reclaimedChunks.add(chunk);
}
createdChunkCount.set(initialCount);
chunkCount.set(initialCount);
final String n = Thread.currentThread().getName();
scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build());
@ -113,10 +116,10 @@ public class MemStoreChunkPool {
} else {
// Make a chunk iff we have not yet created the maxCount chunks
while (true) {
long created = this.createdChunkCount.get();
long created = this.chunkCount.get();
if (created < this.maxCount) {
chunk = new PooledChunk(chunkSize);
if (this.createdChunkCount.compareAndSet(created, created + 1)) {
if (this.chunkCount.compareAndSet(created, created + 1)) {
break;
}
} else {
@ -132,11 +135,12 @@ public class MemStoreChunkPool {
* skip the remaining chunks
* @param chunks
*/
void putbackChunks(BlockingQueue<PooledChunk> chunks) {
assert reclaimedChunks.size() < this.maxCount;
synchronized void putbackChunks(BlockingQueue<PooledChunk> chunks) {
int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size());
PooledChunk chunk = null;
while ((chunk = chunks.poll()) != null) {
while ((chunk = chunks.poll()) != null && toAdd > 0) {
reclaimedChunks.add(chunk);
toAdd--;
}
}
@ -145,9 +149,10 @@ public class MemStoreChunkPool {
* skip it
* @param chunk
*/
void putbackChunk(PooledChunk chunk) {
assert reclaimedChunks.size() < this.maxCount;
reclaimedChunks.add(chunk);
synchronized void putbackChunk(PooledChunk chunk) {
if (reclaimedChunks.size() < this.maxCount) {
reclaimedChunks.add(chunk);
}
}
int getPoolSize() {
@ -174,7 +179,7 @@ public class MemStoreChunkPool {
private void logStats() {
if (!LOG.isDebugEnabled()) return;
long created = createdChunkCount.get();
long created = chunkCount.get();
long reused = reusedChunkCount.get();
long total = created + reused;
LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
@ -222,7 +227,8 @@ public class MemStoreChunkPool {
int initialCount = (int) (initialCountPercentage * maxCount);
LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize)
+ ", max count " + maxCount + ", initial count " + initialCount);
GLOBAL_INSTANCE = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount);
GLOBAL_INSTANCE = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount,
poolSizePercentage);
return GLOBAL_INSTANCE;
}
}
@ -241,4 +247,30 @@ public class MemStoreChunkPool {
super(size);
}
}
@Override
public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) {
int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize);
if (newMaxCount != this.maxCount) {
// We need an adjustment in the chunks numbers
if (newMaxCount > this.maxCount) {
// Max chunks getting increased. Just change the variable. Later calls to getChunk() would
// create and add them to Q
LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount);
this.maxCount = newMaxCount;
} else {
// Max chunks getting decreased. We may need to clear off some of the pooled chunks now
// itself. If the extra chunks are serving already, do not pool those when we get them back
LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount);
this.maxCount = newMaxCount;
if (this.reclaimedChunks.size() > newMaxCount) {
synchronized (this) {
while (this.reclaimedChunks.size() > newMaxCount) {
this.reclaimedChunks.poll();
}
}
}
}
}
}
}

View File

@ -203,7 +203,7 @@ public class TestMemStoreChunkPool {
final int maxCount = 10;
final int initialCount = 5;
final int chunkSize = 10;
MemStoreChunkPool pool = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount);
MemStoreChunkPool pool = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount, 1);
assertEquals(initialCount, pool.getPoolSize());
assertEquals(maxCount, pool.getMaxCount());
MemStoreChunkPool.GLOBAL_INSTANCE = pool;// Replace the global ref with the new one we created.