diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index a4f7ceabc00..3414d6d2d31 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -491,6 +491,7 @@
org.apache.hadoop.io.compress.lz4.Lz4Compressor
org.apache.hadoop.io.compress.lz4.Lz4Decompressor
org.apache.hadoop.util.NativeCrc32
+ org.apache.hadoop.net.unix.DomainSocket
${project.build.directory}/native/javah
diff --git a/hadoop-common-project/hadoop-common/src/CMakeLists.txt b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
index c7f05e5c3bc..7c47ca46a9a 100644
--- a/hadoop-common-project/hadoop-common/src/CMakeLists.txt
+++ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
@@ -144,10 +144,10 @@ add_executable(test_bulk_crc32
${D}/util/bulk_crc32.c
${T}/util/test_bulk_crc32.c
)
-set_property(SOURCE main.cpp PROPERTY INCLUDE_DIRECTORIES "\"-Werror\" \"-Wall\"")
SET(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
add_dual_library(hadoop
+ main/native/src/exception.c
${D}/io/compress/lz4/Lz4Compressor.c
${D}/io/compress/lz4/Lz4Decompressor.c
${D}/io/compress/lz4/lz4.c
@@ -157,6 +157,7 @@ add_dual_library(hadoop
${D}/io/nativeio/NativeIO.c
${D}/io/nativeio/errno_enum.c
${D}/io/nativeio/file_descriptor.c
+ ${D}/net/unix/DomainSocket.c
${D}/security/JniBasedUnixGroupsMapping.c
${D}/security/JniBasedUnixGroupsNetgroupMapping.c
${D}/security/getGroup.c
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
new file mode 100644
index 00000000000..9f93924b5be
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
@@ -0,0 +1,619 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net.unix;
+
+import java.io.Closeable;
+import org.apache.hadoop.classification.InterfaceAudience;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.SocketException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang.SystemUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The implementation of UNIX domain sockets in Java.
+ *
+ * See {@link DomainSocket} for more information about UNIX domain sockets.
+ */
+@InterfaceAudience.LimitedPrivate("HDFS")
+public class DomainSocket implements Closeable {
+ static {
+ if (SystemUtils.IS_OS_WINDOWS) {
+ loadingFailureReason = "UNIX Domain sockets are not available on Windows.";
+ } else if (!NativeCodeLoader.isNativeCodeLoaded()) {
+ loadingFailureReason = "libhadoop cannot be loaded.";
+ } else {
+ String problem = "DomainSocket#anchorNative got error: unknown";
+ try {
+ anchorNative();
+ problem = null;
+ } catch (Throwable t) {
+ problem = "DomainSocket#anchorNative got error: " + t.getMessage();
+ }
+ loadingFailureReason = problem;
+ }
+ }
+
+ static Log LOG = LogFactory.getLog(DomainSocket.class);
+
+ /**
+ * True only if we should validate the paths used in {@link DomainSocket#bind()}
+ */
+ private static boolean validateBindPaths = true;
+
+ /**
+ * The reason why DomainSocket is not available, or null if it is available.
+ */
+ private final static String loadingFailureReason;
+
+ /**
+ * Initialize the native library code.
+ */
+ private static native void anchorNative();
+
+ /**
+ * This function is designed to validate that the path chosen for a UNIX
+ * domain socket is secure. A socket path is secure if it doesn't allow
+ * unprivileged users to perform a man-in-the-middle attack against it.
+ * For example, one way to perform a man-in-the-middle attack would be for
+ * a malicious user to move the server socket out of the way and create his
+ * own socket in the same place. Not good.
+ *
+ * Note that we only check the path once. It's possible that the
+ * permissions on the path could change, perhaps to something more relaxed,
+ * immediately after the path passes our validation test-- hence creating a
+ * security hole. However, the purpose of this check is to spot common
+ * misconfigurations. System administrators do not commonly change
+ * permissions on these paths while the server is running.
+ *
+ * @param path the path to validate
+ * @param skipComponents the number of starting path components to skip
+ * validation for (used only for testing)
+ */
+ @VisibleForTesting
+ native static void validateSocketPathSecurity0(String path,
+ int skipComponents) throws IOException;
+
+ /**
+ * Return true only if UNIX domain sockets are available.
+ */
+ public static String getLoadingFailureReason() {
+ return loadingFailureReason;
+ }
+
+ /**
+ * Disable validation of the server bind paths.
+ */
+ @VisibleForTesting
+ static void disableBindPathValidation() {
+ validateBindPaths = false;
+ }
+
+ /**
+ * Given a path and a port, compute the effective path by replacing
+ * occurrences of __PORT__ with the port. This is mainly to make it
+ * possible to run multiple DataNodes locally for testing purposes.
+ *
+ * @param path The source path
+ * @param port Port number to use
+ *
+ * @return The effective path
+ */
+ public static String getEffectivePath(String path, int port) {
+ return path.replace("__PORT__", String.valueOf(port));
+ }
+
+ /**
+ * Status bits
+ *
+ * Bit 30: 0 = DomainSocket open, 1 = DomainSocket closed
+ * Bits 29 to 0: the reference count.
+ */
+ private final AtomicInteger status;
+
+ /**
+ * Bit mask representing a closed domain socket.
+ */
+ private static final int STATUS_CLOSED_MASK = 1 << 30;
+
+ /**
+ * The file descriptor associated with this UNIX domain socket.
+ */
+ private final int fd;
+
+ /**
+ * The path associated with this UNIX domain socket.
+ */
+ private final String path;
+
+ /**
+ * The InputStream associated with this socket.
+ */
+ private final DomainInputStream inputStream = new DomainInputStream();
+
+ /**
+ * The OutputStream associated with this socket.
+ */
+ private final DomainOutputStream outputStream = new DomainOutputStream();
+
+ /**
+ * The Channel associated with this socket.
+ */
+ private final DomainChannel channel = new DomainChannel();
+
+ private DomainSocket(String path, int fd) {
+ this.status = new AtomicInteger(0);
+ this.fd = fd;
+ this.path = path;
+ }
+
+ private static native int bind0(String path) throws IOException;
+
+ /**
+ * Create a new DomainSocket listening on the given path.
+ *
+ * @param path The path to bind and listen on.
+ * @return The new DomainSocket.
+ */
+ public static DomainSocket bindAndListen(String path) throws IOException {
+ if (loadingFailureReason != null) {
+ throw new UnsupportedOperationException(loadingFailureReason);
+ }
+ if (validateBindPaths) {
+ validateSocketPathSecurity0(path, 0);
+ }
+ int fd = bind0(path);
+ return new DomainSocket(path, fd);
+ }
+
+ private static native int accept0(int fd) throws IOException;
+
+ /**
+ * Accept a new UNIX domain connection.
+ *
+ * This method can only be used on sockets that were bound with bind().
+ *
+ * @return The new connection.
+ * @throws IOException If there was an I/O error
+ * performing the accept-- such as the
+ * socket being closed from under us.
+ * @throws SocketTimeoutException If the accept timed out.
+ */
+ public DomainSocket accept() throws IOException {
+ fdRef();
+ try {
+ return new DomainSocket(path, accept0(fd));
+ } finally {
+ fdUnref();
+ }
+ }
+
+ private static native int connect0(String path);
+
+ /**
+ * Create a new DomainSocket connected to the given path.
+ *
+ * @param path The path to connect to.
+ * @return The new DomainSocket.
+ */
+ public static DomainSocket connect(String path) throws IOException {
+ if (loadingFailureReason != null) {
+ throw new UnsupportedOperationException(loadingFailureReason);
+ }
+ int fd = connect0(path);
+ return new DomainSocket(path, fd);
+ }
+
+ /**
+ * Increment the reference count of the underlying file descriptor.
+ *
+ * @throws SocketException If the file descriptor is closed.
+ */
+ private void fdRef() throws SocketException {
+ int bits = status.incrementAndGet();
+ if ((bits & STATUS_CLOSED_MASK) != 0) {
+ status.decrementAndGet();
+ throw new SocketException("Socket is closed.");
+ }
+ }
+
+ /**
+ * Decrement the reference count of the underlying file descriptor.
+ */
+ private void fdUnref() {
+ int newCount = status.decrementAndGet();
+ assert newCount >= 0;
+ }
+
+ /**
+ * Return true if the file descriptor is currently open.
+ *
+ * @return True if the file descriptor is currently open.
+ */
+ public boolean isOpen() {
+ return ((status.get() & STATUS_CLOSED_MASK) == 0);
+ }
+
+ /**
+ * @return The socket path.
+ */
+ public String getPath() {
+ return path;
+ }
+
+ /**
+ * @return The socket InputStream
+ */
+ public DomainInputStream getInputStream() {
+ return inputStream;
+ }
+
+ /**
+ * @return The socket OutputStream
+ */
+ public DomainOutputStream getOutputStream() {
+ return outputStream;
+ }
+
+ /**
+ * @return The socket Channel
+ */
+ public DomainChannel getChannel() {
+ return channel;
+ }
+
+ public static final int SND_BUF_SIZE = 1;
+ public static final int RCV_BUF_SIZE = 2;
+ public static final int SND_TIMEO = 3;
+ public static final int RCV_TIMEO = 4;
+
+ private static native void setAttribute0(int fd, int type, int val)
+ throws IOException;
+
+ public void setAttribute(int type, int size) throws IOException {
+ fdRef();
+ try {
+ setAttribute0(fd, type, size);
+ } finally {
+ fdUnref();
+ }
+ }
+
+ private native int getAttribute0(int fd, int type) throws IOException;
+
+ public int getAttribute(int type) throws IOException {
+ fdRef();
+ try {
+ return getAttribute0(fd, type);
+ } finally {
+ fdUnref();
+ }
+ }
+
+ private static native void close0(int fd) throws IOException;
+
+ private static native void closeFileDescriptor0(FileDescriptor fd)
+ throws IOException;
+
+ private static native void shutdown0(int fd) throws IOException;
+
+ /**
+ * Close the Socket.
+ */
+ @Override
+ public void close() throws IOException {
+ // Set the closed bit on this DomainSocket
+ int bits;
+ while (true) {
+ bits = status.get();
+ if ((bits & STATUS_CLOSED_MASK) != 0) {
+ return; // already closed
+ }
+ if (status.compareAndSet(bits, bits | STATUS_CLOSED_MASK)) {
+ break;
+ }
+ }
+ // Wait for all references to go away
+ boolean didShutdown = false;
+ boolean interrupted = false;
+ while ((bits & (~STATUS_CLOSED_MASK)) > 0) {
+ if (!didShutdown) {
+ try {
+ // Calling shutdown on the socket will interrupt blocking system
+ // calls like accept, write, and read that are going on in a
+ // different thread.
+ shutdown0(fd);
+ } catch (IOException e) {
+ LOG.error("shutdown error: ", e);
+ }
+ didShutdown = true;
+ }
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ bits = status.get();
+ }
+
+ // Close the file descriptor. After this point, the file descriptor
+ // number will be reused by something else. Although this DomainSocket
+ // object continues to hold the old file descriptor number (it's a final
+ // field), we never use it again because we look at the closed bit and
+ // realize that this DomainSocket is not usable.
+ close0(fd);
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /*
+ * Clean up if the user forgets to close the socket.
+ */
+ protected void finalize() throws IOException {
+ close();
+ }
+
+ private native static void sendFileDescriptors0(int fd, FileDescriptor jfds[],
+ byte jbuf[], int offset, int length) throws IOException;
+
+ /**
+ * Send some FileDescriptor objects to the process on the other side of this
+ * socket.
+ *
+ * @param jfds The file descriptors to send.
+ * @param jbuf Some bytes to send. You must send at least
+ * one byte.
+ * @param offset The offset in the jbuf array to start at.
+ * @param length Length of the jbuf array to use.
+ */
+ public void sendFileDescriptors(FileDescriptor jfds[],
+ byte jbuf[], int offset, int length) throws IOException {
+ fdRef();
+ try {
+ sendFileDescriptors0(fd, jfds, jbuf, offset, length);
+ } finally {
+ fdUnref();
+ }
+ }
+
+ private static native int receiveFileDescriptors0(int fd, FileDescriptor[] jfds,
+ byte jbuf[], int offset, int length) throws IOException;
+
+ /**
+ * Receive some FileDescriptor objects from the process on the other side of
+ * this socket.
+ *
+ * @param jfds (output parameter) Array of FileDescriptors.
+ * We will fill as many slots as possible with file
+ * descriptors passed from the remote process. The
+ * other slots will contain NULL.
+ * @param jbuf (output parameter) Buffer to read into.
+ * The UNIX domain sockets API requires you to read
+ * at least one byte from the remote process, even
+ * if all you care about is the file descriptors
+ * you will receive.
+ * @param offset Offset into the byte buffer to load data
+ * @param length Length of the byte buffer to use for data
+ *
+ * @return The number of bytes read. This will be -1 if we
+ * reached EOF (similar to SocketInputStream);
+ * otherwise, it will be positive.
+ * @throws IOException if there was an I/O error.
+ */
+ public int receiveFileDescriptors(FileDescriptor[] jfds,
+ byte jbuf[], int offset, int length) throws IOException {
+ fdRef();
+ try {
+ return receiveFileDescriptors0(fd, jfds, jbuf, offset, length);
+ } finally {
+ fdUnref();
+ }
+ }
+
+ /**
+ * Receive some FileDescriptor objects from the process on the other side of
+ * this socket, and wrap them in FileInputStream objects.
+ *
+ * See {@link DomainSocket#recvFileInputStreams(ByteBuffer)}
+ */
+ public int recvFileInputStreams(FileInputStream[] fis, byte buf[],
+ int offset, int length) throws IOException {
+ FileDescriptor fds[] = new FileDescriptor[fis.length];
+ boolean success = false;
+ for (int i = 0; i < fis.length; i++) {
+ fis[i] = null;
+ }
+ fdRef();
+ try {
+ int ret = receiveFileDescriptors0(fd, fds, buf, offset, length);
+ for (int i = 0, j = 0; i < fds.length; i++) {
+ if (fds[i] != null) {
+ fis[j++] = new FileInputStream(fds[i]);
+ fds[i] = null;
+ }
+ }
+ success = true;
+ return ret;
+ } finally {
+ fdUnref();
+ if (!success) {
+ for (int i = 0; i < fds.length; i++) {
+ if (fds[i] != null) {
+ try {
+ closeFileDescriptor0(fds[i]);
+ } catch (Throwable t) {
+ LOG.warn(t);
+ }
+ } else if (fis[i] != null) {
+ try {
+ fis[i].close();
+ } catch (Throwable t) {
+ LOG.warn(t);
+ } finally {
+ fis[i] = null; }
+ }
+ }
+ }
+ }
+ }
+
+ private native static int readArray0(int fd, byte b[], int off, int len)
+ throws IOException;
+
+ private native static int available0(int fd) throws IOException;
+
+ private static native void write0(int fd, int b) throws IOException;
+
+ private static native void writeArray0(int fd, byte b[], int offset, int length)
+ throws IOException;
+
+ private native static int readByteBufferDirect0(int fd, ByteBuffer dst,
+ int position, int remaining) throws IOException;
+
+ /**
+ * Input stream for UNIX domain sockets.
+ */
+ @InterfaceAudience.LimitedPrivate("HDFS")
+ public class DomainInputStream extends InputStream {
+ @Override
+ public int read() throws IOException {
+ fdRef();
+ try {
+ byte b[] = new byte[1];
+ int ret = DomainSocket.readArray0(DomainSocket.this.fd, b, 0, 1);
+ return (ret >= 0) ? b[0] : -1;
+ } finally {
+ fdUnref();
+ }
+ }
+
+ @Override
+ public int read(byte b[], int off, int len) throws IOException {
+ fdRef();
+ try {
+ return DomainSocket.readArray0(DomainSocket.this.fd, b, off, len);
+ } finally {
+ fdUnref();
+ }
+ }
+
+ @Override
+ public int available() throws IOException {
+ fdRef();
+ try {
+ return DomainSocket.available0(DomainSocket.this.fd);
+ } finally {
+ fdUnref();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ DomainSocket.this.close();
+ }
+ }
+
+ /**
+ * Output stream for UNIX domain sockets.
+ */
+ @InterfaceAudience.LimitedPrivate("HDFS")
+ public class DomainOutputStream extends OutputStream {
+ @Override
+ public void close() throws IOException {
+ DomainSocket.this.close();
+ }
+
+ @Override
+ public void write(int val) throws IOException {
+ fdRef();
+ try {
+ byte b[] = new byte[1];
+ b[0] = (byte)val;
+ DomainSocket.writeArray0(DomainSocket.this.fd, b, 0, 1);
+ } finally {
+ fdUnref();
+ }
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ fdRef();
+ try {
+ DomainSocket.writeArray0(DomainSocket.this.fd, b, off, len);
+ } finally {
+ fdUnref();
+ }
+ }
+ }
+
+ @InterfaceAudience.LimitedPrivate("HDFS")
+ public class DomainChannel implements ReadableByteChannel {
+ @Override
+ public boolean isOpen() {
+ return DomainSocket.this.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ DomainSocket.this.close();
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ fdRef();
+ try {
+ int nread = 0;
+ if (dst.isDirect()) {
+ nread = DomainSocket.readByteBufferDirect0(DomainSocket.this.fd,
+ dst, dst.position(), dst.remaining());
+ } else if (dst.hasArray()) {
+ nread = DomainSocket.readArray0(DomainSocket.this.fd,
+ dst.array(), dst.position() + dst.arrayOffset(),
+ dst.remaining());
+ } else {
+ throw new AssertionError("we don't support " +
+ "using ByteBuffers that aren't either direct or backed by " +
+ "arrays");
+ }
+ if (nread > 0) {
+ dst.position(dst.position() + nread);
+ }
+ return nread;
+ } finally {
+ fdUnref();
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("DomainSocket(fd=%d,path=%s)", fd, path);
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/exception.c b/hadoop-common-project/hadoop-common/src/main/native/src/exception.c
new file mode 100644
index 00000000000..39a03f9cde0
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/exception.c
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "exception.h"
+
+ #include <jni.h>
+ #include <stdio.h>
+ #include <stdlib.h>
+ #include <string.h>
+
+ /**
+  * Create a new Java exception of class 'name' whose message is built from a
+  * printf-style format and a va_list.
+  *
+  * The exception is created but NOT thrown, and no exception is left pending
+  * on return.  If the requested exception cannot be created (class or
+  * constructor lookup fails, or a JNI allocation fails), the exception raised
+  * by that failure is returned instead.
+  *
+  * @param env    The JNI environment
+  * @param name   Class name of the exception in JNI form, e.g.
+  *               "java/io/IOException".  The class must have a constructor
+  *               taking a single String.
+  * @param fmt    printf-style format string
+  * @param ap     printf-style arguments
+  *
+  * @return       The new exception, or the exception describing why it could
+  *               not be created
+  */
+ jthrowable newExceptionV(JNIEnv* env, const char *name,
+                          const char *fmt, va_list ap)
+ {
+   int need;
+   char buf[1], *msg = NULL;
+   va_list ap2;
+   jstring jstr = NULL;
+   jthrowable jthr;
+   jclass clazz;
+   jmethodID excCtor;
+
+   /* The arguments are formatted twice (once to measure, once to fill), and a
+    * va_list can only be consumed once, so keep a copy. */
+   va_copy(ap2, ap);
+   clazz = (*env)->FindClass(env, name);
+   if (!clazz) {
+     jthr = (*env)->ExceptionOccurred(env);
+     (*env)->ExceptionClear(env);
+     goto done;
+   }
+   /* "<init>" is the JNI name for a constructor. */
+   excCtor = (*env)->GetMethodID(env,
+         clazz, "<init>", "(Ljava/lang/String;)V");
+   if (!excCtor) {
+     jthr = (*env)->ExceptionOccurred(env);
+     (*env)->ExceptionClear(env);
+     goto done;
+   }
+   /* First pass: learn how many bytes the formatted message needs. */
+   need = vsnprintf(buf, sizeof(buf), fmt, ap);
+   if (need < 0) {
+     fmt = "vsnprintf error";
+     need = strlen(fmt);
+   }
+   msg = malloc(need + 1);
+   if (msg) {
+     vsnprintf(msg, need + 1, fmt, ap2);
+     jstr = (*env)->NewStringUTF(env, msg);
+   } else {
+     /* Out of native memory: still produce an exception of the requested
+      * type rather than handing a NULL buffer to vsnprintf. */
+     jstr = (*env)->NewStringUTF(env,
+         "(out of memory while formatting the exception message)");
+   }
+   if (!jstr) {
+     jthr = (*env)->ExceptionOccurred(env);
+     (*env)->ExceptionClear(env);
+     goto done;
+   }
+   jthr = (*env)->NewObject(env, clazz, excCtor, jstr);
+   if (!jthr) {
+     jthr = (*env)->ExceptionOccurred(env);
+     (*env)->ExceptionClear(env);
+     goto done;
+   }
+
+ done:
+   free(msg);
+   va_end(ap2);
+   if (jstr) {
+     (*env)->DeleteLocalRef(env, jstr);
+   }
+   return jthr;
+ }
+
+ /* Variadic front end to newExceptionV: build (but do not throw) an exception
+  * of class 'name' with a printf-style message. */
+ jthrowable newException(JNIEnv* env, const char *name, const char *fmt, ...)
+ {
+   jthrowable created;
+   va_list args;
+
+   va_start(args, fmt);
+   created = newExceptionV(env, name, fmt, args);
+   va_end(args);
+
+   return created;
+ }
+
+ /* Build (but do not throw) a java.lang.RuntimeException with a printf-style
+  * message. */
+ jthrowable newRuntimeException(JNIEnv* env, const char *fmt, ...)
+ {
+   jthrowable created;
+   va_list args;
+
+   va_start(args, fmt);
+   created = newExceptionV(env, "java/lang/RuntimeException", fmt, args);
+   va_end(args);
+
+   return created;
+ }
+
+ /* Build (but do not throw) a java.io.IOException with a printf-style
+  * message. */
+ jthrowable newIOException(JNIEnv* env, const char *fmt, ...)
+ {
+   jthrowable created;
+   va_list args;
+
+   va_start(args, fmt);
+   created = newExceptionV(env, "java/io/IOException", fmt, args);
+   va_end(args);
+
+   return created;
+ }
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/exception.h b/hadoop-common-project/hadoop-common/src/main/native/src/exception.h
new file mode 100644
index 00000000000..d7af3772c85
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/exception.h
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef HADOOP_MAIN_NATIVE_SRC_EXCEPTION_H
+#define HADOOP_MAIN_NATIVE_SRC_EXCEPTION_H
+
+ #include <jni.h> /* for jthrowable */
+ #include <stdarg.h> /* for va_list */
+
+/**
+ * Create a new Exception.
+ *
+ * No exceptions will be pending on return.
+ *
+ * @param env The JNI environment
+ * @param name full name of the Java exception class
+ * @param fmt printf-style format string
+ * @param ap printf-style arguments
+ *
+ * @return The new exception, or another exception if we failed to create it
+ */
+jthrowable newExceptionV(JNIEnv* env, const char *name,
+ const char *fmt, va_list ap);
+
+/**
+ * Create a new Exception.
+ *
+ * No exceptions will be pending on return.
+ *
+ * @param env The JNI environment
+ * @param name full name of the Java exception class
+ * @param fmt printf-style format string
+ * @param ... printf-style arguments
+ *
+ * @return The new exception, or another exception if we failed to create it
+ */
+jthrowable newException(JNIEnv* env, const char *name, const char *fmt, ...)
+ __attribute__((format(printf, 3, 4)));
+
+/**
+ * Create a new RuntimeException.
+ *
+ * No exceptions will be pending on return.
+ *
+ * @param env The JNI environment
+ * @param fmt printf-style format string
+ * @param ... printf-style arguments
+ *
+ * @return The RuntimeException
+ */
+jthrowable newRuntimeException(JNIEnv* env, const char *fmt, ...)
+ __attribute__((format(printf, 2, 3)));
+
+/**
+ * Create a new IOException.
+ *
+ * No exceptions will be pending on return.
+ *
+ * @param env The JNI environment
+ * @param fmt printf-style format string
+ * @param ... printf-style arguments
+ *
+ * @return The IOException, or another exception if we failed
+ * to create the IOException.
+ */
+jthrowable newIOException(JNIEnv* env, const char *fmt, ...)
+ __attribute__((format(printf, 2, 3)));
+
+#endif
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c
new file mode 100644
index 00000000000..2daa4866c92
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c
@@ -0,0 +1,924 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include "exception.h"
+#include "org/apache/hadoop/io/nativeio/file_descriptor.h"
+#include "org_apache_hadoop.h"
+#include "org_apache_hadoop_net_unix_DomainSocket.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include /* for FIONREAD */
+#include
+#include
+#include
+#include
+#include
+
+#define SND_BUF_SIZE org_apache_hadoop_net_unix_DomainSocket_SND_BUF_SIZE
+#define RCV_BUF_SIZE org_apache_hadoop_net_unix_DomainSocket_RCV_BUF_SIZE
+#define SND_TIMEO org_apache_hadoop_net_unix_DomainSocket_SND_TIMEO
+#define RCV_TIMEO org_apache_hadoop_net_unix_DomainSocket_RCV_TIMEO
+
+#define DEFAULT_RCV_TIMEO 120000
+#define DEFAULT_SND_TIMEO 120000
+#define LISTEN_BACKLOG 128
+
+/**
+ * Can't pass more than this number of file descriptors in a single message.
+ */
+#define MAX_PASSED_FDS 16
+
+static jthrowable setAttribute0(JNIEnv *env, jint fd, jint type, jint val);
+
+/**
+ * Convert an errno to a socket exception name.
+ *
+ * Note: we assume that all of these exceptions have a one-argument constructor
+ * that takes a string.
+ *
+ * @return The exception class name
+ */
+static const char *errnoToSocketExceptionName(int errnum)
+{
+ switch (errnum) {
+ case EAGAIN:
+ /* accept(2) returns EAGAIN when a socket timeout has been set, and that
+ * timeout elapses without an incoming connection. This error code is also
+ * used in non-blocking I/O, but we don't support that. */
+ case ETIMEDOUT:
+ return "java/net/SocketTimeoutException";
+ case EHOSTDOWN:
+ case EHOSTUNREACH:
+ case ECONNREFUSED:
+ return "java/net/NoRouteToHostException";
+ case ENOTSUP:
+ return "java/lang/UnsupportedOperationException";
+ default:
+ return "java/net/SocketException";
+ }
+}
+
+ /* The prototype is repeated so the printf format attribute can be attached;
+  * the compiler then checks 'fmt' against the variadic arguments. */
+ static jthrowable newSocketException(JNIEnv *env, int errnum,
+                                      const char *fmt, ...)
+     __attribute__((format(printf, 3, 4)));
+
+ /**
+  * Build (but do not throw) the Java exception that corresponds to 'errnum',
+  * with a printf-style message.
+  *
+  * @param env      The JNI environment
+  * @param errnum   errno value; selects the exception class via
+  *                 errnoToSocketExceptionName
+  * @param fmt      printf-style format string
+  * @param ...      printf-style arguments
+  *
+  * @return         The exception returned by newExceptionV
+  */
+ static jthrowable newSocketException(JNIEnv *env, int errnum,
+                                      const char *fmt, ...)
+ {
+   jthrowable created;
+   va_list args;
+
+   va_start(args, fmt);
+   created = newExceptionV(env, errnoToSocketExceptionName(errnum), fmt, args);
+   va_end(args);
+
+   return created;
+ }
+
+ /* Look up the message for an errno value in sys_errlist; values outside
+  * [0, sys_nerr) yield a fixed placeholder string. */
+ static const char* terror(int errnum)
+ {
+   const int inTable = (errnum >= 0) && (errnum < sys_nerr);
+
+   return inTable ? sys_errlist[errnum] : "unknown error.";
+ }
+
+/**
+ * Flexible buffer that will try to fit data on the stack, and fall back
+ * to the heap if necessary.
+ */
+struct flexibleBuffer {
+ int8_t *curBuf;
+ int8_t *allocBuf;
+ int8_t stackBuf[8196];
+};
+
+ /**
+  * Prepare a flexibleBuffer that can hold 'length' bytes.
+  *
+  * Lengths smaller than the inline array use that array; anything else is
+  * allocated with malloc.
+  *
+  * @param env       The JNI environment (used only to build the error)
+  * @param flexBuf   The buffer to initialize
+  * @param length    Number of bytes the caller needs
+  *
+  * @return          NULL on success; an OutOfMemoryError (not thrown) if the
+  *                  heap allocation failed
+  */
+ static jthrowable flexBufInit(JNIEnv *env, struct flexibleBuffer *flexBuf, jint length)
+ {
+   flexBuf->allocBuf = NULL;
+   flexBuf->curBuf = NULL;
+
+   if (length >= sizeof(flexBuf->stackBuf)) {
+     /* Too big for the inline array: take it from the heap. */
+     int8_t *heap = malloc(length);
+     if (!heap) {
+       return newException(env, "java/lang/OutOfMemoryError",
+           "OOM allocating space for %d bytes of data.", length);
+     }
+     flexBuf->allocBuf = heap;
+     flexBuf->curBuf = heap;
+     return NULL;
+   }
+
+   flexBuf->curBuf = flexBuf->stackBuf;
+   return NULL;
+ }
+
+ /* Release the heap allocation made by flexBufInit, if any. allocBuf is NULL
+ * when the inline stack array was used, and free(NULL) does nothing. */
+ static void flexBufFree(struct flexibleBuffer *flexBuf)
+ {
+ free(flexBuf->allocBuf);
+ }
+
+ /**
+  * Create a UNIX domain stream socket and either connect it to 'jpath' or
+  * bind it to 'jpath' and start listening.
+  *
+  * When binding, any existing file at the path is unlinked first.
+  *
+  * @param env         The JNI environment
+  * @param ofd         (out) The new socket's file descriptor on success, or
+  *                    -1 on failure
+  * @param jpath       Java String holding the socket path
+  * @param doConnect   Nonzero to connect to the path; zero to bind and listen
+  *
+  * @return            NULL on success; otherwise the exception describing
+  *                    the failure (created but not thrown)
+  */
+ static jthrowable setup(JNIEnv *env, int *ofd, jobject jpath, int doConnect)
+ {
+   const char *cpath = NULL;
+   struct sockaddr_un addr;
+   jthrowable jthr = NULL;
+   int fd = -1, ret;
+
+   fd = socket(PF_UNIX, SOCK_STREAM, 0);
+   if (fd < 0) {
+     ret = errno;
+     jthr = newSocketException(env, ret,
+         "error creating UNIX domain socket with SOCK_STREAM: %s",
+         terror(ret));
+     goto done;
+   }
+   /* Zero the whole structure.  sizeof(&addr) would be the size of a pointer
+    * and leave most of the address uninitialized. */
+   memset(&addr, 0, sizeof(addr));
+   addr.sun_family = AF_UNIX;
+   cpath = (*env)->GetStringUTFChars(env, jpath, NULL);
+   if (!cpath) {
+     jthr = (*env)->ExceptionOccurred(env);
+     (*env)->ExceptionClear(env);
+     goto done;
+   }
+   ret = snprintf(addr.sun_path, sizeof(addr.sun_path),
+                  "%s", cpath);
+   if (ret < 0) {
+     ret = errno;
+     jthr = newSocketException(env, EIO,
+         "error computing UNIX domain socket path: error %d (%s)",
+         ret, terror(ret));
+     goto done;
+   }
+   /* snprintf returns the length it wanted to write; reaching the buffer
+    * size means the path was truncated. */
+   if ((size_t)ret >= sizeof(addr.sun_path)) {
+     jthr = newSocketException(env, ENAMETOOLONG,
+         "error computing UNIX domain socket path: path too long. "
+         "The longest UNIX domain socket path possible on this host "
+         "is %zu bytes.", sizeof(addr.sun_path) - 1);
+     goto done;
+   }
+   if (doConnect) {
+     RETRY_ON_EINTR(ret, connect(fd,
+         (struct sockaddr*)&addr, sizeof(addr)));
+     if (ret < 0) {
+       ret = errno;
+       jthr = newException(env, "java/net/ConnectException",
+           "connect(2) error: %s when trying to connect to '%s'",
+           terror(ret), addr.sun_path);
+       goto done;
+     }
+   } else {
+     /* Remove a stale socket file left by a previous run.  The result is
+      * deliberately ignored: the file usually does not exist, and a real
+      * problem will show up as a bind(2) failure below. */
+     RETRY_ON_EINTR(ret, unlink(addr.sun_path));
+     RETRY_ON_EINTR(ret, bind(fd, (struct sockaddr*)&addr, sizeof(addr)));
+     if (ret < 0) {
+       ret = errno;
+       jthr = newException(env, "java/net/BindException",
+           "bind(2) error: %s when trying to bind to '%s'",
+           terror(ret), addr.sun_path);
+       goto done;
+     }
+     if (listen(fd, LISTEN_BACKLOG) < 0) {
+       ret = errno;
+       jthr = newException(env, "java/net/BindException",
+           "listen(2) error: %s when trying to listen to '%s'",
+           terror(ret), addr.sun_path);
+       goto done;
+     }
+   }
+
+ done:
+   if (cpath) {
+     (*env)->ReleaseStringUTFChars(env, jpath, cpath);
+   }
+   if (jthr) {
+     /* 0 is a valid descriptor, so test with >= to avoid leaking it. */
+     if (fd >= 0) {
+       RETRY_ON_EINTR(ret, close(fd));
+       fd = -1;
+     }
+   }
+   /* Always store a defined value, so callers never read an uninitialized
+    * descriptor after a failure. */
+   *ofd = fd;
+   return jthr;
+ }
+
+ /* JNI entry point for DomainSocket#anchorNative, which the Java class calls
+ * from its static initializer. It only runs fd_init, which (per the inline
+ * note) is a prerequisite for fd_get, fd_create, etc. */
+ JNIEXPORT void JNICALL
+ Java_org_apache_hadoop_net_unix_DomainSocket_anchorNative(
+ JNIEnv *env, jclass clazz)
+ {
+ fd_init(env); // for fd_get, fd_create, etc.
+ }
+
+ /**
+  * JNI entry point for DomainSocket#validateSocketPathSecurity0.
+  *
+  * Walks the directories leading to the socket path and throws an
+  * IOException if any of them is world-writable, group-writable by a
+  * non-root group, or writable by an owner who is neither root nor the
+  * effective user.  The final component (the socket itself) is not checked.
+  *
+  * @param env              The JNI environment
+  * @param clazz            The DomainSocket class (unused)
+  * @param jstr             Java String holding the socket path
+  * @param skipComponents   Number of leading path components to leave
+  *                         unchecked
+  */
+ JNIEXPORT void JNICALL
+ Java_org_apache_hadoop_net_unix_DomainSocket_validateSocketPathSecurity0(
+ JNIEnv *env, jclass clazz, jobject jstr, jint skipComponents)
+ {
+   jint utfLength, strLength;
+   /* 'check' is rebuilt as "/" + component for every component, so for a
+    * path without a leading slash it can be one byte longer than 'path'. */
+   char path[PATH_MAX], check[PATH_MAX + 1] = { 0 }, *token, *rest;
+   struct stat st;
+   int ret, mode, strlenPath;
+   uid_t uid;
+   jthrowable jthr = NULL;
+
+   /* Leave room for the terminating NUL that we add ourselves below. */
+   utfLength = (*env)->GetStringUTFLength(env, jstr);
+   if ((utfLength < 0) || ((size_t)utfLength > sizeof(path) - 1)) {
+     jthr = newIOException(env, "path is too long! We expected a path "
+         "no longer than %zu UTF-8 bytes.", sizeof(path) - 1);
+     goto done;
+   }
+   /* GetStringUTFRegion takes a length in UTF-16 chars, not in UTF-8 bytes;
+    * the two differ as soon as the path contains a non-ASCII character. */
+   strLength = (*env)->GetStringLength(env, jstr);
+   (*env)->GetStringUTFRegion(env, jstr, 0, strLength, path);
+   jthr = (*env)->ExceptionOccurred(env);
+   if (jthr) {
+     (*env)->ExceptionClear(env);
+     goto done;
+   }
+   /* The JNI specification does not promise NUL termination. */
+   path[utfLength] = '\0';
+   uid = geteuid();
+   strlenPath = strlen(path);
+   if (strlenPath == 0) {
+     jthr = newIOException(env, "socket path is empty.");
+     goto done;
+   }
+   if (path[strlenPath - 1] == '/') {
+     /* It makes no sense to have a socket path that ends in a slash, since
+      * sockets are not directories. */
+     jthr = newIOException(env, "bad socket path '%s'. The socket path "
+         "must not end in a slash.", path);
+     goto done;
+   }
+   rest = path;
+   while ((token = strtok_r(rest, "/", &rest))) {
+     strcat(check, "/");
+     strcat(check, token);
+     if (skipComponents > 0) {
+       skipComponents--;
+       continue;
+     }
+     if (!rest || !rest[0]) {
+       /* Nothing follows this token, so it is the last component.  Don't
+        * validate it, since it's not supposed to be a directory.  (If it is
+        * a directory, we will fail to create the socket later with EISDIR or
+        * similar.)
+        *
+        * Testing for a remaining '/' instead would also skip the directory
+        * that contains the socket, which is the one that matters most.
+        * Some C libraries set the save pointer to NULL after the last
+        * token, hence the NULL check.
+        */
+       break;
+     }
+     if (stat(check, &st) < 0) {
+       ret = errno;
+       jthr = newIOException(env, "failed to stat a path component: '%s'. "
+           "error code %d (%s)", check, ret, terror(ret));
+       goto done;
+     }
+     mode = st.st_mode & 0777;
+     if (mode & 0002) {
+       jthr = newIOException(env, "the path component: '%s' is "
+         "world-writable. Its permissions are 0%03o. Please fix "
+         "this or select a different socket path.", check, mode);
+       goto done;
+     }
+     if ((mode & 0020) && (st.st_gid != 0)) {
+       jthr = newIOException(env, "the path component: '%s' is "
+         "group-writable, and the group is not root. Its permissions are "
+         "0%03o, and it is owned by gid %d. Please fix this or "
+         "select a different socket path.", check, mode, st.st_gid);
+       goto done;
+     }
+     if ((mode & 0200) && (st.st_uid != 0) &&
+         (st.st_uid != uid)) {
+       jthr = newIOException(env, "the path component: '%s' is "
+         "owned by a user who is not root and not you. Your effective user "
+         "id is %d; the path is owned by user id %d, and its permissions are "
+         "0%03o. Please fix this or select a different socket path.",
+         check, uid, st.st_uid, mode);
+       goto done;
+     }
+   }
+ done:
+   if (jthr) {
+     (*env)->Throw(env, jthr);
+   }
+ }
+
+/**
+ * JNI entry point: create a UNIX domain socket bound to 'path' and listening.
+ *
+ * The work is delegated to setup() with its last argument set to 0.
+ *
+ * @param env    The JNI environment.
+ * @param clazz  The calling Java class (unused).
+ * @param path   The socket path, as a Java string.
+ * @return       The new file descriptor, or -1 with an exception pending.
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_bind0(
+JNIEnv *env, jclass clazz, jstring path)
+{
+  /* setup() only stores into fd on success, so give it a defined value for
+   * the failure path instead of returning an indeterminate one. */
+  int fd = -1;
+  jthrowable jthr = NULL;
+
+  jthr = setup(env, &fd, path, 0);
+  if (jthr) {
+    (*env)->Throw(env, jthr);
+  }
+  return fd;
+}
+
+/**
+ * JNI entry point: accept one connection on a listening socket.
+ *
+ * accept(2) is retried for as long as it fails with EINTR.  Any other failure
+ * is raised as a socket exception.
+ *
+ * @param env    The JNI environment.
+ * @param clazz  The calling Java class (unused).
+ * @param fd     The listening file descriptor.
+ * @return       The accepted file descriptor, or a negative value with an
+ *               exception pending.
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_accept0(
+JNIEnv *env, jclass clazz, jint fd)
+{
+  struct sockaddr_un peer;
+  socklen_t peerLen = sizeof(peer);
+  int acceptedFd = -1, err;
+  jthrowable jthr;
+
+  RETRY_ON_EINTR(acceptedFd, accept(fd, (struct sockaddr*)&peer, &peerLen));
+  if (acceptedFd >= 0) {
+    return acceptedFd;
+  }
+  /* accept(2) failed, so there is no descriptor to clean up. */
+  err = errno;
+  jthr = newSocketException(env, err, "accept(2) error: %s", terror(err));
+  if (jthr) {
+    (*env)->Throw(env, jthr);
+  }
+  return acceptedFd;
+}
+
+/**
+ * JNI entry point: connect to the UNIX domain socket at 'path'.
+ *
+ * After setup() succeeds, the default send and receive timeouts are applied.
+ * If either cannot be set, the new descriptor is closed before the exception
+ * is raised.
+ *
+ * @param env    The JNI environment.
+ * @param clazz  The calling Java class (unused).
+ * @param path   The socket path, as a Java string.
+ * @return       The connected file descriptor, or -1 with an exception
+ *               pending.
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_connect0(
+JNIEnv *env, jclass clazz, jstring path)
+{
+  int closeRet, fd;
+  jthrowable jthr;
+
+  jthr = setup(env, &fd, path, 1);
+  if (!jthr) {
+    jthr = setAttribute0(env, fd, SND_TIMEO, DEFAULT_SND_TIMEO);
+    if (!jthr) {
+      jthr = setAttribute0(env, fd, RCV_TIMEO, DEFAULT_RCV_TIMEO);
+    }
+    if (!jthr) {
+      return fd;
+    }
+    /* The socket exists but could not be configured; don't leak it. */
+    RETRY_ON_EINTR(closeRet, close(fd));
+  }
+  (*env)->Throw(env, jthr);
+  return -1;
+}
+
+/**
+ * Convert a millisecond count into a struct timeval.
+ *
+ * @param javaMillis  The number of milliseconds.
+ * @param tv          (out) Receives whole seconds and leftover microseconds.
+ */
+static void javaMillisToTimeVal(int javaMillis, struct timeval *tv)
+{
+  tv->tv_sec = javaMillis / 1000;
+  /* Whatever is left after removing whole seconds, scaled to microseconds. */
+  tv->tv_usec = (javaMillis % 1000) * 1000;
+}
+
+/**
+ * Apply one socket attribute to a file descriptor with setsockopt(2).
+ *
+ * @param env   The JNI environment.
+ * @param fd    The socket file descriptor.
+ * @param type  One of SND_BUF_SIZE, RCV_BUF_SIZE, SND_TIMEO, RCV_TIMEO.
+ * @param val   The value to set.  For the *_TIMEO types it is converted with
+ *              javaMillisToTimeVal.
+ * @return      NULL on success; otherwise the unraised exception describing
+ *              the problem (a runtime exception for an unknown type).
+ */
+static jthrowable setAttribute0(JNIEnv *env, jint fd, jint type, jint val)
+{
+  struct timeval timeout;
+  int err, size;
+
+  switch (type) {
+  case SND_BUF_SIZE:
+    size = val;
+    if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0) {
+      err = errno;
+      return newSocketException(env, err,
+          "setsockopt(SO_SNDBUF) error: %s", terror(err));
+    }
+    break;
+  case RCV_BUF_SIZE:
+    size = val;
+    if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0) {
+      err = errno;
+      return newSocketException(env, err,
+          "setsockopt(SO_RCVBUF) error: %s", terror(err));
+    }
+    break;
+  case SND_TIMEO:
+    javaMillisToTimeVal(val, &timeout);
+    if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &timeout,
+                   sizeof(timeout)) != 0) {
+      err = errno;
+      return newSocketException(env, err,
+          "setsockopt(SO_SNDTIMEO) error: %s", terror(err));
+    }
+    break;
+  case RCV_TIMEO:
+    javaMillisToTimeVal(val, &timeout);
+    if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout,
+                   sizeof(timeout)) != 0) {
+      err = errno;
+      return newSocketException(env, err,
+          "setsockopt(SO_RCVTIMEO) error: %s", terror(err));
+    }
+    break;
+  default:
+    return newRuntimeException(env, "Invalid attribute type %d.", type);
+  }
+  return NULL;
+}
+
+/**
+ * JNI entry point: set a socket attribute, raising any resulting exception.
+ *
+ * @param env    The JNI environment.
+ * @param clazz  The calling Java class (unused).
+ * @param fd     The socket file descriptor.
+ * @param type   The attribute type, as understood by the static setAttribute0.
+ * @param val    The value to set.
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_setAttribute0(
+JNIEnv *env, jclass clazz, jint fd, jint type, jint val)
+{
+  jthrowable failure = setAttribute0(env, fd, type, val);
+
+  if (!failure) {
+    return;
+  }
+  (*env)->Throw(env, failure);
+}
+
+/**
+ * Translate a buffer size reported by getsockopt(2) into the value handed
+ * back to Java.
+ *
+ * @param size  The size as reported by getsockopt.
+ * @return      Half of size on Linux; size unchanged elsewhere.
+ */
+static jint getSockOptBufSizeToJavaBufSize(int size)
+{
+#ifdef __linux__
+  // Linux always doubles the value that you set with setsockopt.
+  // Halving it here lets programs read back the same value they set.
+  return size / 2;
+#else
+  return size;
+#endif
+}
+
+/**
+ * Convert a struct timeval into milliseconds.
+ *
+ * @param tv  The time value to convert.
+ * @return    Seconds and microseconds combined, in milliseconds.  Any
+ *            sub-millisecond part of tv_usec is dropped.
+ */
+static int timeValToJavaMillis(const struct timeval *tv)
+{
+  return (tv->tv_usec / 1000) + (tv->tv_sec * 1000);
+}
+
+/**
+ * JNI entry point: read a socket attribute with getsockopt(2).
+ *
+ * @param env    The JNI environment.
+ * @param clazz  The calling Java class (unused).
+ * @param fd     The socket file descriptor.
+ * @param type   One of SND_BUF_SIZE, RCV_BUF_SIZE, SND_TIMEO, RCV_TIMEO.
+ * @return       The attribute value (buffer sizes pass through
+ *               getSockOptBufSizeToJavaBufSize, timeouts are in
+ *               milliseconds), or -1 with an exception pending.
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_getAttribute0(
+JNIEnv *env, jclass clazz, jint fd, jint type)
+{
+  struct timeval timeout;
+  socklen_t optLen;
+  int err, bufSize = 0;
+  jint result = -1;
+
+  switch (type) {
+  case SND_BUF_SIZE:
+    optLen = sizeof(bufSize);
+    if (getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &bufSize, &optLen) == 0) {
+      result = getSockOptBufSizeToJavaBufSize(bufSize);
+    } else {
+      err = errno;
+      (*env)->Throw(env, newSocketException(env, err,
+          "getsockopt(SO_SNDBUF) error: %s", terror(err)));
+    }
+    break;
+  case RCV_BUF_SIZE:
+    optLen = sizeof(bufSize);
+    if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &bufSize, &optLen) == 0) {
+      result = getSockOptBufSizeToJavaBufSize(bufSize);
+    } else {
+      err = errno;
+      (*env)->Throw(env, newSocketException(env, err,
+          "getsockopt(SO_RCVBUF) error: %s", terror(err)));
+    }
+    break;
+  case SND_TIMEO:
+    memset(&timeout, 0, sizeof(timeout));
+    optLen = sizeof(struct timeval);
+    if (getsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &timeout, &optLen) == 0) {
+      result = timeValToJavaMillis(&timeout);
+    } else {
+      err = errno;
+      (*env)->Throw(env, newSocketException(env, err,
+          "getsockopt(SO_SNDTIMEO) error: %s", terror(err)));
+    }
+    break;
+  case RCV_TIMEO:
+    memset(&timeout, 0, sizeof(timeout));
+    optLen = sizeof(struct timeval);
+    if (getsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, &optLen) == 0) {
+      result = timeValToJavaMillis(&timeout);
+    } else {
+      err = errno;
+      (*env)->Throw(env, newSocketException(env, err,
+          "getsockopt(SO_RCVTIMEO) error: %s", terror(err)));
+    }
+    break;
+  default:
+    (*env)->Throw(env, newRuntimeException(env,
+        "Invalid attribute type %d.", type));
+    break;
+  }
+  return result;
+}
+
+/**
+ * JNI entry point: close a file descriptor.
+ *
+ * close(2) is retried while it fails with EINTR.  Any other failure is raised
+ * as a socket exception.
+ * NOTE(review): on Linux the descriptor is released even when close(2)
+ * reports EINTR, so retrying may touch a reused descriptor -- confirm
+ * whether the retry is wanted here.
+ *
+ * @param env    The JNI environment.
+ * @param clazz  The calling Java class (unused).
+ * @param fd     The file descriptor to close.
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_close0(
+JNIEnv *env, jclass clazz, jint fd)
+{
+  int rc;
+
+  RETRY_ON_EINTR(rc, close(fd));
+  if (rc == 0) {
+    return;
+  }
+  rc = errno;
+  (*env)->Throw(env, newSocketException(env, rc,
+      "close(2) error: %s", terror(rc)));
+}
+
+/**
+ * JNI entry point: close the descriptor held by a Java FileDescriptor object.
+ *
+ * The raw descriptor is obtained with fd_get() and handed to close0.
+ *
+ * @param env    The JNI environment.
+ * @param clazz  The calling Java class.
+ * @param jfd    The Java object wrapping the descriptor.
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_closeFileDescriptor0(
+JNIEnv *env, jclass clazz, jobject jfd)
+{
+  const jint rawFd = fd_get(env, jfd);
+
+  Java_org_apache_hadoop_net_unix_DomainSocket_close0(env, clazz, rawFd);
+}
+
+/**
+ * JNI entry point: shut down both directions of a socket.
+ *
+ * shutdown(2) with SHUT_RDWR is retried while it fails with EINTR.  Any other
+ * failure is raised as a socket exception.
+ *
+ * @param env    The JNI environment.
+ * @param clazz  The calling Java class (unused).
+ * @param fd     The socket file descriptor.
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_shutdown0(
+JNIEnv *env, jclass clazz, jint fd)
+{
+  int rc;
+
+  RETRY_ON_EINTR(rc, shutdown(fd, SHUT_RDWR));
+  if (rc == 0) {
+    return;
+  }
+  rc = errno;
+  (*env)->Throw(env, newSocketException(env, rc,
+      "shutdown(2) error: %s", terror(rc)));
+}
+
+/**
+ * Keep calling write(2) until a whole buffer has been written.
+ *
+ * A write interrupted by a signal (EINTR) is simply attempted again.
+ *
+ * @param env   The JNI environment.
+ * @param fd    The file descriptor to write to.
+ * @param buf   The start of the data.
+ * @param amt   How many bytes of buf to write.
+ * @return      NULL once everything is written; otherwise the unraised
+ *              exception describing the failed write.
+ */
+static jthrowable write_fully(JNIEnv *env, int fd, int8_t *buf, int amt)
+{
+  int8_t *cursor = buf;
+  int remaining = amt;
+
+  while (remaining > 0) {
+    int written = write(fd, cursor, remaining);
+    if (written >= 0) {
+      cursor += written;
+      remaining -= written;
+    } else {
+      int err = errno;
+      if (err != EINTR) {
+        return newSocketException(env, err, "write(2) error: %s",
+            terror(err));
+      }
+    }
+  }
+  return NULL;
+}
+
+/**
+ * Our ancillary data setup.
+ *
+ * See man 3 cmsg for more information about ancillary socket data on UNIX.
+ *
+ * The structure is a control-message header immediately followed by room for
+ * up to MAX_PASSED_FDS file descriptors.
+ *
+ * We use __attribute__((packed)) to ensure that the compiler doesn't insert any
+ * padding between 'hdr' and 'fds'.
+ * We use __attribute__((aligned(8))) to ensure that the compiler puts the start
+ * of the structure at an address which is a multiple of 8. If we did not do
+ * this, the __attribute__((packed)) would cause the compiler to generate a lot
+ * of slow code for accessing unaligned memory.
+ */
+struct cmsghdr_with_fds {
+ struct cmsghdr hdr;
+ int fds[MAX_PASSED_FDS];
+} __attribute__((packed,aligned(8)));
+
+/**
+ * JNI entry point: send a byte range together with a set of file descriptors.
+ *
+ * The descriptors travel as SCM_RIGHTS ancillary data on a single sendmsg(2)
+ * call.  If that call transmits fewer than 'length' bytes, the remainder is
+ * written with write_fully (without any descriptors attached).
+ *
+ * Raises IllegalArgumentException when length <= 0 or when the descriptor
+ * array is empty or longer than MAX_PASSED_FDS, NullPointerException when an
+ * array element is null, and a socket exception when sendmsg/write fails.
+ *
+ * @param env     The JNI environment.
+ * @param clazz   The calling Java class (unused).
+ * @param fd      The socket to send on.
+ * @param jfds    Java array of FileDescriptor objects to pass.
+ * @param jbuf    Java byte array holding the data to send.
+ * @param offset  Offset of the first byte of jbuf to send.
+ * @param length  Number of bytes to send.
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_sendFileDescriptors0(
+JNIEnv *env, jclass clazz, jint fd, jobject jfds, jobject jbuf,
+jint offset, jint length)
+{
+  struct iovec vec[1];
+  struct flexibleBuffer flexBuf;
+  struct cmsghdr_with_fds aux;
+  jint jfdsLen;
+  int i, ret = -1, auxLen;
+  struct msghdr socketMsg;
+  jthrowable jthr = NULL;
+
+  jthr = flexBufInit(env, &flexBuf, length);
+  if (jthr) {
+    goto done;
+  }
+  if (length <= 0) {
+    jthr = newException(env, "java/lang/IllegalArgumentException",
+        "You must write at least one byte.");
+    goto done;
+  }
+  jfdsLen = (*env)->GetArrayLength(env, jfds);
+  if (jfdsLen <= 0) {
+    jthr = newException(env, "java/lang/IllegalArgumentException",
+        "Called sendFileDescriptors with no file descriptors.");
+    goto done;
+  } else if (jfdsLen > MAX_PASSED_FDS) {
+    /* Report the length the caller actually passed. */
+    jthr = newException(env, "java/lang/IllegalArgumentException",
+        "Called sendFileDescriptors with an array of %d length. "
+        "The maximum is %d.", jfdsLen, MAX_PASSED_FDS);
+    goto done;
+  }
+  (*env)->GetByteArrayRegion(env, jbuf, offset, length, flexBuf.curBuf);
+  jthr = (*env)->ExceptionOccurred(env);
+  if (jthr) {
+    (*env)->ExceptionClear(env);
+    goto done;
+  }
+  memset(&vec, 0, sizeof(vec));
+  vec[0].iov_base = flexBuf.curBuf;
+  vec[0].iov_len = length;
+  auxLen = CMSG_LEN(jfdsLen * sizeof(int));
+  /* Clear the whole structure, not just the part that will be sent. */
+  memset(&aux, 0, sizeof(aux));
+  memset(&socketMsg, 0, sizeof(socketMsg));
+  socketMsg.msg_iov = vec;
+  socketMsg.msg_iovlen = 1;
+  socketMsg.msg_control = &aux;
+  socketMsg.msg_controllen = auxLen;
+  aux.hdr.cmsg_len = auxLen;
+  aux.hdr.cmsg_level = SOL_SOCKET;
+  aux.hdr.cmsg_type = SCM_RIGHTS;
+  for (i = 0; i < jfdsLen; i++) {
+    jobject jfd = (*env)->GetObjectArrayElement(env, jfds, i);
+    if (!jfd) {
+      jthr = (*env)->ExceptionOccurred(env);
+      if (jthr) {
+        (*env)->ExceptionClear(env);
+        goto done;
+      }
+      jthr = newException(env, "java/lang/NullPointerException",
+          "element %d of jfds was NULL.", i);
+      goto done;
+    }
+    aux.fds[i] = fd_get(env, jfd);
+    (*env)->DeleteLocalRef(env, jfd);
+  }
+  RETRY_ON_EINTR(ret, sendmsg(fd, &socketMsg, 0));
+  if (ret < 0) {
+    ret = errno;
+    jthr = newSocketException(env, ret, "sendmsg(2) error: %s", terror(ret));
+    goto done;
+  }
+  length -= ret;
+  if (length > 0) {
+    // Write the rest of the bytes we were asked to send.
+    // This time, no fds will be attached.
+    jthr = write_fully(env, fd, flexBuf.curBuf + ret, length);
+    if (jthr) {
+      goto done;
+    }
+  }
+
+done:
+  flexBufFree(&flexBuf);
+  if (jthr) {
+    (*env)->Throw(env, jthr);
+  }
+}
+
+/**
+ * JNI entry point: receive bytes and any file descriptors sent with them.
+ *
+ * One recvmsg(2) call is made.  Descriptors that arrive as SCM_RIGHTS
+ * ancillary data are wrapped with fd_create() and stored into jfds; slots for
+ * which nothing arrived are left null.  The bytes that were read are copied
+ * into jbuf starting at offset.
+ *
+ * On failure, every descriptor received so far is closed and its array slot
+ * cleared before the exception is raised.
+ *
+ * @param env     The JNI environment.
+ * @param clazz   The calling Java class (unused).
+ * @param fd      The socket to receive from.
+ * @param jfds    Java array that receives the FileDescriptor objects.
+ * @param jbuf    Java byte array that receives the data.
+ * @param offset  Offset in jbuf at which to store the data.
+ * @param length  Maximum number of bytes to read.
+ * @return        The number of bytes read, or -1 on EOF (including
+ *                ECONNABORTED) or when an exception is pending.
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_receiveFileDescriptors0(
+JNIEnv *env, jclass clazz, jint fd, jarray jfds, jarray jbuf,
+jint offset, jint length)
+{
+  struct iovec vec[1];
+  struct flexibleBuffer flexBuf;
+  struct cmsghdr_with_fds aux;
+  struct cmsghdr *cmsg;
+  int i, closeRet, jRecvFdsLen = 0, auxLen;
+  jint jfdsLen = 0;
+  struct msghdr socketMsg;
+  ssize_t bytesRead = -1;
+  jobject fdObj;
+  jthrowable jthr = NULL;
+
+  jthr = flexBufInit(env, &flexBuf, length);
+  if (jthr) {
+    goto done;
+  }
+  if (length <= 0) {
+    jthr = newRuntimeException(env, "You must read at least one byte.");
+    goto done;
+  }
+  jfdsLen = (*env)->GetArrayLength(env, jfds);
+  if (jfdsLen <= 0) {
+    jthr = newException(env, "java/lang/IllegalArgumentException",
+        "Called receiveFileDescriptors with an array of %d length. "
+        "You must pass at least one fd.", jfdsLen);
+    goto done;
+  } else if (jfdsLen > MAX_PASSED_FDS) {
+    /* Report the length the caller actually passed. */
+    jthr = newException(env, "java/lang/IllegalArgumentException",
+        "Called receiveFileDescriptors with an array of %d length. "
+        "The maximum is %d.", jfdsLen, MAX_PASSED_FDS);
+    goto done;
+  }
+  for (i = 0; i < jfdsLen; i++) {
+    (*env)->SetObjectArrayElement(env, jfds, i, NULL);
+  }
+  memset(&vec, 0, sizeof(vec));
+  vec[0].iov_base = flexBuf.curBuf;
+  vec[0].iov_len = length;
+  auxLen = CMSG_LEN(jfdsLen * sizeof(int));
+  memset(&aux, 0, sizeof(aux));
+  /* Size this by the structure being cleared; using auxLen here could write
+   * past the end of socketMsg. */
+  memset(&socketMsg, 0, sizeof(socketMsg));
+  socketMsg.msg_iov = vec;
+  socketMsg.msg_iovlen = 1;
+  socketMsg.msg_control = &aux;
+  socketMsg.msg_controllen = auxLen;
+  RETRY_ON_EINTR(bytesRead, recvmsg(fd, &socketMsg, 0));
+  if (bytesRead < 0) {
+    int ret = errno;
+    if (ret == ECONNABORTED) {
+      // The remote peer disconnected on us. Treat this as an EOF.
+      bytesRead = -1;
+      goto done;
+    }
+    jthr = newSocketException(env, ret, "recvmsg(2) failed: %s",
+        terror(ret));
+    goto done;
+  } else if (bytesRead == 0) {
+    bytesRead = -1;
+    goto done;
+  }
+  /* Only trust the header if the kernel really delivered an SCM_RIGHTS
+   * control message; otherwise no descriptors were passed. */
+  cmsg = CMSG_FIRSTHDR(&socketMsg);
+  if (cmsg && (cmsg->cmsg_level == SOL_SOCKET) &&
+      (cmsg->cmsg_type == SCM_RIGHTS) &&
+      (cmsg->cmsg_len >= CMSG_LEN(0))) {
+    jRecvFdsLen = (int)((cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int));
+    if (jRecvFdsLen > jfdsLen) {
+      jRecvFdsLen = jfdsLen;
+    }
+  }
+  for (i = 0; i < jRecvFdsLen; i++) {
+    fdObj = fd_create(env, aux.fds[i]);
+    if (!fdObj) {
+      jthr = (*env)->ExceptionOccurred(env);
+      (*env)->ExceptionClear(env);
+      goto done;
+    }
+    // Make this -1 so we don't attempt to close it twice in an error path.
+    aux.fds[i] = -1;
+    (*env)->SetObjectArrayElement(env, jfds, i, fdObj);
+    // There is no point keeping around a local reference to the fdObj.
+    // The array continues to reference it.
+    (*env)->DeleteLocalRef(env, fdObj);
+  }
+  /* Copy back only the bytes recvmsg actually produced. */
+  (*env)->SetByteArrayRegion(env, jbuf, offset, (jsize)bytesRead,
+      flexBuf.curBuf);
+  jthr = (*env)->ExceptionOccurred(env);
+  if (jthr) {
+    (*env)->ExceptionClear(env);
+    goto done;
+  }
+done:
+  flexBufFree(&flexBuf);
+  if (jthr) {
+    // Free any FileDescriptor references we may have created,
+    // or file descriptors we may have been passed.
+    for (i = 0; i < jRecvFdsLen; i++) {
+      if (aux.fds[i] >= 0) {
+        /* Use a separate result variable so the loop index is not clobbered. */
+        RETRY_ON_EINTR(closeRet, close(aux.fds[i]));
+        aux.fds[i] = -1;
+      }
+      fdObj = (*env)->GetObjectArrayElement(env, jfds, i);
+      if (fdObj) {
+        int afd = fd_get(env, fdObj);
+        if (afd >= 0) {
+          RETRY_ON_EINTR(closeRet, close(afd));
+        }
+        (*env)->SetObjectArrayElement(env, jfds, i, NULL);
+        (*env)->DeleteLocalRef(env, fdObj);
+      }
+    }
+    (*env)->Throw(env, jthr);
+  }
+  return bytesRead;
+}
+
+/**
+ * JNI entry point: read from a descriptor into a Java byte array.
+ *
+ * read(2) is retried on EINTR.  ECONNABORTED and a zero-length read are both
+ * reported to Java as EOF (-1).
+ *
+ * @param env     The JNI environment.
+ * @param clazz   The calling Java class (unused).
+ * @param fd      The descriptor to read from.
+ * @param b       The destination Java byte array.
+ * @param offset  Offset in b at which to store the data.
+ * @param length  Maximum number of bytes to read.
+ * @return        The number of bytes read, or -1 on EOF.  When an exception
+ *                is pending the return value is not meaningful.
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_readArray0(
+JNIEnv *env, jclass clazz, jint fd, jarray b, jint offset, jint length)
+{
+  int nread = -1, err;
+  struct flexibleBuffer flexBuf;
+  jthrowable jthr;
+
+  jthr = flexBufInit(env, &flexBuf, length);
+  if (!jthr) {
+    RETRY_ON_EINTR(nread, read(fd, flexBuf.curBuf, length));
+    if (nread > 0) {
+      (*env)->SetByteArrayRegion(env, b, offset, nread, flexBuf.curBuf);
+      jthr = (*env)->ExceptionOccurred(env);
+      if (jthr) {
+        (*env)->ExceptionClear(env);
+      }
+    } else if (nread < 0) {
+      err = errno;
+      if (err == ECONNABORTED) {
+        // The remote peer disconnected on us. Treat this as an EOF.
+        nread = -1;
+      } else {
+        jthr = newSocketException(env, err, "read(2) error: %s",
+            terror(err));
+        nread = err;
+      }
+    }
+  }
+  flexBufFree(&flexBuf);
+  if (jthr) {
+    (*env)->Throw(env, jthr);
+  }
+  return nread == 0 ? -1 : nread; // Java wants -1 on EOF
+}
+
+/**
+ * JNI entry point: ask how many bytes can be read without blocking, using
+ * ioctl(FIONREAD).
+ *
+ * @param env    The JNI environment.
+ * @param clazz  The calling Java class (unused).
+ * @param fd     The descriptor to query.
+ * @return       The byte count reported by the ioctl.  A socket exception is
+ *               pending if the ioctl failed.
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_available0(
+JNIEnv *env, jclass clazz, jint fd)
+{
+  int rc, nbytes = 0;
+  jthrowable jthr;
+
+  RETRY_ON_EINTR(rc, ioctl(fd, FIONREAD, &nbytes));
+  if (rc < 0) {
+    rc = errno;
+    jthr = newSocketException(env, rc,
+        "ioctl(%d, FIONREAD) error: %s", fd, terror(rc));
+    if (jthr) {
+      (*env)->Throw(env, jthr);
+    }
+  }
+  return nbytes;
+}
+
+/**
+ * JNI entry point: write a range of a Java byte array to a descriptor.
+ *
+ * The bytes are copied out of the Java array and then written in full with
+ * write_fully.
+ *
+ * @param env     The JNI environment.
+ * @param clazz   The calling Java class (unused).
+ * @param fd      The descriptor to write to.
+ * @param b       The source Java byte array.
+ * @param offset  Offset of the first byte of b to write.
+ * @param length  Number of bytes to write.
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_writeArray0(
+JNIEnv *env, jclass clazz, jint fd, jarray b, jint offset, jint length)
+{
+  struct flexibleBuffer flexBuf;
+  jthrowable jthr;
+
+  jthr = flexBufInit(env, &flexBuf, length);
+  if (!jthr) {
+    (*env)->GetByteArrayRegion(env, b, offset, length, flexBuf.curBuf);
+    jthr = (*env)->ExceptionOccurred(env);
+    if (jthr) {
+      (*env)->ExceptionClear(env);
+    } else {
+      jthr = write_fully(env, fd, flexBuf.curBuf, length);
+    }
+  }
+  flexBufFree(&flexBuf);
+  if (jthr) {
+    (*env)->Throw(env, jthr);
+  }
+}
+
+/**
+ * JNI entry point: read from a descriptor straight into a direct ByteBuffer.
+ *
+ * read(2) is retried on EINTR.  ECONNABORTED and a zero-byte read of a
+ * non-empty request are both reported as EOF (-1), matching readArray0.
+ *
+ * @param env        The JNI environment.
+ * @param clazz      The calling Java class (unused).
+ * @param fd         The descriptor to read from.
+ * @param dst        The direct ByteBuffer to fill.
+ * @param position   Offset within the buffer at which to store the data.
+ * @param remaining  Maximum number of bytes to read.
+ * @return           The number of bytes read, or -1 on EOF.  When an
+ *                   exception is pending the return value is not meaningful.
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_readByteBufferDirect0(
+JNIEnv *env, jclass clazz, jint fd, jobject dst, jint position, jint remaining)
+{
+  uint8_t *buf;
+  jthrowable jthr = NULL;
+  int res = -1;
+
+  buf = (*env)->GetDirectBufferAddress(env, dst);
+  if (!buf) {
+    jthr = newRuntimeException(env, "GetDirectBufferAddress failed.");
+    goto done;
+  }
+  RETRY_ON_EINTR(res, read(fd, buf + position, remaining));
+  if (res < 0) {
+    res = errno;
+    if (res != ECONNABORTED) {
+      jthr = newSocketException(env, res, "read(2) error: %s",
+          terror(res));
+      goto done;
+    } else {
+      // The remote peer disconnected on us. Treat this as an EOF.
+      res = -1;
+    }
+  } else if ((res == 0) && (remaining > 0)) {
+    // Nothing was read although space was available: this is EOF, and
+    // Java wants -1 on EOF.
+    res = -1;
+  }
+done:
+  if (jthr) {
+    (*env)->Throw(env, jthr);
+  }
+  return res;
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org_apache_hadoop.h b/hadoop-common-project/hadoop-common/src/main/native/src/org_apache_hadoop.h
index a50c41dbbb4..21f21e1b0c1 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org_apache_hadoop.h
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org_apache_hadoop.h
@@ -99,6 +99,10 @@ void *do_dlsym(JNIEnv *env, void *handle, const char *symbol) {
THROW(env, "java/lang/InternalError", exception_msg); \
}
+/*
+ * Evaluate 'expr', store its value in 'ret', and repeat for as long as the
+ * value is -1 and errno is EINTR.
+ *
+ * NOTE(review): the expansion already ends in a semicolon, so a use such as
+ * "RETRY_ON_EINTR(r, f());" produces an extra empty statement, and the macro
+ * cannot be the unbraced body of an if that has an else -- confirm before
+ * using it in that position.
+ */
+#define RETRY_ON_EINTR(ret, expr) do { \
+ ret = expr; \
+} while ((ret == -1) && (errno == EINTR));
+
#endif
//vim: sw=2: ts=2: et
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TemporarySocketDirectory.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TemporarySocketDirectory.java
new file mode 100644
index 00000000000..1df78d96b7d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TemporarySocketDirectory.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net.unix;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.io.FileUtils;
+
+/**
+ * Create a temporary directory in which sockets can be created.
+ * When creating a UNIX domain socket, the name
+ * must be fairly short (around 110 bytes on most platforms).
+ *
+ * The directory is placed under java.io.tmpdir (default /tmp) and is deleted,
+ * together with its contents, by {@link #close()}.
+ */
+public class TemporarySocketDirectory implements Closeable {
+  private File dir;
+
+  /**
+   * Creates a uniquely named directory, named from the current time and a
+   * random number.
+   *
+   * @throws IllegalStateException if the directory cannot be created
+   */
+  public TemporarySocketDirectory() {
+    String tmp = System.getProperty("java.io.tmpdir", "/tmp");
+    dir = new File(tmp, "socks." + (System.currentTimeMillis() +
+        "." + (new Random().nextInt())));
+    // Fail right away instead of letting every later bind() fail obscurely.
+    if (!dir.mkdirs() && !dir.isDirectory()) {
+      throw new IllegalStateException(
+          "failed to create temporary socket directory " + dir);
+    }
+    dir.setWritable(true, true);
+  }
+
+  /**
+   * @return the temporary directory, or null once {@link #close()} has run
+   */
+  public File getDir() {
+    return dir;
+  }
+
+  /**
+   * Deletes the directory and everything in it.  Calling this again after a
+   * successful close does nothing.
+   *
+   * @throws IOException if the directory cannot be deleted
+   */
+  @Override
+  public void close() throws IOException {
+    if (dir != null) {
+      FileUtils.deleteDirectory(dir);
+      dir = null;
+    }
+  }
+
+  /** Last-resort cleanup for instances that were never closed. */
+  @Override
+  protected void finalize() throws IOException {
+    close();
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java
new file mode 100644
index 00000000000..ab293ff5138
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net.unix;
+
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket.DomainChannel;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+
+import com.google.common.io.Files;
+
+public class TestDomainSocket {
+ private static TemporarySocketDirectory sockDir;
+
+ /**
+ * One-time setup: creates the temporary socket directory shared by all tests
+ * and turns off DomainSocket's bind path validation.
+ */
+ @BeforeClass
+ public static void init() {
+ sockDir = new TemporarySocketDirectory();
+ DomainSocket.disableBindPathValidation();
+ }
+
+ /**
+ * One-time teardown: closes the temporary socket directory.
+ *
+ * @throws IOException if closing the directory fails
+ */
+ @AfterClass
+ public static void shutdown() throws IOException {
+ sockDir.close();
+ }
+
+ /**
+ * Skips each test unless DomainSocket reports no loading failure reason.
+ */
+ @Before
+ public void checkPrecondition() {
+ Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null);
+ }
+
+ /**
+  * Binds a listening socket and closes it again without ever accepting a
+  * connection.
+  *
+  * @throws IOException
+  */
+ @Test(timeout=180000)
+ public void testSocketCreateAndClose() throws IOException {
+   File sockFile = new File(sockDir.getDir(), "test_sock_create_and_close");
+   String sockPath = sockFile.getAbsolutePath();
+   DomainSocket listener = DomainSocket.bindAndListen(sockPath);
+   listener.close();
+ }
+
+ /**
+  * Checks that getEffectivePath substitutes the port number for the
+  * __PORT__ placeholder in a socket path.
+  *
+  * @throws IOException
+  */
+ @Test(timeout=180000)
+ public void testSocketPathSetGet() throws IOException {
+   String effectivePath =
+       DomainSocket.getEffectivePath("/var/run/hdfs/sock.__PORT__", 100);
+   Assert.assertEquals("/var/run/hdfs/sock.100", effectivePath);
+ }
+
+ /**
+  * Test that if one thread is blocking in accept(), another thread
+  * can close the socket and stop the accept.
+  *
+  * The accepting task treats an IOException as the expected outcome and fails
+  * if accept() returns normally.
+  *
+  * @throws Exception
+  */
+ @Test(timeout=180000)
+ public void testSocketAcceptAndClose() throws Exception {
+   final String TEST_PATH =
+       new File(sockDir.getDir(), "test_sock_accept_and_close").getAbsolutePath();
+   final DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH);
+   ExecutorService exeServ = Executors.newSingleThreadExecutor();
+   try {
+     Callable<Void> callable = new Callable<Void>() {
+       @Override
+       public Void call(){
+         try {
+           serv.accept();
+           throw new RuntimeException("expected the accept() to be " +
+               "interrupted and fail");
+         } catch (IOException e) {
+           return null;
+         }
+       }
+     };
+     Future<Void> future = exeServ.submit(callable);
+     // Give the worker time to block in accept() before closing under it.
+     Thread.sleep(500);
+     serv.close();
+     future.get(2, TimeUnit.MINUTES);
+   } finally {
+     // Don't leave the worker thread behind, whatever the outcome.
+     exeServ.shutdownNow();
+   }
+ }
+
+ /**
+  * Test that attempting to connect to an invalid path doesn't work.
+  *
+  * Nothing is listening on the path used here, so connect() must fail with
+  * an IOException that mentions connect(2).
+  *
+  * @throws IOException
+  */
+ @Test(timeout=180000)
+ public void testInvalidOperations() throws IOException {
+   try {
+     DomainSocket.connect(
+         new File(sockDir.getDir(), "test_sock_invalid_operation").
+             getAbsolutePath());
+     // Without this the test would pass even if connect() succeeded.
+     Assert.fail("expected connect() to a nonexistent socket to fail");
+   } catch (IOException e) {
+     GenericTestUtils.assertExceptionContains("connect(2) error: ", e);
+   }
+ }
+
+ /**
+  * Exercises attribute get/set on a listening socket: halves the receive
+  * buffer size and reads it back, then sets a one second receive timeout and
+  * checks that accept() gives up with a SocketTimeoutException.
+  *
+  * @throws IOException
+  */
+ @Test(timeout=180000)
+ public void testServerOptions() throws Exception {
+   final File sockFile = new File(sockDir.getDir(), "test_sock_server_options");
+   final String TEST_PATH = sockFile.getAbsolutePath();
+   DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH);
+   try {
+     // Shrink the receive buffer and make sure the change is visible.
+     int halvedBufSize = serv.getAttribute(DomainSocket.RCV_BUF_SIZE) / 2;
+     serv.setAttribute(DomainSocket.RCV_BUF_SIZE, halvedBufSize);
+     Assert.assertEquals(halvedBufSize,
+         serv.getAttribute(DomainSocket.RCV_BUF_SIZE));
+
+     // Install a server timeout and make sure it is visible too.
+     final int timeoutMs = 1000;
+     serv.setAttribute(DomainSocket.RCV_TIMEO, timeoutMs);
+     Assert.assertEquals(timeoutMs,
+         serv.getAttribute(DomainSocket.RCV_TIMEO));
+
+     // Nobody connects, so accept() has to time out.
+     try {
+       serv.accept();
+       Assert.fail("expected the accept() to time out and fail");
+     } catch (SocketTimeoutException e) {
+       GenericTestUtils.assertExceptionContains("accept(2) error: ", e);
+     }
+   } finally {
+     serv.close();
+     Assert.assertFalse(serv.isOpen());
+   }
+ }
+
+ /**
+ * A Throwable representing success.
+ *
+ * Worker threads put an instance of this class on their result queue when
+ * they finish without error; anything else on the queue is a real failure.
+ * We can't use null to represent this, because you cannot insert null into
+ * ArrayBlockingQueue.
+ */
+ static class Success extends Throwable {
+ private static final long serialVersionUID = 1L;
+ }
+
+ /**
+  * A way of writing bytes to a DomainSocket, so the same client/server test
+  * can be run against different write paths.
+  */
+ interface WriteStrategy {
+   /**
+    * Binds this strategy to the socket it will write to.
+    */
+   void init(DomainSocket s) throws IOException;
+
+   /**
+    * Writes the given bytes.
+    */
+   void write(byte[] b) throws IOException;
+ }
+
+ /**
+  * WriteStrategy that writes through the socket's OutputStream.
+  */
+ static class OutputStreamWriteStrategy implements WriteStrategy {
+   private OutputStream outs;
+
+   @Override
+   public void init(DomainSocket s) throws IOException {
+     this.outs = s.getOutputStream();
+   }
+
+   @Override
+   public void write(byte[] b) throws IOException {
+     this.outs.write(b);
+   }
+ }
+
+ /**
+  * A way of reading bytes from a DomainSocket, so the same client/server test
+  * can be run against different read paths.
+  */
+ abstract static class ReadStrategy {
+   /**
+    * Binds this strategy to the socket it will read from.
+    */
+   public abstract void init(DomainSocket s) throws IOException;
+
+   /**
+    * Reads up to length bytes into b starting at off.
+    *
+    * @return the number of bytes read, or a negative value at end of stream
+    */
+   public abstract int read(byte[] b, int off, int length) throws IOException;
+
+   /**
+    * Keeps calling read until exactly len bytes have been stored in buf.
+    *
+    * @throws IOException if the stream ends before len bytes were read
+    */
+   public void readFully(byte[] buf, int off, int len) throws IOException {
+     int pos = off;
+     for (int remaining = len; remaining > 0; ) {
+       int n = read(buf, pos, remaining);
+       if (n < 0) {
+         throw new IOException( "Premature EOF from inputStream");
+       }
+       remaining -= n;
+       pos += n;
+     }
+   }
+ }
+
+ /**
+  * ReadStrategy that reads through the socket's InputStream.
+  */
+ static class InputStreamReadStrategy extends ReadStrategy {
+   private InputStream ins;
+
+   @Override
+   public void init(DomainSocket s) throws IOException {
+     this.ins = s.getInputStream();
+   }
+
+   @Override
+   public int read(byte[] b, int off, int length) throws IOException {
+     final int nread = this.ins.read(b, off, length);
+     return nread;
+   }
+ }
+
+ /**
+  * ReadStrategy that reads through the socket's channel into a direct
+  * ByteBuffer and then copies the bytes into the caller's array.
+  */
+ static class DirectByteBufferReadStrategy extends ReadStrategy {
+   private DomainChannel ch = null;
+
+   @Override
+   public void init(DomainSocket s) throws IOException {
+     ch = s.getChannel();
+   }
+
+   @Override
+   public int read(byte b[], int off, int length) throws IOException {
+     // Size the buffer by the request, not by the whole array.  Otherwise a
+     // read could return more than 'length' bytes and overrun b at 'off'.
+     ByteBuffer buf = ByteBuffer.allocateDirect(length);
+     int nread = ch.read(buf);
+     if (nread < 0) return nread;
+     buf.flip();
+     buf.get(b, off, nread);
+     return nread;
+   }
+ }
+
+ /**
+  * ReadStrategy that reads through the socket's channel into a ByteBuffer
+  * wrapped around the caller's array.
+  */
+ static class ArrayBackedByteBufferReadStrategy extends ReadStrategy {
+   private DomainChannel ch = null;
+
+   @Override
+   public void init(DomainSocket s) throws IOException {
+     ch = s.getChannel();
+   }
+
+   @Override
+   public int read(byte b[], int off, int length) throws IOException {
+     // Wrap exactly the requested window so the channel stores the bytes at
+     // b[off..off+length) itself.  Wrapping the whole array would read into
+     // b[0..] and need a copy that overwrites earlier data.
+     ByteBuffer buf = ByteBuffer.wrap(b, off, length);
+     return ch.read(buf);
+   }
+ }
+
+ /**
+  * Test a simple client/server interaction.
+  *
+  * A server thread and a client thread exchange three messages over a
+  * domain socket.  Each thread puts exactly one entry on the result queue:
+  * a Success when it finished cleanly, or the Throwable that stopped it.
+  *
+  * @param writeStrategyClass  how both sides write their first message
+  * @param readStrategyClass   how both sides read their first message
+  * @throws Exception
+  */
+ void testClientServer1(final Class<? extends WriteStrategy> writeStrategyClass,
+     final Class<? extends ReadStrategy> readStrategyClass) throws Exception {
+   final String TEST_PATH = new File(sockDir.getDir(),
+       "test_sock_client_server1").getAbsolutePath();
+   final byte clientMsg1[] = new byte[] { 0x1, 0x2, 0x3, 0x4, 0x5, 0x6 };
+   final byte serverMsg1[] = new byte[] { 0x9, 0x8, 0x7, 0x6, 0x5 };
+   final byte clientMsg2 = 0x45;
+   final ArrayBlockingQueue<Throwable> threadResults =
+       new ArrayBlockingQueue<Throwable>(2);
+   final DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH);
+   Thread serverThread = new Thread() {
+     @Override
+     public void run(){
+       // Run server
+       DomainSocket conn = null;
+       try {
+         conn = serv.accept();
+         byte in1[] = new byte[clientMsg1.length];
+         ReadStrategy reader = readStrategyClass.newInstance();
+         reader.init(conn);
+         reader.readFully(in1, 0, in1.length);
+         Assert.assertTrue(Arrays.equals(clientMsg1, in1));
+         WriteStrategy writer = writeStrategyClass.newInstance();
+         writer.init(conn);
+         writer.write(serverMsg1);
+         InputStream connInputStream = conn.getInputStream();
+         int in2 = connInputStream.read();
+         Assert.assertEquals((int)clientMsg2, in2);
+         conn.close();
+         threadResults.add(new Success());
+       } catch (Throwable e) {
+         // The main thread turns this into the test failure.
+         threadResults.add(e);
+       }
+     }
+   };
+   serverThread.start();
+
+   Thread clientThread = new Thread() {
+     @Override
+     public void run(){
+       try {
+         DomainSocket client = DomainSocket.connect(TEST_PATH);
+         WriteStrategy writer = writeStrategyClass.newInstance();
+         writer.init(client);
+         writer.write(clientMsg1);
+         ReadStrategy reader = readStrategyClass.newInstance();
+         reader.init(client);
+         byte in1[] = new byte[serverMsg1.length];
+         reader.readFully(in1, 0, in1.length);
+         Assert.assertTrue(Arrays.equals(serverMsg1, in1));
+         OutputStream clientOutputStream = client.getOutputStream();
+         clientOutputStream.write(clientMsg2);
+         client.close();
+         // Only report success when nothing above failed; the queue has room
+         // for just one entry per thread.
+         threadResults.add(new Success());
+       } catch (Throwable e) {
+         threadResults.add(e);
+       }
+     }
+   };
+   clientThread.start();
+
+   for (int i = 0; i < 2; i++) {
+     Throwable t = threadResults.take();
+     if (!(t instanceof Success)) {
+       Assert.fail(t.getMessage() + ExceptionUtils.getStackTrace(t));
+     }
+   }
+   serverThread.join(120000);
+   clientThread.join(120000);
+   serv.close();
+ }
+
+ /**
+ * Runs the client/server exchange writing through an OutputStream and
+ * reading through an InputStream.
+ */
+ @Test(timeout=180000)
+ public void testClientServerOutStreamInStream() throws Exception {
+ testClientServer1(OutputStreamWriteStrategy.class,
+ InputStreamReadStrategy.class);
+ }
+
+ /**
+ * Runs the client/server exchange writing through an OutputStream and
+ * reading through the channel into a direct ByteBuffer.
+ */
+ @Test(timeout=180000)
+ public void testClientServerOutStreamInDbb() throws Exception {
+ testClientServer1(OutputStreamWriteStrategy.class,
+ DirectByteBufferReadStrategy.class);
+ }
+
+ /**
+ * Runs the client/server exchange writing through an OutputStream and
+ * reading through the channel into an array-backed ByteBuffer.
+ */
+ @Test(timeout=180000)
+ public void testClientServerOutStreamInAbb() throws Exception {
+ testClientServer1(OutputStreamWriteStrategy.class,
+ ArrayBackedByteBufferReadStrategy.class);
+ }
+
+ /**
+  * A small file in the socket directory, together with an open
+  * FileInputStream on it, used as the payload for fd-passing tests.
+  *
+  * The file holds a single byte derived from the index it was created with.
+  */
+ private static class PassedFile {
+   private final int idx;
+   private final byte[] contents;
+   private FileInputStream fis;
+
+   /**
+    * Writes the file for the given index and opens it for reading.
+    */
+   public PassedFile(int idx) throws IOException {
+     this.idx = idx;
+     this.contents = new byte[] { (byte)(idx % 127) };
+     Files.write(contents, new File(getPath()));
+     this.fis = new FileInputStream(getPath());
+   }
+
+   /** @return the absolute path of the backing file */
+   public String getPath() {
+     return new File(sockDir.getDir(), "passed_file" + idx).getAbsolutePath();
+   }
+
+   /** @return the stream opened on the backing file by the constructor */
+   public FileInputStream getInputStream() throws IOException {
+     return fis;
+   }
+
+   /** Deletes the backing file and closes the stream opened on it. */
+   public void cleanup() throws IOException {
+     new File(getPath()).delete();
+     fis.close();
+   }
+
+   /**
+    * Reads from the given stream and verifies that it yields this file's
+    * contents.
+    */
+   public void checkInputStream(FileInputStream fis) throws IOException {
+     byte buf[] = new byte[contents.length];
+     IOUtils.readFully(fis, buf, 0, buf.length);
+     // The comparison result has to be asserted, or the check verifies
+     // nothing.
+     Assert.assertTrue("passed file " + idx + " had unexpected contents",
+         Arrays.equals(contents, buf));
+   }
+
+   @Override
+   protected void finalize() {
+     try {
+       cleanup();
+     } catch(Throwable t) {
+       // ignore
+     }
+   }
+ }
+
+  /**
+   * Test file descriptor passing.
+   *
+   * A server thread accepts one connection, reads clientMsg1 and replies
+   * with serverMsg1 accompanied by the descriptors of two PassedFile
+   * instances.  A client thread sends clientMsg1, receives the reply and
+   * the descriptors, and checks both.  Each thread reports exactly one
+   * outcome (a Success or the Throwable that stopped it) through
+   * threadResults, which the test thread inspects.
+   *
+   * @throws Exception if setup, either worker thread, or cleanup fails
+   */
+  @Test(timeout=180000)
+  public void testFdPassing() throws Exception {
+    final String TEST_PATH =
+        new File(sockDir.getDir(), "test_sock").getAbsolutePath();
+    final byte clientMsg1[] = new byte[] { 0x11, 0x22, 0x33, 0x44, 0x55, 0x66 };
+    final byte serverMsg1[] = new byte[] { 0x31, 0x30, 0x32, 0x34, 0x31, 0x33,
+        0x44, 0x1, 0x1, 0x1, 0x1, 0x1 };
+    // Typed queue: take() below must yield a Throwable.  Capacity 2 matches
+    // the one result each of the two threads posts.
+    final ArrayBlockingQueue<Throwable> threadResults =
+        new ArrayBlockingQueue<Throwable>(2);
+    final DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH);
+    final PassedFile passedFiles[] =
+        new PassedFile[] { new PassedFile(1), new PassedFile(2) };
+    try {
+      final FileDescriptor passedFds[] =
+          new FileDescriptor[passedFiles.length];
+      for (int i = 0; i < passedFiles.length; i++) {
+        passedFds[i] = passedFiles[i].getInputStream().getFD();
+      }
+      Thread serverThread = new Thread() {
+        public void run(){
+          // Run server
+          try {
+            DomainSocket conn = serv.accept();
+            byte in1[] = new byte[clientMsg1.length];
+            InputStream connInputStream = conn.getInputStream();
+            IOUtils.readFully(connInputStream, in1, 0, in1.length);
+            Assert.assertTrue(Arrays.equals(clientMsg1, in1));
+            conn.sendFileDescriptors(passedFds, serverMsg1, 0,
+                serverMsg1.length);
+            conn.close();
+          } catch (Throwable e) {
+            // Report the failure to the test thread; asserting here would
+            // only terminate this worker thread.
+            threadResults.add(e);
+            return;
+          }
+          threadResults.add(new Success());
+        }
+      };
+      serverThread.start();
+
+      Thread clientThread = new Thread() {
+        public void run(){
+          try {
+            DomainSocket client = DomainSocket.connect(TEST_PATH);
+            OutputStream clientOutputStream = client.getOutputStream();
+            InputStream clientInputStream = client.getInputStream();
+            clientOutputStream.write(clientMsg1);
+            byte in1[] = new byte[serverMsg1.length];
+            FileInputStream recvFis[] =
+                new FileInputStream[passedFds.length];
+            // Ask for one byte less than the message so that the remainder
+            // has to be fetched through the ordinary input stream.
+            int r = client.
+                recvFileInputStreams(recvFis, in1, 0, in1.length - 1);
+            Assert.assertTrue(r > 0);
+            IOUtils.readFully(clientInputStream, in1, r, in1.length - r);
+            Assert.assertTrue(Arrays.equals(serverMsg1, in1));
+            for (int i = 0; i < passedFds.length; i++) {
+              Assert.assertNotNull(recvFis[i]);
+              passedFiles[i].checkInputStream(recvFis[i]);
+            }
+            for (FileInputStream fis : recvFis) {
+              fis.close();
+            }
+            client.close();
+          } catch (Throwable e) {
+            // Post only the failure: a Success after it would misreport the
+            // outcome and could overfill the two-slot queue.
+            threadResults.add(e);
+            return;
+          }
+          threadResults.add(new Success());
+        }
+      };
+      clientThread.start();
+
+      for (int i = 0; i < 2; i++) {
+        Throwable t = threadResults.take();
+        if (!(t instanceof Success)) {
+          Assert.fail(t.getMessage() + ExceptionUtils.getStackTrace(t));
+        }
+      }
+      serverThread.join(120000);
+      clientThread.join(120000);
+    } finally {
+      // Release the listening socket and the temporary files even when the
+      // test fails part-way through.
+      serv.close();
+      for (PassedFile pf : passedFiles) {
+        pf.cleanup();
+      }
+    }
+  }
+
+  /**
+   * Invokes DomainSocket.validateSocketPathSecurity0 on a path, telling it
+   * to skip as many leading components as the prefix has ancestors.
+   *
+   * @param str     The path to validate
+   * @param prefix  A prefix to skip validation for; the number of parent
+   *                directories above it becomes the skip count
+   * @throws IOException if validateSocketPathSecurity0 throws it
+   */
+  private static void testValidateSocketPath(String str, String prefix)
+      throws IOException {
+    int ancestorCount = 0;
+    // Walk from the prefix up to the root, counting each ancestor passed.
+    for (File ancestor = new File(prefix).getParentFile();
+         ancestor != null;
+         ancestor = ancestor.getParentFile()) {
+      ancestorCount++;
+    }
+    DomainSocket.validateSocketPathSecurity0(str, ancestorCount);
+  }
+
+  /**
+   * Test file descriptor path security.
+   *
+   * Builds two directory trees with specific permissions inside a
+   * temporary socket directory and runs socket path validation against
+   * them.  When validation throws an IOException for the world-writable
+   * or the missing path, the exception message is checked.
+   *
+   * @throws Exception if a setup command, a validation that must succeed,
+   *                   or a message check fails
+   */
+  @Test(timeout=180000)
+  public void testFdPassingPathSecurity() throws Exception {
+    TemporarySocketDirectory tmp = new TemporarySocketDirectory();
+    try {
+      final String prefix = tmp.getDir().getAbsolutePath();
+      // Shell commands that lay out the directory trees, run in order.
+      final String[][] setupCommands = {
+        { "mkdir", "-p", prefix + "/foo/bar/baz" },
+        { "chmod", "0700", prefix + "/foo/bar/baz" },
+        { "chmod", "0700", prefix + "/foo/bar" },
+        { "chmod", "0707", prefix + "/foo" },
+        { "mkdir", "-p", prefix + "/q1/q2" },
+        { "chmod", "0700", prefix + "/q1" },
+        { "chmod", "0700", prefix + "/q1/q2" },
+      };
+      for (String[] command : setupCommands) {
+        Shell.execCommand(command);
+      }
+
+      // Every component below the prefix is private to the owner.
+      testValidateSocketPath(prefix + "/q1/q2", prefix);
+
+      // "/foo" was made world-writable above.
+      final String worldWritableMessage = "/foo' is world-writable. " +
+          "Its permissions are 0707. Please fix this or select a " +
+          "different socket path.";
+      try {
+        testValidateSocketPath(prefix + "/foo/bar/baz", prefix);
+      } catch (IOException e) {
+        GenericTestUtils.assertExceptionContains(worldWritableMessage, e);
+      }
+
+      // "/nope" was never created.
+      final String statFailureMessage = "failed to stat a path " +
+          "component: ";
+      try {
+        testValidateSocketPath(prefix + "/nope", prefix);
+      } catch (IOException e) {
+        GenericTestUtils.assertExceptionContains(statFailureMessage, e);
+      }
+
+      // Root should be secure
+      DomainSocket.validateSocketPathSecurity0("/foo", 0);
+    } finally {
+      tmp.close();
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
index 7c509ee7e80..337cbec292e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
@@ -4,3 +4,6 @@ These will be integrated to trunk CHANGES.txt after merge
HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.
(Colin Patrick McCabe via todd)
+
+HDFS-4354. Create DomainSocket and DomainPeer and associated unit tests.
+(Colin Patrick McCabe via todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
index eb2d0c92d9e..7c6c3f4b6ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
@@ -23,6 +23,8 @@ import java.io.OutputStream;
import java.net.Socket;
import java.nio.channels.ReadableByteChannel;
+import org.apache.hadoop.net.unix.DomainSocket;
+
/**
* Represents a peer that we communicate with by using a basic Socket
* that has no associated Channel.
@@ -118,4 +120,9 @@ class BasicInetPeer implements Peer {
public String toString() {
return "BasicInetPeer(" + socket.toString() + ")";
}
+
+ @Override
+ public DomainSocket getDomainSocket() {
+ return null; // this peer is built on a basic Socket, so it has no DomainSocket
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
new file mode 100644
index 00000000000..abe9b7d52df
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A {@link Peer} that talks to the other side over a UNIX domain socket
+ * using blocking I/O.
+ */
+@InterfaceAudience.Private
+public class DomainPeer implements Peer {
+  /** The domain socket this peer wraps. */
+  private final DomainSocket domainSocket;
+  /** Output stream obtained from the socket at construction time. */
+  private final OutputStream outputStream;
+  /** Input stream obtained from the socket at construction time. */
+  private final InputStream inputStream;
+  /** Channel obtained from the socket at construction time. */
+  private final ReadableByteChannel byteChannel;
+
+  /**
+   * Wraps the given socket, fetching its streams and channel once so that
+   * the accessors below always hand back the same objects.
+   *
+   * @param socket the domain socket to wrap
+   */
+  public DomainPeer(DomainSocket socket) {
+    this.domainSocket = socket;
+    this.outputStream = socket.getOutputStream();
+    this.inputStream = socket.getInputStream();
+    this.byteChannel = socket.getChannel();
+  }
+
+  /** @return the channel obtained from the socket */
+  @Override
+  public ReadableByteChannel getInputStreamChannel() {
+    return byteChannel;
+  }
+
+  /** Sets the socket's RCV_TIMEO attribute to the given value. */
+  @Override
+  public void setReadTimeout(int timeoutMs) throws IOException {
+    domainSocket.setAttribute(DomainSocket.RCV_TIMEO, timeoutMs);
+  }
+
+  /** @return the socket's RCV_BUF_SIZE attribute */
+  @Override
+  public int getReceiveBufferSize() throws IOException {
+    final int receiveBufferSize =
+        domainSocket.getAttribute(DomainSocket.RCV_BUF_SIZE);
+    return receiveBufferSize;
+  }
+
+  /** @return always false */
+  @Override
+  public boolean getTcpNoDelay() throws IOException {
+    /* A domain socket involves no TCP, hence no TCP_NODELAY. */
+    return false;
+  }
+
+  /** Sets the socket's SND_TIMEO attribute to the given value. */
+  @Override
+  public void setWriteTimeout(int timeoutMs) throws IOException {
+    domainSocket.setAttribute(DomainSocket.SND_TIMEO, timeoutMs);
+  }
+
+  /** @return true when the socket reports that it is not open */
+  @Override
+  public boolean isClosed() {
+    final boolean open = domainSocket.isOpen();
+    return !open;
+  }
+
+  /** Closes the underlying socket. */
+  @Override
+  public void close() throws IOException {
+    domainSocket.close();
+  }
+
+  /** @return the socket path prefixed with "unix:" */
+  @Override
+  public String getRemoteAddressString() {
+    final String path = domainSocket.getPath();
+    return "unix:" + path;
+  }
+
+  /** @return a fixed string; the socket is not consulted */
+  @Override
+  public String getLocalAddressString() {
+    return "";
+  }
+
+  /** @return the input stream obtained from the socket */
+  @Override
+  public InputStream getInputStream() throws IOException {
+    return inputStream;
+  }
+
+  /** @return the output stream obtained from the socket */
+  @Override
+  public OutputStream getOutputStream() throws IOException {
+    return outputStream;
+  }
+
+  /** @return always true */
+  @Override
+  public boolean isLocal() {
+    /* Communication over a UNIX domain socket is local by definition. */
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    final String remote = getRemoteAddressString();
+    return "DomainPeer(" + remote + ")";
+  }
+
+  /** @return the wrapped domain socket */
+  @Override
+  public DomainSocket getDomainSocket() {
+    return domainSocket;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java
new file mode 100644
index 00000000000..d22584fc787
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.net;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.net.PeerServer;
+import org.apache.hadoop.net.unix.DomainSocket;
+
+/**
+ * A {@link PeerServer} that accepts connections on a listening
+ * DomainSocket and hands each one back wrapped in a DomainPeer.
+ */
+class DomainPeerServer implements PeerServer {
+  static final Log LOG = LogFactory.getLog(DomainPeerServer.class);
+  /** The listening socket that connections are accepted from. */
+  private final DomainSocket sock;
+
+  /**
+   * Wraps a socket that is already bound and listening.
+   *
+   * @param sock the listening domain socket
+   */
+  DomainPeerServer(DomainSocket sock) {
+    this.sock = sock;
+  }
+
+  /**
+   * Binds and listens on the path that DomainSocket.getEffectivePath
+   * derives from the given path and port.
+   *
+   * @param path the socket path passed to getEffectivePath
+   * @param port the port passed to getEffectivePath
+   * @throws IOException if binding or listening fails
+   */
+  public DomainPeerServer(String path, int port)
+      throws IOException {
+    this(DomainSocket.bindAndListen(DomainSocket.getEffectivePath(path, port)));
+  }
+
+  /**
+   * @return the path reported by the listening socket
+   */
+  public String getBindPath() {
+    return sock.getPath();
+  }
+
+  /** Sets the listening socket's RCV_BUF_SIZE attribute. */
+  @Override
+  public void setReceiveBufferSize(int size) throws IOException {
+    sock.setAttribute(DomainSocket.RCV_BUF_SIZE, size);
+  }
+
+  /**
+   * Accepts one connection and wraps it in a DomainPeer.
+   *
+   * @return the peer for the accepted connection
+   * @throws IOException if accepting fails, or if closing the accepted
+   *                     socket fails after the peer could not be created
+   */
+  @Override
+  public Peer accept() throws IOException, SocketTimeoutException {
+    DomainSocket connSock = sock.accept();
+    boolean success = false;
+    try {
+      Peer peer = new DomainPeer(connSock);
+      success = true;
+      return peer;
+    } finally {
+      // If the peer could not be constructed there is no peer object to
+      // close, only the accepted socket itself.
+      if (!success) {
+        connSock.close();
+      }
+    }
+  }
+
+  /** @return the listening socket's path prefixed with "unix:" */
+  @Override
+  public String getListeningString() {
+    return "unix:" + sock.getPath();
+  }
+
+  /**
+   * Closes the listening socket.  A failure to close is logged rather
+   * than propagated.
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      sock.close();
+    } catch (IOException e) {
+      LOG.error("error closing DomainPeerServer: ", e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "DomainPeerServer(" + getListeningString() + ")";
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
index 295632ca63d..6b0e506d3cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.net.unix.DomainSocket;
import java.io.InputStream;
import java.io.OutputStream;
@@ -133,4 +134,9 @@ public class EncryptedPeer implements Peer {
public String toString() {
return "EncryptedPeer(" + enclosedPeer + ")";
}
+
+ @Override
+ public DomainSocket getDomainSocket() {
+ return enclosedPeer.getDomainSocket(); // delegate to the peer being wrapped
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
index 1186490512b..1dc9d1d1186 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
@@ -25,6 +25,7 @@ import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.net.SocketInputStream;
import org.apache.hadoop.net.SocketOutputStream;
+import org.apache.hadoop.net.unix.DomainSocket;
/**
* Represents a peer that we communicate with by using non-blocking I/O
@@ -122,4 +123,9 @@ class NioInetPeer implements Peer {
public String toString() {
return "NioInetPeer(" + socket.toString() + ")";
}
+
+ @Override
+ public DomainSocket getDomainSocket() {
+ return null; // no DomainSocket is associated with this peer
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
index 129ada76116..862720e921f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.net.unix.DomainSocket;
/**
* Represents a connection to a peer.
@@ -105,4 +106,10 @@ public interface Peer extends Closeable {
* computer as we.
*/
public boolean isLocal();
+
+ /**
+ * @return The DomainSocket associated with the current peer (a wrapping
+ * peer may return the enclosed peer's socket), or null if there is none.
+ */
+ public DomainSocket getDomainSocket();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
index 0953c410b03..bb580bc9534 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.net.unix.DomainSocket;
import org.junit.Test;
public class TestPeerCache {
@@ -114,6 +115,11 @@ public class TestPeerCache {
public String toString() {
return "FakePeer(dnId=" + dnId + ")";
}
+
+ @Override
+ public DomainSocket getDomainSocket() {
+ return null; // the fake peer has no DomainSocket
+ }
}
@Test