diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 9cda0fa643f..27be644ddb9 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -428,6 +428,9 @@ Release 0.23.0 - Unreleased HADOOP-7445. Implement bulk checksum verification using efficient native code. (todd) + HADOOP-7753. Support fadvise and sync_file_range in NativeIO. Add + ReadaheadPool infrastructure for use in HDFS and MR. (todd) + BUG FIXES HADOOP-7630. hadoop-metrics2.properties should have a property *.period diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java new file mode 100644 index 00000000000..046d9e4b736 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io; + +import java.io.FileDescriptor; +import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.nativeio.NativeIO; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Manages a pool of threads which can issue readahead requests on file descriptors. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ReadaheadPool { + static final Log LOG = LogFactory.getLog(ReadaheadPool.class); + private static final int POOL_SIZE = 4; + private static final int MAX_POOL_SIZE = 16; + private static final int CAPACITY = 1024; + private final ThreadPoolExecutor pool; + + private static ReadaheadPool instance; + + /** + * Return the singleton instance for the current process. + */ + public static ReadaheadPool getInstance() { + synchronized (ReadaheadPool.class) { + if (instance == null && NativeIO.isAvailable()) { + instance = new ReadaheadPool(); + } + return instance; + } + } + + private ReadaheadPool() { + pool = new ThreadPoolExecutor(POOL_SIZE, MAX_POOL_SIZE, 3L, TimeUnit.SECONDS, + new ArrayBlockingQueue(CAPACITY)); + pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); + pool.setThreadFactory(new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Readahead Thread #%d") + .build()); + } + + /** + * Issue a request to readahead on the given file descriptor. + * + * @param identifier a textual identifier that will be used in error + * messages (e.g. the file name) + * @param fd the file descriptor to read ahead + * @param curPos the current offset at which reads are being issued + * @param readaheadLength the configured length to read ahead + * @param maxOffsetToRead the maximum offset that will be readahead + * (useful if, for example, only some segment of the file is + * requested by the user). Pass {@link Long.MAX_VALUE} to allow + * readahead to the end of the file. + * @param lastReadahead the result returned by the previous invocation + * of this function on this file descriptor, or null if this is + * the first call + * @return an object representing this outstanding request, or null + * if no readahead was performed + */ + public ReadaheadRequest readaheadStream( + String identifier, + FileDescriptor fd, + long curPos, + long readaheadLength, + long maxOffsetToRead, + ReadaheadRequest lastReadahead) { + + Preconditions.checkArgument(curPos <= maxOffsetToRead, + "Readahead position %s higher than maxOffsetToRead %s", + curPos, maxOffsetToRead); + + if (readaheadLength <= 0) { + return null; + } + + long lastOffset = Long.MIN_VALUE; + + if (lastReadahead != null) { + lastOffset = lastReadahead.getOffset(); + } + + // trigger each readahead when we have reached the halfway mark + // in the previous readahead. This gives the system time + // to satisfy the readahead before we start reading the data. + long nextOffset = lastOffset + readaheadLength / 2; + if (curPos >= nextOffset) { + // cancel any currently pending readahead, to avoid + // piling things up in the queue. Each reader should have at most + // one outstanding request in the queue. + if (lastReadahead != null) { + lastReadahead.cancel(); + lastReadahead = null; + } + + long length = Math.min(readaheadLength, + maxOffsetToRead - curPos); + + if (length <= 0) { + // we've reached the end of the stream + return null; + } + + return submitReadahead(identifier, fd, curPos, length); + } else { + return lastReadahead; + } + } + + /** + * Submit a request to readahead on the given file descriptor. + * @param identifier a textual identifier used in error messages, etc. + * @param fd the file descriptor to readahead + * @param off the offset at which to start the readahead + * @param len the number of bytes to read + * @return an object representing this pending request + */ + public ReadaheadRequest submitReadahead( + String identifier, FileDescriptor fd, long off, long len) { + ReadaheadRequestImpl req = new ReadaheadRequestImpl( + identifier, fd, off, len); + pool.execute(req); + if (LOG.isTraceEnabled()) { + LOG.trace("submit readahead: " + req); + } + return req; + } + + /** + * An outstanding readahead request that has been submitted to + * the pool. This request may be pending or may have been + * completed. + */ + public interface ReadaheadRequest { + /** + * Cancels the request for readahead. This should be used + * if the reader no longer needs the requested data, before + * closing the related file descriptor. + * + * It is safe to use even if the readahead request has already + * been fulfilled. + */ + public void cancel(); + + /** + * @return the requested offset + */ + public long getOffset(); + + /** + * @return the requested length + */ + public long getLength(); + } + + private static class ReadaheadRequestImpl implements Runnable, ReadaheadRequest { + private final String identifier; + private final FileDescriptor fd; + private final long off, len; + private volatile boolean canceled = false; + + private ReadaheadRequestImpl(String identifier, FileDescriptor fd, long off, long len) { + this.identifier = identifier; + this.fd = fd; + this.off = off; + this.len = len; + } + + public void run() { + if (canceled) return; + // There's a very narrow race here that the file will close right at + // this instant. But if that happens, we'll likely receive an EBADF + // error below, and see that it's canceled, ignoring the error. + // It's also possible that we'll end up requesting readahead on some + // other FD, which may be wasted work, but won't cause a problem. + try { + NativeIO.posixFadviseIfPossible(fd, off, len, + NativeIO.POSIX_FADV_WILLNEED); + } catch (IOException ioe) { + if (canceled) { + // no big deal - the reader canceled the request and closed + // the file. + return; + } + LOG.warn("Failed readahead on " + identifier, + ioe); + } + } + + @Override + public void cancel() { + canceled = true; + // We could attempt to remove it from the work queue, but that would + // add complexity. In practice, the work queues remain very short, + // so removing canceled requests has no gain. + } + + @Override + public long getOffset() { + return off; + } + + @Override + public long getLength() { + return len; + } + + @Override + public String toString() { + return "ReadaheadRequestImpl [identifier='" + identifier + "', fd=" + fd + + ", off=" + off + ", len=" + len + "]"; + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java index e3c5803818c..2a7f883d957 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java @@ -46,10 +46,41 @@ public class NativeIO { public static final int O_FSYNC = O_SYNC; public static final int O_NDELAY = O_NONBLOCK; + // Flags for posix_fadvise() from bits/fcntl.h + /* No further special treatment. */ + public static final int POSIX_FADV_NORMAL = 0; + /* Expect random page references. */ + public static final int POSIX_FADV_RANDOM = 1; + /* Expect sequential page references. */ + public static final int POSIX_FADV_SEQUENTIAL = 2; + /* Will need these pages. */ + public static final int POSIX_FADV_WILLNEED = 3; + /* Don't need these pages. */ + public static final int POSIX_FADV_DONTNEED = 4; + /* Data will be accessed once. */ + public static final int POSIX_FADV_NOREUSE = 5; + + + /* Wait upon writeout of all pages + in the range before performing the + write. */ + public static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1; + /* Initiate writeout of all those + dirty pages in the range which are + not presently under writeback. */ + public static final int SYNC_FILE_RANGE_WRITE = 2; + + /* Wait upon writeout of all pages in + the range after performing the + write. */ + public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4; + private static final Log LOG = LogFactory.getLog(NativeIO.class); private static boolean nativeLoaded = false; private static boolean workaroundNonThreadSafePasswdCalls = false; + private static boolean fadvisePossible = true; + private static boolean syncFileRangePossible = true; static final String WORKAROUND_NON_THREADSAFE_CALLS_KEY = "hadoop.workaround.non.threadsafe.getpwuid"; @@ -88,9 +119,58 @@ public class NativeIO { /** Wrapper around chmod(2) */ public static native void chmod(String path, int mode) throws IOException; + /** Wrapper around posix_fadvise(2) */ + static native void posix_fadvise( + FileDescriptor fd, long offset, long len, int flags) throws NativeIOException; + + /** Wrapper around sync_file_range(2) */ + static native void sync_file_range( + FileDescriptor fd, long offset, long nbytes, int flags) throws NativeIOException; + /** Initialize the JNI method ID and class ID cache */ private static native void initNative(); + /** + * Call posix_fadvise on the given file descriptor. See the manpage + * for this syscall for more information. On systems where this + * call is not available, does nothing. + * + * @throws NativeIOException if there is an error with the syscall + */ + public static void posixFadviseIfPossible( + FileDescriptor fd, long offset, long len, int flags) + throws NativeIOException { + if (nativeLoaded && fadvisePossible) { + try { + posix_fadvise(fd, offset, len, flags); + } catch (UnsupportedOperationException uoe) { + fadvisePossible = false; + } catch (UnsatisfiedLinkError ule) { + fadvisePossible = false; + } + } + } + + /** + * Call sync_file_range on the given file descriptor. See the manpage + * for this syscall for more information. On systems where this + * call is not available, does nothing. + * + * @throws NativeIOException if there is an error with the syscall + */ + public static void syncFileRangeIfPossible( + FileDescriptor fd, long offset, long nbytes, int flags) + throws NativeIOException { + if (nativeLoaded && syncFileRangePossible) { + try { + sync_file_range(fd, offset, nbytes, flags); + } catch (UnsupportedOperationException uoe) { + syncFileRangePossible = false; + } catch (UnsatisfiedLinkError ule) { + syncFileRangePossible = false; + } + } + } /** * Result type of the fstat call diff --git a/hadoop-common-project/hadoop-common/src/main/native/configure.ac b/hadoop-common-project/hadoop-common/src/main/native/configure.ac index 80f7f81c8d2..debd8f2cc1a 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/configure.ac +++ b/hadoop-common-project/hadoop-common/src/main/native/configure.ac @@ -40,6 +40,7 @@ AC_CONFIG_AUX_DIR([config]) AC_CONFIG_MACRO_DIR([m4]) AC_CONFIG_HEADER([config.h]) AC_SYS_LARGEFILE +AC_GNU_SOURCE AM_INIT_AUTOMAKE(hadoop,1.0.0) @@ -57,10 +58,8 @@ if test $JAVA_HOME != "" then JNI_LDFLAGS="-L$JAVA_HOME/jre/lib/$OS_ARCH/server" fi -ldflags_bak=$LDFLAGS LDFLAGS="$LDFLAGS $JNI_LDFLAGS" AC_CHECK_LIB([jvm], [JNI_GetCreatedJavaVMs]) -LDFLAGS=$ldflags_bak AC_SUBST([JNI_LDFLAGS]) # Checks for header files. @@ -94,6 +93,12 @@ AC_CHECK_HEADERS([snappy-c.h], AC_COMPUTE_NEEDED_DSO(snappy,HADOOP_SNAPPY_LIBRAR dnl Check for headers needed by the native Group resolution implementation AC_CHECK_HEADERS([fcntl.h stdlib.h string.h unistd.h], [], AC_MSG_ERROR(Some system headers not found... please ensure their presence on your platform.)) +dnl check for posix_fadvise +AC_CHECK_HEADERS(fcntl.h, [AC_CHECK_FUNCS(posix_fadvise)]) + +dnl check for sync_file_range +AC_CHECK_HEADERS(fcntl.h, [AC_CHECK_FUNCS(sync_file_range)]) + # Checks for typedefs, structures, and compiler characteristics. AC_C_CONST diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c index 209bb7a8dff..fbcf9563ee4 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include "org_apache_hadoop.h" @@ -234,6 +235,81 @@ cleanup: } + +/** + * public static native void posix_fadvise( + * FileDescriptor fd, long offset, long len, int flags); + */ +JNIEXPORT void JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_posix_1fadvise( + JNIEnv *env, jclass clazz, + jobject fd_object, jlong offset, jlong len, jint flags) +{ +#ifndef HAVE_POSIX_FADVISE + THROW(env, "java/lang/UnsupportedOperationException", + "fadvise support not available"); +#else + int fd = fd_get(env, fd_object); + PASS_EXCEPTIONS(env); + + int err = 0; + if ((err = posix_fadvise(fd, (off_t)offset, (off_t)len, flags))) { + throw_ioe(env, err); + } +#endif +} + +#if defined(HAVE_SYNC_FILE_RANGE) +# define my_sync_file_range sync_file_range +#elif defined(SYS_sync_file_range) +// RHEL 5 kernels have sync_file_range support, but the glibc +// included does not have the library function. We can +// still call it directly, and if it's not supported by the +// kernel, we'd get ENOSYS. See RedHat Bugzilla #518581 +static int manual_sync_file_range (int fd, __off64_t from, __off64_t to, unsigned int flags) +{ +#ifdef __x86_64__ + return syscall( SYS_sync_file_range, fd, from, to, flags); +#else + return syscall (SYS_sync_file_range, fd, + __LONG_LONG_PAIR ((long) (from >> 32), (long) from), + __LONG_LONG_PAIR ((long) (to >> 32), (long) to), + flags); +#endif +} +#define my_sync_file_range manual_sync_file_range +#endif + +/** + * public static native void sync_file_range( + * FileDescriptor fd, long offset, long len, int flags); + */ +JNIEXPORT void JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_sync_1file_1range( + JNIEnv *env, jclass clazz, + jobject fd_object, jlong offset, jlong len, jint flags) +{ +#ifndef my_sync_file_range + THROW(env, "java/lang/UnsupportedOperationException", + "sync_file_range support not available"); +#else + int fd = fd_get(env, fd_object); + PASS_EXCEPTIONS(env); + + if (my_sync_file_range(fd, (off_t)offset, (off_t)len, flags)) { + if (errno == ENOSYS) { + // we know the syscall number, but it's not compiled + // into the running kernel + THROW(env, "java/lang/UnsupportedOperationException", + "sync_file_range kernel support not available"); + return; + } else { + throw_ioe(env, errno); + } + } +#endif +} + /* * public static native FileDescriptor open(String path, int flags, int mode); */ diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/file_descriptor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/file_descriptor.c index 0681db8f832..f2c5509d578 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/file_descriptor.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/file_descriptor.c @@ -54,6 +54,11 @@ void fd_deinit(JNIEnv *env) { * underlying fd, or throw if unavailable */ int fd_get(JNIEnv* env, jobject obj) { + if (obj == NULL) { + THROW(env, "java/lang/NullPointerException", + "FileDescriptor object is null"); + return -1; + } return (*env)->GetIntField(env, obj, fd_descriptor); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java index 51d044bda69..87da844fed9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java @@ -19,6 +19,7 @@ package org.apache.hadoop.io.nativeio; import java.io.File; import java.io.FileDescriptor; +import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; @@ -210,6 +211,66 @@ public class TestNativeIO { assertPermissions(toChmod, 0644); } + + @Test + public void testPosixFadvise() throws Exception { + FileInputStream fis = new FileInputStream("/dev/zero"); + try { + NativeIO.posix_fadvise(fis.getFD(), 0, 0, + NativeIO.POSIX_FADV_SEQUENTIAL); + } catch (UnsupportedOperationException uoe) { + // we should just skip the unit test on machines where we don't + // have fadvise support + assumeTrue(false); + } finally { + fis.close(); + } + + try { + NativeIO.posix_fadvise(fis.getFD(), 0, 1024, + NativeIO.POSIX_FADV_SEQUENTIAL); + + fail("Did not throw on bad file"); + } catch (NativeIOException nioe) { + assertEquals(Errno.EBADF, nioe.getErrno()); + } + + try { + NativeIO.posix_fadvise(null, 0, 1024, + NativeIO.POSIX_FADV_SEQUENTIAL); + + fail("Did not throw on null file"); + } catch (NullPointerException npe) { + // expected + } + } + + @Test + public void testSyncFileRange() throws Exception { + FileOutputStream fos = new FileOutputStream( + new File(TEST_DIR, "testSyncFileRange")); + try { + fos.write("foo".getBytes()); + NativeIO.sync_file_range(fos.getFD(), 0, 1024, + NativeIO.SYNC_FILE_RANGE_WRITE); + // no way to verify that this actually has synced, + // but if it doesn't throw, we can assume it worked + } catch (UnsupportedOperationException uoe) { + // we should just skip the unit test on machines where we don't + // have fadvise support + assumeTrue(false); + } finally { + fos.close(); + } + try { + NativeIO.sync_file_range(fos.getFD(), 0, 1024, + NativeIO.SYNC_FILE_RANGE_WRITE); + fail("Did not throw on bad file"); + } catch (NativeIOException nioe) { + assertEquals(Errno.EBADF, nioe.getErrno()); + } + } + private void assertPermissions(File f, int expected) throws IOException { FileSystem localfs = FileSystem.getLocal(new Configuration()); FsPermission perms = localfs.getFileStatus(