HBASE-27852: Interrupt BucketCachePersister thread when BucketCache is shutdown (#5230)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
parent
55aff4ceef
commit
e343584b50
|
@ -178,6 +178,8 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
|
|
||||||
private final BucketCacheStats cacheStats = new BucketCacheStats();
|
private final BucketCacheStats cacheStats = new BucketCacheStats();
|
||||||
|
|
||||||
|
/** BucketCache persister thread */
|
||||||
|
private BucketCachePersister cachePersister;
|
||||||
private final String persistencePath;
|
private final String persistencePath;
|
||||||
static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
|
static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
|
||||||
private final long cacheCapacity;
|
private final long cacheCapacity;
|
||||||
|
@ -377,8 +379,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
}
|
}
|
||||||
|
|
||||||
void startBucketCachePersisterThread() {
|
void startBucketCachePersisterThread() {
|
||||||
BucketCachePersister cachePersister =
|
cachePersister = new BucketCachePersister(this, bucketcachePersistInterval);
|
||||||
new BucketCachePersister(this, bucketcachePersistInterval);
|
|
||||||
cachePersister.setDaemon(true);
|
cachePersister.setDaemon(true);
|
||||||
cachePersister.start();
|
cachePersister.start();
|
||||||
}
|
}
|
||||||
|
@ -1416,6 +1417,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write="
|
LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write="
|
||||||
+ persistencePath);
|
+ persistencePath);
|
||||||
if (ioEngine.isPersistent() && persistencePath != null) {
|
if (ioEngine.isPersistent() && persistencePath != null) {
|
||||||
|
cachePersister.interrupt();
|
||||||
try {
|
try {
|
||||||
join();
|
join();
|
||||||
persistToFile();
|
persistToFile();
|
||||||
|
|
|
@ -44,8 +44,11 @@ public class BucketCachePersister extends Thread {
|
||||||
cache.persistToFile();
|
cache.persistToFile();
|
||||||
cache.setCacheInconsistent(false);
|
cache.setCacheInconsistent(false);
|
||||||
}
|
}
|
||||||
} catch (IOException | InterruptedException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Exception in BucketCachePersister" + e.getMessage());
|
LOG.warn("IOException in BucketCachePersister {} ", e.getMessage());
|
||||||
|
} catch (InterruptedException iex) {
|
||||||
|
LOG.warn("InterruptedException in BucketCachePersister {} ", iex.getMessage());
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,10 +29,12 @@ import java.nio.file.Files;
|
||||||
import java.nio.file.attribute.FileTime;
|
import java.nio.file.attribute.FileTime;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||||
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
|
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||||
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
|
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
|
||||||
import org.apache.hadoop.hbase.io.hfile.Cacheable;
|
import org.apache.hadoop.hbase.io.hfile.Cacheable;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
@ -143,6 +145,44 @@ public class TestVerifyBucketCacheFile {
|
||||||
TEST_UTIL.cleanupTestDir();
|
TEST_UTIL.cleanupTestDir();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRetrieveFromFileAfterDelete() throws Exception {
|
||||||
|
|
||||||
|
HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
|
||||||
|
Path testDir = TEST_UTIL.getDataTestDir();
|
||||||
|
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300);
|
||||||
|
|
||||||
|
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
|
||||||
|
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
|
||||||
|
testDir + "/bucket.persistence", 60 * 1000, conf);
|
||||||
|
|
||||||
|
long usedSize = bucketCache.getAllocator().getUsedSize();
|
||||||
|
assertEquals(0, usedSize);
|
||||||
|
CacheTestUtils.HFileBlockPair[] blocks =
|
||||||
|
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
|
||||||
|
// Add blocks
|
||||||
|
for (CacheTestUtils.HFileBlockPair block : blocks) {
|
||||||
|
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
|
||||||
|
}
|
||||||
|
usedSize = bucketCache.getAllocator().getUsedSize();
|
||||||
|
assertNotEquals(0, usedSize);
|
||||||
|
// Shutdown BucketCache
|
||||||
|
bucketCache.shutdown();
|
||||||
|
// Delete the persistence file
|
||||||
|
final java.nio.file.Path mapFile =
|
||||||
|
FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence");
|
||||||
|
assertTrue(Files.deleteIfExists(mapFile));
|
||||||
|
Thread.sleep(350);
|
||||||
|
// Create BucketCache
|
||||||
|
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
|
||||||
|
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
|
||||||
|
testDir + "/bucket.persistence", 60 * 1000, conf);
|
||||||
|
assertEquals(0, bucketCache.getAllocator().getUsedSize());
|
||||||
|
assertEquals(0, bucketCache.backingMap.size());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test whether BucketCache is started normally after modifying the cache file. Start BucketCache
|
* Test whether BucketCache is started normally after modifying the cache file. Start BucketCache
|
||||||
* and add some blocks, then shutdown BucketCache and persist cache to file. Restart BucketCache
|
* and add some blocks, then shutdown BucketCache and persist cache to file. Restart BucketCache
|
||||||
|
|
Loading…
Reference in New Issue