HBASE-27686: Recovery of BucketCache and Prefetched data after RS Crash (#5080)
Signed-off-by: Wellington Ramos Chevreuil <wchevreuil@apache.org>
(cherry picked from commit 58cb1f4799)

Parent: 59434e9cd4
Commit: d136c6d7c5
CacheConfig.java
@@ -95,6 +95,12 @@ public class CacheConfig {
   public static final String PREFETCH_PERSISTENCE_PATH_KEY = "hbase.prefetch.file.list.path";
 
+  /**
+   * Configuration key to set interval for persisting bucket cache to disk.
+   */
+  public static final String BUCKETCACHE_PERSIST_INTERVAL_KEY =
+    "hbase.bucketcache.persist.intervalinmillis";
+
   // Defaults
   public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
   public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;
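Note: both keys above are ordinary Hadoop Configuration entries, so they can be set in hbase-site.xml or programmatically. A minimal illustrative sketch, not part of the commit; the 1000 ms fallback mirrors the conf.getLong(..., 1000) default this commit wires into BucketCache, and the prefetch path is hypothetical:

import org.apache.hadoop.conf.Configuration;

public class CachePersistenceConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Where the completed-prefetch file list is persisted (hypothetical path).
    conf.set("hbase.prefetch.file.list.path", "/hbase/cache/prefetch.persistence");
    // How often the persister thread wakes up; BucketCache falls back to
    // 1000 ms when this key is unset.
    conf.setLong("hbase.bucketcache.persist.intervalinmillis", 1000L);
    System.out.println(conf.getLong("hbase.bucketcache.persist.intervalinmillis", 1000L));
  }
}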
PrefetchExecutor.java
@@ -154,7 +154,7 @@ public final class PrefetchExecutor {
       throw new IOException("Error persisting prefetched HFiles set!");
     }
     if (!prefetchCompleted.isEmpty()) {
-      try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, true)) {
+      try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, false)) {
         PrefetchProtoUtils.toPB(prefetchCompleted).writeDelimitedTo(fos);
       }
     }
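Note: the only change here is the FileOutputStream append flag. With append=true each persist pass appended another delimited protobuf message, so the file accumulated stale generations; with append=false the stream truncates on open and the file always holds exactly the latest snapshot. A standalone sketch of that behavior (temp file and payloads are made up):

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class AppendVsTruncate {
  public static void main(String[] args) throws IOException {
    Path f = Files.createTempFile("prefetch", ".demo");
    try (FileOutputStream fos = new FileOutputStream(f.toFile(), true)) {
      fos.write("old-snapshot|".getBytes());
    }
    // append=false truncates on open, so earlier generations are discarded:
    try (FileOutputStream fos = new FileOutputStream(f.toFile(), false)) {
      fos.write("latest-snapshot".getBytes());
    }
    System.out.println(Files.readString(f)); // prints "latest-snapshot" only
  }
}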
BucketCache.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
 import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY;
 
 import java.io.File;
@@ -173,6 +174,7 @@ public class BucketCache implements BlockCache, HeapSize {
   private final BucketCacheStats cacheStats = new BucketCacheStats();
 
   private final String persistencePath;
+  static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
   private final long cacheCapacity;
   /** Approximate block size */
   private final long blockSize;
@@ -233,6 +235,8 @@ public class BucketCache implements BlockCache, HeapSize {
 
   private String prefetchedFileListPath;
 
+  private long bucketcachePersistInterval;
+
   private static final String FILE_VERIFY_ALGORITHM =
     "hbase.bucketcache.persistent.file.integrity.check.algorithm";
   private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
@@ -278,6 +282,7 @@ public class BucketCache implements BlockCache, HeapSize {
     this.queueAdditionWaitTime =
       conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
     this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);
+    this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
 
     sanityCheckConfigs();
 
@@ -303,6 +308,7 @@ public class BucketCache implements BlockCache, HeapSize {
     this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
 
     if (ioEngine.isPersistent() && persistencePath != null) {
+      startBucketCachePersisterThread();
       try {
         retrieveFromFile(bucketSizes);
       } catch (IOException ioex) {
@@ -359,6 +365,12 @@ public class BucketCache implements BlockCache, HeapSize {
     }
   }
 
+  void startBucketCachePersisterThread() {
+    BucketCachePersister cachePersister =
+      new BucketCachePersister(this, bucketcachePersistInterval);
+    cachePersister.start();
+  }
+
   boolean isCacheEnabled() {
     return this.cacheEnabled;
   }
@@ -586,6 +598,9 @@ public class BucketCache implements BlockCache, HeapSize {
     if (evictedByEvictionProcess) {
       cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
     }
+    if (ioEngine.isPersistent()) {
+      setCacheInconsistent(true);
+    }
   }
 
   /**
@@ -710,6 +725,14 @@ public class BucketCache implements BlockCache, HeapSize {
     });
   }
 
+  public boolean isCacheInconsistent() {
+    return isCacheInconsistent.get();
+  }
+
+  public void setCacheInconsistent(boolean setCacheInconsistent) {
+    isCacheInconsistent.set(setCacheInconsistent);
+  }
+
   /*
    * Statistics thread. Periodically output cache statistics to the log.
    */
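Note: together with the evictBlock and writer-thread hunks, these accessors implement a dirty-flag handshake: mutators set the flag after touching the backing map, and the persister thread persists and clears it on its next tick. A condensed, self-contained sketch of that pattern (names simplified; not the actual BucketCache code):

import java.util.concurrent.atomic.AtomicBoolean;

public class DirtyFlagSketch {
  private static final AtomicBoolean dirty = new AtomicBoolean(false);

  // Called from mutation paths (eviction, write-back into the backing map).
  static void markDirty() {
    dirty.set(true);
  }

  // Called periodically by the persister thread.
  static void tick() {
    if (dirty.get()) {
      // ... snapshot the backing map to disk here ...
      dirty.set(false); // a mutation racing between the persist and this reset
                        // is picked up once the next mutation re-marks the flag
    }
  }

  public static void main(String[] args) {
    markDirty();
    tick(); // persists once, then clears the flag
  }
}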
@@ -1156,6 +1179,9 @@ public class BucketCache implements BlockCache, HeapSize {
       // Only add if non-null entry.
       if (bucketEntries[i] != null) {
         putIntoBackingMap(key, bucketEntries[i]);
+        if (ioEngine.isPersistent()) {
+          setCacheInconsistent(true);
+        }
       }
       // Always remove from ramCache even if we failed adding it to the block cache above.
       boolean existed = ramCache.remove(key, re -> {
@@ -1205,8 +1231,7 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
       justification = "false positive, try-with-resources ensures close is called.")
-  private void persistToFile() throws IOException {
-    assert !cacheEnabled;
+  void persistToFile() throws IOException {
     if (!ioEngine.isPersistent()) {
       throw new IOException("Attempt to persist non-persistent cache mappings!");
     }
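Note: dropping private and the assert !cacheEnabled is what lets BucketCachePersister (the new file below) call persistToFile() on a live cache; previously the method was only reachable from shutdown, after caching had been disabled. A hypothetical condensation of the two call paths, not the actual BucketCache code:

import java.io.IOException;

public class PersistCallSites {
  private volatile boolean cacheEnabled = true;
  private volatile boolean inconsistent = false;

  void persistToFile() throws IOException {
    // snapshot cache metadata to disk (elided)
  }

  // Pre-existing call site: shutdown, after the cache is disabled.
  void shutdown() throws IOException {
    cacheEnabled = false;
    persistToFile();
  }

  // New call site added by this commit: the periodic persister, which runs
  // while cacheEnabled is still true, hence the removed assert.
  void persisterTick() throws IOException {
    if (inconsistent) {
      persistToFile();
      inconsistent = false;
    }
  }
}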
BucketCachePersister.java (new file)
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.io.IOException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class BucketCachePersister extends Thread {
+  private final BucketCache cache;
+  private final long intervalMillis;
+  private static final Logger LOG = LoggerFactory.getLogger(BucketCachePersister.class);
+
+  public BucketCachePersister(BucketCache cache, long intervalMillis) {
+    super("bucket-cache-persister");
+    this.cache = cache;
+    this.intervalMillis = intervalMillis;
+    LOG.info("BucketCachePersister started with interval: " + intervalMillis);
+  }
+
+  public void run() {
+    while (true) {
+      try {
+        Thread.sleep(intervalMillis);
+        if (cache.isCacheInconsistent()) {
+          LOG.debug("Cache is inconsistent, persisting to disk");
+          cache.persistToFile();
+          cache.setCacheInconsistent(false);
+        }
+      } catch (IOException | InterruptedException e) {
+        LOG.warn("Exception in BucketCachePersister" + e.getMessage());
+      }
+    }
+  }
+}
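Note: because run() loops forever and catches InterruptedException inside the loop, the thread above stops only when the JVM exits (the commit never calls setDaemon). A runnable analogue of the wake-check-persist cycle; CacheLike is a hypothetical stand-in for BucketCache so the sketch compiles without HBase on the classpath:

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for BucketCache: a dirty flag plus a persist hook.
class CacheLike {
  final AtomicBoolean dirty = new AtomicBoolean(false);

  void persist() throws IOException {
    System.out.println("persisted");
  }
}

public class PersisterLoopDemo {
  public static void main(String[] args) throws Exception {
    CacheLike cache = new CacheLike();
    Thread persister = new Thread(() -> {
      while (true) {
        try {
          Thread.sleep(100); // stands in for bucketcachePersistInterval
          if (cache.dirty.get()) {
            cache.persist();
            cache.dirty.set(false);
          }
        } catch (IOException | InterruptedException e) {
          // matching the commit: log-and-continue, so interrupts do not stop it
        }
      }
    });
    persister.setDaemon(true); // lets this demo JVM exit; the commit's thread is not a daemon
    persister.start();

    cache.dirty.set(true); // simulate a mutation
    Thread.sleep(300);     // give the persister a tick to notice it
  }
}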
TestPrefetchRSClose.java
@@ -82,7 +82,8 @@ public class TestPrefetchRSClose {
   }
 
   @Test
-  public void testRegionClosePrefetchPersistence() throws Exception {
+  public void testPrefetchPersistence() throws Exception {
+
     // Write to table and flush
     TableName tableName = TableName.valueOf("table1");
     byte[] row0 = Bytes.toBytes("row1");
@@ -106,8 +107,14 @@ public class TestPrefetchRSClose {
       table.put(put1);
       TEST_UTIL.flush(tableName);
     } finally {
-      Thread.sleep(1000);
+      Thread.sleep(1500);
     }
+
+    // Default interval for cache persistence is 1000ms. So after 1000ms, both the persistence files
+    // should exist.
+    assertTrue(new File(testDir + "/bucket.persistence").exists());
+    assertTrue(new File(testDir + "/prefetch.persistence").exists());
+
     // Stop the RS
     cluster.stopRegionServer(0);
     LOG.info("Stopped Region Server 0.");
@@ -117,20 +124,6 @@ public class TestPrefetchRSClose {
 
     // Start the RS and validate
     cluster.startRegionServer();
-    Thread.sleep(1000);
-    assertFalse(new File(testDir + "/prefetch.persistence").exists());
-    assertFalse(new File(testDir + "/bucket.persistence").exists());
-  }
-
-  @Test
-  public void testPrefetchPersistenceNegative() throws Exception {
-    cluster.stopRegionServer(0);
-    LOG.info("Stopped Region Server 0.");
-    Thread.sleep(1000);
-    assertFalse(new File(testDir + "/prefetch.persistence").exists());
-    assertTrue(new File(testDir + "/bucket.persistence").exists());
-    cluster.startRegionServer();
-    Thread.sleep(1000);
     assertFalse(new File(testDir + "/prefetch.persistence").exists());
     assertFalse(new File(testDir + "/bucket.persistence").exists());
   }
@@ -138,6 +131,7 @@ public class TestPrefetchRSClose {
   @After
   public void tearDown() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
+    TEST_UTIL.cleanupDataTestDirOnTestFS(String.valueOf(testDir));
     if (zkCluster != null) {
       zkCluster.shutdown();
     }
TestBucketCachePersister.java (new file)
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ IOTests.class, MediumTests.class })
+public class TestBucketCachePersister {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBucketCachePersister.class);
+
+  public TestName name = new TestName();
+
+  public int constructedBlockSize = 16 * 1024;
+
+  public int[] constructedBlockSizes =
+    new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
+      28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024 };
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
+  private static final int DATA_BLOCK_SIZE = 2048;
+  private static final int NUM_KV = 1000;
+
+  final long capacitySize = 32 * 1024 * 1024;
+  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
+  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
+  Path testDir;
+
+  public Configuration setupBucketCacheConfig(long bucketCachePersistInterval) throws IOException {
+    Configuration conf;
+    conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
+    testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+    conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, bucketCachePersistInterval);
+    return conf;
+  }
+
+  public BucketCache setupBucketCache(Configuration conf) throws IOException {
+    conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, (testDir + "/prefetch.persistence"));
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+      testDir + "/bucket.persistence", 60 * 1000, conf);
+    return bucketCache;
+  }
+
+  public void cleanupBucketCache(BucketCache bucketCache) throws IOException {
+    bucketCache.shutdown();
+    TEST_UTIL.cleanupDataTestDirOnTestFS(String.valueOf(testDir));
+    assertFalse(TEST_UTIL.getTestFileSystem().exists(testDir));
+  }
+
+  @Test
+  public void testPrefetchPersistenceCrash() throws Exception {
+    long bucketCachePersistInterval = 3000;
+    Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval);
+    BucketCache bucketCache = setupBucketCache(conf);
+    CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
+    FileSystem fs = HFileSystem.get(conf);
+    // Load Cache
+    Path storeFile = writeStoreFile("TestPrefetch0", conf, cacheConf, fs);
+    Path storeFile2 = writeStoreFile("TestPrefetch1", conf, cacheConf, fs);
+    readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
+    readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache);
+    Thread.sleep(bucketCachePersistInterval);
+    assertTrue(new File(testDir + "/prefetch.persistence").exists());
+    assertTrue(new File(testDir + "/bucket.persistence").exists());
+    assertTrue(new File(testDir + "/prefetch.persistence").delete());
+    assertTrue(new File(testDir + "/bucket.persistence").delete());
+    cleanupBucketCache(bucketCache);
+  }
+
+  @Test
+  public void testPrefetchPersistenceCrashNegative() throws Exception {
+    long bucketCachePersistInterval = 3000;
+    Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval);
+    BucketCache bucketCache = setupBucketCache(conf);
+    CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
+    FileSystem fs = HFileSystem.get(conf);
+    // Load Cache
+    Path storeFile = writeStoreFile("TestPrefetch2", conf, cacheConf, fs);
+    Path storeFile2 = writeStoreFile("TestPrefetch3", conf, cacheConf, fs);
+    readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
+    readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache);
+    assertFalse(new File(testDir + "/prefetch.persistence").exists());
+    assertFalse(new File(testDir + "/bucket.persistence").exists());
+    cleanupBucketCache(bucketCache);
+  }
+
+  public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheConfig cacheConf,
+    Configuration conf, BucketCache bucketCache) throws Exception {
+    // Open the file
+    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
+
+    while (!reader.prefetchComplete()) {
+      // Sleep for a bit
+      Thread.sleep(1000);
+    }
+    HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
+    BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
+    BucketEntry be = bucketCache.backingMap.get(blockCacheKey);
+    boolean isCached = bucketCache.getBlock(blockCacheKey, true, false, true) != null;
+
+    if (
+      block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
+        || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
+    ) {
+      assertTrue(isCached);
+    }
+  }
+
+  public Path writeStoreFile(String fname, Configuration conf, CacheConfig cacheConf, FileSystem fs)
+    throws IOException {
+    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
+    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
+    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
+      .withOutputDir(storeFileParentDir).withFileContext(meta).build();
+    Random rand = ThreadLocalRandom.current();
+    final int rowLen = 32;
+    for (int i = 0; i < NUM_KV; ++i) {
+      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
+      byte[] v = RandomKeyValueUtil.randomValue(rand);
+      int cfLen = rand.nextInt(k.length - rowLen + 1);
+      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
+        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
+      sfw.append(kv);
+    }
+
+    sfw.close();
+    return sfw.getPath();
+  }
+
+  public static KeyValue.Type generateKeyType(Random rand) {
+    if (rand.nextBoolean()) {
+      // Let's make half of KVs puts.
+      return KeyValue.Type.Put;
+    } else {
+      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
+      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
+        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
+          + "Probably the layout of KeyValue.Type has changed.");
+      }
+      return keyType;
+    }
+  }
+
+}