From 58cb1f4799e90f5fca51f76aa4a9787d823c69a5 Mon Sep 17 00:00:00 2001
From: Kota-SH
Date: Thu, 16 Mar 2023 09:27:55 -0500
Subject: [PATCH] HBASE-27686: Recovery of BucketCache and Prefetched data
 after RS Crash (#5080)

Signed-off-by: Wellington Ramos Chevreuil
---
 .../hadoop/hbase/io/hfile/CacheConfig.java    |   6 +
 .../hbase/io/hfile/PrefetchExecutor.java      |   2 +-
 .../hbase/io/hfile/bucket/BucketCache.java    |  29 ++-
 .../io/hfile/bucket/BucketCachePersister.java |  52 +++++
 .../hbase/io/hfile/TestPrefetchRSClose.java   |  26 +--
 .../bucket/TestBucketCachePersister.java      | 194 ++++++++++++++++++
 6 files changed, 290 insertions(+), 19 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index fa661704b92..15c64c03d5e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -95,6 +95,12 @@ public class CacheConfig {
 
   public static final String PREFETCH_PERSISTENCE_PATH_KEY = "hbase.prefetch.file.list.path";
 
+  /**
+   * Configuration key to set interval for persisting bucket cache to disk.
+   */
+  public static final String BUCKETCACHE_PERSIST_INTERVAL_KEY =
+    "hbase.bucketcache.persist.intervalinmillis";
+
   // Defaults
   public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
   public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
index 9aafe7a7b6e..b30150fcb6d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
@@ -154,7 +154,7 @@ public final class PrefetchExecutor {
       throw new IOException("Error persisting prefetched HFiles set!");
     }
     if (!prefetchCompleted.isEmpty()) {
-      try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, true)) {
+      try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, false)) {
         PrefetchProtoUtils.toPB(prefetchCompleted).writeDelimitedTo(fos);
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index a50e5b46455..b4ab66d238e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
 import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY;
 
 import java.io.File;
@@ -178,6 +179,7 @@ public class BucketCache implements BlockCache, HeapSize {
   private final BucketCacheStats cacheStats = new BucketCacheStats();
 
   private final String persistencePath;
+  static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
   private final long cacheCapacity;
   /** Approximate block size */
   private final long blockSize;
@@ -237,6 +239,8 @@ public class BucketCache implements BlockCache, HeapSize {
 
   private String prefetchedFileListPath;
 
+  private long bucketcachePersistInterval;
+
   private static final String FILE_VERIFY_ALGORITHM =
     "hbase.bucketcache.persistent.file.integrity.check.algorithm";
   private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
@@ -288,6 +292,7 @@ public class BucketCache implements BlockCache, HeapSize {
     this.queueAdditionWaitTime =
       conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
     this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);
+    this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
 
     sanityCheckConfigs();
 
@@ -314,6 +319,7 @@ public class BucketCache implements BlockCache, HeapSize {
     this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
 
     if (ioEngine.isPersistent() && persistencePath != null) {
+      startBucketCachePersisterThread();
       try {
         retrieveFromFile(bucketSizes);
       } catch (IOException ioex) {
@@ -370,6 +376,12 @@ public class BucketCache implements BlockCache, HeapSize {
     }
   }
 
+  void startBucketCachePersisterThread() {
+    BucketCachePersister cachePersister =
+      new BucketCachePersister(this, bucketcachePersistInterval);
+    cachePersister.start();
+  }
+
   boolean isCacheEnabled() {
     return this.cacheEnabled;
   }
@@ -597,6 +609,9 @@ public class BucketCache implements BlockCache, HeapSize {
     if (evictedByEvictionProcess) {
       cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
     }
+    if (ioEngine.isPersistent()) {
+      setCacheInconsistent(true);
+    }
   }
 
   /**
@@ -721,6 +736,14 @@ public class BucketCache implements BlockCache, HeapSize {
     });
   }
 
+  public boolean isCacheInconsistent() {
+    return isCacheInconsistent.get();
+  }
+
+  public void setCacheInconsistent(boolean setCacheInconsistent) {
+    isCacheInconsistent.set(setCacheInconsistent);
+  }
+
   /*
    * Statistics thread. Periodically output cache statistics to the log.
    */
@@ -1167,6 +1190,9 @@ public class BucketCache implements BlockCache, HeapSize {
       // Only add if non-null entry.
       if (bucketEntries[i] != null) {
         putIntoBackingMap(key, bucketEntries[i]);
+        if (ioEngine.isPersistent()) {
+          setCacheInconsistent(true);
+        }
       }
       // Always remove from ramCache even if we failed adding it to the block cache above.
       boolean existed = ramCache.remove(key, re -> {
@@ -1216,8 +1242,7 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
       justification = "false positive, try-with-resources ensures close is called.")
-  private void persistToFile() throws IOException {
-    assert !cacheEnabled;
+  void persistToFile() throws IOException {
     if (!ioEngine.isPersistent()) {
       throw new IOException("Attempt to persist non-persistent cache mappings!");
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java
new file mode 100644
index 00000000000..099a19db0a1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.io.IOException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class BucketCachePersister extends Thread {
+  private final BucketCache cache;
+  private final long intervalMillis;
+  private static final Logger LOG = LoggerFactory.getLogger(BucketCachePersister.class);
+
+  public BucketCachePersister(BucketCache cache, long intervalMillis) {
+    super("bucket-cache-persister");
+    this.cache = cache;
+    this.intervalMillis = intervalMillis;
+    LOG.info("BucketCachePersister started with interval: {}", intervalMillis);
+  }
+
+  public void run() {
+    while (true) {
+      try {
+        Thread.sleep(intervalMillis);
+        if (cache.isCacheInconsistent()) {
+          LOG.debug("Cache is inconsistent, persisting to disk");
+          cache.persistToFile();
+          cache.setCacheInconsistent(false);
+        }
+      } catch (IOException | InterruptedException e) {
+        LOG.warn("Exception in BucketCachePersister", e);
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
index 68f4371bb4f..a0e052e5ea3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
@@ -83,7 +83,8 @@ public class TestPrefetchRSClose {
   }
 
   @Test
-  public void testRegionClosePrefetchPersistence() throws Exception {
+  public void testPrefetchPersistence() throws Exception {
+
     // Write to table and flush
     TableName tableName = TableName.valueOf("table1");
     byte[] row0 = Bytes.toBytes("row1");
@@ -107,8 +108,14 @@ public class TestPrefetchRSClose {
       table.put(put1);
       TEST_UTIL.flush(tableName);
     } finally {
-      Thread.sleep(1000);
+      Thread.sleep(1500);
    }
+
+    // Default interval for cache persistence is 1000ms. So after 1000ms, both the persistence files
+    // should exist.
+    assertTrue(new File(testDir + "/bucket.persistence").exists());
+    assertTrue(new File(testDir + "/prefetch.persistence").exists());
+
     // Stop the RS
     cluster.stopRegionServer(0);
     LOG.info("Stopped Region Server 0.");
@@ -118,20 +125,6 @@ public class TestPrefetchRSClose {
 
     // Start the RS and validate
     cluster.startRegionServer();
-    Thread.sleep(1000);
-    assertFalse(new File(testDir + "/prefetch.persistence").exists());
-    assertFalse(new File(testDir + "/bucket.persistence").exists());
-  }
-
-  @Test
-  public void testPrefetchPersistenceNegative() throws Exception {
-    cluster.stopRegionServer(0);
-    LOG.info("Stopped Region Server 0.");
-    Thread.sleep(1000);
-    assertFalse(new File(testDir + "/prefetch.persistence").exists());
-    assertTrue(new File(testDir + "/bucket.persistence").exists());
-    cluster.startRegionServer();
-    Thread.sleep(1000);
     assertFalse(new File(testDir + "/prefetch.persistence").exists());
     assertFalse(new File(testDir + "/bucket.persistence").exists());
   }
@@ -139,6 +132,7 @@ public class TestPrefetchRSClose {
   @After
   public void tearDown() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
+    TEST_UTIL.cleanupDataTestDirOnTestFS(String.valueOf(testDir));
     if (zkCluster != null) {
       zkCluster.shutdown();
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
new file mode 100644
index 00000000000..9e90d0e229c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.BlockType; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil; +import org.apache.hadoop.hbase.regionserver.StoreFileWriter; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ IOTests.class, MediumTests.class }) +public class TestBucketCachePersister { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBucketCachePersister.class); + + public TestName name = new TestName(); + + public int constructedBlockSize = 16 * 1024; + + public int[] constructedBlockSizes = + new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, + 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024 }; + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + + private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; + private static final int DATA_BLOCK_SIZE = 2048; + private static final int NUM_KV = 1000; + + final long capacitySize = 32 * 1024 * 1024; + final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; + final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; + Path testDir; + + public Configuration setupBucketCacheConfig(long bucketCachePersistInterval) throws IOException { + Configuration conf; + conf = TEST_UTIL.getConfiguration(); + conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); + testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, bucketCachePersistInterval); + return conf; + } + + public BucketCache setupBucketCache(Configuration conf) throws IOException { + conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, (testDir + "/prefetch.persistence")); + BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, + constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, + testDir + "/bucket.persistence", 60 * 1000, conf); + return bucketCache; + } + + public void cleanupBucketCache(BucketCache bucketCache) throws IOException { + bucketCache.shutdown(); + TEST_UTIL.cleanupDataTestDirOnTestFS(String.valueOf(testDir)); + assertFalse(TEST_UTIL.getTestFileSystem().exists(testDir)); + } + + @Test + public void testPrefetchPersistenceCrash() throws Exception { + long bucketCachePersistInterval = 3000; + Configuration 
conf = setupBucketCacheConfig(bucketCachePersistInterval); + BucketCache bucketCache = setupBucketCache(conf); + CacheConfig cacheConf = new CacheConfig(conf, bucketCache); + FileSystem fs = HFileSystem.get(conf); + // Load Cache + Path storeFile = writeStoreFile("TestPrefetch0", conf, cacheConf, fs); + Path storeFile2 = writeStoreFile("TestPrefetch1", conf, cacheConf, fs); + readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache); + readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache); + Thread.sleep(bucketCachePersistInterval); + assertTrue(new File(testDir + "/prefetch.persistence").exists()); + assertTrue(new File(testDir + "/bucket.persistence").exists()); + assertTrue(new File(testDir + "/prefetch.persistence").delete()); + assertTrue(new File(testDir + "/bucket.persistence").delete()); + cleanupBucketCache(bucketCache); + } + + @Test + public void testPrefetchPersistenceCrashNegative() throws Exception { + long bucketCachePersistInterval = 3000; + Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval); + BucketCache bucketCache = setupBucketCache(conf); + CacheConfig cacheConf = new CacheConfig(conf, bucketCache); + FileSystem fs = HFileSystem.get(conf); + // Load Cache + Path storeFile = writeStoreFile("TestPrefetch2", conf, cacheConf, fs); + Path storeFile2 = writeStoreFile("TestPrefetch3", conf, cacheConf, fs); + readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache); + readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache); + assertFalse(new File(testDir + "/prefetch.persistence").exists()); + assertFalse(new File(testDir + "/bucket.persistence").exists()); + cleanupBucketCache(bucketCache); + } + + public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheConfig cacheConf, + Configuration conf, BucketCache bucketCache) throws Exception { + // Open the file + HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); + + while (!reader.prefetchComplete()) { + // Sleep for a bit + Thread.sleep(1000); + } + HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null); + BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); + BucketEntry be = bucketCache.backingMap.get(blockCacheKey); + boolean isCached = bucketCache.getBlock(blockCacheKey, true, false, true) != null; + + if ( + block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX + || block.getBlockType() == BlockType.INTERMEDIATE_INDEX + ) { + assertTrue(isCached); + } + } + + public Path writeStoreFile(String fname, Configuration conf, CacheConfig cacheConf, FileSystem fs) + throws IOException { + Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname); + HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); + StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) + .withOutputDir(storeFileParentDir).withFileContext(meta).build(); + Random rand = ThreadLocalRandom.current(); + final int rowLen = 32; + for (int i = 0; i < NUM_KV; ++i) { + byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i); + byte[] v = RandomKeyValueUtil.randomValue(rand); + int cfLen = rand.nextInt(k.length - rowLen + 1); + KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, + k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length); + sfw.append(kv); + } + + sfw.close(); + return sfw.getPath(); + } + + public static KeyValue.Type generateKeyType(Random rand) { + if 
(rand.nextBoolean()) { + // Let's make half of KVs puts. + return KeyValue.Type.Put; + } else { + KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; + if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { + throw new RuntimeException("Generated an invalid key type: " + keyType + ". " + + "Probably the layout of KeyValue.Type has changed."); + } + return keyType; + } + } + +}
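
Note (not part of the diff): below is a minimal, hypothetical sketch of how the new
hbase.bucketcache.persist.intervalinmillis key and the prefetch list path are wired into a
persistent BucketCache. The constructor signature is taken from TestBucketCachePersister in this
patch; the paths, sizes, and the example class name are illustrative assumptions, not part of the
change. With a file-backed IOEngine and a non-null persistencePath, the constructor starts the
BucketCachePersister thread, which rewrites bucket.persistence whenever blocks were cached or
evicted since the last interval.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;

// Hypothetical example class; the /data/hbase paths and capacities are made up.
public class BucketCachePersistExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Persist the backing map at most every 5 seconds (the patch defaults to 1000 ms).
    conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 5000);
    // File where PrefetchExecutor records the set of fully prefetched HFiles.
    conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, "/data/hbase/prefetch.persistence");
    // A persistent (file-backed) IOEngine plus a non-null persistencePath is the
    // combination that makes the constructor start the BucketCachePersister thread.
    BucketCache cache = new BucketCache("file:/data/hbase/bucket.cache",
      32 * 1024 * 1024, 16 * 1024, null, BucketCache.DEFAULT_WRITER_THREADS,
      BucketCache.DEFAULT_WRITER_QUEUE_ITEMS, "/data/hbase/bucket.persistence",
      60 * 1000, conf);
    // ... serve reads; after a crash, constructing a BucketCache against the same
    // files restores the persisted entries via retrieveFromFile().
    cache.shutdown();
  }
}

In a real deployment the interval would be set in hbase-site.xml rather than in code; a smaller
interval narrows the window of cached blocks lost on a crash, at the cost of rewriting the backing
map to disk more often.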