HBASE-4330 Fix races in slab cache (Li Pi & Todd)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1170902 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2011-09-14 23:47:21 +00:00
parent 6e77e87c66
commit a76a708a20
9 changed files with 205 additions and 82 deletions

CHANGES.txt

@@ -235,7 +235,6 @@ Release 0.91.0 - Unreleased
    HBASE-4315  RPC logging too verbose (todd)
    HBASE-4273  java.lang.NullPointerException when a table is being disabled and
                HMaster restarts (Ming Ma)
-   HBASE-4310  SlabCache metrics bugfix (Li Pi)
    HBASE-4027  Off Heap Cache never creates Slabs (Li Pi)
    HBASE-4265  zookeeper.KeeperException$NodeExistsException if HMaster restarts
                while table is being disabled (Ming Ma)
@@ -262,6 +261,7 @@ Release 0.91.0 - Unreleased
                exception (Jinchao Gao)
    HBASE-4394  Add support for seeking hints to FilterList
    HBASE-4406  TestOpenRegionHandler failing after HBASE-4287 (todd)
+   HBASE-4330  Fix races in slab cache (Li Pi & Todd)

   IMPROVEMENTS
    HBASE-3290  Max Compaction Size (Nicolas Spiegelberg via Stack)

src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java

@@ -91,18 +91,17 @@ public class HFileBlock implements Cacheable {
   private static final CacheableDeserializer<Cacheable> blockDeserializer =
       new CacheableDeserializer<Cacheable>() {
         public HFileBlock deserialize(ByteBuffer buf) throws IOException{
-          ByteBuffer tempCopy = buf.duplicate();
-          ByteBuffer newByteBuffer = ByteBuffer.allocate(tempCopy.limit()
+          ByteBuffer newByteBuffer = ByteBuffer.allocate(buf.limit()
               - HFileBlock.EXTRA_SERIALIZATION_SPACE);
-          tempCopy.limit(tempCopy.limit()
+          buf.limit(buf.limit()
               - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
-          newByteBuffer.put(tempCopy);
+          newByteBuffer.put(buf);
           HFileBlock ourBuffer = new HFileBlock(newByteBuffer);

-          tempCopy.position(tempCopy.limit());
-          tempCopy.limit(tempCopy.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
-          ourBuffer.offset = tempCopy.getLong();
-          ourBuffer.nextBlockOnDiskSizeWithHeader = tempCopy.getInt();
+          buf.position(buf.limit());
+          buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
+          ourBuffer.offset = buf.getLong();
+          ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
           return ourBuffer;
         }
       };
@@ -1534,4 +1533,4 @@ public class HFileBlock implements Cacheable {
     }
   }
 }
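The deserializer above depends on a framing convention: the serialized block is the payload followed by EXTRA_SERIALIZATION_SPACE trailing bytes carrying the block's offset (a long) and the next block's on-disk size (an int). A minimal sketch of that tail-metadata layout; TailDemo and EXTRA are illustrative names, with EXTRA assumed to mirror HFileBlock.EXTRA_SERIALIZATION_SPACE (one long plus one int):

import java.nio.ByteBuffer;

public class TailDemo {
  // Assumed to mirror HFileBlock.EXTRA_SERIALIZATION_SPACE: one long + one int.
  static final int EXTRA = 8 + 4;

  /** Copies the payload, then reads the two trailing metadata fields. */
  static void read(ByteBuffer buf) {
    ByteBuffer payload = ByteBuffer.allocate(buf.limit() - EXTRA);
    buf.limit(buf.limit() - EXTRA).rewind(); // hide the tail while copying
    payload.put(buf);                        // copy the payload only
    buf.position(buf.limit());               // jump to the tail...
    buf.limit(buf.limit() + EXTRA);          // ...and expose it again
    long offset = buf.getLong();
    int nextSize = buf.getInt();
    System.out.println("offset=" + offset + " nextSize=" + nextSize);
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocate(8 + EXTRA);
    buf.putLong(42L).putLong(1234L).putInt(99).rewind(); // payload, offset, size
    read(buf); // prints offset=1234 nextSize=99
  }
}

Note that the patched deserializer advances the caller's buf rather than a duplicate(); that is only safe because callers such as SingleSizeCache.getBlock (below) now hand it a private asReadOnlyBuffer() view, so no shared cursor state is mutated.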

src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java

@@ -19,12 +19,10 @@
  */
 package org.apache.hadoop.hbase.io.hfile.slab;

-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -94,24 +92,9 @@ public class SingleSizeCache implements BlockCache {
     MapEvictionListener<String, CacheablePair> listener = new MapEvictionListener<String, CacheablePair>() {
       @Override
       public void onEviction(String key, CacheablePair value) {
-        try {
-          value.evictionLock.writeLock().lock();
-          timeSinceLastAccess.set(System.nanoTime()
-              - value.recentlyAccessed.get());
-          backingStore.free(value.serializedData);
-          stats.evict();
-          /**
-           * We may choose to run this cache alone, without the SlabCache on
-           * top, no evictionWatcher in that case
-           */
-          if (evictionWatcher != null) {
-            evictionWatcher.onEviction(key, false);
-          }
-          size.addAndGet(-1 * value.heapSize());
-          stats.evicted();
-        } finally {
-          value.evictionLock.writeLock().unlock();
-        }
+        timeSinceLastAccess.set(System.nanoTime()
+            - value.recentlyAccessed.get());
+        doEviction(key, value);
       }
     };
@@ -121,7 +104,7 @@ public class SingleSizeCache implements BlockCache {
   }

   @Override
-  public synchronized void cacheBlock(String blockName, Cacheable toBeCached) {
+  public void cacheBlock(String blockName, Cacheable toBeCached) {
     ByteBuffer storedBlock;

     /*
@@ -129,12 +112,18 @@ public class SingleSizeCache implements BlockCache {
      * items than the memory we have allocated, but the Slab Allocator may still
      * be empty if we have not yet completed eviction
      */
-    do {
+    try {
       storedBlock = backingStore.alloc(toBeCached.getSerializedLength());
-    } while (storedBlock == null);
+    } catch (InterruptedException e) {
+      LOG.warn("SlabAllocator was interrupted while waiting for block to become available");
+      LOG.warn(e);
+      return;
+    }
+
     CacheablePair newEntry = new CacheablePair(toBeCached.getDeserializer(),
         storedBlock);
+    toBeCached.serialize(storedBlock);

     CacheablePair alreadyCached = backingMap.putIfAbsent(blockName, newEntry);
@@ -142,7 +131,6 @@ public class SingleSizeCache implements BlockCache {
       backingStore.free(storedBlock);
       throw new RuntimeException("already cached " + blockName);
     }
-    toBeCached.serialize(storedBlock);
     newEntry.recentlyAccessed.set(System.nanoTime());
     this.size.addAndGet(newEntry.heapSize());
   }
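Two fixes converge in this hunk: alloc() may now block, so cacheBlock handles the interrupt instead of spinning on null, and serialize() runs before the entry is published via putIfAbsent, so no reader can reach a half-written buffer through backingMap. A minimal sketch of the publish-after-initialize half; SafePublish is an illustrative name, not patch code:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class SafePublish {
  private final ConcurrentMap<String, byte[]> map =
      new ConcurrentHashMap<String, byte[]>();

  void cache(String key, byte[] payload) {
    byte[] entry = payload.clone();             // 1. fully initialize the entry
    byte[] prior = map.putIfAbsent(key, entry); // 2. publish it atomically
    if (prior != null) {
      // the loser of the race must back out, mirroring the free() above
      throw new RuntimeException("already cached " + key);
    }
  }
}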
@@ -157,20 +145,21 @@ public class SingleSizeCache implements BlockCache {
     stats.hit(caching);
     // If lock cannot be obtained, that means we're undergoing eviction.
-    if (contentBlock.evictionLock.readLock().tryLock()) {
-      try {
-        contentBlock.recentlyAccessed.set(System.nanoTime());
+    try {
+      contentBlock.recentlyAccessed.set(System.nanoTime());
+      synchronized (contentBlock) {
+        if (contentBlock.serializedData == null) {
+          // concurrently evicted
+          LOG.warn("Concurrent eviction of " + key);
+          return null;
+        }
         return contentBlock.deserializer
-            .deserialize(contentBlock.serializedData);
-      } catch (IOException e) {
-        e.printStackTrace();
-        LOG.warn("Deserializer throwing ioexception, possibly deserializing wrong object buffer");
-        return null;
-      } finally {
-        contentBlock.evictionLock.readLock().unlock();
+            .deserialize(contentBlock.serializedData.asReadOnlyBuffer());
       }
+    } catch (Throwable t) {
+      LOG.error("Deserializer threw an exception. This may indicate a bug.", t);
+      return null;
     }
-    return null;
   }
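Two details here carry the fix: the synchronized null-check detects a concurrent eviction (doEviction, below, nulls serializedData under the same monitor), and asReadOnlyBuffer() hands each reader an independent position and limit, so concurrent deserializations cannot corrupt one another's cursor. A small sketch of the second point, with illustrative names:

import java.nio.ByteBuffer;

class ReadOnlyViewDemo {
  /** Each call reads through a private view; the shared cursor never moves. */
  static int firstInt(ByteBuffer shared) {
    return shared.asReadOnlyBuffer().getInt();
  }

  public static void main(String[] args) {
    ByteBuffer shared = ByteBuffer.allocate(8);
    shared.putInt(7).rewind();
    System.out.println(firstInt(shared)); // 7
    System.out.println(firstInt(shared)); // still 7; no position drift
  }
}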
  /**
@@ -183,23 +172,45 @@ public class SingleSizeCache implements BlockCache {
     stats.evict();
     CacheablePair evictedBlock = backingMap.remove(key);

     if (evictedBlock != null) {
-      try {
-        evictedBlock.evictionLock.writeLock().lock();
-        backingStore.free(evictedBlock.serializedData);
-        evictionWatcher.onEviction(key, false);
-        stats.evicted();
-        size.addAndGet(-1 * evictedBlock.heapSize());
-      } finally {
-        evictedBlock.evictionLock.writeLock().unlock();
-      }
+      doEviction(key, evictedBlock);
     }
     return evictedBlock != null;
   }

+  private void doEviction(String key, CacheablePair evictedBlock) {
+    long evictedHeap = 0;
+    synchronized (evictedBlock) {
+      if (evictedBlock.serializedData == null) {
+        // someone else already freed
+        return;
+      }
+      evictedHeap = evictedBlock.heapSize();
+      ByteBuffer bb = evictedBlock.serializedData;
+      evictedBlock.serializedData = null;
+      backingStore.free(bb);
+
+      // We have to do this callback inside the synchronization here.
+      // Otherwise we can have the following interleaving:
+      // Thread A calls getBlock():
+      //   SlabCache directs call to this SingleSizeCache
+      //   It gets the CacheablePair object
+      // Thread B runs eviction
+      //   doEviction() is called and sets serializedData = null, here.
+      // Thread A sees the null serializedData, and returns null
+      // Thread A calls cacheBlock on the same block, and gets
+      //   "already cached" since the block is still in backingStore
+      if (evictionWatcher != null) {
+        evictionWatcher.onEviction(key, false);
+      }
+    }
+    stats.evicted();
+    size.addAndGet(-1 * evictedHeap);
+  }
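doEviction is the heart of the race fix: serializedData is nulled under the pair's monitor, so whichever of the map-eviction listener and an explicit evictBlock arrives second observes null and returns, and the slab buffer is freed exactly once. The idiom in isolation, as a sketch with a stubbed allocator (EvictOnce is an illustrative name):

import java.nio.ByteBuffer;

class EvictOnce {
  private ByteBuffer data = ByteBuffer.allocate(16);

  /** Returns true only for the single caller that actually frees the buffer. */
  boolean evict() {
    synchronized (this) {
      if (data == null) {
        return false;          // someone else already freed it
      }
      ByteBuffer bb = data;
      data = null;             // readers and other evictors now observe "gone"
      free(bb);                // safe: exactly one thread reaches here
      return true;
    }
  }

  private void free(ByteBuffer bb) {
    // stub: a real allocator would recycle bb here
  }
}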
   public void logStats() {
-    long milliseconds = (long)this.timeSinceLastAccess.get() / 1000000;
+    long milliseconds = (long) this.timeSinceLastAccess.get() / 1000000;
     LOG.info("For Slab of size " + this.blockSize + ": "
         + this.getOccupiedSize() / this.blockSize
@@ -299,8 +310,7 @@ public class SingleSizeCache implements BlockCache {
   /* Just a pair class, holds a reference to the parent cacheable */
   private class CacheablePair implements HeapSize {
     final CacheableDeserializer<Cacheable> deserializer;
-    final ByteBuffer serializedData;
-    final ReentrantReadWriteLock evictionLock;
+    ByteBuffer serializedData;
     AtomicLong recentlyAccessed;

     private CacheablePair(CacheableDeserializer<Cacheable> deserializer,
@@ -308,7 +318,6 @@ public class SingleSizeCache implements BlockCache {
       this.recentlyAccessed = new AtomicLong();
       this.deserializer = deserializer;
       this.serializedData = serializedData;
-      evictionLock = new ReentrantReadWriteLock();
     }
/*

src/main/java/org/apache/hadoop/hbase/io/hfile/slab/Slab.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.io.hfile.slab;
 import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -37,7 +39,7 @@ class Slab implements org.apache.hadoop.hbase.io.HeapSize {
   static final Log LOG = LogFactory.getLog(Slab.class);

   /** This is where our items, or blocks of the slab, are stored. */
-  private ConcurrentLinkedQueue<ByteBuffer> buffers;
+  private LinkedBlockingQueue<ByteBuffer> buffers;

   /** This is where our Slabs are stored */
   private ConcurrentLinkedQueue<ByteBuffer> slabs;
@@ -47,7 +49,7 @@ class Slab implements org.apache.hadoop.hbase.io.HeapSize {
   private long heapSize;

   Slab(int blockSize, int numBlocks) {
-    buffers = new ConcurrentLinkedQueue<ByteBuffer>();
+    buffers = new LinkedBlockingQueue<ByteBuffer>();
     slabs = new ConcurrentLinkedQueue<ByteBuffer>();

     this.blockSize = blockSize;
@@ -108,16 +110,13 @@ class Slab implements org.apache.hadoop.hbase.io.HeapSize {
   }

   /*
-   * This returns null if empty. Throws an exception if you try to allocate a
-   * bigger size than the allocator can handle.
+   * Throws an exception if you try to allocate a
+   * bigger size than the allocator can handle. Alloc will block until a buffer is available.
    */
-  ByteBuffer alloc(int bufferSize) {
+  ByteBuffer alloc(int bufferSize) throws InterruptedException {
     int newCapacity = Preconditions.checkPositionIndex(bufferSize, blockSize);
-    ByteBuffer returnedBuffer = buffers.poll();
-    if (returnedBuffer == null) {
-      return null;
-    }
+    ByteBuffer returnedBuffer = buffers.take();
     returnedBuffer.clear().limit(newCapacity);
     return returnedBuffer;
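Replacing ConcurrentLinkedQueue.poll(), which returns null on an empty free list, with LinkedBlockingQueue.take() turns exhaustion into back-pressure: the allocating thread parks until free() recycles a buffer, which is why alloc now declares InterruptedException. A toy allocator in the same shape, as a sketch (ToySlab is an illustrative name):

import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;

class ToySlab {
  private final LinkedBlockingQueue<ByteBuffer> buffers =
      new LinkedBlockingQueue<ByteBuffer>();

  ToySlab(int blockSize, int numBlocks) {
    for (int i = 0; i < numBlocks; i++) {
      buffers.add(ByteBuffer.allocate(blockSize)); // preallocate the free list
    }
  }

  ByteBuffer alloc(int size) throws InterruptedException {
    ByteBuffer b = buffers.take();  // parks the caller when exhausted
    b.clear().limit(size);          // resize the view to the request
    return b;
  }

  void free(ByteBuffer b) {
    buffers.add(b);                 // wakes one parked alloc()
  }
}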

src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java

@@ -204,11 +204,14 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize
     this.successfullyCachedStats.addin(cachedItem.getSerializedLength());

     SingleSizeCache scache = scacheEntry.getValue();
-    scache.cacheBlock(blockName, cachedItem); // if this
-                                              // fails, due to
-                                              // block already
-                                              // being there, exception will be thrown
-    backingStore.put(blockName, scache);
+
+    /*This will throw a runtime exception if we try to cache the same value twice*/
+    scache.cacheBlock(blockName, cachedItem);
+
+    /*Spinlock, if we're spinlocking, that means an eviction hasn't taken place yet*/
+    while (backingStore.putIfAbsent(blockName, scache) != null) {
+      Thread.yield();
+    }
   }
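The put-to-putIfAbsent change closes a window where a plain put could be erased by the previous occupant's still-running eviction callback, stranding the fresh entry; the yield loop spins only while that callback is in flight. Reduced to its essentials (SpinPublish is an illustrative name):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class SpinPublish {
  private final ConcurrentMap<String, Object> backing =
      new ConcurrentHashMap<String, Object>();

  void publish(String key, Object entry) {
    // Spins only while a racing eviction has yet to remove the key's
    // previous occupant; a plain put() here could be overwritten by
    // that eviction's cleanup and lose the new entry.
    while (backing.putIfAbsent(key, entry) != null) {
      Thread.yield();
    }
  }
}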
  /**
@@ -232,6 +235,7 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize
   public Cacheable getBlock(String key, boolean caching) {
     SingleSizeCache cachedBlock = backingStore.get(key);
     if (cachedBlock == null) {
+      stats.miss(caching);
       return null;
     }
@@ -272,12 +276,15 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize
   }

   /**
-   * Sends a shutdown to all SingleSizeCache's contained by this cache.F
+   * Sends a shutdown to all SingleSizeCache's contained by this cache.
+   *
+   * Also terminates the scheduleThreadPool.
    */
   public void shutdown() {
     for (SingleSizeCache s : sizer.values()) {
       s.shutdown();
     }
+    this.scheduleThreadPool.shutdown();
   }

   public long heapSize() {

src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java

@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.io.hfile;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.*;

 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -57,8 +58,8 @@ public class CacheTestUtils {
       public void doAnAction() throws Exception {
         if (!blocksToTest.isEmpty()) {
           HFileBlockPair ourBlock = blocksToTest.poll();
-          //if we run out of blocks to test, then we should stop the tests.
-          if(ourBlock == null){
+          // if we run out of blocks to test, then we should stop the tests.
+          if (ourBlock == null) {
             ctx.stop();
             return;
           }
@@ -67,7 +68,9 @@ public class CacheTestUtils {
               false);
           if (retrievedBlock != null) {
             assertEquals(ourBlock.block, retrievedBlock);
+            toBeTested.evictBlock(ourBlock.blockName);
             hits.incrementAndGet();
+            assertNull(toBeTested.getBlock(ourBlock.blockName, false));
           } else {
             miss.incrementAndGet();
           }
@@ -75,6 +78,7 @@ public class CacheTestUtils {
         }
       }
     };
+    t.setDaemon(true);
     ctx.addThread(t);
   }
   ctx.startThreads();
@@ -82,8 +86,9 @@ public class CacheTestUtils {
       Thread.sleep(10);
     }
     ctx.stop();
-    if((double) hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore){
-      fail("Too many nulls returned. Hits: " + hits.get() + " Misses: " + miss.get());
+    if ((double) hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
+      fail("Too many nulls returned. Hits: " + hits.get() + " Misses: "
+          + miss.get());
     }
   }
@@ -130,19 +135,25 @@ public class CacheTestUtils {
   public static void hammerSingleKey(final BlockCache toBeTested,
       int BlockSize, int numThreads, int numQueries) throws Exception {
-    final HFileBlockPair kv = generateHFileBlocks(BlockSize, 1)[0];
+    final String key = "key";
+    final byte[] buf = new byte[5 * 1024];
+    Arrays.fill(buf, (byte) 5);
+    final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
     Configuration conf = new Configuration();
     MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
         conf);

     final AtomicInteger totalQueries = new AtomicInteger();
-    toBeTested.cacheBlock(kv.blockName, kv.block);
+    toBeTested.cacheBlock(key, bac);

     for (int i = 0; i < numThreads; i++) {
       TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
         @Override
         public void doAnAction() throws Exception {
-          assertEquals(kv.block, toBeTested.getBlock(kv.blockName, false));
+          ByteArrayCacheable returned = (ByteArrayCacheable) toBeTested
+              .getBlock(key, false);
+          assertArrayEquals(buf, returned.buf);
           totalQueries.incrementAndGet();
         }
       };
@@ -157,6 +168,94 @@ public class CacheTestUtils {
     ctx.stop();
   }

+  public static void hammerEviction(final BlockCache toBeTested, int BlockSize,
+      int numThreads, int numQueries) throws Exception {
+
+    Configuration conf = new Configuration();
+    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
+        conf);
+
+    final AtomicInteger totalQueries = new AtomicInteger();
+
+    for (int i = 0; i < numThreads; i++) {
+      final int finalI = i;
+      final byte[] buf = new byte[5 * 1024];
+      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
+        @Override
+        public void doAnAction() throws Exception {
+          for (int j = 0; j < 10; j++) {
+            String key = "key_" + finalI + "_" + j;
+            Arrays.fill(buf, (byte) (finalI * j));
+            final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
+
+            ByteArrayCacheable gotBack = (ByteArrayCacheable) toBeTested
+                .getBlock(key, true);
+            if (gotBack != null) {
+              assertArrayEquals(gotBack.buf, bac.buf);
+            } else {
+              toBeTested.cacheBlock(key, bac);
+            }
+          }
+          totalQueries.incrementAndGet();
+        }
+      };
+      ctx.addThread(t);
+    }
+
+    ctx.startThreads();
+    while (totalQueries.get() < numQueries && ctx.shouldRun()) {
+      Thread.sleep(10);
+    }
+    ctx.stop();
+
+    assertTrue(toBeTested.getStats().getEvictedCount() > 0);
+  }
+
+  private static class ByteArrayCacheable implements Cacheable {
+    final byte[] buf;
+
+    public ByteArrayCacheable(byte[] buf) {
+      this.buf = buf;
+    }
+
+    @Override
+    public long heapSize() {
+      return 4 + buf.length;
+    }
+
+    @Override
+    public int getSerializedLength() {
+      return 4 + buf.length;
+    }
+
+    @Override
+    public void serialize(ByteBuffer destination) {
+      destination.putInt(buf.length);
+      Thread.yield();
+      destination.put(buf);
+      destination.rewind();
+    }
+
+    @Override
+    public CacheableDeserializer<Cacheable> getDeserializer() {
+      return new CacheableDeserializer<Cacheable>() {
+
+        @Override
+        public Cacheable deserialize(ByteBuffer b) throws IOException {
+          int len = b.getInt();
+          Thread.yield();
+          byte buf[] = new byte[len];
+          b.get(buf);
+          return new ByteArrayCacheable(buf);
+        }
+      };
+    }
+  }
+
   private static HFileBlockPair[] generateHFileBlocks(int blockSize,
       int numBlocks) {
     HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
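The Thread.yield() calls inside ByteArrayCacheable.serialize and deserialize are deliberate fault injection: they sit between the length prefix and the payload, inviting the scheduler to interleave another thread mid-write so that torn reads surface under test rather than in production. The same trick in miniature (a sketch; RaceWideningDemo is an illustrative name):

class RaceWideningDemo {
  private volatile int len = 0;
  private volatile byte[] data = null;

  /** A two-step write with a deliberate gap, mirroring serialize() above. */
  void write(byte[] d) {
    len = d.length;
    Thread.yield();      // widen the window where len and data disagree
    data = d;
  }

  /** A concurrent reader can observe len updated while data is still stale. */
  boolean torn() {
    byte[] d = data;
    return d == null ? len != 0 : d.length != len;
  }
}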

src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java

@@ -63,5 +63,10 @@ public class TestSingleSizeCache {
   public void testCacheMultiThreadedSingleKey() throws Exception {
     CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES);
   }
+
+  @Test
+  public void testCacheMultiThreadedEviction() throws Exception {
+    CacheTestUtils.hammerEviction(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES);
+  }
 }

src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlab.java

@@ -42,7 +42,7 @@ public class TestSlab {
   }

   @Test
-  public void testBasicFunctionality() {
+  public void testBasicFunctionality() throws InterruptedException {
     for (int i = 0; i < NUMBLOCKS; i++) {
       buffers[i] = testSlab.alloc(BLOCKSIZE);
       assertEquals(BLOCKSIZE, buffers[i].limit());

src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java

@@ -43,7 +43,7 @@ public class TestSlabCache {
   static final int CACHE_SIZE = 1000000;
   static final int NUM_BLOCKS = 101;
   static final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
-  static final int NUM_THREADS = 1000;
+  static final int NUM_THREADS = 50;
   static final int NUM_QUERIES = 10000;
   SlabCache cache;
@@ -82,6 +82,11 @@ public class TestSlabCache {
     CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES);
   }

+  @Test
+  public void testCacheMultiThreadedEviction() throws Exception {
+    CacheTestUtils.hammerEviction(cache, BLOCK_SIZE, 10, NUM_QUERIES);
+  }
+
   @Test
   /*Just checks if ranges overlap*/
   public void testStatsArithmetic(){