HBASE-26274 Create an option to reintroduce BlockCache to mapreduce job (#3684)

Introduce `hfile.onheap.block.cache.fixed.size`
and default to disable. when using ClientSideRegionScanner
it will be enabled with a fixed size for caching 
INDEX/LEAF_INDEX block when a client, e.g. 
snapshot scanner, scans the entire HFile
and does not need to seek/reseek to index
block multiple times.
This commit is contained in:
Tak Lon (Stephen) Wu 2021-09-22 09:17:18 -07:00 committed by GitHub
parent 96fa015043
commit 6556a5ee91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 238 additions and 8 deletions

View File

@ -1049,6 +1049,17 @@ public final class HConstants {
public static final float HFILE_BLOCK_CACHE_SIZE_DEFAULT = 0.4f; public static final float HFILE_BLOCK_CACHE_SIZE_DEFAULT = 0.4f;
/**
* Configuration key for setting the fix size of the block size, default do nothing and it should
* be explicitly set by user or only used within ClientSideRegionScanner. if it's set less than
* current max on heap size, it overrides the max size of block cache
*/
public static final String HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY =
"hfile.onheap.block.cache.fixed.size";
public static final long HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT = 0L;
public static final long HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT =
32 * 1024 * 1024L;
/* /*
* Minimum percentage of free heap necessary for a successful cluster startup. * Minimum percentage of free heap necessary for a successful cluster startup.
*/ */

View File

@ -25,8 +25,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.mob.MobFileCache; import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner;
@ -60,6 +62,15 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
region = HRegion.newHRegion(CommonFSUtils.getTableDir(rootDir, htd.getTableName()), null, fs, region = HRegion.newHRegion(CommonFSUtils.getTableDir(rootDir, htd.getTableName()), null, fs,
conf, hri, htd, null); conf, hri, htd, null);
region.setRestoredRegion(true); region.setRestoredRegion(true);
// non RS process does not have a block cache, and this a client side scanner,
// create one for MapReduce jobs to cache the INDEX block by setting to use
// IndexOnlyLruBlockCache and set a value to HBASE_CLIENT_SCANNER_BLOCK_CACHE_SIZE_KEY
conf.set(BlockCacheFactory.BLOCKCACHE_POLICY_KEY, "IndexOnlyLRU");
conf.setIfUnset(HConstants.HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY,
String.valueOf(HConstants.HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT));
// don't allow L2 bucket cache for non RS process to avoid unexpected disk usage.
conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
region.setBlockCache(BlockCacheFactory.createBlockCache(conf));
// we won't initialize the MobFileCache when not running in RS process. so provided an // we won't initialize the MobFileCache when not running in RS process. so provided an
// initialized cache. Consider the case: an CF was set from an mob to non-mob. if we only // initialized cache. Consider the case: an CF was set from an mob to non-mob. if we only
// initialize cache for MOB region, NPE from HMobStore will still happen. So Initialize the // initialize cache for MOB region, NPE from HMobStore will still happen. So Initialize the
@ -122,6 +133,10 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
} }
} }
HRegion getRegion() {
return region;
}
@Override @Override
public boolean renewLease() { public boolean renewLease() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();

View File

@ -147,4 +147,13 @@ public interface BlockCache extends Iterable<CachedBlock> {
* @return The list of sub blockcaches that make up this one; returns null if no sub caches. * @return The list of sub blockcaches that make up this one; returns null if no sub caches.
*/ */
BlockCache [] getBlockCaches(); BlockCache [] getBlockCaches();
/**
* Check if block type is meta or index block
* @param blockType block type of a given HFile block
* @return true if block type is non-data block
*/
default boolean isMetaBlock(BlockType blockType) {
return blockType != null && blockType.getCategory() != BlockType.BlockCategory.DATA;
}
} }

View File

@ -43,7 +43,7 @@ public final class BlockCacheFactory {
*/ */
/** /**
* Configuration key to cache block policy (Lru, TinyLfu). * Configuration key to cache block policy (Lru, TinyLfu, AdaptiveLRU, IndexOnlyLRU).
*/ */
public static final String BLOCKCACHE_POLICY_KEY = "hfile.block.cache.policy"; public static final String BLOCKCACHE_POLICY_KEY = "hfile.block.cache.policy";
public static final String BLOCKCACHE_POLICY_DEFAULT = "LRU"; public static final String BLOCKCACHE_POLICY_DEFAULT = "LRU";
@ -129,6 +129,8 @@ public final class BlockCacheFactory {
StringUtils.byteDesc(cacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize)); StringUtils.byteDesc(cacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize));
if (policy.equalsIgnoreCase("LRU")) { if (policy.equalsIgnoreCase("LRU")) {
return new LruBlockCache(cacheSize, blockSize, true, c); return new LruBlockCache(cacheSize, blockSize, true, c);
} else if (policy.equalsIgnoreCase("IndexOnlyLRU")) {
return new IndexOnlyLruBlockCache(cacheSize, blockSize, true, c);
} else if (policy.equalsIgnoreCase("TinyLFU")) { } else if (policy.equalsIgnoreCase("TinyLFU")) {
return new TinyLfuBlockCache(cacheSize, blockSize, ForkJoinPool.commonPool(), c); return new TinyLfuBlockCache(cacheSize, blockSize, ForkJoinPool.commonPool(), c);
} else if (policy.equalsIgnoreCase("AdaptiveLRU")) { } else if (policy.equalsIgnoreCase("AdaptiveLRU")) {

View File

@ -22,7 +22,6 @@ import java.util.Iterator;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
/** /**
@ -71,10 +70,6 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
cacheBlock(cacheKey, buf, false); cacheBlock(cacheKey, buf, false);
} }
private boolean isMetaBlock(BlockType blockType) {
return blockType.getCategory() != BlockCategory.DATA;
}
@Override @Override
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
boolean repeat, boolean updateCacheMetrics) { boolean repeat, boolean updateCacheMetrics) {

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
/**
* An on heap block cache implementation extended LruBlockCache and only cache index block.
* This block cache should be only used by
* {@link org.apache.hadoop.hbase.client.ClientSideRegionScanner} that normally considers to be
* used by client resides out of the region server, e.g. a container of a map reduce job.
**/
@InterfaceAudience.Private
public class IndexOnlyLruBlockCache extends LruBlockCache {
public IndexOnlyLruBlockCache(long maxSize, long blockSize, boolean evictionThread,
Configuration conf) {
super(maxSize, blockSize, evictionThread, conf);
}
/**
* Cache only index block with the specified name and buffer
* @param cacheKey block's cache key
* @param buf block buffer
* @param inMemory if block is in-memory
*/
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
if (isMetaBlock(buf.getBlockType())) {
super.cacheBlock(cacheKey, buf, inMemory);
}
}
}

View File

@ -228,9 +228,13 @@ public class MemorySizeUtil {
if (usage != null) { if (usage != null) {
max = usage.getMax(); max = usage.getMax();
} }
float onHeapCacheFixedSize = (float) conf
.getLong(HConstants.HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY,
HConstants.HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT) / max;
// Calculate the amount of heap to give the heap. // Calculate the amount of heap to give the heap.
return (long) (max * cachePercentage); return (onHeapCacheFixedSize > 0 && onHeapCacheFixedSize < cachePercentage) ?
(long) (max * onHeapCacheFixedSize) :
(long) (max * cachePercentage);
} }
/** /**

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.IndexOnlyLruBlockCache;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ SmallTests.class, ClientTests.class })
public class TestClientSideRegionScanner {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestClientSideRegionScanner.class);
private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private Configuration conf;
private Path rootDir;
private FileSystem fs;
private TableDescriptor htd;
private RegionInfo hri;
private Scan scan;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(1);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Before
public void setup() throws IOException {
conf = TEST_UTIL.getConfiguration();
rootDir = TEST_UTIL.getDefaultRootDirPath();
fs = TEST_UTIL.getTestFileSystem();
htd = TEST_UTIL.getAdmin().getDescriptor(TableName.META_TABLE_NAME);
hri = TEST_UTIL.getAdmin().getRegions(TableName.META_TABLE_NAME).get(0);
scan = new Scan();
}
@Test
public void testDefaultBlockCache() throws IOException {
Configuration copyConf = new Configuration(conf);
ClientSideRegionScanner clientSideRegionScanner =
new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null);
BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache();
assertNotNull(blockCache);
assertTrue(blockCache instanceof IndexOnlyLruBlockCache);
assertTrue(HConstants.HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT == blockCache
.getMaxSize());
}
@Test
public void testConfiguredBlockCache() throws IOException {
Configuration copyConf = new Configuration(conf);
// tiny 1MB fixed cache size
long blockCacheFixedSize = 1024 * 1024L;
copyConf.setLong(HConstants.HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY, blockCacheFixedSize);
ClientSideRegionScanner clientSideRegionScanner =
new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null);
BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache();
assertNotNull(blockCache);
assertTrue(blockCache instanceof IndexOnlyLruBlockCache);
assertTrue(blockCacheFixedSize == blockCache.getMaxSize());
}
@Test
public void testNoBlockCache() throws IOException {
Configuration copyConf = new Configuration(conf);
copyConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
ClientSideRegionScanner clientSideRegionScanner =
new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null);
BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache();
assertNull(blockCache);
}
}

View File

@ -374,4 +374,34 @@ public class TestCacheConfig {
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
} }
} }
@Test
public void testIndexOnlyLruBlockCache() {
CacheConfig cc = new CacheConfig(this.conf);
conf.set(BlockCacheFactory.BLOCKCACHE_POLICY_KEY, "IndexOnlyLRU");
BlockCache blockCache = BlockCacheFactory.createBlockCache(this.conf);
assertTrue(blockCache instanceof IndexOnlyLruBlockCache);
// reject data block
long initialBlockCount = blockCache.getBlockCount();
BlockCacheKey bck = new BlockCacheKey("bck", 0);
Cacheable c = new DataCacheEntry();
blockCache.cacheBlock(bck, c, true);
// accept index block
Cacheable indexCacheEntry = new IndexCacheEntry();
blockCache.cacheBlock(bck, indexCacheEntry, true);
assertEquals(initialBlockCount + 1, blockCache.getBlockCount());
}
@Test
public void testGetOnHeapCacheSize() {
Configuration copyConf = new Configuration(conf);
long fixedSize = 1024 * 1024L;
long onHeapCacheSize = MemorySizeUtil.getOnHeapCacheSize(copyConf);
assertEquals(null, copyConf.get(HConstants.HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY));
assertTrue(onHeapCacheSize > 0 && onHeapCacheSize != fixedSize);
// when HBASE_BLOCK_CACHE_FIXED_SIZE_KEY is set, it will be a fixed size
copyConf.setLong(HConstants.HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY, fixedSize);
onHeapCacheSize = MemorySizeUtil.getOnHeapCacheSize(copyConf);
assertEquals(fixedSize, onHeapCacheSize);
}
} }