HBASE-22890 Verify the files when RegionServer is starting and BucketCache is in file mode
Signed-off-by: Reid Chan <reidchan@apache.org> Signed-off-by: Stack <stack@apache.org>
This commit is contained in:
parent
20d7da50e9
commit
5bf60ec55f
|
@ -29,6 +29,7 @@ import java.io.ObjectInputStream;
|
||||||
import java.io.ObjectOutputStream;
|
import java.io.ObjectOutputStream;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -69,6 +70,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
|
||||||
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
|
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
|
||||||
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
|
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
|
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.HasThread;
|
import org.apache.hadoop.hbase.util.HasThread;
|
||||||
import org.apache.hadoop.hbase.util.IdReadWriteLock;
|
import org.apache.hadoop.hbase.util.IdReadWriteLock;
|
||||||
|
@ -242,6 +244,17 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
/** In-memory bucket size */
|
/** In-memory bucket size */
|
||||||
private float memoryFactor;
|
private float memoryFactor;
|
||||||
|
|
||||||
|
private String ioEngineName;
|
||||||
|
private static final String FILE_VERIFY_ALGORITHM =
|
||||||
|
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
|
||||||
|
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Use {@link java.security.MessageDigest} class's encryption algorithms to check
|
||||||
|
* persistent file integrity, default algorithm is MD5
|
||||||
|
* */
|
||||||
|
private String algorithm;
|
||||||
|
|
||||||
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
|
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
|
||||||
int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
|
int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
|
||||||
IOException {
|
IOException {
|
||||||
|
@ -252,8 +265,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
|
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
|
||||||
int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
|
int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
|
||||||
Configuration conf)
|
Configuration conf)
|
||||||
throws FileNotFoundException, IOException {
|
throws IOException {
|
||||||
this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
|
|
||||||
this.writerThreads = new WriterThread[writerThreadNum];
|
this.writerThreads = new WriterThread[writerThreadNum];
|
||||||
long blockNumCapacity = capacity / blockSize;
|
long blockNumCapacity = capacity / blockSize;
|
||||||
if (blockNumCapacity >= Integer.MAX_VALUE) {
|
if (blockNumCapacity >= Integer.MAX_VALUE) {
|
||||||
|
@ -275,6 +287,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
", memoryFactor: " + memoryFactor);
|
", memoryFactor: " + memoryFactor);
|
||||||
|
|
||||||
this.cacheCapacity = capacity;
|
this.cacheCapacity = capacity;
|
||||||
|
this.ioEngineName = ioEngineName;
|
||||||
this.persistencePath = persistencePath;
|
this.persistencePath = persistencePath;
|
||||||
this.blockSize = blockSize;
|
this.blockSize = blockSize;
|
||||||
this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
|
this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
|
||||||
|
@ -288,14 +301,15 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
this.ramCache = new ConcurrentHashMap<BlockCacheKey, RAMQueueEntry>();
|
this.ramCache = new ConcurrentHashMap<BlockCacheKey, RAMQueueEntry>();
|
||||||
|
|
||||||
this.backingMap = new ConcurrentHashMap<BlockCacheKey, BucketEntry>((int) blockNumCapacity);
|
this.backingMap = new ConcurrentHashMap<BlockCacheKey, BucketEntry>((int) blockNumCapacity);
|
||||||
|
this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
|
||||||
|
ioEngine = getIOEngineFromName();
|
||||||
if (ioEngine.isPersistent() && persistencePath != null) {
|
if (ioEngine.isPersistent() && persistencePath != null) {
|
||||||
try {
|
try {
|
||||||
retrieveFromFile(bucketSizes);
|
retrieveFromFile(bucketSizes);
|
||||||
} catch (IOException ioex) {
|
} catch (IOException ioex) {
|
||||||
LOG.error("Can't restore from file because of", ioex);
|
LOG.error("Can't restore from file because of", ioex);
|
||||||
} catch (ClassNotFoundException cnfe) {
|
} catch (ClassNotFoundException cnfe) {
|
||||||
LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
|
LOG.error("Can't restore from file in rebuild because can't deserialise", cnfe);
|
||||||
throw new RuntimeException(cnfe);
|
throw new RuntimeException(cnfe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -359,12 +373,10 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the IOEngine from the IO engine name
|
* Get the IOEngine from the IO engine name
|
||||||
* @param ioEngineName
|
|
||||||
* @param capacity
|
|
||||||
* @return the IOEngine
|
* @return the IOEngine
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
|
private IOEngine getIOEngineFromName()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
|
if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
|
||||||
// In order to make the usage simple, we only need the prefix 'files:' in
|
// In order to make the usage simple, we only need the prefix 'files:' in
|
||||||
|
@ -372,11 +384,11 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
// the compatibility
|
// the compatibility
|
||||||
String[] filePaths =
|
String[] filePaths =
|
||||||
ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
|
ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
|
||||||
return new FileIOEngine(capacity, filePaths);
|
return new FileIOEngine(algorithm, persistencePath, cacheCapacity, filePaths);
|
||||||
} else if (ioEngineName.startsWith("offheap"))
|
} else if (ioEngineName.startsWith("offheap"))
|
||||||
return new ByteBufferIOEngine(capacity, true);
|
return new ByteBufferIOEngine(cacheCapacity, true);
|
||||||
else if (ioEngineName.startsWith("heap"))
|
else if (ioEngineName.startsWith("heap"))
|
||||||
return new ByteBufferIOEngine(capacity, false);
|
return new ByteBufferIOEngine(cacheCapacity, false);
|
||||||
else
|
else
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"Don't understand io engine name for cache - prefix with file:, heap or offheap");
|
"Don't understand io engine name for cache - prefix with file:, heap or offheap");
|
||||||
|
@ -1021,41 +1033,48 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
|
|
||||||
private void persistToFile() throws IOException {
|
private void persistToFile() throws IOException {
|
||||||
assert !cacheEnabled;
|
assert !cacheEnabled;
|
||||||
FileOutputStream fos = null;
|
try (ObjectOutputStream oos = new ObjectOutputStream(
|
||||||
ObjectOutputStream oos = null;
|
new FileOutputStream(persistencePath, false))){
|
||||||
try {
|
|
||||||
if (!ioEngine.isPersistent()) {
|
if (!ioEngine.isPersistent()) {
|
||||||
throw new IOException("Attempt to persist non-persistent cache mappings!");
|
throw new IOException("Attempt to persist non-persistent cache mappings!");
|
||||||
}
|
}
|
||||||
fos = new FileOutputStream(persistencePath, false);
|
if (ioEngine instanceof PersistentIOEngine) {
|
||||||
oos = new ObjectOutputStream(fos);
|
oos.write(ProtobufUtil.PB_MAGIC);
|
||||||
|
byte[] checksum = ((PersistentIOEngine) ioEngine).calculateChecksum();
|
||||||
|
oos.writeInt(checksum.length);
|
||||||
|
oos.write(checksum);
|
||||||
|
}
|
||||||
oos.writeLong(cacheCapacity);
|
oos.writeLong(cacheCapacity);
|
||||||
oos.writeUTF(ioEngine.getClass().getName());
|
oos.writeUTF(ioEngine.getClass().getName());
|
||||||
oos.writeUTF(backingMap.getClass().getName());
|
oos.writeUTF(backingMap.getClass().getName());
|
||||||
oos.writeObject(deserialiserMap);
|
oos.writeObject(deserialiserMap);
|
||||||
oos.writeObject(backingMap);
|
oos.writeObject(backingMap);
|
||||||
} finally {
|
} catch (NoSuchAlgorithmException e) {
|
||||||
if (oos != null) oos.close();
|
LOG.error("No such algorithm : " + algorithm + "! Failed to persist data on exit",e);
|
||||||
if (fos != null) fos.close();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
|
private void retrieveFromFile(int[] bucketSizes) throws IOException,
|
||||||
ClassNotFoundException {
|
ClassNotFoundException {
|
||||||
File persistenceFile = new File(persistencePath);
|
File persistenceFile = new File(persistencePath);
|
||||||
if (!persistenceFile.exists()) {
|
if (!persistenceFile.exists()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
assert !cacheEnabled;
|
assert !cacheEnabled;
|
||||||
FileInputStream fis = null;
|
try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(persistencePath))){
|
||||||
ObjectInputStream ois = null;
|
|
||||||
try {
|
|
||||||
if (!ioEngine.isPersistent())
|
if (!ioEngine.isPersistent())
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
"Attempt to restore non-persistent cache mappings!");
|
"Attempt to restore non-persistent cache mappings!");
|
||||||
fis = new FileInputStream(persistencePath);
|
// for backward compatibility
|
||||||
ois = new ObjectInputStream(fis);
|
if (ioEngine instanceof PersistentIOEngine &&
|
||||||
|
!((PersistentIOEngine) ioEngine).isOldVersion()) {
|
||||||
|
byte[] PBMagic = new byte[ProtobufUtil.PB_MAGIC.length];
|
||||||
|
ois.read(PBMagic);
|
||||||
|
int length = ois.readInt();
|
||||||
|
byte[] persistenceChecksum = new byte[length];
|
||||||
|
ois.read(persistenceChecksum);
|
||||||
|
}
|
||||||
long capacitySize = ois.readLong();
|
long capacitySize = ois.readLong();
|
||||||
if (capacitySize != cacheCapacity)
|
if (capacitySize != cacheCapacity)
|
||||||
throw new IOException("Mismatched cache capacity:"
|
throw new IOException("Mismatched cache capacity:"
|
||||||
|
@ -1078,9 +1097,8 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
bucketAllocator = allocator;
|
bucketAllocator = allocator;
|
||||||
deserialiserMap = deserMap;
|
deserialiserMap = deserMap;
|
||||||
backingMap = backingMapFromFile;
|
backingMap = backingMapFromFile;
|
||||||
|
blockNumber.set(backingMap.size());
|
||||||
} finally {
|
} finally {
|
||||||
if (ois != null) ois.close();
|
|
||||||
if (fis != null) fis.close();
|
|
||||||
if (!persistenceFile.delete()) {
|
if (!persistenceFile.delete()) {
|
||||||
throw new IOException("Failed deleting persistence file "
|
throw new IOException("Failed deleting persistence file "
|
||||||
+ persistenceFile.getAbsolutePath());
|
+ persistenceFile.getAbsolutePath());
|
||||||
|
@ -1597,4 +1615,9 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
float getMemoryFactor() {
|
float getMemoryFactor() {
|
||||||
return memoryFactor;
|
return memoryFactor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public UniqueIndexMap<Integer> getDeserialiserMap() {
|
||||||
|
return deserialiserMap;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,12 +19,16 @@
|
||||||
package org.apache.hadoop.hbase.io.hfile.bucket;
|
package org.apache.hadoop.hbase.io.hfile.bucket;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.ObjectInputStream;
|
||||||
import java.io.RandomAccessFile;
|
import java.io.RandomAccessFile;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.ClosedByInterruptException;
|
import java.nio.channels.ClosedByInterruptException;
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.nio.channels.FileChannel;
|
import java.nio.channels.FileChannel;
|
||||||
|
import java.security.MessageDigest;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
@ -32,15 +36,20 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* IO engine that stores data to a file on the local file system.
|
* IO engine that stores data to a file on the local file system.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class FileIOEngine implements IOEngine {
|
public class FileIOEngine implements PersistentIOEngine {
|
||||||
private static final Log LOG = LogFactory.getLog(FileIOEngine.class);
|
private static final Log LOG = LogFactory.getLog(FileIOEngine.class);
|
||||||
public static final String FILE_DELIMITER = ",";
|
public static final String FILE_DELIMITER = ",";
|
||||||
|
private static final DuFileCommand DU = new DuFileCommand(new String[] {"du", ""});
|
||||||
|
|
||||||
private final String[] filePaths;
|
private final String[] filePaths;
|
||||||
private final FileChannel[] fileChannels;
|
private final FileChannel[] fileChannels;
|
||||||
private final RandomAccessFile[] rafs;
|
private final RandomAccessFile[] rafs;
|
||||||
|
@ -48,17 +57,58 @@ public class FileIOEngine implements IOEngine {
|
||||||
|
|
||||||
private final long sizePerFile;
|
private final long sizePerFile;
|
||||||
private final long capacity;
|
private final long capacity;
|
||||||
|
private final String algorithmName;
|
||||||
|
private boolean oldVersion;
|
||||||
|
|
||||||
private FileReadAccessor readAccessor = new FileReadAccessor();
|
private FileReadAccessor readAccessor = new FileReadAccessor();
|
||||||
private FileWriteAccessor writeAccessor = new FileWriteAccessor();
|
private FileWriteAccessor writeAccessor = new FileWriteAccessor();
|
||||||
|
|
||||||
public FileIOEngine(long capacity, String... filePaths) throws IOException {
|
public FileIOEngine(String algorithmName, String persistentPath,
|
||||||
|
long capacity, String... filePaths) throws IOException {
|
||||||
this.sizePerFile = capacity / filePaths.length;
|
this.sizePerFile = capacity / filePaths.length;
|
||||||
this.capacity = this.sizePerFile * filePaths.length;
|
this.capacity = this.sizePerFile * filePaths.length;
|
||||||
this.filePaths = filePaths;
|
this.filePaths = filePaths;
|
||||||
this.fileChannels = new FileChannel[filePaths.length];
|
this.fileChannels = new FileChannel[filePaths.length];
|
||||||
this.rafs = new RandomAccessFile[filePaths.length];
|
this.rafs = new RandomAccessFile[filePaths.length];
|
||||||
this.channelLocks = new ReentrantLock[filePaths.length];
|
this.channelLocks = new ReentrantLock[filePaths.length];
|
||||||
|
this.algorithmName = algorithmName;
|
||||||
|
verifyFileIntegrity(persistentPath);
|
||||||
|
init();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify cache files's integrity
|
||||||
|
* @param persistentPath the backingMap persistent path
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void verifyFileIntegrity(String persistentPath) {
|
||||||
|
if (persistentPath != null) {
|
||||||
|
byte[] persistentChecksum = readPersistentChecksum(persistentPath);
|
||||||
|
if (!oldVersion) {
|
||||||
|
try {
|
||||||
|
byte[] calculateChecksum = calculateChecksum();
|
||||||
|
if (!Bytes.equals(persistentChecksum, calculateChecksum)) {
|
||||||
|
LOG.warn("The persistent checksum is " + Bytes.toString(persistentChecksum) +
|
||||||
|
", but the calculate checksum is " + Bytes.toString(calculateChecksum));
|
||||||
|
throw new IOException();
|
||||||
|
}
|
||||||
|
} catch (IOException ioex) {
|
||||||
|
LOG.error("File verification failed because of ", ioex);
|
||||||
|
// delete cache files and backingMap persistent file.
|
||||||
|
deleteCacheDataFile();
|
||||||
|
new File(persistentPath).delete();
|
||||||
|
} catch (NoSuchAlgorithmException nsae) {
|
||||||
|
LOG.error("No such algorithm " + algorithmName, nsae);
|
||||||
|
throw new RuntimeException(nsae);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// not configure persistent path
|
||||||
|
deleteCacheDataFile();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void init() throws IOException {
|
||||||
for (int i = 0; i < filePaths.length; i++) {
|
for (int i = 0; i < filePaths.length; i++) {
|
||||||
String filePath = filePaths[i];
|
String filePath = filePaths[i];
|
||||||
try {
|
try {
|
||||||
|
@ -68,15 +118,15 @@ public class FileIOEngine implements IOEngine {
|
||||||
// The next setting length will throw exception,logging this message
|
// The next setting length will throw exception,logging this message
|
||||||
// is just used for the detail reason of exception,
|
// is just used for the detail reason of exception,
|
||||||
String msg = "Only " + StringUtils.byteDesc(totalSpace)
|
String msg = "Only " + StringUtils.byteDesc(totalSpace)
|
||||||
+ " total space under " + filePath + ", not enough for requested "
|
+ " total space under " + filePath + ", not enough for requested "
|
||||||
+ StringUtils.byteDesc(sizePerFile);
|
+ StringUtils.byteDesc(sizePerFile);
|
||||||
LOG.warn(msg);
|
LOG.warn(msg);
|
||||||
}
|
}
|
||||||
rafs[i].setLength(sizePerFile);
|
rafs[i].setLength(sizePerFile);
|
||||||
fileChannels[i] = rafs[i].getChannel();
|
fileChannels[i] = rafs[i].getChannel();
|
||||||
channelLocks[i] = new ReentrantLock();
|
channelLocks[i] = new ReentrantLock();
|
||||||
LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
|
LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
|
||||||
+ ", on the path:" + filePath);
|
+ ", on the path: " + filePath);
|
||||||
} catch (IOException fex) {
|
} catch (IOException fex) {
|
||||||
LOG.error("Failed allocating cache on " + filePath, fex);
|
LOG.error("Failed allocating cache on " + filePath, fex);
|
||||||
shutdown();
|
shutdown();
|
||||||
|
@ -267,6 +317,98 @@ public class FileIOEngine implements IOEngine {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the persistent checksum from persistent path
|
||||||
|
* @param persistentPath the backingMap persistent path
|
||||||
|
* @return the persistent checksum
|
||||||
|
*/
|
||||||
|
private byte[] readPersistentChecksum(String persistentPath) {
|
||||||
|
try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(persistentPath))) {
|
||||||
|
byte[] PBMagic = new byte[ProtobufUtil.PB_MAGIC.length];
|
||||||
|
ois.read(PBMagic);
|
||||||
|
if (Bytes.equals(ProtobufUtil.PB_MAGIC, PBMagic)) {
|
||||||
|
int length = ois.readInt();
|
||||||
|
byte[] persistentChecksum = new byte[length];
|
||||||
|
ois.read(persistentChecksum);
|
||||||
|
return persistentChecksum;
|
||||||
|
} else {
|
||||||
|
// if the persistent file is not start with PB_MAGIC, it's an old version file
|
||||||
|
oldVersion = true;
|
||||||
|
}
|
||||||
|
} catch (IOException ioex) {
|
||||||
|
LOG.warn("Failed read persistent checksum, because of " + ioex);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteCacheDataFile() {
|
||||||
|
if (filePaths == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (String file : filePaths) {
|
||||||
|
new File(file).delete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] calculateChecksum()
|
||||||
|
throws IOException, NoSuchAlgorithmException {
|
||||||
|
if (filePaths == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
for (String filePath : filePaths){
|
||||||
|
File file = new File(filePath);
|
||||||
|
if (file.exists()){
|
||||||
|
sb.append(filePath);
|
||||||
|
sb.append(getFileSize(filePath));
|
||||||
|
sb.append(file.lastModified());
|
||||||
|
} else {
|
||||||
|
throw new IOException("Cache file: " + filePath + " is not exists.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
MessageDigest messageDigest = MessageDigest.getInstance(algorithmName);
|
||||||
|
messageDigest.update(Bytes.toBytes(sb.toString()));
|
||||||
|
return messageDigest.digest();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isOldVersion() {
|
||||||
|
return oldVersion;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Using Linux command du to get file's real size
|
||||||
|
* @param filePath the file
|
||||||
|
* @return file's real size
|
||||||
|
* @throws IOException something happened like file not exists
|
||||||
|
*/
|
||||||
|
private static long getFileSize(String filePath) throws IOException {
|
||||||
|
DU.setExecCommand(filePath);
|
||||||
|
DU.execute();
|
||||||
|
return Long.parseLong(DU.getOutput().split("\t")[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class DuFileCommand extends Shell.ShellCommandExecutor {
|
||||||
|
private String[] execCommand;
|
||||||
|
|
||||||
|
DuFileCommand(String[] execString) {
|
||||||
|
super(execString);
|
||||||
|
execCommand = execString;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setExecCommand(String filePath) {
|
||||||
|
this.execCommand[1] = filePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String[] getExecString() {
|
||||||
|
return this.execCommand;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static interface FileAccessor {
|
private static interface FileAccessor {
|
||||||
int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
|
int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
|
@ -0,0 +1,59 @@
|
||||||
|
/**
|
||||||
|
* Copyright The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.io.hfile.bucket;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A class implementing PersistentIOEngine interface supports persistent and file integrity verify
|
||||||
|
* for {@link BucketCache}
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public interface PersistentIOEngine extends IOEngine {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete bucketcache files
|
||||||
|
*/
|
||||||
|
void deleteCacheDataFile();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Using an encryption algorithm to calculate a checksum, the default encryption algorithm is MD5
|
||||||
|
* @return the checksum which is convert to HexString
|
||||||
|
* @throws IOException something happened like file not exists
|
||||||
|
* @throws NoSuchAlgorithmException no such algorithm
|
||||||
|
*/
|
||||||
|
byte[] calculateChecksum()
|
||||||
|
throws IOException, NoSuchAlgorithmException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether the persistent file support verify file integrity, old version file
|
||||||
|
* does not support verification, it's for back compatibility
|
||||||
|
* @return true if the persistent file does not support verify file integrity
|
||||||
|
*/
|
||||||
|
boolean isOldVersion();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify cache files's integrity
|
||||||
|
* @param persistentPath the backingMap persistent path
|
||||||
|
*/
|
||||||
|
void verifyFileIntegrity(String persistentPath);
|
||||||
|
}
|
|
@ -68,7 +68,7 @@ public class TestFileIOEngine {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws IOException {
|
public void setUp() throws IOException {
|
||||||
fileIOEngine = new FileIOEngine(TOTAL_CAPACITY, FILE_PATHS);
|
fileIOEngine = new FileIOEngine("MD5", null, TOTAL_CAPACITY, FILE_PATHS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
|
|
@ -0,0 +1,297 @@
|
||||||
|
/**
|
||||||
|
* Copyright The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.io.hfile.bucket;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.BufferedWriter;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.ObjectOutputStream;
|
||||||
|
import java.io.OutputStreamWriter;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.Cacheable;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Basic test for check file's integrity before start BucketCache in fileIOEngine
|
||||||
|
*/
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
@Category(SmallTests.class)
|
||||||
|
public class TestVerifyBucketCacheFile {
|
||||||
|
@Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
|
||||||
|
public static Iterable<Object[]> data() {
|
||||||
|
return Arrays.asList(new Object[][] { { 8192, null }, { 16 * 1024,
|
||||||
|
new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
|
||||||
|
28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
|
||||||
|
128 * 1024 + 1024 } } });
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameterized.Parameter(0)
|
||||||
|
public int constructedBlockSize;
|
||||||
|
|
||||||
|
@Parameterized.Parameter(1)
|
||||||
|
public int[] constructedBlockSizes;
|
||||||
|
|
||||||
|
final long capacitySize = 32 * 1024 * 1024;
|
||||||
|
final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
|
||||||
|
final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test cache file or persistence file does not exist whether BucketCache starts normally
|
||||||
|
* (1) Start BucketCache and add some blocks, then shutdown BucketCache and persist cache
|
||||||
|
* to file. Restart BucketCache and it can restore cache from file.
|
||||||
|
* (2) Delete bucket cache file after shutdown BucketCache. Restart BucketCache and it can't
|
||||||
|
* restore cache from file, the cache file and persistence file would be deleted before
|
||||||
|
* BucketCache start normally.
|
||||||
|
* (3) Delete persistence file after shutdown BucketCache. Restart BucketCache and it can't
|
||||||
|
* restore cache from file, the cache file and persistence file would be deleted before
|
||||||
|
* BucketCache start normally.
|
||||||
|
* @throws Exception the exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRetrieveFromFile() throws Exception {
|
||||||
|
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
Path testDir = TEST_UTIL.getDataTestDir();
|
||||||
|
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
|
||||||
|
|
||||||
|
BucketCache bucketCache =
|
||||||
|
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
|
||||||
|
constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
|
||||||
|
long usedSize = bucketCache.getAllocator().getUsedSize();
|
||||||
|
assertTrue(usedSize == 0);
|
||||||
|
CacheTestUtils.HFileBlockPair[] blocks =
|
||||||
|
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
|
||||||
|
// Add blocks
|
||||||
|
for (CacheTestUtils.HFileBlockPair block : blocks) {
|
||||||
|
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
|
||||||
|
}
|
||||||
|
usedSize = bucketCache.getAllocator().getUsedSize();
|
||||||
|
assertTrue(usedSize != 0);
|
||||||
|
// 1.persist cache to file
|
||||||
|
bucketCache.shutdown();
|
||||||
|
// restore cache from file
|
||||||
|
bucketCache =
|
||||||
|
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
|
||||||
|
constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
|
||||||
|
assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
|
||||||
|
// persist cache to file
|
||||||
|
bucketCache.shutdown();
|
||||||
|
|
||||||
|
// 2.delete bucket cache file
|
||||||
|
File cacheFile = new File(testDir + "/bucket.cache");
|
||||||
|
assertTrue(cacheFile.delete());
|
||||||
|
// can't restore cache from file
|
||||||
|
bucketCache =
|
||||||
|
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
|
||||||
|
constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
|
||||||
|
assertEquals(0, bucketCache.getAllocator().getUsedSize());
|
||||||
|
assertEquals(0, bucketCache.backingMap.size());
|
||||||
|
// Add blocks
|
||||||
|
for (CacheTestUtils.HFileBlockPair block : blocks) {
|
||||||
|
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
|
||||||
|
}
|
||||||
|
usedSize = bucketCache.getAllocator().getUsedSize();
|
||||||
|
assertTrue(usedSize != 0);
|
||||||
|
// persist cache to file
|
||||||
|
bucketCache.shutdown();
|
||||||
|
|
||||||
|
// 3.delete backingMap persistence file
|
||||||
|
File mapFile = new File(testDir + "/bucket.persistence");
|
||||||
|
assertTrue(mapFile.delete());
|
||||||
|
// can't restore cache from file
|
||||||
|
bucketCache =
|
||||||
|
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
|
||||||
|
constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
|
||||||
|
assertEquals(0, bucketCache.getAllocator().getUsedSize());
|
||||||
|
assertEquals(0, bucketCache.backingMap.size());
|
||||||
|
|
||||||
|
TEST_UTIL.cleanupTestDir();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test whether BucketCache is started normally after modifying the cache file.
|
||||||
|
* Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to file.
|
||||||
|
* Restart BucketCache after modify cache file's data, and it can't restore cache from file,
|
||||||
|
* the cache file and persistence file would be deleted before BucketCache start normally.
|
||||||
|
* @throws Exception the exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testModifiedBucketCacheFileData() throws Exception {
|
||||||
|
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
Path testDir = TEST_UTIL.getDataTestDir();
|
||||||
|
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
|
||||||
|
|
||||||
|
BucketCache bucketCache =
|
||||||
|
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
|
||||||
|
constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
|
||||||
|
long usedSize = bucketCache.getAllocator().getUsedSize();
|
||||||
|
assertTrue(usedSize == 0);
|
||||||
|
|
||||||
|
CacheTestUtils.HFileBlockPair[] blocks =
|
||||||
|
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
|
||||||
|
// Add blocks
|
||||||
|
for (CacheTestUtils.HFileBlockPair block : blocks) {
|
||||||
|
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
|
||||||
|
}
|
||||||
|
usedSize = bucketCache.getAllocator().getUsedSize();
|
||||||
|
assertTrue(usedSize != 0);
|
||||||
|
// persist cache to file
|
||||||
|
bucketCache.shutdown();
|
||||||
|
|
||||||
|
// modified bucket cache file
|
||||||
|
String file = testDir + "/bucket.cache";
|
||||||
|
try(BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
|
||||||
|
new FileOutputStream(file, true)))) {
|
||||||
|
out.write("test bucket cache");
|
||||||
|
}
|
||||||
|
// can't restore cache from file
|
||||||
|
bucketCache =
|
||||||
|
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
|
||||||
|
constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
|
||||||
|
assertEquals(0, bucketCache.getAllocator().getUsedSize());
|
||||||
|
assertEquals(0, bucketCache.backingMap.size());
|
||||||
|
|
||||||
|
TEST_UTIL.cleanupTestDir();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test whether BucketCache is started normally after modifying the cache file's last modified
|
||||||
|
* time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist
|
||||||
|
* cache to file. Then Restart BucketCache after modify cache file's last modified time, and
|
||||||
|
* it can't restore cache from file, the cache file and persistence file would be deleted
|
||||||
|
* before BucketCache start normally.
|
||||||
|
* @throws Exception the exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testModifiedBucketCacheFileTime() throws Exception {
|
||||||
|
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
Path testDir = TEST_UTIL.getDataTestDir();
|
||||||
|
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
|
||||||
|
|
||||||
|
BucketCache bucketCache =
|
||||||
|
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
|
||||||
|
constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
|
||||||
|
long usedSize = bucketCache.getAllocator().getUsedSize();
|
||||||
|
assertTrue(usedSize == 0);
|
||||||
|
|
||||||
|
CacheTestUtils.HFileBlockPair[] blocks =
|
||||||
|
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
|
||||||
|
// Add blocks
|
||||||
|
for (CacheTestUtils.HFileBlockPair block : blocks) {
|
||||||
|
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
|
||||||
|
}
|
||||||
|
usedSize = bucketCache.getAllocator().getUsedSize();
|
||||||
|
assertTrue(usedSize != 0);
|
||||||
|
// persist cache to file
|
||||||
|
bucketCache.shutdown();
|
||||||
|
|
||||||
|
// modified bucket cache file LastModifiedTime
|
||||||
|
File file = new File(testDir + "/bucket.cache");
|
||||||
|
assertTrue(file.setLastModified(System.currentTimeMillis() + 1000));
|
||||||
|
// can't restore cache from file
|
||||||
|
bucketCache =
|
||||||
|
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
|
||||||
|
constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
|
||||||
|
assertEquals(0, bucketCache.getAllocator().getUsedSize());
|
||||||
|
assertEquals(0, bucketCache.backingMap.size());
|
||||||
|
|
||||||
|
TEST_UTIL.cleanupTestDir();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test whether it can read the old version's persistence file, it's for backward compatibility.
|
||||||
|
* Start BucketCache and add some blocks, then persist cache to file in old way and shutdown
|
||||||
|
* BucketCache. Restart BucketCache, and it can normally restore from old version persistence
|
||||||
|
* file.
|
||||||
|
* @throws Exception the exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void compatibilityTest() throws Exception {
|
||||||
|
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
Path testDir = TEST_UTIL.getDataTestDir();
|
||||||
|
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
|
||||||
|
String persistencePath = testDir + "/bucket.persistence";
|
||||||
|
BucketCache bucketCache =
|
||||||
|
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
|
||||||
|
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
|
||||||
|
long usedSize = bucketCache.getAllocator().getUsedSize();
|
||||||
|
assertTrue(usedSize == 0);
|
||||||
|
|
||||||
|
CacheTestUtils.HFileBlockPair[] blocks =
|
||||||
|
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
|
||||||
|
// Add blocks
|
||||||
|
for (CacheTestUtils.HFileBlockPair block : blocks) {
|
||||||
|
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
|
||||||
|
}
|
||||||
|
usedSize = bucketCache.getAllocator().getUsedSize();
|
||||||
|
assertTrue(usedSize != 0);
|
||||||
|
// persistence backingMap using old way
|
||||||
|
persistToFileInOldWay(persistencePath + ".old", bucketCache.getMaxSize(),
|
||||||
|
bucketCache.backingMap, bucketCache.getDeserialiserMap());
|
||||||
|
bucketCache.shutdown();
|
||||||
|
|
||||||
|
// restore cache from file which skip check checksum
|
||||||
|
bucketCache =
|
||||||
|
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
|
||||||
|
constructedBlockSizes, writeThreads, writerQLen, persistencePath + ".old");
|
||||||
|
assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
|
||||||
|
assertEquals(blocks.length, bucketCache.backingMap.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void persistToFileInOldWay(String persistencePath, long cacheCapacity,
|
||||||
|
ConcurrentMap backingMap, UniqueIndexMap deserialiserMap)
|
||||||
|
throws IOException {
|
||||||
|
try(ObjectOutputStream oos = new ObjectOutputStream(
|
||||||
|
new FileOutputStream(persistencePath, false))) {
|
||||||
|
oos.writeLong(cacheCapacity);
|
||||||
|
oos.writeUTF(FileIOEngine.class.getName());
|
||||||
|
oos.writeUTF(backingMap.getClass().getName());
|
||||||
|
oos.writeObject(deserialiserMap);
|
||||||
|
oos.writeObject(backingMap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
|
||||||
|
throws InterruptedException {
|
||||||
|
while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
|
||||||
|
// threads will flush it to the bucket and put reference entry in backingMap.
|
||||||
|
private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
|
||||||
|
Cacheable block) throws InterruptedException {
|
||||||
|
cache.cacheBlock(cacheKey, block);
|
||||||
|
waitUntilFlushedToBucket(cache, cacheKey);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue