diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index 1412d610431..96193eed035 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -23,6 +23,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -145,6 +146,12 @@ public class NativeIO {
     return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
   }
 
+  private static void assertCodeLoaded() throws IOException {
+    if (!isAvailable()) {
+      throw new IOException("NativeIO was not loaded");
+    }
+  }
+
   /** Wrapper around open(2) */
   public static native FileDescriptor open(String path, int flags, int mode) throws IOException;
   /** Wrapper around fstat(2) */
@@ -225,6 +232,85 @@ public class NativeIO {
     }
   }
 
+  static native void mlock_native(
+      ByteBuffer buffer, long len) throws NativeIOException;
+  static native void munlock_native(
+      ByteBuffer buffer, long len) throws NativeIOException;
+
+  /**
+   * Locks the provided direct ByteBuffer into memory, preventing it from
+   * being swapped out. After a buffer is locked, future accesses will not
+   * incur a page fault.
+   *
+   * See the mlock(2) man page for more information.
+   *
+   * @throws IOException
+   */
+  public static void mlock(ByteBuffer buffer, long len)
+      throws IOException {
+    assertCodeLoaded();
+    if (!buffer.isDirect()) {
+      throw new IOException("Cannot mlock a non-direct ByteBuffer");
+    }
+    mlock_native(buffer, len);
+  }
+
+  /**
+   * Unlocks a locked direct ByteBuffer, allowing it to be swapped out of
+   * memory. This is a no-op if the ByteBuffer was not previously locked.
+   *
+   * See the munlock(2) man page for more information.
+   *
+   * @throws IOException
+   */
+  public static void munlock(ByteBuffer buffer, long len)
+      throws IOException {
+    assertCodeLoaded();
+    if (!buffer.isDirect()) {
+      throw new IOException("Cannot munlock a non-direct ByteBuffer");
+    }
+    munlock_native(buffer, len);
+  }
+
+  /**
+   * Resource limit types copied from &lt;sys/resource.h&gt;.
+   */
+  private static class ResourceLimit {
+    public static final int RLIMIT_CPU = 0;
+    public static final int RLIMIT_FSIZE = 1;
+    public static final int RLIMIT_DATA = 2;
+    public static final int RLIMIT_STACK = 3;
+    public static final int RLIMIT_CORE = 4;
+    public static final int RLIMIT_RSS = 5;
+    public static final int RLIMIT_NPROC = 6;
+    public static final int RLIMIT_NOFILE = 7;
+    public static final int RLIMIT_MEMLOCK = 8;
+    public static final int RLIMIT_AS = 9;
+    public static final int RLIMIT_LOCKS = 10;
+    public static final int RLIMIT_SIGPENDING = 11;
+    public static final int RLIMIT_MSGQUEUE = 12;
+    public static final int RLIMIT_NICE = 13;
+    public static final int RLIMIT_RTPRIO = 14;
+    public static final int RLIMIT_RTTIME = 15;
+    public static final int RLIMIT_NLIMITS = 16;
+  }
+
+  static native String getrlimit(int limit) throws NativeIOException;
+  /**
+   * Returns the soft limit on the number of bytes that the process may
+   * lock into memory (RLIMIT_MEMLOCK).
+   *
+   * See the getrlimit(2) man page for more information.
+   *
+   * @return maximum amount of locked memory in bytes
+   */
+  public static long getMemlockLimit() throws IOException {
+    assertCodeLoaded();
+    String strLimit = getrlimit(ResourceLimit.RLIMIT_MEMLOCK);
+    return Long.parseLong(strLimit);
+  }
+
   /** Linux only methods used for getOwner() implementation */
   private static native long getUIDforFDOwnerforOwner(FileDescriptor fd) throws IOException;
   private static native String getUserName(long uid) throws IOException;
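As a quick orientation for reviewers, here is a minimal sketch of how the new methods compose. It is illustrative only, not part of the patch: the file path and the class name are invented, and it assumes Linux with the Hadoop native library loaded.

```java
import java.io.FileInputStream;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;

import org.apache.hadoop.io.nativeio.NativeIO;

public class MlockSketch {
  public static void main(String[] args) throws Exception {
    if (!NativeIO.isAvailable()) {
      return; // native code not loaded; nothing to demonstrate
    }
    // Hypothetical input file; any local file works.
    FileInputStream fis = new FileInputStream("/tmp/example.dat");
    FileChannel channel = fis.getChannel();
    try {
      long fileSize = channel.size();
      // mlock requires a direct buffer; a MappedByteBuffer qualifies.
      MappedByteBuffer buf = channel.map(MapMode.READ_ONLY, 0, fileSize);
      // Stay under the process's RLIMIT_MEMLOCK soft limit.
      if (fileSize <= NativeIO.POSIX.getMemlockLimit()) {
        NativeIO.POSIX.mlock(buf, fileSize);
        // ... access buf without incurring page faults ...
        NativeIO.POSIX.munlock(buf, fileSize);
      }
    } finally {
      channel.close();
      fis.close();
    }
  }
}
```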
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
index cb21a7bee66..afa4720e507 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
@@ -31,8 +31,11 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <sys/mman.h>
+#include <sys/resource.h>
 #include <sys/stat.h>
 #include <sys/syscall.h>
+#include <sys/time.h>
 #include <sys/types.h>
 #include <unistd.h>
 #include "config.h"
@@ -360,6 +363,87 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_sync_1file_1range(
 #endif
 }
 
+/**
+ * public static native void mlock_native(
+ *   ByteBuffer buffer, long len);
+ *
+ * The "00024" in the function name is an artifact of how JNI encodes
+ * special characters. U+0024 is '$'.
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_mlock_1native(
+  JNIEnv *env, jclass clazz,
+  jobject buffer, jlong len)
+{
+  void* buf = (void*)(*env)->GetDirectBufferAddress(env, buffer);
+  PASS_EXCEPTIONS(env);
+
+  if (mlock(buf, len)) {
+    throw_ioe(env, errno);
+  }
+}
+
+/**
+ * public static native void munlock_native(
+ *   ByteBuffer buffer, long len);
+ *
+ * The "00024" in the function name is an artifact of how JNI encodes
+ * special characters. U+0024 is '$'.
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_munlock_1native(
+  JNIEnv *env, jclass clazz,
+  jobject buffer, jlong len)
+{
+  void* buf = (void*)(*env)->GetDirectBufferAddress(env, buffer);
+  PASS_EXCEPTIONS(env);
+
+  if (munlock(buf, len)) {
+    throw_ioe(env, errno);
+  }
+}
+
+/**
+ * public static native String getrlimit(
+ *   int resource);
+ *
+ * The "00024" in the function name is an artifact of how JNI encodes
+ * special characters. U+0024 is '$'.
+ */
+JNIEXPORT jstring JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_getrlimit(
+  JNIEnv *env, jclass clazz,
+  jint resource)
+{
+  jstring ret = NULL;
+
+  struct rlimit rlim;
+  int rc = getrlimit((int)resource, &rlim);
+  if (rc != 0) {
+    throw_ioe(env, errno);
+    goto cleanup;
+  }
+
+  // An unlimited soft limit is RLIM_INFINITY ((rlim_t)-1 on Linux), which
+  // does not fit in a signed Java long. Report it as Long.MAX_VALUE so the
+  // Java side can still parse the result with Long.parseLong().
+  if (rlim.rlim_cur == RLIM_INFINITY) {
+    ret = (*env)->NewStringUTF(env, "9223372036854775807");
+    goto cleanup;
+  }
+
+  // Convert the soft limit into a string. rlim_t is typically an unsigned
+  // 64-bit type, so print it through unsigned long long; 32 bytes is
+  // enough for any 64-bit value plus the terminating NUL.
+  char limit[32];
+  snprintf(limit, sizeof(limit), "%llu",
+      (unsigned long long)rlim.rlim_cur);
+  ret = (*env)->NewStringUTF(env, limit);
+
+cleanup:
+  return ret;
+}
+
 #ifdef __FreeBSD__
 static int toFreeBSDFlags(int flags)
 {
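One subtlety in getrlimit() above: Long.parseLong on the Java side cannot digest RLIM_INFINITY ((rlim_t)-1 on Linux), so the corrected function maps an unlimited soft limit to Long.MAX_VALUE. That sentinel is a convention assumed in this revision rather than something the original patch spelled out; with it, a caller can treat Long.MAX_VALUE as "effectively unlimited", e.g.:

```java
import java.io.IOException;

import org.apache.hadoop.io.nativeio.NativeIO;

// Hypothetical helper, not part of the patch: interprets the RLIMIT_MEMLOCK
// soft limit, treating the Long.MAX_VALUE sentinel produced by the native
// code above as "effectively unlimited".
public class MemlockLimits {
  public static boolean fitsInMemlockLimit(long bytes) throws IOException {
    long limit = NativeIO.POSIX.getMemlockLimit();
    return limit == Long.MAX_VALUE || bytes <= limit;
  }
}
```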
+ */ +JNIEXPORT jstring JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_getrlimit( + JNIEnv *env, jclass clazz, + jint resource) +{ + jstring ret = NULL; + + struct rlimit rlim; + int rc = getrlimit((int)resource, &rlim); + if (rc != 0) { + throw_ioe(env, errno); + goto cleanup; + } + + // Convert soft limit into a string + char limit[17]; + int len = snprintf(&limit, 17, "%d", rlim.rlim_cur); + ret = (*env)->NewStringUTF(env,&limit); + +cleanup: + return ret; +} + #ifdef __FreeBSD__ static int toFreeBSDFlags(int flags) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java index 4d71e15c4b3..69c963f2d75 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java @@ -24,6 +24,9 @@ import java.io.FileOutputStream; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; import java.util.concurrent.atomic.AtomicReference; import java.util.ArrayList; import java.util.Arrays; @@ -32,6 +35,7 @@ import java.util.List; import org.junit.Assert; import org.junit.Before; import org.junit.Test; + import static org.junit.Assume.*; import static org.junit.Assert.*; @@ -45,6 +49,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.NativeCodeLoader; +import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Time; public class TestNativeIO { @@ -524,4 +529,57 @@ public class TestNativeIO { FileUtils.deleteQuietly(TEST_DIR); } + + @Test(timeout=10000) + public void testMlock() throws Exception { + assumeTrue(NativeIO.isAvailable()); + assumeTrue(Shell.LINUX); + final File TEST_FILE = new File(new File( + System.getProperty("test.build.data","build/test/data")), + "testMlockFile"); + final int BUF_LEN = 12289; + byte buf[] = new byte[BUF_LEN]; + int bufSum = 0; + for (int i = 0; i < buf.length; i++) { + buf[i] = (byte)(i % 60); + bufSum += buf[i]; + } + FileOutputStream fos = new FileOutputStream(TEST_FILE); + fos.write(buf); + fos.getChannel().force(true); + fos.close(); + + FileInputStream fis = null; + FileChannel channel = null; + try { + // Map file into memory + fis = new FileInputStream(TEST_FILE); + channel = fis.getChannel(); + long fileSize = channel.size(); + MappedByteBuffer mapbuf = channel.map(MapMode.READ_ONLY, 0, fileSize); + // mlock the buffer + NativeIO.POSIX.mlock(mapbuf, fileSize); + // Read the buffer + int sum = 0; + for (int i=0; i 0) { + if (!NativeIO.isAvailable()) { + throw new RuntimeException(String.format( + "Cannot start datanode because the configured max locked memory" + + " size (%s) is greater than zero and native code is not available.", + DFS_DATANODE_MAX_LOCKED_MEMORY_KEY)); + } + long ulimit = NativeIO.POSIX.getMemlockLimit(); + if (dnConf.maxLockedMemory > ulimit) { + throw new RuntimeException(String.format( + "Cannot start datanode because the configured max locked memory" + + " size (%s) of %d bytes is less than the datanode's available" + + " RLIMIT_MEMLOCK ulimit of %d bytes.", + DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, + dnConf.maxLockedMemory, + ulimit)); + } + } + 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java
index 62565170bb0..f2166b74115 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs;
 import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -30,6 +32,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -106,4 +110,31 @@ public class TestDatanodeConfig {
       throw new IOException("Bad URI", e);
     }
   }
+
+  @Test(timeout=60000)
+  public void testMemlockLimit() throws Exception {
+    assumeTrue(NativeIO.isAvailable());
+    final long memlockLimit = NativeIO.POSIX.getMemlockLimit();
+    // An unlimited ulimit is reported as Long.MAX_VALUE; we cannot
+    // configure a limit above that, so skip the test in that case.
+    assumeTrue(memlockLimit != Long.MAX_VALUE);
+    Configuration conf = cluster.getConfiguration(0);
+    // Try starting the DN with limit configured to the ulimit
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+        memlockLimit);
+    DataNode dn = null;
+    dn = DataNode.createDataNode(new String[]{}, conf);
+    dn.shutdown();
+    // Try starting the DN with a limit > ulimit
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+        memlockLimit+1);
+    try {
+      dn = DataNode.createDataNode(new String[]{}, conf);
+      fail("Expected DataNode to fail to start when the configured max" +
+          " locked memory is more than the RLIMIT_MEMLOCK ulimit");
+    } catch (RuntimeException e) {
+      GenericTestUtils.assertExceptionContains(
+          "more than the datanode's available RLIMIT_MEMLOCK", e);
+    }
+  }
 }
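Both new tests depend on the environment: testMlock is Linux-only, and testMemlockLimit skips when the ulimit is unlimited. When the suite behaves unexpectedly, a quick way to see the limit the JVM will observe is a diagnostic like the following (hypothetical, not part of the patch); note that bash's `ulimit -l` reports kilobytes while getMemlockLimit() returns bytes.

```java
import org.apache.hadoop.io.nativeio.NativeIO;

// Hypothetical diagnostic: print the RLIMIT_MEMLOCK soft limit as the
// tests will see it. Compare against `ulimit -l`, which reports KiB.
public class ShowMemlockLimit {
  public static void main(String[] args) throws Exception {
    if (!NativeIO.isAvailable()) {
      System.out.println("Native code not loaded; memlock tests will skip");
      return;
    }
    System.out.println("RLIMIT_MEMLOCK soft limit: "
        + NativeIO.POSIX.getMemlockLimit() + " bytes");
  }
}
```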