From 82dd6f52352ce62ef868a5aa1712fd1c3ffd22d4 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Thu, 6 Oct 2011 21:39:47 +0000 Subject: [PATCH] HBASE-4482 Race Condition Concerning Eviction in SlabCache git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1179868 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + .../hbase/io/hfile/slab/SingleSizeCache.java | 20 ++- .../hadoop/hbase/io/hfile/slab/SlabCache.java | 117 ++++++++++++++---- .../hfile/slab/SlabItemEvictionWatcher.java | 3 +- .../hadoop/hbase/regionserver/StoreFile.java | 9 +- .../hadoop/hbase/io/hfile/CacheTestUtils.java | 9 +- .../io/hfile/slab/TestSingleSizeCache.java | 15 +-- .../hbase/io/hfile/slab/TestSlabCache.java | 29 +++-- 8 files changed, 138 insertions(+), 65 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index e2fc12e2d83..794f7daa042 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -342,6 +342,7 @@ Release 0.92.0 - Unreleased HBASE-4481 TestMergeTool failed in 0.92 build 20 HBASE-4386 Fix a potential NPE in TaskMonitor (todd) HBASE-4402 Retaining locality after restart broken + HBASE-4482 Race Condition Concerning Eviction in SlabCache (Li Pi) TESTS HBASE-4450 test for number of blocks read: to serve as baseline for expected diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java index 3798a060c10..d7ab1083462 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java @@ -58,8 +58,8 @@ public class SingleSizeCache implements BlockCache, HeapSize { private final int blockSize; private final CacheStats stats; private final SlabItemEvictionWatcher evictionWatcher; - private AtomicLong size; - private AtomicLong timeSinceLastAccess; + private final AtomicLong size; + private final AtomicLong timeSinceLastAccess; public final static long CACHE_FIXED_OVERHEAD = ClassSize .align((2 * Bytes.SIZEOF_INT) + (5 * ClassSize.REFERENCE) + +ClassSize.OBJECT); @@ -87,13 +87,15 @@ public class SingleSizeCache implements BlockCache, HeapSize { this.size = new AtomicLong(CACHE_FIXED_OVERHEAD + backingStore.heapSize()); this.timeSinceLastAccess = new AtomicLong(); - // This evictionListener is called whenever the cache automatically evicts + // This evictionListener is called whenever the cache automatically + // evicts // something. MapEvictionListener listener = new MapEvictionListener() { @Override public void onEviction(String key, CacheablePair value) { timeSinceLastAccess.set(System.nanoTime() - value.recentlyAccessed.get()); + stats.evict(); doEviction(key, value); } }; @@ -107,12 +109,6 @@ public class SingleSizeCache implements BlockCache, HeapSize { public void cacheBlock(String blockName, Cacheable toBeCached) { ByteBuffer storedBlock; - /* - * Spinlock if empty, Guava Mapmaker guarantees that we will not store more - * items than the memory we have allocated, but the Slab Allocator may still - * be empty if we have not yet completed eviction - */ - try { storedBlock = backingStore.alloc(toBeCached.getSerializedLength()); } catch (InterruptedException e) { @@ -171,6 +167,7 @@ public class SingleSizeCache implements BlockCache, HeapSize { public boolean evictBlock(String key) { stats.evict(); CacheablePair evictedBlock = backingMap.remove(key); + if (evictedBlock != null) { doEviction(key, evictedBlock); } @@ -200,8 +197,9 @@ public class SingleSizeCache implements BlockCache, HeapSize { // Thread A sees the null serializedData, and returns null // Thread A calls cacheBlock on the same block, and gets // "already cached" since the block is still in backingStore + if (evictionWatcher != null) { - evictionWatcher.onEviction(key, false); + evictionWatcher.onEviction(key, this); } } stats.evicted(); @@ -210,7 +208,7 @@ public class SingleSizeCache implements BlockCache, HeapSize { public void logStats() { - long milliseconds = (long) this.timeSinceLastAccess.get() / 1000000; + long milliseconds = this.timeSinceLastAccess.get() / 1000000; LOG.info("For Slab of size " + this.blockSize + ": " + this.getOccupiedSize() / this.blockSize diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java index fe8b95a2e59..b3e6538668d 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java @@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.io.hfile.slab; import java.math.BigDecimal; -import java.util.Map.Entry; import java.util.List; +import java.util.Map.Entry; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -122,7 +122,9 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize + sizes.length + " slabs " + "offheapslabporportions and offheapslabsizes"); } - /* We use BigDecimals instead of floats because float rounding is annoying */ + /* + * We use BigDecimals instead of floats because float rounding is annoying + */ BigDecimal[] parsedProportions = stringArrayToBigDecimalArray(porportions); BigDecimal[] parsedSizes = stringArrayToBigDecimalArray(sizes); @@ -205,12 +207,37 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize this.successfullyCachedStats.addin(cachedItem.getSerializedLength()); SingleSizeCache scache = scacheEntry.getValue(); - /*This will throw a runtime exception if we try to cache the same value twice*/ + /* + * This will throw a runtime exception if we try to cache the same value + * twice + */ scache.cacheBlock(blockName, cachedItem); - /*Spinlock, if we're spinlocking, that means an eviction hasn't taken place yet*/ - while (backingStore.putIfAbsent(blockName, scache) != null) { - Thread.yield(); + /* + * If an eviction for this value hasn't taken place yet, we want to wait for + * it to take place. See HBase-4330. + */ + SingleSizeCache replace; + while ((replace = backingStore.putIfAbsent(blockName, scache)) != null) { + synchronized (replace) { + /* + * With the exception of unit tests, this should happen extremely + * rarely. + */ + try { + replace.wait(); + } catch (InterruptedException e) { + LOG.warn("InterruptedException on the caching thread: " + e); + } + } + } + + /* + * Let the eviction threads know that something has been cached, and let + * them try their hand at eviction + */ + synchronized (scache) { + scache.notifyAll(); } } @@ -254,25 +281,70 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize * the evict counter. */ public boolean evictBlock(String key) { - stats.evict(); - return onEviction(key, true); + SingleSizeCache cacheEntry = backingStore.get(key); + if (cacheEntry == null) { + return false; + } else { + cacheEntry.evictBlock(key); + return true; + } } @Override - public boolean onEviction(String key, boolean callAssignedCache) { - SingleSizeCache cacheEntry = backingStore.remove(key); - if (cacheEntry == null) { - return false; - } - /* we need to bump up stats.evict, as this call came from the assignedCache. */ - if (callAssignedCache == false) { - stats.evict(); + public void onEviction(String key, Object notifier) { + /* + * Without the while loop below, the following can occur: + * + * Invariant: Anything in SingleSizeCache will have a representation in + * SlabCache, and vice-versa. + * + * Start: Key A is in both SingleSizeCache and SlabCache. Invariant is + * satisfied + * + * Thread A: Caches something, starting eviction of Key A in SingleSizeCache + * + * Thread B: Checks for Key A -> Returns Gets Null, as eviction has begun + * + * Thread B: Recaches Key A, gets to SingleSizeCache, does not get the + * PutIfAbsentLoop yet... + * + * Thread C: Caches another key, starting the second eviction of Key A. + * + * Thread A: does its onEviction, removing the entry of Key A from + * SlabCache. + * + * Thread C: does its onEviction, removing the (blank) entry of Key A from + * SlabCache: + * + * Thread B: goes to putifabsent, and puts its entry into SlabCache. + * + * Result: SlabCache has an entry for A, while SingleSizeCache has no + * entries for A. Invariant is violated. + * + * What the while loop does, is that, at the end, it GUARANTEES that an + * onEviction will remove an entry. See HBase-4482. + */ + + stats.evict(); + while ((backingStore.remove(key)) == null) { + /* With the exception of unit tests, this should happen extremely rarely. */ + synchronized (notifier) { + try { + notifier.wait(); + } catch (InterruptedException e) { + LOG.warn("InterruptedException on the evicting thread: " + e); + } + } } stats.evicted(); - if (callAssignedCache) { - cacheEntry.evictBlock(key); + + /* + * Now we've evicted something, lets tell the caching threads to try to + * cache something. + */ + synchronized (notifier) { + notifier.notifyAll(); } - return true; } /** @@ -346,7 +418,8 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize * */ static class SlabStats { - // the maximum size somebody will ever try to cache, then we multiply by 10 + // the maximum size somebody will ever try to cache, then we multiply by + // 10 // so we have finer grained stats. final int MULTIPLIER = 10; final int NUMDIVISIONS = (int) (Math.log(Integer.MAX_VALUE) * MULTIPLIER); @@ -368,11 +441,11 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize } double getUpperBound(int index) { - return Math.pow(Math.E, ((double) (index + 0.5) / (double) MULTIPLIER)); + return Math.pow(Math.E, ((index + 0.5) / MULTIPLIER)); } double getLowerBound(int index) { - return Math.pow(Math.E, ((double) (index - 0.5) / (double) MULTIPLIER)); + return Math.pow(Math.E, ((index - 0.5) / MULTIPLIER)); } public void logStats() { diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabItemEvictionWatcher.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabItemEvictionWatcher.java index 91b16036f80..38bf85cad92 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabItemEvictionWatcher.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabItemEvictionWatcher.java @@ -30,9 +30,10 @@ interface SlabItemEvictionWatcher { * SingleSizeSlabCaches. * * @param key the key of the item being evicted + * @param notifier the object notifying the SlabCache of the eviction. * @param boolean callAssignedCache whether we should call the cache which the * key was originally assigned to. */ - boolean onEviction(String key, boolean callAssignedCache); + void onEviction(String key, Object notifier); } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 407238789ca..6c31373a06a 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -386,17 +386,14 @@ public class StoreFile { MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); long cacheSize = (long)(mu.getMax() * cachePercentage); int blockSize = conf.getInt("hbase.offheapcache.minblocksize", HFile.DEFAULT_BLOCKSIZE); + long offHeapCacheSize = (long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0.95) * DirectMemoryUtils.getDirectMemorySize()); boolean enableOffHeapCache = conf.getBoolean("hbase.offheapcache.enable", false); - long offHeapCacheSize = enableOffHeapCache ? - (long) (conf.getFloat("hbase.offheapcache.percentage", - (float) 0.95) * DirectMemoryUtils.getDirectMemorySize()) : - 0; LOG.info("Allocating LruBlockCache with maximum size " + StringUtils.humanReadableInt(cacheSize)); - if(offHeapCacheSize <= 0) { + if(offHeapCacheSize <= 0 || !enableOffHeapCache) { hfileBlockCache = new LruBlockCache(cacheSize, DEFAULT_BLOCKSIZE_SMALL); } else { - LOG.info("Allocating OffHeapCache with maximum size " + + LOG.info("Allocating OffHeapCache with maximum size " + StringUtils.humanReadableInt(offHeapCacheSize)); hfileBlockCache = new DoubleBlockCache(cacheSize, offHeapCacheSize, DEFAULT_BLOCKSIZE_SMALL, blockSize, conf); } diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java index 0814f41edd4..f7211b5df27 100644 --- a/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java @@ -19,8 +19,11 @@ */ package org.apache.hadoop.hbase.io.hfile; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.*; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.nio.ByteBuffer; @@ -102,7 +105,7 @@ public class CacheTestUtils { Thread.sleep(10); } ctx.stop(); - if ((double) hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) { + if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) { fail("Too many nulls returned. Hits: " + hits.get() + " Misses: " + miss.get()); } @@ -201,7 +204,7 @@ public class CacheTestUtils { TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { @Override public void doAnAction() throws Exception { - for (int j = 0; j < 10; j++) { + for (int j = 0; j < 100; j++) { String key = "key_" + finalI + "_" + j; Arrays.fill(buf, (byte) (finalI * j)); final ByteArrayCacheable bac = new ByteArrayCacheable(buf); diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java index e0217805f41..ef2d1473bf3 100644 --- a/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java @@ -20,8 +20,9 @@ package org.apache.hadoop.hbase.io.hfile.slab; import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; -import org.apache.hadoop.hbase.io.hfile.slab.SingleSizeCache; -import org.junit.*; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; /** * Tests SingleSlabCache. @@ -48,28 +49,28 @@ public class TestSingleSizeCache { cache.shutdown(); } - @Ignore @Test + @Test public void testCacheSimple() throws Exception { CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES); } - @Ignore @Test + @Test public void testCacheMultiThreaded() throws Exception { CacheTestUtils.testCacheMultiThreaded(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES, 0.80); } - @Ignore @Test + @Test public void testCacheMultiThreadedSingleKey() throws Exception { CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES); } - @Ignore @Test + @Test public void testCacheMultiThreadedEviction() throws Exception { CacheTestUtils.hammerEviction(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES); } - @Ignore @Test + @Test public void testHeapSizeChanges(){ CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE); } diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java index 8dd51591735..37b7302d75b 100644 --- a/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java @@ -19,16 +19,15 @@ */ package org.apache.hadoop.hbase.io.hfile.slab; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; -import org.apache.hadoop.hbase.io.hfile.slab.SlabCache; import org.apache.hadoop.hbase.io.hfile.slab.SlabCache.SlabStats; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.junit.Ignore; - -import static org.junit.Assert.*; /** * Basic test of SlabCache. Puts and gets. @@ -59,36 +58,36 @@ public class TestSlabCache { cache.shutdown(); } - @Ignore @Test + @Test public void testElementPlacement() { - assertEquals(cache.getHigherBlock((int) BLOCK_SIZE).getKey().intValue(), - (int) (BLOCK_SIZE * 11 / 10)); - assertEquals(cache.getHigherBlock((int) (BLOCK_SIZE * 2)).getKey() - .intValue(), (int) (BLOCK_SIZE * 21 / 10)); + assertEquals(cache.getHigherBlock(BLOCK_SIZE).getKey().intValue(), + (BLOCK_SIZE * 11 / 10)); + assertEquals(cache.getHigherBlock((BLOCK_SIZE * 2)).getKey() + .intValue(), (BLOCK_SIZE * 21 / 10)); } - @Ignore @Test + @Test public void testCacheSimple() throws Exception { CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES); } - @Ignore @Test + @Test public void testCacheMultiThreaded() throws Exception { CacheTestUtils.testCacheMultiThreaded(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES, 0.80); } - @Ignore @Test + @Test public void testCacheMultiThreadedSingleKey() throws Exception { CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES); } - @Ignore @Test + @Test public void testCacheMultiThreadedEviction() throws Exception { CacheTestUtils.hammerEviction(cache, BLOCK_SIZE, 10, NUM_QUERIES); } - @Ignore @Test + @Test /*Just checks if ranges overlap*/ public void testStatsArithmetic(){ SlabStats test = cache.requestStats; @@ -99,7 +98,7 @@ public class TestSlabCache { } } - @Ignore @Test + @Test public void testHeapSizeChanges(){ CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE); }