HBASE-10750 Pluggable MemStoreLAB.(Anoop)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1581287 13f79535-47bb-0310-9956-ffa450edef68
parent f30de7377d · commit e165d19130
DefaultMemStore.java:
@@ -40,11 +40,12 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
+import org.apache.hadoop.hbase.util.ByteRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ReflectionUtils;

 /**
  * The MemStore holds in-memory modifications to the Store. Modifications
@@ -65,10 +66,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 @InterfaceAudience.Private
 public class DefaultMemStore implements MemStore {
   private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);

-  static final String USEMSLAB_KEY =
-      "hbase.hregion.memstore.mslab.enabled";
+  static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
   private static final boolean USEMSLAB_DEFAULT = true;
+  static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";

   private Configuration conf;
@@ -94,7 +94,6 @@ public class DefaultMemStore implements MemStore {
   TimeRangeTracker timeRangeTracker;
   TimeRangeTracker snapshotTimeRangeTracker;

-  MemStoreChunkPool chunkPool;
   volatile MemStoreLAB allocator;
   volatile MemStoreLAB snapshotAllocator;
   volatile long snapshotId;
@@ -121,11 +120,11 @@ public class DefaultMemStore implements MemStore {
     this.size = new AtomicLong(DEEP_OVERHEAD);
     this.snapshotSize = 0;
     if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
-      this.chunkPool = MemStoreChunkPool.getPool(conf);
-      this.allocator = new MemStoreLAB(conf, chunkPool);
+      String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
+      this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
+          new Class[] { Configuration.class }, new Object[] { conf });
     } else {
       this.allocator = null;
-      this.chunkPool = null;
     }
   }
@@ -162,7 +161,9 @@ public class DefaultMemStore implements MemStore {
     this.snapshotAllocator = this.allocator;
     // Reset allocator so we get a fresh buffer for the new memstore
     if (allocator != null) {
-      this.allocator = new MemStoreLAB(conf, chunkPool);
+      String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
+      this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
+          new Class[] { Configuration.class }, new Object[] { conf });
     }
     timeOfOldestEdit = Long.MAX_VALUE;
   }
@@ -258,15 +259,15 @@ public class DefaultMemStore implements MemStore {
     }

     int len = kv.getLength();
-    Allocation alloc = allocator.allocateBytes(len);
+    ByteRange alloc = allocator.allocateBytes(len);
     if (alloc == null) {
       // The allocation was too large, allocator decided
       // not to do anything with it.
       return kv;
     }
-    assert alloc.getData() != null;
-    System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
-    KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
+    assert alloc.getBytes() != null;
+    alloc.put(0, kv.getBuffer(), kv.getOffset(), len);
+    KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
     newKv.setMvccVersion(kv.getMvccVersion());
     return newKv;
   }
@@ -987,7 +988,7 @@ public class DefaultMemStore implements MemStore {
   }

   public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG));
+      ClassSize.OBJECT + (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG));

   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
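The net effect of the DefaultMemStore changes above is that the MSLAB implementation is now chosen by reflection rather than hard-wired. A minimal sketch of plugging in a custom allocator (hedged: MyMemStoreLAB is a hypothetical placeholder; the property names and the ReflectionUtils call come from the hunks above):

    Configuration conf = HBaseConfiguration.create();
    // MSLAB must still be enabled for any allocator to be created
    conf.setBoolean("hbase.hregion.memstore.mslab.enabled", true);
    // any class with a (Configuration) constructor can be named here
    conf.set("hbase.regionserver.mslab.class", MyMemStoreLAB.class.getName());
    // DefaultMemStore then does, in effect:
    MemStoreLAB allocator = ReflectionUtils.instantiateWithCustomCtor(
        conf.get("hbase.regionserver.mslab.class", HeapMemStoreLAB.class.getName()),
        new Class[] { Configuration.class }, new Object[] { conf });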
HeapMemStoreLAB.java (new file):
@@ -0,0 +1,315 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.ByteRange;
+import org.apache.hadoop.hbase.util.SimpleByteRange;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A memstore-local allocation buffer.
+ * <p>
+ * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
+ * big (2MB) byte[] chunks and then doles out slices to threads that request
+ * them.
+ * <p>
+ * The purpose of this class is to combat heap fragmentation in the
+ * regionserver. By ensuring that all KeyValues in a given memstore refer
+ * only to large chunks of contiguous memory, we ensure that large blocks
+ * get freed up when the memstore is flushed.
+ * <p>
+ * Without the MSLAB, the byte arrays allocated during insertion end up
+ * interleaved throughout the heap, and the old generation gets progressively
+ * more fragmented until a stop-the-world compacting collection occurs.
+ * <p>
+ * TODO: we should probably benchmark whether word-aligning the allocations
+ * would provide a performance improvement - probably would speed up the
+ * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
+ * anyway.
+ */
+@InterfaceAudience.Private
+public class HeapMemStoreLAB implements MemStoreLAB {
+
+  static final String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
+  static final int CHUNK_SIZE_DEFAULT = 2048 * 1024;
+  static final String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
+  static final int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through
+                                                   // the allocator
+
+  private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
+  // A queue of chunks contained by this memstore
+  private BlockingQueue<Chunk> chunkQueue = new LinkedBlockingQueue<Chunk>();
+  final int chunkSize;
+  final int maxAlloc;
+  private final MemStoreChunkPool chunkPool;
+
+  // This flag is for closing this instance; it's set when clearing the
+  // snapshot of the memstore
+  private volatile boolean closed = false;
+  // This flag is for reclaiming chunks; it's set when putting chunks back
+  // to the pool
+  private AtomicBoolean reclaimed = new AtomicBoolean(false);
+  // Current count of open scanners which are reading data from this MemStoreLAB
+  private final AtomicInteger openScannerCount = new AtomicInteger();
+
+  // Used in testing
+  public HeapMemStoreLAB() {
+    this(new Configuration());
+  }
+
+  public HeapMemStoreLAB(Configuration conf) {
+    chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
+    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
+    this.chunkPool = MemStoreChunkPool.getPool(conf);
+
+    // if we don't exclude allocations > CHUNK_SIZE, we'd infinite-loop on one!
+    Preconditions.checkArgument(
+        maxAlloc <= chunkSize,
+        MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
+  }
+
+  /**
+   * Allocate a slice of the given length.
+   *
+   * If the size is larger than the maximum size specified for this
+   * allocator, returns null.
+   */
+  @Override
+  public ByteRange allocateBytes(int size) {
+    Preconditions.checkArgument(size >= 0, "negative size");
+
+    // Callers should satisfy large allocations directly from the JVM since
+    // they don't cause fragmentation as badly.
+    if (size > maxAlloc) {
+      return null;
+    }
+
+    while (true) {
+      Chunk c = getOrMakeChunk();
+
+      // Try to allocate from this chunk
+      int allocOffset = c.alloc(size);
+      if (allocOffset != -1) {
+        // We succeeded - this is the common case - small alloc
+        // from a big buffer
+        return new SimpleByteRange(c.data, allocOffset, size);
+      }
+
+      // not enough space!
+      // try to retire this chunk
+      tryRetireChunk(c);
+    }
+  }
+
+  /**
+   * Close this instance since it won't be used any more; try to put the
+   * chunks back to the pool.
+   */
+  @Override
+  public void close() {
+    this.closed = true;
+    // We can put the chunks back to the pool for reuse only when there is
+    // no open scanner that will read their data
+    if (chunkPool != null && openScannerCount.get() == 0
+        && reclaimed.compareAndSet(false, true)) {
+      chunkPool.putbackChunks(this.chunkQueue);
+    }
+  }
+
+  /**
+   * Called when opening a scanner on the data of this MemStoreLAB
+   */
+  @Override
+  public void incScannerCount() {
+    this.openScannerCount.incrementAndGet();
+  }
+
+  /**
+   * Called when closing a scanner on the data of this MemStoreLAB
+   */
+  @Override
+  public void decScannerCount() {
+    int count = this.openScannerCount.decrementAndGet();
+    if (chunkPool != null && count == 0 && this.closed
+        && reclaimed.compareAndSet(false, true)) {
+      chunkPool.putbackChunks(this.chunkQueue);
+    }
+  }
+
+  /**
+   * Try to retire the current chunk if it is still <code>c</code>.
+   * Postcondition is that curChunk.get() != c
+   */
+  private void tryRetireChunk(Chunk c) {
+    curChunk.compareAndSet(c, null);
+    // If the CAS succeeds, that means that we won the race
+    // to retire the chunk. We could use this opportunity to
+    // update metrics on external fragmentation.
+    //
+    // If the CAS fails, that means that someone else already
+    // retired the chunk for us.
+  }
+
+  /**
+   * Get the current chunk, or, if there is no current chunk,
+   * allocate a new one from the JVM.
+   */
+  private Chunk getOrMakeChunk() {
+    while (true) {
+      // Try to get the chunk
+      Chunk c = curChunk.get();
+      if (c != null) {
+        return c;
+      }
+
+      // No current chunk, so we want to allocate one. We race
+      // against other allocators to CAS in an uninitialized chunk
+      // (which is cheap to allocate)
+      c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
+      if (curChunk.compareAndSet(null, c)) {
+        // we won race - now we need to actually do the expensive
+        // allocation step
+        c.init();
+        this.chunkQueue.add(c);
+        return c;
+      } else if (chunkPool != null) {
+        chunkPool.putbackChunk(c);
+      }
+      // someone else won race - that's fine, we'll try to grab theirs
+      // in the next iteration of the loop.
+    }
+  }
+
+  /**
+   * A chunk of memory out of which allocations are sliced.
+   */
+  static class Chunk {
+    /** Actual underlying data */
+    private byte[] data;
+
+    private static final int UNINITIALIZED = -1;
+    private static final int OOM = -2;
+    /**
+     * Offset for the next allocation, or the sentinel value -1
+     * which implies that the chunk is still uninitialized.
+     */
+    private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
+
+    /** Total number of allocations satisfied from this buffer */
+    private AtomicInteger allocCount = new AtomicInteger();
+
+    /** Size of chunk in bytes */
+    private final int size;
+
+    /**
+     * Create an uninitialized chunk. Note that memory is not allocated yet, so
+     * this is cheap.
+     * @param size in bytes
+     */
+    Chunk(int size) {
+      this.size = size;
+    }
+
+    /**
+     * Actually claim the memory for this chunk. This should only be called from
+     * the thread that constructed the chunk. It is thread-safe against other
+     * threads calling alloc(), who will block until the allocation is complete.
+     */
+    public void init() {
+      assert nextFreeOffset.get() == UNINITIALIZED;
+      try {
+        if (data == null) {
+          data = new byte[size];
+        }
+      } catch (OutOfMemoryError e) {
+        boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
+        assert failInit; // should be true.
+        throw e;
+      }
+      // Mark that it's ready for use
+      boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
+      // We should always succeed the above CAS since only one thread
+      // calls init()!
+      Preconditions.checkState(initted,
+          "Multiple threads tried to init same chunk");
+    }
+
+    /**
+     * Reset the offset to UNINITIALIZED before reusing an old chunk
+     */
+    void reset() {
+      if (nextFreeOffset.get() != UNINITIALIZED) {
+        nextFreeOffset.set(UNINITIALIZED);
+        allocCount.set(0);
+      }
+    }
+
+    /**
+     * Try to allocate <code>size</code> bytes from the chunk.
+     * @return the offset of the successful allocation, or -1 to indicate not-enough-space
+     */
+    public int alloc(int size) {
+      while (true) {
+        int oldOffset = nextFreeOffset.get();
+        if (oldOffset == UNINITIALIZED) {
+          // The chunk doesn't have its data allocated yet.
+          // Since we found this in curChunk, we know that whoever
+          // CAS-ed it there is allocating it right now. So spin-loop
+          // shouldn't spin long!
+          Thread.yield();
+          continue;
+        }
+        if (oldOffset == OOM) {
+          // doh we ran out of ram. return -1 to chuck this away.
+          return -1;
+        }
+
+        if (oldOffset + size > data.length) {
+          return -1; // alloc doesn't fit
+        }
+
+        // Try to atomically claim this chunk
+        if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
+          // we got the alloc
+          allocCount.incrementAndGet();
+          return oldOffset;
+        }
+        // we raced and lost alloc, try again
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "Chunk@" + System.identityHashCode(this) +
+          " allocs=" + allocCount.get() + " waste=" +
+          (data.length - nextFreeOffset.get());
+    }
+  }
+}
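A short usage sketch of the new class's contract (illustrative, not part of the patch): small requests come back as ByteRange slices of a shared 2MB chunk, while anything above hbase.hregion.memstore.mslab.max.allocation (256KB by default) returns null so the caller falls back to a plain JVM allocation, as DefaultMemStore.maybeCloneWithAllocator does above.

    Configuration conf = new Configuration();
    MemStoreLAB mslab = new HeapMemStoreLAB(conf);
    byte[] payload = new byte[128];
    ByteRange small = mslab.allocateBytes(payload.length); // slice of the current chunk
    small.put(0, payload, 0, payload.length);              // copy data into the slice
    ByteRange big = mslab.allocateBytes(512 * 1024);       // over maxAlloc: returns null
    assert big == null;
    mslab.close();                                         // chunks may go back to the pool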
MemStoreChunkPool.java:
@@ -30,13 +30,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Chunk;
+import org.apache.hadoop.hbase.regionserver.HeapMemStoreLAB.Chunk;
 import org.apache.hadoop.util.StringUtils;

 import com.google.common.util.concurrent.ThreadFactoryBuilder;

 /**
- * A pool of {@link MemStoreLAB$Chunk} instances.
+ * A pool of {@link HeapMemStoreLAB$Chunk} instances.
  *
  * MemStoreChunkPool caches a number of retired chunks for reusing, it could
  * decrease allocating bytes when writing, thereby optimizing the garbage
@@ -177,42 +177,39 @@ public class MemStoreChunkPool {
    * @param conf
    * @return the global MemStoreChunkPool instance
    */
-  static synchronized MemStoreChunkPool getPool(Configuration conf) {
+  static MemStoreChunkPool getPool(Configuration conf) {
     if (globalInstance != null) return globalInstance;
     if (chunkPoolDisabled) return null;

-    float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY,
-        POOL_MAX_SIZE_DEFAULT);
-    if (poolSizePercentage <= 0) {
-      chunkPoolDisabled = true;
-      return null;
-    }
-    if (poolSizePercentage > 1.0) {
-      throw new IllegalArgumentException(CHUNK_POOL_MAXSIZE_KEY
-          + " must be between 0.0 and 1.0");
-    }
-    long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
-        .getMax();
-    long globalMemStoreLimit = (long) (heapMax * MemStoreFlusher.getGlobalMemStorePercent(conf));
-    int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY,
-        MemStoreLAB.CHUNK_SIZE_DEFAULT);
-    int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize);
-
-    float initialCountPercentage = conf.getFloat(CHUNK_POOL_INITIALSIZE_KEY,
-        POOL_INITIAL_SIZE_DEFAULT);
-    if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
-      throw new IllegalArgumentException(CHUNK_POOL_INITIALSIZE_KEY
-          + " must be between 0.0 and 1.0");
-    }
-
-    int initialCount = (int) (initialCountPercentage * maxCount);
-    LOG.info("Allocating MemStoreChunkPool with chunk size "
-        + StringUtils.byteDesc(chunkSize) + ", max count " + maxCount
-        + ", initial count " + initialCount);
-    globalInstance = new MemStoreChunkPool(conf, chunkSize, maxCount,
-        initialCount);
-    return globalInstance;
-  }
+    synchronized (MemStoreChunkPool.class) {
+      if (globalInstance != null) return globalInstance;
+      float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY, POOL_MAX_SIZE_DEFAULT);
+      if (poolSizePercentage <= 0) {
+        chunkPoolDisabled = true;
+        return null;
+      }
+      if (poolSizePercentage > 1.0) {
+        throw new IllegalArgumentException(CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
+      }
+      long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
+      long globalMemStoreLimit = (long) (heapMax * MemStoreFlusher.getGlobalMemStorePercent(conf));
+      int chunkSize = conf.getInt(HeapMemStoreLAB.CHUNK_SIZE_KEY,
+          HeapMemStoreLAB.CHUNK_SIZE_DEFAULT);
+      int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize);
+      float initialCountPercentage = conf.getFloat(CHUNK_POOL_INITIALSIZE_KEY,
+          POOL_INITIAL_SIZE_DEFAULT);
+      if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
+        throw new IllegalArgumentException(CHUNK_POOL_INITIALSIZE_KEY
+            + " must be between 0.0 and 1.0");
+      }
+      int initialCount = (int) (initialCountPercentage * maxCount);
+      LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize)
+          + ", max count " + maxCount + ", initial count " + initialCount);
+      globalInstance = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount);
+      return globalInstance;
+    }
+  }
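The getPool() rewrite above trades a synchronized method for a lazily-initialized singleton that re-checks inside a class-level lock, so callers after initialization skip the monitor entirely. A generic sketch of the pattern (Pool is a hypothetical stand-in; note the textbook form wants the field declared volatile, which this hunk does not show for globalInstance):

    static volatile Pool instance;
    static Pool get() {
      Pool p = instance;
      if (p != null) return p;      // fast path: no lock once initialized
      synchronized (Pool.class) {
        if (instance == null) {     // re-check: another thread may have won
          instance = new Pool();
        }
        return instance;
      }
    }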
MemStoreLAB.java:
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -18,325 +17,47 @@
  */
 package org.apache.hadoop.hbase.regionserver;

-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.util.ByteRange;

 /**
  * A memstore-local allocation buffer.
  * <p>
- * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
- * big (2MB) byte[] chunks from and then doles it out to threads that request
- * slices into the array.
+ * The MemStoreLAB is basically a bump-the-pointer allocator that allocates big (2MB) chunks and
+ * then doles out slices to threads that request them.
  * <p>
- * The purpose of this class is to combat heap fragmentation in the
- * regionserver. By ensuring that all KeyValues in a given memstore refer
- * only to large chunks of contiguous memory, we ensure that large blocks
- * get freed up when the memstore is flushed.
+ * The purpose of this is to combat heap fragmentation in the regionserver. By ensuring that all
+ * KeyValues in a given memstore refer only to large chunks of contiguous memory, we ensure that
+ * large blocks get freed up when the memstore is flushed.
  * <p>
- * Without the MSLAB, the byte array allocated during insertion end up
- * interleaved throughout the heap, and the old generation gets progressively
- * more fragmented until a stop-the-world compacting collection occurs.
+ * Without the MSLAB, the byte arrays allocated during insertion end up interleaved throughout the
+ * heap, and the old generation gets progressively more fragmented until a stop-the-world compacting
+ * collection occurs.
  * <p>
  * TODO: we should probably benchmark whether word-aligning the allocations
  * would provide a performance improvement - probably would speed up the
  * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
  * anyway
  */
 @InterfaceAudience.Private
-public class MemStoreLAB {
-
-  private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
-  // A queue of chunks contained by this memstore
-  private BlockingQueue<Chunk> chunkQueue = new LinkedBlockingQueue<Chunk>();
-
-  final static String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
-  final static int CHUNK_SIZE_DEFAULT = 2048 * 1024;
-  final int chunkSize;
-
-  final static String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
-  final static int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through allocator
-  final int maxAlloc;
-
-  private final MemStoreChunkPool chunkPool;
-
-  // This flag is for closing this instance, its set when clearing snapshot of
-  // memstore
-  private volatile boolean closed = false;
-  // This flag is for reclaiming chunks. Its set when putting chunks back to
-  // pool
-  private AtomicBoolean reclaimed = new AtomicBoolean(false);
-  // Current count of open scanners which reading data from this MemStoreLAB
-  private final AtomicInteger openScannerCount = new AtomicInteger();
-
-  // Used in testing
-  public MemStoreLAB() {
-    this(new Configuration());
-  }
-
-  private MemStoreLAB(Configuration conf) {
-    this(conf, MemStoreChunkPool.getPool(conf));
-  }
-
-  public MemStoreLAB(Configuration conf, MemStoreChunkPool pool) {
-    chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
-    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
-    this.chunkPool = pool;
-
-    // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
-    Preconditions.checkArgument(
-      maxAlloc <= chunkSize,
-      MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
-  }
+public interface MemStoreLAB {

   /**
-   * Allocate a slice of the given length.
-   *
-   * If the size is larger than the maximum size specified for this
-   * allocator, returns null.
+   * Allocate a slice of the given length. If the size is larger than the maximum size specified
+   * for this allocator, returns null.
+   * @param size
+   * @return {@link ByteRange}
    */
-  public Allocation allocateBytes(int size) {
-    Preconditions.checkArgument(size >= 0, "negative size");
-
-    // Callers should satisfy large allocations directly from JVM since they
-    // don't cause fragmentation as badly.
-    if (size > maxAlloc) {
-      return null;
-    }
-
-    while (true) {
-      Chunk c = getOrMakeChunk();
-
-      // Try to allocate from this chunk
-      int allocOffset = c.alloc(size);
-      if (allocOffset != -1) {
-        // We succeeded - this is the common case - small alloc
-        // from a big buffer
-        return new Allocation(c.data, allocOffset);
-      }
-
-      // not enough space!
-      // try to retire this chunk
-      tryRetireChunk(c);
-    }
-  }
+  ByteRange allocateBytes(int size);

   /**
-   * Close this instance since it won't be used any more, try to put the chunks
-   * back to pool
+   * Close instance since it won't be used any more; try to put the chunks back to the pool
    */
-  void close() {
-    this.closed = true;
-    // We could put back the chunks to pool for reusing only when there is no
-    // opening scanner which will read their data
-    if (chunkPool != null && openScannerCount.get() == 0
-        && reclaimed.compareAndSet(false, true)) {
-      chunkPool.putbackChunks(this.chunkQueue);
-    }
-  }
+  void close();

   /**
    * Called when opening a scanner on the data of this MemStoreLAB
    */
-  void incScannerCount() {
-    this.openScannerCount.incrementAndGet();
-  }
+  void incScannerCount();

   /**
    * Called when closing a scanner on the data of this MemStoreLAB
    */
-  void decScannerCount() {
-    int count = this.openScannerCount.decrementAndGet();
-    if (chunkPool != null && count == 0 && this.closed
-        && reclaimed.compareAndSet(false, true)) {
-      chunkPool.putbackChunks(this.chunkQueue);
-    }
-  }
-
-  /**
-   * Try to retire the current chunk if it is still
-   * <code>c</code>. Postcondition is that curChunk.get()
-   * != c
-   */
-  private void tryRetireChunk(Chunk c) {
-    curChunk.compareAndSet(c, null);
-    // If the CAS succeeds, that means that we won the race
-    // to retire the chunk. We could use this opportunity to
-    // update metrics on external fragmentation.
-    //
-    // If the CAS fails, that means that someone else already
-    // retired the chunk for us.
-  }
-
-  /**
-   * Get the current chunk, or, if there is no current chunk,
-   * allocate a new one from the JVM.
-   */
-  private Chunk getOrMakeChunk() {
-    while (true) {
-      // Try to get the chunk
-      Chunk c = curChunk.get();
-      if (c != null) {
-        return c;
-      }
-
-      // No current chunk, so we want to allocate one. We race
-      // against other allocators to CAS in an uninitialized chunk
-      // (which is cheap to allocate)
-      c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
-      if (curChunk.compareAndSet(null, c)) {
-        // we won race - now we need to actually do the expensive
-        // allocation step
-        c.init();
-        this.chunkQueue.add(c);
-        return c;
-      } else if (chunkPool != null) {
-        chunkPool.putbackChunk(c);
-      }
-      // someone else won race - that's fine, we'll try to grab theirs
-      // in the next iteration of the loop.
-    }
-  }
-
-  /**
-   * A chunk of memory out of which allocations are sliced.
-   */
-  static class Chunk {
-    /** Actual underlying data */
-    private byte[] data;
-
-    private static final int UNINITIALIZED = -1;
-    private static final int OOM = -2;
-    /**
-     * Offset for the next allocation, or the sentinel value -1
-     * which implies that the chunk is still uninitialized.
-     */
-    private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
-
-    /** Total number of allocations satisfied from this buffer */
-    private AtomicInteger allocCount = new AtomicInteger();
-
-    /** Size of chunk in bytes */
-    private final int size;
-
-    /**
-     * Create an uninitialized chunk. Note that memory is not allocated yet, so
-     * this is cheap.
-     * @param size in bytes
-     */
-    Chunk(int size) {
-      this.size = size;
-    }
-
-    /**
-     * Actually claim the memory for this chunk. This should only be called from
-     * the thread that constructed the chunk. It is thread-safe against other
-     * threads calling alloc(), who will block until the allocation is complete.
-     */
-    public void init() {
-      assert nextFreeOffset.get() == UNINITIALIZED;
-      try {
-        if (data == null) {
-          data = new byte[size];
-        }
-      } catch (OutOfMemoryError e) {
-        boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
-        assert failInit; // should be true.
-        throw e;
-      }
-      // Mark that it's ready for use
-      boolean initted = nextFreeOffset.compareAndSet(
-          UNINITIALIZED, 0);
-      // We should always succeed the above CAS since only one thread
-      // calls init()!
-      Preconditions.checkState(initted,
-          "Multiple threads tried to init same chunk");
-    }
-
-    /**
-     * Reset the offset to UNINITIALIZED before reusing an old chunk
-     */
-    void reset() {
-      if (nextFreeOffset.get() != UNINITIALIZED) {
-        nextFreeOffset.set(UNINITIALIZED);
-        allocCount.set(0);
-      }
-    }
-
-    /**
-     * Try to allocate <code>size</code> bytes from the chunk.
-     * @return the offset of the successful allocation, or -1 to indicate not-enough-space
-     */
-    public int alloc(int size) {
-      while (true) {
-        int oldOffset = nextFreeOffset.get();
-        if (oldOffset == UNINITIALIZED) {
-          // The chunk doesn't have its data allocated yet.
-          // Since we found this in curChunk, we know that whoever
-          // CAS-ed it there is allocating it right now. So spin-loop
-          // shouldn't spin long!
-          Thread.yield();
-          continue;
-        }
-        if (oldOffset == OOM) {
-          // doh we ran out of ram. return -1 to chuck this away.
-          return -1;
-        }
-
-        if (oldOffset + size > data.length) {
-          return -1; // alloc doesn't fit
-        }
-
-        // Try to atomically claim this chunk
-        if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
-          // we got the alloc
-          allocCount.incrementAndGet();
-          return oldOffset;
-        }
-        // we raced and lost alloc, try again
-      }
-    }
-
-    @Override
-    public String toString() {
-      return "Chunk@" + System.identityHashCode(this) +
-        " allocs=" + allocCount.get() + "waste=" +
-        (data.length - nextFreeOffset.get());
-    }
-  }
-
-  /**
-   * The result of a single allocation. Contains the chunk that the
-   * allocation points into, and the offset in this array where the
-   * slice begins.
-   */
-  public static class Allocation {
-    private final byte[] data;
-    private final int offset;
-
-    private Allocation(byte[] data, int off) {
-      this.data = data;
-      this.offset = off;
-    }
-
-    @Override
-    public String toString() {
-      return "Allocation(" + "capacity=" + data.length + ", off=" + offset
-          + ")";
-    }
-
-    byte[] getData() {
-      return data;
-    }
-
-    int getOffset() {
-      return offset;
-    }
-  }
+  void decScannerCount();
 }
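With MemStoreLAB reduced to the four-method interface above, a third-party allocator only needs those methods plus a (Configuration) constructor for DefaultMemStore's reflective instantiation. A hypothetical pass-through implementation, purely to show the contract (it deliberately skips chunking and pooling, so it gives up the anti-fragmentation benefit):

    package org.apache.hadoop.hbase.regionserver;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.util.ByteRange;
    import org.apache.hadoop.hbase.util.SimpleByteRange;

    public class PassThroughMemStoreLAB implements MemStoreLAB {

      public PassThroughMemStoreLAB(Configuration conf) {
        // constructor shape required by DefaultMemStore's reflective call
      }

      @Override
      public ByteRange allocateBytes(int size) {
        // every allocation gets its own array: no shared chunks
        return new SimpleByteRange(new byte[size], 0, size);
      }

      @Override
      public void close() {} // nothing pooled, nothing to reclaim

      @Override
      public void incScannerCount() {} // no chunk lifetime to track

      @Override
      public void decScannerCount() {}
    }

It would be selected with conf.set("hbase.regionserver.mslab.class", PassThroughMemStoreLAB.class.getName()).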
TestMemStoreChunkPool.java:
@@ -28,7 +28,7 @@ import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.SmallTests;
-import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
+import org.apache.hadoop.hbase.util.ByteRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -68,21 +68,21 @@ public class TestMemStoreChunkPool {
   @Test
   public void testReusingChunks() {
     Random rand = new Random();
-    MemStoreLAB mslab = new MemStoreLAB(conf, chunkPool);
+    MemStoreLAB mslab = new HeapMemStoreLAB(conf);
     int expectedOff = 0;
     byte[] lastBuffer = null;
     // Randomly allocate some bytes
     for (int i = 0; i < 100; i++) {
       int size = rand.nextInt(1000);
-      Allocation alloc = mslab.allocateBytes(size);
+      ByteRange alloc = mslab.allocateBytes(size);

-      if (alloc.getData() != lastBuffer) {
+      if (alloc.getBytes() != lastBuffer) {
         expectedOff = 0;
-        lastBuffer = alloc.getData();
+        lastBuffer = alloc.getBytes();
       }
       assertEquals(expectedOff, alloc.getOffset());
-      assertTrue("Allocation " + alloc + " overruns buffer", alloc.getOffset()
-          + size <= alloc.getData().length);
+      assertTrue("Allocation overruns buffer", alloc.getOffset()
+          + size <= alloc.getBytes().length);
       expectedOff += size;
     }
     // chunks will be put back to pool after close
@@ -90,7 +90,7 @@ public class TestMemStoreChunkPool {
     int chunkCount = chunkPool.getPoolSize();
     assertTrue(chunkCount > 0);
     // reconstruct mslab
-    mslab = new MemStoreLAB(conf, chunkPool);
+    mslab = new HeapMemStoreLAB(conf);
     // chunk should be got from the pool, so we can reuse it.
     mslab.allocateBytes(1000);
     assertEquals(chunkCount - 1, chunkPool.getPoolSize());
TestMemStoreLAB.java:
@@ -29,7 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.MultithreadedTestUtil;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
 import org.apache.hadoop.hbase.SmallTests;
-import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
+import org.apache.hadoop.hbase.util.ByteRange;
 import org.junit.Test;

 import com.google.common.collect.Iterables;
@@ -47,7 +47,7 @@ public class TestMemStoreLAB {
   @Test
   public void testLABRandomAllocation() {
     Random rand = new Random();
-    MemStoreLAB mslab = new MemStoreLAB();
+    MemStoreLAB mslab = new HeapMemStoreLAB();
     int expectedOff = 0;
     byte[] lastBuffer = null;
     // 100K iterations by 0-1K alloc -> 50MB expected
@@ -55,23 +55,23 @@ public class TestMemStoreLAB {
     // behavior
     for (int i = 0; i < 100000; i++) {
       int size = rand.nextInt(1000);
-      Allocation alloc = mslab.allocateBytes(size);
+      ByteRange alloc = mslab.allocateBytes(size);

-      if (alloc.getData() != lastBuffer) {
+      if (alloc.getBytes() != lastBuffer) {
         expectedOff = 0;
-        lastBuffer = alloc.getData();
+        lastBuffer = alloc.getBytes();
       }
       assertEquals(expectedOff, alloc.getOffset());
-      assertTrue("Allocation " + alloc + " overruns buffer",
-          alloc.getOffset() + size <= alloc.getData().length);
+      assertTrue("Allocation overruns buffer",
+          alloc.getOffset() + size <= alloc.getBytes().length);
       expectedOff += size;
     }
   }

   @Test
   public void testLABLargeAllocation() {
-    MemStoreLAB mslab = new MemStoreLAB();
-    Allocation alloc = mslab.allocateBytes(2*1024*1024);
+    MemStoreLAB mslab = new HeapMemStoreLAB();
+    ByteRange alloc = mslab.allocateBytes(2*1024*1024);
     assertNull("2MB allocation shouldn't be satisfied by LAB.",
       alloc);
   }
@@ -88,7 +88,7 @@ public class TestMemStoreLAB {

     final AtomicInteger totalAllocated = new AtomicInteger();

-    final MemStoreLAB mslab = new MemStoreLAB();
+    final MemStoreLAB mslab = new HeapMemStoreLAB();
     List<List<AllocRecord>> allocations = Lists.newArrayList();

     for (int i = 0; i < 10; i++) {
@@ -100,7 +100,7 @@ public class TestMemStoreLAB {
         @Override
         public void doAnAction() throws Exception {
           int size = r.nextInt(1000);
-          Allocation alloc = mslab.allocateBytes(size);
+          ByteRange alloc = mslab.allocateBytes(size);
           totalAllocated.addAndGet(size);
           allocsByThisThread.add(new AllocRecord(alloc, size));
         }
@@ -125,10 +125,10 @@ public class TestMemStoreLAB {
       if (rec.size == 0) continue;

       Map<Integer, AllocRecord> mapForThisByteArray =
-        mapsByChunk.get(rec.alloc.getData());
+        mapsByChunk.get(rec.alloc.getBytes());
       if (mapForThisByteArray == null) {
         mapForThisByteArray = Maps.newTreeMap();
-        mapsByChunk.put(rec.alloc.getData(), mapForThisByteArray);
+        mapsByChunk.put(rec.alloc.getBytes(), mapForThisByteArray);
       }
       AllocRecord oldVal = mapForThisByteArray.put(rec.alloc.getOffset(), rec);
       assertNull("Already had an entry " + oldVal + " for allocation " + rec,
@@ -141,8 +141,8 @@ public class TestMemStoreLAB {
       int expectedOff = 0;
       for (AllocRecord alloc : allocsInChunk.values()) {
         assertEquals(expectedOff, alloc.alloc.getOffset());
-        assertTrue("Allocation " + alloc + " overruns buffer",
-            alloc.alloc.getOffset() + alloc.size <= alloc.alloc.getData().length);
+        assertTrue("Allocation overruns buffer",
+            alloc.alloc.getOffset() + alloc.size <= alloc.alloc.getBytes().length);
         expectedOff += alloc.size;
       }
     }
@@ -150,9 +150,9 @@ public class TestMemStoreLAB {
   }

   private static class AllocRecord implements Comparable<AllocRecord>{
-    private final Allocation alloc;
+    private final ByteRange alloc;
     private final int size;
-    public AllocRecord(Allocation alloc, int size) {
+    public AllocRecord(ByteRange alloc, int size) {
       super();
       this.alloc = alloc;
       this.size = size;
@@ -160,7 +160,7 @@ public class TestMemStoreLAB {

     @Override
     public int compareTo(AllocRecord e) {
-      if (alloc.getData() != e.alloc.getData()) {
+      if (alloc.getBytes() != e.alloc.getBytes()) {
         throw new RuntimeException("Can only compare within a particular array");
       }
       return Ints.compare(alloc.getOffset(), e.alloc.getOffset());
@@ -168,7 +168,7 @@ public class TestMemStoreLAB {

     @Override
     public String toString() {
-      return "AllocRecord(alloc=" + alloc + ", size=" + size + ")";
+      return "AllocRecord(offset=" + alloc.getOffset() + ", size=" + size + ")";
     }

   }
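The tests above all rest on one invariant worth restating: consecutive small allocations from one HeapMemStoreLAB land in the same backing array at strictly increasing offsets. A condensed sketch of that check, mirroring testLABRandomAllocation (assumes a fresh LAB so the first slice starts at offset 0):

    MemStoreLAB mslab = new HeapMemStoreLAB(new Configuration());
    ByteRange a = mslab.allocateBytes(100);
    ByteRange b = mslab.allocateBytes(200);
    assert a.getBytes() == b.getBytes();          // one 2MB chunk backs both slices
    assert b.getOffset() == a.getOffset() + 100;  // bump-the-pointer allocation
    mslab.close();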