HBASE-13070 Fix TestCacheOnWrite (Duo Zhang)

tedyu 2015-02-20 08:35:29 -08:00
parent 03d8918142
commit 229b0774c0
1 changed file with 82 additions and 65 deletions


@@ -20,8 +20,8 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -29,7 +29,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -37,7 +36,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -59,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -66,6 +65,8 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
+import com.google.common.collect.Lists;
+
 /**
  * Tests {@link HFile} cache-on-write functionality for the following block
  * types: data blocks, non-root index blocks, and Bloom filter blocks.
@@ -170,7 +171,7 @@ public class TestCacheOnWrite {
     this.blockCache = blockCache;
     testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress +
         ", encoderType=" + encoderType + ", cacheCompressedData=" + cacheCompressedData + "]";
-    System.out.println(testDescription);
+    LOG.info(testDescription);
   }
 
   private static List<BlockCache> getBlockCaches() throws IOException {
@@ -185,10 +186,10 @@
     // bucket cache
     FileSystem.get(conf).mkdirs(TEST_UTIL.getDataTestDir());
-    int[] bucketSizes = {INDEX_BLOCK_SIZE, DATA_BLOCK_SIZE, BLOOM_BLOCK_SIZE, 64 * 1024 };
+    int[] bucketSizes =
+        { INDEX_BLOCK_SIZE, DATA_BLOCK_SIZE, BLOOM_BLOCK_SIZE, 64 * 1024, 128 * 1024 };
     BlockCache bucketcache =
-        new BucketCache("file:" + TEST_UTIL.getDataTestDir() + "/bucket.data",
-            128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null);
+        new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null);
     blockcaches.add(bucketcache);
     return blockcaches;
   }
@@ -201,7 +202,8 @@
       for (Compression.Algorithm compress : HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
         for (BlockEncoderTestType encoderType : BlockEncoderTestType.values()) {
           for (boolean cacheCompressedData : new boolean[] { false, true }) {
-            cowTypes.add(new Object[] { cowType, compress, encoderType, cacheCompressedData, blockache});
+            cowTypes.add(new Object[] { cowType, compress, encoderType, cacheCompressedData,
+                blockache });
           }
         }
       }
@@ -210,6 +212,31 @@
     return cowTypes;
   }
 
+  private void clearBlockCache(BlockCache blockCache) throws InterruptedException {
+    if (blockCache instanceof LruBlockCache) {
+      ((LruBlockCache) blockCache).clearCache();
+    } else {
+      // BucketCache may not return all cached blocks(blocks in write queue), so check it here.
+      for (int clearCount = 0; blockCache.getBlockCount() > 0; clearCount++) {
+        if (clearCount > 0) {
+          LOG.warn("clear block cache " + blockCache + " " + clearCount + " times, "
+              + blockCache.getBlockCount() + " blocks remaining");
+          Thread.sleep(10);
+        }
+        for (CachedBlock block : Lists.newArrayList(blockCache)) {
+          BlockCacheKey key = new BlockCacheKey(block.getFilename(), block.getOffset());
+          // CombinedBucketCache may need evict two times.
+          for (int evictCount = 0; blockCache.evictBlock(key); evictCount++) {
+            if (evictCount > 1) {
+              LOG.warn("evict block " + block + " in " + blockCache + " " + evictCount
+                  + " times, maybe a bug here");
+            }
+          }
+        }
+      }
+    }
+  }
+
   @Before
   public void setUp() throws IOException {
     conf = TEST_UTIL.getConfiguration();
@@ -221,25 +248,24 @@
     conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData);
     cowType.modifyConf(conf);
     fs = HFileSystem.get(conf);
+    CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = blockCache;
     cacheConf =
         new CacheConfig(blockCache, true, true, cowType.shouldBeCached(BlockType.DATA),
             cowType.shouldBeCached(BlockType.LEAF_INDEX),
-            cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData, true, false);
+            cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData, false, false);
   }
 
   @After
-  public void tearDown() {
-    cacheConf = new CacheConfig(conf);
-    blockCache = cacheConf.getBlockCache();
+  public void tearDown() throws IOException, InterruptedException {
+    clearBlockCache(blockCache);
   }
 
-  @Test
-  public void testStoreFileCacheOnWrite() throws IOException {
-    testStoreFileCacheOnWriteInternals(false);
-    testStoreFileCacheOnWriteInternals(true);
+  @AfterClass
+  public static void afterClass() throws IOException {
+    TEST_UTIL.cleanupTestDir();
   }
 
-  protected void testStoreFileCacheOnWriteInternals(boolean useTags) throws IOException {
+  private void testStoreFileCacheOnWriteInternals(boolean useTags) throws IOException {
     writeStoreFile(useTags);
     readStoreFile(useTags);
   }
@@ -323,15 +349,15 @@
         encoderType.encode ? BlockType.ENCODED_DATA : BlockType.DATA;
     if (useTags) {
       assertEquals("{" + cachedDataBlockType
-          + "=1550, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=20}", countByType);
+          + "=2663, LEAF_INDEX=297, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=34}", countByType);
     } else {
       assertEquals("{" + cachedDataBlockType
-          + "=1379, LEAF_INDEX=154, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=18}", countByType);
+          + "=2498, LEAF_INDEX=278, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=31}", countByType);
     }
 
     // iterate all the keyvalue from hfile
     while (scanner.next()) {
-      Cell cell = scanner.getKeyValue();
+      scanner.getKeyValue();
     }
     reader.close();
   }
@@ -341,18 +367,16 @@
       // Let's make half of KVs puts.
       return KeyValue.Type.Put;
     } else {
-      KeyValue.Type keyType =
-          KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
-      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum)
-      {
-        throw new RuntimeException("Generated an invalid key type: " + keyType
-            + ". " + "Probably the layout of KeyValue.Type has changed.");
+      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
+      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
+        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
+            + "Probably the layout of KeyValue.Type has changed.");
       }
       return keyType;
     }
   }
 
-  public void writeStoreFile(boolean useTags) throws IOException {
+  private void writeStoreFile(boolean useTags) throws IOException {
     if(useTags) {
       TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
     } else {
@@ -368,12 +392,11 @@
         .withOutputDir(storeFileParentDir).withComparator(KeyValue.COMPARATOR)
         .withFileContext(meta)
         .withBloomType(BLOOM_TYPE).withMaxKeyCount(NUM_KV).build();
-    final int rowLen = 32;
+    byte[] cf = Bytes.toBytes("fam");
     for (int i = 0; i < NUM_KV; ++i) {
-      byte[] k = TestHFileWriterV2.randomOrderedKey(rand, i);
-      byte[] v = TestHFileWriterV2.randomValue(rand);
-      int cfLen = rand.nextInt(k.length - rowLen + 1);
+      byte[] row = TestHFileWriterV2.randomOrderedKey(rand, i);
+      byte[] qualifier = TestHFileWriterV2.randomRowOrQualifier(rand);
+      byte[] value = TestHFileWriterV2.randomValue(rand);
       KeyValue kv;
       if(useTags) {
         Tag t = new Tag((byte) 1, "visibility");
@@ -381,21 +404,13 @@
         tagList.add(t);
         Tag[] tags = new Tag[1];
         tags[0] = t;
-        kv = new KeyValue(
-            k, 0, rowLen,
-            k, rowLen, cfLen,
-            k, rowLen + cfLen, k.length - rowLen - cfLen,
-            rand.nextLong(),
-            generateKeyType(rand),
-            v, 0, v.length, tagList);
+        kv =
+            new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
+                rand.nextLong(), generateKeyType(rand), value, 0, value.length, tagList);
       } else {
-        kv = new KeyValue(
-            k, 0, rowLen,
-            k, rowLen, cfLen,
-            k, rowLen + cfLen, k.length - rowLen - cfLen,
-            rand.nextLong(),
-            generateKeyType(rand),
-            v, 0, v.length);
+        kv =
+            new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
+                rand.nextLong(), generateKeyType(rand), value, 0, value.length);
       }
       sfw.append(kv);
     }
@@ -404,13 +419,8 @@
     storeFilePath = sfw.getPath();
   }
 
-  @Test
-  public void testNotCachingDataBlocksDuringCompaction() throws IOException {
-    testNotCachingDataBlocksDuringCompactionInternals(false);
-    testNotCachingDataBlocksDuringCompactionInternals(true);
-  }
-
-  protected void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags) throws IOException {
+  private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
+      throws IOException, InterruptedException {
     if (useTags) {
       TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
     } else {
@@ -450,7 +460,7 @@
             HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
          p.add(kv);
        } else {
-         p.add(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
+         p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
        }
      }
    }
@@ -459,20 +469,27 @@
       }
       region.flushcache();
     }
 
-    LruBlockCache blockCache =
-        (LruBlockCache) new CacheConfig(conf).getBlockCache();
-    blockCache.clearCache();
-    assertEquals(0, blockCache.getBlockTypeCountsForTest().size());
+    clearBlockCache(blockCache);
+    assertEquals(0, blockCache.getBlockCount());
     region.compactStores();
     LOG.debug("compactStores() returned");
 
-    Map<BlockType, Integer> blockTypesInCache =
-        blockCache.getBlockTypeCountsForTest();
-    LOG.debug("Block types in cache: " + blockTypesInCache);
-    assertNull(blockTypesInCache.get(BlockType.ENCODED_DATA));
-    assertNull(blockTypesInCache.get(BlockType.DATA));
+    for (CachedBlock block: blockCache) {
+      assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
+      assertNotEquals(BlockType.DATA, block.getBlockType());
+    }
     region.close();
-    blockCache.shutdown();
+  }
+
+  @Test
+  public void testStoreFileCacheOnWrite() throws IOException {
+    testStoreFileCacheOnWriteInternals(false);
+    testStoreFileCacheOnWriteInternals(true);
+  }
+
+  @Test
+  public void testNotCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
+    testNotCachingDataBlocksDuringCompactionInternals(false);
+    testNotCachingDataBlocksDuringCompactionInternals(true);
   }
 }
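
For readers less familiar with JUnit's Parameterized runner, the sketch below is a minimal, self-contained illustration of the test lifecycle this commit moves TestCacheOnWrite toward: each parameter combination owns its cache instance, tearDown() empties and verifies it, and an @AfterClass hook does one-time cleanup. It is plain JUnit 4 only; CacheCleanupSketchTest and FakeCache are invented stand-ins, not HBase's BlockCache API.

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class CacheCleanupSketchTest {

  /** Invented stand-in for a block cache; not HBase's BlockCache interface. */
  static class FakeCache {
    private final Map<String, byte[]> blocks = new HashMap<String, byte[]>();

    void cacheBlock(String key, byte[] block) {
      blocks.put(key, block);
    }

    int getBlockCount() {
      return blocks.size();
    }

    void clear() {
      blocks.clear();
    }
  }

  // One run per cache instance, analogous to getBlockCaches() feeding the test parameters.
  @Parameters
  public static Collection<Object[]> parameters() {
    return Arrays.asList(new Object[][] { { new FakeCache() }, { new FakeCache() } });
  }

  private final FakeCache cache;

  public CacheCleanupSketchTest(FakeCache cache) {
    this.cache = cache;
  }

  @Test
  public void testBlocksAreCachedOnWrite() {
    cache.cacheBlock("block-0", new byte[16]);
    assertEquals(1, cache.getBlockCount());
  }

  @After
  public void tearDown() {
    // Per-test cleanup, playing the role of clearBlockCache(blockCache) above:
    // leave nothing behind for the next parameter combination.
    cache.clear();
    assertEquals(0, cache.getBlockCount());
  }

  @AfterClass
  public static void afterClass() {
    // One-time cleanup hook, analogous to TEST_UTIL.cleanupTestDir() in the commit.
  }
}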