diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
index 59033f42e00..e71215c0be1 100644
--- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
@@ -289,7 +289,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
         }
         getSchemaMetrics().updateOnCacheMiss(blockCategory, isCompaction, delta);
 
-        // Cache the block
+        // Cache the block if necessary
         if (cacheBlock && cacheConf.shouldCacheBlockOnRead(
             hfileBlock.getBlockType().getCategory())) {
           cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock,
diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
index d44a9954067..d99d1585dcb 100644
--- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
@@ -242,10 +242,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
     HFile.writeOps.incrementAndGet();
 
     if (cacheConf.shouldCacheDataOnWrite()) {
-      HFileBlock blockForCaching = fsBlockWriter.getBlockForCaching();
-      passSchemaMetricsTo(blockForCaching);
-      cacheConf.getBlockCache().cacheBlock(
-          HFile.getBlockCacheKey(name, lastDataBlockOffset), blockForCaching);
+      doCacheOnWrite(lastDataBlockOffset);
     }
   }
 
@@ -263,16 +260,25 @@ public class HFileWriterV2 extends AbstractHFileWriter {
 
       totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
 
       if (cacheThisBlock) {
-        // Cache this block on write.
-        HFileBlock cBlock = fsBlockWriter.getBlockForCaching();
-        passSchemaMetricsTo(cBlock);
-        cacheConf.getBlockCache().cacheBlock(
-            HFile.getBlockCacheKey(name, offset), cBlock);
+        doCacheOnWrite(offset);
       }
     }
   }
 
+  /**
+   * Caches the last written HFile block.
+   * @param offset the offset of the block we want to cache. Used to determine
+   *          the cache key.
+   */
+  private void doCacheOnWrite(long offset) {
+    // Cache this block on write.
+    HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching();
+    passSchemaMetricsTo(cacheFormatBlock);
+    cacheConf.getBlockCache().cacheBlock(
+        HFile.getBlockCacheKey(name, offset), cacheFormatBlock);
+  }
+
   /**
    * Ready a new block for writing.
    *
diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index 8f13999f8e9..56f5683788f 100644
--- a/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -742,4 +743,14 @@ public class LruBlockCache implements BlockCache, HeapSize {
     return fileNames;
   }
 
+  Map<BlockType, Integer> getBlockTypeCountsForTest() {
+    Map<BlockType, Integer> counts =
+        new EnumMap<BlockType, Integer>(BlockType.class);
+    for (CachedBlock cb : map.values()) {
+      BlockType blockType = ((HFileBlock) cb.getBuffer()).getBlockType();
+      Integer count = counts.get(blockType);
+      counts.put(blockType, (count == null ? 0 : count) + 1);
+    }
+    return counts;
+  }
 }
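
The getBlockTypeCountsForTest() helper added above is an EnumMap-based histogram of the cached blocks, keyed by block type. Below is a minimal, self-contained sketch of the same counting idiom; SampleType and EnumHistogramSketch are hypothetical stand-ins used only so the snippet compiles on its own, not names from the patch.

    import java.util.EnumMap;
    import java.util.Map;

    public class EnumHistogramSketch {
      // Hypothetical stand-in for HBase's BlockType; only illustrates the idiom.
      enum SampleType { DATA, LEAF_INDEX, BLOOM_CHUNK }

      public static void main(String[] args) {
        SampleType[] observed =
            { SampleType.DATA, SampleType.DATA, SampleType.BLOOM_CHUNK };
        // EnumMap stores values in an array indexed by enum ordinal, so lookups
        // are cheap and iteration follows the enum declaration order.
        Map<SampleType, Integer> counts =
            new EnumMap<SampleType, Integer>(SampleType.class);
        for (SampleType type : observed) {
          Integer count = counts.get(type);
          counts.put(type, (count == null ? 0 : count) + 1);
        }
        System.out.println(counts); // prints {DATA=2, BLOOM_CHUNK=1}
      }
    }
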
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 636e533df4e..0e6240f5b55 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -747,19 +747,28 @@ public class Store extends SchemaConfigured implements HeapSize {
    */
   private StoreFile.Writer createWriterInTmp(int maxKeyCount)
       throws IOException {
-    return createWriterInTmp(maxKeyCount, this.compression);
+    return createWriterInTmp(maxKeyCount, this.compression, false);
   }
 
   /*
    * @param maxKeyCount
    * @param compression Compression algorithm to use
+   * @param isCompaction whether we are creating a new file in a compaction
    * @return Writer for a new StoreFile in the tmp dir.
    */
   private StoreFile.Writer createWriterInTmp(int maxKeyCount,
-      Compression.Algorithm compression)
+      Compression.Algorithm compression, boolean isCompaction)
       throws IOException {
+    final CacheConfig writerCacheConf;
+    if (isCompaction) {
+      // Don't cache data on write on compactions.
+      writerCacheConf = new CacheConfig(cacheConf);
+      writerCacheConf.setCacheDataOnWrite(false);
+    } else {
+      writerCacheConf = cacheConf;
+    }
     StoreFile.Writer w = StoreFile.createWriter(fs, region.getTmpDir(),
-        blocksize, compression, comparator, conf, cacheConf,
+        blocksize, compression, comparator, conf, writerCacheConf,
         family.getBloomFilterType(), maxKeyCount);
     // The store file writer's path does not include the CF name, so we need
     // to configure the HFile writer directly.
@@ -1428,8 +1437,8 @@ public class Store extends SchemaConfigured implements HeapSize {
       do {
         hasMore = scanner.next(kvs, this.compactionKVMax);
         if (writer == null && !kvs.isEmpty()) {
-          writer = createWriterInTmp(maxKeyCount,
-              this.compactionCompression);
+          writer = createWriterInTmp(maxKeyCount, this.compactionCompression,
+              true);
         }
         if (writer != null) {
           // output to writer:
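
The Store.java change above is the heart of the patch: a writer created for a compaction gets its own copy of the store's CacheConfig with cache-on-write turned off, presumably so a compaction does not churn the block cache with blocks it has just rewritten. Below is a sketch of that copy-and-override pattern in isolation; the CompactionCacheConfigSketch class and cacheConfForCompaction() helper are illustrative, and only the CacheConfig copy constructor and setCacheDataOnWrite(boolean) that the patch itself calls are assumed.

    import org.apache.hadoop.hbase.io.hfile.CacheConfig;

    public final class CompactionCacheConfigSketch {
      /**
       * Derives a compaction-specific cache config without mutating the
       * store-wide one shared by flushes and reads.
       */
      static CacheConfig cacheConfForCompaction(CacheConfig storeCacheConf) {
        CacheConfig compactionCacheConf = new CacheConfig(storeCacheConf);
        // Compactions rewrite every block of the store; caching those blocks
        // on write could evict hotter blocks from the LRU cache.
        compactionCacheConf.setCacheDataOnWrite(false);
        return compactionCacheConf;
      }
    }
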
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java b/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java
index 08c5cfb2d3e..6ff4dfed5f5 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java
@@ -219,9 +219,13 @@ public class SchemaMetrics {
       256 - MORE_CFS_OMITTED_STR.length();
 
   // Global variables
-  /** All instances of this class */
+  /**
+   * Maps a string key consisting of table name and column family name, with
+   * table name optionally replaced with {@link #TOTAL_KEY} if per-table
+   * metrics are disabled, to an instance of this class.
+   */
   private static final ConcurrentHashMap<String, SchemaMetrics>
-      cfToMetrics = new ConcurrentHashMap<String, SchemaMetrics>();
+      tableAndFamilyToMetrics = new ConcurrentHashMap<String, SchemaMetrics>();
 
   /** Metrics for all tables and column families. */
   // This has to be initialized after cfToMetrics.
@@ -317,14 +321,14 @@ public class SchemaMetrics {
     tableName = getEffectiveTableName(tableName);
 
     final String instanceKey = tableName + "\t" + cfName;
-    SchemaMetrics schemaMetrics = cfToMetrics.get(instanceKey);
+    SchemaMetrics schemaMetrics = tableAndFamilyToMetrics.get(instanceKey);
     if (schemaMetrics != null) {
       return schemaMetrics;
     }
 
     schemaMetrics = new SchemaMetrics(tableName, cfName);
-    SchemaMetrics existingMetrics = cfToMetrics.putIfAbsent(instanceKey,
-        schemaMetrics);
+    SchemaMetrics existingMetrics =
+        tableAndFamilyToMetrics.putIfAbsent(instanceKey, schemaMetrics);
     return existingMetrics != null ? existingMetrics : schemaMetrics;
   }
 
@@ -720,7 +724,7 @@ public class SchemaMetrics {
 
   public static Map<String, Long> getMetricsSnapshot() {
     Map<String, Long> metricsSnapshot = new TreeMap<String, Long>();
-    for (SchemaMetrics cfm : cfToMetrics.values()) {
+    for (SchemaMetrics cfm : tableAndFamilyToMetrics.values()) {
       for (String metricName : cfm.getAllMetricNames()) {
         long metricValue;
         if (isTimeVaryingKey(metricName)) {
@@ -781,7 +785,7 @@ public class SchemaMetrics {
     final Set<String> allKeys = new TreeSet<String>(oldMetrics.keySet());
     allKeys.addAll(newMetrics.keySet());
 
-    for (SchemaMetrics cfm : cfToMetrics.values()) {
+    for (SchemaMetrics cfm : tableAndFamilyToMetrics.values()) {
       for (String metricName : cfm.getAllMetricNames()) {
         if (metricName.startsWith(CF_PREFIX + CF_PREFIX)) {
           throw new AssertionError("Column family prefix used twice: " +
@@ -876,4 +880,16 @@ public class SchemaMetrics {
     useTableNameGlobally = useTableNameNew;
   }
 
+  /** Formats the given map of metrics in a human-readable way. */
+  public static String formatMetrics(Map<String, Long> metrics) {
+    StringBuilder sb = new StringBuilder();
+    for (Map.Entry<String, Long> entry : metrics.entrySet()) {
+      if (sb.length() > 0) {
+        sb.append('\n');
+      }
+      sb.append(entry.getKey() + " : " + entry.getValue());
+    }
+    return sb.toString();
+  }
+
 }
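
The new formatMetrics() helper is intended to be used together with the existing snapshot helpers that the TestCacheOnWrite change below calls: getMetricsSnapshot(), validateMetricChanges(), and diffMetrics(). Below is a sketch of that call pattern; the MetricsDeltaSketch class, the logMetricDelta() wrapper, and its Runnable parameter are illustrative, and only the static SchemaMetrics methods visible in this patch are assumed.

    import java.util.Map;

    import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;

    public class MetricsDeltaSketch {
      /** Runs an operation and prints how the per-CF metrics moved. */
      static void logMetricDelta(Runnable operation) {
        Map<String, Long> before = SchemaMetrics.getMetricsSnapshot();
        operation.run();
        // Sanity-check that the accumulated metric changes are consistent.
        SchemaMetrics.validateMetricChanges(before);
        Map<String, Long> delta =
            SchemaMetrics.diffMetrics(before, SchemaMetrics.getMetricsSnapshot());
        System.out.println(SchemaMetrics.formatMetrics(delta));
      }
    }
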
diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 95ab8e637ef..9ed67c7e3b3 100644
--- a/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -4487,26 +4487,27 @@ public class TestFromClientSide {
     assertEquals(++expectedBlockCount, cache.getBlockCount());
     assertEquals(expectedBlockHits, cache.getStats().getHitCount());
     assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
-    // compact, net minus on block, two hits, no misses
+    // compact, net minus two blocks, two hits, no misses
     System.out.println("Compacting");
     assertEquals(2, store.getNumberOfstorefiles());
     store.triggerMajorCompaction();
     region.compactStores();
     waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
     assertEquals(1, store.getNumberOfstorefiles());
-    assertEquals(--expectedBlockCount, cache.getBlockCount());
+    expectedBlockCount -= 2; // evicted two blocks, cached none
+    assertEquals(expectedBlockCount, cache.getBlockCount());
     expectedBlockHits += 2;
     assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
     assertEquals(expectedBlockHits, cache.getStats().getHitCount());
-    // read the row, same blocks, one hit no miss
+    // read the row, this should be a cache miss because we don't cache data
+    // blocks on compaction
     r = table.get(new Get(ROW));
     assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
     assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
+    expectedBlockCount += 1; // cached one data block
     assertEquals(expectedBlockCount, cache.getBlockCount());
-    assertEquals(++expectedBlockHits, cache.getStats().getHitCount());
-    assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
-    // no cache misses!
-    assertEquals(startBlockMiss, cache.getStats().getMissCount());
+    assertEquals(expectedBlockHits, cache.getStats().getHitCount());
+    assertEquals(++expectedBlockMiss, cache.getStats().getMissCount());
   }
 
   private void waitForStoreFileCount(Store store, int count, int timeout)
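
The TestFromClientSide changes above move the assertions to delta-based accounting: expected block, hit, and miss counts are adjusted relative to their previous values rather than recomputed from scratch. Below is a sketch of that style as a reusable check; CacheStatsDeltaSketch and assertHitMissDelta() are illustrative, and the snippet assumes the cache is HBase's LruBlockCache and uses only the getStats()/getHitCount()/getMissCount() accessors the test already relies on.

    import static org.junit.Assert.assertEquals;

    import org.apache.hadoop.hbase.io.hfile.LruBlockCache;

    public class CacheStatsDeltaSketch {
      /** Runs a read and asserts the exact hit/miss deltas it should cause. */
      static void assertHitMissDelta(LruBlockCache cache, Runnable read,
          long expectedHitDelta, long expectedMissDelta) {
        long hitsBefore = cache.getStats().getHitCount();
        long missesBefore = cache.getStats().getMissCount();
        read.run();
        assertEquals(hitsBefore + expectedHitDelta,
            cache.getStats().getHitCount());
        assertEquals(missesBefore + expectedMissDelta,
            cache.getStats().getMissCount());
      }
    }
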
diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index 5ee825bb85e..c9bcf188e14 100644
--- a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -20,11 +20,16 @@
 
 package org.apache.hadoop.hbase.io.hfile;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -32,9 +37,18 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,7 +56,6 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
-import static org.junit.Assert.*;
 
 /**
  * Tests {@link HFile} cache-on-write functionality for the following block
@@ -70,6 +83,7 @@ public class TestCacheOnWrite {
   private static final int NUM_KV = 25000;
   private static final int INDEX_BLOCK_SIZE = 512;
   private static final int BLOOM_BLOCK_SIZE = 4096;
+  private static final BloomType BLOOM_TYPE = StoreFile.BloomType.ROWCOL;
 
   /** The number of valid key types possible in a store file */
   private static final int NUM_VALID_KEY_TYPES =
@@ -149,7 +163,7 @@ public class TestCacheOnWrite {
   }
 
   @Test
-  public void testCacheOnWrite() throws IOException {
+  public void testStoreFileCacheOnWrite() throws IOException {
     writeStoreFile();
     readStoreFile();
   }
@@ -215,7 +229,7 @@ public class TestCacheOnWrite {
         "test_cache_on_write");
     StoreFile.Writer sfw = StoreFile.createWriter(fs, storeFileParentDir,
         DATA_BLOCK_SIZE, compress, KeyValue.COMPARATOR, conf,
-        cacheConf, StoreFile.BloomType.ROWCOL, NUM_KV);
+        cacheConf, BLOOM_TYPE, NUM_KV);
 
     final int rowLen = 32;
     for (int i = 0; i < NUM_KV; ++i) {
@@ -236,6 +250,56 @@ public class TestCacheOnWrite {
     storeFilePath = sfw.getPath();
   }
 
+  @Test
+  public void testNotCachingDataBlocksDuringCompaction() throws IOException {
+    // TODO: need to change this test if we add a cache size threshold for
+    // compactions, or if we implement some other kind of intelligent logic for
+    // deciding what blocks to cache-on-write on compaction.
+    final String table = "CompactionCacheOnWrite";
+    final String cf = "myCF";
+    final byte[] cfBytes = Bytes.toBytes(cf);
+    final int maxVersions = 3;
+    HRegion region = TEST_UTIL.createTestRegion(table, cf, compress,
+        BLOOM_TYPE, maxVersions, HColumnDescriptor.DEFAULT_BLOCKCACHE,
+        HFile.DEFAULT_BLOCKSIZE);
+    int rowIdx = 0;
+    long ts = EnvironmentEdgeManager.currentTimeMillis();
+    for (int iFile = 0; iFile < 5; ++iFile) {
+      for (int iRow = 0; iRow < 500; ++iRow) {
+        String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_"
+            + iRow;
+        Put p = new Put(Bytes.toBytes(rowStr));
+        ++rowIdx;
+        for (int iCol = 0; iCol < 10; ++iCol) {
+          String qualStr = "col" + iCol;
+          String valueStr = "value_" + rowStr + "_" + qualStr;
+          for (int iTS = 0; iTS < 5; ++iTS) {
+            p.add(cfBytes, Bytes.toBytes(qualStr), ts++,
+                Bytes.toBytes(valueStr));
+          }
+        }
+        region.put(p);
+      }
+      region.flushcache();
+    }
+    LruBlockCache blockCache =
+        (LruBlockCache) new CacheConfig(conf).getBlockCache();
+    blockCache.clearCache();
+    assertEquals(0, blockCache.getBlockTypeCountsForTest().size());
+    Map<String, Long> metricsBefore = SchemaMetrics.getMetricsSnapshot();
+    region.compactStores();
+    LOG.debug("compactStores() returned");
+    SchemaMetrics.validateMetricChanges(metricsBefore);
+    Map<String, Long> compactionMetrics = SchemaMetrics.diffMetrics(
+        metricsBefore, SchemaMetrics.getMetricsSnapshot());
+    LOG.debug(SchemaMetrics.formatMetrics(compactionMetrics));
+    Map<BlockType, Integer> blockTypesInCache =
+        blockCache.getBlockTypeCountsForTest();
+    LOG.debug("Block types in cache: " + blockTypesInCache);
+    assertNull(blockTypesInCache.get(BlockType.DATA));
+    region.close();
+    blockCache.shutdown();
+  }
 
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =