HBASE-3455. Add memstore-local allocation buffers to combat heap fragmentation in the region server.

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1068148 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-02-07 21:39:08 +00:00
parent 1ae03f81b9
commit e0dbeec47f
7 changed files with 581 additions and 16 deletions

View File

@ -84,10 +84,17 @@ Release 0.91.0 - Unreleased
HBASE-3448 RegionSplitter, utility class to manually split tables HBASE-3448 RegionSplitter, utility class to manually split tables
HBASE-2824 A filter that randomly includes rows based on a configured HBASE-2824 A filter that randomly includes rows based on a configured
chance (Ferdy via Andrew Purtell) chance (Ferdy via Andrew Purtell)
HBASE-3455 Add memstore-local allocation buffers to combat heap
fragmentation in the region server. Enabled by default as of
0.91
Release 0.90.1 - Unreleased Release 0.90.1 - Unreleased
NEW FEATURES
HBASE-3455 Add memstore-local allocation buffers to combat heap
fragmentation in the region server. Experimental / disabled
by default in 0.90.1
BUG FIXES BUG FIXES
HBASE-3483 Memstore lower limit should trigger asynchronous flushes HBASE-3483 Memstore lower limit should trigger asynchronous flushes

View File

@ -34,10 +34,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
@ -55,6 +58,13 @@ import org.apache.hadoop.hbase.util.ClassSize;
public class MemStore implements HeapSize { public class MemStore implements HeapSize {
private static final Log LOG = LogFactory.getLog(MemStore.class); private static final Log LOG = LogFactory.getLog(MemStore.class);
static final String USEMSLAB_KEY =
"hbase.hregion.memstore.mslab.enabled";
private static final boolean USEMSLAB_DEFAULT = false;
private Configuration conf;
// MemStore. Use a KeyValueSkipListSet rather than SkipListSet because of the // MemStore. Use a KeyValueSkipListSet rather than SkipListSet because of the
// better semantics. The Map will overwrite if passed a key it already had // better semantics. The Map will overwrite if passed a key it already had
// whereas the Set will not add new KV if key is same though value might be // whereas the Set will not add new KV if key is same though value might be
@ -81,18 +91,22 @@ public class MemStore implements HeapSize {
TimeRangeTracker timeRangeTracker; TimeRangeTracker timeRangeTracker;
TimeRangeTracker snapshotTimeRangeTracker; TimeRangeTracker snapshotTimeRangeTracker;
MemStoreLAB allocator;
/** /**
* Default constructor. Used for tests. * Default constructor. Used for tests.
*/ */
public MemStore() { public MemStore() {
this(KeyValue.COMPARATOR); this(HBaseConfiguration.create(), KeyValue.COMPARATOR);
} }
/** /**
* Constructor. * Constructor.
* @param c Comparator * @param c Comparator
*/ */
public MemStore(final KeyValue.KVComparator c) { public MemStore(final Configuration conf,
final KeyValue.KVComparator c) {
this.conf = conf;
this.comparator = c; this.comparator = c;
this.comparatorIgnoreTimestamp = this.comparatorIgnoreTimestamp =
this.comparator.getComparatorIgnoringTimestamps(); this.comparator.getComparatorIgnoringTimestamps();
@ -102,6 +116,11 @@ public class MemStore implements HeapSize {
timeRangeTracker = new TimeRangeTracker(); timeRangeTracker = new TimeRangeTracker();
snapshotTimeRangeTracker = new TimeRangeTracker(); snapshotTimeRangeTracker = new TimeRangeTracker();
this.size = new AtomicLong(DEEP_OVERHEAD); this.size = new AtomicLong(DEEP_OVERHEAD);
if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
this.allocator = new MemStoreLAB(conf);
} else {
this.allocator = null;
}
} }
void dump() { void dump() {
@ -134,6 +153,10 @@ public class MemStore implements HeapSize {
this.timeRangeTracker = new TimeRangeTracker(); this.timeRangeTracker = new TimeRangeTracker();
// Reset heap to not include any keys // Reset heap to not include any keys
this.size.set(DEEP_OVERHEAD); this.size.set(DEEP_OVERHEAD);
// Reset allocator so we get a fresh buffer for the new memstore
if (allocator != null) {
this.allocator = new MemStoreLAB(conf);
}
} }
} }
} finally { } finally {
@ -184,18 +207,47 @@ public class MemStore implements HeapSize {
* @return approximate size of the passed key and value. * @return approximate size of the passed key and value.
*/ */
long add(final KeyValue kv) { long add(final KeyValue kv) {
long s = -1;
this.lock.readLock().lock(); this.lock.readLock().lock();
try { try {
s = heapSizeChange(kv, this.kvset.add(kv)); KeyValue toAdd = maybeCloneWithAllocator(kv);
timeRangeTracker.includeTimestamp(kv); return internalAdd(toAdd);
this.size.addAndGet(s);
} finally { } finally {
this.lock.readLock().unlock(); this.lock.readLock().unlock();
} }
}
/**
* Internal version of add() that doesn't clone KVs with the
* allocator, and doesn't take the lock.
*
* Callers should ensure they already have the read lock taken
*/
private long internalAdd(final KeyValue toAdd) {
long s = heapSizeChange(toAdd, this.kvset.add(toAdd));
timeRangeTracker.includeTimestamp(toAdd);
this.size.addAndGet(s);
return s; return s;
} }
private KeyValue maybeCloneWithAllocator(KeyValue kv) {
if (allocator == null) {
return kv;
}
int len = kv.getLength();
Allocation alloc = allocator.allocateBytes(len);
if (alloc == null) {
// The allocation was too large, allocator decided
// not to do anything with it.
return kv;
}
assert alloc != null && alloc.getData() != null;
System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
newKv.setMemstoreTS(kv.getMemstoreTS());
return newKv;
}
/** /**
* Write a delete * Write a delete
* @param delete * @param delete
@ -205,8 +257,9 @@ public class MemStore implements HeapSize {
long s = 0; long s = 0;
this.lock.readLock().lock(); this.lock.readLock().lock();
try { try {
s += heapSizeChange(delete, this.kvset.add(delete)); KeyValue toAdd = maybeCloneWithAllocator(delete);
timeRangeTracker.includeTimestamp(delete); s += heapSizeChange(toAdd, this.kvset.add(toAdd));
timeRangeTracker.includeTimestamp(toAdd);
} finally { } finally {
this.lock.readLock().unlock(); this.lock.readLock().unlock();
} }
@ -459,12 +512,20 @@ public class MemStore implements HeapSize {
* <p> * <p>
* If there are any existing KeyValues in this MemStore with the same row, * If there are any existing KeyValues in this MemStore with the same row,
* family, and qualifier, they are removed. * family, and qualifier, they are removed.
* <p>
* Callers must hold the read lock.
*
* @param kv * @param kv
* @return change in size of MemStore * @return change in size of MemStore
*/ */
private long upsert(KeyValue kv) { private long upsert(KeyValue kv) {
// Add the KeyValue to the MemStore // Add the KeyValue to the MemStore
long addedSize = add(kv); // Use the internalAdd method here since we (a) already have a lock
// and (b) cannot safely use the MSLAB here without potentially
// hitting OOME - see TestMemStore.testUpsertMSLAB for a
// test that triggers the pathological case if we don't avoid MSLAB
// here.
long addedSize = internalAdd(kv);
// Get the KeyValues for the row/family/qualifier regardless of timestamp. // Get the KeyValues for the row/family/qualifier regardless of timestamp.
// For this case we want to clean up any other puts // For this case we want to clean up any other puts
@ -732,7 +793,7 @@ public class MemStore implements HeapSize {
} }
public final static long FIXED_OVERHEAD = ClassSize.align( public final static long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + (9 * ClassSize.REFERENCE)); ClassSize.OBJECT + (11 * ClassSize.REFERENCE));
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG + ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +

View File

@ -0,0 +1,262 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import com.google.common.base.Preconditions;
/**
* A memstore-local allocation buffer.
* <p>
* The MemStoreLAB is basically a bump-the-pointer allocator that allocates
* big (2MB) byte[] chunks from and then doles it out to threads that request
* slices into the array.
* <p>
* The purpose of this class is to combat heap fragmentation in the
* regionserver. By ensuring that all KeyValues in a given memstore refer
* only to large chunks of contiguous memory, we ensure that large blocks
* get freed up when the memstore is flushed.
* <p>
* Without the MSLAB, the byte array allocated during insertion end up
* interleaved throughout the heap, and the old generation gets progressively
* more fragmented until a stop-the-world compacting collection occurs.
* <p>
* TODO: we should probably benchmark whether word-aligning the allocations
* would provide a performance improvement - probably would speed up the
* Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
* anyway
*/
public class MemStoreLAB {
private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
final static String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
final static int CHUNK_SIZE_DEFAULT = 2048 * 1024;
final int chunkSize;
final static String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
final static int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through allocator
final int maxAlloc;
public MemStoreLAB() {
this(new Configuration());
}
public MemStoreLAB(Configuration conf) {
chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
// if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
Preconditions.checkArgument(
maxAlloc <= chunkSize,
MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
}
/**
* Allocate a slice of the given length.
*
* If the size is larger than the maximum size specified for this
* allocator, returns null.
*/
public Allocation allocateBytes(int size) {
Preconditions.checkArgument(size >= 0, "negative size");
// Callers should satisfy large allocations directly from JVM since they
// don't cause fragmentation as badly.
if (size > maxAlloc) {
return null;
}
while (true) {
Chunk c = getOrMakeChunk();
// Try to allocate from this chunk
int allocOffset = c.alloc(size);
if (allocOffset != -1) {
// We succeeded - this is the common case - small alloc
// from a big buffer
return new Allocation(c.data, allocOffset);
}
// not enough space!
// try to retire this chunk
tryRetireChunk(c);
}
}
/**
* Try to retire the current chunk if it is still
* <code>c</code>. Postcondition is that curChunk.get()
* != c
*/
private void tryRetireChunk(Chunk c) {
@SuppressWarnings("unused")
boolean weRetiredIt = curChunk.compareAndSet(c, null);
// If the CAS succeeds, that means that we won the race
// to retire the chunk. We could use this opportunity to
// update metrics on external fragmentation.
//
// If the CAS fails, that means that someone else already
// retired the chunk for us.
}
/**
* Get the current chunk, or, if there is no current chunk,
* allocate a new one from the JVM.
*/
private Chunk getOrMakeChunk() {
while (true) {
// Try to get the chunk
Chunk c = curChunk.get();
if (c != null) {
return c;
}
// No current chunk, so we want to allocate one. We race
// against other allocators to CAS in an uninitialized chunk
// (which is cheap to allocate)
c = new Chunk(chunkSize);
if (curChunk.compareAndSet(null, c)) {
// we won race - now we need to actually do the expensive
// allocation step
c.init();
return c;
}
// someone else won race - that's fine, we'll try to grab theirs
// in the next iteration of the loop.
}
}
/**
* A chunk of memory out of which allocations are sliced.
*/
private static class Chunk {
/** Actual underlying data */
private byte[] data;
private static final int UNINITIALIZED = -1;
/**
* Offset for the next allocation, or the sentinel value -1
* which implies that the chunk is still uninitialized.
* */
private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
/** Total number of allocations satisfied from this buffer */
private AtomicInteger allocCount = new AtomicInteger();
/** Size of chunk in bytes */
private final int size;
/**
* Create an uninitialized chunk. Note that memory is not allocated yet, so
* this is cheap.
* @param size in bytes
*/
private Chunk(int size) {
this.size = size;
}
/**
* Actually claim the memory for this chunk. This should only be called from
* the thread that constructed the chunk. It is thread-safe against other
* threads calling alloc(), who will block until the allocation is complete.
*/
public void init() {
assert nextFreeOffset.get() == UNINITIALIZED;
data = new byte[size];
// Mark that it's ready for use
boolean initted = nextFreeOffset.compareAndSet(
UNINITIALIZED, 0);
// We should always succeed the above CAS since only one thread
// calls init()!
Preconditions.checkState(initted,
"Multiple threads tried to init same chunk");
}
/**
* Try to allocate <code>size</code> bytes from the chunk.
* @return the offset of the successful allocation, or -1 to indicate not-enough-space
*/
public int alloc(int size) {
while (true) {
int oldOffset = nextFreeOffset.get();
if (oldOffset == UNINITIALIZED) {
// The chunk doesn't have its data allocated yet.
// Since we found this in curChunk, we know that whoever
// CAS-ed it there is allocating it right now. So spin-loop
// shouldn't spin long!
Thread.yield();
continue;
}
if (oldOffset + size > data.length) {
return -1; // alloc doesn't fit
}
// Try to atomically claim this chunk
if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
// we got the alloc
allocCount.incrementAndGet();
return oldOffset;
}
// we raced and lost alloc, try again
}
}
@Override
public String toString() {
return "Chunk@" + System.identityHashCode(this) +
" allocs=" + allocCount.get() + "waste=" +
(data.length - nextFreeOffset.get());
}
}
/**
* The result of a single allocation. Contains the chunk that the
* allocation points into, and the offset in this array where the
* slice begins.
*/
public static class Allocation {
private final byte[] data;
private final int offset;
private Allocation(byte[] data, int off) {
this.data = data;
this.offset = off;
}
@Override
public String toString() {
return "Allocation(data=" + data +
" with capacity=" + data.length +
", off=" + offset + ")";
}
byte[] getData() {
return data;
}
int getOffset() {
return offset;
}
}
}

View File

@ -178,7 +178,7 @@ public class Store implements HeapSize {
// second -> ms adjust for user data // second -> ms adjust for user data
this.ttl *= 1000; this.ttl *= 1000;
} }
this.memstore = new MemStore(this.comparator); this.memstore = new MemStore(conf, this.comparator);
this.storeNameStr = Bytes.toString(this.family.getName()); this.storeNameStr = Bytes.toString(this.family.getName());
// By default, compact if storefile.count >= minFilesToCompact // By default, compact if storefile.count >= minFilesToCompact

View File

@ -359,6 +359,16 @@
worse, we OOME. worse, we OOME.
</description> </description>
</property> </property>
<property>
<name>hbase.hregion.memstore.mslab.enabled</name>
<value>true</value>
<description>
Enables the MemStore-Local Allocation Buffer,
a feature which works to prevent heap fragmentation under
heavy write loads. This can reduce the frequency of stop-the-world
GC pauses on large heaps.
</description>
</property>
<property> <property>
<name>hbase.hregion.max.filesize</name> <name>hbase.hregion.max.filesize</name>
<value>268435456</value> <value>268435456</value>

View File

@ -20,6 +20,8 @@
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.rmi.UnexpectedException; import java.rmi.UnexpectedException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -32,6 +34,8 @@ import junit.framework.TestCase;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil; import org.apache.hadoop.hbase.KeyValueTestUtil;
@ -470,7 +474,7 @@ public class TestMemStore extends TestCase {
} }
public void testMultipleVersionsSimple() throws Exception { public void testMultipleVersionsSimple() throws Exception {
MemStore m = new MemStore(KeyValue.COMPARATOR); MemStore m = new MemStore(new Configuration(), KeyValue.COMPARATOR);
byte [] row = Bytes.toBytes("testRow"); byte [] row = Bytes.toBytes("testRow");
byte [] family = Bytes.toBytes("testFamily"); byte [] family = Bytes.toBytes("testFamily");
byte [] qf = Bytes.toBytes("testQualifier"); byte [] qf = Bytes.toBytes("testQualifier");
@ -490,7 +494,7 @@ public class TestMemStore extends TestCase {
} }
public void testBinary() throws IOException { public void testBinary() throws IOException {
MemStore mc = new MemStore(KeyValue.ROOT_COMPARATOR); MemStore mc = new MemStore(new Configuration(), KeyValue.ROOT_COMPARATOR);
final int start = 43; final int start = 43;
final int end = 46; final int end = 46;
for (int k = start; k <= end; k++) { for (int k = start; k <= end; k++) {
@ -757,7 +761,6 @@ public class TestMemStore extends TestCase {
assertEquals(delete, memstore.kvset.first()); assertEquals(delete, memstore.kvset.first());
} }
//////////////////////////////////// ////////////////////////////////////
//Test for timestamps //Test for timestamps
//////////////////////////////////// ////////////////////////////////////
@ -790,6 +793,51 @@ public class TestMemStore extends TestCase {
//assertTrue(!memstore.shouldSeek(scan)); //assertTrue(!memstore.shouldSeek(scan));
} }
////////////////////////////////////
//Test for upsert with MSLAB
////////////////////////////////////
/**
* Test a pathological pattern that shows why we can't currently
* use the MSLAB for upsert workloads. This test inserts data
* in the following pattern:
*
* - row0001 through row1000 (fills up one 2M Chunk)
* - row0002 through row1001 (fills up another 2M chunk, leaves one reference
* to the first chunk
* - row0003 through row1002 (another chunk, another dangling reference)
*
* This causes OOME pretty quickly if we use MSLAB for upsert
* since each 2M chunk is held onto by a single reference.
*/
public void testUpsertMSLAB() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.setBoolean(MemStore.USEMSLAB_KEY, true);
memstore = new MemStore(conf, KeyValue.COMPARATOR);
int ROW_SIZE = 2048;
byte[] qualifier = new byte[ROW_SIZE - 4];
MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
for (int i = 0; i < 3; i++) { System.gc(); }
long usageBefore = bean.getHeapMemoryUsage().getUsed();
long size = 0;
long ts=0;
for (int newValue = 0; newValue < 1000; newValue++) {
for (int row = newValue; row < newValue + 1000; row++) {
byte[] rowBytes = Bytes.toBytes(row);
size += memstore.updateColumnValue(rowBytes, FAMILY, qualifier, newValue, ++ts);
}
}
System.out.println("Wrote " + ts + " vals");
for (int i = 0; i < 3; i++) { System.gc(); }
long usageAfter = bean.getHeapMemoryUsage().getUsed();
System.out.println("Memory used: " + (usageAfter - usageBefore)
+ " (heapsize: " + memstore.heapSize() +
" size: " + size + ")");
}
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// Helpers // Helpers

View File

@ -0,0 +1,177 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.*;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
import org.junit.Test;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
public class TestMemStoreLAB {
/**
* Test a bunch of random allocations
*/
@Test
public void testLABRandomAllocation() {
Random rand = new Random();
MemStoreLAB mslab = new MemStoreLAB();
int expectedOff = 0;
byte[] lastBuffer = null;
// 100K iterations by 0-1K alloc -> 50MB expected
// should be reasonable for unit test and also cover wraparound
// behavior
for (int i = 0; i < 100000; i++) {
int size = rand.nextInt(1000);
Allocation alloc = mslab.allocateBytes(size);
if (alloc.getData() != lastBuffer) {
expectedOff = 0;
lastBuffer = alloc.getData();
}
assertEquals(expectedOff, alloc.getOffset());
assertTrue("Allocation " + alloc + " overruns buffer",
alloc.getOffset() + size <= alloc.getData().length);
expectedOff += size;
}
}
@Test
public void testLABLargeAllocation() {
MemStoreLAB mslab = new MemStoreLAB();
Allocation alloc = mslab.allocateBytes(2*1024*1024);
assertNull("2MB allocation shouldn't be satisfied by LAB.",
alloc);
}
/**
* Test allocation from lots of threads, making sure the results don't
* overlap in any way
*/
@Test
public void testLABThreading() throws Exception {
Configuration conf = new Configuration();
MultithreadedTestUtil.TestContext ctx =
new MultithreadedTestUtil.TestContext(conf);
final AtomicInteger totalAllocated = new AtomicInteger();
final MemStoreLAB mslab = new MemStoreLAB();
List<List<AllocRecord>> allocations = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
final List<AllocRecord> allocsByThisThread = Lists.newLinkedList();
allocations.add(allocsByThisThread);
TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
private Random r = new Random();
@Override
public void doAnAction() throws Exception {
int size = r.nextInt(1000);
Allocation alloc = mslab.allocateBytes(size);
totalAllocated.addAndGet(size);
allocsByThisThread.add(new AllocRecord(alloc, size));
}
};
ctx.addThread(t);
}
ctx.startThreads();
while (totalAllocated.get() < 50*1024*1024 && ctx.shouldRun()) {
Thread.sleep(10);
}
ctx.stop();
// Partition the allocations by the actual byte[] they point into,
// make sure offsets are unique for each chunk
Map<byte[], Map<Integer, AllocRecord>> mapsByChunk =
Maps.newHashMap();
int sizeCounted = 0;
for (AllocRecord rec : Iterables.concat(allocations)) {
sizeCounted += rec.size;
if (rec.size == 0) continue;
Map<Integer, AllocRecord> mapForThisByteArray =
mapsByChunk.get(rec.alloc.getData());
if (mapForThisByteArray == null) {
mapForThisByteArray = Maps.newTreeMap();
mapsByChunk.put(rec.alloc.getData(), mapForThisByteArray);
}
AllocRecord oldVal = mapForThisByteArray.put(rec.alloc.getOffset(), rec);
assertNull("Already had an entry " + oldVal + " for allocation " + rec,
oldVal);
}
assertEquals("Sanity check test", sizeCounted, totalAllocated.get());
// Now check each byte array to make sure allocations don't overlap
for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) {
int expectedOff = 0;
for (AllocRecord alloc : allocsInChunk.values()) {
assertEquals(expectedOff, alloc.alloc.getOffset());
assertTrue("Allocation " + alloc + " overruns buffer",
alloc.alloc.getOffset() + alloc.size <= alloc.alloc.getData().length);
expectedOff += alloc.size;
}
}
}
private static class AllocRecord implements Comparable<AllocRecord>{
private final Allocation alloc;
private final int size;
public AllocRecord(Allocation alloc, int size) {
super();
this.alloc = alloc;
this.size = size;
}
@Override
public int compareTo(AllocRecord e) {
if (alloc.getData() != e.alloc.getData()) {
throw new RuntimeException("Can only compare within a particular array");
}
return Ints.compare(alloc.getOffset(), e.alloc.getOffset());
}
@Override
public String toString() {
return "AllocRecord(alloc=" + alloc + ", size=" + size + ")";
}
}
}