HDFS-14356. Implement HDFS cache on SCM with native PMDK libs. Contributed by Feilong He.
(cherry picked from commit d1aad44490
)
This commit is contained in:
parent
f982f96623
commit
9281b72550
28
BUILDING.txt
28
BUILDING.txt
|
@ -82,6 +82,8 @@ Optional packages:
|
|||
$ sudo apt-get install fuse libfuse-dev
|
||||
* ZStandard compression
|
||||
$ sudo apt-get install zstd
|
||||
* PMDK library for storage class memory(SCM) as HDFS cache backend
|
||||
Please refer to http://pmem.io/ and https://github.com/pmem/pmdk
|
||||
|
||||
----------------------------------------------------------------------------------
|
||||
Maven main modules:
|
||||
|
@ -246,6 +248,32 @@ Maven build goals:
|
|||
invoke, run 'mvn dependency-check:aggregate'. Note that this plugin
|
||||
requires maven 3.1.1 or greater.
|
||||
|
||||
PMDK library build options:
|
||||
|
||||
The Persistent Memory Development Kit (PMDK), formerly known as NVML, is a growing
|
||||
collection of libraries which have been developed for various use cases, tuned,
|
||||
validated to production quality, and thoroughly documented. These libraries are built
|
||||
on the Direct Access (DAX) feature available in both Linux and Windows, which allows
|
||||
applications directly load/store access to persistent memory by memory-mapping files
|
||||
on a persistent memory aware file system.
|
||||
|
||||
It is currently an optional component, meaning that Hadoop can be built without
|
||||
this dependency. Please Note the library is used via dynamic module. For getting
|
||||
more details please refer to the official sites:
|
||||
http://pmem.io/ and https://github.com/pmem/pmdk.
|
||||
|
||||
* -Drequire.pmdk is used to build the project with PMDK libraries forcibly. With this
|
||||
option provided, the build will fail if libpmem library is not found. If this option
|
||||
is not given, the build will generate a version of Hadoop with libhadoop.so.
|
||||
And storage class memory(SCM) backed HDFS cache is still supported without PMDK involved.
|
||||
Because PMDK can bring better caching write/read performance, it is recommended to build
|
||||
the project with this option if user plans to use SCM backed HDFS cache.
|
||||
* -Dpmdk.lib is used to specify a nonstandard location for PMDK libraries if they are not
|
||||
under /usr/lib or /usr/lib64.
|
||||
* -Dbundle.pmdk is used to copy the specified libpmem libraries into the distribution tar
|
||||
package. This option requires that -Dpmdk.lib is specified. With -Dbundle.pmdk provided,
|
||||
the build will fail if -Dpmdk.lib is not specified.
|
||||
|
||||
----------------------------------------------------------------------------------
|
||||
Building components separately
|
||||
|
||||
|
|
|
@ -96,6 +96,12 @@ for i in "$@"; do
|
|||
--isalbundle=*)
|
||||
ISALBUNDLE=${i#*=}
|
||||
;;
|
||||
--pmdklib=*)
|
||||
PMDKLIB=${i#*=}
|
||||
;;
|
||||
--pmdkbundle=*)
|
||||
PMDKBUNDLE=${i#*=}
|
||||
;;
|
||||
--opensslbinbundle=*)
|
||||
OPENSSLBINBUNDLE=${i#*=}
|
||||
;;
|
||||
|
@ -153,6 +159,8 @@ if [[ -d "${LIB_DIR}" ]]; then
|
|||
bundle_native_lib "${OPENSSLLIBBUNDLE}" "openssl.lib" "crypto" "${OPENSSLLIB}"
|
||||
|
||||
bundle_native_lib "${ISALBUNDLE}" "isal.lib" "isa" "${ISALLIB}"
|
||||
|
||||
bundle_native_lib "${PMDKBUNDLE}" "pmdk.lib" "pmdk" "${PMDKLIB}"
|
||||
fi
|
||||
|
||||
# Windows
|
||||
|
|
|
@ -708,6 +708,8 @@
|
|||
<REQUIRE_ISAL>${require.isal} </REQUIRE_ISAL>
|
||||
<CUSTOM_ISAL_PREFIX>${isal.prefix} </CUSTOM_ISAL_PREFIX>
|
||||
<CUSTOM_ISAL_LIB>${isal.lib} </CUSTOM_ISAL_LIB>
|
||||
<REQUIRE_PMDK>${require.pmdk}</REQUIRE_PMDK>
|
||||
<CUSTOM_PMDK_LIB>${pmdk.lib}</CUSTOM_PMDK_LIB>
|
||||
<REQUIRE_OPENSSL>${require.openssl} </REQUIRE_OPENSSL>
|
||||
<CUSTOM_OPENSSL_PREFIX>${openssl.prefix} </CUSTOM_OPENSSL_PREFIX>
|
||||
<CUSTOM_OPENSSL_LIB>${openssl.lib} </CUSTOM_OPENSSL_LIB>
|
||||
|
|
|
@ -121,6 +121,7 @@ else ()
|
|||
ENDIF(REQUIRE_ZSTD)
|
||||
endif ()
|
||||
|
||||
#Require ISA-L
|
||||
set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
|
||||
hadoop_set_find_shared_library_version("2")
|
||||
find_library(ISAL_LIBRARY
|
||||
|
@ -159,6 +160,25 @@ else (ISAL_LIBRARY)
|
|||
ENDIF(REQUIRE_ISAL)
|
||||
endif (ISAL_LIBRARY)
|
||||
|
||||
# Build with PMDK library if -Drequire.pmdk option is specified.
|
||||
if(REQUIRE_PMDK)
|
||||
set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
|
||||
hadoop_set_find_shared_library_version("1")
|
||||
find_library(PMDK_LIBRARY
|
||||
NAMES pmem
|
||||
PATHS ${CUSTOM_PMDK_LIB} /usr/lib /usr/lib64)
|
||||
set(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
|
||||
|
||||
if(PMDK_LIBRARY)
|
||||
GET_FILENAME_COMPONENT(HADOOP_PMDK_LIBRARY ${PMDK_LIBRARY} NAME)
|
||||
set(PMDK_SOURCE_FILES ${SRC}/io/nativeio/pmdk_load.c)
|
||||
else(PMDK_LIBRARY)
|
||||
MESSAGE(FATAL_ERROR "The required PMDK library is NOT found. PMDK_LIBRARY=${PMDK_LIBRARY}")
|
||||
endif(PMDK_LIBRARY)
|
||||
else(REQUIRE_PMDK)
|
||||
MESSAGE(STATUS "Build without PMDK support.")
|
||||
endif(REQUIRE_PMDK)
|
||||
|
||||
# Build hardware CRC32 acceleration, if supported on the platform.
|
||||
if(CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
|
||||
set(BULK_CRC_ARCH_SOURCE_FIlE "${SRC}/util/bulk_crc32_x86.c")
|
||||
|
@ -256,6 +276,7 @@ hadoop_add_dual_library(hadoop
|
|||
${SRC}/io/compress/zlib/ZlibDecompressor.c
|
||||
${BZIP2_SOURCE_FILES}
|
||||
${SRC}/io/nativeio/NativeIO.c
|
||||
${PMDK_SOURCE_FILES}
|
||||
${SRC}/io/nativeio/errno_enum.c
|
||||
${SRC}/io/nativeio/file_descriptor.c
|
||||
${SRC}/io/nativeio/SharedFileDescriptorFactory.c
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#cmakedefine HADOOP_ZSTD_LIBRARY "@HADOOP_ZSTD_LIBRARY@"
|
||||
#cmakedefine HADOOP_OPENSSL_LIBRARY "@HADOOP_OPENSSL_LIBRARY@"
|
||||
#cmakedefine HADOOP_ISAL_LIBRARY "@HADOOP_ISAL_LIBRARY@"
|
||||
#cmakedefine HADOOP_PMDK_LIBRARY "@HADOOP_PMDK_LIBRARY@"
|
||||
#cmakedefine HAVE_SYNC_FILE_RANGE
|
||||
#cmakedefine HAVE_POSIX_FADVISE
|
||||
|
||||
|
|
|
@ -99,6 +99,48 @@ public class NativeIO {
|
|||
write. */
|
||||
public static int SYNC_FILE_RANGE_WAIT_AFTER = 4;
|
||||
|
||||
/**
|
||||
* Keeps the support state of PMDK.
|
||||
*/
|
||||
public enum SupportState {
|
||||
UNSUPPORTED(-1),
|
||||
PMDK_LIB_NOT_FOUND(1),
|
||||
SUPPORTED(0);
|
||||
|
||||
private byte stateCode;
|
||||
SupportState(int stateCode) {
|
||||
this.stateCode = (byte) stateCode;
|
||||
}
|
||||
|
||||
public int getStateCode() {
|
||||
return stateCode;
|
||||
}
|
||||
|
||||
public String getMessage() {
|
||||
String msg;
|
||||
switch (stateCode) {
|
||||
case -1:
|
||||
msg = "The native code is built without PMDK support.";
|
||||
break;
|
||||
case 1:
|
||||
msg = "The native code is built with PMDK support, but PMDK libs " +
|
||||
"are NOT found in execution environment or failed to be loaded.";
|
||||
break;
|
||||
case 0:
|
||||
msg = "The native code is built with PMDK support, and PMDK libs " +
|
||||
"are loaded successfully.";
|
||||
break;
|
||||
default:
|
||||
msg = "The state code: " + stateCode + " is unrecognized!";
|
||||
}
|
||||
return msg;
|
||||
}
|
||||
}
|
||||
|
||||
// Denotes the state of supporting PMDK. The value is set by JNI.
|
||||
private static SupportState pmdkSupportState =
|
||||
SupportState.PMDK_LIB_NOT_FOUND;
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(NativeIO.class);
|
||||
|
||||
// Set to true via JNI if possible
|
||||
|
@ -123,6 +165,93 @@ public class NativeIO {
|
|||
POSIX.cacheManipulator = cacheManipulator;
|
||||
}
|
||||
|
||||
// This method is invoked by JNI.
|
||||
public static void setPmdkSupportState(int stateCode) {
|
||||
for (SupportState state : SupportState.values()) {
|
||||
if (state.getStateCode() == stateCode) {
|
||||
pmdkSupportState = state;
|
||||
return;
|
||||
}
|
||||
}
|
||||
LOG.error("The state code: " + stateCode + " is unrecognized!");
|
||||
}
|
||||
|
||||
public static boolean isPmdkAvailable() {
|
||||
LOG.info(pmdkSupportState.getMessage());
|
||||
return pmdkSupportState == SupportState.SUPPORTED;
|
||||
}
|
||||
|
||||
/**
|
||||
* Denote memory region for a file mapped.
|
||||
*/
|
||||
public static class PmemMappedRegion {
|
||||
private long address;
|
||||
private long length;
|
||||
private boolean isPmem;
|
||||
|
||||
public PmemMappedRegion(long address, long length, boolean isPmem) {
|
||||
this.address = address;
|
||||
this.length = length;
|
||||
this.isPmem = isPmem;
|
||||
}
|
||||
|
||||
public boolean isPmem() {
|
||||
return this.isPmem;
|
||||
}
|
||||
|
||||
public long getAddress() {
|
||||
return this.address;
|
||||
}
|
||||
|
||||
public long getLength() {
|
||||
return this.length;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* JNI wrapper of persist memory operations.
|
||||
*/
|
||||
public static class Pmem {
|
||||
// check whether the address is a Pmem address or DIMM address
|
||||
public static boolean isPmem(long address, long length) {
|
||||
return NativeIO.POSIX.isPmemCheck(address, length);
|
||||
}
|
||||
|
||||
// create a pmem file and memory map it
|
||||
public static PmemMappedRegion mapBlock(String path, long length) {
|
||||
return NativeIO.POSIX.pmemCreateMapFile(path, length);
|
||||
}
|
||||
|
||||
// unmap a pmem file
|
||||
public static boolean unmapBlock(long address, long length) {
|
||||
return NativeIO.POSIX.pmemUnMap(address, length);
|
||||
}
|
||||
|
||||
// copy data from disk file(src) to pmem file(dest), without flush
|
||||
public static void memCopy(byte[] src, long dest, boolean isPmem,
|
||||
long length) {
|
||||
NativeIO.POSIX.pmemCopy(src, dest, isPmem, length);
|
||||
}
|
||||
|
||||
// flush the memory content to persistent storage
|
||||
public static void memSync(PmemMappedRegion region) {
|
||||
if (region.isPmem()) {
|
||||
NativeIO.POSIX.pmemDrain();
|
||||
} else {
|
||||
NativeIO.POSIX.pmemSync(region.getAddress(), region.getLength());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static native boolean isPmemCheck(long address, long length);
|
||||
private static native PmemMappedRegion pmemCreateMapFile(String path,
|
||||
long length);
|
||||
private static native boolean pmemUnMap(long address, long length);
|
||||
private static native void pmemCopy(byte[] src, long dest, boolean isPmem,
|
||||
long length);
|
||||
private static native void pmemDrain();
|
||||
private static native void pmemSync(long address, long length);
|
||||
|
||||
/**
|
||||
* Used to manipulate the operating system cache.
|
||||
*/
|
||||
|
@ -142,8 +271,8 @@ public class NativeIO {
|
|||
}
|
||||
|
||||
public void posixFadviseIfPossible(String identifier,
|
||||
FileDescriptor fd, long offset, long len, int flags)
|
||||
throws NativeIOException {
|
||||
FileDescriptor fd, long offset, long len, int flags)
|
||||
throws NativeIOException {
|
||||
NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, offset,
|
||||
len, flags);
|
||||
}
|
||||
|
@ -743,7 +872,7 @@ public class NativeIO {
|
|||
* user account name, of the format DOMAIN\UserName. This method
|
||||
* will remove the domain part of the full logon name.
|
||||
*
|
||||
* @param Fthe full principal name containing the domain
|
||||
* @param name the full principal name containing the domain
|
||||
* @return name with domain removed
|
||||
*/
|
||||
private static String stripDomain(String name) {
|
||||
|
|
|
@ -36,6 +36,10 @@
|
|||
#include <sys/resource.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/syscall.h>
|
||||
#ifdef HADOOP_PMDK_LIBRARY
|
||||
#include <libpmem.h>
|
||||
#include "pmdk_load.h"
|
||||
#endif
|
||||
#if !(defined(__FreeBSD__) || defined(__MACH__))
|
||||
#include <sys/sendfile.h>
|
||||
#endif
|
||||
|
@ -60,6 +64,7 @@
|
|||
|
||||
#define NATIVE_IO_POSIX_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX"
|
||||
#define NATIVE_IO_STAT_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX$Stat"
|
||||
#define NATIVE_IO_POSIX_PMEMREGION_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX$PmemMappedRegion"
|
||||
|
||||
#define SET_INT_OR_RETURN(E, C, F) \
|
||||
{ \
|
||||
|
@ -81,6 +86,12 @@ static jmethodID nioe_ctor;
|
|||
// Please see HADOOP-7156 for details.
|
||||
jobject pw_lock_object;
|
||||
|
||||
#ifdef HADOOP_PMDK_LIBRARY
|
||||
// the NativeIO$POSIX$PmemMappedRegion inner class and its constructor
|
||||
static jclass pmem_region_clazz = NULL;
|
||||
static jmethodID pmem_region_ctor = NULL;
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Throw a java.IO.IOException, generating the message from errno.
|
||||
* NB. this is also used form windows_secure_container_executor.c
|
||||
|
@ -269,6 +280,63 @@ static void nioe_deinit(JNIEnv *env) {
|
|||
nioe_ctor = NULL;
|
||||
}
|
||||
|
||||
#ifdef HADOOP_PMDK_LIBRARY
|
||||
static int loadPmdkLib(JNIEnv *env) {
|
||||
char errMsg[1024];
|
||||
jclass clazz = (*env)->FindClass(env, NATIVE_IO_POSIX_CLASS);
|
||||
if (clazz == NULL) {
|
||||
return 0; // exception has been raised
|
||||
}
|
||||
load_pmdk_lib(errMsg, sizeof(errMsg));
|
||||
jmethodID mid = (*env)->GetStaticMethodID(env, clazz, "setPmdkSupportState", "(I)V");
|
||||
if (mid == 0) {
|
||||
return 0;
|
||||
}
|
||||
if (strlen(errMsg) > 0) {
|
||||
(*env)->CallStaticVoidMethod(env, clazz, mid, 1);
|
||||
return 0;
|
||||
}
|
||||
(*env)->CallStaticVoidMethod(env, clazz, mid, 0);
|
||||
return 1;
|
||||
}
|
||||
|
||||
static void pmem_region_init(JNIEnv *env, jclass nativeio_class) {
|
||||
|
||||
jclass clazz = NULL;
|
||||
// Init Stat
|
||||
clazz = (*env)->FindClass(env, NATIVE_IO_POSIX_PMEMREGION_CLASS);
|
||||
if (!clazz) {
|
||||
THROW(env, "java/io/IOException", "Failed to get PmemMappedRegion class");
|
||||
return; // exception has been raised
|
||||
}
|
||||
|
||||
// Init PmemMappedRegion class
|
||||
pmem_region_clazz = (*env)->NewGlobalRef(env, clazz);
|
||||
if (!pmem_region_clazz) {
|
||||
THROW(env, "java/io/IOException", "Failed to new global reference of PmemMappedRegion class");
|
||||
return; // exception has been raised
|
||||
}
|
||||
|
||||
pmem_region_ctor = (*env)->GetMethodID(env, pmem_region_clazz, "<init>", "(JJZ)V");
|
||||
if (!pmem_region_ctor) {
|
||||
THROW(env, "java/io/IOException", "Failed to get PmemMappedRegion constructor");
|
||||
return; // exception has been raised
|
||||
}
|
||||
}
|
||||
|
||||
static void pmem_region_deinit(JNIEnv *env) {
|
||||
if (pmem_region_ctor != NULL) {
|
||||
(*env)->DeleteGlobalRef(env, pmem_region_ctor);
|
||||
pmem_region_ctor = NULL;
|
||||
}
|
||||
|
||||
if (pmem_region_clazz != NULL) {
|
||||
(*env)->DeleteGlobalRef(env, pmem_region_clazz);
|
||||
pmem_region_clazz = NULL;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* private static native void initNative();
|
||||
*
|
||||
|
@ -292,6 +360,11 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_initNative(
|
|||
#ifdef UNIX
|
||||
errno_enum_init(env);
|
||||
PASS_EXCEPTIONS_GOTO(env, error);
|
||||
#ifdef HADOOP_PMDK_LIBRARY
|
||||
if (loadPmdkLib(env)) {
|
||||
pmem_region_init(env, clazz);
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
return;
|
||||
error:
|
||||
|
@ -299,6 +372,9 @@ error:
|
|||
// class wasn't initted yet
|
||||
#ifdef UNIX
|
||||
stat_deinit(env);
|
||||
#ifdef HADOOP_PMDK_LIBRARY
|
||||
pmem_region_deinit(env);
|
||||
#endif
|
||||
#endif
|
||||
nioe_deinit(env);
|
||||
fd_deinit(env);
|
||||
|
@ -1383,3 +1459,179 @@ cleanup:
|
|||
/**
|
||||
* vim: sw=2: ts=2: et:
|
||||
*/
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
|
||||
* Method: isPmemCheck
|
||||
* Signature: (JJ)Z
|
||||
*/
|
||||
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_isPmemCheck(
|
||||
JNIEnv *env, jclass thisClass, jlong address, jlong length) {
|
||||
#if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
|
||||
jint is_pmem = pmdkLoader->pmem_is_pmem(address, length);
|
||||
return (is_pmem) ? JNI_TRUE : JNI_FALSE;
|
||||
#else
|
||||
THROW(env, "java/lang/UnsupportedOperationException",
|
||||
"The function isPmemCheck is not supported.");
|
||||
return JNI_FALSE;
|
||||
#endif
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
|
||||
* Method: pmemCreateMapFile
|
||||
* Signature: (Ljava/lang/String;J)Lorg/apache/hadoop/io/nativeio/NativeIO/POSIX/PmemMappedRegion;
|
||||
*/
|
||||
JNIEXPORT jobject JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemCreateMapFile(
|
||||
JNIEnv *env, jclass thisClass, jstring filePath, jlong fileLength) {
|
||||
#if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
|
||||
/* create a pmem file and memory map it */
|
||||
const char * path = NULL;
|
||||
void * pmemaddr = NULL;
|
||||
size_t mapped_len = 0;
|
||||
int is_pmem = 1;
|
||||
char msg[1000];
|
||||
|
||||
path = (*env)->GetStringUTFChars(env, filePath, NULL);
|
||||
if (!path) {
|
||||
THROW(env, "java/lang/IllegalArgumentException", "File Path cannot be null");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (fileLength <= 0) {
|
||||
(*env)->ReleaseStringUTFChars(env, filePath, path);
|
||||
THROW(env, "java/lang/IllegalArgumentException", "File length should be positive");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pmemaddr = pmdkLoader->pmem_map_file(path, fileLength, PMEM_FILE_CREATE|PMEM_FILE_EXCL,
|
||||
0666, &mapped_len, &is_pmem);
|
||||
|
||||
if (!pmemaddr) {
|
||||
snprintf(msg, sizeof(msg), "Failed to create pmem file. file: %s, length: %x, error msg: %s", path, fileLength, pmem_errormsg());
|
||||
THROW(env, "java/io/IOException", msg);
|
||||
(*env)->ReleaseStringUTFChars(env, filePath, path);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (fileLength != mapped_len) {
|
||||
snprintf(msg, sizeof(msg), "Mapped length doesn't match the request length. file :%s, request length:%x, returned length:%x, error msg:%s", path, fileLength, mapped_len, pmem_errormsg());
|
||||
THROW(env, "java/io/IOException", msg);
|
||||
(*env)->ReleaseStringUTFChars(env, filePath, path);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
(*env)->ReleaseStringUTFChars(env, filePath, path);
|
||||
|
||||
if ((!pmem_region_clazz) || (!pmem_region_ctor)) {
|
||||
THROW(env, "java/io/IOException", "PmemMappedRegion class or constructor is NULL");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
jobject ret = (*env)->NewObject(env, pmem_region_clazz, pmem_region_ctor, pmemaddr, mapped_len, (jboolean)is_pmem);
|
||||
return ret;
|
||||
|
||||
#else
|
||||
THROW(env, "java/lang/UnsupportedOperationException",
|
||||
"The function pmemCreateMapFile is not supported.");
|
||||
return NULL;
|
||||
#endif
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
|
||||
* Method: pmemUnMap
|
||||
* Signature: (JJ)V
|
||||
*/
|
||||
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemUnMap(
|
||||
JNIEnv *env, jclass thisClass, jlong address, jlong length) {
|
||||
#if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
|
||||
int succeed = 0;
|
||||
char msg[1000];
|
||||
succeed = pmdkLoader->pmem_unmap(address, length);
|
||||
// succeed = -1 failure; succeed = 0 success
|
||||
if (succeed != 0) {
|
||||
snprintf(msg, sizeof(msg), "Failed to unmap region. address: %x, length: %x, error msg: %s", address, length, pmem_errormsg());
|
||||
THROW(env, "java/io/IOException", msg);
|
||||
return JNI_FALSE;
|
||||
} else {
|
||||
return JNI_TRUE;
|
||||
}
|
||||
#else
|
||||
THROW(env, "java/lang/UnsupportedOperationException",
|
||||
"The function pmemUnMap is not supported.");
|
||||
return JNI_FALSE;
|
||||
#endif
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
|
||||
* Method: pmemCopy
|
||||
* Signature: ([BJJ)V
|
||||
*/
|
||||
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemCopy(
|
||||
JNIEnv *env, jclass thisClass, jbyteArray buf, jlong address, jboolean is_pmem, jlong length) {
|
||||
#if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
|
||||
char msg[1000];
|
||||
jbyte* srcBuf = (*env)->GetByteArrayElements(env, buf, 0);
|
||||
snprintf(msg, sizeof(msg), "Pmem copy content. dest: %x, length: %x, src: %x ", address, length, srcBuf);
|
||||
if (is_pmem) {
|
||||
pmdkLoader->pmem_memcpy_nodrain(address, srcBuf, length);
|
||||
} else {
|
||||
memcpy(address, srcBuf, length);
|
||||
}
|
||||
(*env)->ReleaseByteArrayElements(env, buf, srcBuf, 0);
|
||||
return;
|
||||
#else
|
||||
THROW(env, "java/lang/UnsupportedOperationException",
|
||||
"The function pmemCopy is not supported.");
|
||||
#endif
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_apache_hadoop_io_nativeio_NativeIO
|
||||
* Method: pmemDrain
|
||||
* Signature: ()V
|
||||
*/
|
||||
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemDrain(
|
||||
JNIEnv *env, jclass thisClass) {
|
||||
#if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
|
||||
pmdkLoader->pmem_drain();
|
||||
#else
|
||||
THROW(env, "java/lang/UnsupportedOperationException",
|
||||
"The function pmemDrain is not supported.");
|
||||
#endif
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
|
||||
* Method: pmemSync
|
||||
* Signature: (JJ)V
|
||||
*/
|
||||
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemSync
|
||||
(JNIEnv * env, jclass thisClass, jlong address, jlong length) {
|
||||
|
||||
#if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
|
||||
int succeed = 0;
|
||||
char msg[1000];
|
||||
succeed = pmdkLoader->pmem_msync(address, length);
|
||||
// succeed = -1 failure
|
||||
if (succeed = -1) {
|
||||
snprintf(msg, sizeof(msg), "Failed to msync region. address: %x, length: %x, error msg: %s", address, length, pmem_errormsg());
|
||||
THROW(env, "java/io/IOException", msg);
|
||||
return;
|
||||
}
|
||||
#else
|
||||
THROW(env, "java/lang/UnsupportedOperationException",
|
||||
"The function pmemSync is not supported.");
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <errno.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
#include "org_apache_hadoop.h"
|
||||
#include "pmdk_load.h"
|
||||
#include "org_apache_hadoop_io_nativeio_NativeIO.h"
|
||||
#include "org_apache_hadoop_io_nativeio_NativeIO_POSIX.h"
|
||||
|
||||
#ifdef UNIX
|
||||
#include <sys/time.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <dlfcn.h>
|
||||
|
||||
#include "config.h"
|
||||
#endif
|
||||
|
||||
PmdkLibLoader * pmdkLoader;
|
||||
|
||||
/**
|
||||
* pmdk_load.c
|
||||
* Utility of loading the libpmem library and the required functions.
|
||||
* Building of this codes won't rely on any libpmem source codes, but running
|
||||
* into this will rely on successfully loading of the dynamic library.
|
||||
*
|
||||
*/
|
||||
|
||||
static const char* load_functions() {
|
||||
#ifdef UNIX
|
||||
PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_map_file), "pmem_map_file");
|
||||
PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_unmap), "pmem_unmap");
|
||||
PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_is_pmem), "pmem_is_pmem");
|
||||
PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_drain), "pmem_drain");
|
||||
PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_memcpy_nodrain), "pmem_memcpy_nodrain");
|
||||
PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_msync), "pmem_msync");
|
||||
#endif
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void load_pmdk_lib(char* err, size_t err_len) {
|
||||
const char* errMsg;
|
||||
const char* library = NULL;
|
||||
#ifdef UNIX
|
||||
Dl_info dl_info;
|
||||
#else
|
||||
LPTSTR filename = NULL;
|
||||
#endif
|
||||
|
||||
err[0] = '\0';
|
||||
|
||||
if (pmdkLoader != NULL) {
|
||||
return;
|
||||
}
|
||||
pmdkLoader = calloc(1, sizeof(PmdkLibLoader));
|
||||
|
||||
// Load PMDK library
|
||||
#ifdef UNIX
|
||||
pmdkLoader->libec = dlopen(HADOOP_PMDK_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
|
||||
if (pmdkLoader->libec == NULL) {
|
||||
snprintf(err, err_len, "Failed to load %s (%s)",
|
||||
HADOOP_PMDK_LIBRARY, dlerror());
|
||||
return;
|
||||
}
|
||||
// Clear any existing error
|
||||
dlerror();
|
||||
#endif
|
||||
errMsg = load_functions(pmdkLoader->libec);
|
||||
if (errMsg != NULL) {
|
||||
snprintf(err, err_len, "Loading functions from PMDK failed: %s", errMsg);
|
||||
}
|
||||
|
||||
#ifdef UNIX
|
||||
if(dladdr(pmdkLoader->pmem_map_file, &dl_info)) {
|
||||
library = dl_info.dli_fname;
|
||||
}
|
||||
#else
|
||||
if (GetModuleFileName(pmdkLoader->libec, filename, 256) > 0) {
|
||||
library = filename;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (library == NULL) {
|
||||
library = HADOOP_PMDK_LIBRARY;
|
||||
}
|
||||
|
||||
pmdkLoader->libname = strdup(library);
|
||||
}
|
|
@ -0,0 +1,95 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <errno.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
#include "org_apache_hadoop.h"
|
||||
|
||||
#ifdef UNIX
|
||||
#include <sys/time.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <dlfcn.h>
|
||||
#endif
|
||||
|
||||
#ifndef _PMDK_LOAD_H_
|
||||
#define _PMDK_LOAD_H_
|
||||
|
||||
|
||||
#ifdef UNIX
|
||||
// For libpmem.h
|
||||
typedef void * (*__d_pmem_map_file)(const char *path, size_t len, int flags, mode_t mode,
|
||||
size_t *mapped_lenp, int *is_pmemp);
|
||||
typedef int (* __d_pmem_unmap)(void *addr, size_t len);
|
||||
typedef int (*__d_pmem_is_pmem)(const void *addr, size_t len);
|
||||
typedef void (*__d_pmem_drain)(void);
|
||||
typedef void * (*__d_pmem_memcpy_nodrain)(void *pmemdest, const void *src, size_t len);
|
||||
typedef int (* __d_pmem_msync)(const void *addr, size_t len);
|
||||
|
||||
#endif
|
||||
|
||||
typedef struct __PmdkLibLoader {
|
||||
// The loaded library handle
|
||||
void* libec;
|
||||
char* libname;
|
||||
__d_pmem_map_file pmem_map_file;
|
||||
__d_pmem_unmap pmem_unmap;
|
||||
__d_pmem_is_pmem pmem_is_pmem;
|
||||
__d_pmem_drain pmem_drain;
|
||||
__d_pmem_memcpy_nodrain pmem_memcpy_nodrain;
|
||||
__d_pmem_msync pmem_msync;
|
||||
} PmdkLibLoader;
|
||||
|
||||
extern PmdkLibLoader * pmdkLoader;
|
||||
|
||||
/**
|
||||
* A helper function to dlsym a 'symbol' from a given library-handle.
|
||||
*/
|
||||
|
||||
#ifdef UNIX
|
||||
|
||||
static __attribute__ ((unused))
|
||||
void *myDlsym(void *handle, const char *symbol) {
|
||||
void *func_ptr = dlsym(handle, symbol);
|
||||
return func_ptr;
|
||||
}
|
||||
|
||||
/* A helper macro to dlsym the requisite dynamic symbol in NON-JNI env. */
|
||||
#define PMDK_LOAD_DYNAMIC_SYMBOL(func_ptr, symbol) \
|
||||
if ((func_ptr = myDlsym(pmdkLoader->libec, symbol)) == NULL) { \
|
||||
return "Failed to load symbol" symbol; \
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
/**
|
||||
* Return 0 if not support, 1 otherwise.
|
||||
*/
|
||||
int build_support_pmdk();
|
||||
|
||||
/**
|
||||
* Initialize and load PMDK library, returning error message if any.
|
||||
*
|
||||
* @param err The err message buffer.
|
||||
* @param err_len The length of the message buffer.
|
||||
*/
|
||||
void load_pmdk_lib(char* err, size_t err_len);
|
||||
|
||||
#endif //_PMDK_LOAD_H_
|
|
@ -25,6 +25,8 @@ import java.io.FileReader;
|
|||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.MappedByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.channels.FileChannel.MapMode;
|
||||
|
@ -782,4 +784,155 @@ public class TestNativeIO {
|
|||
assertTrue("Native POSIX_FADV_NOREUSE const not set",
|
||||
POSIX_FADV_NOREUSE >= 0);
|
||||
}
|
||||
|
||||
|
||||
@Test (timeout=10000)
|
||||
public void testPmemCheckParameters() {
|
||||
assumeNotWindows("Native PMDK not supported on Windows");
|
||||
// Skip testing while the build or environment does not support PMDK
|
||||
assumeTrue(NativeIO.POSIX.isPmdkAvailable());
|
||||
|
||||
// Please make sure /mnt/pmem0 is a persistent memory device with total
|
||||
// volume size 'volumeSize'
|
||||
String filePath = "/$:";
|
||||
long length = 0;
|
||||
long volumnSize = 16 * 1024 * 1024 * 1024L;
|
||||
|
||||
// Incorrect file length
|
||||
try {
|
||||
NativeIO.POSIX.Pmem.mapBlock(filePath, length);
|
||||
fail("Illegal length parameter should be detected");
|
||||
} catch (Exception e) {
|
||||
LOG.info(e.getMessage());
|
||||
}
|
||||
|
||||
// Incorrect file length
|
||||
filePath = "/mnt/pmem0/test_native_io";
|
||||
length = -1L;
|
||||
try {
|
||||
NativeIO.POSIX.Pmem.mapBlock(filePath, length);
|
||||
fail("Illegal length parameter should be detected");
|
||||
}catch (Exception e) {
|
||||
LOG.info(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout=10000)
|
||||
public void testPmemMapMultipleFiles() {
|
||||
assumeNotWindows("Native PMDK not supported on Windows");
|
||||
// Skip testing while the build or environment does not support PMDK
|
||||
assumeTrue(NativeIO.POSIX.isPmdkAvailable());
|
||||
|
||||
// Please make sure /mnt/pmem0 is a persistent memory device with total
|
||||
// volume size 'volumeSize'
|
||||
String filePath = "/mnt/pmem0/test_native_io";
|
||||
long length = 0;
|
||||
long volumnSize = 16 * 1024 * 1024 * 1024L;
|
||||
|
||||
// Multiple files, each with 128MB size, aggregated size exceeds volume
|
||||
// limit 16GB
|
||||
length = 128 * 1024 * 1024L;
|
||||
long fileNumber = volumnSize / length;
|
||||
LOG.info("File number = " + fileNumber);
|
||||
for (int i = 0; i < fileNumber; i++) {
|
||||
String path = filePath + i;
|
||||
LOG.info("File path = " + path);
|
||||
NativeIO.POSIX.Pmem.mapBlock(path, length);
|
||||
}
|
||||
try {
|
||||
NativeIO.POSIX.Pmem.mapBlock(filePath, length);
|
||||
fail("Request map extra file when persistent memory is all occupied");
|
||||
} catch (Exception e) {
|
||||
LOG.info(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout=10000)
|
||||
public void testPmemMapBigFile() {
|
||||
assumeNotWindows("Native PMDK not supported on Windows");
|
||||
// Skip testing while the build or environment does not support PMDK
|
||||
assumeTrue(NativeIO.POSIX.isPmdkAvailable());
|
||||
|
||||
// Please make sure /mnt/pmem0 is a persistent memory device with total
|
||||
// volume size 'volumeSize'
|
||||
String filePath = "/mnt/pmem0/test_native_io_big";
|
||||
long length = 0;
|
||||
long volumeSize = 16 * 1024 * 1024 * 1024L;
|
||||
|
||||
// One file length exceeds persistent memory volume 16GB.
|
||||
length = volumeSize + 1024L;
|
||||
try {
|
||||
LOG.info("File length = " + length);
|
||||
NativeIO.POSIX.Pmem.mapBlock(filePath, length);
|
||||
fail("File length exceeds persistent memory total volume size");
|
||||
}catch (Exception e) {
|
||||
LOG.info(e.getMessage());
|
||||
deletePmemMappedFile(filePath);
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout=10000)
|
||||
public void testPmemCopy() throws IOException {
|
||||
assumeNotWindows("Native PMDK not supported on Windows");
|
||||
// Skip testing while the build or environment does not support PMDK
|
||||
assumeTrue(NativeIO.POSIX.isPmdkAvailable());
|
||||
|
||||
// Create and map a block file. Please make sure /mnt/pmem0 is a persistent
|
||||
// memory device.
|
||||
String filePath = "/mnt/pmem0/copy";
|
||||
long length = 4096;
|
||||
PmemMappedRegion region = NativeIO.POSIX.Pmem.mapBlock(filePath, length);
|
||||
assertTrue(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length));
|
||||
assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length + 100));
|
||||
assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress() + 100, length));
|
||||
assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress() - 100, length));
|
||||
|
||||
// Copy content to mapped file
|
||||
byte[] data = generateSequentialBytes(0, (int) length);
|
||||
NativeIO.POSIX.Pmem.memCopy(data, region.getAddress(), region.isPmem(),
|
||||
length);
|
||||
|
||||
// Read content before pmemSync
|
||||
byte[] readBuf1 = new byte[(int)length];
|
||||
IOUtils.readFully(new FileInputStream(filePath), readBuf1, 0, (int)length);
|
||||
assertArrayEquals(data, readBuf1);
|
||||
|
||||
byte[] readBuf2 = new byte[(int)length];
|
||||
// Sync content to persistent memory twice
|
||||
NativeIO.POSIX.Pmem.memSync(region);
|
||||
NativeIO.POSIX.Pmem.memSync(region);
|
||||
// Read content after pmemSync twice
|
||||
IOUtils.readFully(new FileInputStream(filePath), readBuf2, 0, (int)length);
|
||||
assertArrayEquals(data, readBuf2);
|
||||
|
||||
//Read content after unmap twice
|
||||
NativeIO.POSIX.Pmem.unmapBlock(region.getAddress(), length);
|
||||
NativeIO.POSIX.Pmem.unmapBlock(region.getAddress(), length);
|
||||
byte[] readBuf3 = new byte[(int)length];
|
||||
IOUtils.readFully(new FileInputStream(filePath), readBuf3, 0, (int)length);
|
||||
assertArrayEquals(data, readBuf3);
|
||||
}
|
||||
|
||||
private static byte[] generateSequentialBytes(int start, int length) {
|
||||
byte[] result = new byte[length];
|
||||
|
||||
for (int i = 0; i < length; i++) {
|
||||
result[i] = (byte) ((start + i) % 127);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private static void deletePmemMappedFile(String filePath) {
|
||||
try {
|
||||
if (filePath != null) {
|
||||
boolean result = Files.deleteIfExists(Paths.get(filePath));
|
||||
if (!result) {
|
||||
throw new IOException();
|
||||
}
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOG.error("Failed to delete the mapped file " + filePath +
|
||||
" from persistent memory", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -214,6 +214,28 @@ public class FsDatasetCache {
|
|||
return PmemVolumeManager.getInstance().getCachePath(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get cache address on persistent memory for read operation.
|
||||
* The cache address comes from PMDK lib function when mapping
|
||||
* block to persistent memory.
|
||||
*
|
||||
* @param bpid blockPoolId
|
||||
* @param blockId blockId
|
||||
* @return address
|
||||
*/
|
||||
long getCacheAddress(String bpid, long blockId) {
|
||||
if (cacheLoader.isTransientCache() ||
|
||||
!isCached(bpid, blockId)) {
|
||||
return -1;
|
||||
}
|
||||
if (!(cacheLoader.isNativeLoader())) {
|
||||
return -1;
|
||||
}
|
||||
ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
|
||||
MappableBlock mappableBlock = mappableBlockMap.get(key).mappableBlock;
|
||||
return mappableBlock.getAddress();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return List of cached blocks suitable for translation into a
|
||||
* {@link BlockListAsLongs} for a cache report.
|
||||
|
|
|
@ -815,6 +815,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
String cachePath = cacheManager.getReplicaCachePath(
|
||||
b.getBlockPoolId(), b.getBlockId());
|
||||
if (cachePath != null) {
|
||||
long addr = cacheManager.getCacheAddress(
|
||||
b.getBlockPoolId(), b.getBlockId());
|
||||
if (addr != -1) {
|
||||
LOG.debug("Get InputStream by cache address.");
|
||||
return FsDatasetUtil.getDirectInputStream(
|
||||
addr, info.getBlockDataLength());
|
||||
}
|
||||
LOG.debug("Get InputStream by cache file path.");
|
||||
return FsDatasetUtil.getInputStreamAndSeek(
|
||||
new File(cachePath), seekOffset);
|
||||
}
|
||||
|
|
|
@ -26,7 +26,10 @@ import java.io.FilenameFilter;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.URI;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
|
@ -43,6 +46,7 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
|||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.htrace.shaded.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
|
||||
|
||||
/** Utility methods. */
|
||||
@InterfaceAudience.Private
|
||||
|
@ -132,6 +136,24 @@ public class FsDatasetUtil {
|
|||
}
|
||||
}
|
||||
|
||||
public static InputStream getDirectInputStream(long addr, long length)
|
||||
throws IOException {
|
||||
try {
|
||||
Class<?> directByteBufferClass =
|
||||
Class.forName("java.nio.DirectByteBuffer");
|
||||
Constructor<?> constructor =
|
||||
directByteBufferClass.getDeclaredConstructor(long.class, int.class);
|
||||
constructor.setAccessible(true);
|
||||
ByteBuffer byteBuffer =
|
||||
(ByteBuffer) constructor.newInstance(addr, (int)length);
|
||||
return new ByteBufferBackedInputStream(byteBuffer);
|
||||
} catch (ClassNotFoundException | NoSuchMethodException |
|
||||
IllegalAccessException | InvocationTargetException |
|
||||
InstantiationException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the meta-file for the specified block file and then return the
|
||||
* generation stamp from the name of the meta-file. Generally meta file will
|
||||
|
|
|
@ -35,4 +35,10 @@ public interface MappableBlock extends Closeable {
|
|||
* @return the number of bytes that have been cached.
|
||||
*/
|
||||
long getLength();
|
||||
|
||||
/**
|
||||
* Get cache address if applicable.
|
||||
* Return -1 if not applicable.
|
||||
*/
|
||||
long getAddress();
|
||||
}
|
||||
|
|
|
@ -64,8 +64,7 @@ public abstract class MappableBlockLoader {
|
|||
* @return The Mappable block.
|
||||
*/
|
||||
abstract MappableBlock load(long length, FileInputStream blockIn,
|
||||
FileInputStream metaIn, String blockFileName,
|
||||
ExtendedBlockId key)
|
||||
FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
|
@ -106,6 +105,11 @@ public abstract class MappableBlockLoader {
|
|||
*/
|
||||
abstract boolean isTransientCache();
|
||||
|
||||
/**
|
||||
* Check whether this is a native pmem cache loader.
|
||||
*/
|
||||
abstract boolean isNativeLoader();
|
||||
|
||||
/**
|
||||
* Clean up cache, can be used during DataNode shutdown.
|
||||
*/
|
||||
|
@ -117,8 +121,7 @@ public abstract class MappableBlockLoader {
|
|||
* Verifies the block's checksum. This is an I/O intensive operation.
|
||||
*/
|
||||
protected void verifyChecksum(long length, FileInputStream metaIn,
|
||||
FileChannel blockChannel, String blockFileName)
|
||||
throws IOException {
|
||||
FileChannel blockChannel, String blockFileName) throws IOException {
|
||||
// Verify the checksum from the block's meta file
|
||||
// Get the DataChecksum from the meta file header
|
||||
BlockMetadataHeader header =
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DNConf;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
|
||||
/**
|
||||
* Creates MappableBlockLoader.
|
||||
|
@ -42,6 +43,9 @@ public final class MappableBlockLoaderFactory {
|
|||
if (conf.getPmemVolumes() == null || conf.getPmemVolumes().length == 0) {
|
||||
return new MemoryMappableBlockLoader();
|
||||
}
|
||||
if (NativeIO.isAvailable() && NativeIO.POSIX.isPmdkAvailable()) {
|
||||
return new NativePmemMappableBlockLoader();
|
||||
}
|
||||
return new PmemMappableBlockLoader();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -66,8 +66,7 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
|
|||
*/
|
||||
@Override
|
||||
MappableBlock load(long length, FileInputStream blockIn,
|
||||
FileInputStream metaIn, String blockFileName,
|
||||
ExtendedBlockId key)
|
||||
FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
|
||||
throws IOException {
|
||||
MemoryMappedBlock mappableBlock = null;
|
||||
MappedByteBuffer mmap = null;
|
||||
|
@ -116,4 +115,9 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
|
|||
public boolean isTransientCache() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isNativeLoader() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,6 +44,11 @@ public class MemoryMappedBlock implements MappableBlock {
|
|||
return length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getAddress() {
|
||||
return -1L;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (mmap != null) {
|
||||
|
|
|
@ -0,0 +1,191 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
|
||||
/**
|
||||
* Map block to persistent memory with native PMDK libs.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class NativePmemMappableBlockLoader extends PmemMappableBlockLoader {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(NativePmemMappableBlockLoader.class);
|
||||
|
||||
@Override
|
||||
void initialize(FsDatasetCache cacheManager) throws IOException {
|
||||
super.initialize(cacheManager);
|
||||
}
|
||||
|
||||
/**
|
||||
* Load the block.
|
||||
*
|
||||
* Map the block and verify its checksum.
|
||||
*
|
||||
* The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir
|
||||
* is a persistent memory volume chosen by PmemVolumeManager.
|
||||
*
|
||||
* @param length The current length of the block.
|
||||
* @param blockIn The block input stream. Should be positioned at the
|
||||
* start. The caller must close this.
|
||||
* @param metaIn The meta file input stream. Should be positioned at
|
||||
* the start. The caller must close this.
|
||||
* @param blockFileName The block file name, for logging purposes.
|
||||
* @param key The extended block ID.
|
||||
*
|
||||
* @throws IOException If mapping block to persistent memory fails or
|
||||
* checksum fails.
|
||||
*
|
||||
* @return The Mappable block.
|
||||
*/
|
||||
@Override
|
||||
public MappableBlock load(long length, FileInputStream blockIn,
|
||||
FileInputStream metaIn, String blockFileName,
|
||||
ExtendedBlockId key)
|
||||
throws IOException {
|
||||
NativePmemMappedBlock mappableBlock = null;
|
||||
POSIX.PmemMappedRegion region = null;
|
||||
String filePath = null;
|
||||
|
||||
FileChannel blockChannel = null;
|
||||
try {
|
||||
blockChannel = blockIn.getChannel();
|
||||
if (blockChannel == null) {
|
||||
throw new IOException("Block InputStream has no FileChannel.");
|
||||
}
|
||||
|
||||
assert NativeIO.isAvailable();
|
||||
filePath = PmemVolumeManager.getInstance().getCachePath(key);
|
||||
region = POSIX.Pmem.mapBlock(filePath, length);
|
||||
if (region == null) {
|
||||
throw new IOException("Failed to map the block " + blockFileName +
|
||||
" to persistent storage.");
|
||||
}
|
||||
verifyChecksumAndMapBlock(region, length, metaIn, blockChannel,
|
||||
blockFileName);
|
||||
mappableBlock = new NativePmemMappedBlock(region.getAddress(),
|
||||
region.getLength(), key);
|
||||
LOG.info("Successfully cached one replica:{} into persistent memory"
|
||||
+ ", [cached path={}, address={}, length={}]", key, filePath,
|
||||
region.getAddress(), length);
|
||||
} finally {
|
||||
IOUtils.closeQuietly(blockChannel);
|
||||
if (mappableBlock == null) {
|
||||
if (region != null) {
|
||||
// unmap content from persistent memory
|
||||
POSIX.Pmem.unmapBlock(region.getAddress(),
|
||||
region.getLength());
|
||||
FsDatasetUtil.deleteMappedFile(filePath);
|
||||
}
|
||||
}
|
||||
}
|
||||
return mappableBlock;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies the block's checksum meanwhile map block to persistent memory.
|
||||
* This is an I/O intensive operation.
|
||||
*/
|
||||
private void verifyChecksumAndMapBlock(POSIX.PmemMappedRegion region,
|
||||
long length, FileInputStream metaIn, FileChannel blockChannel,
|
||||
String blockFileName) throws IOException {
|
||||
// Verify the checksum from the block's meta file
|
||||
// Get the DataChecksum from the meta file header
|
||||
BlockMetadataHeader header =
|
||||
BlockMetadataHeader.readHeader(new DataInputStream(
|
||||
new BufferedInputStream(metaIn, BlockMetadataHeader
|
||||
.getHeaderSize())));
|
||||
FileChannel metaChannel = null;
|
||||
try {
|
||||
metaChannel = metaIn.getChannel();
|
||||
if (metaChannel == null) {
|
||||
throw new IOException("Cannot get FileChannel" +
|
||||
" from Block InputStream meta file.");
|
||||
}
|
||||
DataChecksum checksum = header.getChecksum();
|
||||
final int bytesPerChecksum = checksum.getBytesPerChecksum();
|
||||
final int checksumSize = checksum.getChecksumSize();
|
||||
final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
|
||||
ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
|
||||
ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
|
||||
// Verify the checksum
|
||||
int bytesVerified = 0;
|
||||
long mappedAddress = -1L;
|
||||
if (region != null) {
|
||||
mappedAddress = region.getAddress();
|
||||
}
|
||||
while (bytesVerified < length) {
|
||||
Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
|
||||
"Unexpected partial chunk before EOF.");
|
||||
assert bytesVerified % bytesPerChecksum == 0;
|
||||
int bytesRead = fillBuffer(blockChannel, blockBuf);
|
||||
if (bytesRead == -1) {
|
||||
throw new IOException(
|
||||
"Checksum verification failed for the block " + blockFileName +
|
||||
": premature EOF");
|
||||
}
|
||||
blockBuf.flip();
|
||||
// Number of read chunks, including partial chunk at end
|
||||
int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
|
||||
checksumBuf.limit(chunks * checksumSize);
|
||||
fillBuffer(metaChannel, checksumBuf);
|
||||
checksumBuf.flip();
|
||||
checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
|
||||
bytesVerified);
|
||||
// Success
|
||||
bytesVerified += bytesRead;
|
||||
// Copy data to persistent file
|
||||
POSIX.Pmem.memCopy(blockBuf.array(), mappedAddress,
|
||||
region.isPmem(), bytesRead);
|
||||
mappedAddress += bytesRead;
|
||||
// Clear buffer
|
||||
blockBuf.clear();
|
||||
checksumBuf.clear();
|
||||
}
|
||||
if (region != null) {
|
||||
POSIX.Pmem.memSync(region);
|
||||
}
|
||||
} finally {
|
||||
IOUtils.closeQuietly(metaChannel);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isNativeLoader() {
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Represents an HDFS block that is mapped to persistent memory by the DataNode.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class NativePmemMappedBlock implements MappableBlock {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(NativePmemMappedBlock.class);
|
||||
|
||||
private long pmemMappedAddress = -1L;
|
||||
private long length;
|
||||
private ExtendedBlockId key;
|
||||
|
||||
NativePmemMappedBlock(long pmemMappedAddress, long length,
|
||||
ExtendedBlockId key) {
|
||||
assert length > 0;
|
||||
this.pmemMappedAddress = pmemMappedAddress;
|
||||
this.length = length;
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLength() {
|
||||
return length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getAddress() {
|
||||
return pmemMappedAddress;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (pmemMappedAddress != -1L) {
|
||||
String cacheFilePath =
|
||||
PmemVolumeManager.getInstance().getCachePath(key);
|
||||
try {
|
||||
// Current libpmem will report error when pmem_unmap is called with
|
||||
// length not aligned with page size, although the length is returned
|
||||
// by pmem_map_file.
|
||||
boolean success =
|
||||
NativeIO.POSIX.Pmem.unmapBlock(pmemMappedAddress, length);
|
||||
if (!success) {
|
||||
throw new IOException("Failed to unmap the mapped file from " +
|
||||
"pmem address: " + pmemMappedAddress);
|
||||
}
|
||||
pmemMappedAddress = -1L;
|
||||
FsDatasetUtil.deleteMappedFile(cacheFilePath);
|
||||
LOG.info("Successfully uncached one replica:{} from persistent memory"
|
||||
+ ", [cached path={}, length={}]", key, cacheFilePath, length);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("IOException occurred for block {}!", key, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -43,7 +43,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
|
|||
|
||||
@Override
|
||||
void initialize(FsDatasetCache cacheManager) throws IOException {
|
||||
LOG.info("Initializing cache loader: PmemMappableBlockLoader.");
|
||||
LOG.info("Initializing cache loader: " + this.getClass().getName());
|
||||
DNConf dnConf = cacheManager.getDnConf();
|
||||
PmemVolumeManager.init(dnConf.getPmemVolumes());
|
||||
pmemVolumeManager = PmemVolumeManager.getInstance();
|
||||
|
@ -71,8 +71,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
|
|||
*/
|
||||
@Override
|
||||
MappableBlock load(long length, FileInputStream blockIn,
|
||||
FileInputStream metaIn, String blockFileName,
|
||||
ExtendedBlockId key)
|
||||
FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
|
||||
throws IOException {
|
||||
PmemMappedBlock mappableBlock = null;
|
||||
String cachePath = null;
|
||||
|
@ -132,6 +131,11 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
|
|||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isNativeLoader() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
void shutdown() {
|
||||
LOG.info("Clean up cache on persistent memory during shutdown.");
|
||||
|
|
|
@ -49,6 +49,11 @@ public class PmemMappedBlock implements MappableBlock {
|
|||
return length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getAddress() {
|
||||
return -1L;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
String cacheFilePath =
|
||||
|
|
Loading…
Reference in New Issue