From bcd5c4d1371e32f8126349d671c7b82582311a84 Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Sat, 11 Apr 2015 10:43:43 +0800
Subject: [PATCH] HBASE-13301 Possible memory leak in BucketCache

Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
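
The race: evictBlock() removed whatever backingMap held for the key, so a
block flushed concurrently by doDrain() could be left behind with its
bucket space never freed, and cacheBlockWithWait() did a plain
ramCache.put(), so an entry already queued for write could be silently
replaced. The fix switches ramCache and backingMap to ConcurrentMap and
relies on putIfAbsent() and remove(key, value), makes doDrain() free the
space of a block that was evicted while it was being flushed, and replaces
the cacheWaitSignals wait/notify handshake with a timed
BlockingQueue.offer().

For reference, a minimal standalone sketch of the conditional-removal
idiom the fix relies on (names below are illustrative, not the BucketCache
API):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class ConditionalRemoveSketch {
      static final ConcurrentMap<String, Long> offsets =
          new ConcurrentHashMap<String, Long>();

      // Free a slot only if our mapping is still the current one, so we
      // never free a slot that a concurrent re-cache now owns for this key.
      static void evict(String key, long expectedOffset) {
        // remove(key, value) is atomic: it removes the entry only when the
        // current mapping equals expectedOffset.
        if (offsets.remove(key, expectedOffset)) {
          freeSlot(expectedOffset);
        }
      }

      static void freeSlot(long offset) {
        System.out.println("freed slot at " + offset);
      }

      public static void main(String[] args) {
        offsets.put("block-1", 42L);
        evict("block-1", 42L); // frees the slot
        evict("block-1", 42L); // no-op, the mapping is already gone
      }
    }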
---
 .../hbase/io/hfile/bucket/BucketCache.java    | 182 ++++++++++--------
 .../io/hfile/bucket/CachedEntryQueue.java     |  20 +-
 .../org/apache/hadoop/hbase/util/IdLock.java  |  16 ++
 .../hadoop/hbase/io/hfile/CacheTestUtils.java |   6 +-
 .../io/hfile/bucket/TestBucketCache.java      |  83 +++++---
 5 files changed, 194 insertions(+), 113 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 9435dc82b7a..45c75e36f33 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -39,6 +39,7 @@ import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -109,13 +110,14 @@ public class BucketCache implements BlockCache, HeapSize {
   final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
 
   // Store/read block data
-  IOEngine ioEngine;
+  final IOEngine ioEngine;
 
   // Store the block in this map before writing it to cache
   @VisibleForTesting
-  Map<BlockCacheKey, RAMQueueEntry> ramCache;
+  final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache;
   // In this map, store the block's meta data like offset, length
-  private Map<BlockCacheKey, BucketEntry> backingMap;
+  @VisibleForTesting
+  ConcurrentMap<BlockCacheKey, BucketEntry> backingMap;
 
   /**
    * Flag if the cache is enabled or not... We shut it off if there are IO
@@ -132,14 +134,14 @@ public class BucketCache implements BlockCache, HeapSize {
    * to the BucketCache. It then updates the ramCache and backingMap accordingly.
    */
   @VisibleForTesting
-  ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
+  final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
       new ArrayList<BlockingQueue<RAMQueueEntry>>();
   @VisibleForTesting
-  WriterThread writerThreads[];
+  final WriterThread[] writerThreads;
 
   /** Volatile boolean to track if free space is in process or not */
   private volatile boolean freeInProgress = false;
-  private Lock freeSpaceLock = new ReentrantLock();
+  private final Lock freeSpaceLock = new ReentrantLock();
 
   private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<Integer>();
 
@@ -152,17 +154,16 @@ public class BucketCache implements BlockCache, HeapSize {
   /** Cache access count (sequential ID) */
   private final AtomicLong accessCount = new AtomicLong(0);
 
-  private final Object[] cacheWaitSignals;
   private static final int DEFAULT_CACHE_WAIT_TIME = 50;
   // Used in test now. If the flag is false and the cache speed is very fast,
   // bucket cache will skip some blocks when caching. If the flag is true, we
   // will wait blocks flushed to IOEngine for some time when caching
   boolean wait_when_cache = false;
 
-  private BucketCacheStats cacheStats = new BucketCacheStats();
+  private final BucketCacheStats cacheStats = new BucketCacheStats();
 
-  private String persistencePath;
-  private long cacheCapacity;
+  private final String persistencePath;
+  private final long cacheCapacity;
   /** Approximate block size */
   private final long blockSize;
 
@@ -182,7 +183,8 @@ public class BucketCache implements BlockCache, HeapSize {
    *
    * TODO:We could extend the IdLock to IdReadWriteLock for better.
    */
-  private IdLock offsetLock = new IdLock();
+  @VisibleForTesting
+  final IdLock offsetLock = new IdLock();
 
   private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile =
       new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() {
@@ -216,7 +218,6 @@ public class BucketCache implements BlockCache, HeapSize {
       throws FileNotFoundException, IOException {
     this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
     this.writerThreads = new WriterThread[writerThreadNum];
-    this.cacheWaitSignals = new Object[writerThreadNum];
     long blockNumCapacity = capacity / blockSize;
     if (blockNumCapacity >= Integer.MAX_VALUE) {
       // Enough for about 32TB of cache!
@@ -231,7 +232,6 @@ public class BucketCache implements BlockCache, HeapSize {
     bucketAllocator = new BucketAllocator(capacity, bucketSizes);
     for (int i = 0; i < writerThreads.length; ++i) {
       writerQueues.add(new ArrayBlockingQueue<RAMQueueEntry>(writerQLen));
-      this.cacheWaitSignals[i] = new Object();
     }
 
     assert writerQueues.size() == writerThreads.length;
@@ -252,7 +252,7 @@ public class BucketCache implements BlockCache, HeapSize {
     final String threadName = Thread.currentThread().getName();
     this.cacheEnabled = true;
     for (int i = 0; i < writerThreads.length; ++i) {
-      writerThreads[i] = new WriterThread(writerQueues.get(i), i);
+      writerThreads[i] = new WriterThread(writerQueues.get(i));
       writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
       writerThreads[i].setDaemon(true);
     }
@@ -344,38 +344,39 @@ public class BucketCache implements BlockCache, HeapSize {
    * @param inMemory if block is in-memory
    * @param wait if true, blocking wait when queue is full
    */
-  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem,
-      boolean inMemory, boolean wait) {
-    if (!cacheEnabled)
+  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
+      boolean wait) {
+    if (!cacheEnabled) {
       return;
+    }
 
-    if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey))
+    if (backingMap.containsKey(cacheKey)) {
       return;
+    }
 
     /*
-     * Stuff the entry into the RAM cache so it can get drained to the
-     * persistent store
+     * Stuff the entry into the RAM cache so it can get drained to the persistent store
      */
-    RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem,
-        accessCount.incrementAndGet(), inMemory);
-    ramCache.put(cacheKey, re);
+    RAMQueueEntry re =
+        new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
+    if (ramCache.putIfAbsent(cacheKey, re) != null) {
+      return;
+    }
     int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
     BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
-    boolean successfulAddition = bq.offer(re);
-    if (!successfulAddition && wait) {
-      synchronized (cacheWaitSignals[queueNum]) {
-        try {
-          successfulAddition = bq.offer(re);
-          if (!successfulAddition) cacheWaitSignals[queueNum].wait(DEFAULT_CACHE_WAIT_TIME);
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
+    boolean successfulAddition = false;
+    if (wait) {
+      try {
+        successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
       }
+    } else {
       successfulAddition = bq.offer(re);
     }
     if (!successfulAddition) {
-        ramCache.remove(cacheKey);
-        failedBlockAdditions.incrementAndGet();
+      ramCache.remove(cacheKey);
+      failedBlockAdditions.incrementAndGet();
     } else {
       this.blockNumber.incrementAndGet();
       this.heapSize.addAndGet(cachedItem.heapSize());
@@ -394,11 +395,14 @@ public class BucketCache implements BlockCache, HeapSize {
   @Override
   public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
       boolean updateCacheMetrics) {
-    if (!cacheEnabled)
+    if (!cacheEnabled) {
       return null;
+    }
     RAMQueueEntry re = ramCache.get(key);
     if (re != null) {
-      if (updateCacheMetrics) cacheStats.hit(caching);
+      if (updateCacheMetrics) {
+        cacheStats.hit(caching);
+      }
       re.access(accessCount.incrementAndGet());
       return re.getData();
     }
@@ -408,6 +412,9 @@ public class BucketCache implements BlockCache, HeapSize {
     IdLock.Entry lockEntry = null;
     try {
       lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
+      // We cannot read here even if backingMap does contain the given key, because the entry's
+      // offset may have changed. If we locked on the BlockCacheKey instead of the offset, we
+      // could only check for existence here.
       if (bucketEntry.equals(backingMap.get(key))) {
         int len = bucketEntry.getLength();
         ByteBuffer bb = ByteBuffer.allocate(len);
@@ -438,13 +445,27 @@ public class BucketCache implements BlockCache, HeapSize {
         }
       }
     }
-    if (!repeat && updateCacheMetrics) cacheStats.miss(caching);
+    if (!repeat && updateCacheMetrics) {
+      cacheStats.miss(caching);
+    }
     return null;
   }
 
+  @VisibleForTesting
+  void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
+    bucketAllocator.freeBlock(bucketEntry.offset());
+    realCacheSize.addAndGet(-1 * bucketEntry.getLength());
+    blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
+    if (decrementBlockNumber) {
+      this.blockNumber.decrementAndGet();
+    }
+  }
+
   @Override
   public boolean evictBlock(BlockCacheKey cacheKey) {
-    if (!cacheEnabled) return false;
+    if (!cacheEnabled) {
+      return false;
+    }
     RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
     if (removedBlock != null) {
       this.blockNumber.decrementAndGet();
@@ -462,13 +483,8 @@ public class BucketCache implements BlockCache, HeapSize {
     IdLock.Entry lockEntry = null;
     try {
       lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
-      if (bucketEntry.equals(backingMap.remove(cacheKey))) {
-        bucketAllocator.freeBlock(bucketEntry.offset());
-        realCacheSize.addAndGet(-1 * bucketEntry.getLength());
-        blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
-        if (removedBlock == null) {
-          this.blockNumber.decrementAndGet();
-        }
+      if (backingMap.remove(cacheKey, bucketEntry)) {
+        blockEvicted(cacheKey, bucketEntry, removedBlock == null);
       } else {
         return false;
       }
@@ -703,13 +719,10 @@ public class BucketCache implements BlockCache, HeapSize {
   @VisibleForTesting
   class WriterThread extends HasThread {
     private final BlockingQueue<RAMQueueEntry> inputQueue;
-    private final int threadNO;
     private volatile boolean writerEnabled = true;
 
-    WriterThread(BlockingQueue<RAMQueueEntry> queue, int threadNO) {
-      super();
+    WriterThread(BlockingQueue<RAMQueueEntry> queue) {
       this.inputQueue = queue;
-      this.threadNO = threadNO;
     }
 
     // Used for test
@@ -726,9 +739,6 @@ public class BucketCache implements BlockCache, HeapSize {
           try {
             // Blocks
             entries = getRAMQueueEntries(inputQueue, entries);
-            synchronized (cacheWaitSignals[threadNO]) {
-              cacheWaitSignals[threadNO].notifyAll();
-            }
           } catch (InterruptedException ie) {
             if (!cacheEnabled) break;
           }
@@ -753,7 +763,9 @@ public class BucketCache implements BlockCache, HeapSize {
      */
     @VisibleForTesting
     void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
-      if (entries.isEmpty()) return;
+      if (entries.isEmpty()) {
+        return;
+      }
       // This method is a little hard to follow. We run through the passed in entries and for each
       // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
       // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
@@ -828,6 +840,21 @@ public class BucketCache implements BlockCache, HeapSize {
         RAMQueueEntry ramCacheEntry = ramCache.remove(key);
         if (ramCacheEntry != null) {
           heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
+        } else if (bucketEntries[i] != null) {
+          // Block should have already been evicted. Remove it and free space.
+          IdLock.Entry lockEntry = null;
+          try {
+            lockEntry = offsetLock.getLockEntry(bucketEntries[i].offset());
+            if (backingMap.remove(key, bucketEntries[i])) {
+              blockEvicted(key, bucketEntries[i], false);
+            }
+          } catch (IOException e) {
+            LOG.warn("failed to free space for " + key, e);
+          } finally {
+            if (lockEntry != null) {
+              offsetLock.releaseLockEntry(lockEntry);
+            }
+          }
         }
       }
 
@@ -1053,23 +1080,35 @@ public class BucketCache implements BlockCache, HeapSize {
    * up the long. Doubt we'll see devices this big for ages. Offsets are divided
    * by 256. So 5 bytes gives us 256TB or so.
    */
-  static class BucketEntry implements Serializable, Comparable<BucketEntry> {
+  static class BucketEntry implements Serializable {
     private static final long serialVersionUID = -6741504807982257534L;
+
+    // access counter comparator, descending order
+    static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketEntry>() {
+
+      @Override
+      public int compare(BucketEntry o1, BucketEntry o2) {
+        long accessCounter1 = o1.accessCounter;
+        long accessCounter2 = o2.accessCounter;
+        return accessCounter1 < accessCounter2 ? 1 : accessCounter1 == accessCounter2 ? 0 : -1;
+      }
+    };
+
     private int offsetBase;
     private int length;
     private byte offset1;
     byte deserialiserIndex;
-    private volatile long accessTime;
+    private volatile long accessCounter;
    private BlockPriority priority;
     /**
      * Time this block was cached. Presumes we are created just before we are added to the cache.
      */
     private final long cachedTime = System.nanoTime();
 
-    BucketEntry(long offset, int length, long accessTime, boolean inMemory) {
+    BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
       setOffset(offset);
       this.length = length;
-      this.accessTime = accessTime;
+      this.accessCounter = accessCounter;
       if (inMemory) {
         this.priority = BlockPriority.MEMORY;
       } else {
@@ -1108,10 +1147,10 @@ public class BucketCache implements BlockCache, HeapSize {
     }
 
     /**
-     * Block has been accessed. Update its local access time.
+     * Block has been accessed. Update its local access counter.
      */
-    public void access(long accessTime) {
-      this.accessTime = accessTime;
+    public void access(long accessCounter) {
+      this.accessCounter = accessCounter;
       if (this.priority == BlockPriority.SINGLE) {
         this.priority = BlockPriority.MULTI;
       }
@@ -1121,17 +1160,6 @@ public class BucketCache implements BlockCache, HeapSize {
       return this.priority;
     }
 
-    @Override
-    public int compareTo(BucketEntry that) {
-      if(this.accessTime == that.accessTime) return 0;
-      return this.accessTime < that.accessTime ? 1 : -1;
-    }
-
-    @Override
-    public boolean equals(Object that) {
-      return this == that;
-    }
-
     public long getCachedTime() {
       return cachedTime;
     }
@@ -1202,14 +1230,14 @@ public class BucketCache implements BlockCache, HeapSize {
   static class RAMQueueEntry {
     private BlockCacheKey key;
     private Cacheable data;
-    private long accessTime;
+    private long accessCounter;
     private boolean inMemory;
 
-    public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessTime,
+    public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
         boolean inMemory) {
       this.key = bck;
       this.data = data;
-      this.accessTime = accessTime;
+      this.accessCounter = accessCounter;
       this.inMemory = inMemory;
     }
 
@@ -1221,8 +1249,8 @@ public class BucketCache implements BlockCache, HeapSize {
       return key;
     }
 
-    public void access(long accessTime) {
-      this.accessTime = accessTime;
+    public void access(long accessCounter) {
+      this.accessCounter = accessCounter;
     }
 
     public BucketEntry writeToCache(final IOEngine ioEngine,
@@ -1234,7 +1262,7 @@ public class BucketCache implements BlockCache, HeapSize {
       // This cacheable thing can't be serialized...
       if (len == 0) return null;
       long offset = bucketAllocator.allocateBlock(len);
-      BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime, inMemory);
+      BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
       bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
       try {
         if (data instanceof HFileBlock) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java
index b6954bb77c2..0e33a569f52 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java
@@ -54,23 +54,23 @@ public class CachedEntryQueue {
    */
   public CachedEntryQueue(long maxSize, long blockSize) {
     int initialSize = (int) (maxSize / blockSize);
-    if (initialSize == 0)
+    if (initialSize == 0) {
       initialSize++;
-    queue = MinMaxPriorityQueue
-        .orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() {
-          public int compare(Entry<BlockCacheKey, BucketEntry> entry1,
-              Entry<BlockCacheKey, BucketEntry> entry2) {
-            return entry1.getValue().compareTo(entry2.getValue());
-          }
+    }
+    queue = MinMaxPriorityQueue.orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() {
 
-        }).expectedSize(initialSize).create();
+      public int compare(Entry<BlockCacheKey, BucketEntry> entry1,
+          Entry<BlockCacheKey, BucketEntry> entry2) {
+        return BucketEntry.COMPARATOR.compare(entry1.getValue(), entry2.getValue());
+      }
+
+    }).expectedSize(initialSize).create();
     cacheSize = 0;
     this.maxSize = maxSize;
   }
 
   /**
    * Attempt to add the specified entry to this queue.
-   *
    * <p>
   * If the queue is smaller than the max size, or if the specified element is
   * ordered after the smallest element in the queue, the element will be added
@@ -83,7 +83,7 @@ public class CachedEntryQueue {
       cacheSize += entry.getValue().getLength();
     } else {
       BucketEntry head = queue.peek().getValue();
-      if (entry.getValue().compareTo(head) > 0) {
+      if (BucketEntry.COMPARATOR.compare(entry.getValue(), head) > 0) {
         cacheSize += entry.getValue().getLength();
         cacheSize -= head.getLength();
         if (cacheSize > maxSize) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
index b9d0983983b..fedf951c26f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
@@ -25,6 +25,8 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Allows multiple concurrent clients to lock on a numeric id with a minimal
  * memory overhead. The intended usage is as follows:
@@ -119,4 +121,18 @@ public class IdLock {
     assert map.size() == 0;
   }
 
+  @VisibleForTesting
+  public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
+    for (Entry entry;;) {
+      entry = map.get(id);
+      if (entry != null) {
+        synchronized (entry) {
+          if (entry.numWaiters >= numWaiters) {
+            return;
+          }
+        }
+      }
+      Thread.sleep(100);
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
index 5ef8cf0864d..b0a2ba29efc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
@@ -247,11 +247,11 @@ public class CacheTestUtils {
     assertTrue(toBeTested.getStats().getEvictedCount() > 0);
   }
 
-  private static class ByteArrayCacheable implements Cacheable {
+  public static class ByteArrayCacheable implements Cacheable {
 
-    static final CacheableDeserializer<Cacheable> blockDeserializer = 
+    static final CacheableDeserializer<Cacheable> blockDeserializer =
       new CacheableDeserializer<Cacheable>() {
-    
+
       @Override
       public Cacheable deserialize(ByteBuffer b) throws IOException {
         int len = b.getInt();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 5bd77816ebe..8a3813d78da 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -27,12 +28,13 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.IdLock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -43,8 +45,7 @@ import org.junit.runners.Parameterized;
 /**
  * Basic test of BucketCache.Puts and gets.
  * <p>
- * Tests will ensure that blocks' data correctness under several threads
- * concurrency
+ * Tests will ensure data correctness of blocks under concurrent access from several threads
  */
 @RunWith(Parameterized.class)
 @Category(SmallTests.class)
@@ -52,15 +53,15 @@ public class TestBucketCache {
 
   private static final Random RAND = new Random();
 
-  @Parameterized.Parameters(name="{index}: blockSize={0}, bucketSizes={1}")
+  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
   public static Iterable<Object[]> data() {
     return Arrays.asList(new Object[][] {
-        { 8192, null }, // TODO: why is 8k the default blocksize for these tests?
-        { 16 * 1024, new int[] {
-            2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
-            28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
-            128 * 1024 + 1024 } }
-    });
+        { 8192, null }, // TODO: why is 8k the default blocksize for these tests?
+        {
+            16 * 1024,
+            new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
+                28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
+                128 * 1024 + 1024 } } });
   }
 
   @Parameterized.Parameter(0)
@@ -75,7 +76,7 @@ public class TestBucketCache {
   final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
   final int NUM_THREADS = 1000;
   final int NUM_QUERIES = 10000;
-  
+
   final long capacitySize = 32 * 1024 * 1024;
   final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
   final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
@@ -85,16 +86,16 @@ public class TestBucketCache {
 
   private class MockedBucketCache extends BucketCache {
     public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
-        int writerThreads, int writerQLen, String persistencePath)
-        throws FileNotFoundException, IOException {
+        int writerThreads, int writerQLen, String persistencePath) throws FileNotFoundException,
+        IOException {
       super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
-          persistencePath);
+        persistencePath);
       super.wait_when_cache = true;
     }
 
     @Override
-    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf,
-        boolean inMemory, boolean cacheDataInL1) {
+    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
+        boolean cacheDataInL1) {
       if (super.getBlock(cacheKey, true, false, true) != null) {
         throw new RuntimeException("Cached an already cached block");
       }
@@ -112,8 +113,9 @@ public class TestBucketCache {
 
   @Before
   public void setup() throws FileNotFoundException, IOException {
-    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
-        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+    cache =
+        new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
+            constructedBlockSizes, writeThreads, writerQLen, persistencePath);
   }
 
   @After
@@ -141,7 +143,7 @@ public class TestBucketCache {
     // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until
     // the cache is completely filled.
     List<Integer> tmp = new ArrayList<Integer>(BLOCKSIZES);
-    for (int i = 0; !full; i++) {
+    while (!full) {
       Integer blockSize = null;
       try {
         blockSize = randFrom(tmp);
@@ -155,9 +157,7 @@ public class TestBucketCache {
     for (Integer blockSize : BLOCKSIZES) {
       BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
       IndexStatistics indexStatistics = bucketSizeInfo.statistics();
-      assertEquals(
-          "unexpected freeCount for " + bucketSizeInfo,
-          0, indexStatistics.freeCount());
+      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());
     }
 
     for (long offset : allocations) {
@@ -181,4 +181,41 @@ public class TestBucketCache {
     cache.stopWriterThreads();
     CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
   }
-}
\ No newline at end of file
+
+  // BucketCache.cacheBlock is asynchronous: it first adds the block to ramCache and a writer
+  // queue; writer threads then flush it to the bucket and put a reference entry in backingMap.
+  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
+      Cacheable block) throws InterruptedException {
+    cache.cacheBlock(cacheKey, block);
+    while (!cache.backingMap.containsKey(cacheKey)) {
+      Thread.sleep(100);
+    }
+  }
+
+  @Test
+  public void testMemoryLeak() throws Exception {
+    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
+    cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
+        new byte[10]));
+    long lockId = cache.backingMap.get(cacheKey).offset();
+    IdLock.Entry lockEntry = cache.offsetLock.getLockEntry(lockId);
+    Thread evictThread = new Thread("evict-block") {
+
+      @Override
+      public void run() {
+        cache.evictBlock(cacheKey);
+      }
+
+    };
+    evictThread.start();
+    cache.offsetLock.waitForWaiters(lockId, 1);
+    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
+    cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
+        new byte[10]));
+    cache.offsetLock.releaseLockEntry(lockEntry);
+    evictThread.join();
+    assertEquals(1L, cache.getBlockCount());
+    assertTrue(cache.getCurrentSize() > 0L);
+    assertTrue("We should have a block!", cache.iterator().hasNext());
+  }
+}
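
Note on the writer-queue handshake: the old code made a non-blocking
offer() and then parked on a per-queue cacheWaitSignals object that the
writer thread had to notifyAll(). The patch folds both steps into the
timed offer that BlockingQueue already provides. A minimal standalone
sketch of that pattern (illustrative names, not the BucketCache code):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class TimedOfferSketch {
      public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
        queue.offer("first"); // fills the queue to capacity
        // offer(e, timeout, unit) waits up to the timeout for space to
        // appear, then gives up; no signal object or synchronized block.
        boolean added = queue.offer("second", 50, TimeUnit.MILLISECONDS);
        if (!added) {
          // Caller backs off, mirroring failedBlockAdditions in the patch.
          System.out.println("queue full, block not cached");
        }
      }
    }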