diff --git a/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto b/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto index d78acc06a28..038c6ca3f04 100644 --- a/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto +++ b/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto @@ -31,6 +31,7 @@ message BucketCacheEntry { required string map_class = 3; map deserializers = 4; required BackingMap backing_map = 5; + optional bytes checksum = 6; } message BackingMap { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 99abfea4269..7d8f5821494 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -238,6 +238,16 @@ public class BucketCache implements BlockCache, HeapSize { /** In-memory bucket size */ private float memoryFactor; + private static final String FILE_VERIFY_ALGORITHM = + "hbase.bucketcache.persistent.file.integrity.check.algorithm"; + private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5"; + + /** + * Use {@link java.security.MessageDigest} class's encryption algorithms to check + * persistent file integrity, default algorithm is MD5 + * */ + private String algorithm; + public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath) throws IOException { this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, @@ -247,6 +257,7 @@ public class BucketCache implements BlockCache, HeapSize { public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, Configuration conf) throws IOException { + this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM); this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath); this.writerThreads = new WriterThread[writerThreadNum]; long blockNumCapacity = capacity / blockSize; @@ -1131,6 +1142,13 @@ public class BucketCache implements BlockCache, HeapSize { } private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { + if (proto.hasChecksum()) { + ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), + algorithm); + } else { + // if has not checksum, it means the persistence file is old format + LOG.info("Persistent file is old format, it does not support verifying file integrity!"); + } verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap()); } @@ -1235,6 +1253,10 @@ public class BucketCache implements BlockCache, HeapSize { return this.bucketAllocator.getUsedSize(); } + protected String getAlgorithm() { + return algorithm; + } + /** * Evicts all blocks for a specific HFile. *

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java index 69b8370bb95..f3d63d4e72f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockPriority; import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos; @@ -41,12 +42,13 @@ final class BucketProtoUtils { static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache) { return BucketCacheProtos.BucketCacheEntry.newBuilder() - .setCacheCapacity(cache.getMaxSize()) - .setIoClass(cache.ioEngine.getClass().getName()) - .setMapClass(cache.backingMap.getClass().getName()) - .putAllDeserializers(CacheableDeserializerIdManager.save()) - .setBackingMap(BucketProtoUtils.toPB(cache.backingMap)) - .build(); + .setCacheCapacity(cache.getMaxSize()) + .setIoClass(cache.ioEngine.getClass().getName()) + .setMapClass(cache.backingMap.getClass().getName()) + .putAllDeserializers(CacheableDeserializerIdManager.save()) + .setBackingMap(BucketProtoUtils.toPB(cache.backingMap)) + .setChecksum(ByteString.copyFrom(((PersistentIOEngine) cache.ioEngine). + calculateChecksum(cache.getAlgorithm()))).build(); } private static BucketCacheProtos.BackingMap toPB( diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java index 9e6a75b0dcb..2cdfc80a39c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java @@ -43,10 +43,9 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; * IO engine that stores data to a file on the local file system. */ @InterfaceAudience.Private -public class FileIOEngine implements IOEngine { +public class FileIOEngine extends PersistentIOEngine { private static final Logger LOG = LoggerFactory.getLogger(FileIOEngine.class); public static final String FILE_DELIMITER = ","; - private final String[] filePaths; private final FileChannel[] fileChannels; private final RandomAccessFile[] rafs; private final ReentrantLock[] channelLocks; @@ -59,9 +58,9 @@ public class FileIOEngine implements IOEngine { public FileIOEngine(long capacity, boolean maintainPersistence, String... filePaths) throws IOException { + super(filePaths); this.sizePerFile = capacity / filePaths.length; this.capacity = this.sizePerFile * filePaths.length; - this.filePaths = filePaths; this.fileChannels = new FileChannel[filePaths.length]; if (!maintainPersistence) { for (String filePath : filePaths) { @@ -90,7 +89,12 @@ public class FileIOEngine implements IOEngine { + StringUtils.byteDesc(sizePerFile); LOG.warn(msg); } - rafs[i].setLength(sizePerFile); + File file = new File(filePath); + // setLength() method will change file's last modified time. So if don't do + // this check, wrong time will be used when calculating checksum. + if (file.length() != sizePerFile) { + rafs[i].setLength(sizePerFile); + } fileChannels[i] = rafs[i].getChannel(); channelLocks[i] = new ReentrantLock(); LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java index ee37e916316..c0cb22d0b07 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.io.hfile.bucket; +import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; @@ -37,7 +38,7 @@ import org.slf4j.LoggerFactory; * mechanism */ @InterfaceAudience.Private -public abstract class FileMmapIOEngine implements IOEngine { +public abstract class FileMmapIOEngine extends PersistentIOEngine { static final Logger LOG = LoggerFactory.getLogger(FileMmapIOEngine.class); protected final String path; @@ -47,13 +48,19 @@ public abstract class FileMmapIOEngine implements IOEngine { private RandomAccessFile raf = null; public FileMmapIOEngine(String filePath, long capacity) throws IOException { + super(filePath); this.path = filePath; this.size = capacity; long fileSize = 0; try { raf = new RandomAccessFile(filePath, "rw"); fileSize = roundUp(capacity, ByteBufferArray.DEFAULT_BUFFER_SIZE); - raf.setLength(fileSize); + File file = new File(filePath); + // setLength() method will change file's last modified time. So if don't do + // this check, wrong time will be used when calculating checksum. + if (file.length() != fileSize) { + raf.setLength(fileSize); + } fileChannel = raf.getChannel(); LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + ", on the path:" + filePath); } catch (java.io.FileNotFoundException fex) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java new file mode 100644 index 00000000000..4ee7d0ed1be --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.File; +import java.io.IOException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Shell; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A class implementing PersistentIOEngine interface supports file integrity verification + * for {@link BucketCache} which use persistent IOEngine + */ +@InterfaceAudience.Private +public abstract class PersistentIOEngine implements IOEngine { + private static final Logger LOG = LoggerFactory.getLogger(PersistentIOEngine.class); + private static final DuFileCommand DU = new DuFileCommand(new String[] {"du", ""}); + protected final String[] filePaths; + + public PersistentIOEngine(String... filePaths) { + this.filePaths = filePaths; + } + + /** + * Verify cache files's integrity + * @param algorithm the backingMap persistence path + */ + protected void verifyFileIntegrity(byte[] persistentChecksum, String algorithm) + throws IOException { + byte[] calculateChecksum = calculateChecksum(algorithm); + if (!Bytes.equals(persistentChecksum, calculateChecksum)) { + throw new IOException("Mismatch of checksum! The persistent checksum is " + + Bytes.toString(persistentChecksum) + ", but the calculate checksum is " + + Bytes.toString(calculateChecksum)); + } + } + + /** + * Using an encryption algorithm to calculate a checksum, the default encryption algorithm is MD5 + * @return the checksum which is convert to HexString + * @throws IOException something happened like file not exists + * @throws NoSuchAlgorithmException no such algorithm + */ + protected byte[] calculateChecksum(String algorithm) { + try { + StringBuilder sb = new StringBuilder(); + for (String filePath : filePaths){ + File file = new File(filePath); + sb.append(filePath); + sb.append(getFileSize(filePath)); + sb.append(file.lastModified()); + } + MessageDigest messageDigest = MessageDigest.getInstance(algorithm); + messageDigest.update(Bytes.toBytes(sb.toString())); + return messageDigest.digest(); + } catch (IOException ioex) { + LOG.error("Calculating checksum failed, because of ", ioex); + return new byte[0]; + } catch (NoSuchAlgorithmException e) { + LOG.error("No such algorithm : " + algorithm + "!"); + return new byte[0]; + } + } + + /** + * Using Linux command du to get file's real size + * @param filePath the file + * @return file's real size + * @throws IOException something happened like file not exists + */ + private static long getFileSize(String filePath) throws IOException { + DU.setExecCommand(filePath); + DU.execute(); + return Long.parseLong(DU.getOutput().split("\t")[0]); + } + + private static class DuFileCommand extends Shell.ShellCommandExecutor { + private String[] execCommand; + + DuFileCommand(String[] execString) { + super(execString); + execCommand = execString; + } + + void setExecCommand(String filePath) { + this.execCommand[1] = filePath; + } + + @Override + public String[] getExecString() { + return this.execCommand; + } + } +} + diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java new file mode 100644 index 00000000000..1dd2a3bc4c1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java @@ -0,0 +1,247 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.util.Arrays; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; +import org.apache.hadoop.hbase.io.hfile.Cacheable; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Basic test for check file's integrity before start BucketCache in fileIOEngine + */ +@RunWith(Parameterized.class) +@Category(MediumTests.class) +public class TestVerifyBucketCacheFile { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestVerifyBucketCacheFile.class); + + @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}") + public static Iterable data() { + return Arrays.asList(new Object[][] { { 8192, null }, { 16 * 1024, + new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, + 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, + 128 * 1024 + 1024 } } }); + } + + @Parameterized.Parameter(0) + public int constructedBlockSize; + + @Parameterized.Parameter(1) + public int[] constructedBlockSizes; + + final long capacitySize = 32 * 1024 * 1024; + final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; + final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; + + /** + * Test cache file or persistence file does not exist whether BucketCache starts normally + * (1) Start BucketCache and add some blocks, then shutdown BucketCache and persist cache + * to file. Restart BucketCache and it can restore cache from file. + * (2) Delete bucket cache file after shutdown BucketCache. Restart BucketCache and it can't + * restore cache from file, the cache file and persistence file would be deleted before + * BucketCache start normally. + * (3) Delete persistence file after shutdown BucketCache. Restart BucketCache and it can't + * restore cache from file, the cache file and persistence file would be deleted before + * BucketCache start normally. + * @throws Exception the exception + */ + @Test + public void testRetrieveFromFile() throws Exception { + HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + Path testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + + BucketCache bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + long usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize == 0); + CacheTestUtils.HFileBlockPair[] blocks = + CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); + // Add blocks + for (CacheTestUtils.HFileBlockPair block : blocks) { + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); + } + usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize != 0); + // 1.persist cache to file + bucketCache.shutdown(); + // restore cache from file + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); + // persist cache to file + bucketCache.shutdown(); + + // 2.delete bucket cache file + File cacheFile = new File(testDir + "/bucket.cache"); + assertTrue(cacheFile.delete()); + // can't restore cache from file + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + // Add blocks + for (CacheTestUtils.HFileBlockPair block : blocks) { + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); + } + usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize != 0); + // persist cache to file + bucketCache.shutdown(); + + // 3.delete backingMap persistence file + File mapFile = new File(testDir + "/bucket.persistence"); + assertTrue(mapFile.delete()); + // can't restore cache from file + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + + TEST_UTIL.cleanupTestDir(); + } + + /** + * Test whether BucketCache is started normally after modifying the cache file. + * Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to file. + * Restart BucketCache after modify cache file's data, and it can't restore cache from file, + * the cache file and persistence file would be deleted before BucketCache start normally. + * @throws Exception the exception + */ + @Test + public void testModifiedBucketCacheFileData() throws Exception { + HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + Path testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + + BucketCache bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + long usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize == 0); + + CacheTestUtils.HFileBlockPair[] blocks = + CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); + // Add blocks + for (CacheTestUtils.HFileBlockPair block : blocks) { + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); + } + usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize != 0); + // persist cache to file + bucketCache.shutdown(); + + // modified bucket cache file + String file = testDir + "/bucket.cache"; + try(BufferedWriter out = new BufferedWriter(new OutputStreamWriter( + new FileOutputStream(file, false)))) { + out.write("test bucket cache"); + } + // can't restore cache from file + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + + TEST_UTIL.cleanupTestDir(); + } + + /** + * Test whether BucketCache is started normally after modifying the cache file's last modified + * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist + * cache to file. Then Restart BucketCache after modify cache file's last modified time, and + * it can't restore cache from file, the cache file and persistence file would be deleted + * before BucketCache start normally. + * @throws Exception the exception + */ + @Test + public void testModifiedBucketCacheFileTime() throws Exception { + HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + Path testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + + BucketCache bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + long usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize == 0); + + CacheTestUtils.HFileBlockPair[] blocks = + CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); + // Add blocks + for (CacheTestUtils.HFileBlockPair block : blocks) { + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); + } + usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize != 0); + // persist cache to file + bucketCache.shutdown(); + + // modified bucket cache file LastModifiedTime + File file = new File(testDir + "/bucket.cache"); + assertTrue(file.setLastModified(System.currentTimeMillis() + 1000)); + // can't restore cache from file + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + + TEST_UTIL.cleanupTestDir(); + } + + private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) + throws InterruptedException { + while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { + Thread.sleep(100); + } + } + + // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer + // threads will flush it to the bucket and put reference entry in backingMap. + private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, + Cacheable block) throws InterruptedException { + cache.cacheBlock(cacheKey, block); + waitUntilFlushedToBucket(cache, cacheKey); + } +}