diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
new file mode 100644
index 00000000000..d968ed9137b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * A chunk of memory out of which allocations are sliced.
+ */
+@InterfaceAudience.Private
+public class Chunk {
+ /** Actual underlying data */
+ private byte[] data;
+
+ private static final int UNINITIALIZED = -1;
+ private static final int OOM = -2;
+ /**
+ * Offset for the next allocation, or the sentinel value -1 which implies that the chunk is still
+ * uninitialized.
+ */
+ private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
+
+ /** Total number of allocations satisfied from this buffer */
+ private AtomicInteger allocCount = new AtomicInteger();
+
+ /** Size of chunk in bytes */
+ private final int size;
+
+ /**
+ * Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap.
+ *
+ * @param size in bytes
+ */
+ Chunk(int size) {
+ this.size = size;
+ }
+
+ /**
+ * Actually claim the memory for this chunk. This should only be called from the thread that
+ * constructed the chunk. It is thread-safe against other threads calling alloc(), who will block
+ * until the allocation is complete.
+ */
+ public void init() {
+ assert nextFreeOffset.get() == UNINITIALIZED;
+ try {
+ if (data == null) {
+ data = new byte[size];
+ }
+ } catch (OutOfMemoryError e) {
+ boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
+ assert failInit; // should be true.
+ throw e;
+ }
+ // Mark that it's ready for use
+ boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
+ // We should always succeed the above CAS since only one thread
+ // calls init()!
+ Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
+ }
+
+ /**
+ * Reset the offset to UNINITIALIZED before before reusing an old chunk
+ */
+ void reset() {
+ if (nextFreeOffset.get() != UNINITIALIZED) {
+ nextFreeOffset.set(UNINITIALIZED);
+ allocCount.set(0);
+ }
+ }
+
+ /**
+ * Try to allocate size
bytes from the chunk.
+ *
+ * @return the offset of the successful allocation, or -1 to indicate not-enough-space
+ */
+ public int alloc(int size) {
+ while (true) {
+ int oldOffset = nextFreeOffset.get();
+ if (oldOffset == UNINITIALIZED) {
+ // The chunk doesn't have its data allocated yet.
+ // Since we found this in curChunk, we know that whoever
+ // CAS-ed it there is allocating it right now. So spin-loop
+ // shouldn't spin long!
+ Thread.yield();
+ continue;
+ }
+ if (oldOffset == OOM) {
+ // doh we ran out of ram. return -1 to chuck this away.
+ return -1;
+ }
+
+ if (oldOffset + size > data.length) {
+ return -1; // alloc doesn't fit
+ }
+
+ // Try to atomically claim this chunk
+ if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
+ // we got the alloc
+ allocCount.incrementAndGet();
+ return oldOffset;
+ }
+ // we raced and lost alloc, try again
+ }
+ }
+
+ /**
+ * @return This chunk's backing data.
+ */
+ byte[] getData() {
+ return this.data;
+ }
+
+ @Override
+ public String toString() {
+ return "Chunk@" + System.identityHashCode(this) + " allocs=" + allocCount.get() + "waste="
+ + (data.length - nextFreeOffset.get());
+ }
+
+ @VisibleForTesting
+ int getNextFreeOffset() {
+ return this.nextFreeOffset.get();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
index d8fa5c3934b..3ca4b0c52b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.MemStoreChunkPool.PooledChunk;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ByteRange;
import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
@@ -67,10 +68,11 @@ public class HeapMemStoreLAB implements MemStoreLAB {
static final Log LOG = LogFactory.getLog(HeapMemStoreLAB.class);
private AtomicReference curChunk = new AtomicReference();
- // A queue of chunks contained by this memstore, used with chunk pool
- private BlockingQueue chunkQueue = null;
- final int chunkSize;
- final int maxAlloc;
+ // A queue of chunks from pool contained by this memstore LAB
+ @VisibleForTesting
+ BlockingQueue pooledChunkQueue = null;
+ private final int chunkSize;
+ private final int maxAlloc;
private final MemStoreChunkPool chunkPool;
// This flag is for closing this instance, its set when clearing snapshot of
@@ -95,7 +97,7 @@ public class HeapMemStoreLAB implements MemStoreLAB {
if (this.chunkPool != null) {
// set queue length to chunk pool max count to avoid keeping reference of
// too many non-reclaimable chunks
- chunkQueue = new LinkedBlockingQueue(chunkPool.getMaxCount());
+ pooledChunkQueue = new LinkedBlockingQueue(chunkPool.getMaxCount());
}
// if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
@@ -128,7 +130,7 @@ public class HeapMemStoreLAB implements MemStoreLAB {
if (allocOffset != -1) {
// We succeeded - this is the common case - small alloc
// from a big buffer
- return new SimpleMutableByteRange(c.data, allocOffset, size);
+ return new SimpleMutableByteRange(c.getData(), allocOffset, size);
}
// not enough space!
@@ -148,7 +150,7 @@ public class HeapMemStoreLAB implements MemStoreLAB {
// opening scanner which will read their data
if (chunkPool != null && openScannerCount.get() == 0
&& reclaimed.compareAndSet(false, true)) {
- chunkPool.putbackChunks(this.chunkQueue);
+ chunkPool.putbackChunks(this.pooledChunkQueue);
}
}
@@ -166,9 +168,9 @@ public class HeapMemStoreLAB implements MemStoreLAB {
@Override
public void decScannerCount() {
int count = this.openScannerCount.decrementAndGet();
- if (chunkPool != null && count == 0 && this.closed
+ if (this.closed && chunkPool != null && count == 0
&& reclaimed.compareAndSet(false, true)) {
- chunkPool.putbackChunks(this.chunkQueue);
+ chunkPool.putbackChunks(this.pooledChunkQueue);
}
}
@@ -204,20 +206,31 @@ public class HeapMemStoreLAB implements MemStoreLAB {
// No current chunk, so we want to allocate one. We race
// against other allocators to CAS in an uninitialized chunk
// (which is cheap to allocate)
- c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
+ if (chunkPool != null) {
+ c = chunkPool.getChunk();
+ }
+ boolean pooledChunk = false;
+ if (c != null) {
+ // This is chunk from pool
+ pooledChunk = true;
+ } else {
+ c = new Chunk(chunkSize);
+ }
if (curChunk.compareAndSet(null, c)) {
// we won race - now we need to actually do the expensive
// allocation step
c.init();
- if (chunkQueue != null && !this.closed && !this.chunkQueue.offer(c)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
- + chunkQueue.size());
+ if (pooledChunk) {
+ if (!this.closed && !this.pooledChunkQueue.offer((PooledChunk) c)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
+ + pooledChunkQueue.size());
+ }
}
}
return c;
- } else if (chunkPool != null) {
- chunkPool.putbackChunk(c);
+ } else if (pooledChunk) {
+ chunkPool.putbackChunk((PooledChunk) c);
}
// someone else won race - that's fine, we'll try to grab theirs
// in the next iteration of the loop.
@@ -230,119 +243,7 @@ public class HeapMemStoreLAB implements MemStoreLAB {
}
@VisibleForTesting
- BlockingQueue getChunkQueue() {
- return this.chunkQueue;
- }
-
- /**
- * A chunk of memory out of which allocations are sliced.
- */
- static class Chunk {
- /** Actual underlying data */
- private byte[] data;
-
- private static final int UNINITIALIZED = -1;
- private static final int OOM = -2;
- /**
- * Offset for the next allocation, or the sentinel value -1
- * which implies that the chunk is still uninitialized.
- * */
- private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
-
- /** Total number of allocations satisfied from this buffer */
- private AtomicInteger allocCount = new AtomicInteger();
-
- /** Size of chunk in bytes */
- private final int size;
-
- /**
- * Create an uninitialized chunk. Note that memory is not allocated yet, so
- * this is cheap.
- * @param size in bytes
- */
- Chunk(int size) {
- this.size = size;
- }
-
- /**
- * Actually claim the memory for this chunk. This should only be called from
- * the thread that constructed the chunk. It is thread-safe against other
- * threads calling alloc(), who will block until the allocation is complete.
- */
- public void init() {
- assert nextFreeOffset.get() == UNINITIALIZED;
- try {
- if (data == null) {
- data = new byte[size];
- }
- } catch (OutOfMemoryError e) {
- boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
- assert failInit; // should be true.
- throw e;
- }
- // Mark that it's ready for use
- boolean initted = nextFreeOffset.compareAndSet(
- UNINITIALIZED, 0);
- // We should always succeed the above CAS since only one thread
- // calls init()!
- Preconditions.checkState(initted,
- "Multiple threads tried to init same chunk");
- }
-
- /**
- * Reset the offset to UNINITIALIZED before before reusing an old chunk
- */
- void reset() {
- if (nextFreeOffset.get() != UNINITIALIZED) {
- nextFreeOffset.set(UNINITIALIZED);
- allocCount.set(0);
- }
- }
-
- /**
- * Try to allocate size
bytes from the chunk.
- * @return the offset of the successful allocation, or -1 to indicate not-enough-space
- */
- public int alloc(int size) {
- while (true) {
- int oldOffset = nextFreeOffset.get();
- if (oldOffset == UNINITIALIZED) {
- // The chunk doesn't have its data allocated yet.
- // Since we found this in curChunk, we know that whoever
- // CAS-ed it there is allocating it right now. So spin-loop
- // shouldn't spin long!
- Thread.yield();
- continue;
- }
- if (oldOffset == OOM) {
- // doh we ran out of ram. return -1 to chuck this away.
- return -1;
- }
-
- if (oldOffset + size > data.length) {
- return -1; // alloc doesn't fit
- }
-
- // Try to atomically claim this chunk
- if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
- // we got the alloc
- allocCount.incrementAndGet();
- return oldOffset;
- }
- // we raced and lost alloc, try again
- }
- }
-
- @Override
- public String toString() {
- return "Chunk@" + System.identityHashCode(this) +
- " allocs=" + allocCount.get() + "waste=" +
- (data.length - nextFreeOffset.get());
- }
-
- @VisibleForTesting
- int getNextFreeOffset() {
- return this.nextFreeOffset.get();
- }
+ BlockingQueue getChunkQueue() {
+ return this.pooledChunkQueue;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
index 81b6046ce50..6b34d752cf9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
@@ -31,7 +31,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
-import org.apache.hadoop.hbase.regionserver.HeapMemStoreLAB.Chunk;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -61,54 +60,69 @@ public class MemStoreChunkPool {
final static float POOL_INITIAL_SIZE_DEFAULT = 0.0f;
// Static reference to the MemStoreChunkPool
- private static MemStoreChunkPool GLOBAL_INSTANCE;
+ static MemStoreChunkPool GLOBAL_INSTANCE;
/** Boolean whether we have disabled the memstore chunk pool entirely. */
static boolean chunkPoolDisabled = false;
private final int maxCount;
// A queue of reclaimed chunks
- private final BlockingQueue reclaimedChunks;
+ private final BlockingQueue reclaimedChunks;
private final int chunkSize;
/** Statistics thread schedule pool */
private final ScheduledExecutorService scheduleThreadPool;
/** Statistics thread */
private static final int statThreadPeriod = 60 * 5;
- private AtomicLong createdChunkCount = new AtomicLong();
- private AtomicLong reusedChunkCount = new AtomicLong();
+ private final AtomicLong createdChunkCount = new AtomicLong();
+ private final AtomicLong reusedChunkCount = new AtomicLong();
MemStoreChunkPool(Configuration conf, int chunkSize, int maxCount,
int initialCount) {
this.maxCount = maxCount;
this.chunkSize = chunkSize;
- this.reclaimedChunks = new LinkedBlockingQueue();
+ this.reclaimedChunks = new LinkedBlockingQueue();
for (int i = 0; i < initialCount; i++) {
- Chunk chunk = new Chunk(chunkSize);
+ PooledChunk chunk = new PooledChunk(chunkSize);
chunk.init();
reclaimedChunks.add(chunk);
}
+ createdChunkCount.set(initialCount);
final String n = Thread.currentThread().getName();
- scheduleThreadPool = Executors.newScheduledThreadPool(1,
- new ThreadFactoryBuilder().setNameFormat(n+"-MemStoreChunkPool Statistics")
- .setDaemon(true).build());
- this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
- statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
+ scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
+ .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build());
+ this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod,
+ statThreadPeriod, TimeUnit.SECONDS);
}
/**
- * Poll a chunk from the pool, reset it if not null, else create a new chunk
- * to return
+ * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have
+ * not yet created max allowed chunks count. When we have already created max allowed chunks and
+ * no free chunks as of now, return null. It is the responsibility of the caller to make a chunk
+ * then.
+ * Note: Chunks returned by this pool must be put back to the pool after its use.
* @return a chunk
+ * @see #putbackChunk(Chunk)
+ * @see #putbackChunks(BlockingQueue)
*/
- Chunk getChunk() {
- Chunk chunk = reclaimedChunks.poll();
- if (chunk == null) {
- chunk = new Chunk(chunkSize);
- createdChunkCount.incrementAndGet();
- } else {
+ PooledChunk getChunk() {
+ PooledChunk chunk = reclaimedChunks.poll();
+ if (chunk != null) {
chunk.reset();
reusedChunkCount.incrementAndGet();
+ } else {
+ // Make a chunk iff we have not yet created the maxCount chunks
+ while (true) {
+ long created = this.createdChunkCount.get();
+ if (created < this.maxCount) {
+ chunk = new PooledChunk(chunkSize);
+ if (this.createdChunkCount.compareAndSet(created, created + 1)) {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
}
return chunk;
}
@@ -118,18 +132,11 @@ public class MemStoreChunkPool {
* skip the remaining chunks
* @param chunks
*/
- void putbackChunks(BlockingQueue chunks) {
- int maxNumToPutback = this.maxCount - reclaimedChunks.size();
- if (maxNumToPutback <= 0) {
- return;
- }
- chunks.drainTo(reclaimedChunks, maxNumToPutback);
- // clear reference of any non-reclaimable chunks
- if (chunks.size() > 0) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Left " + chunks.size() + " unreclaimable chunks, removing them from queue");
- }
- chunks.clear();
+ void putbackChunks(BlockingQueue chunks) {
+ assert reclaimedChunks.size() < this.maxCount;
+ PooledChunk chunk = null;
+ while ((chunk = chunks.poll()) != null) {
+ reclaimedChunks.add(chunk);
}
}
@@ -138,10 +145,8 @@ public class MemStoreChunkPool {
* skip it
* @param chunk
*/
- void putbackChunk(Chunk chunk) {
- if (reclaimedChunks.size() >= this.maxCount) {
- return;
- }
+ void putbackChunk(PooledChunk chunk) {
+ assert reclaimedChunks.size() < this.maxCount;
reclaimedChunks.add(chunk);
}
@@ -156,31 +161,28 @@ public class MemStoreChunkPool {
this.reclaimedChunks.clear();
}
- private static class StatisticsThread extends Thread {
- MemStoreChunkPool mcp;
-
- public StatisticsThread(MemStoreChunkPool mcp) {
+ private class StatisticsThread extends Thread {
+ StatisticsThread() {
super("MemStoreChunkPool.StatisticsThread");
setDaemon(true);
- this.mcp = mcp;
}
@Override
public void run() {
- mcp.logStats();
+ logStats();
}
- }
- private void logStats() {
- if (!LOG.isDebugEnabled()) return;
- long created = createdChunkCount.get();
- long reused = reusedChunkCount.get();
- long total = created + reused;
- LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
- + ",created chunk count=" + created
- + ",reused chunk count=" + reused
- + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
- (float) reused / (float) total, 2)));
+ private void logStats() {
+ if (!LOG.isDebugEnabled()) return;
+ long created = createdChunkCount.get();
+ long reused = reusedChunkCount.get();
+ long total = created + reused;
+ LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
+ + ",created chunk count=" + created
+ + ",reused chunk count=" + reused
+ + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
+ (float) reused / (float) total, 2)));
+ }
}
/**
@@ -234,4 +236,9 @@ public class MemStoreChunkPool {
chunkPoolDisabled = false;
}
+ public static class PooledChunk extends Chunk {
+ PooledChunk(int size) {
+ super(size);
+ }
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
index b5e979890c9..c53ce809bc0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
@@ -196,4 +196,43 @@ public class TestMemStoreChunkPool {
assertTrue(chunkPool.getPoolSize() > 0);
}
+ @Test
+ public void testPutbackChunksMultiThreaded() throws Exception {
+ MemStoreChunkPool oldPool = MemStoreChunkPool.GLOBAL_INSTANCE;
+ final int maxCount = 10;
+ final int initialCount = 5;
+ final int chunkSize = 10;
+ MemStoreChunkPool pool = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount);
+ assertEquals(initialCount, pool.getPoolSize());
+ assertEquals(maxCount, pool.getMaxCount());
+ MemStoreChunkPool.GLOBAL_INSTANCE = pool;// Replace the global ref with the new one we created.
+ // Used it for the testing. Later in finally we put
+ // back the original
+ try {
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ MemStoreLAB memStoreLAB = new HeapMemStoreLAB(conf);
+ for (int i = 0; i < maxCount; i++) {
+ memStoreLAB.allocateBytes(chunkSize);// Try allocate size = chunkSize. Means every
+ // allocate call will result in a new chunk
+ }
+ // Close MemStoreLAB so that all chunks will be tried to be put back to pool
+ memStoreLAB.close();
+ }
+ };
+ Thread t1 = new Thread(r);
+ Thread t2 = new Thread(r);
+ Thread t3 = new Thread(r);
+ t1.start();
+ t2.start();
+ t3.start();
+ t1.join();
+ t2.join();
+ t3.join();
+ assertTrue(pool.getPoolSize() <= maxCount);
+ } finally {
+ MemStoreChunkPool.GLOBAL_INSTANCE = oldPool;
+ }
+ }
}