HBASE-22606 BucketCache additional tests

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
Signed-off-by: Sakthi <sakthivel.azhaku@gmail.com>
This commit is contained in:
Viraj Jasani 2019-06-24 21:21:53 +05:30 committed by Wellington Chevreuil
parent 605f8a15bb
commit 9ac9505f2a
1 changed files with 154 additions and 44 deletions

View File

@ -110,8 +110,9 @@ public class TestBucketCache {
final long capacitySize = 32 * 1024 * 1024;
final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
String ioEngineName = "offheap";
String persistencePath = null;
private String ioEngineName = "offheap";
private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
private static class MockedBucketCache extends BucketCache {
@ -136,7 +137,7 @@ public class TestBucketCache {
@Before
public void setup() throws IOException {
cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
constructedBlockSizes, writeThreads, writerQLen, null);
}
@After
@ -144,6 +145,19 @@ public class TestBucketCache {
cache.shutdown();
}
/**
* Test Utility to create test dir and return name
*
* @return return name of created dir
* @throws IOException throws IOException
*/
private Path createAndGetTestDir() throws IOException {
final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
return testDir;
}
/**
* Return a random element from {@code a}.
*/
@ -255,51 +269,128 @@ public class TestBucketCache {
@Test
public void testRetrieveFromFile() throws Exception {
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
Path testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
Path testDir = createAndGetTestDir();
String ioEngineName = "file:" + testDir + "/bucket.cache";
testRetrievalUtils(testDir, ioEngineName);
int[] smallBucketSizes = new int[]{3 * 1024, 5 * 1024};
String persistencePath = testDir + "/bucket.persistence";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
long usedSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);
HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
// Add blocks
for (HFileBlockPair block : blocks) {
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
}
for (HFileBlockPair block : blocks) {
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
// persist cache to file
bucketCache.shutdown();
assertTrue(new File(persistencePath).exists());
// restore cache from file
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
// persist cache to file
bucketCache.shutdown();
assertTrue(new File(persistencePath).exists());
// reconfig buckets sizes, the biggest bucket is small than constructedBlockSize (8k or 16k)
// so it can't restore cache from file
int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 };
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
smallBucketSizes, writeThreads, writerQLen, persistencePath);
smallBucketSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
HBASE_TESTING_UTILITY.cleanupTestDir();
}
TEST_UTIL.cleanupTestDir();
@Test
public void testRetrieveFromMMap() throws Exception {
final Path testDir = createAndGetTestDir();
final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
testRetrievalUtils(testDir, ioEngineName);
}
@Test
public void testRetrieveFromPMem() throws Exception {
final Path testDir = createAndGetTestDir();
final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
testRetrievalUtils(testDir, ioEngineName);
int[] smallBucketSizes = new int[]{3 * 1024, 5 * 1024};
String persistencePath = testDir + "/bucket.persistence";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
smallBucketSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
HBASE_TESTING_UTILITY.cleanupTestDir();
}
private void testRetrievalUtils(Path testDir, String ioEngineName)
throws IOException, InterruptedException {
final String persistencePath = testDir + "/bucket.persistence";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
try {
long usedSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);
HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
for (HFileBlockPair block : blocks) {
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
}
for (HFileBlockPair block : blocks) {
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
bucketCache.shutdown();
assertTrue(new File(persistencePath).exists());
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
} finally {
bucketCache.shutdown();
}
assertTrue(new File(persistencePath).exists());
}
@Test
public void testRetrieveUnsupportedIOE() throws Exception {
try {
final Path testDir = createAndGetTestDir();
final String ioEngineName = testDir + "/bucket.cache";
testRetrievalUtils(testDir, ioEngineName);
Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!");
} catch (IllegalArgumentException e) {
Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, " +
"files:, mmap: or offheap", e.getMessage());
}
}
@Test
public void testRetrieveFromMultipleFiles() throws Exception {
final Path testDirInitial = createAndGetTestDir();
final Path newTestDir = new HBaseTestingUtility().getDataTestDir();
HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir);
String ioEngineName = new StringBuilder("files:").append(testDirInitial)
.append("/bucket1.cache").append(FileIOEngine.FILE_DELIMITER).append(newTestDir)
.append("/bucket2.cache").toString();
testRetrievalUtils(testDirInitial, ioEngineName);
int[] smallBucketSizes = new int[]{3 * 1024, 5 * 1024};
String persistencePath = testDirInitial + "/bucket.persistence";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
smallBucketSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
HBASE_TESTING_UTILITY.cleanupTestDir();
}
@Test
public void testRetrieveFromFileWithoutPersistence() throws Exception {
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, null);
try {
final Path testDir = createAndGetTestDir();
String ioEngineName = "file:" + testDir + "/bucket.cache";
long usedSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);
HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
for (HFileBlockPair block : blocks) {
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
}
for (HFileBlockPair block : blocks) {
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
bucketCache.shutdown();
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, null);
assertEquals(0, bucketCache.getAllocator().getUsedSize());
} finally {
bucketCache.shutdown();
HBASE_TESTING_UTILITY.cleanupTestDir();
}
}
@Test
@ -322,13 +413,32 @@ public class TestBucketCache {
conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);
constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
validateGetPartitionSize(cache, 0.1f, 0.5f);
validateGetPartitionSize(cache, 0.7f, 0.5f);
validateGetPartitionSize(cache, 0.2f, 0.5f);
}
@Test
public void testCacheSizeCapacity() throws IOException {
// Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE
validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
BucketCache.DEFAULT_MIN_FACTOR);
Configuration conf = HBaseConfiguration.create();
conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
try {
new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
writerQLen, null, 100, conf);
Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!");
} catch (IllegalArgumentException e) {
Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
}
}
@Test
public void testValidBucketCacheConfigs() throws IOException {
Configuration conf = HBaseConfiguration.create();
@ -340,7 +450,7 @@ public class TestBucketCache {
conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);
constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
cache.getAcceptableFactor(), 0);
@ -408,7 +518,7 @@ public class TestBucketCache {
conf.setFloat(configName, configMap.get(configName)[i]);
}
BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);
constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
} catch (IllegalArgumentException e) {
assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);