HBASE-5230 : ensure that compactions do not cache-on-write data blocks

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1235882 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
mbautin 2012-01-25 19:41:07 +00:00
parent 6bcf607630
commit 07dbe903de
7 changed files with 140 additions and 33 deletions

View File

@ -289,7 +289,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
} }
getSchemaMetrics().updateOnCacheMiss(blockCategory, isCompaction, delta); getSchemaMetrics().updateOnCacheMiss(blockCategory, isCompaction, delta);
// Cache the block // Cache the block if necessary
if (cacheBlock && cacheConf.shouldCacheBlockOnRead( if (cacheBlock && cacheConf.shouldCacheBlockOnRead(
hfileBlock.getBlockType().getCategory())) { hfileBlock.getBlockType().getCategory())) {
cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock,

View File

@ -242,10 +242,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
HFile.writeOps.incrementAndGet(); HFile.writeOps.incrementAndGet();
if (cacheConf.shouldCacheDataOnWrite()) { if (cacheConf.shouldCacheDataOnWrite()) {
HFileBlock blockForCaching = fsBlockWriter.getBlockForCaching(); doCacheOnWrite(lastDataBlockOffset);
passSchemaMetricsTo(blockForCaching);
cacheConf.getBlockCache().cacheBlock(
HFile.getBlockCacheKey(name, lastDataBlockOffset), blockForCaching);
} }
} }
@ -263,14 +260,23 @@ public class HFileWriterV2 extends AbstractHFileWriter {
totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
if (cacheThisBlock) { if (cacheThisBlock) {
doCacheOnWrite(offset);
}
}
}
}
/**
* Caches the last written HFile block.
* @param offset the offset of the block we want to cache. Used to determine
* the cache key.
*/
private void doCacheOnWrite(long offset) {
// Cache this block on write. // Cache this block on write.
HFileBlock cBlock = fsBlockWriter.getBlockForCaching(); HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching();
passSchemaMetricsTo(cBlock); passSchemaMetricsTo(cacheFormatBlock);
cacheConf.getBlockCache().cacheBlock( cacheConf.getBlockCache().cacheBlock(
HFile.getBlockCacheKey(name, offset), cBlock); HFile.getBlockCacheKey(name, offset), cacheFormatBlock);
}
}
}
} }
/** /**

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -742,4 +743,14 @@ public class LruBlockCache implements BlockCache, HeapSize {
return fileNames; return fileNames;
} }
Map<BlockType, Integer> getBlockTypeCountsForTest() {
Map<BlockType, Integer> counts =
new EnumMap<BlockType, Integer>(BlockType.class);
for (CachedBlock cb : map.values()) {
BlockType blockType = ((HFileBlock) cb.getBuffer()).getBlockType();
Integer count = counts.get(blockType);
counts.put(blockType, (count == null ? 0 : count) + 1);
}
return counts;
}
} }

View File

@ -747,19 +747,28 @@ public class Store extends SchemaConfigured implements HeapSize {
*/ */
private StoreFile.Writer createWriterInTmp(int maxKeyCount) private StoreFile.Writer createWriterInTmp(int maxKeyCount)
throws IOException { throws IOException {
return createWriterInTmp(maxKeyCount, this.compression); return createWriterInTmp(maxKeyCount, this.compression, false);
} }
/* /*
* @param maxKeyCount * @param maxKeyCount
* @param compression Compression algorithm to use * @param compression Compression algorithm to use
* @param isCompaction whether we are creating a new file in a compaction
* @return Writer for a new StoreFile in the tmp dir. * @return Writer for a new StoreFile in the tmp dir.
*/ */
private StoreFile.Writer createWriterInTmp(int maxKeyCount, private StoreFile.Writer createWriterInTmp(int maxKeyCount,
Compression.Algorithm compression) Compression.Algorithm compression, boolean isCompaction)
throws IOException { throws IOException {
final CacheConfig writerCacheConf;
if (isCompaction) {
// Don't cache data on write on compactions.
writerCacheConf = new CacheConfig(cacheConf);
writerCacheConf.setCacheDataOnWrite(false);
} else {
writerCacheConf = cacheConf;
}
StoreFile.Writer w = StoreFile.createWriter(fs, region.getTmpDir(), StoreFile.Writer w = StoreFile.createWriter(fs, region.getTmpDir(),
blocksize, compression, comparator, conf, cacheConf, blocksize, compression, comparator, conf, writerCacheConf,
family.getBloomFilterType(), maxKeyCount); family.getBloomFilterType(), maxKeyCount);
// The store file writer's path does not include the CF name, so we need // The store file writer's path does not include the CF name, so we need
// to configure the HFile writer directly. // to configure the HFile writer directly.
@ -1428,8 +1437,8 @@ public class Store extends SchemaConfigured implements HeapSize {
do { do {
hasMore = scanner.next(kvs, this.compactionKVMax); hasMore = scanner.next(kvs, this.compactionKVMax);
if (writer == null && !kvs.isEmpty()) { if (writer == null && !kvs.isEmpty()) {
writer = createWriterInTmp(maxKeyCount, writer = createWriterInTmp(maxKeyCount, this.compactionCompression,
this.compactionCompression); true);
} }
if (writer != null) { if (writer != null) {
// output to writer: // output to writer:

View File

@ -219,9 +219,13 @@ public class SchemaMetrics {
256 - MORE_CFS_OMITTED_STR.length(); 256 - MORE_CFS_OMITTED_STR.length();
// Global variables // Global variables
/** All instances of this class */ /**
* Maps a string key consisting of table name and column family name, with
* table name optionally replaced with {@link #TOTAL_KEY} if per-table
* metrics are disabled, to an instance of this class.
*/
private static final ConcurrentHashMap<String, SchemaMetrics> private static final ConcurrentHashMap<String, SchemaMetrics>
cfToMetrics = new ConcurrentHashMap<String, SchemaMetrics>(); tableAndFamilyToMetrics = new ConcurrentHashMap<String, SchemaMetrics>();
/** Metrics for all tables and column families. */ /** Metrics for all tables and column families. */
// This has to be initialized after cfToMetrics. // This has to be initialized after cfToMetrics.
@ -317,14 +321,14 @@ public class SchemaMetrics {
tableName = getEffectiveTableName(tableName); tableName = getEffectiveTableName(tableName);
final String instanceKey = tableName + "\t" + cfName; final String instanceKey = tableName + "\t" + cfName;
SchemaMetrics schemaMetrics = cfToMetrics.get(instanceKey); SchemaMetrics schemaMetrics = tableAndFamilyToMetrics.get(instanceKey);
if (schemaMetrics != null) { if (schemaMetrics != null) {
return schemaMetrics; return schemaMetrics;
} }
schemaMetrics = new SchemaMetrics(tableName, cfName); schemaMetrics = new SchemaMetrics(tableName, cfName);
SchemaMetrics existingMetrics = cfToMetrics.putIfAbsent(instanceKey, SchemaMetrics existingMetrics =
schemaMetrics); tableAndFamilyToMetrics.putIfAbsent(instanceKey, schemaMetrics);
return existingMetrics != null ? existingMetrics : schemaMetrics; return existingMetrics != null ? existingMetrics : schemaMetrics;
} }
@ -720,7 +724,7 @@ public class SchemaMetrics {
public static Map<String, Long> getMetricsSnapshot() { public static Map<String, Long> getMetricsSnapshot() {
Map<String, Long> metricsSnapshot = new TreeMap<String, Long>(); Map<String, Long> metricsSnapshot = new TreeMap<String, Long>();
for (SchemaMetrics cfm : cfToMetrics.values()) { for (SchemaMetrics cfm : tableAndFamilyToMetrics.values()) {
for (String metricName : cfm.getAllMetricNames()) { for (String metricName : cfm.getAllMetricNames()) {
long metricValue; long metricValue;
if (isTimeVaryingKey(metricName)) { if (isTimeVaryingKey(metricName)) {
@ -781,7 +785,7 @@ public class SchemaMetrics {
final Set<String> allKeys = new TreeSet<String>(oldMetrics.keySet()); final Set<String> allKeys = new TreeSet<String>(oldMetrics.keySet());
allKeys.addAll(newMetrics.keySet()); allKeys.addAll(newMetrics.keySet());
for (SchemaMetrics cfm : cfToMetrics.values()) { for (SchemaMetrics cfm : tableAndFamilyToMetrics.values()) {
for (String metricName : cfm.getAllMetricNames()) { for (String metricName : cfm.getAllMetricNames()) {
if (metricName.startsWith(CF_PREFIX + CF_PREFIX)) { if (metricName.startsWith(CF_PREFIX + CF_PREFIX)) {
throw new AssertionError("Column family prefix used twice: " + throw new AssertionError("Column family prefix used twice: " +
@ -876,4 +880,16 @@ public class SchemaMetrics {
useTableNameGlobally = useTableNameNew; useTableNameGlobally = useTableNameNew;
} }
/** Formats the given map of metrics in a human-readable way. */
public static String formatMetrics(Map<String, Long> metrics) {
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, Long> entry : metrics.entrySet()) {
if (sb.length() > 0) {
sb.append('\n');
}
sb.append(entry.getKey() + " : " + entry.getValue());
}
return sb.toString();
}
} }

View File

@ -4487,26 +4487,27 @@ public class TestFromClientSide {
assertEquals(++expectedBlockCount, cache.getBlockCount()); assertEquals(++expectedBlockCount, cache.getBlockCount());
assertEquals(expectedBlockHits, cache.getStats().getHitCount()); assertEquals(expectedBlockHits, cache.getStats().getHitCount());
assertEquals(expectedBlockMiss, cache.getStats().getMissCount()); assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
// compact, net minus on block, two hits, no misses // compact, net minus two blocks, two hits, no misses
System.out.println("Compacting"); System.out.println("Compacting");
assertEquals(2, store.getNumberOfstorefiles()); assertEquals(2, store.getNumberOfstorefiles());
store.triggerMajorCompaction(); store.triggerMajorCompaction();
region.compactStores(); region.compactStores();
waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
assertEquals(1, store.getNumberOfstorefiles()); assertEquals(1, store.getNumberOfstorefiles());
assertEquals(--expectedBlockCount, cache.getBlockCount()); expectedBlockCount -= 2; // evicted two blocks, cached none
assertEquals(expectedBlockCount, cache.getBlockCount());
expectedBlockHits += 2; expectedBlockHits += 2;
assertEquals(expectedBlockMiss, cache.getStats().getMissCount()); assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
assertEquals(expectedBlockHits, cache.getStats().getHitCount()); assertEquals(expectedBlockHits, cache.getStats().getHitCount());
// read the row, same blocks, one hit no miss // read the row, this should be a cache miss because we don't cache data
// blocks on compaction
r = table.get(new Get(ROW)); r = table.get(new Get(ROW));
assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
expectedBlockCount += 1; // cached one data block
assertEquals(expectedBlockCount, cache.getBlockCount()); assertEquals(expectedBlockCount, cache.getBlockCount());
assertEquals(++expectedBlockHits, cache.getStats().getHitCount()); assertEquals(expectedBlockHits, cache.getStats().getHitCount());
assertEquals(expectedBlockMiss, cache.getStats().getMissCount()); assertEquals(++expectedBlockMiss, cache.getStats().getMissCount());
// no cache misses!
assertEquals(startBlockMiss, cache.getStats().getMissCount());
} }
private void waitForStoreFileCount(Store store, int count, int timeout) private void waitForStoreFileCount(Store store, int count, int timeout)

View File

@ -20,11 +20,16 @@
package org.apache.hadoop.hbase.io.hfile; package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.EnumMap; import java.util.EnumMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Random; import java.util.Random;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -32,9 +37,18 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -42,7 +56,6 @@ import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters; import org.junit.runners.Parameterized.Parameters;
import static org.junit.Assert.*;
/** /**
* Tests {@link HFile} cache-on-write functionality for the following block * Tests {@link HFile} cache-on-write functionality for the following block
@ -70,6 +83,7 @@ public class TestCacheOnWrite {
private static final int NUM_KV = 25000; private static final int NUM_KV = 25000;
private static final int INDEX_BLOCK_SIZE = 512; private static final int INDEX_BLOCK_SIZE = 512;
private static final int BLOOM_BLOCK_SIZE = 4096; private static final int BLOOM_BLOCK_SIZE = 4096;
private static final BloomType BLOOM_TYPE = StoreFile.BloomType.ROWCOL;
/** The number of valid key types possible in a store file */ /** The number of valid key types possible in a store file */
private static final int NUM_VALID_KEY_TYPES = private static final int NUM_VALID_KEY_TYPES =
@ -149,7 +163,7 @@ public class TestCacheOnWrite {
} }
@Test @Test
public void testCacheOnWrite() throws IOException { public void testStoreFileCacheOnWrite() throws IOException {
writeStoreFile(); writeStoreFile();
readStoreFile(); readStoreFile();
} }
@ -215,7 +229,7 @@ public class TestCacheOnWrite {
"test_cache_on_write"); "test_cache_on_write");
StoreFile.Writer sfw = StoreFile.createWriter(fs, storeFileParentDir, StoreFile.Writer sfw = StoreFile.createWriter(fs, storeFileParentDir,
DATA_BLOCK_SIZE, compress, KeyValue.COMPARATOR, conf, DATA_BLOCK_SIZE, compress, KeyValue.COMPARATOR, conf,
cacheConf, StoreFile.BloomType.ROWCOL, NUM_KV); cacheConf, BLOOM_TYPE, NUM_KV);
final int rowLen = 32; final int rowLen = 32;
for (int i = 0; i < NUM_KV; ++i) { for (int i = 0; i < NUM_KV; ++i) {
@ -236,6 +250,56 @@ public class TestCacheOnWrite {
storeFilePath = sfw.getPath(); storeFilePath = sfw.getPath();
} }
@Test
public void testNotCachingDataBlocksDuringCompaction() throws IOException {
// TODO: need to change this test if we add a cache size threshold for
// compactions, or if we implement some other kind of intelligent logic for
// deciding what blocks to cache-on-write on compaction.
final String table = "CompactionCacheOnWrite";
final String cf = "myCF";
final byte[] cfBytes = Bytes.toBytes(cf);
final int maxVersions = 3;
HRegion region = TEST_UTIL.createTestRegion(table, cf, compress,
BLOOM_TYPE, maxVersions, HColumnDescriptor.DEFAULT_BLOCKCACHE,
HFile.DEFAULT_BLOCKSIZE);
int rowIdx = 0;
long ts = EnvironmentEdgeManager.currentTimeMillis();
for (int iFile = 0; iFile < 5; ++iFile) {
for (int iRow = 0; iRow < 500; ++iRow) {
String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" +
iRow;
Put p = new Put(Bytes.toBytes(rowStr));
++rowIdx;
for (int iCol = 0; iCol < 10; ++iCol) {
String qualStr = "col" + iCol;
String valueStr = "value_" + rowStr + "_" + qualStr;
for (int iTS = 0; iTS < 5; ++iTS) {
p.add(cfBytes, Bytes.toBytes(qualStr), ts++,
Bytes.toBytes(valueStr));
}
}
region.put(p);
}
region.flushcache();
}
LruBlockCache blockCache =
(LruBlockCache) new CacheConfig(conf).getBlockCache();
blockCache.clearCache();
assertEquals(0, blockCache.getBlockTypeCountsForTest().size());
Map<String, Long> metricsBefore = SchemaMetrics.getMetricsSnapshot();
region.compactStores();
LOG.debug("compactStores() returned");
SchemaMetrics.validateMetricChanges(metricsBefore);
Map<String, Long> compactionMetrics = SchemaMetrics.diffMetrics(
metricsBefore, SchemaMetrics.getMetricsSnapshot());
LOG.debug(SchemaMetrics.formatMetrics(compactionMetrics));
Map<BlockType, Integer> blockTypesInCache =
blockCache.getBlockTypeCountsForTest();
LOG.debug("Block types in cache: " + blockTypesInCache);
assertNull(blockTypesInCache.get(BlockType.DATA));
region.close();
blockCache.shutdown();
}
@org.junit.Rule @org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =