HBASE-23017 Verify the file integrity in persistent IOEngine
Signed-off-by: Anoop Sam John <anoopsamjohn@apacher.org> Signed-off-by: Reid Chan <reidchan@apache.org>
This commit is contained in:
parent
fd9cfd789b
commit
16da123df4
|
@ -31,6 +31,7 @@ message BucketCacheEntry {
|
|||
required string map_class = 3;
|
||||
map<int32, string> deserializers = 4;
|
||||
required BackingMap backing_map = 5;
|
||||
optional bytes checksum = 6;
|
||||
}
|
||||
|
||||
message BackingMap {
|
||||
|
|
|
@ -238,6 +238,16 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
/** In-memory bucket size */
|
||||
private float memoryFactor;
|
||||
|
||||
private static final String FILE_VERIFY_ALGORITHM =
|
||||
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
|
||||
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
|
||||
|
||||
/**
|
||||
* Use {@link java.security.MessageDigest} class's encryption algorithms to check
|
||||
* persistent file integrity, default algorithm is MD5
|
||||
* */
|
||||
private String algorithm;
|
||||
|
||||
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
|
||||
int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
|
||||
this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
|
||||
|
@ -247,6 +257,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
|
||||
int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
|
||||
Configuration conf) throws IOException {
|
||||
this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
|
||||
this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
|
||||
this.writerThreads = new WriterThread[writerThreadNum];
|
||||
long blockNumCapacity = capacity / blockSize;
|
||||
|
@ -1131,6 +1142,13 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
}
|
||||
|
||||
private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
|
||||
if (proto.hasChecksum()) {
|
||||
((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
|
||||
algorithm);
|
||||
} else {
|
||||
// if has not checksum, it means the persistence file is old format
|
||||
LOG.info("Persistent file is old format, it does not support verifying file integrity!");
|
||||
}
|
||||
verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
|
||||
backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap());
|
||||
}
|
||||
|
@ -1235,6 +1253,10 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
return this.bucketAllocator.getUsedSize();
|
||||
}
|
||||
|
||||
protected String getAlgorithm() {
|
||||
return algorithm;
|
||||
}
|
||||
|
||||
/**
|
||||
* Evicts all blocks for a specific HFile.
|
||||
* <p>
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockPriority;
|
|||
import org.apache.hadoop.hbase.io.hfile.BlockType;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
|
||||
|
@ -46,7 +47,8 @@ final class BucketProtoUtils {
|
|||
.setMapClass(cache.backingMap.getClass().getName())
|
||||
.putAllDeserializers(CacheableDeserializerIdManager.save())
|
||||
.setBackingMap(BucketProtoUtils.toPB(cache.backingMap))
|
||||
.build();
|
||||
.setChecksum(ByteString.copyFrom(((PersistentIOEngine) cache.ioEngine).
|
||||
calculateChecksum(cache.getAlgorithm()))).build();
|
||||
}
|
||||
|
||||
private static BucketCacheProtos.BackingMap toPB(
|
||||
|
|
|
@ -43,10 +43,9 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
|||
* IO engine that stores data to a file on the local file system.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class FileIOEngine implements IOEngine {
|
||||
public class FileIOEngine extends PersistentIOEngine {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(FileIOEngine.class);
|
||||
public static final String FILE_DELIMITER = ",";
|
||||
private final String[] filePaths;
|
||||
private final FileChannel[] fileChannels;
|
||||
private final RandomAccessFile[] rafs;
|
||||
private final ReentrantLock[] channelLocks;
|
||||
|
@ -59,9 +58,9 @@ public class FileIOEngine implements IOEngine {
|
|||
|
||||
public FileIOEngine(long capacity, boolean maintainPersistence, String... filePaths)
|
||||
throws IOException {
|
||||
super(filePaths);
|
||||
this.sizePerFile = capacity / filePaths.length;
|
||||
this.capacity = this.sizePerFile * filePaths.length;
|
||||
this.filePaths = filePaths;
|
||||
this.fileChannels = new FileChannel[filePaths.length];
|
||||
if (!maintainPersistence) {
|
||||
for (String filePath : filePaths) {
|
||||
|
@ -90,7 +89,12 @@ public class FileIOEngine implements IOEngine {
|
|||
+ StringUtils.byteDesc(sizePerFile);
|
||||
LOG.warn(msg);
|
||||
}
|
||||
File file = new File(filePath);
|
||||
// setLength() method will change file's last modified time. So if don't do
|
||||
// this check, wrong time will be used when calculating checksum.
|
||||
if (file.length() != sizePerFile) {
|
||||
rafs[i].setLength(sizePerFile);
|
||||
}
|
||||
fileChannels[i] = rafs[i].getChannel();
|
||||
channelLocks[i] = new ReentrantLock();
|
||||
LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile.bucket;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -37,7 +38,7 @@ import org.slf4j.LoggerFactory;
|
|||
* mechanism
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class FileMmapIOEngine implements IOEngine {
|
||||
public abstract class FileMmapIOEngine extends PersistentIOEngine {
|
||||
static final Logger LOG = LoggerFactory.getLogger(FileMmapIOEngine.class);
|
||||
|
||||
protected final String path;
|
||||
|
@ -47,13 +48,19 @@ public abstract class FileMmapIOEngine implements IOEngine {
|
|||
private RandomAccessFile raf = null;
|
||||
|
||||
public FileMmapIOEngine(String filePath, long capacity) throws IOException {
|
||||
super(filePath);
|
||||
this.path = filePath;
|
||||
this.size = capacity;
|
||||
long fileSize = 0;
|
||||
try {
|
||||
raf = new RandomAccessFile(filePath, "rw");
|
||||
fileSize = roundUp(capacity, ByteBufferArray.DEFAULT_BUFFER_SIZE);
|
||||
File file = new File(filePath);
|
||||
// setLength() method will change file's last modified time. So if don't do
|
||||
// this check, wrong time will be used when calculating checksum.
|
||||
if (file.length() != fileSize) {
|
||||
raf.setLength(fileSize);
|
||||
}
|
||||
fileChannel = raf.getChannel();
|
||||
LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + ", on the path:" + filePath);
|
||||
} catch (java.io.FileNotFoundException fex) {
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile.bucket;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* A class implementing PersistentIOEngine interface supports file integrity verification
|
||||
* for {@link BucketCache} which use persistent IOEngine
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class PersistentIOEngine implements IOEngine {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(PersistentIOEngine.class);
|
||||
private static final DuFileCommand DU = new DuFileCommand(new String[] {"du", ""});
|
||||
protected final String[] filePaths;
|
||||
|
||||
public PersistentIOEngine(String... filePaths) {
|
||||
this.filePaths = filePaths;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify cache files's integrity
|
||||
* @param algorithm the backingMap persistence path
|
||||
*/
|
||||
protected void verifyFileIntegrity(byte[] persistentChecksum, String algorithm)
|
||||
throws IOException {
|
||||
byte[] calculateChecksum = calculateChecksum(algorithm);
|
||||
if (!Bytes.equals(persistentChecksum, calculateChecksum)) {
|
||||
throw new IOException("Mismatch of checksum! The persistent checksum is " +
|
||||
Bytes.toString(persistentChecksum) + ", but the calculate checksum is " +
|
||||
Bytes.toString(calculateChecksum));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Using an encryption algorithm to calculate a checksum, the default encryption algorithm is MD5
|
||||
* @return the checksum which is convert to HexString
|
||||
* @throws IOException something happened like file not exists
|
||||
* @throws NoSuchAlgorithmException no such algorithm
|
||||
*/
|
||||
protected byte[] calculateChecksum(String algorithm) {
|
||||
try {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (String filePath : filePaths){
|
||||
File file = new File(filePath);
|
||||
sb.append(filePath);
|
||||
sb.append(getFileSize(filePath));
|
||||
sb.append(file.lastModified());
|
||||
}
|
||||
MessageDigest messageDigest = MessageDigest.getInstance(algorithm);
|
||||
messageDigest.update(Bytes.toBytes(sb.toString()));
|
||||
return messageDigest.digest();
|
||||
} catch (IOException ioex) {
|
||||
LOG.error("Calculating checksum failed, because of ", ioex);
|
||||
return new byte[0];
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
LOG.error("No such algorithm : " + algorithm + "!");
|
||||
return new byte[0];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Using Linux command du to get file's real size
|
||||
* @param filePath the file
|
||||
* @return file's real size
|
||||
* @throws IOException something happened like file not exists
|
||||
*/
|
||||
private static long getFileSize(String filePath) throws IOException {
|
||||
DU.setExecCommand(filePath);
|
||||
DU.execute();
|
||||
return Long.parseLong(DU.getOutput().split("\t")[0]);
|
||||
}
|
||||
|
||||
private static class DuFileCommand extends Shell.ShellCommandExecutor {
|
||||
private String[] execCommand;
|
||||
|
||||
DuFileCommand(String[] execString) {
|
||||
super(execString);
|
||||
execCommand = execString;
|
||||
}
|
||||
|
||||
void setExecCommand(String filePath) {
|
||||
this.execCommand[1] = filePath;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getExecString() {
|
||||
return this.execCommand;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,247 @@
|
|||
/**
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile.bucket;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
|
||||
import org.apache.hadoop.hbase.io.hfile.Cacheable;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
/**
|
||||
* Basic test for check file's integrity before start BucketCache in fileIOEngine
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
@Category(MediumTests.class)
|
||||
public class TestVerifyBucketCacheFile {
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestVerifyBucketCacheFile.class);
|
||||
|
||||
@Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
|
||||
public static Iterable<Object[]> data() {
|
||||
return Arrays.asList(new Object[][] { { 8192, null }, { 16 * 1024,
|
||||
new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
|
||||
28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
|
||||
128 * 1024 + 1024 } } });
|
||||
}
|
||||
|
||||
@Parameterized.Parameter(0)
|
||||
public int constructedBlockSize;
|
||||
|
||||
@Parameterized.Parameter(1)
|
||||
public int[] constructedBlockSizes;
|
||||
|
||||
final long capacitySize = 32 * 1024 * 1024;
|
||||
final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
|
||||
final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
|
||||
|
||||
/**
|
||||
* Test cache file or persistence file does not exist whether BucketCache starts normally
|
||||
* (1) Start BucketCache and add some blocks, then shutdown BucketCache and persist cache
|
||||
* to file. Restart BucketCache and it can restore cache from file.
|
||||
* (2) Delete bucket cache file after shutdown BucketCache. Restart BucketCache and it can't
|
||||
* restore cache from file, the cache file and persistence file would be deleted before
|
||||
* BucketCache start normally.
|
||||
* (3) Delete persistence file after shutdown BucketCache. Restart BucketCache and it can't
|
||||
* restore cache from file, the cache file and persistence file would be deleted before
|
||||
* BucketCache start normally.
|
||||
* @throws Exception the exception
|
||||
*/
|
||||
@Test
|
||||
public void testRetrieveFromFile() throws Exception {
|
||||
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
Path testDir = TEST_UTIL.getDataTestDir();
|
||||
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
|
||||
|
||||
BucketCache bucketCache =
|
||||
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
|
||||
constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
|
||||
long usedSize = bucketCache.getAllocator().getUsedSize();
|
||||
assertTrue(usedSize == 0);
|
||||
CacheTestUtils.HFileBlockPair[] blocks =
|
||||
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
|
||||
// Add blocks
|
||||
for (CacheTestUtils.HFileBlockPair block : blocks) {
|
||||
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
|
||||
}
|
||||
usedSize = bucketCache.getAllocator().getUsedSize();
|
||||
assertTrue(usedSize != 0);
|
||||
// 1.persist cache to file
|
||||
bucketCache.shutdown();
|
||||
// restore cache from file
|
||||
bucketCache =
|
||||
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
|
||||
constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
|
||||
assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
|
||||
// persist cache to file
|
||||
bucketCache.shutdown();
|
||||
|
||||
// 2.delete bucket cache file
|
||||
File cacheFile = new File(testDir + "/bucket.cache");
|
||||
assertTrue(cacheFile.delete());
|
||||
// can't restore cache from file
|
||||
bucketCache =
|
||||
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
|
||||
constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
|
||||
assertEquals(0, bucketCache.getAllocator().getUsedSize());
|
||||
assertEquals(0, bucketCache.backingMap.size());
|
||||
// Add blocks
|
||||
for (CacheTestUtils.HFileBlockPair block : blocks) {
|
||||
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
|
||||
}
|
||||
usedSize = bucketCache.getAllocator().getUsedSize();
|
||||
assertTrue(usedSize != 0);
|
||||
// persist cache to file
|
||||
bucketCache.shutdown();
|
||||
|
||||
// 3.delete backingMap persistence file
|
||||
File mapFile = new File(testDir + "/bucket.persistence");
|
||||
assertTrue(mapFile.delete());
|
||||
// can't restore cache from file
|
||||
bucketCache =
|
||||
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
|
||||
constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
|
||||
assertEquals(0, bucketCache.getAllocator().getUsedSize());
|
||||
assertEquals(0, bucketCache.backingMap.size());
|
||||
|
||||
TEST_UTIL.cleanupTestDir();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test whether BucketCache is started normally after modifying the cache file.
|
||||
* Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to file.
|
||||
* Restart BucketCache after modify cache file's data, and it can't restore cache from file,
|
||||
* the cache file and persistence file would be deleted before BucketCache start normally.
|
||||
* @throws Exception the exception
|
||||
*/
|
||||
@Test
|
||||
public void testModifiedBucketCacheFileData() throws Exception {
|
||||
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
Path testDir = TEST_UTIL.getDataTestDir();
|
||||
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
|
||||
|
||||
BucketCache bucketCache =
|
||||
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
|
||||
constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
|
||||
long usedSize = bucketCache.getAllocator().getUsedSize();
|
||||
assertTrue(usedSize == 0);
|
||||
|
||||
CacheTestUtils.HFileBlockPair[] blocks =
|
||||
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
|
||||
// Add blocks
|
||||
for (CacheTestUtils.HFileBlockPair block : blocks) {
|
||||
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
|
||||
}
|
||||
usedSize = bucketCache.getAllocator().getUsedSize();
|
||||
assertTrue(usedSize != 0);
|
||||
// persist cache to file
|
||||
bucketCache.shutdown();
|
||||
|
||||
// modified bucket cache file
|
||||
String file = testDir + "/bucket.cache";
|
||||
try(BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
|
||||
new FileOutputStream(file, false)))) {
|
||||
out.write("test bucket cache");
|
||||
}
|
||||
// can't restore cache from file
|
||||
bucketCache =
|
||||
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
|
||||
constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
|
||||
assertEquals(0, bucketCache.getAllocator().getUsedSize());
|
||||
assertEquals(0, bucketCache.backingMap.size());
|
||||
|
||||
TEST_UTIL.cleanupTestDir();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test whether BucketCache is started normally after modifying the cache file's last modified
|
||||
* time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist
|
||||
* cache to file. Then Restart BucketCache after modify cache file's last modified time, and
|
||||
* it can't restore cache from file, the cache file and persistence file would be deleted
|
||||
* before BucketCache start normally.
|
||||
* @throws Exception the exception
|
||||
*/
|
||||
@Test
|
||||
public void testModifiedBucketCacheFileTime() throws Exception {
|
||||
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
Path testDir = TEST_UTIL.getDataTestDir();
|
||||
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
|
||||
|
||||
BucketCache bucketCache =
|
||||
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
|
||||
constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
|
||||
long usedSize = bucketCache.getAllocator().getUsedSize();
|
||||
assertTrue(usedSize == 0);
|
||||
|
||||
CacheTestUtils.HFileBlockPair[] blocks =
|
||||
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
|
||||
// Add blocks
|
||||
for (CacheTestUtils.HFileBlockPair block : blocks) {
|
||||
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
|
||||
}
|
||||
usedSize = bucketCache.getAllocator().getUsedSize();
|
||||
assertTrue(usedSize != 0);
|
||||
// persist cache to file
|
||||
bucketCache.shutdown();
|
||||
|
||||
// modified bucket cache file LastModifiedTime
|
||||
File file = new File(testDir + "/bucket.cache");
|
||||
assertTrue(file.setLastModified(System.currentTimeMillis() + 1000));
|
||||
// can't restore cache from file
|
||||
bucketCache =
|
||||
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
|
||||
constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
|
||||
assertEquals(0, bucketCache.getAllocator().getUsedSize());
|
||||
assertEquals(0, bucketCache.backingMap.size());
|
||||
|
||||
TEST_UTIL.cleanupTestDir();
|
||||
}
|
||||
|
||||
private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
|
||||
throws InterruptedException {
|
||||
while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
// BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
|
||||
// threads will flush it to the bucket and put reference entry in backingMap.
|
||||
private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
|
||||
Cacheable block) throws InterruptedException {
|
||||
cache.cacheBlock(cacheKey, block);
|
||||
waitUntilFlushedToBucket(cache, cacheKey);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue