diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 2558e1e3ebf..dfce7914bc3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -87,6 +87,13 @@ public class CacheConfig {
   public static final String CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY =
       "hbase.rs.cachecompactedblocksonwrite";
 
+  /**
+   * Configuration key to determine total size in bytes of compacted files beyond which we do not
+   * cache blocks on compaction
+   */
+  public static final String CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY =
+      "hbase.rs.cachecompactedblocksonwrite.threshold";
+
   public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
       "hbase.hfile.drop.behind.compaction";
 
@@ -101,6 +108,7 @@ public class CacheConfig {
   public static final boolean DEFAULT_PREFETCH_ON_OPEN = false;
   public static final boolean DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE = false;
   public static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
+  public static final long DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD = Long.MAX_VALUE;
 
   /**
    * Whether blocks should be cached on read (default is on if there is a
@@ -136,6 +144,11 @@ public class CacheConfig {
    */
   private final boolean cacheCompactedDataOnWrite;
 
+  /**
+   * Determine threshold beyond which we do not cache blocks on compaction
+   */
+  private long cacheCompactedDataOnWriteThreshold;
+
   private final boolean dropBehindCompaction;
 
   // Local reference to the block cache
@@ -188,6 +201,7 @@ public class CacheConfig {
         (family == null ? false : family.isPrefetchBlocksOnOpen());
     this.cacheCompactedDataOnWrite = conf.getBoolean(CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
       DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE);
+    this.cacheCompactedDataOnWriteThreshold = getCacheCompactedBlocksOnWriteThreshold(conf);
     this.blockCache = blockCache;
     this.byteBuffAllocator = byteBuffAllocator;
     LOG.info("Created cacheConfig: " + this + (family == null ? "" : " for family " + family) +
@@ -208,6 +222,7 @@ public class CacheConfig {
     this.cacheDataCompressed = cacheConf.cacheDataCompressed;
     this.prefetchOnOpen = cacheConf.prefetchOnOpen;
     this.cacheCompactedDataOnWrite = cacheConf.cacheCompactedDataOnWrite;
+    this.cacheCompactedDataOnWriteThreshold = cacheConf.cacheCompactedDataOnWriteThreshold;
     this.dropBehindCompaction = cacheConf.dropBehindCompaction;
     this.blockCache = cacheConf.blockCache;
     this.byteBuffAllocator = cacheConf.byteBuffAllocator;
@@ -275,7 +290,6 @@ public class CacheConfig {
     this.cacheDataOnWrite = cacheDataOnWrite;
   }
 
-
   /**
    * Enable cache on write including:
    * cacheDataOnWrite
@@ -288,7 +302,6 @@ public class CacheConfig {
     this.cacheBloomsOnWrite = true;
   }
 
-
   /**
    * @return true if index blocks should be written to the cache when an HFile
    * is written, false if not
@@ -356,6 +369,12 @@ public class CacheConfig {
     return this.cacheCompactedDataOnWrite;
   }
 
+  /**
+   * @return total file size in bytes threshold for caching while writing during compaction
+   */
+  public long getCacheCompactedBlocksOnWriteThreshold() {
+    return this.cacheCompactedDataOnWriteThreshold;
+  }
 
   /**
    * Return true if we may find this type of block in block cache.
    *
@@ -412,6 +431,21 @@ public class CacheConfig {
     return this.byteBuffAllocator;
   }
 
+  private long getCacheCompactedBlocksOnWriteThreshold(Configuration conf) {
+    long cacheCompactedBlocksOnWriteThreshold = conf
+      .getLong(CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
+        DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD);
+
+    if (cacheCompactedBlocksOnWriteThreshold < 0) {
+      LOG.warn(
+        "cacheCompactedBlocksOnWriteThreshold value : {} is less than 0, resetting it to: {}",
+        cacheCompactedBlocksOnWriteThreshold, DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD);
+      cacheCompactedBlocksOnWriteThreshold = DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD;
+    }
+
+    return cacheCompactedBlocksOnWriteThreshold;
+  }
+
   @Override
   public String toString() {
     return "cacheDataOnRead=" + shouldCacheDataOnRead() + ", cacheDataOnWrite="
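The threshold is read once, when the CacheConfig is constructed, and getCacheCompactedBlocksOnWriteThreshold resets any negative value to the Long.MAX_VALUE default (effectively "no limit"). A minimal sketch of how the new key might be set programmatically; the 2 GB figure is purely illustrative, not a recommended value, and the same keys can equally be set in hbase-site.xml:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;

    public class CacheCompactedBlocksThresholdExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Opt in to caching compacted blocks on write...
        conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, true);
        // ...but only while the combined size of the files selected for
        // compaction stays at or below 2 GB (illustrative value).
        conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
            2L * 1024 * 1024 * 1024);
      }
    }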
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index a0ad3e86166..167bb3ee743 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1110,6 +1110,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     return sf;
   }
 
+  public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
+    boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
+    boolean shouldDropBehind) throws IOException {
+    return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
+      includesTag, shouldDropBehind, -1);
+  }
+
   /**
    * @param compression Compression algorithm to use
    * @param isCompaction whether we are creating a new file in a compaction
@@ -1121,17 +1128,19 @@
   // compaction
   public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
       boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
-      boolean shouldDropBehind) throws IOException {
-    final CacheConfig writerCacheConf;
+      boolean shouldDropBehind, long totalCompactedFilesSize) throws IOException {
+    // creating new cache config for each new writer
+    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
     if (isCompaction) {
       // Don't cache data on write on compactions, unless specifically configured to do so
-      writerCacheConf = new CacheConfig(cacheConf);
+      // Cache only when total file size remains lower than configured threshold
       final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
       // if data blocks are to be cached on write
       // during compaction, we should forcefully
       // cache index and bloom blocks as well
-      if (cacheCompactedBlocksOnWrite) {
+      if (cacheCompactedBlocksOnWrite && totalCompactedFilesSize <= cacheConf
+          .getCacheCompactedBlocksOnWriteThreshold()) {
         writerCacheConf.enableCacheOnWrite();
         if (!cacheOnWriteLogged) {
           LOG.info("For Store {} , cacheCompactedBlocksOnWrite is true, hence enabled " +
@@ -1141,9 +1150,16 @@
         }
       } else {
         writerCacheConf.setCacheDataOnWrite(false);
+        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
+          // checking condition once again for logging
+          LOG.debug(
+            "For Store {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted " +
+              "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
+            getColumnFamilyName(), totalCompactedFilesSize,
+            cacheConf.getCacheCompactedBlocksOnWriteThreshold());
+        }
       }
     } else {
-      writerCacheConf = cacheConf;
       final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
       if (shouldCacheDataOnWrite) {
         writerCacheConf.enableCacheOnWrite();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 63bf1305d8e..10fac550180 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -130,6 +130,8 @@ public abstract class Compactor {
     public int maxTagsLength = 0;
     /** Min SeqId to keep during a major compaction **/
     public long minSeqIdToKeep = 0;
+    /** Total size of the compacted files **/
+    private long totalCompactedFilesSize = 0;
   }
 
   /**
@@ -166,6 +168,10 @@ public abstract class Compactor {
       fd.maxKeyCount += keyCount;
       // calculate the latest MVCC readpoint in any of the involved store files
       Map<byte[], byte[]> fileInfo = r.loadFileInfo();
+
+      // calculate the total size of the compacted files
+      fd.totalCompactedFilesSize += r.length();
+
       byte[] tmp = null;
       // Get and set the real MVCCReadpoint for bulk loaded files, which is the
       // SeqId number.
@@ -260,8 +266,9 @@
       throws IOException {
     // When all MVCC readpoints are 0, don't write them.
     // See HBASE-8166, HBASE-12600, and HBASE-13389.
-    return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
-      fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, shouldDropBehind);
+    return store
+      .createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint > 0,
+        fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize);
   }
 
   private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 622bc3971c3..5cb9ffd188d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -5424,8 +5424,9 @@ public class TestFromClientSide {
     System.out.println("Flushing cache");
     region.flush(true);
-    // + 1 for Index Block
-    assertEquals(++expectedBlockCount + 1, cache.getBlockCount());
+    // + 1 for Index Block, +1 for data block
+    expectedBlockCount += 2;
+    assertEquals(expectedBlockCount, cache.getBlockCount());
     assertEquals(expectedBlockHits, cache.getStats().getHitCount());
     assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
     // compact, net minus two blocks, two hits, no misses
@@ -5436,7 +5437,8 @@
     store.closeAndArchiveCompactedFiles();
     waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
     assertEquals(1, store.getStorefilesCount());
-    expectedBlockCount -= 2; // evicted two blocks, cached none
+    // evicted two data blocks and two index blocks and compaction does not cache new blocks
+    expectedBlockCount = 0;
     assertEquals(expectedBlockCount, cache.getBlockCount());
     expectedBlockHits += 2;
     assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
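Taken together, the HStore and Compactor changes reduce the new behaviour to a single predicate: data blocks are cached during a compaction only when the feature flag is on and the summed length of the input store files does not exceed the threshold. The new six-argument overload passes -1 (size unknown), which can never exceed a validated, non-negative threshold. A standalone restatement of that decision; the class and method names are illustrative, not HBase API:

    // Illustrative restatement of the decision made in HStore#createWriterInTmp.
    final class CacheOnCompactionDecision {

      static boolean shouldCacheDataBlocks(boolean cacheCompactedBlocksOnWrite,
          long totalCompactedFilesSize, long threshold) {
        // The legacy overload passes -1 ("size unknown"); a validated threshold
        // is always >= 0, so an unknown size never suppresses caching.
        return cacheCompactedBlocksOnWrite && totalCompactedFilesSize <= threshold;
      }

      public static void main(String[] args) {
        System.out.println(shouldCacheDataBlocks(false, 100L, 1000L));  // false: flag off
        System.out.println(shouldCacheDataBlocks(true, 100L, 1000L));   // true: within threshold
        System.out.println(shouldCacheDataBlocks(true, 2000L, 1000L));  // false: inputs too large
      }
    }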
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index 609ff9d3c09..119d26cec86 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -127,6 +127,10 @@ public class TestCacheOnWrite {
       BlockType.DATA
   );
 
+  // All test cases are supposed to generate files for compaction within this range
+  private static final long CACHE_COMPACTION_LOW_THRESHOLD = 10L;
+  private static final long CACHE_COMPACTION_HIGH_THRESHOLD = 1 * 1024 * 1024 * 1024L;
+
   /** The number of valid key types possible in a store file */
   private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
 
@@ -424,15 +428,31 @@
   }
 
   private void testCachingDataBlocksDuringCompactionInternals(boolean useTags,
-      boolean cacheBlocksOnCompaction) throws IOException, InterruptedException {
+      boolean cacheBlocksOnCompaction, long cacheBlocksOnCompactionThreshold)
+      throws IOException, InterruptedException {
     // create a localConf
-    boolean localValue = conf.getBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
-      false);
+    boolean localValue = conf.getBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, false);
+    long localCacheCompactedBlocksThreshold = conf
+      .getLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
+        CacheConfig.DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD);
+    boolean localCacheBloomBlocksValue = conf
+      .getBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
+        CacheConfig.DEFAULT_CACHE_BLOOMS_ON_WRITE);
+    boolean localCacheIndexBlocksValue = conf
+      .getBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
+        CacheConfig.DEFAULT_CACHE_INDEXES_ON_WRITE);
+
     try {
       // Set the conf if testing caching compacted blocks on write
       conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, cacheBlocksOnCompaction);
 
+      // set size threshold if testing compaction size threshold
+      if (cacheBlocksOnCompactionThreshold > 0) {
+        conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
+          cacheBlocksOnCompactionThreshold);
+      }
+
       // TODO: need to change this test if we add a cache size threshold for
       // compactions, or if we implement some other kind of intelligent logic for
       // deciding what blocks to cache-on-write on compaction.
@@ -467,7 +487,9 @@
             HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
           p.add(kv);
         } else {
-          p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
+          KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
+            ts++, Bytes.toBytes(valueStr));
+          p.add(kv);
         }
       }
     }
@@ -507,11 +529,33 @@
         "\ncacheBlocksOnCompaction: " + cacheBlocksOnCompaction + "\n";
 
-    assertEquals(assertErrorMessage, cacheOnCompactAndNonBucketCache, dataBlockCached);
+    if (cacheOnCompactAndNonBucketCache && cacheBlocksOnCompactionThreshold > 0) {
+      if (cacheBlocksOnCompactionThreshold == CACHE_COMPACTION_HIGH_THRESHOLD) {
+        assertTrue(assertErrorMessage, dataBlockCached);
+        assertTrue(assertErrorMessage, bloomBlockCached);
+        assertTrue(assertErrorMessage, indexBlockCached);
+      } else {
+        assertFalse(assertErrorMessage, dataBlockCached);
 
-    if (cacheOnCompactAndNonBucketCache) {
-      assertTrue(assertErrorMessage, bloomBlockCached);
-      assertTrue(assertErrorMessage, indexBlockCached);
+        if (localCacheBloomBlocksValue) {
+          assertTrue(assertErrorMessage, bloomBlockCached);
+        } else {
+          assertFalse(assertErrorMessage, bloomBlockCached);
+        }
+
+        if (localCacheIndexBlocksValue) {
+          assertTrue(assertErrorMessage, indexBlockCached);
+        } else {
+          assertFalse(assertErrorMessage, indexBlockCached);
+        }
+      }
+    } else {
+      assertEquals(assertErrorMessage, cacheOnCompactAndNonBucketCache, dataBlockCached);
+
+      if (cacheOnCompactAndNonBucketCache) {
+        assertTrue(assertErrorMessage, bloomBlockCached);
+        assertTrue(assertErrorMessage, indexBlockCached);
+      }
     }
 
@@ -519,6 +563,10 @@
     } finally {
      // reset back
       conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, localValue);
+      conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
+        localCacheCompactedBlocksThreshold);
+      conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, localCacheBloomBlocksValue);
+      conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, localCacheIndexBlocksValue);
     }
   }
 
@@ -530,8 +578,15 @@
   @Test
   public void testCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
-    testCachingDataBlocksDuringCompactionInternals(false, false);
-    testCachingDataBlocksDuringCompactionInternals(true, true);
+    testCachingDataBlocksDuringCompactionInternals(false, false, -1);
+    testCachingDataBlocksDuringCompactionInternals(true, true, -1);
+  }
+
+  @Test
+  public void testCachingDataBlocksThresholdDuringCompaction()
+      throws IOException, InterruptedException {
+    testCachingDataBlocksDuringCompactionInternals(false, true, CACHE_COMPACTION_HIGH_THRESHOLD);
+    testCachingDataBlocksDuringCompactionInternals(false, true, CACHE_COMPACTION_LOW_THRESHOLD);
   }
 }
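The two test constants bracket any store file this test can produce: 10 bytes is smaller than even an empty HFile, while 1 GB comfortably exceeds the test data, so the low threshold deterministically exercises the skip path and the high threshold the caching path. Note that the comparison in HStore is inclusive, so inputs totalling exactly the threshold still cache; a quick standalone check of that boundary, with arbitrary values:

    // Boundary behaviour of the inclusive comparison applied by HStore.
    public class ThresholdBoundary {
      public static void main(String[] args) {
        long threshold = 1024L;
        for (long total : new long[] { 1023L, 1024L, 1025L }) {
          // Same shape as: totalCompactedFilesSize <= getCacheCompactedBlocksOnWriteThreshold()
          System.out.println("total=" + total + " -> cache data blocks: " + (total <= threshold));
        }
      }
    }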
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
index 698dc816c1f..812ee4b416e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
@@ -108,6 +108,8 @@ public class TestDateTieredCompactor {
     when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
     when(store.createWriterInTmp(anyLong(), any(), anyBoolean(), anyBoolean(), anyBoolean(),
       anyBoolean())).thenAnswer(writers);
+    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
+      anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
     when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
     OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles);
     when(store.getMaxSequenceId()).thenReturn(maxSequenceId);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index aee3dc64361..3a64c8458f8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -785,6 +785,9 @@ public class TestStripeCompactionPolicy {
     when(
       store.createWriterInTmp(anyLong(), any(), anyBoolean(),
         anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
+    when(
+      store.createWriterInTmp(anyLong(), any(), anyBoolean(),
+        anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
     Configuration conf = HBaseConfiguration.create();
     conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
index 8b5df7262ac..02212743a78 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
@@ -208,6 +208,8 @@ public class TestStripeCompactor {
     when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
     when(store.createWriterInTmp(anyLong(), any(), anyBoolean(), anyBoolean(), anyBoolean(),
       anyBoolean())).thenAnswer(writers);
+    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
+      anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
     when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
     return new StripeCompactor(conf, store) {
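Mockito resolves stubbing per method signature, so each mocked HStore must stub the new seven-argument createWriterInTmp in addition to the old one; otherwise the compactor's call to the new overload would hit an unstubbed method and receive null. A condensed sketch of the wiring the three tests above repeat, where store and writers stand in for each test's own mock and Answer:

    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.ArgumentMatchers.anyBoolean;
    import static org.mockito.ArgumentMatchers.anyLong;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    HStore store = mock(HStore.class);
    // Legacy six-argument overload, still used by non-compaction call sites.
    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
    // New overload carrying totalCompactedFilesSize; without this stub the
    // mock would return null to the compactor.
    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
        anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);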