HDFS-7090. Use unbuffered writes when persisting in-memory replicas. Contributed by Xiaoyu Yao.
This commit is contained in:
parent
4aed2d8e91
commit
1770bb942f
|
@ -58,6 +58,7 @@ public enum Errno {
|
||||||
ELOOP,
|
ELOOP,
|
||||||
ENAMETOOLONG,
|
ENAMETOOLONG,
|
||||||
ENOTEMPTY,
|
ENOTEMPTY,
|
||||||
|
EOVERFLOW,
|
||||||
|
|
||||||
UNKNOWN;
|
UNKNOWN;
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import java.nio.MappedByteBuffer;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -846,4 +847,30 @@ public class NativeIO {
|
||||||
|
|
||||||
private static native void link0(String src, String dst)
|
private static native void link0(String src, String dst)
|
||||||
throws NativeIOException;
|
throws NativeIOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unbuffered file copy from src to dst without tainting OS buffer cache
|
||||||
|
* In Linux, it uses sendfile() which uses O_DIRECT flag internally
|
||||||
|
* In Windows, it uses CopyFileEx with COPY_FILE_NO_BUFFERING flag
|
||||||
|
*
|
||||||
|
* Note: This does not support FreeBSD/OSX which have a different sendfile()
|
||||||
|
* semantic. Also, this simple native wrapper does minimal parameter checking
|
||||||
|
* It is recommended to use wrapper function like
|
||||||
|
* the Storage#nativeCopyFileUnbuffered() function in hadoop-hdfs.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* @param src The source path
|
||||||
|
* @param dst The destination path
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static void copyFileUnbuffered(File src, File dst) throws IOException {
|
||||||
|
if ((nativeLoaded) && (Shell.WINDOWS || Shell.LINUX)) {
|
||||||
|
copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
|
||||||
|
} else {
|
||||||
|
FileUtils.copyFile(src, dst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static native void copyFileUnbuffered0(String src, String dst)
|
||||||
|
throws NativeIOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,9 @@
|
||||||
#include <sys/resource.h>
|
#include <sys/resource.h>
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <sys/syscall.h>
|
#include <sys/syscall.h>
|
||||||
|
#if !(defined(__FreeBSD__) || defined(__MACH__))
|
||||||
|
#include <sys/sendfile.h>
|
||||||
|
#endif
|
||||||
#include <sys/time.h>
|
#include <sys/time.h>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
@ -1142,6 +1145,70 @@ JNIEnv *env, jclass clazz)
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
JNIEXPORT void JNICALL
|
||||||
|
Java_org_apache_hadoop_io_nativeio_NativeIO_copyFileUnbuffered0(
|
||||||
|
JNIEnv *env, jclass clazz, jstring jsrc, jstring jdst)
|
||||||
|
{
|
||||||
|
#ifdef UNIX
|
||||||
|
#if (defined(__FreeBSD__) || defined(__MACH__))
|
||||||
|
THROW(env, "java/io/IOException",
|
||||||
|
"The function copyFileUnbuffered() is not supported on FreeBSD or Mac OS");
|
||||||
|
return;
|
||||||
|
#else
|
||||||
|
const char *src = NULL, *dst = NULL;
|
||||||
|
int srcFd = -1;
|
||||||
|
int dstFd = -1;
|
||||||
|
struct stat s;
|
||||||
|
off_t offset = 0;
|
||||||
|
|
||||||
|
src = (*env)->GetStringUTFChars(env, jsrc, NULL);
|
||||||
|
if (!src) goto cleanup; // exception was thrown
|
||||||
|
dst = (*env)->GetStringUTFChars(env, jdst, NULL);
|
||||||
|
if (!dst) goto cleanup; // exception was thrown
|
||||||
|
|
||||||
|
srcFd = open(src, O_RDONLY);
|
||||||
|
if (srcFd == -1) {
|
||||||
|
throw_ioe(env, errno);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
if (fstat(srcFd, &s) == -1){
|
||||||
|
throw_ioe(env, errno);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
dstFd = open(dst, O_WRONLY | O_CREAT, s.st_mode);
|
||||||
|
if (dstFd == -1) {
|
||||||
|
throw_ioe(env, errno);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
if (sendfile(dstFd, srcFd, &offset, s.st_size) == -1) {
|
||||||
|
throw_ioe(env, errno);
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
if (src) (*env)->ReleaseStringUTFChars(env, jsrc, src);
|
||||||
|
if (dst) (*env)->ReleaseStringUTFChars(env, jdst, dst);
|
||||||
|
if (srcFd != -1) close(srcFd);
|
||||||
|
if (dstFd != -1) close(dstFd);
|
||||||
|
#endif
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#ifdef WINDOWS
|
||||||
|
LPCWSTR src = NULL, dst = NULL;
|
||||||
|
|
||||||
|
src = (LPCWSTR) (*env)->GetStringChars(env, jsrc, NULL);
|
||||||
|
if (!src) goto cleanup; // exception was thrown
|
||||||
|
dst = (LPCWSTR) (*env)->GetStringChars(env, jdst, NULL);
|
||||||
|
if (!dst) goto cleanup; // exception was thrown
|
||||||
|
if (!CopyFileEx(src, dst, NULL, NULL, NULL, COPY_FILE_NO_BUFFERING)) {
|
||||||
|
throw_ioe(env, GetLastError());
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
if (src) (*env)->ReleaseStringChars(env, jsrc, src);
|
||||||
|
if (dst) (*env)->ReleaseStringChars(env, jdst, dst);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* vim: sw=2: ts=2: et:
|
* vim: sw=2: ts=2: et:
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -66,6 +66,7 @@ static errno_mapping_t ERRNO_MAPPINGS[] = {
|
||||||
MAPPING(ELOOP),
|
MAPPING(ELOOP),
|
||||||
MAPPING(ENAMETOOLONG),
|
MAPPING(ENAMETOOLONG),
|
||||||
MAPPING(ENOTEMPTY),
|
MAPPING(ENOTEMPTY),
|
||||||
|
MAPPING(EOVERFLOW),
|
||||||
{-1, NULL}
|
{-1, NULL}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -24,14 +24,18 @@ import java.io.FileOutputStream;
|
||||||
import java.io.FileReader;
|
import java.io.FileReader;
|
||||||
import java.io.FileWriter;
|
import java.io.FileWriter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.RandomAccessFile;
|
||||||
import java.nio.MappedByteBuffer;
|
import java.nio.MappedByteBuffer;
|
||||||
import java.nio.channels.FileChannel;
|
import java.nio.channels.FileChannel;
|
||||||
import java.nio.channels.FileChannel.MapMode;
|
import java.nio.channels.FileChannel.MapMode;
|
||||||
|
import java.util.Random;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -622,4 +626,34 @@ public class TestNativeIO {
|
||||||
assumeTrue(NativeIO.isAvailable());
|
assumeTrue(NativeIO.isAvailable());
|
||||||
NativeIO.getMemlockLimit();
|
NativeIO.getMemlockLimit();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 30000)
|
||||||
|
public void testCopyFileUnbuffered() throws Exception {
|
||||||
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||||
|
File srcFile = new File(TEST_DIR, METHOD_NAME + ".src.dat");
|
||||||
|
File dstFile = new File(TEST_DIR, METHOD_NAME + ".dst.dat");
|
||||||
|
final int fileSize = 0x8FFFFFF; // 128 MB
|
||||||
|
final int SEED = 0xBEEF;
|
||||||
|
final int batchSize = 4096;
|
||||||
|
final int numBatches = fileSize / batchSize;
|
||||||
|
Random rb = new Random(SEED);
|
||||||
|
FileChannel channel = null;
|
||||||
|
RandomAccessFile raSrcFile = null;
|
||||||
|
try {
|
||||||
|
raSrcFile = new RandomAccessFile(srcFile, "rw");
|
||||||
|
channel = raSrcFile.getChannel();
|
||||||
|
byte bytesToWrite[] = new byte[batchSize];
|
||||||
|
MappedByteBuffer mapBuf;
|
||||||
|
mapBuf = channel.map(MapMode.READ_WRITE, 0, fileSize);
|
||||||
|
for (int i = 0; i < numBatches; i++) {
|
||||||
|
rb.nextBytes(bytesToWrite);
|
||||||
|
mapBuf.put(bytesToWrite);
|
||||||
|
}
|
||||||
|
NativeIO.copyFileUnbuffered(srcFile, dstFile);
|
||||||
|
}finally {
|
||||||
|
IOUtils.cleanup(LOG, channel);
|
||||||
|
IOUtils.cleanup(LOG, raSrcFile);
|
||||||
|
FileUtils.deleteQuietly(TEST_DIR);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -355,6 +355,9 @@ Trunk (Unreleased)
|
||||||
HDFS-7112. LazyWriter should use either async IO or one thread per physical
|
HDFS-7112. LazyWriter should use either async IO or one thread per physical
|
||||||
disk. (Xiaoyu Yao via cnauroth)
|
disk. (Xiaoyu Yao via cnauroth)
|
||||||
|
|
||||||
|
HDFS-7090. Use unbuffered writes when persisting in-memory replicas.
|
||||||
|
(Xiaoyu Yao via cnauroth)
|
||||||
|
|
||||||
Release 2.7.0 - UNRELEASED
|
Release 2.7.0 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.common;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.RandomAccessFile;
|
import java.io.RandomAccessFile;
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
|
@ -997,6 +998,93 @@ public abstract class Storage extends StorageInfo {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Copies a file (usually large) to a new location using native unbuffered IO.
|
||||||
|
* <p>
|
||||||
|
* This method copies the contents of the specified source file
|
||||||
|
* to the specified destination file using OS specific unbuffered IO.
|
||||||
|
* The goal is to avoid churning the file system buffer cache when copying
|
||||||
|
* large files. TheFileUtils#copyLarge function from apache-commons-io library
|
||||||
|
* can be used to achieve this with an internal memory buffer but is less
|
||||||
|
* efficient than the native unbuffered APIs such as sendfile() in Linux and
|
||||||
|
* CopyFileEx() in Windows wrapped in {@link NativeIO#copyFileUnbuffered}.
|
||||||
|
*
|
||||||
|
* The directory holding the destination file is created if it does not exist.
|
||||||
|
* If the destination file exists, then this method will delete it first.
|
||||||
|
* <p>
|
||||||
|
* <strong>Note:</strong> Setting <code>preserveFileDate</code> to
|
||||||
|
* {@code true} tries to preserve the file's last modified
|
||||||
|
* date/times using {@link File#setLastModified(long)}, however it is
|
||||||
|
* not guaranteed that the operation will succeed.
|
||||||
|
* If the modification operation fails, no indication is provided.
|
||||||
|
*
|
||||||
|
* @param srcFile an existing file to copy, must not be {@code null}
|
||||||
|
* @param destFile the new file, must not be {@code null}
|
||||||
|
* @param preserveFileDate true if the file date of the copy
|
||||||
|
* should be the same as the original
|
||||||
|
*
|
||||||
|
* @throws NullPointerException if source or destination is {@code null}
|
||||||
|
* @throws IOException if source or destination is invalid
|
||||||
|
* @throws IOException if an IO error occurs during copying
|
||||||
|
*/
|
||||||
|
public static void nativeCopyFileUnbuffered(File srcFile, File destFile,
|
||||||
|
boolean preserveFileDate) throws IOException {
|
||||||
|
if (srcFile == null) {
|
||||||
|
throw new NullPointerException("Source must not be null");
|
||||||
|
}
|
||||||
|
if (destFile == null) {
|
||||||
|
throw new NullPointerException("Destination must not be null");
|
||||||
|
}
|
||||||
|
if (srcFile.exists() == false) {
|
||||||
|
throw new FileNotFoundException("Source '" + srcFile + "' does not exist");
|
||||||
|
}
|
||||||
|
if (srcFile.isDirectory()) {
|
||||||
|
throw new IOException("Source '" + srcFile + "' exists but is a directory");
|
||||||
|
}
|
||||||
|
if (srcFile.getCanonicalPath().equals(destFile.getCanonicalPath())) {
|
||||||
|
throw new IOException("Source '" + srcFile + "' and destination '" +
|
||||||
|
destFile + "' are the same");
|
||||||
|
}
|
||||||
|
File parentFile = destFile.getParentFile();
|
||||||
|
if (parentFile != null) {
|
||||||
|
if (!parentFile.mkdirs() && !parentFile.isDirectory()) {
|
||||||
|
throw new IOException("Destination '" + parentFile
|
||||||
|
+ "' directory cannot be created");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (destFile.exists()) {
|
||||||
|
if (FileUtil.canWrite(destFile) == false) {
|
||||||
|
throw new IOException("Destination '" + destFile
|
||||||
|
+ "' exists but is read-only");
|
||||||
|
} else {
|
||||||
|
if (destFile.delete() == false) {
|
||||||
|
throw new IOException("Destination '" + destFile
|
||||||
|
+ "' exists but cannot be deleted");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
NativeIO.copyFileUnbuffered(srcFile, destFile);
|
||||||
|
} catch (NativeIOException e) {
|
||||||
|
throw new IOException("Failed to copy " + srcFile.getCanonicalPath()
|
||||||
|
+ " to " + destFile.getCanonicalPath()
|
||||||
|
+ " due to failure in NativeIO#copyFileUnbuffered(). "
|
||||||
|
+ e.toString());
|
||||||
|
}
|
||||||
|
if (srcFile.length() != destFile.length()) {
|
||||||
|
throw new IOException("Failed to copy full contents from '" + srcFile
|
||||||
|
+ "' to '" + destFile + "'");
|
||||||
|
}
|
||||||
|
if (preserveFileDate) {
|
||||||
|
if (destFile.setLastModified(srcFile.lastModified()) == false) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Failed to preserve last modified date from'" + srcFile
|
||||||
|
+ "' to '" + destFile + "'");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Recursively delete all the content of the directory first and then
|
* Recursively delete all the content of the directory first and then
|
||||||
* the directory itself from the local filesystem.
|
* the directory itself from the local filesystem.
|
||||||
|
|
|
@ -723,12 +723,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
final File dstFile = new File(destDir, srcFile.getName());
|
final File dstFile = new File(destDir, srcFile.getName());
|
||||||
final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
|
final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
|
||||||
try {
|
try {
|
||||||
FileUtils.copyFile(srcMeta, dstMeta);
|
Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
|
throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
FileUtils.copyFile(srcFile, dstFile);
|
Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
|
throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue