HBASE-13301 Possible memory leak in BucketCache

Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
This commit is contained in:
zhangduo 2015-04-11 10:43:43 +08:00
parent ba4c14133e
commit bcd5c4d137
5 changed files with 194 additions and 113 deletions

View File

@ -39,6 +39,7 @@ import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -109,13 +110,14 @@ public class BucketCache implements BlockCache, HeapSize {
final static int DEFAULT_WRITER_QUEUE_ITEMS = 64; final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
// Store/read block data // Store/read block data
IOEngine ioEngine; final IOEngine ioEngine;
// Store the block in this map before writing it to cache // Store the block in this map before writing it to cache
@VisibleForTesting @VisibleForTesting
Map<BlockCacheKey, RAMQueueEntry> ramCache; final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache;
// In this map, store the block's meta data like offset, length // In this map, store the block's meta data like offset, length
private Map<BlockCacheKey, BucketEntry> backingMap; @VisibleForTesting
ConcurrentMap<BlockCacheKey, BucketEntry> backingMap;
/** /**
* Flag if the cache is enabled or not... We shut it off if there are IO * Flag if the cache is enabled or not... We shut it off if there are IO
@ -132,14 +134,14 @@ public class BucketCache implements BlockCache, HeapSize {
* to the BucketCache. It then updates the ramCache and backingMap accordingly. * to the BucketCache. It then updates the ramCache and backingMap accordingly.
*/ */
@VisibleForTesting @VisibleForTesting
ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
new ArrayList<BlockingQueue<RAMQueueEntry>>(); new ArrayList<BlockingQueue<RAMQueueEntry>>();
@VisibleForTesting @VisibleForTesting
WriterThread writerThreads[]; final WriterThread[] writerThreads;
/** Volatile boolean to track if free space is in process or not */ /** Volatile boolean to track if free space is in process or not */
private volatile boolean freeInProgress = false; private volatile boolean freeInProgress = false;
private Lock freeSpaceLock = new ReentrantLock(); private final Lock freeSpaceLock = new ReentrantLock();
private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<Integer>(); private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<Integer>();
@ -152,17 +154,16 @@ public class BucketCache implements BlockCache, HeapSize {
/** Cache access count (sequential ID) */ /** Cache access count (sequential ID) */
private final AtomicLong accessCount = new AtomicLong(0); private final AtomicLong accessCount = new AtomicLong(0);
private final Object[] cacheWaitSignals;
private static final int DEFAULT_CACHE_WAIT_TIME = 50; private static final int DEFAULT_CACHE_WAIT_TIME = 50;
// Used in test now. If the flag is false and the cache speed is very fast, // Used in test now. If the flag is false and the cache speed is very fast,
// bucket cache will skip some blocks when caching. If the flag is true, we // bucket cache will skip some blocks when caching. If the flag is true, we
// will wait blocks flushed to IOEngine for some time when caching // will wait blocks flushed to IOEngine for some time when caching
boolean wait_when_cache = false; boolean wait_when_cache = false;
private BucketCacheStats cacheStats = new BucketCacheStats(); private final BucketCacheStats cacheStats = new BucketCacheStats();
private String persistencePath; private final String persistencePath;
private long cacheCapacity; private final long cacheCapacity;
/** Approximate block size */ /** Approximate block size */
private final long blockSize; private final long blockSize;
@ -182,7 +183,8 @@ public class BucketCache implements BlockCache, HeapSize {
* *
* TODO:We could extend the IdLock to IdReadWriteLock for better. * TODO:We could extend the IdLock to IdReadWriteLock for better.
*/ */
private IdLock offsetLock = new IdLock(); @VisibleForTesting
final IdLock offsetLock = new IdLock();
private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile = private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile =
new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() { new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() {
@ -216,7 +218,6 @@ public class BucketCache implements BlockCache, HeapSize {
throws FileNotFoundException, IOException { throws FileNotFoundException, IOException {
this.ioEngine = getIOEngineFromName(ioEngineName, capacity); this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
this.writerThreads = new WriterThread[writerThreadNum]; this.writerThreads = new WriterThread[writerThreadNum];
this.cacheWaitSignals = new Object[writerThreadNum];
long blockNumCapacity = capacity / blockSize; long blockNumCapacity = capacity / blockSize;
if (blockNumCapacity >= Integer.MAX_VALUE) { if (blockNumCapacity >= Integer.MAX_VALUE) {
// Enough for about 32TB of cache! // Enough for about 32TB of cache!
@ -231,7 +232,6 @@ public class BucketCache implements BlockCache, HeapSize {
bucketAllocator = new BucketAllocator(capacity, bucketSizes); bucketAllocator = new BucketAllocator(capacity, bucketSizes);
for (int i = 0; i < writerThreads.length; ++i) { for (int i = 0; i < writerThreads.length; ++i) {
writerQueues.add(new ArrayBlockingQueue<RAMQueueEntry>(writerQLen)); writerQueues.add(new ArrayBlockingQueue<RAMQueueEntry>(writerQLen));
this.cacheWaitSignals[i] = new Object();
} }
assert writerQueues.size() == writerThreads.length; assert writerQueues.size() == writerThreads.length;
@ -252,7 +252,7 @@ public class BucketCache implements BlockCache, HeapSize {
final String threadName = Thread.currentThread().getName(); final String threadName = Thread.currentThread().getName();
this.cacheEnabled = true; this.cacheEnabled = true;
for (int i = 0; i < writerThreads.length; ++i) { for (int i = 0; i < writerThreads.length; ++i) {
writerThreads[i] = new WriterThread(writerQueues.get(i), i); writerThreads[i] = new WriterThread(writerQueues.get(i));
writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
writerThreads[i].setDaemon(true); writerThreads[i].setDaemon(true);
} }
@ -344,38 +344,39 @@ public class BucketCache implements BlockCache, HeapSize {
* @param inMemory if block is in-memory * @param inMemory if block is in-memory
* @param wait if true, blocking wait when queue is full * @param wait if true, blocking wait when queue is full
*/ */
public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
boolean inMemory, boolean wait) { boolean wait) {
if (!cacheEnabled) if (!cacheEnabled) {
return; return;
}
if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) if (backingMap.containsKey(cacheKey)) {
return; return;
}
/* /*
* Stuff the entry into the RAM cache so it can get drained to the * Stuff the entry into the RAM cache so it can get drained to the persistent store
* persistent store
*/ */
RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, RAMQueueEntry re =
accessCount.incrementAndGet(), inMemory); new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
ramCache.put(cacheKey, re); if (ramCache.putIfAbsent(cacheKey, re) != null) {
return;
}
int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
boolean successfulAddition = bq.offer(re); boolean successfulAddition = false;
if (!successfulAddition && wait) { if (wait) {
synchronized (cacheWaitSignals[queueNum]) { try {
try { successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
successfulAddition = bq.offer(re); } catch (InterruptedException e) {
if (!successfulAddition) cacheWaitSignals[queueNum].wait(DEFAULT_CACHE_WAIT_TIME); Thread.currentThread().interrupt();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
} }
} else {
successfulAddition = bq.offer(re); successfulAddition = bq.offer(re);
} }
if (!successfulAddition) { if (!successfulAddition) {
ramCache.remove(cacheKey); ramCache.remove(cacheKey);
failedBlockAdditions.incrementAndGet(); failedBlockAdditions.incrementAndGet();
} else { } else {
this.blockNumber.incrementAndGet(); this.blockNumber.incrementAndGet();
this.heapSize.addAndGet(cachedItem.heapSize()); this.heapSize.addAndGet(cachedItem.heapSize());
@ -394,11 +395,14 @@ public class BucketCache implements BlockCache, HeapSize {
@Override @Override
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
boolean updateCacheMetrics) { boolean updateCacheMetrics) {
if (!cacheEnabled) if (!cacheEnabled) {
return null; return null;
}
RAMQueueEntry re = ramCache.get(key); RAMQueueEntry re = ramCache.get(key);
if (re != null) { if (re != null) {
if (updateCacheMetrics) cacheStats.hit(caching); if (updateCacheMetrics) {
cacheStats.hit(caching);
}
re.access(accessCount.incrementAndGet()); re.access(accessCount.incrementAndGet());
return re.getData(); return re.getData();
} }
@ -408,6 +412,9 @@ public class BucketCache implements BlockCache, HeapSize {
IdLock.Entry lockEntry = null; IdLock.Entry lockEntry = null;
try { try {
lockEntry = offsetLock.getLockEntry(bucketEntry.offset()); lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
// We can not read here even if backingMap does contain the given key because its offset
// maybe changed. If we lock BlockCacheKey instead of offset, then we can only check
// existence here.
if (bucketEntry.equals(backingMap.get(key))) { if (bucketEntry.equals(backingMap.get(key))) {
int len = bucketEntry.getLength(); int len = bucketEntry.getLength();
ByteBuffer bb = ByteBuffer.allocate(len); ByteBuffer bb = ByteBuffer.allocate(len);
@ -438,13 +445,27 @@ public class BucketCache implements BlockCache, HeapSize {
} }
} }
} }
if (!repeat && updateCacheMetrics) cacheStats.miss(caching); if (!repeat && updateCacheMetrics) {
cacheStats.miss(caching);
}
return null; return null;
} }
@VisibleForTesting
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
bucketAllocator.freeBlock(bucketEntry.offset());
realCacheSize.addAndGet(-1 * bucketEntry.getLength());
blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
if (decrementBlockNumber) {
this.blockNumber.decrementAndGet();
}
}
@Override @Override
public boolean evictBlock(BlockCacheKey cacheKey) { public boolean evictBlock(BlockCacheKey cacheKey) {
if (!cacheEnabled) return false; if (!cacheEnabled) {
return false;
}
RAMQueueEntry removedBlock = ramCache.remove(cacheKey); RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
if (removedBlock != null) { if (removedBlock != null) {
this.blockNumber.decrementAndGet(); this.blockNumber.decrementAndGet();
@ -462,13 +483,8 @@ public class BucketCache implements BlockCache, HeapSize {
IdLock.Entry lockEntry = null; IdLock.Entry lockEntry = null;
try { try {
lockEntry = offsetLock.getLockEntry(bucketEntry.offset()); lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
if (bucketEntry.equals(backingMap.remove(cacheKey))) { if (backingMap.remove(cacheKey, bucketEntry)) {
bucketAllocator.freeBlock(bucketEntry.offset()); blockEvicted(cacheKey, bucketEntry, removedBlock == null);
realCacheSize.addAndGet(-1 * bucketEntry.getLength());
blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
if (removedBlock == null) {
this.blockNumber.decrementAndGet();
}
} else { } else {
return false; return false;
} }
@ -703,13 +719,10 @@ public class BucketCache implements BlockCache, HeapSize {
@VisibleForTesting @VisibleForTesting
class WriterThread extends HasThread { class WriterThread extends HasThread {
private final BlockingQueue<RAMQueueEntry> inputQueue; private final BlockingQueue<RAMQueueEntry> inputQueue;
private final int threadNO;
private volatile boolean writerEnabled = true; private volatile boolean writerEnabled = true;
WriterThread(BlockingQueue<RAMQueueEntry> queue, int threadNO) { WriterThread(BlockingQueue<RAMQueueEntry> queue) {
super();
this.inputQueue = queue; this.inputQueue = queue;
this.threadNO = threadNO;
} }
// Used for test // Used for test
@ -726,9 +739,6 @@ public class BucketCache implements BlockCache, HeapSize {
try { try {
// Blocks // Blocks
entries = getRAMQueueEntries(inputQueue, entries); entries = getRAMQueueEntries(inputQueue, entries);
synchronized (cacheWaitSignals[threadNO]) {
cacheWaitSignals[threadNO].notifyAll();
}
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
if (!cacheEnabled) break; if (!cacheEnabled) break;
} }
@ -753,7 +763,9 @@ public class BucketCache implements BlockCache, HeapSize {
*/ */
@VisibleForTesting @VisibleForTesting
void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException { void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
if (entries.isEmpty()) return; if (entries.isEmpty()) {
return;
}
// This method is a little hard to follow. We run through the passed in entries and for each // This method is a little hard to follow. We run through the passed in entries and for each
// successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
// do cleanup making sure we've cleared ramCache of all entries regardless of whether we // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
@ -828,6 +840,21 @@ public class BucketCache implements BlockCache, HeapSize {
RAMQueueEntry ramCacheEntry = ramCache.remove(key); RAMQueueEntry ramCacheEntry = ramCache.remove(key);
if (ramCacheEntry != null) { if (ramCacheEntry != null) {
heapSize.addAndGet(-1 * entries.get(i).getData().heapSize()); heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
} else if (bucketEntries[i] != null){
// Block should have already been evicted. Remove it and free space.
IdLock.Entry lockEntry = null;
try {
lockEntry = offsetLock.getLockEntry(bucketEntries[i].offset());
if (backingMap.remove(key, bucketEntries[i])) {
blockEvicted(key, bucketEntries[i], false);
}
} catch (IOException e) {
LOG.warn("failed to free space for " + key, e);
} finally {
if (lockEntry != null) {
offsetLock.releaseLockEntry(lockEntry);
}
}
} }
} }
@ -1053,23 +1080,35 @@ public class BucketCache implements BlockCache, HeapSize {
* up the long. Doubt we'll see devices this big for ages. Offsets are divided * up the long. Doubt we'll see devices this big for ages. Offsets are divided
* by 256. So 5 bytes gives us 256TB or so. * by 256. So 5 bytes gives us 256TB or so.
*/ */
static class BucketEntry implements Serializable, Comparable<BucketEntry> { static class BucketEntry implements Serializable {
private static final long serialVersionUID = -6741504807982257534L; private static final long serialVersionUID = -6741504807982257534L;
// access counter comparator, descending order
static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketCache.BucketEntry>() {
@Override
public int compare(BucketEntry o1, BucketEntry o2) {
long accessCounter1 = o1.accessCounter;
long accessCounter2 = o2.accessCounter;
return accessCounter1 < accessCounter2 ? 1 : accessCounter1 == accessCounter2 ? 0 : -1;
}
};
private int offsetBase; private int offsetBase;
private int length; private int length;
private byte offset1; private byte offset1;
byte deserialiserIndex; byte deserialiserIndex;
private volatile long accessTime; private volatile long accessCounter;
private BlockPriority priority; private BlockPriority priority;
/** /**
* Time this block was cached. Presumes we are created just before we are added to the cache. * Time this block was cached. Presumes we are created just before we are added to the cache.
*/ */
private final long cachedTime = System.nanoTime(); private final long cachedTime = System.nanoTime();
BucketEntry(long offset, int length, long accessTime, boolean inMemory) { BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
setOffset(offset); setOffset(offset);
this.length = length; this.length = length;
this.accessTime = accessTime; this.accessCounter = accessCounter;
if (inMemory) { if (inMemory) {
this.priority = BlockPriority.MEMORY; this.priority = BlockPriority.MEMORY;
} else { } else {
@ -1108,10 +1147,10 @@ public class BucketCache implements BlockCache, HeapSize {
} }
/** /**
* Block has been accessed. Update its local access time. * Block has been accessed. Update its local access counter.
*/ */
public void access(long accessTime) { public void access(long accessCounter) {
this.accessTime = accessTime; this.accessCounter = accessCounter;
if (this.priority == BlockPriority.SINGLE) { if (this.priority == BlockPriority.SINGLE) {
this.priority = BlockPriority.MULTI; this.priority = BlockPriority.MULTI;
} }
@ -1121,17 +1160,6 @@ public class BucketCache implements BlockCache, HeapSize {
return this.priority; return this.priority;
} }
@Override
public int compareTo(BucketEntry that) {
if(this.accessTime == that.accessTime) return 0;
return this.accessTime < that.accessTime ? 1 : -1;
}
@Override
public boolean equals(Object that) {
return this == that;
}
public long getCachedTime() { public long getCachedTime() {
return cachedTime; return cachedTime;
} }
@ -1202,14 +1230,14 @@ public class BucketCache implements BlockCache, HeapSize {
static class RAMQueueEntry { static class RAMQueueEntry {
private BlockCacheKey key; private BlockCacheKey key;
private Cacheable data; private Cacheable data;
private long accessTime; private long accessCounter;
private boolean inMemory; private boolean inMemory;
public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessTime, public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
boolean inMemory) { boolean inMemory) {
this.key = bck; this.key = bck;
this.data = data; this.data = data;
this.accessTime = accessTime; this.accessCounter = accessCounter;
this.inMemory = inMemory; this.inMemory = inMemory;
} }
@ -1221,8 +1249,8 @@ public class BucketCache implements BlockCache, HeapSize {
return key; return key;
} }
public void access(long accessTime) { public void access(long accessCounter) {
this.accessTime = accessTime; this.accessCounter = accessCounter;
} }
public BucketEntry writeToCache(final IOEngine ioEngine, public BucketEntry writeToCache(final IOEngine ioEngine,
@ -1234,7 +1262,7 @@ public class BucketCache implements BlockCache, HeapSize {
// This cacheable thing can't be serialized... // This cacheable thing can't be serialized...
if (len == 0) return null; if (len == 0) return null;
long offset = bucketAllocator.allocateBlock(len); long offset = bucketAllocator.allocateBlock(len);
BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime, inMemory); BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap); bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
try { try {
if (data instanceof HFileBlock) { if (data instanceof HFileBlock) {

View File

@ -54,23 +54,23 @@ public class CachedEntryQueue {
*/ */
public CachedEntryQueue(long maxSize, long blockSize) { public CachedEntryQueue(long maxSize, long blockSize) {
int initialSize = (int) (maxSize / blockSize); int initialSize = (int) (maxSize / blockSize);
if (initialSize == 0) if (initialSize == 0) {
initialSize++; initialSize++;
queue = MinMaxPriorityQueue }
.orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() { queue = MinMaxPriorityQueue.orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() {
public int compare(Entry<BlockCacheKey, BucketEntry> entry1,
Entry<BlockCacheKey, BucketEntry> entry2) {
return entry1.getValue().compareTo(entry2.getValue());
}
}).expectedSize(initialSize).create(); public int compare(Entry<BlockCacheKey, BucketEntry> entry1,
Entry<BlockCacheKey, BucketEntry> entry2) {
return BucketEntry.COMPARATOR.compare(entry1.getValue(), entry2.getValue());
}
}).expectedSize(initialSize).create();
cacheSize = 0; cacheSize = 0;
this.maxSize = maxSize; this.maxSize = maxSize;
} }
/** /**
* Attempt to add the specified entry to this queue. * Attempt to add the specified entry to this queue.
*
* <p> * <p>
* If the queue is smaller than the max size, or if the specified element is * If the queue is smaller than the max size, or if the specified element is
* ordered after the smallest element in the queue, the element will be added * ordered after the smallest element in the queue, the element will be added
@ -83,7 +83,7 @@ public class CachedEntryQueue {
cacheSize += entry.getValue().getLength(); cacheSize += entry.getValue().getLength();
} else { } else {
BucketEntry head = queue.peek().getValue(); BucketEntry head = queue.peek().getValue();
if (entry.getValue().compareTo(head) > 0) { if (BucketEntry.COMPARATOR.compare(entry.getValue(), head) > 0) {
cacheSize += entry.getValue().getLength(); cacheSize += entry.getValue().getLength();
cacheSize -= head.getLength(); cacheSize -= head.getLength();
if (cacheSize > maxSize) { if (cacheSize > maxSize) {

View File

@ -25,6 +25,8 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import com.google.common.annotations.VisibleForTesting;
/** /**
* Allows multiple concurrent clients to lock on a numeric id with a minimal * Allows multiple concurrent clients to lock on a numeric id with a minimal
* memory overhead. The intended usage is as follows: * memory overhead. The intended usage is as follows:
@ -119,4 +121,18 @@ public class IdLock {
assert map.size() == 0; assert map.size() == 0;
} }
@VisibleForTesting
public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
for (Entry entry;;) {
entry = map.get(id);
if (entry != null) {
synchronized (entry) {
if (entry.numWaiters >= numWaiters) {
return;
}
}
}
Thread.sleep(100);
}
}
} }

View File

@ -247,11 +247,11 @@ public class CacheTestUtils {
assertTrue(toBeTested.getStats().getEvictedCount() > 0); assertTrue(toBeTested.getStats().getEvictedCount() > 0);
} }
private static class ByteArrayCacheable implements Cacheable { public static class ByteArrayCacheable implements Cacheable {
static final CacheableDeserializer<Cacheable> blockDeserializer = static final CacheableDeserializer<Cacheable> blockDeserializer =
new CacheableDeserializer<Cacheable>() { new CacheableDeserializer<Cacheable>() {
@Override @Override
public Cacheable deserialize(ByteBuffer b) throws IOException { public Cacheable deserialize(ByteBuffer b) throws IOException {
int len = b.getInt(); int len = b.getInt();

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.io.hfile.bucket; package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
@ -27,12 +28,13 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo; import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics; import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.IdLock;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -43,8 +45,7 @@ import org.junit.runners.Parameterized;
/** /**
* Basic test of BucketCache.Puts and gets. * Basic test of BucketCache.Puts and gets.
* <p> * <p>
* Tests will ensure that blocks' data correctness under several threads * Tests will ensure that blocks' data correctness under several threads concurrency
* concurrency
*/ */
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
@Category(SmallTests.class) @Category(SmallTests.class)
@ -52,15 +53,15 @@ public class TestBucketCache {
private static final Random RAND = new Random(); private static final Random RAND = new Random();
@Parameterized.Parameters(name="{index}: blockSize={0}, bucketSizes={1}") @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
public static Iterable<Object[]> data() { public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] { return Arrays.asList(new Object[][] {
{ 8192, null }, // TODO: why is 8k the default blocksize for these tests? { 8192, null }, // TODO: why is 8k the default blocksize for these tests?
{ 16 * 1024, new int[] { {
2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 16 * 1024,
28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
128 * 1024 + 1024 } } 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
}); 128 * 1024 + 1024 } } });
} }
@Parameterized.Parameter(0) @Parameterized.Parameter(0)
@ -75,7 +76,7 @@ public class TestBucketCache {
final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS; final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
final int NUM_THREADS = 1000; final int NUM_THREADS = 1000;
final int NUM_QUERIES = 10000; final int NUM_QUERIES = 10000;
final long capacitySize = 32 * 1024 * 1024; final long capacitySize = 32 * 1024 * 1024;
final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
@ -85,16 +86,16 @@ public class TestBucketCache {
private class MockedBucketCache extends BucketCache { private class MockedBucketCache extends BucketCache {
public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreads, int writerQLen, String persistencePath) int writerThreads, int writerQLen, String persistencePath) throws FileNotFoundException,
throws FileNotFoundException, IOException { IOException {
super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen, super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
persistencePath); persistencePath);
super.wait_when_cache = true; super.wait_when_cache = true;
} }
@Override @Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
boolean inMemory, boolean cacheDataInL1) { boolean cacheDataInL1) {
if (super.getBlock(cacheKey, true, false, true) != null) { if (super.getBlock(cacheKey, true, false, true) != null) {
throw new RuntimeException("Cached an already cached block"); throw new RuntimeException("Cached an already cached block");
} }
@ -112,8 +113,9 @@ public class TestBucketCache {
@Before @Before
public void setup() throws FileNotFoundException, IOException { public void setup() throws FileNotFoundException, IOException {
cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize, cache =
constructedBlockSizes, writeThreads, writerQLen, persistencePath); new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
} }
@After @After
@ -141,7 +143,7 @@ public class TestBucketCache {
// Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until
// the cache is completely filled. // the cache is completely filled.
List<Integer> tmp = new ArrayList<Integer>(BLOCKSIZES); List<Integer> tmp = new ArrayList<Integer>(BLOCKSIZES);
for (int i = 0; !full; i++) { while (!full) {
Integer blockSize = null; Integer blockSize = null;
try { try {
blockSize = randFrom(tmp); blockSize = randFrom(tmp);
@ -155,9 +157,7 @@ public class TestBucketCache {
for (Integer blockSize : BLOCKSIZES) { for (Integer blockSize : BLOCKSIZES) {
BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize); BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
IndexStatistics indexStatistics = bucketSizeInfo.statistics(); IndexStatistics indexStatistics = bucketSizeInfo.statistics();
assertEquals( assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());
"unexpected freeCount for " + bucketSizeInfo,
0, indexStatistics.freeCount());
} }
for (long offset : allocations) { for (long offset : allocations) {
@ -181,4 +181,41 @@ public class TestBucketCache {
cache.stopWriterThreads(); cache.stopWriterThreads();
CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE); CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
} }
}
// BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
// threads will flush it to the bucket and put reference entry in backingMap.
private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
Cacheable block) throws InterruptedException {
cache.cacheBlock(cacheKey, block);
while (!cache.backingMap.containsKey(cacheKey)) {
Thread.sleep(100);
}
}
@Test
public void testMemoryLeak() throws Exception {
final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
new byte[10]));
long lockId = cache.backingMap.get(cacheKey).offset();
IdLock.Entry lockEntry = cache.offsetLock.getLockEntry(lockId);
Thread evictThread = new Thread("evict-block") {
@Override
public void run() {
cache.evictBlock(cacheKey);
}
};
evictThread.start();
cache.offsetLock.waitForWaiters(lockId, 1);
cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
new byte[10]));
cache.offsetLock.releaseLockEntry(lockEntry);
evictThread.join();
assertEquals(1L, cache.getBlockCount());
assertTrue(cache.getCurrentSize() > 0L);
assertTrue("We should have a block!", cache.iterator().hasNext());
}
}