HDFS-14740. Recover data blocks from persistent memory read cache during datanode restarts. Contributed by Feilong He.

This commit is contained in:
Rakesh Radhakrishnan 2020-01-02 11:44:00 +05:30
parent 074050ca59
commit d79cce20ab
20 changed files with 668 additions and 81 deletions

View File

@ -224,28 +224,31 @@ public class NativeIO {
* JNI wrapper of persist memory operations. * JNI wrapper of persist memory operations.
*/ */
public static class Pmem { public static class Pmem {
// check whether the address is a Pmem address or DIMM address // Check whether the address is a Pmem address or DIMM address
public static boolean isPmem(long address, long length) { public static boolean isPmem(long address, long length) {
return NativeIO.POSIX.isPmemCheck(address, length); return NativeIO.POSIX.isPmemCheck(address, length);
} }
// create a pmem file and memory map it // Map a file in persistent memory, if the given file exists,
public static PmemMappedRegion mapBlock(String path, long length) { // directly map it. If not, create the named file on persistent memory
return NativeIO.POSIX.pmemCreateMapFile(path, length); // and then map it.
public static PmemMappedRegion mapBlock(
String path, long length, boolean isFileExist) {
return NativeIO.POSIX.pmemMapFile(path, length, isFileExist);
} }
// unmap a pmem file // Unmap a pmem file
public static boolean unmapBlock(long address, long length) { public static boolean unmapBlock(long address, long length) {
return NativeIO.POSIX.pmemUnMap(address, length); return NativeIO.POSIX.pmemUnMap(address, length);
} }
// copy data from disk file(src) to pmem file(dest), without flush // Copy data from disk file(src) to pmem file(dest), without flush
public static void memCopy(byte[] src, long dest, boolean isPmem, public static void memCopy(byte[] src, long dest, boolean isPmem,
long length) { long length) {
NativeIO.POSIX.pmemCopy(src, dest, isPmem, length); NativeIO.POSIX.pmemCopy(src, dest, isPmem, length);
} }
// flush the memory content to persistent storage // Flush the memory content to persistent storage
public static void memSync(PmemMappedRegion region) { public static void memSync(PmemMappedRegion region) {
if (region.isPmem()) { if (region.isPmem()) {
NativeIO.POSIX.pmemDrain(); NativeIO.POSIX.pmemDrain();
@ -261,8 +264,8 @@ public class NativeIO {
private static native String getPmdkLibPath(); private static native String getPmdkLibPath();
private static native boolean isPmemCheck(long address, long length); private static native boolean isPmemCheck(long address, long length);
private static native PmemMappedRegion pmemCreateMapFile(String path, private static native PmemMappedRegion pmemMapFile(String path,
long length); long length, boolean isFileExist);
private static native boolean pmemUnMap(long address, long length); private static native boolean pmemUnMap(long address, long length);
private static native void pmemCopy(byte[] src, long dest, boolean isPmem, private static native void pmemCopy(byte[] src, long dest, boolean isPmem,
long length); long length);

View File

@ -1486,11 +1486,11 @@ JNIEnv *env, jclass thisClass, jlong address, jlong length) {
/* /*
* Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX * Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
* Method: pmemCreateMapFile * Method: pmemMapFile
* Signature: (Ljava/lang/String;J)Lorg/apache/hadoop/io/nativeio/NativeIO/POSIX/PmemMappedRegion; * Signature: (Ljava/lang/String;J)Lorg/apache/hadoop/io/nativeio/NativeIO/POSIX/PmemMappedRegion;
*/ */
JNIEXPORT jobject JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemCreateMapFile( JNIEXPORT jobject JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemMapFile(
JNIEnv *env, jclass thisClass, jstring filePath, jlong fileLength) { JNIEnv *env, jclass thisClass, jstring filePath, jlong fileLength, jboolean isFileExist) {
#if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY) #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
/* create a pmem file and memory map it */ /* create a pmem file and memory map it */
const char * path = NULL; const char * path = NULL;
@ -1505,17 +1505,20 @@ JNIEnv *env, jclass thisClass, jstring filePath, jlong fileLength) {
return NULL; return NULL;
} }
if (fileLength <= 0) { if (isFileExist) {
(*env)->ReleaseStringUTFChars(env, filePath, path); pmemaddr = pmdkLoader->pmem_map_file(path, 0, 0, 0666, &mapped_len, &is_pmem);
THROW(env, "java/lang/IllegalArgumentException", "File length should be positive"); } else {
return NULL; if (fileLength <= 0) {
(*env)->ReleaseStringUTFChars(env, filePath, path);
THROW(env, "java/lang/IllegalArgumentException", "File length should be positive");
return NULL;
}
pmemaddr = pmdkLoader->pmem_map_file(path, fileLength, PMEM_FILE_CREATE|PMEM_FILE_EXCL,
0666, &mapped_len, &is_pmem);
} }
pmemaddr = pmdkLoader->pmem_map_file(path, fileLength, PMEM_FILE_CREATE|PMEM_FILE_EXCL,
0666, &mapped_len, &is_pmem);
if (!pmemaddr) { if (!pmemaddr) {
snprintf(msg, sizeof(msg), "Failed to create pmem file. file: %s, length: %x, error msg: %s", path, fileLength, pmem_errormsg()); snprintf(msg, sizeof(msg), "Failed to map file on persistent memory.file: %s, length: %x, error msg: %s", path, fileLength, pmem_errormsg());
THROW(env, "java/io/IOException", msg); THROW(env, "java/io/IOException", msg);
(*env)->ReleaseStringUTFChars(env, filePath, path); (*env)->ReleaseStringUTFChars(env, filePath, path);
return NULL; return NULL;

View File

@ -800,7 +800,7 @@ public class TestNativeIO {
// Incorrect file length // Incorrect file length
try { try {
NativeIO.POSIX.Pmem.mapBlock(filePath, length); NativeIO.POSIX.Pmem.mapBlock(filePath, length, false);
fail("Illegal length parameter should be detected"); fail("Illegal length parameter should be detected");
} catch (Exception e) { } catch (Exception e) {
LOG.info(e.getMessage()); LOG.info(e.getMessage());
@ -810,7 +810,7 @@ public class TestNativeIO {
filePath = "/mnt/pmem0/test_native_io"; filePath = "/mnt/pmem0/test_native_io";
length = -1L; length = -1L;
try { try {
NativeIO.POSIX.Pmem.mapBlock(filePath, length); NativeIO.POSIX.Pmem.mapBlock(filePath, length, false);
fail("Illegal length parameter should be detected"); fail("Illegal length parameter should be detected");
}catch (Exception e) { }catch (Exception e) {
LOG.info(e.getMessage()); LOG.info(e.getMessage());
@ -837,10 +837,10 @@ public class TestNativeIO {
for (int i = 0; i < fileNumber; i++) { for (int i = 0; i < fileNumber; i++) {
String path = filePath + i; String path = filePath + i;
LOG.info("File path = " + path); LOG.info("File path = " + path);
NativeIO.POSIX.Pmem.mapBlock(path, length); NativeIO.POSIX.Pmem.mapBlock(path, length, false);
} }
try { try {
NativeIO.POSIX.Pmem.mapBlock(filePath, length); NativeIO.POSIX.Pmem.mapBlock(filePath, length, false);
fail("Request map extra file when persistent memory is all occupied"); fail("Request map extra file when persistent memory is all occupied");
} catch (Exception e) { } catch (Exception e) {
LOG.info(e.getMessage()); LOG.info(e.getMessage());
@ -863,7 +863,7 @@ public class TestNativeIO {
length = volumeSize + 1024L; length = volumeSize + 1024L;
try { try {
LOG.info("File length = " + length); LOG.info("File length = " + length);
NativeIO.POSIX.Pmem.mapBlock(filePath, length); NativeIO.POSIX.Pmem.mapBlock(filePath, length, false);
fail("File length exceeds persistent memory total volume size"); fail("File length exceeds persistent memory total volume size");
}catch (Exception e) { }catch (Exception e) {
LOG.info(e.getMessage()); LOG.info(e.getMessage());
@ -881,7 +881,8 @@ public class TestNativeIO {
// memory device. // memory device.
String filePath = "/mnt/pmem0/copy"; String filePath = "/mnt/pmem0/copy";
long length = 4096; long length = 4096;
PmemMappedRegion region = NativeIO.POSIX.Pmem.mapBlock(filePath, length); PmemMappedRegion region = NativeIO.POSIX.Pmem.mapBlock(
filePath, length, false);
assertTrue(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length)); assertTrue(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length));
assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length + 100)); assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length + 100));
assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress() + 100, length)); assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress() + 100, length));

View File

@ -408,9 +408,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT = 500L; public static final long DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT = 500L;
// Multiple dirs separated by "," are acceptable. // Multiple dirs separated by "," are acceptable.
public static final String DFS_DATANODE_CACHE_PMEM_DIRS_KEY = public static final String DFS_DATANODE_PMEM_CACHE_DIRS_KEY =
"dfs.datanode.cache.pmem.dirs"; "dfs.datanode.pmem.cache.dirs";
public static final String DFS_DATANODE_CACHE_PMEM_DIRS_DEFAULT = ""; public static final String DFS_DATANODE_PMEM_CACHE_DIRS_DEFAULT = "";
public static final String DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY =
"dfs.datanode.pmem.cache.recovery";
public static final boolean DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT =
true;
public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check"; public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true; public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;

View File

@ -27,12 +27,14 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHO
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_DIRS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@ -93,6 +95,7 @@ public class DNConf {
final boolean encryptDataTransfer; final boolean encryptDataTransfer;
final boolean connectToDnViaHostname; final boolean connectToDnViaHostname;
final boolean overwriteDownstreamDerivedQOP; final boolean overwriteDownstreamDerivedQOP;
private final boolean pmemCacheRecoveryEnabled;
final long readaheadLength; final long readaheadLength;
final long heartBeatInterval; final long heartBeatInterval;
@ -264,7 +267,7 @@ public class DNConf {
DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT); DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
this.pmemDirs = getConf().getTrimmedStrings( this.pmemDirs = getConf().getTrimmedStrings(
DFS_DATANODE_CACHE_PMEM_DIRS_KEY); DFS_DATANODE_PMEM_CACHE_DIRS_KEY);
this.restartReplicaExpiry = getConf().getLong( this.restartReplicaExpiry = getConf().getLong(
DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY, DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
@ -285,6 +288,10 @@ public class DNConf {
String[] dataDirs = String[] dataDirs =
getConf().getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY); getConf().getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
this.volsConfigured = (dataDirs == null) ? 0 : dataDirs.length; this.volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
this.pmemCacheRecoveryEnabled = getConf().getBoolean(
DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY,
DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT);
} }
// We get minimumNameNodeVersion via a method so it can be mocked out in tests. // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
@ -294,7 +301,7 @@ public class DNConf {
/** /**
* Returns the configuration. * Returns the configuration.
* *
* @return Configuration the configuration * @return Configuration the configuration
*/ */
public Configuration getConf() { public Configuration getConf() {
@ -434,4 +441,8 @@ public class DNConf {
public String[] getPmemVolumes() { public String[] getPmemVolumes() {
return pmemDirs; return pmemDirs;
} }
public boolean getPmemCacheRecoveryEnabled() {
return pmemCacheRecoveryEnabled;
}
} }

View File

@ -33,7 +33,9 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
@ -184,6 +186,30 @@ public class FsDatasetCache {
this.memCacheStats = cacheLoader.initialize(this.getDnConf()); this.memCacheStats = cacheLoader.initialize(this.getDnConf());
} }
/**
* For persistent memory cache, create cache subdirectory specified with
* blockPoolId to store cache data.
* Recover the status of cache in persistent memory, if any.
*/
public void initCache(String bpid) throws IOException {
if (cacheLoader.isTransientCache()) {
return;
}
PmemVolumeManager.getInstance().createBlockPoolDir(bpid);
if (getDnConf().getPmemCacheRecoveryEnabled()) {
final Map<ExtendedBlockId, MappableBlock> keyToMappableBlock =
PmemVolumeManager.getInstance().recoverCache(bpid, cacheLoader);
Set<Map.Entry<ExtendedBlockId, MappableBlock>> entrySet
= keyToMappableBlock.entrySet();
for (Map.Entry<ExtendedBlockId, MappableBlock> entry : entrySet) {
mappableBlockMap.put(entry.getKey(),
new Value(keyToMappableBlock.get(entry.getKey()), State.CACHED));
numBlocksCached.addAndGet(1);
dataset.datanode.getMetrics().incrBlocksCached(1);
}
}
}
DNConf getDnConf() { DNConf getDnConf() {
return this.dataset.datanode.getDnConf(); return this.dataset.datanode.getDnConf();
} }
@ -191,7 +217,7 @@ public class FsDatasetCache {
/** /**
* Get the cache path if the replica is cached into persistent memory. * Get the cache path if the replica is cached into persistent memory.
*/ */
String getReplicaCachePath(String bpid, long blockId) { String getReplicaCachePath(String bpid, long blockId) throws IOException {
if (cacheLoader.isTransientCache() || if (cacheLoader.isTransientCache() ||
!isCached(bpid, blockId)) { !isCached(bpid, blockId)) {
return null; return null;

View File

@ -271,6 +271,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@VisibleForTesting @VisibleForTesting
final AutoCloseableLock datasetLock; final AutoCloseableLock datasetLock;
private final Condition datasetLockCondition; private final Condition datasetLockCondition;
private static String blockPoolId = "";
/** /**
* An FSDataset has a directory where it loads its data files. * An FSDataset has a directory where it loads its data files.
@ -2844,6 +2845,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
if (volumeExceptions.hasExceptions()) { if (volumeExceptions.hasExceptions()) {
throw volumeExceptions; throw volumeExceptions;
} }
// For test use only.
if (!blockPoolId.isEmpty()) {
bpid = blockPoolId;
}
cacheManager.initCache(bpid);
}
@VisibleForTesting
public static void setBlockPoolId(String bpid) {
blockPoolId = bpid;
} }
@Override @Override
@ -3373,8 +3384,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
void stopAllDataxceiverThreads(FsVolumeImpl volume) { void stopAllDataxceiverThreads(FsVolumeImpl volume) {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetLock.acquire()) {
for (String blockPoolId : volumeMap.getBlockPoolList()) { for (String bpid : volumeMap.getBlockPoolList()) {
Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId); Collection<ReplicaInfo> replicas = volumeMap.replicas(bpid);
for (ReplicaInfo replicaInfo : replicas) { for (ReplicaInfo replicaInfo : replicas) {
if ((replicaInfo.getState() == ReplicaState.TEMPORARY if ((replicaInfo.getState() == ReplicaState.TEMPORARY
|| replicaInfo.getState() == ReplicaState.RBW) || replicaInfo.getState() == ReplicaState.RBW)

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import java.io.Closeable; import java.io.Closeable;
@ -41,4 +42,10 @@ public interface MappableBlock extends Closeable {
* Return -1 if not applicable. * Return -1 if not applicable.
*/ */
long getAddress(); long getAddress();
/**
* Get cached block's ExtendedBlockId.
* @return cached block's ExtendedBlockId..
*/
ExtendedBlockId getKey();
} }

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.util.DataChecksum;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -111,6 +112,12 @@ public abstract class MappableBlockLoader {
*/ */
abstract boolean isNativeLoader(); abstract boolean isNativeLoader();
/**
* Get mappableBlock recovered from persistent memory.
*/
abstract MappableBlock getRecoveredMappableBlock(
File cacheFile, String bpid, byte volumeIndex) throws IOException;
/** /**
* Clean up cache, can be used during DataNode shutdown. * Clean up cache, can be used during DataNode shutdown.
*/ */

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.io.nativeio.NativeIO;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.MappedByteBuffer; import java.nio.MappedByteBuffer;
@ -118,6 +119,12 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
return true; return true;
} }
@Override
public MappableBlock getRecoveredMappableBlock(
File cacheFile, String bpid, byte volumeIndex) throws IOException {
return null;
}
@Override @Override
public boolean isNativeLoader() { public boolean isNativeLoader() {
return false; return false;

View File

@ -22,6 +22,7 @@ import java.nio.MappedByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO;
/** /**
@ -49,6 +50,11 @@ public class MemoryMappedBlock implements MappableBlock {
return -1L; return -1L;
} }
@Override
public ExtendedBlockId getKey() {
return null;
}
@Override @Override
public void close() { public void close() {
if (mmap != null) { if (mmap != null) {

View File

@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -57,8 +58,8 @@ public class NativePmemMappableBlockLoader extends PmemMappableBlockLoader {
* *
* Map the block and verify its checksum. * Map the block and verify its checksum.
* *
* The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir * The block will be mapped to PmemDir/BlockPoolId/subdir#/subdir#/BlockId,
* is a persistent memory volume chosen by PmemVolumeManager. * in which PmemDir is a persistent memory volume chosen by PmemVolumeManager.
* *
* @param length The current length of the block. * @param length The current length of the block.
* @param blockIn The block input stream. Should be positioned at the * @param blockIn The block input stream. Should be positioned at the
@ -91,7 +92,7 @@ public class NativePmemMappableBlockLoader extends PmemMappableBlockLoader {
assert NativeIO.isAvailable(); assert NativeIO.isAvailable();
filePath = PmemVolumeManager.getInstance().getCachePath(key); filePath = PmemVolumeManager.getInstance().getCachePath(key);
region = POSIX.Pmem.mapBlock(filePath, length); region = POSIX.Pmem.mapBlock(filePath, length, false);
if (region == null) { if (region == null) {
throw new IOException("Failed to map the block " + blockFileName + throw new IOException("Failed to map the block " + blockFileName +
" to persistent storage."); " to persistent storage.");
@ -189,4 +190,28 @@ public class NativePmemMappableBlockLoader extends PmemMappableBlockLoader {
public boolean isNativeLoader() { public boolean isNativeLoader() {
return true; return true;
} }
@Override
public MappableBlock getRecoveredMappableBlock(
File cacheFile, String bpid, byte volumeIndex) throws IOException {
NativeIO.POSIX.PmemMappedRegion region =
NativeIO.POSIX.Pmem.mapBlock(cacheFile.getAbsolutePath(),
cacheFile.length(), true);
if (region == null) {
throw new IOException("Failed to recover the block "
+ cacheFile.getName() + " in persistent storage.");
}
ExtendedBlockId key =
new ExtendedBlockId(super.getBlockId(cacheFile), bpid);
MappableBlock mappableBlock = new NativePmemMappedBlock(
region.getAddress(), region.getLength(), key);
PmemVolumeManager.getInstance().recoverBlockKeyToVolume(key, volumeIndex);
String path = PmemVolumeManager.getInstance().getCachePath(key);
long addr = mappableBlock.getAddress();
long length = mappableBlock.getLength();
LOG.info("Recovering persistent memory cache for block {}, " +
"path = {}, address = {}, length = {}", key, path, addr, length);
return mappableBlock;
}
} }

View File

@ -58,12 +58,17 @@ public class NativePmemMappedBlock implements MappableBlock {
return pmemMappedAddress; return pmemMappedAddress;
} }
@Override
public ExtendedBlockId getKey() {
return key;
}
@Override @Override
public void close() { public void close() {
if (pmemMappedAddress != -1L) { if (pmemMappedAddress != -1L) {
String cacheFilePath =
PmemVolumeManager.getInstance().getCachePath(key);
try { try {
String cacheFilePath =
PmemVolumeManager.getInstance().getCachePath(key);
// Current libpmem will report error when pmem_unmap is called with // Current libpmem will report error when pmem_unmap is called with
// length not aligned with page size, although the length is returned // length not aligned with page size, although the length is returned
// by pmem_map_file. // by pmem_map_file.

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
@ -40,12 +41,15 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(PmemMappableBlockLoader.class); LoggerFactory.getLogger(PmemMappableBlockLoader.class);
private PmemVolumeManager pmemVolumeManager; private PmemVolumeManager pmemVolumeManager;
private boolean cacheRecoveryEnabled;
@Override @Override
CacheStats initialize(DNConf dnConf) throws IOException { CacheStats initialize(DNConf dnConf) throws IOException {
LOG.info("Initializing cache loader: " + this.getClass().getName()); LOG.info("Initializing cache loader: " + this.getClass().getName());
PmemVolumeManager.init(dnConf.getPmemVolumes()); PmemVolumeManager.init(dnConf.getPmemVolumes(),
dnConf.getPmemCacheRecoveryEnabled());
pmemVolumeManager = PmemVolumeManager.getInstance(); pmemVolumeManager = PmemVolumeManager.getInstance();
cacheRecoveryEnabled = dnConf.getPmemCacheRecoveryEnabled();
// The configuration for max locked memory is shaded. // The configuration for max locked memory is shaded.
LOG.info("Persistent memory is used for caching data instead of " + LOG.info("Persistent memory is used for caching data instead of " +
"DRAM. Max locked memory is set to zero to disable DRAM cache"); "DRAM. Max locked memory is set to zero to disable DRAM cache");
@ -59,8 +63,8 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
* *
* Map the block and verify its checksum. * Map the block and verify its checksum.
* *
* The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir * The block will be mapped to PmemDir/BlockPoolId/subdir#/subdir#/BlockId,
* is a persistent memory volume chosen by PmemVolumeManager. * in which PmemDir is a persistent memory volume chosen by PmemVolumeManager.
* *
* @param length The current length of the block. * @param length The current length of the block.
* @param blockIn The block input stream. Should be positioned at the * @param blockIn The block input stream. Should be positioned at the
@ -141,9 +145,32 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
return false; return false;
} }
@Override
public MappableBlock getRecoveredMappableBlock(
File cacheFile, String bpid, byte volumeIndex) throws IOException {
ExtendedBlockId key = new ExtendedBlockId(getBlockId(cacheFile), bpid);
MappableBlock mappableBlock = new PmemMappedBlock(cacheFile.length(), key);
PmemVolumeManager.getInstance().recoverBlockKeyToVolume(key, volumeIndex);
String path = PmemVolumeManager.getInstance().getCachePath(key);
long length = mappableBlock.getLength();
LOG.info("Recovering persistent memory cache for block {}, " +
"path = {}, length = {}", key, path, length);
return mappableBlock;
}
/**
* Parse the file name and get the BlockId.
*/
public long getBlockId(File file) {
return Long.parseLong(file.getName());
}
@Override @Override
void shutdown() { void shutdown() {
LOG.info("Clean up cache on persistent memory during shutdown."); if (!cacheRecoveryEnabled) {
PmemVolumeManager.getInstance().cleanup(); LOG.info("Clean up cache on persistent memory during shutdown.");
PmemVolumeManager.getInstance().cleanup();
}
} }
} }

View File

@ -54,11 +54,17 @@ public class PmemMappedBlock implements MappableBlock {
return -1L; return -1L;
} }
@Override
public ExtendedBlockId getKey() {
return key;
}
@Override @Override
public void close() { public void close() {
String cacheFilePath = String cacheFilePath = null;
PmemVolumeManager.getInstance().getCachePath(key);
try { try {
cacheFilePath =
PmemVolumeManager.getInstance().getCachePath(key);
FsDatasetUtil.deleteMappedFile(cacheFilePath); FsDatasetUtil.deleteMappedFile(cacheFilePath);
LOG.info("Successfully uncached one replica:{} from persistent memory" LOG.info("Successfully uncached one replica:{} from persistent memory"
+ ", [cached path={}, length={}]", key, cacheFilePath, length); + ", [cached path={}, length={}]", key, cacheFilePath, length);

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -35,6 +36,7 @@ import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer; import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
@ -52,7 +54,7 @@ public final class PmemVolumeManager {
* Counts used bytes for persistent memory. * Counts used bytes for persistent memory.
*/ */
private static class UsedBytesCount { private static class UsedBytesCount {
private final long maxBytes; private long maxBytes;
private final AtomicLong usedBytes = new AtomicLong(0); private final AtomicLong usedBytes = new AtomicLong(0);
UsedBytesCount(long maxBytes) { UsedBytesCount(long maxBytes) {
@ -102,6 +104,10 @@ public final class PmemVolumeManager {
long getAvailableBytes() { long getAvailableBytes() {
return maxBytes - usedBytes.get(); return maxBytes - usedBytes.get();
} }
void setMaxBytes(long maxBytes) {
this.maxBytes = maxBytes;
}
} }
private static final Logger LOG = private static final Logger LOG =
@ -113,6 +119,7 @@ public final class PmemVolumeManager {
private final Map<ExtendedBlockId, Byte> blockKeyToVolume = private final Map<ExtendedBlockId, Byte> blockKeyToVolume =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
private final List<UsedBytesCount> usedBytesCounts = new ArrayList<>(); private final List<UsedBytesCount> usedBytesCounts = new ArrayList<>();
private boolean cacheRecoveryEnabled;
/** /**
* The total cache capacity in bytes of persistent memory. * The total cache capacity in bytes of persistent memory.
@ -122,12 +129,14 @@ public final class PmemVolumeManager {
private int count = 0; private int count = 0;
private byte nextIndex = 0; private byte nextIndex = 0;
private PmemVolumeManager(String[] pmemVolumesConfig) throws IOException { private PmemVolumeManager(String[] pmemVolumesConfig,
boolean cacheRecoveryEnabled) throws IOException {
if (pmemVolumesConfig == null || pmemVolumesConfig.length == 0) { if (pmemVolumesConfig == null || pmemVolumesConfig.length == 0) {
throw new IOException("The persistent memory volume, " + throw new IOException("The persistent memory volume, " +
DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY + DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_DIRS_KEY +
" is not configured!"); " is not configured!");
} }
this.cacheRecoveryEnabled = cacheRecoveryEnabled;
this.loadVolumes(pmemVolumesConfig); this.loadVolumes(pmemVolumesConfig);
cacheCapacity = 0L; cacheCapacity = 0L;
for (UsedBytesCount counter : usedBytesCounts) { for (UsedBytesCount counter : usedBytesCounts) {
@ -135,10 +144,12 @@ public final class PmemVolumeManager {
} }
} }
public synchronized static void init(String[] pmemVolumesConfig) public synchronized static void init(
String[] pmemVolumesConfig, boolean cacheRecoveryEnabled)
throws IOException { throws IOException {
if (pmemVolumeManager == null) { if (pmemVolumeManager == null) {
pmemVolumeManager = new PmemVolumeManager(pmemVolumesConfig); pmemVolumeManager = new PmemVolumeManager(pmemVolumesConfig,
cacheRecoveryEnabled);
} }
} }
@ -150,6 +161,11 @@ public final class PmemVolumeManager {
return pmemVolumeManager; return pmemVolumeManager;
} }
@VisibleForTesting
public static void reset() {
pmemVolumeManager = null;
}
@VisibleForTesting @VisibleForTesting
public static void setMaxBytes(long maxBytes) { public static void setMaxBytes(long maxBytes) {
maxBytesPerPmem = maxBytes; maxBytesPerPmem = maxBytes;
@ -218,8 +234,10 @@ public final class PmemVolumeManager {
try { try {
File pmemDir = new File(volumes[n]); File pmemDir = new File(volumes[n]);
File realPmemDir = verifyIfValidPmemVolume(pmemDir); File realPmemDir = verifyIfValidPmemVolume(pmemDir);
// Clean up the cache left before, if any. if (!cacheRecoveryEnabled) {
cleanup(realPmemDir); // Clean up the cache left before, if any.
cleanup(realPmemDir);
}
this.pmemVolumes.add(realPmemDir.getPath()); this.pmemVolumes.add(realPmemDir.getPath());
long maxBytes; long maxBytes;
if (maxBytesPerPmem == -1) { if (maxBytesPerPmem == -1) {
@ -261,6 +279,41 @@ public final class PmemVolumeManager {
} }
} }
/**
* Recover cache from the cached files in the configured pmem volumes.
*/
public Map<ExtendedBlockId, MappableBlock> recoverCache(
String bpid, MappableBlockLoader cacheLoader) throws IOException {
final Map<ExtendedBlockId, MappableBlock> keyToMappableBlock
= new ConcurrentHashMap<>();
for (byte volumeIndex = 0; volumeIndex < pmemVolumes.size();
volumeIndex++) {
long maxBytes = usedBytesCounts.get(volumeIndex).getMaxBytes();
long usedBytes = 0;
File cacheDir = new File(pmemVolumes.get(volumeIndex), bpid);
Collection<File> cachedFileList = FileUtils.listFiles(cacheDir,
TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
// Scan the cached files in pmem volumes for cache recovery.
for (File cachedFile : cachedFileList) {
MappableBlock mappableBlock = cacheLoader.
getRecoveredMappableBlock(cachedFile, bpid, volumeIndex);
ExtendedBlockId key = mappableBlock.getKey();
keyToMappableBlock.put(key, mappableBlock);
usedBytes += cachedFile.length();
}
// Update maxBytes and cache capacity according to cache space
// used by recovered cached files.
usedBytesCounts.get(volumeIndex).setMaxBytes(maxBytes + usedBytes);
cacheCapacity += usedBytes;
usedBytesCounts.get(volumeIndex).reserve(usedBytes);
}
return keyToMappableBlock;
}
public void recoverBlockKeyToVolume(ExtendedBlockId key, byte volumeIndex) {
blockKeyToVolume.put(key, volumeIndex);
}
@VisibleForTesting @VisibleForTesting
static File verifyIfValidPmemVolume(File pmemDir) static File verifyIfValidPmemVolume(File pmemDir)
throws IOException { throws IOException {
@ -316,6 +369,18 @@ public final class PmemVolumeManager {
} }
} }
/**
* Create cache subdirectory specified with blockPoolId.
*/
public void createBlockPoolDir(String bpid) throws IOException {
for (String volume : pmemVolumes) {
File cacheDir = new File(volume, bpid);
if (!cacheDir.exists() && !cacheDir.mkdir()) {
throw new IOException("Failed to create " + cacheDir.getPath());
}
}
}
public static String getRealPmemDir(String rawPmemDir) { public static String getRealPmemDir(String rawPmemDir) {
return new File(rawPmemDir, CACHE_DIR).getAbsolutePath(); return new File(rawPmemDir, CACHE_DIR).getAbsolutePath();
} }
@ -355,19 +420,22 @@ public final class PmemVolumeManager {
return pmemVolumes.get(index); return pmemVolumes.get(index);
} }
/** ArrayList<String> getVolumes() {
* The cache file is named as BlockPoolId-BlockId. return pmemVolumes;
* So its name can be inferred by BlockPoolId and BlockId.
*/
public String getCacheFileName(ExtendedBlockId key) {
return key.getBlockPoolId() + "-" + key.getBlockId();
} }
/** /**
* Considering the pmem volume size is below TB level currently, * A cache file is named after the corresponding BlockId.
* it is tolerable to keep cache files under one directory. * Thus, cache file name can be inferred according to BlockId.
* The strategy will be optimized, especially if one pmem volume */
* has huge cache capacity. public String idToCacheFileName(ExtendedBlockId key) {
return String.valueOf(key.getBlockId());
}
/**
* Create and get the directory where a cache file with this key and
* volumeIndex should be stored. Use hierarchical strategy of storing
* blocks to avoid keeping cache files under one directory.
* *
* @param volumeIndex The index of pmem volume where a replica will be * @param volumeIndex The index of pmem volume where a replica will be
* cached to or has been cached to. * cached to or has been cached to.
@ -376,19 +444,31 @@ public final class PmemVolumeManager {
* *
* @return A path to which the block replica is mapped. * @return A path to which the block replica is mapped.
*/ */
public String inferCacheFilePath(Byte volumeIndex, ExtendedBlockId key) { public String idToCacheFilePath(Byte volumeIndex, ExtendedBlockId key)
return pmemVolumes.get(volumeIndex) + "/" + getCacheFileName(key); throws IOException {
final String cacheSubdirPrefix = "subdir";
long blockId = key.getBlockId();
String bpid = key.getBlockPoolId();
int d1 = (int) ((blockId >> 16) & 0x1F);
int d2 = (int) ((blockId >> 8) & 0x1F);
String parentDir = pmemVolumes.get(volumeIndex) + "/" + bpid;
String subDir = cacheSubdirPrefix + d1 + "/" + cacheSubdirPrefix + d2;
File filePath = new File(parentDir, subDir);
if (!filePath.exists() && !filePath.mkdirs()) {
throw new IOException("Failed to create " + filePath.getPath());
}
return filePath.getAbsolutePath() + "/" + idToCacheFileName(key);
} }
/** /**
* The cache file path is pmemVolume/BlockPoolId-BlockId. * The cache file path is pmemVolume/BlockPoolId/subdir#/subdir#/BlockId.
*/ */
public String getCachePath(ExtendedBlockId key) { public String getCachePath(ExtendedBlockId key) throws IOException {
Byte volumeIndex = blockKeyToVolume.get(key); Byte volumeIndex = blockKeyToVolume.get(key);
if (volumeIndex == null) { if (volumeIndex == null) {
return null; return null;
} }
return inferCacheFilePath(volumeIndex, key); return idToCacheFilePath(volumeIndex, key);
} }
@VisibleForTesting @VisibleForTesting

View File

@ -2686,12 +2686,21 @@
</property> </property>
<property> <property>
<name>dfs.datanode.cache.pmem.dirs</name> <name>dfs.datanode.pmem.cache.dirs</name>
<value></value> <value></value>
<description> <description>
This value specifies the persistent memory directory used for caching block This value specifies the persistent memory directory used for caching block
replica. It matters only if the value of dfs.datanode.cache.loader.class is replica. Multiple directories separated by "," are acceptable.
PmemMappableBlockLoader. Multiple directories separated by "," are acceptable. </description>
</property>
<property>
<name>dfs.datanode.pmem.cache.recovery</name>
<value>true</value>
<description>
This value specifies whether previous cache on persistent memory will be recovered.
This configuration can take effect only if persistent memory cache is enabled by
specifying value for 'dfs.datanode.pmem.cache.dirs'.
</description> </description>
</property> </property>

View File

@ -224,7 +224,7 @@ Be sure to configure one of the following properties for DRAM cache or persisten
This setting is shared with the [Lazy Persist Writes feature](./MemoryStorage.html). The Data Node will ensure that the combined memory used by Lazy Persist Writes and Centralized Cache Management does not exceed the amount configured in `dfs.datanode.max.locked.memory`. This setting is shared with the [Lazy Persist Writes feature](./MemoryStorage.html). The Data Node will ensure that the combined memory used by Lazy Persist Writes and Centralized Cache Management does not exceed the amount configured in `dfs.datanode.max.locked.memory`.
* dfs.datanode.cache.pmem.dirs * dfs.datanode.pmem.cache.dirs
This property specifies the cache volume of persistent memory. For multiple volumes, they should be separated by “,”, e.g. “/mnt/pmem0, /mnt/pmem1”. The default value is empty. If this property is configured, the volume capacity will be detected. And there is no need to configure `dfs.datanode.max.locked.memory`. This property specifies the cache volume of persistent memory. For multiple volumes, they should be separated by “,”, e.g. “/mnt/pmem0, /mnt/pmem1”. The default value is empty. If this property is configured, the volume capacity will be detected. And there is no need to configure `dfs.datanode.max.locked.memory`.
@ -258,6 +258,9 @@ The following properties are not required, but may be specified for tuning:
This parameter can be used to enable/disable the centralized caching in NameNode. When centralized caching is disabled, NameNode will not process cache reports or store information about block cache locations on the cluster. Note that NameNode will continute to store the path based cache locations in the file-system metadata, even though it will not act on this information until the caching is enabled. The default value for this parameter is true (i.e. centralized caching is enabled). This parameter can be used to enable/disable the centralized caching in NameNode. When centralized caching is disabled, NameNode will not process cache reports or store information about block cache locations on the cluster. Note that NameNode will continute to store the path based cache locations in the file-system metadata, even though it will not act on this information until the caching is enabled. The default value for this parameter is true (i.e. centralized caching is enabled).
* dfs.datanode.pmem.cache.recovery
This parameter is used to determine whether to recover the status for previous cache on persistent memory during the start of DataNode. If it is enabled, DataNode will recover the status for previously cached data on persistent memory. Thus, re-caching data will be avoided. If this property is not enabled, DataNode will clean up the previous cache, if any, on persistent memory. This property can only work when persistent memory is enabled, i.e., `dfs.datanode.pmem.cache.dirs` is configured.
### OS Limits ### OS Limits

View File

@ -21,7 +21,7 @@ import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_DIRS_KEY;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
@ -137,7 +137,7 @@ public class TestCacheByPmemMappableBlockLoader {
new File(PMEM_DIR_0).getAbsoluteFile().mkdir(); new File(PMEM_DIR_0).getAbsoluteFile().mkdir();
new File(PMEM_DIR_1).getAbsoluteFile().mkdir(); new File(PMEM_DIR_1).getAbsoluteFile().mkdir();
// Configure two bogus pmem volumes // Configure two bogus pmem volumes
conf.set(DFS_DATANODE_CACHE_PMEM_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1); conf.set(DFS_DATANODE_PMEM_CACHE_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1);
PmemVolumeManager.setMaxBytes((long) (CACHE_CAPACITY * 0.5)); PmemVolumeManager.setMaxBytes((long) (CACHE_CAPACITY * 0.5));
prevCacheManipulator = NativeIO.POSIX.getCacheManipulator(); prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
@ -259,14 +259,18 @@ public class TestCacheByPmemMappableBlockLoader {
// The cachePath shouldn't be null if the replica has been cached // The cachePath shouldn't be null if the replica has been cached
// to pmem. // to pmem.
assertNotNull(cachePath); assertNotNull(cachePath);
String expectFileName = Path path = new Path(cachePath);
PmemVolumeManager.getInstance().getCacheFileName(key); String fileName = path.getName();
if (cachePath.startsWith(PMEM_DIR_0)) { if (cachePath.startsWith(PMEM_DIR_0)) {
assertTrue(cachePath.equals(PmemVolumeManager String expectPath = PmemVolumeManager.
.getRealPmemDir(PMEM_DIR_0) + "/" + expectFileName)); getRealPmemDir(PMEM_DIR_0) + "/" + key.getBlockPoolId();
assertTrue(path.toString().startsWith(expectPath));
assertTrue(key.getBlockId() == Long.parseLong(fileName));
} else if (cachePath.startsWith(PMEM_DIR_1)) { } else if (cachePath.startsWith(PMEM_DIR_1)) {
assertTrue(cachePath.equals(PmemVolumeManager String expectPath = PmemVolumeManager.
.getRealPmemDir(PMEM_DIR_1) + "/" + expectFileName)); getRealPmemDir(PMEM_DIR_1) + "/" + key.getBlockPoolId();
assertTrue(path.toString().startsWith(expectPath));
assertTrue(key.getBlockId() == Long.parseLong(fileName));
} else { } else {
fail("The cache path is not the expected one: " + cachePath); fail("The cache path is not the expected one: " + cachePath);
} }

View File

@ -0,0 +1,342 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.event.Level;
import com.google.common.base.Supplier;
import com.google.common.primitives.Ints;
/**
* Tests HDFS persistent memory cache by PmemMappableBlockLoader.
*
* Bogus persistent memory volume is used to cache blocks.
*/
public class TestPmemCacheRecovery {
protected static final org.slf4j.Logger LOG =
LoggerFactory.getLogger(TestCacheByPmemMappableBlockLoader.class);
protected static final long CACHE_AMOUNT = 64 * 1024;
protected static final long BLOCK_SIZE = 4 * 1024;
private static Configuration conf;
private static MiniDFSCluster cluster = null;
private static DistributedFileSystem fs;
private static DataNode dn;
private static FsDatasetCache cacheManager;
private static String blockPoolId = "";
/**
* Used to pause DN BPServiceActor threads. BPSA threads acquire the
* shared read lock. The test acquires the write lock for exclusive access.
*/
private static ReadWriteLock lock = new ReentrantReadWriteLock(true);
private static CacheManipulator prevCacheManipulator;
private static DataNodeFaultInjector oldInjector;
private static final String PMEM_DIR_0 =
MiniDFSCluster.getBaseDirectory() + "pmem0";
private static final String PMEM_DIR_1 =
MiniDFSCluster.getBaseDirectory() + "pmem1";
static {
GenericTestUtils.setLogLevel(
LoggerFactory.getLogger(FsDatasetCache.class), Level.DEBUG);
}
@BeforeClass
public static void setUpClass() throws Exception {
oldInjector = DataNodeFaultInjector.get();
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
@Override
public void startOfferService() throws Exception {
lock.readLock().lock();
}
@Override
public void endOfferService() throws Exception {
lock.readLock().unlock();
}
});
}
@AfterClass
public static void tearDownClass() throws Exception {
DataNodeFaultInjector.set(oldInjector);
}
@Before
public void setUp() throws Exception {
conf = new HdfsConfiguration();
conf.setBoolean(DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY, true);
conf.setLong(DFSConfigKeys.
DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100);
conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(
DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);
// Configuration for pmem cache
new File(PMEM_DIR_0).getAbsoluteFile().mkdir();
new File(PMEM_DIR_1).getAbsoluteFile().mkdir();
// Configure two bogus pmem volumes
conf.set(DFS_DATANODE_PMEM_CACHE_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1);
prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
dn = cluster.getDataNodes().get(0);
cacheManager = ((FsDatasetImpl) dn.getFSDataset()).cacheManager;
}
@After
public void tearDown() throws Exception {
if (fs != null) {
fs.close();
fs = null;
}
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
}
protected static void restartCluster() throws Exception {
conf = new HdfsConfiguration();
conf.setBoolean(DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY, true);
conf.setLong(DFSConfigKeys.
DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100);
conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(
DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);
// Configure two bogus pmem volumes
conf.set(DFS_DATANODE_PMEM_CACHE_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1);
prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
FsDatasetImpl.setBlockPoolId(blockPoolId);
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
dn = cluster.getDataNodes().get(0);
cacheManager = ((FsDatasetImpl) dn.getFSDataset()).cacheManager;
}
protected static void shutdownCluster() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
PmemVolumeManager.reset();
}
public List<ExtendedBlockId> getExtendedBlockId(Path filePath, long fileLen)
throws IOException {
List<ExtendedBlockId> keys = new ArrayList<>();
HdfsBlockLocation[] locs = (HdfsBlockLocation[]) fs.getFileBlockLocations(
filePath, 0, fileLen);
for (HdfsBlockLocation loc : locs) {
long bkid = loc.getLocatedBlock().getBlock().getBlockId();
String bpid = loc.getLocatedBlock().getBlock().getBlockPoolId();
keys.add(new ExtendedBlockId(bkid, bpid));
}
return keys;
}
@Test(timeout = 60000)
public void testCacheRecovery() throws Exception {
final int cacheBlocksNum =
Ints.checkedCast(CACHE_AMOUNT / BLOCK_SIZE);
BlockReaderTestUtil.enableHdfsCachingTracing();
Assert.assertEquals(0, CACHE_AMOUNT % BLOCK_SIZE);
final Path testFile = new Path("/testFile");
final long testFileLen = cacheBlocksNum * BLOCK_SIZE;
DFSTestUtil.createFile(fs, testFile,
testFileLen, (short) 1, 0xbeef);
List<ExtendedBlockId> blockKeys =
getExtendedBlockId(testFile, testFileLen);
fs.addCachePool(new CachePoolInfo("testPool"));
final long cacheDirectiveId = fs.addCacheDirective(
new CacheDirectiveInfo.Builder().setPool("testPool").
setPath(testFile).setReplication((short) 1).build());
// wait for caching
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
long blocksCached =
MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
if (blocksCached != cacheBlocksNum) {
LOG.info("waiting for " + cacheBlocksNum + " blocks to " +
"be cached. Right now " + blocksCached + " blocks are cached.");
return false;
}
LOG.info(cacheBlocksNum + " blocks are now cached.");
return true;
}
}, 1000, 30000);
assertEquals(CACHE_AMOUNT, cacheManager.getCacheUsed());
Map<ExtendedBlockId, Byte> blockKeyToVolume =
PmemVolumeManager.getInstance().getBlockKeyToVolume();
// All block keys should be kept in blockKeyToVolume
assertEquals(blockKeyToVolume.size(), cacheBlocksNum);
assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys));
// Test each replica's cache file path
for (ExtendedBlockId key : blockKeys) {
if (blockPoolId.isEmpty()) {
blockPoolId = key.getBlockPoolId();
}
String cachePath = cacheManager.
getReplicaCachePath(key.getBlockPoolId(), key.getBlockId());
// The cachePath shouldn't be null if the replica has been cached
// to pmem.
assertNotNull(cachePath);
Path path = new Path(cachePath);
String fileName = path.getName();
if (cachePath.startsWith(PMEM_DIR_0)) {
String expectPath = PmemVolumeManager.
getRealPmemDir(PMEM_DIR_0) + "/" + key.getBlockPoolId();
assertTrue(path.toString().startsWith(expectPath));
assertTrue(key.getBlockId() == Long.parseLong(fileName));
} else if (cachePath.startsWith(PMEM_DIR_1)) {
String expectPath = PmemVolumeManager.
getRealPmemDir(PMEM_DIR_1) + "/" + key.getBlockPoolId();
assertTrue(path.toString().startsWith(expectPath));
assertTrue(key.getBlockId() == Long.parseLong(fileName));
} else {
fail("The cache path is not the expected one: " + cachePath);
}
}
// Trigger cache recovery
shutdownCluster();
restartCluster();
assertEquals(CACHE_AMOUNT, cacheManager.getCacheUsed());
blockKeyToVolume = PmemVolumeManager.getInstance().getBlockKeyToVolume();
// All block keys should be kept in blockKeyToVolume
assertEquals(blockKeyToVolume.size(), cacheBlocksNum);
assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys));
// Test each replica's cache file path
for (ExtendedBlockId key : blockKeys) {
String cachePath = cacheManager.
getReplicaCachePath(key.getBlockPoolId(), key.getBlockId());
// The cachePath shouldn't be null if the replica has been cached
// to pmem.
assertNotNull(cachePath);
Path path = new Path(cachePath);
String fileName = path.getName();
if (cachePath.startsWith(PMEM_DIR_0)) {
String expectPath = PmemVolumeManager.
getRealPmemDir(PMEM_DIR_0) + "/" + key.getBlockPoolId();
assertTrue(path.toString().startsWith(expectPath));
assertTrue(key.getBlockId() == Long.parseLong(fileName));
} else if (cachePath.startsWith(PMEM_DIR_1)) {
String expectPath = PmemVolumeManager.
getRealPmemDir(PMEM_DIR_1) + "/" + key.getBlockPoolId();
assertTrue(path.toString().startsWith(expectPath));
assertTrue(key.getBlockId() == Long.parseLong(fileName));
} else {
fail("The cache path is not the expected one: " + cachePath);
}
}
// Uncache the test file
for (ExtendedBlockId key : blockKeys) {
cacheManager.uncacheBlock(blockPoolId, key.getBlockId());
}
// Wait for uncaching
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
long blocksUncached =
MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
if (blocksUncached != cacheBlocksNum) {
LOG.info("waiting for " + cacheBlocksNum + " blocks to be " +
"uncached. Right now " + blocksUncached +
" blocks are uncached.");
return false;
}
LOG.info(cacheBlocksNum + " blocks have been uncached.");
return true;
}
}, 1000, 30000);
// It is expected that no pmem cache space is used.
assertEquals(0, cacheManager.getCacheUsed());
// No record should be kept by blockKeyToVolume after testFile is uncached.
assertEquals(blockKeyToVolume.size(), 0);
}
}