diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index eb530ae7b7a..653d8b620c5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -40,11 +40,12 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation; +import org.apache.hadoop.hbase.util.ByteRange; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.CollectionBackedScanner; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ReflectionUtils; /** * The MemStore holds in-memory modifications to the Store. 
Modifications @@ -65,10 +66,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @InterfaceAudience.Private public class DefaultMemStore implements MemStore { private static final Log LOG = LogFactory.getLog(DefaultMemStore.class); - - static final String USEMSLAB_KEY = - "hbase.hregion.memstore.mslab.enabled"; + static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled"; private static final boolean USEMSLAB_DEFAULT = true; + static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class"; private Configuration conf; @@ -94,7 +94,6 @@ public class DefaultMemStore implements MemStore { TimeRangeTracker timeRangeTracker; TimeRangeTracker snapshotTimeRangeTracker; - MemStoreChunkPool chunkPool; volatile MemStoreLAB allocator; volatile MemStoreLAB snapshotAllocator; volatile long snapshotId; @@ -121,11 +120,11 @@ public class DefaultMemStore implements MemStore { this.size = new AtomicLong(DEEP_OVERHEAD); this.snapshotSize = 0; if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) { - this.chunkPool = MemStoreChunkPool.getPool(conf); - this.allocator = new MemStoreLAB(conf, chunkPool); + String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName()); + this.allocator = ReflectionUtils.instantiateWithCustomCtor(className, + new Class[] { Configuration.class }, new Object[] { conf }); } else { this.allocator = null; - this.chunkPool = null; } } @@ -162,7 +161,9 @@ public class DefaultMemStore implements MemStore { this.snapshotAllocator = this.allocator; // Reset allocator so we get a fresh buffer for the new memstore if (allocator != null) { - this.allocator = new MemStoreLAB(conf, chunkPool); + String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName()); + this.allocator = ReflectionUtils.instantiateWithCustomCtor(className, + new Class[] { Configuration.class }, new Object[] { conf }); } timeOfOldestEdit = Long.MAX_VALUE; } @@ -258,15 +259,15 @@ public class DefaultMemStore implements MemStore { } 
int len = kv.getLength(); - Allocation alloc = allocator.allocateBytes(len); + ByteRange alloc = allocator.allocateBytes(len); if (alloc == null) { // The allocation was too large, allocator decided // not to do anything with it. return kv; } - assert alloc.getData() != null; - System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len); - KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len); + assert alloc.getBytes() != null; + alloc.put(0, kv.getBuffer(), kv.getOffset(), len); + KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len); newKv.setMvccVersion(kv.getMvccVersion()); return newKv; } @@ -987,7 +988,7 @@ public class DefaultMemStore implements MemStore { } public final static long FIXED_OVERHEAD = ClassSize.align( - ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG)); + ClassSize.OBJECT + (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG)); public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java new file mode 100644 index 00000000000..f4301f22660 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java @@ -0,0 +1,315 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.ByteRange; +import org.apache.hadoop.hbase.util.SimpleByteRange; + +import com.google.common.base.Preconditions; + +/** + * A memstore-local allocation buffer. + *

+ * The MemStoreLAB is basically a bump-the-pointer allocator that allocates + * big (2MB) byte[] chunks and then doles them out to threads that request + * slices into the array. + *

+ * The purpose of this class is to combat heap fragmentation in the + * regionserver. By ensuring that all KeyValues in a given memstore refer + * only to large chunks of contiguous memory, we ensure that large blocks + * get freed up when the memstore is flushed. + *

+ * Without the MSLAB, the byte arrays allocated during insertion end up + * interleaved throughout the heap, and the old generation gets progressively + * more fragmented until a stop-the-world compacting collection occurs. + *

+ * TODO: we should probably benchmark whether word-aligning the allocations + * would provide a performance improvement - probably would speed up the + * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached + * anyway + */ +@InterfaceAudience.Private +public class HeapMemStoreLAB implements MemStoreLAB { + + static final String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize"; + static final int CHUNK_SIZE_DEFAULT = 2048 * 1024; + static final String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation"; + static final int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through + // allocator + + private AtomicReference curChunk = new AtomicReference(); + // A queue of chunks contained by this memstore + private BlockingQueue chunkQueue = new LinkedBlockingQueue(); + final int chunkSize; + final int maxAlloc; + private final MemStoreChunkPool chunkPool; + + // This flag is for closing this instance, its set when clearing snapshot of + // memstore + private volatile boolean closed = false; + // This flag is for reclaiming chunks. Its set when putting chunks back to + // pool + private AtomicBoolean reclaimed = new AtomicBoolean(false); + // Current count of open scanners which reading data from this MemStoreLAB + private final AtomicInteger openScannerCount = new AtomicInteger(); + + // Used in testing + public HeapMemStoreLAB() { + this(new Configuration()); + } + + public HeapMemStoreLAB(Configuration conf) { + chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT); + maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT); + this.chunkPool = MemStoreChunkPool.getPool(conf); + + // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one! + Preconditions.checkArgument( + maxAlloc <= chunkSize, + MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); + } + + /** + * Allocate a slice of the given length. 
+ * + * If the size is larger than the maximum size specified for this + * allocator, returns null. + */ + @Override + public ByteRange allocateBytes(int size) { + Preconditions.checkArgument(size >= 0, "negative size"); + + // Callers should satisfy large allocations directly from JVM since they + // don't cause fragmentation as badly. + if (size > maxAlloc) { + return null; + } + + while (true) { + Chunk c = getOrMakeChunk(); + + // Try to allocate from this chunk + int allocOffset = c.alloc(size); + if (allocOffset != -1) { + // We succeeded - this is the common case - small alloc + // from a big buffer + return new SimpleByteRange(c.data, allocOffset, size); + } + + // not enough space! + // try to retire this chunk + tryRetireChunk(c); + } + } + + /** + * Close this instance since it won't be used any more, try to put the chunks + * back to pool + */ + @Override + public void close() { + this.closed = true; + // We could put back the chunks to pool for reusing only when there is no + // opening scanner which will read their data + if (chunkPool != null && openScannerCount.get() == 0 + && reclaimed.compareAndSet(false, true)) { + chunkPool.putbackChunks(this.chunkQueue); + } + } + + /** + * Called when opening a scanner on the data of this MemStoreLAB + */ + @Override + public void incScannerCount() { + this.openScannerCount.incrementAndGet(); + } + + /** + * Called when closing a scanner on the data of this MemStoreLAB + */ + @Override + public void decScannerCount() { + int count = this.openScannerCount.decrementAndGet(); + if (chunkPool != null && count == 0 && this.closed + && reclaimed.compareAndSet(false, true)) { + chunkPool.putbackChunks(this.chunkQueue); + } + } + + /** + * Try to retire the current chunk if it is still + * c. Postcondition is that curChunk.get() + * != c + */ + private void tryRetireChunk(Chunk c) { + curChunk.compareAndSet(c, null); + // If the CAS succeeds, that means that we won the race + // to retire the chunk. 
We could use this opportunity to + // update metrics on external fragmentation. + // + // If the CAS fails, that means that someone else already + // retired the chunk for us. + } + + /** + * Get the current chunk, or, if there is no current chunk, + * allocate a new one from the JVM. + */ + private Chunk getOrMakeChunk() { + while (true) { + // Try to get the chunk + Chunk c = curChunk.get(); + if (c != null) { + return c; + } + + // No current chunk, so we want to allocate one. We race + // against other allocators to CAS in an uninitialized chunk + // (which is cheap to allocate) + c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize); + if (curChunk.compareAndSet(null, c)) { + // we won race - now we need to actually do the expensive + // allocation step + c.init(); + this.chunkQueue.add(c); + return c; + } else if (chunkPool != null) { + chunkPool.putbackChunk(c); + } + // someone else won race - that's fine, we'll try to grab theirs + // in the next iteration of the loop. + } + } + + /** + * A chunk of memory out of which allocations are sliced. + */ + static class Chunk { + /** Actual underlying data */ + private byte[] data; + + private static final int UNINITIALIZED = -1; + private static final int OOM = -2; + /** + * Offset for the next allocation, or the sentinel value -1 + * which implies that the chunk is still uninitialized. + * */ + private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED); + + /** Total number of allocations satisfied from this buffer */ + private AtomicInteger allocCount = new AtomicInteger(); + + /** Size of chunk in bytes */ + private final int size; + + /** + * Create an uninitialized chunk. Note that memory is not allocated yet, so + * this is cheap. + * @param size in bytes + */ + Chunk(int size) { + this.size = size; + } + + /** + * Actually claim the memory for this chunk. This should only be called from + * the thread that constructed the chunk. 
It is thread-safe against other + * threads calling alloc(), who will block until the allocation is complete. + */ + public void init() { + assert nextFreeOffset.get() == UNINITIALIZED; + try { + if (data == null) { + data = new byte[size]; + } + } catch (OutOfMemoryError e) { + boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); + assert failInit; // should be true. + throw e; + } + // Mark that it's ready for use + boolean initted = nextFreeOffset.compareAndSet( + UNINITIALIZED, 0); + // We should always succeed the above CAS since only one thread + // calls init()! + Preconditions.checkState(initted, + "Multiple threads tried to init same chunk"); + } + + /** + * Reset the offset to UNINITIALIZED before reusing an old chunk + */ + void reset() { + if (nextFreeOffset.get() != UNINITIALIZED) { + nextFreeOffset.set(UNINITIALIZED); + allocCount.set(0); + } + } + + /** + * Try to allocate size bytes from the chunk. + * @return the offset of the successful allocation, or -1 to indicate not-enough-space + */ + public int alloc(int size) { + while (true) { + int oldOffset = nextFreeOffset.get(); + if (oldOffset == UNINITIALIZED) { + // The chunk doesn't have its data allocated yet. + // Since we found this in curChunk, we know that whoever + // CAS-ed it there is allocating it right now. So spin-loop + // shouldn't spin long! + Thread.yield(); + continue; + } + if (oldOffset == OOM) { + // doh we ran out of ram. return -1 to chuck this away. 
+ return -1; + } + + if (oldOffset + size > data.length) { + return -1; // alloc doesn't fit + } + + // Try to atomically claim this chunk + if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) { + // we got the alloc + allocCount.incrementAndGet(); + return oldOffset; + } + // we raced and lost alloc, try again + } + } + + @Override + public String toString() { + return "Chunk@" + System.identityHashCode(this) + + " allocs=" + allocCount.get() + "waste=" + + (data.length - nextFreeOffset.get()); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java index 1230802f4a2..615b8604e30 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java @@ -30,13 +30,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Chunk; +import org.apache.hadoop.hbase.regionserver.HeapMemStoreLAB.Chunk; import org.apache.hadoop.util.StringUtils; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** - * A pool of {@link MemStoreLAB$Chunk} instances. + * A pool of {@link HeapMemStoreLAB$Chunk} instances. 
* * MemStoreChunkPool caches a number of retired chunks for reusing, it could * decrease allocating bytes when writing, thereby optimizing the garbage @@ -177,42 +177,39 @@ public class MemStoreChunkPool { * @param conf * @return the global MemStoreChunkPool instance */ - static synchronized MemStoreChunkPool getPool(Configuration conf) { + static MemStoreChunkPool getPool(Configuration conf) { if (globalInstance != null) return globalInstance; if (chunkPoolDisabled) return null; + synchronized (MemStoreChunkPool.class) { + if (globalInstance != null) return globalInstance; + float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY, POOL_MAX_SIZE_DEFAULT); + if (poolSizePercentage <= 0) { + chunkPoolDisabled = true; + return null; + } + if (poolSizePercentage > 1.0) { + throw new IllegalArgumentException(CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0"); + } + long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); + long globalMemStoreLimit = (long) (heapMax * MemStoreFlusher.getGlobalMemStorePercent(conf)); + int chunkSize = conf.getInt(HeapMemStoreLAB.CHUNK_SIZE_KEY, + HeapMemStoreLAB.CHUNK_SIZE_DEFAULT); + int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize); - float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY, - POOL_MAX_SIZE_DEFAULT); - if (poolSizePercentage <= 0) { - chunkPoolDisabled = true; - return null; - } - if (poolSizePercentage > 1.0) { - throw new IllegalArgumentException(CHUNK_POOL_MAXSIZE_KEY - + " must be between 0.0 and 1.0"); - } - long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() - .getMax(); - long globalMemStoreLimit = (long) (heapMax * MemStoreFlusher.getGlobalMemStorePercent(conf)); - int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, - MemStoreLAB.CHUNK_SIZE_DEFAULT); - int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize); + float initialCountPercentage = conf.getFloat(CHUNK_POOL_INITIALSIZE_KEY, + 
POOL_INITIAL_SIZE_DEFAULT); + if (initialCountPercentage > 1.0 || initialCountPercentage < 0) { + throw new IllegalArgumentException(CHUNK_POOL_INITIALSIZE_KEY + + " must be between 0.0 and 1.0"); + } - float initialCountPercentage = conf.getFloat(CHUNK_POOL_INITIALSIZE_KEY, - POOL_INITIAL_SIZE_DEFAULT); - if (initialCountPercentage > 1.0 || initialCountPercentage < 0) { - throw new IllegalArgumentException(CHUNK_POOL_INITIALSIZE_KEY - + " must be between 0.0 and 1.0"); + int initialCount = (int) (initialCountPercentage * maxCount); + LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize) + + ", max count " + maxCount + ", initial count " + initialCount); + globalInstance = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount); + return globalInstance; } - - int initialCount = (int) (initialCountPercentage * maxCount); - LOG.info("Allocating MemStoreChunkPool with chunk size " - + StringUtils.byteDesc(chunkSize) + ", max count " + maxCount - + ", initial count " + initialCount); - globalInstance = new MemStoreChunkPool(conf, chunkSize, maxCount, - initialCount); - return globalInstance; } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java index dc045801b1f..b8353052de8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,325 +17,47 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; - -import com.google.common.base.Preconditions; +import org.apache.hadoop.hbase.util.ByteRange; /** * A memstore-local allocation buffer. *

- * The MemStoreLAB is basically a bump-the-pointer allocator that allocates - * big (2MB) byte[] chunks from and then doles it out to threads that request - * slices into the array. + * The MemStoreLAB is basically a bump-the-pointer allocator that allocates big (2MB) chunks + * and then doles them out to threads that request slices into the array. *

- * The purpose of this class is to combat heap fragmentation in the - * regionserver. By ensuring that all KeyValues in a given memstore refer - * only to large chunks of contiguous memory, we ensure that large blocks - * get freed up when the memstore is flushed. + * The purpose of this is to combat heap fragmentation in the regionserver. By ensuring that all + * KeyValues in a given memstore refer only to large chunks of contiguous memory, we ensure that + * large blocks get freed up when the memstore is flushed. *

- * Without the MSLAB, the byte array allocated during insertion end up - * interleaved throughout the heap, and the old generation gets progressively - * more fragmented until a stop-the-world compacting collection occurs. + * Without the MSLAB, the byte arrays allocated during insertion end up interleaved throughout the + * heap, and the old generation gets progressively more fragmented until a stop-the-world compacting + * collection occurs. *

- * TODO: we should probably benchmark whether word-aligning the allocations - * would provide a performance improvement - probably would speed up the - * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached - * anyway */ @InterfaceAudience.Private -public class MemStoreLAB { - private AtomicReference curChunk = new AtomicReference(); - // A queue of chunks contained by this memstore - private BlockingQueue chunkQueue = new LinkedBlockingQueue(); - - final static String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize"; - final static int CHUNK_SIZE_DEFAULT = 2048 * 1024; - final int chunkSize; - - final static String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation"; - final static int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through allocator - final int maxAlloc; - - private final MemStoreChunkPool chunkPool; - - // This flag is for closing this instance, its set when clearing snapshot of - // memstore - private volatile boolean closed = false; - // This flag is for reclaiming chunks. Its set when putting chunks back to - // pool - private AtomicBoolean reclaimed = new AtomicBoolean(false); - // Current count of open scanners which reading data from this MemStoreLAB - private final AtomicInteger openScannerCount = new AtomicInteger(); - - // Used in testing - public MemStoreLAB() { - this(new Configuration()); - } - - private MemStoreLAB(Configuration conf) { - this(conf, MemStoreChunkPool.getPool(conf)); - } - - public MemStoreLAB(Configuration conf, MemStoreChunkPool pool) { - chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT); - maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT); - this.chunkPool = pool; - - // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one! - Preconditions.checkArgument( - maxAlloc <= chunkSize, - MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); - } +public interface MemStoreLAB { /** - * Allocate a slice of the given length. 
- * - * If the size is larger than the maximum size specified for this - * allocator, returns null. + * Allocate a slice of the given length. If the size is larger than the maximum size specified for + * this allocator, returns null. + * @param size + * @return {@link ByteRange} */ - public Allocation allocateBytes(int size) { - Preconditions.checkArgument(size >= 0, "negative size"); - - // Callers should satisfy large allocations directly from JVM since they - // don't cause fragmentation as badly. - if (size > maxAlloc) { - return null; - } - - while (true) { - Chunk c = getOrMakeChunk(); - - // Try to allocate from this chunk - int allocOffset = c.alloc(size); - if (allocOffset != -1) { - // We succeeded - this is the common case - small alloc - // from a big buffer - return new Allocation(c.data, allocOffset); - } - - // not enough space! - // try to retire this chunk - tryRetireChunk(c); - } - } + ByteRange allocateBytes(int size); /** - * Close this instance since it won't be used any more, try to put the chunks - * back to pool + * Close instance since it won't be used any more, try to put the chunks back to pool */ - void close() { - this.closed = true; - // We could put back the chunks to pool for reusing only when there is no - // opening scanner which will read their data - if (chunkPool != null && openScannerCount.get() == 0 - && reclaimed.compareAndSet(false, true)) { - chunkPool.putbackChunks(this.chunkQueue); - } - } + void close(); /** * Called when opening a scanner on the data of this MemStoreLAB */ - void incScannerCount() { - this.openScannerCount.incrementAndGet(); - } + void incScannerCount(); /** * Called when closing a scanner on the data of this MemStoreLAB */ - void decScannerCount() { - int count = this.openScannerCount.decrementAndGet(); - if (chunkPool != null && count == 0 && this.closed - && reclaimed.compareAndSet(false, true)) { - chunkPool.putbackChunks(this.chunkQueue); - } - } - - /** - * Try to retire the current chunk if it is 
still - * c. Postcondition is that curChunk.get() - * != c - */ - private void tryRetireChunk(Chunk c) { - curChunk.compareAndSet(c, null); - // If the CAS succeeds, that means that we won the race - // to retire the chunk. We could use this opportunity to - // update metrics on external fragmentation. - // - // If the CAS fails, that means that someone else already - // retired the chunk for us. - } - - /** - * Get the current chunk, or, if there is no current chunk, - * allocate a new one from the JVM. - */ - private Chunk getOrMakeChunk() { - while (true) { - // Try to get the chunk - Chunk c = curChunk.get(); - if (c != null) { - return c; - } - - // No current chunk, so we want to allocate one. We race - // against other allocators to CAS in an uninitialized chunk - // (which is cheap to allocate) - c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize); - if (curChunk.compareAndSet(null, c)) { - // we won race - now we need to actually do the expensive - // allocation step - c.init(); - this.chunkQueue.add(c); - return c; - } else if (chunkPool != null) { - chunkPool.putbackChunk(c); - } - // someone else won race - that's fine, we'll try to grab theirs - // in the next iteration of the loop. - } - } - - /** - * A chunk of memory out of which allocations are sliced. - */ - static class Chunk { - /** Actual underlying data */ - private byte[] data; - - private static final int UNINITIALIZED = -1; - private static final int OOM = -2; - /** - * Offset for the next allocation, or the sentinel value -1 - * which implies that the chunk is still uninitialized. - * */ - private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED); - - /** Total number of allocations satisfied from this buffer */ - private AtomicInteger allocCount = new AtomicInteger(); - - /** Size of chunk in bytes */ - private final int size; - - /** - * Create an uninitialized chunk. Note that memory is not allocated yet, so - * this is cheap. 
- * @param size in bytes - */ - Chunk(int size) { - this.size = size; - } - - /** - * Actually claim the memory for this chunk. This should only be called from - * the thread that constructed the chunk. It is thread-safe against other - * threads calling alloc(), who will block until the allocation is complete. - */ - public void init() { - assert nextFreeOffset.get() == UNINITIALIZED; - try { - if (data == null) { - data = new byte[size]; - } - } catch (OutOfMemoryError e) { - boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); - assert failInit; // should be true. - throw e; - } - // Mark that it's ready for use - boolean initted = nextFreeOffset.compareAndSet( - UNINITIALIZED, 0); - // We should always succeed the above CAS since only one thread - // calls init()! - Preconditions.checkState(initted, - "Multiple threads tried to init same chunk"); - } - - /** - * Reset the offset to UNINITIALIZED before before reusing an old chunk - */ - void reset() { - if (nextFreeOffset.get() != UNINITIALIZED) { - nextFreeOffset.set(UNINITIALIZED); - allocCount.set(0); - } - } - - /** - * Try to allocate size bytes from the chunk. - * @return the offset of the successful allocation, or -1 to indicate not-enough-space - */ - public int alloc(int size) { - while (true) { - int oldOffset = nextFreeOffset.get(); - if (oldOffset == UNINITIALIZED) { - // The chunk doesn't have its data allocated yet. - // Since we found this in curChunk, we know that whoever - // CAS-ed it there is allocating it right now. So spin-loop - // shouldn't spin long! - Thread.yield(); - continue; - } - if (oldOffset == OOM) { - // doh we ran out of ram. return -1 to chuck this away. 
- return -1; - } - - if (oldOffset + size > data.length) { - return -1; // alloc doesn't fit - } - - // Try to atomically claim this chunk - if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) { - // we got the alloc - allocCount.incrementAndGet(); - return oldOffset; - } - // we raced and lost alloc, try again - } - } - - @Override - public String toString() { - return "Chunk@" + System.identityHashCode(this) + - " allocs=" + allocCount.get() + "waste=" + - (data.length - nextFreeOffset.get()); - } - } - - /** - * The result of a single allocation. Contains the chunk that the - * allocation points into, and the offset in this array where the - * slice begins. - */ - public static class Allocation { - private final byte[] data; - private final int offset; - - private Allocation(byte[] data, int off) { - this.data = data; - this.offset = off; - } - - @Override - public String toString() { - return "Allocation(" + "capacity=" + data.length + ", off=" + offset - + ")"; - } - - byte[] getData() { - return data; - } - - int getOffset() { - return offset; - } - } + void decScannerCount(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index f00a8fea12f..2936f232ada 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -28,7 +28,7 @@ import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.SmallTests; -import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation; +import org.apache.hadoop.hbase.util.ByteRange; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; import org.junit.Before; @@ -68,21 +68,21 @@ public class TestMemStoreChunkPool { @Test public 
void testReusingChunks() { Random rand = new Random(); - MemStoreLAB mslab = new MemStoreLAB(conf, chunkPool); + MemStoreLAB mslab = new HeapMemStoreLAB(conf); int expectedOff = 0; byte[] lastBuffer = null; // Randomly allocate some bytes for (int i = 0; i < 100; i++) { int size = rand.nextInt(1000); - Allocation alloc = mslab.allocateBytes(size); + ByteRange alloc = mslab.allocateBytes(size); - if (alloc.getData() != lastBuffer) { + if (alloc.getBytes() != lastBuffer) { expectedOff = 0; - lastBuffer = alloc.getData(); + lastBuffer = alloc.getBytes(); } assertEquals(expectedOff, alloc.getOffset()); - assertTrue("Allocation " + alloc + " overruns buffer", alloc.getOffset() - + size <= alloc.getData().length); + assertTrue("Allocation overruns buffer", alloc.getOffset() + + size <= alloc.getBytes().length); expectedOff += size; } // chunks will be put back to pool after close @@ -90,7 +90,7 @@ public class TestMemStoreChunkPool { int chunkCount = chunkPool.getPoolSize(); assertTrue(chunkCount > 0); // reconstruct mslab - mslab = new MemStoreLAB(conf, chunkPool); + mslab = new HeapMemStoreLAB(conf); // chunk should be got from the pool, so we can reuse it. 
mslab.allocateBytes(1000); assertEquals(chunkCount - 1, chunkPool.getPoolSize()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java index 0423b37cea9..4b0b5ad8761 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java @@ -29,7 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.MultithreadedTestUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; import org.apache.hadoop.hbase.SmallTests; -import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation; +import org.apache.hadoop.hbase.util.ByteRange; import org.junit.Test; import com.google.common.collect.Iterables; @@ -47,7 +47,7 @@ public class TestMemStoreLAB { @Test public void testLABRandomAllocation() { Random rand = new Random(); - MemStoreLAB mslab = new MemStoreLAB(); + MemStoreLAB mslab = new HeapMemStoreLAB(); int expectedOff = 0; byte[] lastBuffer = null; // 100K iterations by 0-1K alloc -> 50MB expected @@ -55,23 +55,23 @@ public class TestMemStoreLAB { // behavior for (int i = 0; i < 100000; i++) { int size = rand.nextInt(1000); - Allocation alloc = mslab.allocateBytes(size); + ByteRange alloc = mslab.allocateBytes(size); - if (alloc.getData() != lastBuffer) { + if (alloc.getBytes() != lastBuffer) { expectedOff = 0; - lastBuffer = alloc.getData(); + lastBuffer = alloc.getBytes(); } assertEquals(expectedOff, alloc.getOffset()); - assertTrue("Allocation " + alloc + " overruns buffer", - alloc.getOffset() + size <= alloc.getData().length); + assertTrue("Allocation overruns buffer", + alloc.getOffset() + size <= alloc.getBytes().length); expectedOff += size; } } @Test public void testLABLargeAllocation() { - MemStoreLAB mslab = new MemStoreLAB(); - Allocation alloc = 
mslab.allocateBytes(2*1024*1024); + MemStoreLAB mslab = new HeapMemStoreLAB(); + ByteRange alloc = mslab.allocateBytes(2*1024*1024); assertNull("2MB allocation shouldn't be satisfied by LAB.", alloc); } @@ -88,7 +88,7 @@ public class TestMemStoreLAB { final AtomicInteger totalAllocated = new AtomicInteger(); - final MemStoreLAB mslab = new MemStoreLAB(); + final MemStoreLAB mslab = new HeapMemStoreLAB(); List> allocations = Lists.newArrayList(); for (int i = 0; i < 10; i++) { @@ -100,7 +100,7 @@ public class TestMemStoreLAB { @Override public void doAnAction() throws Exception { int size = r.nextInt(1000); - Allocation alloc = mslab.allocateBytes(size); + ByteRange alloc = mslab.allocateBytes(size); totalAllocated.addAndGet(size); allocsByThisThread.add(new AllocRecord(alloc, size)); } @@ -125,10 +125,10 @@ public class TestMemStoreLAB { if (rec.size == 0) continue; Map mapForThisByteArray = - mapsByChunk.get(rec.alloc.getData()); + mapsByChunk.get(rec.alloc.getBytes()); if (mapForThisByteArray == null) { mapForThisByteArray = Maps.newTreeMap(); - mapsByChunk.put(rec.alloc.getData(), mapForThisByteArray); + mapsByChunk.put(rec.alloc.getBytes(), mapForThisByteArray); } AllocRecord oldVal = mapForThisByteArray.put(rec.alloc.getOffset(), rec); assertNull("Already had an entry " + oldVal + " for allocation " + rec, @@ -141,8 +141,8 @@ public class TestMemStoreLAB { int expectedOff = 0; for (AllocRecord alloc : allocsInChunk.values()) { assertEquals(expectedOff, alloc.alloc.getOffset()); - assertTrue("Allocation " + alloc + " overruns buffer", - alloc.alloc.getOffset() + alloc.size <= alloc.alloc.getData().length); + assertTrue("Allocation overruns buffer", + alloc.alloc.getOffset() + alloc.size <= alloc.alloc.getBytes().length); expectedOff += alloc.size; } } @@ -150,9 +150,9 @@ public class TestMemStoreLAB { } private static class AllocRecord implements Comparable{ - private final Allocation alloc; + private final ByteRange alloc; private final int size; - public 
AllocRecord(Allocation alloc, int size) { + public AllocRecord(ByteRange alloc, int size) { super(); this.alloc = alloc; this.size = size; @@ -160,7 +160,7 @@ public class TestMemStoreLAB { @Override public int compareTo(AllocRecord e) { - if (alloc.getData() != e.alloc.getData()) { + if (alloc.getBytes() != e.alloc.getBytes()) { throw new RuntimeException("Can only compare within a particular array"); } return Ints.compare(alloc.getOffset(), e.alloc.getOffset()); @@ -168,7 +168,7 @@ public class TestMemStoreLAB { @Override public String toString() { - return "AllocRecord(alloc=" + alloc + ", size=" + size + ")"; + return "AllocRecord(offset=" + alloc.getOffset() + ", size=" + size + ")"; } }