HBASE-23631 : Backport HBASE-23350 (Make compaction files cacheonWrite configurable based on threshold) to branch-1 (#1256)

Signed-off-by: ramkrish86 <ramkrishna@apache.org>

Backport of commit: 77229c79e3
This commit is contained in:
Viraj Jasani 2020-03-10 20:35:55 +05:30 committed by GitHub
parent 1931714583
commit 957bfcb065
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 173 additions and 59 deletions

View File

@ -132,6 +132,13 @@ public class CacheConfig {
public static final String CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY =
"hbase.rs.cachecompactedblocksonwrite";
/**
* Configuration key to determine total size in bytes of compacted files beyond which we do not
* cache blocks on compaction
*/
public static final String CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY =
"hbase.rs.cachecompactedblocksonwrite.threshold";
/**
* The target block size used by blockcache instances. Defaults to
* {@link HConstants#DEFAULT_BLOCKSIZE}.
@ -144,6 +151,8 @@ public class CacheConfig {
private static final String EXTERNAL_BLOCKCACHE_CLASS_KEY="hbase.blockcache.external.class";
private static final String DROP_BEHIND_CACHE_COMPACTION_KEY="hbase.hfile.drop.behind.compaction";
private static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
public static final long DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD = Long.MAX_VALUE;
/**
* @deprecated use {@link CacheConfig#BLOCKCACHE_BLOCKSIZE_KEY} instead.
@ -235,6 +244,11 @@ public class CacheConfig {
*/
private boolean cacheDataInL1;
/**
* Determine threshold beyond which we do not cache blocks on compaction
*/
private long cacheCompactedDataOnWriteThreshold;
private final boolean dropBehindCompaction;
/**
@ -266,6 +280,7 @@ public class CacheConfig {
conf.getBoolean(CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE)
);
this.cacheCompactedDataOnWriteThreshold = getCacheCompactedBlocksOnWriteThreshold(conf);
LOG.info("Created cacheConfig for " + family.getNameAsString() + ": " + this);
}
@ -293,6 +308,7 @@ public class CacheConfig {
conf.getBoolean(CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE)
);
this.cacheCompactedDataOnWriteThreshold = getCacheCompactedBlocksOnWriteThreshold(conf);
LOG.info("Created cacheConfig: " + this);
}
@ -344,6 +360,7 @@ public class CacheConfig {
cacheConf.cacheDataCompressed, cacheConf.prefetchOnOpen,
cacheConf.cacheDataInL1, cacheConf.dropBehindCompaction,
cacheConf.cacheCompactedDataOnWrite);
this.cacheCompactedDataOnWriteThreshold = cacheConf.cacheCompactedDataOnWriteThreshold;
}
/**
@ -438,7 +455,6 @@ public class CacheConfig {
this.cacheDataInL1 = cacheDataInL1;
}
/**
* Enable cache on write including:
* cacheDataOnWrite
@ -451,7 +467,6 @@ public class CacheConfig {
this.cacheBloomsOnWrite = true;
}
/**
* @return true if index blocks should be written to the cache when an HFile
* is written, false if not
@ -512,6 +527,12 @@ public class CacheConfig {
return isBlockCacheEnabled() && this.prefetchOnOpen;
}
/**
 * Returns the total compacted-file size (in bytes) above which data blocks are no longer
 * cached while writing the output of a compaction.
 *
 * @return the caching size threshold, in bytes, for compaction writes
 */
public long getCacheCompactedBlocksOnWriteThreshold() {
return cacheCompactedDataOnWriteThreshold;
}
/**
* Return true if we may find this type of block in block cache.
* <p/>
@ -554,6 +575,21 @@ public class CacheConfig {
return shouldCacheBlockOnRead(blockType.getCategory());
}
/**
 * Reads the compaction cache-on-write size threshold from the configuration.
 * Negative values are invalid; they are logged and replaced by the default
 * ({@code DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD}).
 *
 * @param conf the configuration to read the threshold from
 * @return the configured threshold in bytes, or the default if the value is negative
 */
private long getCacheCompactedBlocksOnWriteThreshold(Configuration conf) {
final long configuredThreshold = conf.getLong(CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
    DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD);
// Guard clause: a non-negative value is taken as-is.
if (configuredThreshold >= 0) {
  return configuredThreshold;
}
LOG.warn("cacheCompactedBlocksOnWriteThreshold value : "
    + configuredThreshold + " is less than 0, resetting it to: "
    + DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD);
return DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD;
}
@Override
public String toString() {
if (!isBlockCacheEnabled()) {

View File

@ -69,7 +69,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
/* includeMVCCReadpoint = */ true,
/* includesTags = */ snapshot.isTagsPresent(),
/* shouldDropBehind = */ false,
snapshot.getTimeRangeTracker());
snapshot.getTimeRangeTracker(), -1);
IOException e = null;
try {
performFlush(scanner, writer, smallestReadPoint, throughputController);

View File

@ -1079,7 +1079,7 @@ public class HStore implements Store {
boolean includesTag)
throws IOException {
return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
includesTag, false);
includesTag, false, -1);
}
/*
@ -1093,35 +1093,41 @@ public class HStore implements Store {
@Override
public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
boolean shouldDropBehind)
throws IOException {
boolean shouldDropBehind, long totalCompactedFilesSize) throws IOException {
return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
includesTag, shouldDropBehind, null);
includesTag, shouldDropBehind, null, totalCompactedFilesSize);
}
/*
* @param maxKeyCount
/**
*
* @param maxKeyCount max key count
* @param compression Compression algorithm to use
* @param isCompaction whether we are creating a new file in a compaction
* @param includesMVCCReadPoint - whether to include MVCC or not
* @param includesTag - includesTag or not
* @param includeMVCCReadpoint - whether to include MVCC or not
* @param includesTag whether to include tag while creating FileContext
* @param shouldDropBehind should the writer drop caches behind writes
* @param trt Ready-made timetracker to use.
* @param totalCompactedFilesSize total compacted file size
* @return Writer for a new StoreFile in the tmp dir.
* @throws IOException if something goes wrong with StoreFiles
*/
@Override
public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
boolean shouldDropBehind, final TimeRangeTracker trt)
throws IOException {
final CacheConfig writerCacheConf;
boolean shouldDropBehind, final TimeRangeTracker trt, long totalCompactedFilesSize)
throws IOException {
// creating new cache config for each new writer
final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
if (isCompaction) {
// Don't cache data on write on compactions, unless specifically configured to do so
writerCacheConf = new CacheConfig(cacheConf);
// Cache only when total file size remains lower than configured threshold
final boolean cacheCompactedBlocksOnWrite =
cacheConf.shouldCacheCompactedBlocksOnWrite();
// if data blocks are to be cached on write
// during compaction, we should forcefully
// cache index and bloom blocks as well
if (cacheCompactedBlocksOnWrite) {
if (cacheCompactedBlocksOnWrite && totalCompactedFilesSize <= cacheConf
.getCacheCompactedBlocksOnWriteThreshold()) {
writerCacheConf.enableCacheOnWrite();
if (!cacheOnWriteLogged) {
LOG.info("For Store " + getColumnFamilyName() +
@ -1131,9 +1137,16 @@ public class HStore implements Store {
}
} else {
writerCacheConf.setCacheDataOnWrite(false);
if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
// checking condition once again for logging
LOG.debug("For Store " + getColumnFamilyName()
+ ", setting cacheCompactedBlocksOnWrite as false as total size of compacted "
+ "files - " + totalCompactedFilesSize
+ ", is greater than cacheCompactedBlocksOnWriteThreshold - "
+ cacheConf.getCacheCompactedBlocksOnWriteThreshold());
}
}
} else {
writerCacheConf = cacheConf;
final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
if (shouldCacheDataOnWrite) {
writerCacheConf.enableCacheOnWrite();

View File

@ -209,24 +209,7 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
* @param isCompaction whether we are creating a new file in a compaction
* @param includeMVCCReadpoint whether we should out the MVCC readpoint
* @param shouldDropBehind should the writer drop caches behind writes
* @return Writer for a new StoreFile in the tmp dir.
*/
StoreFile.Writer createWriterInTmp(
long maxKeyCount,
Compression.Algorithm compression,
boolean isCompaction,
boolean includeMVCCReadpoint,
boolean includesTags,
boolean shouldDropBehind
) throws IOException;
/**
* @param maxKeyCount
* @param compression Compression algorithm to use
* @param isCompaction whether we are creating a new file in a compaction
* @param includeMVCCReadpoint whether we should out the MVCC readpoint
* @param shouldDropBehind should the writer drop caches behind writes
* @param trt Ready-made timetracker to use.
* @param totalCompactedFilesSize total compacted file size
* @return Writer for a new StoreFile in the tmp dir.
*/
StoreFile.Writer createWriterInTmp(
@ -236,7 +219,28 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
boolean includeMVCCReadpoint,
boolean includesTags,
boolean shouldDropBehind,
final TimeRangeTracker trt
long totalCompactedFilesSize
) throws IOException;
/**
* @param maxKeyCount
* @param compression Compression algorithm to use
* @param isCompaction whether we are creating a new file in a compaction
* @param includeMVCCReadpoint whether we should out the MVCC readpoint
* @param shouldDropBehind should the writer drop caches behind writes
* @param trt Ready-made timetracker to use.
* @param totalCompactedFilesSize total compacted file size
* @return Writer for a new StoreFile in the tmp dir.
*/
StoreFile.Writer createWriterInTmp(
long maxKeyCount,
Compression.Algorithm compression,
boolean isCompaction,
boolean includeMVCCReadpoint,
boolean includesTags,
boolean shouldDropBehind,
final TimeRangeTracker trt,
long totalCompactedFilesSize
) throws IOException;
// Compaction oriented methods

View File

@ -116,7 +116,7 @@ public class StripeStoreFlusher extends StoreFlusher {
/* includeMVCCReadpoint = */ true,
/* includesTags = */ true,
/* shouldDropBehind = */ false,
tracker);
tracker, -1);
return writer;
}
};

View File

@ -126,6 +126,8 @@ public abstract class Compactor<T extends CellSink> {
public int maxTagsLength = 0;
/** Min SeqId to keep during a major compaction **/
public long minSeqIdToKeep = 0;
/** Total size of the compacted files **/
private long totalCompactedFilesSize = 0;
}
/**
@ -162,6 +164,10 @@ public abstract class Compactor<T extends CellSink> {
fd.maxKeyCount += keyCount;
// calculate the latest MVCC readpoint in any of the involved store files
Map<byte[], byte[]> fileInfo = r.loadFileInfo();
// calculate the total size of the compacted files
fd.totalCompactedFilesSize += r.length();
byte[] tmp = null;
// Get and set the real MVCCReadpoint for bulk loaded files, which is the
// SeqId number.
@ -260,10 +266,8 @@ public abstract class Compactor<T extends CellSink> {
protected Writer createTmpWriter(FileDetails fd, boolean shouldDropBehind) throws IOException {
// When all MVCC readpoints are 0, don't write them.
// See HBASE-8166, HBASE-12600, and HBASE-13389.
return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
/* isCompaction = */true,
/* includeMVCCReadpoint = */fd.maxMVCCReadpoint > 0,
/* includesTags = */fd.maxTagsLength > 0, shouldDropBehind);
return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize);
}
protected List<Path> compact(final CompactionRequest request,

View File

@ -5316,8 +5316,9 @@ public class TestFromClientSide {
// flush, one new block
System.out.println("Flushing cache");
region.flush(true);
// + 1 for Index Block
assertEquals(++expectedBlockCount + 1, cache.getBlockCount());
// + 1 for Index Block, +1 for data block
expectedBlockCount += 2;
assertEquals(expectedBlockCount, cache.getBlockCount());
assertEquals(expectedBlockHits, cache.getStats().getHitCount());
assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
// compact, net minus two blocks, two hits, no misses
@ -5327,7 +5328,8 @@ public class TestFromClientSide {
region.compact(true);
waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
assertEquals(1, store.getStorefilesCount());
expectedBlockCount -= 2; // evicted two blocks, cached none
// evicted two data blocks and two index blocks and compaction does not cache new blocks
expectedBlockCount = 0;
assertEquals(expectedBlockCount, cache.getBlockCount());
expectedBlockHits += 2;
assertEquals(expectedBlockMiss, cache.getStats().getMissCount());

View File

@ -116,6 +116,10 @@ public class TestCacheOnWrite {
BlockType.DATA
);
// All test cases are supposed to generate files for compaction within this range
private static final long CACHE_COMPACTION_LOW_THRESHOLD = 10L;
private static final long CACHE_COMPACTION_HIGH_THRESHOLD = 1 * 1024 * 1024 * 1024L;
/** The number of valid key types possible in a store file */
private static final int NUM_VALID_KEY_TYPES =
KeyValue.Type.values().length - 2;
@ -401,16 +405,31 @@ public class TestCacheOnWrite {
}
private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags,
boolean cacheBlocksOnCompaction)
boolean cacheBlocksOnCompaction, long cacheBlocksOnCompactionThreshold)
throws IOException, InterruptedException {
// create a localConf
boolean localValue = conf.getBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
false);
boolean localValue = conf.getBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, false);
long localCacheCompactedBlocksThreshold = conf
.getLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
CacheConfig.DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD);
boolean localCacheBloomBlocksValue = conf
.getBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
CacheConfig.DEFAULT_CACHE_BLOOMS_ON_WRITE);
boolean localCacheIndexBlocksValue = conf
.getBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
CacheConfig.DEFAULT_CACHE_INDEXES_ON_WRITE);
try {
// Set the conf if testing caching compacted blocks on write
conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
cacheBlocksOnCompaction);
// set size threshold if testing compaction size threshold
if (cacheBlocksOnCompactionThreshold > 0) {
conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
cacheBlocksOnCompactionThreshold);
}
// TODO: need to change this test if we add a cache size threshold for
// compactions, or if we implement some other kind of intelligent logic for
// deciding what blocks to cache-on-write on compaction.
@ -444,7 +463,9 @@ public class TestCacheOnWrite {
HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
p.add(kv);
} else {
p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
ts++, Bytes.toBytes(valueStr));
p.add(kv);
}
}
}
@ -483,17 +504,43 @@ public class TestCacheOnWrite {
"\ncacheBlocksOnCompaction: "
+ cacheBlocksOnCompaction + "\n";
assertEquals(assertErrorMessage, cacheOnCompactAndNonBucketCache, dataBlockCached);
if (cacheOnCompactAndNonBucketCache && cacheBlocksOnCompactionThreshold > 0) {
if (cacheBlocksOnCompactionThreshold == CACHE_COMPACTION_HIGH_THRESHOLD) {
assertTrue(assertErrorMessage, dataBlockCached);
assertTrue(assertErrorMessage, bloomBlockCached);
assertTrue(assertErrorMessage, indexBlockCached);
} else {
assertFalse(assertErrorMessage, dataBlockCached);
if (cacheOnCompactAndNonBucketCache) {
assertTrue(assertErrorMessage, bloomBlockCached);
assertTrue(assertErrorMessage, indexBlockCached);
if (localCacheBloomBlocksValue) {
assertTrue(assertErrorMessage, bloomBlockCached);
} else {
assertFalse(assertErrorMessage, bloomBlockCached);
}
if (localCacheIndexBlocksValue) {
assertTrue(assertErrorMessage, indexBlockCached);
} else {
assertFalse(assertErrorMessage, indexBlockCached);
}
}
} else {
assertEquals(assertErrorMessage, cacheOnCompactAndNonBucketCache, dataBlockCached);
if (cacheOnCompactAndNonBucketCache) {
assertTrue(assertErrorMessage, bloomBlockCached);
assertTrue(assertErrorMessage, indexBlockCached);
}
}
((HRegion)region).close();
} finally {
// reset back
conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, localValue);
conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
localCacheCompactedBlocksThreshold);
conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, localCacheBloomBlocksValue);
conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, localCacheIndexBlocksValue);
}
}
@ -505,7 +552,15 @@ public class TestCacheOnWrite {
@Test
public void testNotCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
testNotCachingDataBlocksDuringCompactionInternals(false, false);
testNotCachingDataBlocksDuringCompactionInternals(true, true);
testNotCachingDataBlocksDuringCompactionInternals(false, false, -1);
testNotCachingDataBlocksDuringCompactionInternals(true, true, -1);
}
@Test
public void testCachingDataBlocksThresholdDuringCompaction()
    throws IOException, InterruptedException {
  // Exercise the size threshold in both directions, in the same order as before:
  // a high threshold that still permits caching, then a low one that suppresses it.
  final long[] thresholds = { CACHE_COMPACTION_HIGH_THRESHOLD, CACHE_COMPACTION_LOW_THRESHOLD };
  for (long threshold : thresholds) {
    testNotCachingDataBlocksDuringCompactionInternals(false, true, threshold);
  }
}
}

View File

@ -1455,7 +1455,7 @@ public class TestStore {
public void testHFileContextSetWithCFAndTable() throws Exception {
init(this.name.getMethodName());
StoreFile.Writer writer = store.createWriterInTmp(10000L,
Compression.Algorithm.NONE, false, true, false, true);
Compression.Algorithm.NONE, false, true, false, true, -1);
HFileContext hFileContext = writer.getHFileWriter().getFileContext();
assertArrayEquals(family, hFileContext.getColumnFamily());
assertArrayEquals(table, hFileContext.getTableName());
@ -1583,4 +1583,4 @@ public class TestStore {
@Override
public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
}
}
}

View File

@ -101,7 +101,7 @@ public class TestDateTieredCompactor {
when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
when(store.getComparator()).thenReturn(new KVComparator());
long maxSequenceId = StoreFile.getMaxSequenceIdInList(storefiles);
when(store.getMaxSequenceId()).thenReturn(maxSequenceId);

View File

@ -766,7 +766,7 @@ public class TestStripeCompactionPolicy {
when(store.getRegionInfo()).thenReturn(info);
when(
store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
Configuration conf = HBaseConfiguration.create();
conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);

View File

@ -202,7 +202,7 @@ public class TestStripeCompactor {
when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
when(store.getComparator()).thenReturn(new KVComparator());
return new StripeCompactor(conf, store) {