HBASE-23350 Make compaction files cacheonWrite configurable based on threshold
Signed-off-by: ramkrish86 <ramkrishna@apache.org>
This commit is contained in:
parent
76247aa21f
commit
bf924ccdaa
|
@ -87,6 +87,13 @@ public class CacheConfig {
|
|||
public static final String CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY =
|
||||
"hbase.rs.cachecompactedblocksonwrite";
|
||||
|
||||
/**
|
||||
* Configuration key to determine total size in bytes of compacted files beyond which we do not
|
||||
* cache blocks on compaction
|
||||
*/
|
||||
public static final String CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY =
|
||||
"hbase.rs.cachecompactedblocksonwrite.threshold";
|
||||
|
||||
public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
|
||||
"hbase.hfile.drop.behind.compaction";
|
||||
|
||||
|
@ -101,6 +108,7 @@ public class CacheConfig {
|
|||
public static final boolean DEFAULT_PREFETCH_ON_OPEN = false;
|
||||
public static final boolean DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE = false;
|
||||
public static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
|
||||
public static final long DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD = Long.MAX_VALUE;
|
||||
|
||||
/**
|
||||
* Whether blocks should be cached on read (default is on if there is a
|
||||
|
@ -136,6 +144,11 @@ public class CacheConfig {
|
|||
*/
|
||||
private final boolean cacheCompactedDataOnWrite;
|
||||
|
||||
/**
|
||||
* Total size in bytes of compacted files beyond which we do not cache blocks on compaction
|
||||
*/
|
||||
private long cacheCompactedDataOnWriteThreshold;
|
||||
|
||||
private final boolean dropBehindCompaction;
|
||||
|
||||
// Local reference to the block cache
|
||||
|
@ -188,6 +201,7 @@ public class CacheConfig {
|
|||
(family == null ? false : family.isPrefetchBlocksOnOpen());
|
||||
this.cacheCompactedDataOnWrite = conf.getBoolean(CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
|
||||
DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE);
|
||||
this.cacheCompactedDataOnWriteThreshold = getCacheCompactedBlocksOnWriteThreshold(conf);
|
||||
this.blockCache = blockCache;
|
||||
this.byteBuffAllocator = byteBuffAllocator;
|
||||
LOG.info("Created cacheConfig: " + this + (family == null ? "" : " for family " + family) +
|
||||
|
@ -208,6 +222,7 @@ public class CacheConfig {
|
|||
this.cacheDataCompressed = cacheConf.cacheDataCompressed;
|
||||
this.prefetchOnOpen = cacheConf.prefetchOnOpen;
|
||||
this.cacheCompactedDataOnWrite = cacheConf.cacheCompactedDataOnWrite;
|
||||
this.cacheCompactedDataOnWriteThreshold = cacheConf.cacheCompactedDataOnWriteThreshold;
|
||||
this.dropBehindCompaction = cacheConf.dropBehindCompaction;
|
||||
this.blockCache = cacheConf.blockCache;
|
||||
this.byteBuffAllocator = cacheConf.byteBuffAllocator;
|
||||
|
@ -275,7 +290,6 @@ public class CacheConfig {
|
|||
this.cacheDataOnWrite = cacheDataOnWrite;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Enable cache on write including:
|
||||
* cacheDataOnWrite
|
||||
|
@ -288,7 +302,6 @@ public class CacheConfig {
|
|||
this.cacheBloomsOnWrite = true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return true if index blocks should be written to the cache when an HFile
|
||||
* is written, false if not
|
||||
|
@ -356,6 +369,12 @@ public class CacheConfig {
|
|||
return this.cacheCompactedDataOnWrite;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return total file size in bytes threshold for caching while writing during compaction
|
||||
*/
|
||||
public long getCacheCompactedBlocksOnWriteThreshold() {
|
||||
return this.cacheCompactedDataOnWriteThreshold;
|
||||
}
|
||||
/**
|
||||
* Return true if we may find this type of block in block cache.
|
||||
* <p>
|
||||
|
@ -412,6 +431,21 @@ public class CacheConfig {
|
|||
return this.byteBuffAllocator;
|
||||
}
|
||||
|
||||
private long getCacheCompactedBlocksOnWriteThreshold(Configuration conf) {
|
||||
long cacheCompactedBlocksOnWriteThreshold = conf
|
||||
.getLong(CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
|
||||
DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD);
|
||||
|
||||
if (cacheCompactedBlocksOnWriteThreshold < 0) {
|
||||
LOG.warn(
|
||||
"cacheCompactedBlocksOnWriteThreshold value : {} is less than 0, resetting it to: {}",
|
||||
cacheCompactedBlocksOnWriteThreshold, DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD);
|
||||
cacheCompactedBlocksOnWriteThreshold = DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD;
|
||||
}
|
||||
|
||||
return cacheCompactedBlocksOnWriteThreshold;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "cacheDataOnRead=" + shouldCacheDataOnRead() + ", cacheDataOnWrite="
|
||||
|
|
|
@ -1110,6 +1110,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
return sf;
|
||||
}
|
||||
|
||||
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
|
||||
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
|
||||
boolean shouldDropBehind) throws IOException {
|
||||
return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
|
||||
includesTag, shouldDropBehind, -1);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param compression Compression algorithm to use
|
||||
* @param isCompaction whether we are creating a new file in a compaction
|
||||
|
@ -1121,17 +1128,19 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
// compaction
|
||||
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
|
||||
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
|
||||
boolean shouldDropBehind) throws IOException {
|
||||
final CacheConfig writerCacheConf;
|
||||
boolean shouldDropBehind, long totalCompactedFilesSize) throws IOException {
|
||||
// creating new cache config for each new writer
|
||||
final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
|
||||
if (isCompaction) {
|
||||
// Don't cache data on write on compactions, unless specifically configured to do so
|
||||
writerCacheConf = new CacheConfig(cacheConf);
|
||||
// Cache only when total file size remains lower than configured threshold
|
||||
final boolean cacheCompactedBlocksOnWrite =
|
||||
cacheConf.shouldCacheCompactedBlocksOnWrite();
|
||||
// if data blocks are to be cached on write
|
||||
// during compaction, we should forcefully
|
||||
// cache index and bloom blocks as well
|
||||
if (cacheCompactedBlocksOnWrite) {
|
||||
if (cacheCompactedBlocksOnWrite && totalCompactedFilesSize <= cacheConf
|
||||
.getCacheCompactedBlocksOnWriteThreshold()) {
|
||||
writerCacheConf.enableCacheOnWrite();
|
||||
if (!cacheOnWriteLogged) {
|
||||
LOG.info("For Store {} , cacheCompactedBlocksOnWrite is true, hence enabled " +
|
||||
|
@ -1141,9 +1150,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
}
|
||||
} else {
|
||||
writerCacheConf.setCacheDataOnWrite(false);
|
||||
if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
|
||||
// checking condition once again for logging
|
||||
LOG.debug(
|
||||
"For Store {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
|
||||
+ "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
|
||||
getColumnFamilyName(), totalCompactedFilesSize,
|
||||
cacheConf.getCacheCompactedBlocksOnWriteThreshold());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
writerCacheConf = cacheConf;
|
||||
final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
|
||||
if (shouldCacheDataOnWrite) {
|
||||
writerCacheConf.enableCacheOnWrite();
|
||||
|
|
|
@ -130,6 +130,8 @@ public abstract class Compactor<T extends CellSink> {
|
|||
public int maxTagsLength = 0;
|
||||
/** Min SeqId to keep during a major compaction **/
|
||||
public long minSeqIdToKeep = 0;
|
||||
/** Total size of the compacted files **/
|
||||
private long totalCompactedFilesSize = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -166,6 +168,10 @@ public abstract class Compactor<T extends CellSink> {
|
|||
fd.maxKeyCount += keyCount;
|
||||
// calculate the latest MVCC readpoint in any of the involved store files
|
||||
Map<byte[], byte[]> fileInfo = r.loadFileInfo();
|
||||
|
||||
// calculate the total size of the compacted files
|
||||
fd.totalCompactedFilesSize += r.length();
|
||||
|
||||
byte[] tmp = null;
|
||||
// Get and set the real MVCCReadpoint for bulk loaded files, which is the
|
||||
// SeqId number.
|
||||
|
@ -260,8 +266,9 @@ public abstract class Compactor<T extends CellSink> {
|
|||
throws IOException {
|
||||
// When all MVCC readpoints are 0, don't write them.
|
||||
// See HBASE-8166, HBASE-12600, and HBASE-13389.
|
||||
return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
|
||||
fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, shouldDropBehind);
|
||||
return store
|
||||
.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint > 0,
|
||||
fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize);
|
||||
}
|
||||
|
||||
private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
|
||||
|
|
|
@ -5424,8 +5424,9 @@ public class TestFromClientSide {
|
|||
System.out.println("Flushing cache");
|
||||
region.flush(true);
|
||||
|
||||
// + 1 for Index Block
|
||||
assertEquals(++expectedBlockCount + 1, cache.getBlockCount());
|
||||
// + 1 for Index Block, +1 for data block
|
||||
expectedBlockCount += 2;
|
||||
assertEquals(expectedBlockCount, cache.getBlockCount());
|
||||
assertEquals(expectedBlockHits, cache.getStats().getHitCount());
|
||||
assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
|
||||
// compact, net minus two blocks, two hits, no misses
|
||||
|
@ -5436,7 +5437,8 @@ public class TestFromClientSide {
|
|||
store.closeAndArchiveCompactedFiles();
|
||||
waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
|
||||
assertEquals(1, store.getStorefilesCount());
|
||||
expectedBlockCount -= 2; // evicted two blocks, cached none
|
||||
// evicted two data blocks and two index blocks and compaction does not cache new blocks
|
||||
expectedBlockCount = 0;
|
||||
assertEquals(expectedBlockCount, cache.getBlockCount());
|
||||
expectedBlockHits += 2;
|
||||
assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
|
||||
|
|
|
@ -127,6 +127,10 @@ public class TestCacheOnWrite {
|
|||
BlockType.DATA
|
||||
);
|
||||
|
||||
// All test cases are supposed to generate files for compaction within this range
|
||||
private static final long CACHE_COMPACTION_LOW_THRESHOLD = 10L;
|
||||
private static final long CACHE_COMPACTION_HIGH_THRESHOLD = 1 * 1024 * 1024 * 1024L;
|
||||
|
||||
/** The number of valid key types possible in a store file */
|
||||
private static final int NUM_VALID_KEY_TYPES =
|
||||
KeyValue.Type.values().length - 2;
|
||||
|
@ -424,15 +428,31 @@ public class TestCacheOnWrite {
|
|||
}
|
||||
|
||||
private void testCachingDataBlocksDuringCompactionInternals(boolean useTags,
|
||||
boolean cacheBlocksOnCompaction) throws IOException, InterruptedException {
|
||||
boolean cacheBlocksOnCompaction, long cacheBlocksOnCompactionThreshold)
|
||||
throws IOException, InterruptedException {
|
||||
// create a localConf
|
||||
boolean localValue = conf.getBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
|
||||
false);
|
||||
boolean localValue = conf.getBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, false);
|
||||
long localCacheCompactedBlocksThreshold = conf
|
||||
.getLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
|
||||
CacheConfig.DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD);
|
||||
boolean localCacheBloomBlocksValue = conf
|
||||
.getBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
|
||||
CacheConfig.DEFAULT_CACHE_BLOOMS_ON_WRITE);
|
||||
boolean localCacheIndexBlocksValue = conf
|
||||
.getBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
|
||||
CacheConfig.DEFAULT_CACHE_INDEXES_ON_WRITE);
|
||||
|
||||
try {
|
||||
// Set the conf if testing caching compacted blocks on write
|
||||
conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
|
||||
cacheBlocksOnCompaction);
|
||||
|
||||
// set size threshold if testing compaction size threshold
|
||||
if (cacheBlocksOnCompactionThreshold > 0) {
|
||||
conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
|
||||
cacheBlocksOnCompactionThreshold);
|
||||
}
|
||||
|
||||
// TODO: need to change this test if we add a cache size threshold for
|
||||
// compactions, or if we implement some other kind of intelligent logic for
|
||||
// deciding what blocks to cache-on-write on compaction.
|
||||
|
@ -467,7 +487,9 @@ public class TestCacheOnWrite {
|
|||
HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
|
||||
p.add(kv);
|
||||
} else {
|
||||
p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
|
||||
KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
|
||||
ts++, Bytes.toBytes(valueStr));
|
||||
p.add(kv);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -507,11 +529,33 @@ public class TestCacheOnWrite {
|
|||
"\ncacheBlocksOnCompaction: "
|
||||
+ cacheBlocksOnCompaction + "\n";
|
||||
|
||||
assertEquals(assertErrorMessage, cacheOnCompactAndNonBucketCache, dataBlockCached);
|
||||
if (cacheOnCompactAndNonBucketCache && cacheBlocksOnCompactionThreshold > 0) {
|
||||
if (cacheBlocksOnCompactionThreshold == CACHE_COMPACTION_HIGH_THRESHOLD) {
|
||||
assertTrue(assertErrorMessage, dataBlockCached);
|
||||
assertTrue(assertErrorMessage, bloomBlockCached);
|
||||
assertTrue(assertErrorMessage, indexBlockCached);
|
||||
} else {
|
||||
assertFalse(assertErrorMessage, dataBlockCached);
|
||||
|
||||
if (cacheOnCompactAndNonBucketCache) {
|
||||
assertTrue(assertErrorMessage, bloomBlockCached);
|
||||
assertTrue(assertErrorMessage, indexBlockCached);
|
||||
if (localCacheBloomBlocksValue) {
|
||||
assertTrue(assertErrorMessage, bloomBlockCached);
|
||||
} else {
|
||||
assertFalse(assertErrorMessage, bloomBlockCached);
|
||||
}
|
||||
|
||||
if (localCacheIndexBlocksValue) {
|
||||
assertTrue(assertErrorMessage, indexBlockCached);
|
||||
} else {
|
||||
assertFalse(assertErrorMessage, indexBlockCached);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
assertEquals(assertErrorMessage, cacheOnCompactAndNonBucketCache, dataBlockCached);
|
||||
|
||||
if (cacheOnCompactAndNonBucketCache) {
|
||||
assertTrue(assertErrorMessage, bloomBlockCached);
|
||||
assertTrue(assertErrorMessage, indexBlockCached);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -519,6 +563,10 @@ public class TestCacheOnWrite {
|
|||
} finally {
|
||||
// reset back
|
||||
conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, localValue);
|
||||
conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
|
||||
localCacheCompactedBlocksThreshold);
|
||||
conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, localCacheBloomBlocksValue);
|
||||
conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, localCacheIndexBlocksValue);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -530,8 +578,15 @@ public class TestCacheOnWrite {
|
|||
|
||||
@Test
|
||||
public void testCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
|
||||
testCachingDataBlocksDuringCompactionInternals(false, false);
|
||||
testCachingDataBlocksDuringCompactionInternals(true, true);
|
||||
testCachingDataBlocksDuringCompactionInternals(false, false, -1);
|
||||
testCachingDataBlocksDuringCompactionInternals(true, true, -1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCachingDataBlocksThresholdDuringCompaction()
|
||||
throws IOException, InterruptedException {
|
||||
testCachingDataBlocksDuringCompactionInternals(false, true, CACHE_COMPACTION_HIGH_THRESHOLD);
|
||||
testCachingDataBlocksDuringCompactionInternals(false, true, CACHE_COMPACTION_LOW_THRESHOLD);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -108,6 +108,8 @@ public class TestDateTieredCompactor {
|
|||
when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
|
||||
when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
|
||||
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
|
||||
when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
|
||||
anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
|
||||
when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
|
||||
OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles);
|
||||
when(store.getMaxSequenceId()).thenReturn(maxSequenceId);
|
||||
|
|
|
@ -785,6 +785,9 @@ public class TestStripeCompactionPolicy {
|
|||
when(
|
||||
store.createWriterInTmp(anyLong(), any(), anyBoolean(),
|
||||
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
|
||||
when(
|
||||
store.createWriterInTmp(anyLong(), any(), anyBoolean(),
|
||||
anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
|
||||
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
|
||||
|
|
|
@ -208,6 +208,8 @@ public class TestStripeCompactor {
|
|||
when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
|
||||
when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
|
||||
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
|
||||
when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
|
||||
anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
|
||||
when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
|
||||
|
||||
return new StripeCompactor(conf, store) {
|
||||
|
|
Loading…
Reference in New Issue