diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/Errno.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/Errno.java
index f823978bd71..f6377c7af78 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/Errno.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/Errno.java
@@ -58,6 +58,7 @@ public enum Errno {
   ELOOP,
   ENAMETOOLONG,
   ENOTEMPTY,
+  EOVERFLOW,
 
   UNKNOWN;
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index 53d31d6fb96..43f1cb17a5f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -29,6 +29,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -846,4 +847,30 @@ private static native void renameTo0(String src, String dst)
 
   private static native void link0(String src, String dst)
       throws NativeIOException;
+
+  /**
+   * Unbuffered file copy from src to dst without tainting the OS buffer cache.
+   * On Linux it uses sendfile(), which copies the data within kernel space,
+   * avoiding extra copies through user-space buffers.
+   * On Windows it uses CopyFileEx with the COPY_FILE_NO_BUFFERING flag.
+   *
+   * Note: This does not support FreeBSD/OSX, which have different sendfile()
+   * semantics. Also, this simple native wrapper does minimal parameter checking.
+   * It is recommended to use a wrapper function such as
+   * Storage#nativeCopyFileUnbuffered() in hadoop-hdfs instead.
+   *
+   * @param src The source path
+   * @param dst The destination path
+   * @throws IOException if the copy fails
+   */
+  public static void copyFileUnbuffered(File src, File dst) throws IOException {
+    if (nativeLoaded && (Shell.WINDOWS || Shell.LINUX)) {
+      copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
+    } else {
+      FileUtils.copyFile(src, dst);
+    }
+  }
+
+  private static native void copyFileUnbuffered0(String src, String dst)
+      throws NativeIOException;
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
index b98aa0c0991..f19d6bee81a 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
@@ -35,6 +35,9 @@
 #include <sys/resource.h>
 #include <sys/stat.h>
 #include <sys/syscall.h>
+#if !(defined(__FreeBSD__) || defined(__MACH__))
+#include <sys/sendfile.h>
+#endif
 #include <sys/types.h>
 #include <unistd.h>
 #include "config.h"
@@ -1142,6 +1145,70 @@ JNIEnv *env, jclass clazz)
 #endif
 }
 
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_copyFileUnbuffered0(
+JNIEnv *env, jclass clazz, jstring jsrc, jstring jdst)
+{
+#ifdef UNIX
+#if (defined(__FreeBSD__) || defined(__MACH__))
+  THROW(env, "java/io/IOException",
+    "The function copyFileUnbuffered() is not supported on FreeBSD or Mac OS");
+  return;
+#else
+  const char *src = NULL, *dst = NULL;
+  int srcFd = -1;
+  int dstFd = -1;
+  struct stat s;
+  off_t offset = 0;
+
+  src = (*env)->GetStringUTFChars(env, jsrc, NULL);
+  if (!src) goto cleanup; // exception was thrown
+  dst = (*env)->GetStringUTFChars(env, jdst, NULL);
+  if (!dst) goto cleanup; // exception was thrown
+
+  srcFd = open(src, O_RDONLY);
+  if (srcFd == -1) {
+    throw_ioe(env, errno);
+    goto cleanup;
+  }
+  if (fstat(srcFd, &s) == -1) {
+    throw_ioe(env, errno);
+    goto cleanup;
+  }
+  dstFd = open(dst, O_WRONLY | O_CREAT, s.st_mode);
+  if (dstFd == -1) {
+    throw_ioe(env, errno);
+    goto cleanup;
+  }
+  if (sendfile(dstFd, srcFd, &offset, s.st_size) == -1) {
+    throw_ioe(env, errno);
+  }
+
+cleanup:
+  if (src) (*env)->ReleaseStringUTFChars(env, jsrc, src);
+  if (dst) (*env)->ReleaseStringUTFChars(env, jdst, dst);
+  if (srcFd != -1) close(srcFd);
+  if (dstFd != -1) close(dstFd);
+#endif
+#endif
+
+#ifdef WINDOWS
+  LPCWSTR src = NULL, dst = NULL;
+
+  src = (LPCWSTR) (*env)->GetStringChars(env, jsrc, NULL);
+  if (!src) goto cleanup; // exception was thrown
+  dst = (LPCWSTR) (*env)->GetStringChars(env, jdst, NULL);
+  if (!dst) goto cleanup; // exception was thrown
+  if (!CopyFileEx(src, dst, NULL, NULL, NULL, COPY_FILE_NO_BUFFERING)) {
+    throw_ioe(env, GetLastError());
+  }
+
+cleanup:
+  if (src) (*env)->ReleaseStringChars(env, jsrc, src);
+  if (dst) (*env)->ReleaseStringChars(env, jdst, dst);
+#endif
+}
+
 /**
  * vim: sw=2: ts=2: et:
  */
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/errno_enum.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/errno_enum.c
index 4d07c31394a..08cc305c94e 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/errno_enum.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/errno_enum.c
@@ -66,6 +66,7 @@ static errno_mapping_t ERRNO_MAPPINGS[] = {
   MAPPING(ELOOP),
   MAPPING(ENAMETOOLONG),
   MAPPING(ENOTEMPTY),
+  MAPPING(EOVERFLOW),
   {-1, NULL}
 };
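Reviewer note: for anyone trying the new API outside this patch, a minimal usage sketch follows. It assumes only hadoop-common on the classpath; the paths are hypothetical, and the native library is optional because `copyFileUnbuffered` falls back to commons-io when it is not loaded.

```java
import java.io.File;
import java.io.IOException;

import org.apache.hadoop.io.nativeio.NativeIO;

public class CopyFileUnbufferedDemo {
  public static void main(String[] args) throws IOException {
    // Hypothetical paths; any large local file works.
    File src = new File("/tmp/replica.src.dat");
    File dst = new File("/tmp/replica.dst.dat");

    // Uses sendfile() (Linux) or CopyFileEx (Windows) when libhadoop is
    // loaded; otherwise falls back to FileUtils.copyFile.
    NativeIO.copyFileUnbuffered(src, dst);

    if (src.length() != dst.length()) {
      throw new IOException("copy appears incomplete");
    }
  }
}
```

One behavioral note on the Linux path: sendfile() may transfer fewer bytes than requested in a single call, and the native code issues exactly one call for s.st_size without treating a short count as an error. The length check in Storage#nativeCopyFileUnbuffered (below) is what would catch a short copy.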
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
index 6c3f0038863..5425c4994be 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
@@ -24,14 +24,19 @@
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel.MapMode;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -622,4 +626,35 @@ public void testGetMemlockLimit() throws Exception {
     assumeTrue(NativeIO.isAvailable());
     NativeIO.getMemlockLimit();
   }
+
+  @Test (timeout = 30000)
+  public void testCopyFileUnbuffered() throws Exception {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    File srcFile = new File(TEST_DIR, METHOD_NAME + ".src.dat");
+    File dstFile = new File(TEST_DIR, METHOD_NAME + ".dst.dat");
+    final int fileSize = 0x8FFFFFF; // ~144 MB
+    final int SEED = 0xBEEF;
+    final int batchSize = 4096;
+    final int numBatches = fileSize / batchSize;
+    Random rb = new Random(SEED);
+    FileChannel channel = null;
+    RandomAccessFile raSrcFile = null;
+    try {
+      raSrcFile = new RandomAccessFile(srcFile, "rw");
+      channel = raSrcFile.getChannel();
+      byte[] bytesToWrite = new byte[batchSize];
+      MappedByteBuffer mapBuf;
+      mapBuf = channel.map(MapMode.READ_WRITE, 0, fileSize);
+      // Fill the source file with pseudo-random data via a memory mapping.
+      for (int i = 0; i < numBatches; i++) {
+        rb.nextBytes(bytesToWrite);
+        mapBuf.put(bytesToWrite);
+      }
+      NativeIO.copyFileUnbuffered(srcFile, dstFile);
+    } finally {
+      IOUtils.cleanup(LOG, channel);
+      IOUtils.cleanup(LOG, raSrcFile);
+      FileUtils.deleteQuietly(TEST_DIR);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index afb8d358f0d..55dc56725b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -665,6 +665,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-7112. LazyWriter should use either async IO or one thread per physical
     disk. (Xiaoyu Yao via cnauroth)
 
+    HDFS-7090. Use unbuffered writes when persisting in-memory replicas.
+    (Xiaoyu Yao via cnauroth)
+
   BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
 
     HDFS-6387. HDFS CLI admin tool for creating & deleting an
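Reviewer note: testCopyFileUnbuffered above exercises the copy but never inspects the destination file. A hedged sketch of a content check that could follow the `NativeIO.copyFileUnbuffered(srcFile, dstFile)` call (variable names match the test; `java.io.FileInputStream` would additionally need to be imported):

```java
// Illustrative only: verify length and byte-for-byte content after the copy.
Assert.assertEquals(srcFile.length(), dstFile.length());
try (FileInputStream srcIn = new FileInputStream(srcFile);
     FileInputStream dstIn = new FileInputStream(dstFile)) {
  byte[] srcBuf = new byte[batchSize];
  byte[] dstBuf = new byte[batchSize];
  int n;
  while ((n = srcIn.read(srcBuf)) != -1) {
    // Read exactly n bytes from dst so the two streams stay aligned.
    IOUtils.readFully(dstIn, dstBuf, 0, n);
    Assert.assertArrayEquals(Arrays.copyOf(srcBuf, n),
        Arrays.copyOf(dstBuf, n));
  }
}
```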
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 0661026b6f9..4320e229529 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -19,6 +19,7 @@
 
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.lang.management.ManagementFactory;
@@ -996,6 +997,93 @@ public static void rename(File from, File to) throws IOException {
     }
   }
 
+  /**
+   * Copies a file (usually large) to a new location using native unbuffered IO.
+   * <p>
+   * This method copies the contents of the specified source file
+   * to the specified destination file using OS-specific unbuffered IO.
+   * The goal is to avoid churning the file system buffer cache when copying
+   * large files. The FileUtils#copyLarge function from the apache-commons-io
+   * library can achieve this with an internal memory buffer, but it is less
+   * efficient than the native unbuffered APIs, such as sendfile() on Linux
+   * and CopyFileEx() on Windows, wrapped in {@link NativeIO#copyFileUnbuffered}.
+   *
+   * The directory holding the destination file is created if it does not exist.
+   * If the destination file exists, then this method will delete it first.
+   * <p>
+   * Note: Setting {@code preserveFileDate} to {@code true} tries to preserve
+   * the file's last modified date/times using
+   * {@link File#setLastModified(long)}; however, it is not guaranteed that
+   * the operation will succeed. If the modification operation fails, no
+   * indication is provided.
+   *
+   * @param srcFile an existing file to copy, must not be {@code null}
+   * @param destFile the new file, must not be {@code null}
+   * @param preserveFileDate true if the file date of the copy
+   *                         should be the same as the original
+   *
+   * @throws NullPointerException if source or destination is {@code null}
+   * @throws IOException if source or destination is invalid
+   * @throws IOException if an IO error occurs during copying
+   */
+  public static void nativeCopyFileUnbuffered(File srcFile, File destFile,
+      boolean preserveFileDate) throws IOException {
+    if (srcFile == null) {
+      throw new NullPointerException("Source must not be null");
+    }
+    if (destFile == null) {
+      throw new NullPointerException("Destination must not be null");
+    }
+    if (!srcFile.exists()) {
+      throw new FileNotFoundException("Source '" + srcFile + "' does not exist");
+    }
+    if (srcFile.isDirectory()) {
+      throw new IOException("Source '" + srcFile + "' exists but is a directory");
+    }
+    if (srcFile.getCanonicalPath().equals(destFile.getCanonicalPath())) {
+      throw new IOException("Source '" + srcFile + "' and destination '" +
+          destFile + "' are the same");
+    }
+    File parentFile = destFile.getParentFile();
+    if (parentFile != null) {
+      if (!parentFile.mkdirs() && !parentFile.isDirectory()) {
+        throw new IOException("Destination '" + parentFile +
+            "' directory cannot be created");
+      }
+    }
+    if (destFile.exists()) {
+      if (!FileUtil.canWrite(destFile)) {
+        throw new IOException("Destination '" + destFile +
+            "' exists but is read-only");
+      } else if (!destFile.delete()) {
+        throw new IOException("Destination '" + destFile +
+            "' exists but cannot be deleted");
+      }
+    }
+    try {
+      NativeIO.copyFileUnbuffered(srcFile, destFile);
+    } catch (NativeIOException e) {
+      throw new IOException("Failed to copy " + srcFile.getCanonicalPath() +
+          " to " + destFile.getCanonicalPath() +
+          " due to failure in NativeIO#copyFileUnbuffered(). " +
+          e.toString());
+    }
+    if (srcFile.length() != destFile.length()) {
+      throw new IOException("Failed to copy full contents from '" + srcFile +
+          "' to '" + destFile + "'");
+    }
+    if (preserveFileDate && !destFile.setLastModified(srcFile.lastModified())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to preserve last modified date from '" + srcFile +
+            "' to '" + destFile + "'");
+      }
+    }
+  }
+
   /**
    * Recursively delete all the content of the directory first and then
    * the directory itself from the local filesystem.
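Reviewer note: a short sketch of how the new wrapper behaves for callers (directory and block names are hypothetical). The pre-checks mean the call either throws with a specific message or leaves a fully copied, length-verified file:

```java
File src = new File(blockDir, "blk_1001");        // hypothetical
File dst = new File(lazyPersistDir, "blk_1001");  // hypothetical

// Creates dst's parent directories if needed, deletes a pre-existing dst,
// copies via NativeIO.copyFileUnbuffered, verifies src/dst lengths match,
// and finally (best effort) preserves the last-modified time.
Storage.nativeCopyFileUnbuffered(src, dst, true);
```

Note that the delete-then-recreate sequence is not atomic, so callers should not expose dst to readers until the copy returns.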
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 07e19cf6ff4..1709066ef48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -724,12 +724,12 @@ static File[] copyBlockFiles(long blockId, long genStamp,
     final File dstFile = new File(destDir, srcFile.getName());
     final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
     try {
-      FileUtils.copyFile(srcMeta, dstMeta);
+      Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
     } catch (IOException e) {
       throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
     }
     try {
-      FileUtils.copyFile(srcFile, dstFile);
+      Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true);
     } catch (IOException e) {
       throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
     }