HBASE-4482 Race Condition Concerning Eviction in SlabCache

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1179868 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-10-06 21:39:47 +00:00
parent c15feb5fbf
commit 82dd6f5235
8 changed files with 138 additions and 65 deletions

View File

@ -342,6 +342,7 @@ Release 0.92.0 - Unreleased
HBASE-4481 TestMergeTool failed in 0.92 build 20
HBASE-4386 Fix a potential NPE in TaskMonitor (todd)
HBASE-4402 Retaining locality after restart broken
HBASE-4482 Race Condition Concerning Eviction in SlabCache (Li Pi)
TESTS
HBASE-4450 test for number of blocks read: to serve as baseline for expected

View File

@ -58,8 +58,8 @@ public class SingleSizeCache implements BlockCache, HeapSize {
private final int blockSize;
private final CacheStats stats;
private final SlabItemEvictionWatcher evictionWatcher;
private AtomicLong size;
private AtomicLong timeSinceLastAccess;
private final AtomicLong size;
private final AtomicLong timeSinceLastAccess;
public final static long CACHE_FIXED_OVERHEAD = ClassSize
.align((2 * Bytes.SIZEOF_INT) + (5 * ClassSize.REFERENCE)
+ +ClassSize.OBJECT);
@ -87,13 +87,15 @@ public class SingleSizeCache implements BlockCache, HeapSize {
this.size = new AtomicLong(CACHE_FIXED_OVERHEAD + backingStore.heapSize());
this.timeSinceLastAccess = new AtomicLong();
// This evictionListener is called whenever the cache automatically evicts
// This evictionListener is called whenever the cache automatically
// evicts
// something.
MapEvictionListener<String, CacheablePair> listener = new MapEvictionListener<String, CacheablePair>() {
@Override
public void onEviction(String key, CacheablePair value) {
timeSinceLastAccess.set(System.nanoTime()
- value.recentlyAccessed.get());
stats.evict();
doEviction(key, value);
}
};
@ -107,12 +109,6 @@ public class SingleSizeCache implements BlockCache, HeapSize {
public void cacheBlock(String blockName, Cacheable toBeCached) {
ByteBuffer storedBlock;
/*
* Spinlock if empty, Guava Mapmaker guarantees that we will not store more
* items than the memory we have allocated, but the Slab Allocator may still
* be empty if we have not yet completed eviction
*/
try {
storedBlock = backingStore.alloc(toBeCached.getSerializedLength());
} catch (InterruptedException e) {
@ -171,6 +167,7 @@ public class SingleSizeCache implements BlockCache, HeapSize {
public boolean evictBlock(String key) {
stats.evict();
CacheablePair evictedBlock = backingMap.remove(key);
if (evictedBlock != null) {
doEviction(key, evictedBlock);
}
@ -200,8 +197,9 @@ public class SingleSizeCache implements BlockCache, HeapSize {
// Thread A sees the null serializedData, and returns null
// Thread A calls cacheBlock on the same block, and gets
// "already cached" since the block is still in backingStore
if (evictionWatcher != null) {
evictionWatcher.onEviction(key, false);
evictionWatcher.onEviction(key, this);
}
}
stats.evicted();
@ -210,7 +208,7 @@ public class SingleSizeCache implements BlockCache, HeapSize {
public void logStats() {
long milliseconds = (long) this.timeSinceLastAccess.get() / 1000000;
long milliseconds = this.timeSinceLastAccess.get() / 1000000;
LOG.info("For Slab of size " + this.blockSize + ": "
+ this.getOccupiedSize() / this.blockSize

View File

@ -21,8 +21,8 @@
package org.apache.hadoop.hbase.io.hfile.slab;
import java.math.BigDecimal;
import java.util.Map.Entry;
import java.util.List;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@ -122,7 +122,9 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize
+ sizes.length + " slabs "
+ "offheapslabporportions and offheapslabsizes");
}
/* We use BigDecimals instead of floats because float rounding is annoying */
/*
* We use BigDecimals instead of floats because float rounding is annoying
*/
BigDecimal[] parsedProportions = stringArrayToBigDecimalArray(porportions);
BigDecimal[] parsedSizes = stringArrayToBigDecimalArray(sizes);
@ -205,12 +207,37 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize
this.successfullyCachedStats.addin(cachedItem.getSerializedLength());
SingleSizeCache scache = scacheEntry.getValue();
/*This will throw a runtime exception if we try to cache the same value twice*/
/*
* This will throw a runtime exception if we try to cache the same value
* twice
*/
scache.cacheBlock(blockName, cachedItem);
/*Spinlock, if we're spinlocking, that means an eviction hasn't taken place yet*/
while (backingStore.putIfAbsent(blockName, scache) != null) {
Thread.yield();
/*
* If an eviction for this value hasn't taken place yet, we want to wait for
* it to take place. See HBase-4330.
*/
SingleSizeCache replace;
while ((replace = backingStore.putIfAbsent(blockName, scache)) != null) {
synchronized (replace) {
/*
* With the exception of unit tests, this should happen extremely
* rarely.
*/
try {
replace.wait();
} catch (InterruptedException e) {
LOG.warn("InterruptedException on the caching thread: " + e);
}
}
}
/*
* Let the eviction threads know that something has been cached, and let
* them try their hand at eviction
*/
synchronized (scache) {
scache.notifyAll();
}
}
@ -254,25 +281,70 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize
* the evict counter.
*/
public boolean evictBlock(String key) {
stats.evict();
return onEviction(key, true);
SingleSizeCache cacheEntry = backingStore.get(key);
if (cacheEntry == null) {
return false;
} else {
cacheEntry.evictBlock(key);
return true;
}
}
@Override
public boolean onEviction(String key, boolean callAssignedCache) {
SingleSizeCache cacheEntry = backingStore.remove(key);
if (cacheEntry == null) {
return false;
}
/* we need to bump up stats.evict, as this call came from the assignedCache. */
if (callAssignedCache == false) {
public void onEviction(String key, Object notifier) {
/*
* Without the while loop below, the following can occur:
*
* Invariant: Anything in SingleSizeCache will have a representation in
* SlabCache, and vice-versa.
*
* Start: Key A is in both SingleSizeCache and SlabCache. Invariant is
* satisfied
*
* Thread A: Caches something, starting eviction of Key A in SingleSizeCache
*
* Thread B: Checks for Key A -> Returns Gets Null, as eviction has begun
*
* Thread B: Recaches Key A, gets to SingleSizeCache, does not get the
* PutIfAbsentLoop yet...
*
* Thread C: Caches another key, starting the second eviction of Key A.
*
* Thread A: does its onEviction, removing the entry of Key A from
* SlabCache.
*
* Thread C: does its onEviction, removing the (blank) entry of Key A from
* SlabCache:
*
* Thread B: goes to putifabsent, and puts its entry into SlabCache.
*
* Result: SlabCache has an entry for A, while SingleSizeCache has no
* entries for A. Invariant is violated.
*
* What the while loop does, is that, at the end, it GUARANTEES that an
* onEviction will remove an entry. See HBase-4482.
*/
stats.evict();
while ((backingStore.remove(key)) == null) {
/* With the exception of unit tests, this should happen extremely rarely. */
synchronized (notifier) {
try {
notifier.wait();
} catch (InterruptedException e) {
LOG.warn("InterruptedException on the evicting thread: " + e);
}
}
}
stats.evicted();
if (callAssignedCache) {
cacheEntry.evictBlock(key);
/*
* Now we've evicted something, lets tell the caching threads to try to
* cache something.
*/
synchronized (notifier) {
notifier.notifyAll();
}
return true;
}
/**
@ -346,7 +418,8 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize
*
*/
static class SlabStats {
// the maximum size somebody will ever try to cache, then we multiply by 10
// the maximum size somebody will ever try to cache, then we multiply by
// 10
// so we have finer grained stats.
final int MULTIPLIER = 10;
final int NUMDIVISIONS = (int) (Math.log(Integer.MAX_VALUE) * MULTIPLIER);
@ -368,11 +441,11 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize
}
double getUpperBound(int index) {
return Math.pow(Math.E, ((double) (index + 0.5) / (double) MULTIPLIER));
return Math.pow(Math.E, ((index + 0.5) / MULTIPLIER));
}
double getLowerBound(int index) {
return Math.pow(Math.E, ((double) (index - 0.5) / (double) MULTIPLIER));
return Math.pow(Math.E, ((index - 0.5) / MULTIPLIER));
}
public void logStats() {

View File

@ -30,9 +30,10 @@ interface SlabItemEvictionWatcher {
* SingleSizeSlabCaches.
*
* @param key the key of the item being evicted
* @param notifier the object notifying the SlabCache of the eviction.
* @param boolean callAssignedCache whether we should call the cache which the
* key was originally assigned to.
*/
boolean onEviction(String key, boolean callAssignedCache);
void onEviction(String key, Object notifier);
}

View File

@ -386,14 +386,11 @@ public class StoreFile {
MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
long cacheSize = (long)(mu.getMax() * cachePercentage);
int blockSize = conf.getInt("hbase.offheapcache.minblocksize", HFile.DEFAULT_BLOCKSIZE);
long offHeapCacheSize = (long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0.95) * DirectMemoryUtils.getDirectMemorySize());
boolean enableOffHeapCache = conf.getBoolean("hbase.offheapcache.enable", false);
long offHeapCacheSize = enableOffHeapCache ?
(long) (conf.getFloat("hbase.offheapcache.percentage",
(float) 0.95) * DirectMemoryUtils.getDirectMemorySize()) :
0;
LOG.info("Allocating LruBlockCache with maximum size " +
StringUtils.humanReadableInt(cacheSize));
if(offHeapCacheSize <= 0) {
if(offHeapCacheSize <= 0 || !enableOffHeapCache) {
hfileBlockCache = new LruBlockCache(cacheSize, DEFAULT_BLOCKSIZE_SMALL);
} else {
LOG.info("Allocating OffHeapCache with maximum size " +

View File

@ -19,8 +19,11 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -102,7 +105,7 @@ public class CacheTestUtils {
Thread.sleep(10);
}
ctx.stop();
if ((double) hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
fail("Too many nulls returned. Hits: " + hits.get() + " Misses: "
+ miss.get());
}
@ -201,7 +204,7 @@ public class CacheTestUtils {
TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
@Override
public void doAnAction() throws Exception {
for (int j = 0; j < 10; j++) {
for (int j = 0; j < 100; j++) {
String key = "key_" + finalI + "_" + j;
Arrays.fill(buf, (byte) (finalI * j));
final ByteArrayCacheable bac = new ByteArrayCacheable(buf);

View File

@ -20,8 +20,9 @@
package org.apache.hadoop.hbase.io.hfile.slab;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.slab.SingleSizeCache;
import org.junit.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Tests SingleSlabCache.
@ -48,28 +49,28 @@ public class TestSingleSizeCache {
cache.shutdown();
}
@Ignore @Test
@Test
public void testCacheSimple() throws Exception {
CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
}
@Ignore @Test
@Test
public void testCacheMultiThreaded() throws Exception {
CacheTestUtils.testCacheMultiThreaded(cache, BLOCK_SIZE,
NUM_THREADS, NUM_QUERIES, 0.80);
}
@Ignore @Test
@Test
public void testCacheMultiThreadedSingleKey() throws Exception {
CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES);
}
@Ignore @Test
@Test
public void testCacheMultiThreadedEviction() throws Exception {
CacheTestUtils.hammerEviction(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES);
}
@Ignore @Test
@Test
public void testHeapSizeChanges(){
CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
}

View File

@ -19,16 +19,15 @@
*/
package org.apache.hadoop.hbase.io.hfile.slab;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.slab.SlabCache;
import org.apache.hadoop.hbase.io.hfile.slab.SlabCache.SlabStats;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.Ignore;
import static org.junit.Assert.*;
/**
* Basic test of SlabCache. Puts and gets.
@ -59,36 +58,36 @@ public class TestSlabCache {
cache.shutdown();
}
@Ignore @Test
@Test
public void testElementPlacement() {
assertEquals(cache.getHigherBlock((int) BLOCK_SIZE).getKey().intValue(),
(int) (BLOCK_SIZE * 11 / 10));
assertEquals(cache.getHigherBlock((int) (BLOCK_SIZE * 2)).getKey()
.intValue(), (int) (BLOCK_SIZE * 21 / 10));
assertEquals(cache.getHigherBlock(BLOCK_SIZE).getKey().intValue(),
(BLOCK_SIZE * 11 / 10));
assertEquals(cache.getHigherBlock((BLOCK_SIZE * 2)).getKey()
.intValue(), (BLOCK_SIZE * 21 / 10));
}
@Ignore @Test
@Test
public void testCacheSimple() throws Exception {
CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
}
@Ignore @Test
@Test
public void testCacheMultiThreaded() throws Exception {
CacheTestUtils.testCacheMultiThreaded(cache, BLOCK_SIZE, NUM_THREADS,
NUM_QUERIES, 0.80);
}
@Ignore @Test
@Test
public void testCacheMultiThreadedSingleKey() throws Exception {
CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES);
}
@Ignore @Test
@Test
public void testCacheMultiThreadedEviction() throws Exception {
CacheTestUtils.hammerEviction(cache, BLOCK_SIZE, 10, NUM_QUERIES);
}
@Ignore @Test
@Test
/*Just checks if ranges overlap*/
public void testStatsArithmetic(){
SlabStats test = cache.requestStats;
@ -99,7 +98,7 @@ public class TestSlabCache {
}
}
@Ignore @Test
@Test
public void testHeapSizeChanges(){
CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
}