diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index c5a1b21c055..98abfc834fe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -29,6 +29,7 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; import java.nio.ByteBuffer; +import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Comparator; import java.util.HashSet; @@ -69,6 +70,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; import org.apache.hadoop.hbase.io.hfile.CachedBlock; import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.IdReadWriteLock; @@ -242,6 +244,17 @@ public class BucketCache implements BlockCache, HeapSize { /** In-memory bucket size */ private float memoryFactor; + private String ioEngineName; + private static final String FILE_VERIFY_ALGORITHM = + "hbase.bucketcache.persistent.file.integrity.check.algorithm"; + private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5"; + + /** + * Use {@link java.security.MessageDigest} class's encryption algorithms to check + * persistent file integrity, default algorithm is MD5 + * */ + private String algorithm; + public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException, IOException { @@ -252,8 +265,7 @@ public class BucketCache implements BlockCache, HeapSize { public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, Configuration conf) - throws FileNotFoundException, IOException { - this.ioEngine = getIOEngineFromName(ioEngineName, capacity); + throws IOException { this.writerThreads = new WriterThread[writerThreadNum]; long blockNumCapacity = capacity / blockSize; if (blockNumCapacity >= Integer.MAX_VALUE) { @@ -275,6 +287,7 @@ public class BucketCache implements BlockCache, HeapSize { ", memoryFactor: " + memoryFactor); this.cacheCapacity = capacity; + this.ioEngineName = ioEngineName; this.persistencePath = persistencePath; this.blockSize = blockSize; this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; @@ -288,14 +301,15 @@ public class BucketCache implements BlockCache, HeapSize { this.ramCache = new ConcurrentHashMap(); this.backingMap = new ConcurrentHashMap((int) blockNumCapacity); - + this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM); + ioEngine = getIOEngineFromName(); if (ioEngine.isPersistent() && persistencePath != null) { try { retrieveFromFile(bucketSizes); } catch (IOException ioex) { LOG.error("Can't restore from file because of", ioex); } catch (ClassNotFoundException cnfe) { - LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe); + LOG.error("Can't restore from file in rebuild because can't deserialise", cnfe); throw new RuntimeException(cnfe); } } @@ -359,12 +373,10 @@ public class BucketCache implements BlockCache, 
HeapSize { /** * Get the IOEngine from the IO engine name - * @param ioEngineName - * @param capacity * @return the IOEngine * @throws IOException */ - private IOEngine getIOEngineFromName(String ioEngineName, long capacity) + private IOEngine getIOEngineFromName() throws IOException { if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) { // In order to make the usage simple, we only need the prefix 'files:' in @@ -372,11 +384,11 @@ public class BucketCache implements BlockCache, HeapSize { // the compatibility String[] filePaths = ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER); - return new FileIOEngine(capacity, filePaths); + return new FileIOEngine(algorithm, persistencePath, cacheCapacity, filePaths); } else if (ioEngineName.startsWith("offheap")) - return new ByteBufferIOEngine(capacity, true); + return new ByteBufferIOEngine(cacheCapacity, true); else if (ioEngineName.startsWith("heap")) - return new ByteBufferIOEngine(capacity, false); + return new ByteBufferIOEngine(cacheCapacity, false); else throw new IllegalArgumentException( "Don't understand io engine name for cache - prefix with file:, heap or offheap"); @@ -1021,41 +1033,48 @@ public class BucketCache implements BlockCache, HeapSize { private void persistToFile() throws IOException { assert !cacheEnabled; - FileOutputStream fos = null; - ObjectOutputStream oos = null; - try { + try (ObjectOutputStream oos = new ObjectOutputStream( + new FileOutputStream(persistencePath, false))){ if (!ioEngine.isPersistent()) { throw new IOException("Attempt to persist non-persistent cache mappings!"); } - fos = new FileOutputStream(persistencePath, false); - oos = new ObjectOutputStream(fos); + if (ioEngine instanceof PersistentIOEngine) { + oos.write(ProtobufUtil.PB_MAGIC); + byte[] checksum = ((PersistentIOEngine) ioEngine).calculateChecksum(); + oos.writeInt(checksum.length); + oos.write(checksum); + } oos.writeLong(cacheCapacity); oos.writeUTF(ioEngine.getClass().getName()); oos.writeUTF(backingMap.getClass().getName()); oos.writeObject(deserialiserMap); oos.writeObject(backingMap); - } finally { - if (oos != null) oos.close(); - if (fos != null) fos.close(); + } catch (NoSuchAlgorithmException e) { + LOG.error("No such algorithm : " + algorithm + "! 
Failed to persist data on exit",e); } } @SuppressWarnings("unchecked") - private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException, + private void retrieveFromFile(int[] bucketSizes) throws IOException, ClassNotFoundException { File persistenceFile = new File(persistencePath); if (!persistenceFile.exists()) { return; } assert !cacheEnabled; - FileInputStream fis = null; - ObjectInputStream ois = null; - try { + try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(persistencePath))){ if (!ioEngine.isPersistent()) throw new IOException( "Attempt to restore non-persistent cache mappings!"); - fis = new FileInputStream(persistencePath); - ois = new ObjectInputStream(fis); + // for backward compatibility + if (ioEngine instanceof PersistentIOEngine && + !((PersistentIOEngine) ioEngine).isOldVersion()) { + byte[] PBMagic = new byte[ProtobufUtil.PB_MAGIC.length]; + ois.read(PBMagic); + int length = ois.readInt(); + byte[] persistenceChecksum = new byte[length]; + ois.read(persistenceChecksum); + } long capacitySize = ois.readLong(); if (capacitySize != cacheCapacity) throw new IOException("Mismatched cache capacity:" @@ -1078,9 +1097,8 @@ public class BucketCache implements BlockCache, HeapSize { bucketAllocator = allocator; deserialiserMap = deserMap; backingMap = backingMapFromFile; + blockNumber.set(backingMap.size()); } finally { - if (ois != null) ois.close(); - if (fis != null) fis.close(); if (!persistenceFile.delete()) { throw new IOException("Failed deleting persistence file " + persistenceFile.getAbsolutePath()); @@ -1597,4 +1615,9 @@ public class BucketCache implements BlockCache, HeapSize { float getMemoryFactor() { return memoryFactor; } + + @VisibleForTesting + public UniqueIndexMap getDeserialiserMap() { + return deserialiserMap; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java index 7d3a9fa6fcc..f26c6c5dd1c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java @@ -19,12 +19,16 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.io.ObjectInputStream; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.ClosedByInterruptException; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.concurrent.locks.ReentrantLock; @@ -32,15 +36,20 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; /** * IO engine that stores data to a file on the local file system. 
 */
 @InterfaceAudience.Private
-public class FileIOEngine implements IOEngine {
+public class FileIOEngine implements PersistentIOEngine {
   private static final Log LOG = LogFactory.getLog(FileIOEngine.class);
   public static final String FILE_DELIMITER = ",";
+  private static final DuFileCommand DU = new DuFileCommand(new String[] {"du", ""});
+
   private final String[] filePaths;
   private final FileChannel[] fileChannels;
   private final RandomAccessFile[] rafs;
@@ -48,17 +57,58 @@ public class FileIOEngine implements IOEngine {
   private final long sizePerFile;
   private final long capacity;
+  private final String algorithmName;
+  private boolean oldVersion;
   private FileReadAccessor readAccessor = new FileReadAccessor();
   private FileWriteAccessor writeAccessor = new FileWriteAccessor();
 
-  public FileIOEngine(long capacity, String... filePaths) throws IOException {
+  public FileIOEngine(String algorithmName, String persistentPath,
+      long capacity, String... filePaths) throws IOException {
     this.sizePerFile = capacity / filePaths.length;
     this.capacity = this.sizePerFile * filePaths.length;
     this.filePaths = filePaths;
     this.fileChannels = new FileChannel[filePaths.length];
    this.rafs = new RandomAccessFile[filePaths.length];
     this.channelLocks = new ReentrantLock[filePaths.length];
+    this.algorithmName = algorithmName;
+    verifyFileIntegrity(persistentPath);
+    init();
+  }
+
+  /**
+   * Verify cache files' integrity
+   * @param persistentPath the backingMap persistent path
+   */
+  @Override
+  public void verifyFileIntegrity(String persistentPath) {
+    if (persistentPath != null) {
+      byte[] persistentChecksum = readPersistentChecksum(persistentPath);
+      if (!oldVersion) {
+        try {
+          byte[] calculatedChecksum = calculateChecksum();
+          if (!Bytes.equals(persistentChecksum, calculatedChecksum)) {
+            LOG.warn("The persistent checksum is " + Bytes.toString(persistentChecksum)
+              + ", but the calculated checksum is " + Bytes.toString(calculatedChecksum));
+            throw new IOException("Mismatched checksum for cache files");
+          }
+        } catch (IOException ioex) {
+          LOG.error("File verification failed because of ", ioex);
+          // Delete the cache files and the backingMap persistent file.
+          deleteCacheDataFile();
+          new File(persistentPath).delete();
+        } catch (NoSuchAlgorithmException nsae) {
+          LOG.error("No such algorithm " + algorithmName, nsae);
+          throw new RuntimeException(nsae);
+        }
+      }
+    } else {
+      // No persistent path configured, so delete any stale cache data files.
+      deleteCacheDataFile();
+    }
+  }
+
+  private void init() throws IOException {
     for (int i = 0; i < filePaths.length; i++) {
       String filePath = filePaths[i];
       try {
@@ -68,15 +118,15 @@ public class FileIOEngine implements IOEngine {
         // The next setting length will throw exception,logging this message
         // is just used for the detail reason of exception,
         String msg = "Only " + StringUtils.byteDesc(totalSpace)
-            + " total space under " + filePath + ", not enough for requested "
-            + StringUtils.byteDesc(sizePerFile);
+              + " total space under " + filePath + ", not enough for requested "
+              + StringUtils.byteDesc(sizePerFile);
          LOG.warn(msg);
        }
        rafs[i].setLength(sizePerFile);
        fileChannels[i] = rafs[i].getChannel();
        channelLocks[i] = new ReentrantLock();
        LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
-          + ", on the path:" + filePath);
+            + ", on the path: " + filePath);
      } catch (IOException fex) {
        LOG.error("Failed allocating cache on " + filePath, fex);
        shutdown();
@@ -267,6 +317,98 @@ public class FileIOEngine implements IOEngine {
     }
   }
 
+  /**
+   * Read the persistent checksum from the persistent path
+   * @param persistentPath the backingMap persistent path
+   * @return the persistent checksum
+   */
+  private byte[] readPersistentChecksum(String persistentPath) {
+    try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(persistentPath))) {
+      byte[] PBMagic = new byte[ProtobufUtil.PB_MAGIC.length];
+      ois.read(PBMagic);
+      if (Bytes.equals(ProtobufUtil.PB_MAGIC, PBMagic)) {
+        int length = ois.readInt();
+        byte[] persistentChecksum = new byte[length];
+        ois.read(persistentChecksum);
+        return persistentChecksum;
+      } else {
+        // If the persistent file does not start with PB_MAGIC, it is an old version file
+        oldVersion = true;
+      }
+    } catch (IOException ioex) {
+      LOG.warn("Failed to read persistent checksum because of " + ioex);
+      return null;
+    }
+    return null;
+  }
+
+  @Override
+  public void deleteCacheDataFile() {
+    if (filePaths == null) {
+      return;
+    }
+    for (String file : filePaths) {
+      new File(file).delete();
+    }
+  }
+
+  @Override
+  public byte[] calculateChecksum()
+      throws IOException, NoSuchAlgorithmException {
+    if (filePaths == null) {
+      return null;
+    }
+    StringBuilder sb = new StringBuilder();
+    for (String filePath : filePaths) {
+      File file = new File(filePath);
+      if (file.exists()) {
+        sb.append(filePath);
+        sb.append(getFileSize(filePath));
+        sb.append(file.lastModified());
+      } else {
+        throw new IOException("Cache file: " + filePath + " does not exist.");
+      }
+    }
+    MessageDigest messageDigest = MessageDigest.getInstance(algorithmName);
+    messageDigest.update(Bytes.toBytes(sb.toString()));
+    return messageDigest.digest();
+  }
+
+  @Override
+  public boolean isOldVersion() {
+    return oldVersion;
+  }
+
+  /**
+   * Use the Linux 'du' command to get the file's real size
+   * @param filePath the file
+   * @return the file's real size
+   * @throws IOException if reading the file size fails, e.g. the file does not exist
+   */
+  private static long getFileSize(String filePath) throws IOException {
+    DU.setExecCommand(filePath);
+    DU.execute();
+    return Long.parseLong(DU.getOutput().split("\t")[0]);
+  }
+
+  private static class DuFileCommand extends Shell.ShellCommandExecutor {
+    private String[] execCommand;
+
+    DuFileCommand(String[] execString) {
+      super(execString);
+      execCommand = execString;
+    }
+
+    void setExecCommand(String filePath) {
+      this.execCommand[1] = filePath;
+    }
+
+    @Override
+    public String[] getExecString() {
+      return this.execCommand;
+    }
+  }
+
   private static interface FileAccessor {
     int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
         throws IOException;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java
new file mode 100644
index 00000000000..556f5c5f2e8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * An IOEngine that supports persistence and file integrity verification
+ * for {@link BucketCache}
+ */
+@InterfaceAudience.Private
+public interface PersistentIOEngine extends IOEngine {
+
+  /**
+   * Delete the bucket cache data files
+   */
+  void deleteCacheDataFile();
+
+  /**
+   * Use a digest algorithm to calculate a checksum of the cache files; the default algorithm is MD5
+   * @return the checksum bytes
+   * @throws IOException if a cache file does not exist or its size cannot be read
+   * @throws NoSuchAlgorithmException if the configured algorithm is not available
+   */
+  byte[] calculateChecksum()
+    throws IOException, NoSuchAlgorithmException;
+
+  /**
+   * Whether the persistent file is an old version one that does not support file integrity
+   * verification; kept for backward compatibility
+   * @return true if the persistent file does not support file integrity verification
+   */
+  boolean isOldVersion();
+
+  /**
+   * Verify cache files' integrity
+   * @param persistentPath the backingMap persistent path
+   */
+  void verifyFileIntegrity(String persistentPath);
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
index 6e677d5e0fa..d85aec9e92b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
@@ -68,7 +68,7 @@ public class TestFileIOEngine {
 
   @Before
   public void setUp() throws IOException {
-    fileIOEngine = new FileIOEngine(TOTAL_CAPACITY, FILE_PATHS);
+    fileIOEngine = new FileIOEngine("MD5", null, TOTAL_CAPACITY, FILE_PATHS);
   }
 
   @After
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java new file mode 100644 index 00000000000..c54315ff183 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java @@ -0,0 +1,297 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.OutputStreamWriter; +import java.util.Arrays; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; +import org.apache.hadoop.hbase.io.hfile.Cacheable; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Basic test for check file's integrity before start BucketCache in fileIOEngine + */ +@RunWith(Parameterized.class) +@Category(SmallTests.class) +public class TestVerifyBucketCacheFile { + @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}") + public static Iterable data() { + return Arrays.asList(new Object[][] { { 8192, null }, { 16 * 1024, + new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, + 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, + 128 * 1024 + 1024 } } }); + } + + @Parameterized.Parameter(0) + public int constructedBlockSize; + + @Parameterized.Parameter(1) + public int[] constructedBlockSizes; + + final long capacitySize = 32 * 1024 * 1024; + final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; + final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; + + /** + * Test cache file or persistence file does not exist whether BucketCache starts normally + * (1) Start BucketCache and add some blocks, then shutdown BucketCache and persist cache + * to file. Restart BucketCache and it can restore cache from file. + * (2) Delete bucket cache file after shutdown BucketCache. Restart BucketCache and it can't + * restore cache from file, the cache file and persistence file would be deleted before + * BucketCache start normally. + * (3) Delete persistence file after shutdown BucketCache. 
Restart BucketCache and it can't + * restore cache from file, the cache file and persistence file would be deleted before + * BucketCache start normally. + * @throws Exception the exception + */ + @Test + public void testRetrieveFromFile() throws Exception { + HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + Path testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + + BucketCache bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + long usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize == 0); + CacheTestUtils.HFileBlockPair[] blocks = + CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); + // Add blocks + for (CacheTestUtils.HFileBlockPair block : blocks) { + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); + } + usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize != 0); + // 1.persist cache to file + bucketCache.shutdown(); + // restore cache from file + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); + // persist cache to file + bucketCache.shutdown(); + + // 2.delete bucket cache file + File cacheFile = new File(testDir + "/bucket.cache"); + assertTrue(cacheFile.delete()); + // can't restore cache from file + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + // Add blocks + for (CacheTestUtils.HFileBlockPair block : blocks) { + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); + } + usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize != 0); + // persist cache to file + bucketCache.shutdown(); + + // 3.delete backingMap persistence file + File mapFile = new File(testDir + "/bucket.persistence"); + assertTrue(mapFile.delete()); + // can't restore cache from file + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + + TEST_UTIL.cleanupTestDir(); + } + + /** + * Test whether BucketCache is started normally after modifying the cache file. + * Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to file. + * Restart BucketCache after modify cache file's data, and it can't restore cache from file, + * the cache file and persistence file would be deleted before BucketCache start normally. 
+ * @throws Exception the exception + */ + @Test + public void testModifiedBucketCacheFileData() throws Exception { + HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + Path testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + + BucketCache bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + long usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize == 0); + + CacheTestUtils.HFileBlockPair[] blocks = + CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); + // Add blocks + for (CacheTestUtils.HFileBlockPair block : blocks) { + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); + } + usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize != 0); + // persist cache to file + bucketCache.shutdown(); + + // modified bucket cache file + String file = testDir + "/bucket.cache"; + try(BufferedWriter out = new BufferedWriter(new OutputStreamWriter( + new FileOutputStream(file, true)))) { + out.write("test bucket cache"); + } + // can't restore cache from file + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + + TEST_UTIL.cleanupTestDir(); + } + + /** + * Test whether BucketCache is started normally after modifying the cache file's last modified + * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist + * cache to file. Then Restart BucketCache after modify cache file's last modified time, and + * it can't restore cache from file, the cache file and persistence file would be deleted + * before BucketCache start normally. 
+ * @throws Exception the exception + */ + @Test + public void testModifiedBucketCacheFileTime() throws Exception { + HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + Path testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + + BucketCache bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + long usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize == 0); + + CacheTestUtils.HFileBlockPair[] blocks = + CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); + // Add blocks + for (CacheTestUtils.HFileBlockPair block : blocks) { + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); + } + usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize != 0); + // persist cache to file + bucketCache.shutdown(); + + // modified bucket cache file LastModifiedTime + File file = new File(testDir + "/bucket.cache"); + assertTrue(file.setLastModified(System.currentTimeMillis() + 1000)); + // can't restore cache from file + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + + TEST_UTIL.cleanupTestDir(); + } + + /** + * Test whether it can read the old version's persistence file, it's for backward compatibility. + * Start BucketCache and add some blocks, then persist cache to file in old way and shutdown + * BucketCache. Restart BucketCache, and it can normally restore from old version persistence + * file. 
+ * @throws Exception the exception + */ + @Test + public void compatibilityTest() throws Exception { + HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + Path testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + String persistencePath = testDir + "/bucket.persistence"; + BucketCache bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, persistencePath); + long usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize == 0); + + CacheTestUtils.HFileBlockPair[] blocks = + CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); + // Add blocks + for (CacheTestUtils.HFileBlockPair block : blocks) { + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); + } + usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize != 0); + // persistence backingMap using old way + persistToFileInOldWay(persistencePath + ".old", bucketCache.getMaxSize(), + bucketCache.backingMap, bucketCache.getDeserialiserMap()); + bucketCache.shutdown(); + + // restore cache from file which skip check checksum + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, persistencePath + ".old"); + assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); + assertEquals(blocks.length, bucketCache.backingMap.size()); + } + + private void persistToFileInOldWay(String persistencePath, long cacheCapacity, + ConcurrentMap backingMap, UniqueIndexMap deserialiserMap) + throws IOException { + try(ObjectOutputStream oos = new ObjectOutputStream( + new FileOutputStream(persistencePath, false))) { + oos.writeLong(cacheCapacity); + oos.writeUTF(FileIOEngine.class.getName()); + oos.writeUTF(backingMap.getClass().getName()); + oos.writeObject(deserialiserMap); + oos.writeObject(backingMap); + } + } + + private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) + throws InterruptedException { + while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { + Thread.sleep(100); + } + } + + // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer + // threads will flush it to the bucket and put reference entry in backingMap. + private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, + Cacheable block) throws InterruptedException { + cache.cacheBlock(cacheKey, block); + waitUntilFlushedToBucket(cache, cacheKey); + } +}
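
For reference, the verification path added by this patch can be exercised from a small piece of client-side code. The sketch below is illustrative only: the configuration key hbase.bucketcache.persistent.file.integrity.check.algorithm (default MD5) and the nine-argument BucketCache constructor come from the patch itself, while the class name, file paths, capacity, block size, writer thread/queue counts, error toleration, and the SHA-256 override are made-up example values.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;

public class BucketCacheVerificationExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Optional: override the digest used for the cache-file checksum (MD5 by default).
    conf.set("hbase.bucketcache.persistent.file.integrity.check.algorithm", "SHA-256");

    // Example paths and sizes; any writable local paths would do.
    String ioEngine = "file:/tmp/bucket.cache";
    String persistencePath = "/tmp/bucket.persistence";
    long capacity = 32 * 1024 * 1024;

    // Use the constructor that accepts a Configuration so the algorithm above is honored.
    BucketCache cache = new BucketCache(ioEngine, capacity, 8192, null, 3, 64,
        persistencePath, 60 * 1000, conf);

    // On shutdown, PB_MAGIC plus a digest over each cache file's path, size and last-modified
    // time is written ahead of the backing map; on the next start the digest is recomputed and
    // compared, and the cache files are dropped when it does not match or is missing.
    cache.shutdown();
  }
}

Note that the checksum covers file metadata (path, du-reported size, mtime) rather than file contents, which is why the tests above only need to append bytes or touch the last-modified time to invalidate the cache.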