HBASE-23066 Allow cache on write during compactions when prefetching … (#935)
* HBASE-23066 Allow cache on write during compactions when prefetching is enabled * Fix checkstyle issues - recommit
This commit is contained in:
parent
ec317a6629
commit
eee9480cb4
|
@ -81,6 +81,12 @@ public class CacheConfig {
|
||||||
*/
|
*/
|
||||||
public static final String PREFETCH_BLOCKS_ON_OPEN_KEY = "hbase.rs.prefetchblocksonopen";
|
public static final String PREFETCH_BLOCKS_ON_OPEN_KEY = "hbase.rs.prefetchblocksonopen";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration key to cache blocks when a compacted file is written
|
||||||
|
*/
|
||||||
|
public static final String CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY =
|
||||||
|
"hbase.rs.cachecompactedblocksonwrite";
|
||||||
|
|
||||||
public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
|
public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
|
||||||
"hbase.hfile.drop.behind.compaction";
|
"hbase.hfile.drop.behind.compaction";
|
||||||
|
|
||||||
|
@ -93,6 +99,7 @@ public class CacheConfig {
|
||||||
public static final boolean DEFAULT_EVICT_ON_CLOSE = false;
|
public static final boolean DEFAULT_EVICT_ON_CLOSE = false;
|
||||||
public static final boolean DEFAULT_CACHE_DATA_COMPRESSED = false;
|
public static final boolean DEFAULT_CACHE_DATA_COMPRESSED = false;
|
||||||
public static final boolean DEFAULT_PREFETCH_ON_OPEN = false;
|
public static final boolean DEFAULT_PREFETCH_ON_OPEN = false;
|
||||||
|
public static final boolean DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE = false;
|
||||||
public static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
|
public static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -124,6 +131,11 @@ public class CacheConfig {
|
||||||
/** Whether data blocks should be prefetched into the cache */
|
/** Whether data blocks should be prefetched into the cache */
|
||||||
private final boolean prefetchOnOpen;
|
private final boolean prefetchOnOpen;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether data blocks should be cached when compacted file is written
|
||||||
|
*/
|
||||||
|
private final boolean cacheCompactedDataOnWrite;
|
||||||
|
|
||||||
private final boolean dropBehindCompaction;
|
private final boolean dropBehindCompaction;
|
||||||
|
|
||||||
// Local reference to the block cache
|
// Local reference to the block cache
|
||||||
|
@ -174,6 +186,8 @@ public class CacheConfig {
|
||||||
(family == null ? false : family.isEvictBlocksOnClose());
|
(family == null ? false : family.isEvictBlocksOnClose());
|
||||||
this.prefetchOnOpen = conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN) ||
|
this.prefetchOnOpen = conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN) ||
|
||||||
(family == null ? false : family.isPrefetchBlocksOnOpen());
|
(family == null ? false : family.isPrefetchBlocksOnOpen());
|
||||||
|
this.cacheCompactedDataOnWrite = conf.getBoolean(CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
|
||||||
|
DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE);
|
||||||
this.blockCache = blockCache;
|
this.blockCache = blockCache;
|
||||||
this.byteBuffAllocator = byteBuffAllocator;
|
this.byteBuffAllocator = byteBuffAllocator;
|
||||||
LOG.info("Created cacheConfig: " + this + (family == null ? "" : " for family " + family) +
|
LOG.info("Created cacheConfig: " + this + (family == null ? "" : " for family " + family) +
|
||||||
|
@ -193,6 +207,7 @@ public class CacheConfig {
|
||||||
this.evictOnClose = cacheConf.evictOnClose;
|
this.evictOnClose = cacheConf.evictOnClose;
|
||||||
this.cacheDataCompressed = cacheConf.cacheDataCompressed;
|
this.cacheDataCompressed = cacheConf.cacheDataCompressed;
|
||||||
this.prefetchOnOpen = cacheConf.prefetchOnOpen;
|
this.prefetchOnOpen = cacheConf.prefetchOnOpen;
|
||||||
|
this.cacheCompactedDataOnWrite = cacheConf.cacheCompactedDataOnWrite;
|
||||||
this.dropBehindCompaction = cacheConf.dropBehindCompaction;
|
this.dropBehindCompaction = cacheConf.dropBehindCompaction;
|
||||||
this.blockCache = cacheConf.blockCache;
|
this.blockCache = cacheConf.blockCache;
|
||||||
this.byteBuffAllocator = cacheConf.byteBuffAllocator;
|
this.byteBuffAllocator = cacheConf.byteBuffAllocator;
|
||||||
|
@ -207,6 +222,7 @@ public class CacheConfig {
|
||||||
this.evictOnClose = false;
|
this.evictOnClose = false;
|
||||||
this.cacheDataCompressed = false;
|
this.cacheDataCompressed = false;
|
||||||
this.prefetchOnOpen = false;
|
this.prefetchOnOpen = false;
|
||||||
|
this.cacheCompactedDataOnWrite = false;
|
||||||
this.dropBehindCompaction = false;
|
this.dropBehindCompaction = false;
|
||||||
this.blockCache = null;
|
this.blockCache = null;
|
||||||
this.byteBuffAllocator = ByteBuffAllocator.HEAP;
|
this.byteBuffAllocator = ByteBuffAllocator.HEAP;
|
||||||
|
@ -319,6 +335,13 @@ public class CacheConfig {
|
||||||
return this.prefetchOnOpen;
|
return this.prefetchOnOpen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if blocks should be cached while writing during compaction, false if not
|
||||||
|
*/
|
||||||
|
public boolean shouldCacheCompactedBlocksOnWrite() {
|
||||||
|
return this.cacheCompactedDataOnWrite;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return true if we may find this type of block in block cache.
|
* Return true if we may find this type of block in block cache.
|
||||||
* <p>
|
* <p>
|
||||||
|
|
|
@ -1118,9 +1118,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
boolean shouldDropBehind) throws IOException {
|
boolean shouldDropBehind) throws IOException {
|
||||||
final CacheConfig writerCacheConf;
|
final CacheConfig writerCacheConf;
|
||||||
if (isCompaction) {
|
if (isCompaction) {
|
||||||
// Don't cache data on write on compactions.
|
// Don't cache data on write on compactions, unless specifically configured to do so
|
||||||
writerCacheConf = new CacheConfig(cacheConf);
|
writerCacheConf = new CacheConfig(cacheConf);
|
||||||
writerCacheConf.setCacheDataOnWrite(false);
|
writerCacheConf.setCacheDataOnWrite(cacheConf.shouldCacheCompactedBlocksOnWrite());
|
||||||
} else {
|
} else {
|
||||||
writerCacheConf = cacheConf;
|
writerCacheConf = cacheConf;
|
||||||
}
|
}
|
||||||
|
|
|
@ -405,59 +405,84 @@ public class TestCacheOnWrite {
|
||||||
storeFilePath = sfw.getPath();
|
storeFilePath = sfw.getPath();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
|
private void testCachingDataBlocksDuringCompactionInternals(boolean useTags,
|
||||||
throws IOException, InterruptedException {
|
boolean cacheBlocksOnCompaction) throws IOException, InterruptedException {
|
||||||
// TODO: need to change this test if we add a cache size threshold for
|
// create a localConf
|
||||||
// compactions, or if we implement some other kind of intelligent logic for
|
boolean localValue = conf.getBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
|
||||||
// deciding what blocks to cache-on-write on compaction.
|
false);
|
||||||
final String table = "CompactionCacheOnWrite";
|
try {
|
||||||
final String cf = "myCF";
|
// Set the conf if testing caching compacted blocks on write
|
||||||
final byte[] cfBytes = Bytes.toBytes(cf);
|
conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
|
||||||
final int maxVersions = 3;
|
cacheBlocksOnCompaction);
|
||||||
ColumnFamilyDescriptor cfd =
|
|
||||||
ColumnFamilyDescriptorBuilder.newBuilder(cfBytes).setCompressionType(compress)
|
// TODO: need to change this test if we add a cache size threshold for
|
||||||
.setBloomFilterType(BLOOM_TYPE).setMaxVersions(maxVersions)
|
// compactions, or if we implement some other kind of intelligent logic for
|
||||||
.setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()).build();
|
// deciding what blocks to cache-on-write on compaction.
|
||||||
HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache);
|
final String table = "CompactionCacheOnWrite";
|
||||||
int rowIdx = 0;
|
final String cf = "myCF";
|
||||||
long ts = EnvironmentEdgeManager.currentTime();
|
final byte[] cfBytes = Bytes.toBytes(cf);
|
||||||
for (int iFile = 0; iFile < 5; ++iFile) {
|
final int maxVersions = 3;
|
||||||
for (int iRow = 0; iRow < 500; ++iRow) {
|
ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(cfBytes)
|
||||||
String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" +
|
.setCompressionType(compress).setBloomFilterType(BLOOM_TYPE).setMaxVersions(maxVersions)
|
||||||
iRow;
|
.setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()).build();
|
||||||
Put p = new Put(Bytes.toBytes(rowStr));
|
HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache);
|
||||||
++rowIdx;
|
int rowIdx = 0;
|
||||||
for (int iCol = 0; iCol < 10; ++iCol) {
|
long ts = EnvironmentEdgeManager.currentTime();
|
||||||
String qualStr = "col" + iCol;
|
for (int iFile = 0; iFile < 5; ++iFile) {
|
||||||
String valueStr = "value_" + rowStr + "_" + qualStr;
|
for (int iRow = 0; iRow < 500; ++iRow) {
|
||||||
for (int iTS = 0; iTS < 5; ++iTS) {
|
String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" + iRow;
|
||||||
if (useTags) {
|
Put p = new Put(Bytes.toBytes(rowStr));
|
||||||
Tag t = new ArrayBackedTag((byte) 1, "visibility");
|
++rowIdx;
|
||||||
Tag[] tags = new Tag[1];
|
for (int iCol = 0; iCol < 10; ++iCol) {
|
||||||
tags[0] = t;
|
String qualStr = "col" + iCol;
|
||||||
KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
|
String valueStr = "value_" + rowStr + "_" + qualStr;
|
||||||
HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
|
for (int iTS = 0; iTS < 5; ++iTS) {
|
||||||
p.add(kv);
|
if (useTags) {
|
||||||
} else {
|
Tag t = new ArrayBackedTag((byte) 1, "visibility");
|
||||||
p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
|
Tag[] tags = new Tag[1];
|
||||||
|
tags[0] = t;
|
||||||
|
KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
|
||||||
|
HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
|
||||||
|
p.add(kv);
|
||||||
|
} else {
|
||||||
|
p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
p.setDurability(Durability.ASYNC_WAL);
|
||||||
|
region.put(p);
|
||||||
}
|
}
|
||||||
p.setDurability(Durability.ASYNC_WAL);
|
region.flush(true);
|
||||||
region.put(p);
|
|
||||||
}
|
}
|
||||||
region.flush(true);
|
|
||||||
}
|
|
||||||
clearBlockCache(blockCache);
|
|
||||||
assertEquals(0, blockCache.getBlockCount());
|
|
||||||
region.compact(false);
|
|
||||||
LOG.debug("compactStores() returned");
|
|
||||||
|
|
||||||
for (CachedBlock block: blockCache) {
|
clearBlockCache(blockCache);
|
||||||
assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
|
assertEquals(0, blockCache.getBlockCount());
|
||||||
assertNotEquals(BlockType.DATA, block.getBlockType());
|
|
||||||
|
region.compact(false);
|
||||||
|
LOG.debug("compactStores() returned");
|
||||||
|
|
||||||
|
boolean dataBlockCached = false;
|
||||||
|
for (CachedBlock block : blockCache) {
|
||||||
|
if (BlockType.ENCODED_DATA.equals(block.getBlockType())
|
||||||
|
|| BlockType.DATA.equals(block.getBlockType())) {
|
||||||
|
dataBlockCached = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Data blocks should be cached in instances where we are caching blocks on write. In the case
|
||||||
|
// of testing
|
||||||
|
// BucketCache, we cannot verify block type as it is not stored in the cache.
|
||||||
|
assertTrue(
|
||||||
|
"\nTest description: " + testDescription + "\ncacheBlocksOnCompaction: "
|
||||||
|
+ cacheBlocksOnCompaction + "\n",
|
||||||
|
(cacheBlocksOnCompaction && !(blockCache instanceof BucketCache)) == dataBlockCached);
|
||||||
|
|
||||||
|
region.close();
|
||||||
|
} finally {
|
||||||
|
// reset back
|
||||||
|
conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, localValue);
|
||||||
}
|
}
|
||||||
region.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -467,8 +492,8 @@ public class TestCacheOnWrite {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNotCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
|
public void testCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
|
||||||
testNotCachingDataBlocksDuringCompactionInternals(false);
|
testCachingDataBlocksDuringCompactionInternals(false, false);
|
||||||
testNotCachingDataBlocksDuringCompactionInternals(true);
|
testCachingDataBlocksDuringCompactionInternals(true, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue