HDFS-14740. Recover data blocks from persistent memory read cache during datanode restarts. Contributed by Feilong He.
(cherry picked from commit d79cce20abbbf321f6dcce03f4087544124a7cd2)
This commit is contained in:
@ -223,28 +223,31 @@ public long getLength() {
* JNI wrapper of persist memory operations.
public static class Pmem {
// check whether the address is a Pmem address or DIMM address
// Check whether the address is a Pmem address or DIMM address
public static boolean isPmem(long address, long length) {
return NativeIO.POSIX.isPmemCheck(address, length);
// create a pmem file and memory map it
public static PmemMappedRegion mapBlock(String path, long length) {
return NativeIO.POSIX.pmemCreateMapFile(path, length);
// Map a file in persistent memory, if the given file exists,
// directly map it. If not, create the named file on persistent memory
// and then map it.
public static PmemMappedRegion mapBlock(
String path, long length, boolean isFileExist) {
return NativeIO.POSIX.pmemMapFile(path, length, isFileExist);
// unmap a pmem file
// Unmap a pmem file
public static boolean unmapBlock(long address, long length) {
return NativeIO.POSIX.pmemUnMap(address, length);
// copy data from disk file(src) to pmem file(dest), without flush
// Copy data from disk file(src) to pmem file(dest), without flush
public static void memCopy(byte[] src, long dest, boolean isPmem,
long length) {
NativeIO.POSIX.pmemCopy(src, dest, isPmem, length);
// flush the memory content to persistent storage
// Flush the memory content to persistent storage
public static void memSync(PmemMappedRegion region) {
if (region.isPmem()) {
@ -260,8 +263,8 @@ public static String getPmdkLibPath() {
private static native String getPmdkLibPath();
private static native boolean isPmemCheck(long address, long length);
private static native PmemMappedRegion pmemCreateMapFile(String path,
long length);
private static native PmemMappedRegion pmemMapFile(String path,
long length, boolean isFileExist);
private static native boolean pmemUnMap(long address, long length);
private static native void pmemCopy(byte[] src, long dest, boolean isPmem,
long length);
@ -1486,11 +1486,11 @@ JNIEnv *env, jclass thisClass, jlong address, jlong length) {
* Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
* Method: pmemCreateMapFile
* Method: pmemMapFile
* Signature: (Ljava/lang/String;J)Lorg/apache/hadoop/io/nativeio/NativeIO/POSIX/PmemMappedRegion;
JNIEXPORT jobject JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemCreateMapFile(
JNIEnv *env, jclass thisClass, jstring filePath, jlong fileLength) {
JNIEXPORT jobject JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemMapFile(
JNIEnv *env, jclass thisClass, jstring filePath, jlong fileLength, jboolean isFileExist) {
#if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
/* create a pmem file and memory map it */
const char * path = NULL;
@ -1505,17 +1505,20 @@ JNIEnv *env, jclass thisClass, jstring filePath, jlong fileLength) {
return NULL;
if (fileLength <= 0) {
(*env)->ReleaseStringUTFChars(env, filePath, path);
THROW(env, "java/lang/IllegalArgumentException", "File length should be positive");
return NULL;
if (isFileExist) {
pmemaddr = pmdkLoader->pmem_map_file(path, 0, 0, 0666, &mapped_len, &is_pmem);
} else {
if (fileLength <= 0) {
(*env)->ReleaseStringUTFChars(env, filePath, path);
THROW(env, "java/lang/IllegalArgumentException", "File length should be positive");
return NULL;
pmemaddr = pmdkLoader->pmem_map_file(path, fileLength, PMEM_FILE_CREATE|PMEM_FILE_EXCL,
0666, &mapped_len, &is_pmem);
pmemaddr = pmdkLoader->pmem_map_file(path, fileLength, PMEM_FILE_CREATE|PMEM_FILE_EXCL,
0666, &mapped_len, &is_pmem);
if (!pmemaddr) {
snprintf(msg, sizeof(msg), "Failed to create pmem file. file: %s, length: %x, error msg: %s", path, fileLength, pmem_errormsg());
snprintf(msg, sizeof(msg), "Failed to map file on persistent memory.file: %s, length: %x, error msg: %s", path, fileLength, pmem_errormsg());
THROW(env, "java/io/IOException", msg);
(*env)->ReleaseStringUTFChars(env, filePath, path);
return NULL;
@ -800,7 +800,7 @@ public void testPmemCheckParameters() {
// Incorrect file length
try {
NativeIO.POSIX.Pmem.mapBlock(filePath, length);
NativeIO.POSIX.Pmem.mapBlock(filePath, length, false);
fail("Illegal length parameter should be detected");
} catch (Exception e) {
@ -810,7 +810,7 @@ public void testPmemCheckParameters() {
filePath = "/mnt/pmem0/test_native_io";
length = -1L;
try {
NativeIO.POSIX.Pmem.mapBlock(filePath, length);
NativeIO.POSIX.Pmem.mapBlock(filePath, length, false);
fail("Illegal length parameter should be detected");
}catch (Exception e) {
@ -837,10 +837,10 @@ public void testPmemMapMultipleFiles() {
for (int i = 0; i < fileNumber; i++) {
String path = filePath + i;
LOG.info("File path = " + path);
NativeIO.POSIX.Pmem.mapBlock(path, length);
NativeIO.POSIX.Pmem.mapBlock(path, length, false);
try {
NativeIO.POSIX.Pmem.mapBlock(filePath, length);
NativeIO.POSIX.Pmem.mapBlock(filePath, length, false);
fail("Request map extra file when persistent memory is all occupied");
} catch (Exception e) {
@ -863,7 +863,7 @@ public void testPmemMapBigFile() {
length = volumeSize + 1024L;
try {
LOG.info("File length = " + length);
NativeIO.POSIX.Pmem.mapBlock(filePath, length);
NativeIO.POSIX.Pmem.mapBlock(filePath, length, false);
fail("File length exceeds persistent memory total volume size");
}catch (Exception e) {
@ -881,7 +881,8 @@ public void testPmemCopy() throws IOException {
// memory device.
String filePath = "/mnt/pmem0/copy";
long length = 4096;
PmemMappedRegion region = NativeIO.POSIX.Pmem.mapBlock(filePath, length);
PmemMappedRegion region = NativeIO.POSIX.Pmem.mapBlock(
filePath, length, false);
assertTrue(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length));
assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length + 100));
assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress() + 100, length));
@ -381,9 +381,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// Multiple dirs separated by "," are acceptable.
public static final String DFS_DATANODE_CACHE_PMEM_DIRS_KEY =
public static final String DFS_DATANODE_CACHE_PMEM_DIRS_DEFAULT = "";
public static final String DFS_DATANODE_PMEM_CACHE_DIRS_KEY =
public static final String DFS_DATANODE_PMEM_CACHE_DIRS_DEFAULT = "";
public static final String DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY =
public static final boolean DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT =
public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
@ -27,12 +27,14 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_DIRS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@ -93,6 +95,7 @@ public class DNConf {
final boolean encryptDataTransfer;
final boolean connectToDnViaHostname;
final boolean overwriteDownstreamDerivedQOP;
private final boolean pmemCacheRecoveryEnabled;
final long readaheadLength;
final long heartBeatInterval;
@ -264,7 +267,7 @@ public DNConf(final Configurable dn) {
this.pmemDirs = getConf().getTrimmedStrings(
this.restartReplicaExpiry = getConf().getLong(
@ -285,6 +288,10 @@ public DNConf(final Configurable dn) {
String[] dataDirs =
this.volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
this.pmemCacheRecoveryEnabled = getConf().getBoolean(
// We get minimumNameNodeVersion via a method so it can be mocked out in tests.
@ -294,7 +301,7 @@ String getMinimumNameNodeVersion() {
* Returns the configuration.
* @return Configuration the configuration
public Configuration getConf() {
@ -434,4 +441,8 @@ int getMaxDataLength() {
public String[] getPmemVolumes() {
return pmemDirs;
public boolean getPmemCacheRecoveryEnabled() {
return pmemCacheRecoveryEnabled;
@ -33,7 +33,9 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
@ -184,6 +186,30 @@ public FsDatasetCache(FsDatasetImpl dataset) throws IOException {
this.memCacheStats = cacheLoader.initialize(this.getDnConf());
* For persistent memory cache, create cache subdirectory specified with
* blockPoolId to store cache data.
* Recover the status of cache in persistent memory, if any.
public void initCache(String bpid) throws IOException {
if (cacheLoader.isTransientCache()) {
if (getDnConf().getPmemCacheRecoveryEnabled()) {
final Map<ExtendedBlockId, MappableBlock> keyToMappableBlock =
PmemVolumeManager.getInstance().recoverCache(bpid, cacheLoader);
Set<Map.Entry<ExtendedBlockId, MappableBlock>> entrySet
= keyToMappableBlock.entrySet();
for (Map.Entry<ExtendedBlockId, MappableBlock> entry : entrySet) {
new Value(keyToMappableBlock.get(entry.getKey()), State.CACHED));
DNConf getDnConf() {
return this.dataset.datanode.getDnConf();
@ -191,7 +217,7 @@ DNConf getDnConf() {
* Get the cache path if the replica is cached into persistent memory.
String getReplicaCachePath(String bpid, long blockId) {
String getReplicaCachePath(String bpid, long blockId) throws IOException {
if (cacheLoader.isTransientCache() ||
!isCached(bpid, blockId)) {
return null;
@ -270,6 +270,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
final AutoCloseableLock datasetLock;
private final Condition datasetLockCondition;
private static String blockPoolId = "";
* An FSDataset has a directory where it loads its data files.
@ -2843,6 +2844,16 @@ public void addBlockPool(String bpid, Configuration conf)
if (volumeExceptions.hasExceptions()) {
throw volumeExceptions;
// For test use only.
if (!blockPoolId.isEmpty()) {
bpid = blockPoolId;
public static void setBlockPoolId(String bpid) {
blockPoolId = bpid;
@ -3372,8 +3383,8 @@ public int getVolumeCount() {
void stopAllDataxceiverThreads(FsVolumeImpl volume) {
try (AutoCloseableLock lock = datasetLock.acquire()) {
for (String blockPoolId : volumeMap.getBlockPoolList()) {
Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
for (String bpid : volumeMap.getBlockPoolList()) {
Collection<ReplicaInfo> replicas = volumeMap.replicas(bpid);
for (ReplicaInfo replicaInfo : replicas) {
if ((replicaInfo.getState() == ReplicaState.TEMPORARY
|| replicaInfo.getState() == ReplicaState.RBW)
@ -20,6 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import java.io.Closeable;
@ -41,4 +42,10 @@ public interface MappableBlock extends Closeable {
* Return -1 if not applicable.
long getAddress();
* Get cached block's ExtendedBlockId.
* @return cached block's ExtendedBlockId..
ExtendedBlockId getKey();
@ -29,6 +29,7 @@
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -111,6 +112,12 @@ abstract MappableBlock load(long length, FileInputStream blockIn,
abstract boolean isNativeLoader();
* Get mappableBlock recovered from persistent memory.
abstract MappableBlock getRecoveredMappableBlock(
File cacheFile, String bpid, byte volumeIndex) throws IOException;
* Clean up cache, can be used during DataNode shutdown.
@ -27,6 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.MappedByteBuffer;
@ -118,6 +119,12 @@ public boolean isTransientCache() {
return true;
public MappableBlock getRecoveredMappableBlock(
File cacheFile, String bpid, byte volumeIndex) throws IOException {
return null;
public boolean isNativeLoader() {
return false;
@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.io.nativeio.NativeIO;
@ -49,6 +50,11 @@ public long getAddress() {
return -1L;
public ExtendedBlockId getKey() {
return null;
public void close() {
if (mmap != null) {
@ -33,6 +33,7 @@
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -57,8 +58,8 @@ CacheStats initialize(DNConf dnConf) throws IOException {
* Map the block and verify its checksum.
* The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir
* is a persistent memory volume chosen by PmemVolumeManager.
* The block will be mapped to PmemDir/BlockPoolId/subdir#/subdir#/BlockId,
* in which PmemDir is a persistent memory volume chosen by PmemVolumeManager.
* @param length The current length of the block.
* @param blockIn The block input stream. Should be positioned at the
@ -91,7 +92,7 @@ public MappableBlock load(long length, FileInputStream blockIn,
assert NativeIO.isAvailable();
filePath = PmemVolumeManager.getInstance().getCachePath(key);
region = POSIX.Pmem.mapBlock(filePath, length);
region = POSIX.Pmem.mapBlock(filePath, length, false);
if (region == null) {
throw new IOException("Failed to map the block " + blockFileName +
" to persistent storage.");
@ -189,4 +190,28 @@ private void verifyChecksumAndMapBlock(POSIX.PmemMappedRegion region,
public boolean isNativeLoader() {
return true;
public MappableBlock getRecoveredMappableBlock(
File cacheFile, String bpid, byte volumeIndex) throws IOException {
NativeIO.POSIX.PmemMappedRegion region =
cacheFile.length(), true);
if (region == null) {
throw new IOException("Failed to recover the block "
+ cacheFile.getName() + " in persistent storage.");
ExtendedBlockId key =
new ExtendedBlockId(super.getBlockId(cacheFile), bpid);
MappableBlock mappableBlock = new NativePmemMappedBlock(
region.getAddress(), region.getLength(), key);
PmemVolumeManager.getInstance().recoverBlockKeyToVolume(key, volumeIndex);
String path = PmemVolumeManager.getInstance().getCachePath(key);
long addr = mappableBlock.getAddress();
long length = mappableBlock.getLength();
LOG.info("Recovering persistent memory cache for block {}, " +
"path = {}, address = {}, length = {}", key, path, addr, length);
return mappableBlock;
@ -58,12 +58,17 @@ public long getAddress() {
return pmemMappedAddress;
public ExtendedBlockId getKey() {
return key;
public void close() {
if (pmemMappedAddress != -1L) {
String cacheFilePath =
try {
String cacheFilePath =
// Current libpmem will report error when pmem_unmap is called with
// length not aligned with page size, although the length is returned
// by pmem_map_file.
@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
@ -40,12 +41,15 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
private static final Logger LOG =
private PmemVolumeManager pmemVolumeManager;
private boolean cacheRecoveryEnabled;
CacheStats initialize(DNConf dnConf) throws IOException {
LOG.info("Initializing cache loader: " + this.getClass().getName());
pmemVolumeManager = PmemVolumeManager.getInstance();
cacheRecoveryEnabled = dnConf.getPmemCacheRecoveryEnabled();
// The configuration for max locked memory is shaded.
LOG.info("Persistent memory is used for caching data instead of " +
"DRAM. Max locked memory is set to zero to disable DRAM cache");
@ -59,8 +63,8 @@ CacheStats initialize(DNConf dnConf) throws IOException {
* Map the block and verify its checksum.
* The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir
* is a persistent memory volume chosen by PmemVolumeManager.
* The block will be mapped to PmemDir/BlockPoolId/subdir#/subdir#/BlockId,
* in which PmemDir is a persistent memory volume chosen by PmemVolumeManager.
* @param length The current length of the block.
* @param blockIn The block input stream. Should be positioned at the
@ -141,9 +145,32 @@ public boolean isNativeLoader() {
return false;
public MappableBlock getRecoveredMappableBlock(
File cacheFile, String bpid, byte volumeIndex) throws IOException {
ExtendedBlockId key = new ExtendedBlockId(getBlockId(cacheFile), bpid);
MappableBlock mappableBlock = new PmemMappedBlock(cacheFile.length(), key);
PmemVolumeManager.getInstance().recoverBlockKeyToVolume(key, volumeIndex);
String path = PmemVolumeManager.getInstance().getCachePath(key);
long length = mappableBlock.getLength();
LOG.info("Recovering persistent memory cache for block {}, " +
"path = {}, length = {}", key, path, length);
return mappableBlock;
* Parse the file name and get the BlockId.
public long getBlockId(File file) {
return Long.parseLong(file.getName());
void shutdown() {
LOG.info("Clean up cache on persistent memory during shutdown.");
if (!cacheRecoveryEnabled) {
LOG.info("Clean up cache on persistent memory during shutdown.");
@ -54,11 +54,17 @@ public long getAddress() {
return -1L;
public ExtendedBlockId getKey() {
return key;
public void close() {
String cacheFilePath =
String cacheFilePath = null;
try {
cacheFilePath =
LOG.info("Successfully uncached one replica:{} from persistent memory"
+ ", [cached path={}, length={}]", key, cacheFilePath, length);
@ -21,6 +21,7 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -35,6 +36,7 @@
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -52,7 +54,7 @@ public final class PmemVolumeManager {
* Counts used bytes for persistent memory.
private static class UsedBytesCount {
private final long maxBytes;
private long maxBytes;
private final AtomicLong usedBytes = new AtomicLong(0);
UsedBytesCount(long maxBytes) {
@ -102,6 +104,10 @@ long getMaxBytes() {
long getAvailableBytes() {
return maxBytes - usedBytes.get();
void setMaxBytes(long maxBytes) {
this.maxBytes = maxBytes;
private static final Logger LOG =
@ -113,6 +119,7 @@ long getAvailableBytes() {
private final Map<ExtendedBlockId, Byte> blockKeyToVolume =
new ConcurrentHashMap<>();
private final List<UsedBytesCount> usedBytesCounts = new ArrayList<>();
private boolean cacheRecoveryEnabled;
* The total cache capacity in bytes of persistent memory.
@ -122,12 +129,14 @@ long getAvailableBytes() {
private int count = 0;
private byte nextIndex = 0;
private PmemVolumeManager(String[] pmemVolumesConfig) throws IOException {
private PmemVolumeManager(String[] pmemVolumesConfig,
boolean cacheRecoveryEnabled) throws IOException {
if (pmemVolumesConfig == null || pmemVolumesConfig.length == 0) {
throw new IOException("The persistent memory volume, " +
" is not configured!");
this.cacheRecoveryEnabled = cacheRecoveryEnabled;
cacheCapacity = 0L;
for (UsedBytesCount counter : usedBytesCounts) {
@ -135,10 +144,12 @@ private PmemVolumeManager(String[] pmemVolumesConfig) throws IOException {
public synchronized static void init(String[] pmemVolumesConfig)
public synchronized static void init(
String[] pmemVolumesConfig, boolean cacheRecoveryEnabled)
throws IOException {
if (pmemVolumeManager == null) {
pmemVolumeManager = new PmemVolumeManager(pmemVolumesConfig);
pmemVolumeManager = new PmemVolumeManager(pmemVolumesConfig,
@ -150,6 +161,11 @@ public static PmemVolumeManager getInstance() {
return pmemVolumeManager;
public static void reset() {
pmemVolumeManager = null;
public static void setMaxBytes(long maxBytes) {
maxBytesPerPmem = maxBytes;
@ -218,8 +234,10 @@ private void loadVolumes(String[] volumes)
try {
File pmemDir = new File(volumes[n]);
File realPmemDir = verifyIfValidPmemVolume(pmemDir);
// Clean up the cache left before, if any.
if (!cacheRecoveryEnabled) {
// Clean up the cache left before, if any.
long maxBytes;
if (maxBytesPerPmem == -1) {
@ -261,6 +279,41 @@ void cleanup() {
* Recover cache from the cached files in the configured pmem volumes.
public Map<ExtendedBlockId, MappableBlock> recoverCache(
String bpid, MappableBlockLoader cacheLoader) throws IOException {
final Map<ExtendedBlockId, MappableBlock> keyToMappableBlock
= new ConcurrentHashMap<>();
for (byte volumeIndex = 0; volumeIndex < pmemVolumes.size();
volumeIndex++) {
long maxBytes = usedBytesCounts.get(volumeIndex).getMaxBytes();
long usedBytes = 0;
File cacheDir = new File(pmemVolumes.get(volumeIndex), bpid);
Collection<File> cachedFileList = FileUtils.listFiles(cacheDir,
TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
// Scan the cached files in pmem volumes for cache recovery.
for (File cachedFile : cachedFileList) {
MappableBlock mappableBlock = cacheLoader.
getRecoveredMappableBlock(cachedFile, bpid, volumeIndex);
ExtendedBlockId key = mappableBlock.getKey();
keyToMappableBlock.put(key, mappableBlock);
usedBytes += cachedFile.length();
// Update maxBytes and cache capacity according to cache space
// used by recovered cached files.
usedBytesCounts.get(volumeIndex).setMaxBytes(maxBytes + usedBytes);
cacheCapacity += usedBytes;
return keyToMappableBlock;
public void recoverBlockKeyToVolume(ExtendedBlockId key, byte volumeIndex) {
blockKeyToVolume.put(key, volumeIndex);
static File verifyIfValidPmemVolume(File pmemDir)
throws IOException {
@ -316,6 +369,18 @@ static File verifyIfValidPmemVolume(File pmemDir)
* Create cache subdirectory specified with blockPoolId.
public void createBlockPoolDir(String bpid) throws IOException {
for (String volume : pmemVolumes) {
File cacheDir = new File(volume, bpid);
if (!cacheDir.exists() && !cacheDir.mkdir()) {
throw new IOException("Failed to create " + cacheDir.getPath());
public static String getRealPmemDir(String rawPmemDir) {
return new File(rawPmemDir, CACHE_DIR).getAbsolutePath();
@ -355,19 +420,22 @@ String getVolumeByIndex(Byte index) {
return pmemVolumes.get(index);
* The cache file is named as BlockPoolId-BlockId.
* So its name can be inferred by BlockPoolId and BlockId.
public String getCacheFileName(ExtendedBlockId key) {
return key.getBlockPoolId() + "-" + key.getBlockId();
ArrayList<String> getVolumes() {
return pmemVolumes;
* Considering the pmem volume size is below TB level currently,
* it is tolerable to keep cache files under one directory.
* The strategy will be optimized, especially if one pmem volume
* has huge cache capacity.
* A cache file is named after the corresponding BlockId.
* Thus, cache file name can be inferred according to BlockId.
public String idToCacheFileName(ExtendedBlockId key) {
return String.valueOf(key.getBlockId());
* Create and get the directory where a cache file with this key and
* volumeIndex should be stored. Use hierarchical strategy of storing
* blocks to avoid keeping cache files under one directory.
* @param volumeIndex The index of pmem volume where a replica will be
* cached to or has been cached to.
@ -376,19 +444,31 @@ public String getCacheFileName(ExtendedBlockId key) {
* @return A path to which the block replica is mapped.
public String inferCacheFilePath(Byte volumeIndex, ExtendedBlockId key) {
return pmemVolumes.get(volumeIndex) + "/" + getCacheFileName(key);
public String idToCacheFilePath(Byte volumeIndex, ExtendedBlockId key)
throws IOException {
final String cacheSubdirPrefix = "subdir";
long blockId = key.getBlockId();
String bpid = key.getBlockPoolId();
int d1 = (int) ((blockId >> 16) & 0x1F);
int d2 = (int) ((blockId >> 8) & 0x1F);
String parentDir = pmemVolumes.get(volumeIndex) + "/" + bpid;
String subDir = cacheSubdirPrefix + d1 + "/" + cacheSubdirPrefix + d2;
File filePath = new File(parentDir, subDir);
if (!filePath.exists() && !filePath.mkdirs()) {
throw new IOException("Failed to create " + filePath.getPath());
return filePath.getAbsolutePath() + "/" + idToCacheFileName(key);
* The cache file path is pmemVolume/BlockPoolId-BlockId.
* The cache file path is pmemVolume/BlockPoolId/subdir#/subdir#/BlockId.
public String getCachePath(ExtendedBlockId key) {
public String getCachePath(ExtendedBlockId key) throws IOException {
Byte volumeIndex = blockKeyToVolume.get(key);
if (volumeIndex == null) {
return null;
return inferCacheFilePath(volumeIndex, key);
return idToCacheFilePath(volumeIndex, key);
@ -2527,12 +2527,21 @@
This value specifies the persistent memory directory used for caching block
replica. It matters only if the value of dfs.datanode.cache.loader.class is
PmemMappableBlockLoader. Multiple directories separated by "," are acceptable.
replica. Multiple directories separated by "," are acceptable.
This value specifies whether previous cache on persistent memory will be recovered.
This configuration can take effect only if persistent memory cache is enabled by
specifying value for 'dfs.datanode.pmem.cache.dirs'.
@ -224,7 +224,7 @@ Be sure to configure one of the following properties for DRAM cache or persisten
This setting is shared with the [Lazy Persist Writes feature](./MemoryStorage.html). The Data Node will ensure that the combined memory used by Lazy Persist Writes and Centralized Cache Management does not exceed the amount configured in `dfs.datanode.max.locked.memory`.
* dfs.datanode.cache.pmem.dirs
* dfs.datanode.pmem.cache.dirs
This property specifies the cache volume of persistent memory. For multiple volumes, they should be separated by “,”, e.g. “/mnt/pmem0, /mnt/pmem1”. The default value is empty. If this property is configured, the volume capacity will be detected. And there is no need to configure `dfs.datanode.max.locked.memory`.
@ -254,6 +254,10 @@ The following properties are not required, but may be specified for tuning:
The percentage of the Java heap which we will allocate to the cached blocks map. The cached blocks map is a hash map which uses chained hashing. Smaller maps may be accessed more slowly if the number of cached blocks is large; larger maps will consume more memory. The default is 0.25 percent.
* dfs.datanode.pmem.cache.recovery
This parameter is used to determine whether to recover the status for previous cache on persistent memory during the start of DataNode. If it is enabled, DataNode will recover the status for previously cached data on persistent memory. Thus, re-caching data will be avoided. If this property is not enabled, DataNode will clean up the previous cache, if any, on persistent memory. This property can only work when persistent memory is enabled, i.e., `dfs.datanode.pmem.cache.dirs` is configured.
### OS Limits
If you get the error "Cannot start datanode because the configured max locked memory size... is more than the datanode's available RLIMIT\_MEMLOCK ulimit," that means that the operating system is imposing a lower limit on the amount of memory that you can lock than what you have configured. To fix this, you must adjust the ulimit -l value that the DataNode runs with. Usually, this value is configured in `/etc/security/limits.conf`. However, it will vary depending on what operating system and distribution you are using.
@ -21,7 +21,7 @@
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_DIRS_KEY;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@ -137,7 +137,7 @@ public void setUp() throws Exception {
new File(PMEM_DIR_0).getAbsoluteFile().mkdir();
new File(PMEM_DIR_1).getAbsoluteFile().mkdir();
// Configure two bogus pmem volumes
PmemVolumeManager.setMaxBytes((long) (CACHE_CAPACITY * 0.5));
prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
@ -259,14 +259,18 @@ public Boolean get() {
// The cachePath shouldn't be null if the replica has been cached
// to pmem.
String expectFileName =
Path path = new Path(cachePath);
String fileName = path.getName();
if (cachePath.startsWith(PMEM_DIR_0)) {
.getRealPmemDir(PMEM_DIR_0) + "/" + expectFileName));
String expectPath = PmemVolumeManager.
getRealPmemDir(PMEM_DIR_0) + "/" + key.getBlockPoolId();
assertTrue(key.getBlockId() == Long.parseLong(fileName));
} else if (cachePath.startsWith(PMEM_DIR_1)) {
.getRealPmemDir(PMEM_DIR_1) + "/" + expectFileName));
String expectPath = PmemVolumeManager.
getRealPmemDir(PMEM_DIR_1) + "/" + key.getBlockPoolId();
assertTrue(key.getBlockId() == Long.parseLong(fileName));
} else {
fail("The cache path is not the expected one: " + cachePath);
@ -0,0 +1,342 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.event.Level;
import com.google.common.base.Supplier;
import com.google.common.primitives.Ints;
* Tests HDFS persistent memory cache by PmemMappableBlockLoader.
* Bogus persistent memory volume is used to cache blocks.
public class TestPmemCacheRecovery {
protected static final org.slf4j.Logger LOG =
protected static final long CACHE_AMOUNT = 64 * 1024;
protected static final long BLOCK_SIZE = 4 * 1024;
private static Configuration conf;
private static MiniDFSCluster cluster = null;
private static DistributedFileSystem fs;
private static DataNode dn;
private static FsDatasetCache cacheManager;
private static String blockPoolId = "";
* Used to pause DN BPServiceActor threads. BPSA threads acquire the
* shared read lock. The test acquires the write lock for exclusive access.
private static ReadWriteLock lock = new ReentrantReadWriteLock(true);
private static CacheManipulator prevCacheManipulator;
private static DataNodeFaultInjector oldInjector;
private static final String PMEM_DIR_0 =
MiniDFSCluster.getBaseDirectory() + "pmem0";
private static final String PMEM_DIR_1 =
MiniDFSCluster.getBaseDirectory() + "pmem1";
static {
LoggerFactory.getLogger(FsDatasetCache.class), Level.DEBUG);
public static void setUpClass() throws Exception {
oldInjector = DataNodeFaultInjector.get();
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
public void startOfferService() throws Exception {
public void endOfferService() throws Exception {
public static void tearDownClass() throws Exception {
public void setUp() throws Exception {
conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
// Configuration for pmem cache
new File(PMEM_DIR_0).getAbsoluteFile().mkdir();
new File(PMEM_DIR_1).getAbsoluteFile().mkdir();
// Configure two bogus pmem volumes
prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
cluster = new MiniDFSCluster.Builder(conf)
fs = cluster.getFileSystem();
dn = cluster.getDataNodes().get(0);
cacheManager = ((FsDatasetImpl) dn.getFSDataset()).cacheManager;
public void tearDown() throws Exception {
if (fs != null) {
fs = null;
if (cluster != null) {
cluster = null;
protected static void restartCluster() throws Exception {
conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
// Configure two bogus pmem volumes
prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
cluster = new MiniDFSCluster.Builder(conf)
fs = cluster.getFileSystem();
dn = cluster.getDataNodes().get(0);
cacheManager = ((FsDatasetImpl) dn.getFSDataset()).cacheManager;
protected static void shutdownCluster() {
if (cluster != null) {
cluster = null;
public List<ExtendedBlockId> getExtendedBlockId(Path filePath, long fileLen)
throws IOException {
List<ExtendedBlockId> keys = new ArrayList<>();
HdfsBlockLocation[] locs = (HdfsBlockLocation[]) fs.getFileBlockLocations(
filePath, 0, fileLen);
for (HdfsBlockLocation loc : locs) {
long bkid = loc.getLocatedBlock().getBlock().getBlockId();
String bpid = loc.getLocatedBlock().getBlock().getBlockPoolId();
keys.add(new ExtendedBlockId(bkid, bpid));
return keys;
@Test(timeout = 60000)
public void testCacheRecovery() throws Exception {
final int cacheBlocksNum =
Ints.checkedCast(CACHE_AMOUNT / BLOCK_SIZE);
Assert.assertEquals(0, CACHE_AMOUNT % BLOCK_SIZE);
final Path testFile = new Path("/testFile");
final long testFileLen = cacheBlocksNum * BLOCK_SIZE;
DFSTestUtil.createFile(fs, testFile,
testFileLen, (short) 1, 0xbeef);
List<ExtendedBlockId> blockKeys =
getExtendedBlockId(testFile, testFileLen);
fs.addCachePool(new CachePoolInfo("testPool"));
final long cacheDirectiveId = fs.addCacheDirective(
new CacheDirectiveInfo.Builder().setPool("testPool").
setPath(testFile).setReplication((short) 1).build());
// wait for caching
GenericTestUtils.waitFor(new Supplier<Boolean>() {
public Boolean get() {
MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
long blocksCached =
MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
if (blocksCached != cacheBlocksNum) {
LOG.info("waiting for " + cacheBlocksNum + " blocks to " +
"be cached. Right now " + blocksCached + " blocks are cached.");
return false;
LOG.info(cacheBlocksNum + " blocks are now cached.");
return true;
}, 1000, 30000);
assertEquals(CACHE_AMOUNT, cacheManager.getCacheUsed());
Map<ExtendedBlockId, Byte> blockKeyToVolume =
// All block keys should be kept in blockKeyToVolume
assertEquals(blockKeyToVolume.size(), cacheBlocksNum);
// Test each replica's cache file path
for (ExtendedBlockId key : blockKeys) {
if (blockPoolId.isEmpty()) {
blockPoolId = key.getBlockPoolId();
String cachePath = cacheManager.
getReplicaCachePath(key.getBlockPoolId(), key.getBlockId());
// The cachePath shouldn't be null if the replica has been cached
// to pmem.
Path path = new Path(cachePath);
String fileName = path.getName();
if (cachePath.startsWith(PMEM_DIR_0)) {
String expectPath = PmemVolumeManager.
getRealPmemDir(PMEM_DIR_0) + "/" + key.getBlockPoolId();
assertTrue(key.getBlockId() == Long.parseLong(fileName));
} else if (cachePath.startsWith(PMEM_DIR_1)) {
String expectPath = PmemVolumeManager.
getRealPmemDir(PMEM_DIR_1) + "/" + key.getBlockPoolId();
assertTrue(key.getBlockId() == Long.parseLong(fileName));
} else {
fail("The cache path is not the expected one: " + cachePath);
// Trigger cache recovery
assertEquals(CACHE_AMOUNT, cacheManager.getCacheUsed());
blockKeyToVolume = PmemVolumeManager.getInstance().getBlockKeyToVolume();
// All block keys should be kept in blockKeyToVolume
assertEquals(blockKeyToVolume.size(), cacheBlocksNum);
// Test each replica's cache file path
for (ExtendedBlockId key : blockKeys) {
String cachePath = cacheManager.
getReplicaCachePath(key.getBlockPoolId(), key.getBlockId());
// The cachePath shouldn't be null if the replica has been cached
// to pmem.
Path path = new Path(cachePath);
String fileName = path.getName();
if (cachePath.startsWith(PMEM_DIR_0)) {
String expectPath = PmemVolumeManager.
getRealPmemDir(PMEM_DIR_0) + "/" + key.getBlockPoolId();
assertTrue(key.getBlockId() == Long.parseLong(fileName));
} else if (cachePath.startsWith(PMEM_DIR_1)) {
String expectPath = PmemVolumeManager.
getRealPmemDir(PMEM_DIR_1) + "/" + key.getBlockPoolId();
assertTrue(key.getBlockId() == Long.parseLong(fileName));
} else {
fail("The cache path is not the expected one: " + cachePath);
// Uncache the test file
for (ExtendedBlockId key : blockKeys) {
cacheManager.uncacheBlock(blockPoolId, key.getBlockId());
// Wait for uncaching
GenericTestUtils.waitFor(new Supplier<Boolean>() {
public Boolean get() {
MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
long blocksUncached =
MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
if (blocksUncached != cacheBlocksNum) {
LOG.info("waiting for " + cacheBlocksNum + " blocks to be " +
"uncached. Right now " + blocksUncached +
" blocks are uncached.");
return false;
LOG.info(cacheBlocksNum + " blocks have been uncached.");
return true;
}, 1000, 30000);
// It is expected that no pmem cache space is used.
assertEquals(0, cacheManager.getCacheUsed());
// No record should be kept by blockKeyToVolume after testFile is uncached.
assertEquals(blockKeyToVolume.size(), 0);
Reference in New Issue
Block a user