HBASE-16440 MemstoreChunkPool might cross its maxCount of chunks to pool.

anoopsamjohn 2016-08-23 22:28:14 +05:30
parent 77a7394f17
commit 897631f8d1
4 changed files with 273 additions and 183 deletions

Chunk.java (new file)

@@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* A chunk of memory out of which allocations are sliced.
*/
@InterfaceAudience.Private
public class Chunk {
/** Actual underlying data */
private byte[] data;
private static final int UNINITIALIZED = -1;
private static final int OOM = -2;
/**
* Offset for the next allocation, or the sentinel value -1 which implies that the chunk is still
* uninitialized.
*/
private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
/** Total number of allocations satisfied from this buffer */
private AtomicInteger allocCount = new AtomicInteger();
/** Size of chunk in bytes */
private final int size;
/**
* Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap.
*
* @param size in bytes
*/
Chunk(int size) {
this.size = size;
}
/**
* Actually claim the memory for this chunk. This should only be called from the thread that
* constructed the chunk. It is thread-safe against other threads calling alloc(), who will block
* until the allocation is complete.
*/
public void init() {
assert nextFreeOffset.get() == UNINITIALIZED;
try {
if (data == null) {
data = new byte[size];
}
} catch (OutOfMemoryError e) {
boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
assert failInit; // should be true.
throw e;
}
// Mark that it's ready for use
boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
// We should always succeed the above CAS since only one thread
// calls init()!
Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
}
/**
* Reset the offset to UNINITIALIZED before reusing an old chunk
*/
void reset() {
if (nextFreeOffset.get() != UNINITIALIZED) {
nextFreeOffset.set(UNINITIALIZED);
allocCount.set(0);
}
}
/**
* Try to allocate <code>size</code> bytes from the chunk.
*
* @return the offset of the successful allocation, or -1 to indicate not-enough-space
*/
public int alloc(int size) {
while (true) {
int oldOffset = nextFreeOffset.get();
if (oldOffset == UNINITIALIZED) {
// The chunk doesn't have its data allocated yet.
// Since we found this in curChunk, we know that whoever
// CAS-ed it there is allocating it right now. So spin-loop
// shouldn't spin long!
Thread.yield();
continue;
}
if (oldOffset == OOM) {
// doh we ran out of ram. return -1 to chuck this away.
return -1;
}
if (oldOffset + size > data.length) {
return -1; // alloc doesn't fit
}
// Try to atomically claim this chunk
if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
// we got the alloc
allocCount.incrementAndGet();
return oldOffset;
}
// we raced and lost alloc, try again
}
}
/**
* @return This chunk's backing data.
*/
byte[] getData() {
return this.data;
}
@Override
public String toString() {
return "Chunk@" + System.identityHashCode(this) + " allocs=" + allocCount.get() + "waste="
+ (data.length - nextFreeOffset.get());
}
@VisibleForTesting
int getNextFreeOffset() {
return this.nextFreeOffset.get();
}
}
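Since Chunk now appears as a standalone class, a minimal usage sketch may help orient readers. ChunkUsageSketch, the 2 MB size and the 128-byte request are illustrative assumptions, not HBase code; the sketch has to live in the same package because the constructor and getData() are package-private.

package org.apache.hadoop.hbase.regionserver;

/** Hypothetical usage sketch for the Chunk class above; not part of HBase. */
public class ChunkUsageSketch {
  public static void main(String[] args) {
    Chunk chunk = new Chunk(2 * 1024 * 1024); // illustrative 2 MB chunk
    chunk.init();                             // allocates the byte[] and CASes the offset to 0
    int want = 128;                           // bytes the caller needs
    int offset = chunk.alloc(want);           // CAS-bumps nextFreeOffset by 'want'
    if (offset == -1) {
      // Not enough room left (or init() hit OOM): retire this chunk and use another one.
      System.out.println("chunk exhausted");
    } else {
      byte[] backing = chunk.getData();
      // The caller copies its cell bytes into backing[offset .. offset + want).
      System.out.println("allocated " + want + " bytes at offset " + offset
          + " of a " + backing.length + "-byte chunk");
    }
  }
}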

HeapMemStoreLAB.java

@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.MemStoreChunkPool.PooledChunk;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ByteRange;
import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
@@ -67,10 +68,11 @@ public class HeapMemStoreLAB implements MemStoreLAB {
static final Log LOG = LogFactory.getLog(HeapMemStoreLAB.class);
private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
// A queue of chunks contained by this memstore, used with chunk pool
private BlockingQueue<Chunk> chunkQueue = null;
final int chunkSize;
final int maxAlloc;
// A queue of chunks from pool contained by this memstore LAB
@VisibleForTesting
BlockingQueue<PooledChunk> pooledChunkQueue = null;
private final int chunkSize;
private final int maxAlloc;
private final MemStoreChunkPool chunkPool;
// This flag is for closing this instance, it's set when clearing snapshot of
@@ -95,7 +97,7 @@ public class HeapMemStoreLAB implements MemStoreLAB {
if (this.chunkPool != null) {
// set queue length to chunk pool max count to avoid keeping reference of
// too many non-reclaimable chunks
chunkQueue = new LinkedBlockingQueue<Chunk>(chunkPool.getMaxCount());
pooledChunkQueue = new LinkedBlockingQueue<PooledChunk>(chunkPool.getMaxCount());
}
// if we don't exclude allocations >CHUNK_SIZE, we'd infinite-loop on one!
@@ -128,7 +130,7 @@ public class HeapMemStoreLAB implements MemStoreLAB {
if (allocOffset != -1) {
// We succeeded - this is the common case - small alloc
// from a big buffer
return new SimpleMutableByteRange(c.data, allocOffset, size);
return new SimpleMutableByteRange(c.getData(), allocOffset, size);
}
// not enough space!
@@ -148,7 +150,7 @@ public class HeapMemStoreLAB implements MemStoreLAB {
// opening scanner which will read their data
if (chunkPool != null && openScannerCount.get() == 0
&& reclaimed.compareAndSet(false, true)) {
chunkPool.putbackChunks(this.chunkQueue);
chunkPool.putbackChunks(this.pooledChunkQueue);
}
}
@@ -166,9 +168,9 @@ public class HeapMemStoreLAB implements MemStoreLAB {
@Override
public void decScannerCount() {
int count = this.openScannerCount.decrementAndGet();
if (chunkPool != null && count == 0 && this.closed
if (this.closed && chunkPool != null && count == 0
&& reclaimed.compareAndSet(false, true)) {
chunkPool.putbackChunks(this.chunkQueue);
chunkPool.putbackChunks(this.pooledChunkQueue);
}
}
@@ -204,20 +206,31 @@ public class HeapMemStoreLAB implements MemStoreLAB {
// No current chunk, so we want to allocate one. We race
// against other allocators to CAS in an uninitialized chunk
// (which is cheap to allocate)
c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
if (chunkPool != null) {
c = chunkPool.getChunk();
}
boolean pooledChunk = false;
if (c != null) {
// This is a chunk from the pool
pooledChunk = true;
} else {
c = new Chunk(chunkSize);
}
if (curChunk.compareAndSet(null, c)) {
// we won race - now we need to actually do the expensive
// allocation step
c.init();
if (chunkQueue != null && !this.closed && !this.chunkQueue.offer(c)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
+ chunkQueue.size());
if (pooledChunk) {
if (!this.closed && !this.pooledChunkQueue.offer((PooledChunk) c)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
+ pooledChunkQueue.size());
}
}
}
return c;
} else if (chunkPool != null) {
chunkPool.putbackChunk(c);
} else if (pooledChunk) {
chunkPool.putbackChunk((PooledChunk) c);
}
// someone else won race - that's fine, we'll try to grab theirs
// in the next iteration of the loop.
@@ -230,119 +243,7 @@ public class HeapMemStoreLAB implements MemStoreLAB {
}
@VisibleForTesting
BlockingQueue<Chunk> getChunkQueue() {
return this.chunkQueue;
}
/**
* A chunk of memory out of which allocations are sliced.
*/
static class Chunk {
/** Actual underlying data */
private byte[] data;
private static final int UNINITIALIZED = -1;
private static final int OOM = -2;
/**
* Offset for the next allocation, or the sentinel value -1
* which implies that the chunk is still uninitialized.
* */
private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
/** Total number of allocations satisfied from this buffer */
private AtomicInteger allocCount = new AtomicInteger();
/** Size of chunk in bytes */
private final int size;
/**
* Create an uninitialized chunk. Note that memory is not allocated yet, so
* this is cheap.
* @param size in bytes
*/
Chunk(int size) {
this.size = size;
}
/**
* Actually claim the memory for this chunk. This should only be called from
* the thread that constructed the chunk. It is thread-safe against other
* threads calling alloc(), who will block until the allocation is complete.
*/
public void init() {
assert nextFreeOffset.get() == UNINITIALIZED;
try {
if (data == null) {
data = new byte[size];
}
} catch (OutOfMemoryError e) {
boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
assert failInit; // should be true.
throw e;
}
// Mark that it's ready for use
boolean initted = nextFreeOffset.compareAndSet(
UNINITIALIZED, 0);
// We should always succeed the above CAS since only one thread
// calls init()!
Preconditions.checkState(initted,
"Multiple threads tried to init same chunk");
}
/**
* Reset the offset to UNINITIALIZED before before reusing an old chunk
*/
void reset() {
if (nextFreeOffset.get() != UNINITIALIZED) {
nextFreeOffset.set(UNINITIALIZED);
allocCount.set(0);
}
}
/**
* Try to allocate <code>size</code> bytes from the chunk.
* @return the offset of the successful allocation, or -1 to indicate not-enough-space
*/
public int alloc(int size) {
while (true) {
int oldOffset = nextFreeOffset.get();
if (oldOffset == UNINITIALIZED) {
// The chunk doesn't have its data allocated yet.
// Since we found this in curChunk, we know that whoever
// CAS-ed it there is allocating it right now. So spin-loop
// shouldn't spin long!
Thread.yield();
continue;
}
if (oldOffset == OOM) {
// doh we ran out of ram. return -1 to chuck this away.
return -1;
}
if (oldOffset + size > data.length) {
return -1; // alloc doesn't fit
}
// Try to atomically claim this chunk
if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
// we got the alloc
allocCount.incrementAndGet();
return oldOffset;
}
// we raced and lost alloc, try again
}
}
@Override
public String toString() {
return "Chunk@" + System.identityHashCode(this) +
" allocs=" + allocCount.get() + "waste=" +
(data.length - nextFreeOffset.get());
}
@VisibleForTesting
int getNextFreeOffset() {
return this.nextFreeOffset.get();
}
BlockingQueue<PooledChunk> getChunkQueue() {
return this.pooledChunkQueue;
}
}
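To make the caller-side contract of this change easier to see, here is a compressed sketch with hypothetical names (PooledChunkHandlingSketch, makeChunk); it omits the curChunk CAS race and is not the real HeapMemStoreLAB code. The two points it illustrates: getChunk() may now return null once the pool has created its maxCount chunks, and only pool-owned chunks are remembered for putback, in a queue bounded by that same maxCount.

package org.apache.hadoop.hbase.regionserver;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.hadoop.hbase.regionserver.MemStoreChunkPool.PooledChunk;

/** Hypothetical sketch of the pooled-vs-plain chunk decision; not the real HeapMemStoreLAB. */
class PooledChunkHandlingSketch {
  private final MemStoreChunkPool chunkPool;               // null when pooling is disabled
  private final BlockingQueue<PooledChunk> pooledChunkQueue;
  private final int chunkSize;

  PooledChunkHandlingSketch(MemStoreChunkPool chunkPool, int chunkSize) {
    this.chunkPool = chunkPool;
    this.chunkSize = chunkSize;
    // Bounded at maxCount, so we never hold references to more pooled chunks
    // than the pool is willing to take back.
    this.pooledChunkQueue = (chunkPool == null) ? null
        : new LinkedBlockingQueue<PooledChunk>(chunkPool.getMaxCount());
  }

  Chunk makeChunk() {
    Chunk c = (chunkPool != null) ? chunkPool.getChunk() : null; // null once maxCount chunks exist
    boolean pooled = (c != null);
    if (c == null) {
      c = new Chunk(chunkSize);   // on-demand chunk, simply garbage-collected after use
    }
    c.init();
    if (pooled) {
      // Remember pool-owned chunks for putback on close(); if the bounded queue is
      // already full, this chunk just will not be reused.
      pooledChunkQueue.offer((PooledChunk) c);
    }
    return c;
  }
}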

MemStoreChunkPool.java

@@ -31,7 +31,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.HeapMemStoreLAB.Chunk;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -61,54 +60,69 @@ public class MemStoreChunkPool {
final static float POOL_INITIAL_SIZE_DEFAULT = 0.0f;
// Static reference to the MemStoreChunkPool
private static MemStoreChunkPool GLOBAL_INSTANCE;
static MemStoreChunkPool GLOBAL_INSTANCE;
/** Boolean whether we have disabled the memstore chunk pool entirely. */
static boolean chunkPoolDisabled = false;
private final int maxCount;
// A queue of reclaimed chunks
private final BlockingQueue<Chunk> reclaimedChunks;
private final BlockingQueue<PooledChunk> reclaimedChunks;
private final int chunkSize;
/** Statistics thread schedule pool */
private final ScheduledExecutorService scheduleThreadPool;
/** Statistics thread */
private static final int statThreadPeriod = 60 * 5;
private AtomicLong createdChunkCount = new AtomicLong();
private AtomicLong reusedChunkCount = new AtomicLong();
private final AtomicLong createdChunkCount = new AtomicLong();
private final AtomicLong reusedChunkCount = new AtomicLong();
MemStoreChunkPool(Configuration conf, int chunkSize, int maxCount,
int initialCount) {
this.maxCount = maxCount;
this.chunkSize = chunkSize;
this.reclaimedChunks = new LinkedBlockingQueue<Chunk>();
this.reclaimedChunks = new LinkedBlockingQueue<PooledChunk>();
for (int i = 0; i < initialCount; i++) {
Chunk chunk = new Chunk(chunkSize);
PooledChunk chunk = new PooledChunk(chunkSize);
chunk.init();
reclaimedChunks.add(chunk);
}
createdChunkCount.set(initialCount);
final String n = Thread.currentThread().getName();
scheduleThreadPool = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setNameFormat(n+"-MemStoreChunkPool Statistics")
.setDaemon(true).build());
this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build());
this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod,
statThreadPeriod, TimeUnit.SECONDS);
}
/**
* Poll a chunk from the pool, reset it if not null, else create a new chunk
* to return
* Poll a chunk from the pool and reset it if one is available. Otherwise, create a new chunk, but
* only if we have not yet created the max allowed number of chunks. Once the max allowed chunks
* have all been created and none are free right now, return null; it is then the caller's
* responsibility to make a chunk of its own.
* Note: Chunks returned by this pool must be put back to the pool after use.
* @return a chunk, or null if the pool is exhausted
* @see #putbackChunk(PooledChunk)
* @see #putbackChunks(BlockingQueue)
*/
Chunk getChunk() {
Chunk chunk = reclaimedChunks.poll();
if (chunk == null) {
chunk = new Chunk(chunkSize);
createdChunkCount.incrementAndGet();
} else {
PooledChunk getChunk() {
PooledChunk chunk = reclaimedChunks.poll();
if (chunk != null) {
chunk.reset();
reusedChunkCount.incrementAndGet();
} else {
// Make a chunk iff we have not yet created the maxCount chunks
while (true) {
long created = this.createdChunkCount.get();
if (created < this.maxCount) {
chunk = new PooledChunk(chunkSize);
if (this.createdChunkCount.compareAndSet(created, created + 1)) {
break;
}
} else {
break;
}
}
}
return chunk;
}
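The loop above is the heart of the fix: createdChunkCount is only advanced by CAS while it is still below maxCount, so the pool creates at most maxCount PooledChunks, which is what lets the putback path stop policing the pool size. The following sketch shows the same bounded-creation pattern with hypothetical names (BoundedChunkFactorySketch, tryCreate), reserving the counter slot before constructing; treat it as a variation under those assumptions, not the committed code.

package org.apache.hadoop.hbase.regionserver;

import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.regionserver.MemStoreChunkPool.PooledChunk;

/** Hypothetical sketch of CAS-bounded creation; not the committed MemStoreChunkPool code. */
final class BoundedChunkFactorySketch {
  private final AtomicLong createdChunkCount = new AtomicLong();
  private final int maxCount;
  private final int chunkSize;

  BoundedChunkFactorySketch(int maxCount, int chunkSize) {
    this.maxCount = maxCount;
    this.chunkSize = chunkSize;
  }

  /** @return a new PooledChunk, or null once maxCount chunks have been created. */
  PooledChunk tryCreate() {
    while (true) {
      long created = createdChunkCount.get();
      if (created >= maxCount) {
        return null;                       // budget exhausted; the caller falls back to a plain Chunk
      }
      if (createdChunkCount.compareAndSet(created, created + 1)) {
        return new PooledChunk(chunkSize); // slot reserved, so exactly one chunk per increment
      }
      // Lost the CAS to another creator: re-read the counter and retry.
    }
  }
}

Reserving the slot before constructing guarantees that every chunk handed out has been counted, so the total cannot drift past maxCount even when several creators race; the committed order (construct, then CAS) is still cheap because a PooledChunk does not allocate its byte[] until init() is called.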
@@ -118,18 +132,11 @@ public class MemStoreChunkPool {
* skip the remaining chunks
* @param chunks
*/
void putbackChunks(BlockingQueue<Chunk> chunks) {
int maxNumToPutback = this.maxCount - reclaimedChunks.size();
if (maxNumToPutback <= 0) {
return;
}
chunks.drainTo(reclaimedChunks, maxNumToPutback);
// clear reference of any non-reclaimable chunks
if (chunks.size() > 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("Left " + chunks.size() + " unreclaimable chunks, removing them from queue");
}
chunks.clear();
void putbackChunks(BlockingQueue<PooledChunk> chunks) {
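// By design getChunk() creates at most maxCount PooledChunks in total, and every chunk in
// 'chunks' was handed out by getChunk(), so draining them all back cannot overfill
// reclaimedChunks; hence the assert below instead of the old size-based cap.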
assert reclaimedChunks.size() < this.maxCount;
PooledChunk chunk = null;
while ((chunk = chunks.poll()) != null) {
reclaimedChunks.add(chunk);
}
}
@@ -138,10 +145,8 @@ public class MemStoreChunkPool {
* skip it
* @param chunk
*/
void putbackChunk(Chunk chunk) {
if (reclaimedChunks.size() >= this.maxCount) {
return;
}
void putbackChunk(PooledChunk chunk) {
assert reclaimedChunks.size() < this.maxCount;
reclaimedChunks.add(chunk);
}
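The size check removed in this hunk was a check-then-act race: two memstore LABs returning chunks at the same time could both observe a size below maxCount and both add, which is exactly how the pool could cross its maxCount. A hypothetical interleaving against the old code, written as comments:

// With maxCount == N and reclaimedChunks.size() == N - 1:
//   Thread A: reclaimedChunks.size() >= maxCount  -> false, proceeds
//   Thread B: reclaimedChunks.size() >= maxCount  -> false, proceeds
//   Thread A: reclaimedChunks.add(chunkA);        // size becomes N
//   Thread B: reclaimedChunks.add(chunkB);        // size becomes N + 1: maxCount crossed
// Capping chunk creation at maxCount removes the need for any size check on the putback path.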
@@ -156,31 +161,28 @@ public class MemStoreChunkPool {
this.reclaimedChunks.clear();
}
private static class StatisticsThread extends Thread {
MemStoreChunkPool mcp;
public StatisticsThread(MemStoreChunkPool mcp) {
private class StatisticsThread extends Thread {
StatisticsThread() {
super("MemStoreChunkPool.StatisticsThread");
setDaemon(true);
this.mcp = mcp;
}
@Override
public void run() {
mcp.logStats();
logStats();
}
}
private void logStats() {
if (!LOG.isDebugEnabled()) return;
long created = createdChunkCount.get();
long reused = reusedChunkCount.get();
long total = created + reused;
LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
+ ",created chunk count=" + created
+ ",reused chunk count=" + reused
+ ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
(float) reused / (float) total, 2)));
private void logStats() {
if (!LOG.isDebugEnabled()) return;
long created = createdChunkCount.get();
long reused = reusedChunkCount.get();
long total = created + reused;
LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
+ ",created chunk count=" + created
+ ",reused chunk count=" + reused
+ ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
(float) reused / (float) total, 2)));
}
}
/**
@@ -234,4 +236,9 @@ public class MemStoreChunkPool {
chunkPoolDisabled = false;
}
public static class PooledChunk extends Chunk {
PooledChunk(int size) {
super(size);
}
}
}

TestMemStoreChunkPool.java

@@ -196,4 +196,43 @@ public class TestMemStoreChunkPool {
assertTrue(chunkPool.getPoolSize() > 0);
}
@Test
public void testPutbackChunksMultiThreaded() throws Exception {
MemStoreChunkPool oldPool = MemStoreChunkPool.GLOBAL_INSTANCE;
final int maxCount = 10;
final int initialCount = 5;
final int chunkSize = 10;
MemStoreChunkPool pool = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount);
assertEquals(initialCount, pool.getPoolSize());
assertEquals(maxCount, pool.getMaxCount());
MemStoreChunkPool.GLOBAL_INSTANCE = pool;// Replace the global ref with the new one we created.
// Use it for the testing. Later, in finally, we put
// back the original.
try {
Runnable r = new Runnable() {
@Override
public void run() {
MemStoreLAB memStoreLAB = new HeapMemStoreLAB(conf);
for (int i = 0; i < maxCount; i++) {
memStoreLAB.allocateBytes(chunkSize);// Try to allocate size = chunkSize, so every
// allocate call will result in a new chunk
}
// Close the MemStoreLAB so that all its pooled chunks get put back to the pool
memStoreLAB.close();
}
};
Thread t1 = new Thread(r);
Thread t2 = new Thread(r);
Thread t3 = new Thread(r);
t1.start();
t2.start();
t3.start();
t1.join();
t2.join();
t3.join();
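// Each of the three LABs forced a fresh chunk per allocation; chunks came from the shared pool
// while it could still create them (at most maxCount in total) and were plain Chunks afterwards.
// close() put the pooled ones back, so the pool size checked below must never exceed maxCount.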
assertTrue(pool.getPoolSize() <= maxCount);
} finally {
MemStoreChunkPool.GLOBAL_INSTANCE = oldPool;
}
}
}