HDFS-5746. Add ShortCircuitSharedMemorySegment (cmccabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1563363 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a420b40e00
commit
4fa696384d
|
@ -395,9 +395,9 @@ checkJavadocWarnings () {
|
||||||
echo ""
|
echo ""
|
||||||
echo "There appear to be $javadocWarnings javadoc warnings generated by the patched build."
|
echo "There appear to be $javadocWarnings javadoc warnings generated by the patched build."
|
||||||
|
|
||||||
#There are 12 warnings that are caused by things that are caused by using sun internal APIs.
|
#There are 14 warnings that are caused by things that are caused by using sun internal APIs.
|
||||||
#There are 2 warnings that are caused by the Apache DS Dn class used in MiniKdc.
|
#There are 2 warnings that are caused by the Apache DS Dn class used in MiniKdc.
|
||||||
OK_JAVADOC_WARNINGS=14;
|
OK_JAVADOC_WARNINGS=16;
|
||||||
### if current warnings greater than OK_JAVADOC_WARNINGS
|
### if current warnings greater than OK_JAVADOC_WARNINGS
|
||||||
if [[ $javadocWarnings -gt $OK_JAVADOC_WARNINGS ]] ; then
|
if [[ $javadocWarnings -gt $OK_JAVADOC_WARNINGS ]] ; then
|
||||||
JIRA_COMMENT="$JIRA_COMMENT
|
JIRA_COMMENT="$JIRA_COMMENT
|
||||||
|
|
|
@ -545,6 +545,7 @@
|
||||||
<javahClassName>org.apache.hadoop.io.compress.bzip2.Bzip2Decompressor</javahClassName>
|
<javahClassName>org.apache.hadoop.io.compress.bzip2.Bzip2Decompressor</javahClassName>
|
||||||
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsMapping</javahClassName>
|
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsMapping</javahClassName>
|
||||||
<javahClassName>org.apache.hadoop.io.nativeio.NativeIO</javahClassName>
|
<javahClassName>org.apache.hadoop.io.nativeio.NativeIO</javahClassName>
|
||||||
|
<javahClassName>org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory</javahClassName>
|
||||||
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName>
|
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName>
|
||||||
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName>
|
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName>
|
||||||
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName>
|
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName>
|
||||||
|
@ -552,6 +553,7 @@
|
||||||
<javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName>
|
<javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName>
|
||||||
<javahClassName>org.apache.hadoop.util.NativeCrc32</javahClassName>
|
<javahClassName>org.apache.hadoop.util.NativeCrc32</javahClassName>
|
||||||
<javahClassName>org.apache.hadoop.net.unix.DomainSocket</javahClassName>
|
<javahClassName>org.apache.hadoop.net.unix.DomainSocket</javahClassName>
|
||||||
|
<javahClassName>org.apache.hadoop.net.unix.DomainSocketWatcher</javahClassName>
|
||||||
</javahClassNames>
|
</javahClassNames>
|
||||||
<javahOutputDirectory>${project.build.directory}/native/javah</javahOutputDirectory>
|
<javahOutputDirectory>${project.build.directory}/native/javah</javahOutputDirectory>
|
||||||
</configuration>
|
</configuration>
|
||||||
|
|
|
@ -178,7 +178,9 @@ add_dual_library(hadoop
|
||||||
${D}/io/nativeio/NativeIO.c
|
${D}/io/nativeio/NativeIO.c
|
||||||
${D}/io/nativeio/errno_enum.c
|
${D}/io/nativeio/errno_enum.c
|
||||||
${D}/io/nativeio/file_descriptor.c
|
${D}/io/nativeio/file_descriptor.c
|
||||||
|
${D}/io/nativeio/SharedFileDescriptorFactory.c
|
||||||
${D}/net/unix/DomainSocket.c
|
${D}/net/unix/DomainSocket.c
|
||||||
|
${D}/net/unix/DomainSocketWatcher.c
|
||||||
${D}/security/JniBasedUnixGroupsMapping.c
|
${D}/security/JniBasedUnixGroupsMapping.c
|
||||||
${D}/security/JniBasedUnixGroupsNetgroupMapping.c
|
${D}/security/JniBasedUnixGroupsNetgroupMapping.c
|
||||||
${D}/security/hadoop_group_info.c
|
${D}/security/hadoop_group_info.c
|
||||||
|
|
|
@ -487,6 +487,16 @@ public class NativeIO {
|
||||||
new ConcurrentHashMap<Integer, CachedName>();
|
new ConcurrentHashMap<Integer, CachedName>();
|
||||||
|
|
||||||
private enum IdCache { USER, GROUP }
|
private enum IdCache { USER, GROUP }
|
||||||
|
|
||||||
|
public final static int MMAP_PROT_READ = 0x1;
|
||||||
|
public final static int MMAP_PROT_WRITE = 0x2;
|
||||||
|
public final static int MMAP_PROT_EXEC = 0x4;
|
||||||
|
|
||||||
|
public static native long mmap(FileDescriptor fd, int prot,
|
||||||
|
boolean shared, long length) throws IOException;
|
||||||
|
|
||||||
|
public static native void munmap(long addr, long length)
|
||||||
|
throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean workaroundNonThreadSafePasswdCalls = false;
|
private static boolean workaroundNonThreadSafePasswdCalls = false;
|
||||||
|
|
|
@ -0,0 +1,90 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.io.nativeio;
|
||||||
|
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.FileDescriptor;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.SystemUtils;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A factory for creating shared file descriptors inside a given directory.
|
||||||
|
* Typically, the directory will be /dev/shm or /tmp.
|
||||||
|
*
|
||||||
|
* We will hand out file descriptors that correspond to unlinked files residing
|
||||||
|
* in that directory. These file descriptors are suitable for sharing across
|
||||||
|
* multiple processes and are both readable and writable.
|
||||||
|
*
|
||||||
|
* Because we unlink the temporary files right after creating them, a JVM crash
|
||||||
|
* usually does not leave behind any temporary files in the directory. However,
|
||||||
|
* it may happen that we crash right after creating the file and before
|
||||||
|
* unlinking it. In the constructor, we attempt to clean up after any such
|
||||||
|
* remnants by trying to unlink any temporary files created by previous
|
||||||
|
* SharedFileDescriptorFactory instances that also used our prefix.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class SharedFileDescriptorFactory {
|
||||||
|
private final String prefix;
|
||||||
|
private final String path;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a SharedFileDescriptorFactory.
|
||||||
|
*
|
||||||
|
* @param prefix Prefix to add to all file names we use.
|
||||||
|
* @param path Path to use.
|
||||||
|
*/
|
||||||
|
public SharedFileDescriptorFactory(String prefix, String path)
|
||||||
|
throws IOException {
|
||||||
|
Preconditions.checkArgument(NativeIO.isAvailable());
|
||||||
|
Preconditions.checkArgument(SystemUtils.IS_OS_UNIX);
|
||||||
|
this.prefix = prefix;
|
||||||
|
this.path = path;
|
||||||
|
deleteStaleTemporaryFiles0(prefix, path);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a shared file descriptor which will be both readable and writable.
|
||||||
|
*
|
||||||
|
* @param length The starting file length.
|
||||||
|
*
|
||||||
|
* @return The file descriptor, wrapped in a FileInputStream.
|
||||||
|
* @throws IOException If there was an I/O or configuration error creating
|
||||||
|
* the descriptor.
|
||||||
|
*/
|
||||||
|
public FileInputStream createDescriptor(int length) throws IOException {
|
||||||
|
return new FileInputStream(createDescriptor0(prefix, path, length));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete temporary files in the directory, NOT following symlinks.
|
||||||
|
*/
|
||||||
|
private static native void deleteStaleTemporaryFiles0(String prefix,
|
||||||
|
String path) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a file with O_EXCL, and then resize it to the desired size.
|
||||||
|
*/
|
||||||
|
private static native FileDescriptor createDescriptor0(String prefix,
|
||||||
|
String path, int length) throws IOException;
|
||||||
|
}
|
|
@ -24,17 +24,15 @@ import java.io.FileInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.SocketException;
|
|
||||||
import java.nio.channels.AsynchronousCloseException;
|
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.nio.channels.ReadableByteChannel;
|
import java.nio.channels.ReadableByteChannel;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
import org.apache.commons.lang.SystemUtils;
|
import org.apache.commons.lang.SystemUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.util.NativeCodeLoader;
|
import org.apache.hadoop.util.NativeCodeLoader;
|
||||||
|
import org.apache.hadoop.util.CloseableReferenceCount;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
@ -132,104 +130,14 @@ public class DomainSocket implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tracks the reference count of the file descriptor, and also whether it is
|
* The socket reference count and closed bit.
|
||||||
* open or closed.
|
|
||||||
*/
|
*/
|
||||||
private static class Status {
|
final CloseableReferenceCount refCount;
|
||||||
/**
|
|
||||||
* Bit mask representing a closed domain socket.
|
|
||||||
*/
|
|
||||||
private static final int STATUS_CLOSED_MASK = 1 << 30;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Status bits
|
|
||||||
*
|
|
||||||
* Bit 30: 0 = DomainSocket open, 1 = DomainSocket closed
|
|
||||||
* Bits 29 to 0: the reference count.
|
|
||||||
*/
|
|
||||||
private final AtomicInteger bits = new AtomicInteger(0);
|
|
||||||
|
|
||||||
Status() { }
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Increment the reference count of the underlying file descriptor.
|
|
||||||
*
|
|
||||||
* @throws ClosedChannelException If the file descriptor is closed.
|
|
||||||
*/
|
|
||||||
void reference() throws ClosedChannelException {
|
|
||||||
int curBits = bits.incrementAndGet();
|
|
||||||
if ((curBits & STATUS_CLOSED_MASK) != 0) {
|
|
||||||
bits.decrementAndGet();
|
|
||||||
throw new ClosedChannelException();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Decrement the reference count of the underlying file descriptor.
|
|
||||||
*
|
|
||||||
* @param checkClosed Whether to throw an exception if the file
|
|
||||||
* descriptor is closed.
|
|
||||||
*
|
|
||||||
* @throws AsynchronousCloseException If the file descriptor is closed and
|
|
||||||
* checkClosed is set.
|
|
||||||
*/
|
|
||||||
void unreference(boolean checkClosed) throws AsynchronousCloseException {
|
|
||||||
int newCount = bits.decrementAndGet();
|
|
||||||
assert (newCount & ~STATUS_CLOSED_MASK) >= 0;
|
|
||||||
if (checkClosed && ((newCount & STATUS_CLOSED_MASK) != 0)) {
|
|
||||||
throw new AsynchronousCloseException();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return true if the file descriptor is currently open.
|
|
||||||
*
|
|
||||||
* @return True if the file descriptor is currently open.
|
|
||||||
*/
|
|
||||||
boolean isOpen() {
|
|
||||||
return ((bits.get() & STATUS_CLOSED_MASK) == 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Mark the file descriptor as closed.
|
|
||||||
*
|
|
||||||
* Once the file descriptor is closed, it cannot be reopened.
|
|
||||||
*
|
|
||||||
* @return The current reference count.
|
|
||||||
* @throws ClosedChannelException If someone else closes the file
|
|
||||||
* descriptor before we do.
|
|
||||||
*/
|
|
||||||
int setClosed() throws ClosedChannelException {
|
|
||||||
while (true) {
|
|
||||||
int curBits = bits.get();
|
|
||||||
if ((curBits & STATUS_CLOSED_MASK) != 0) {
|
|
||||||
throw new ClosedChannelException();
|
|
||||||
}
|
|
||||||
if (bits.compareAndSet(curBits, curBits | STATUS_CLOSED_MASK)) {
|
|
||||||
return curBits & (~STATUS_CLOSED_MASK);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the current reference count.
|
|
||||||
*
|
|
||||||
* @return The current reference count.
|
|
||||||
*/
|
|
||||||
int getReferenceCount() {
|
|
||||||
return bits.get() & (~STATUS_CLOSED_MASK);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The socket status.
|
|
||||||
*/
|
|
||||||
private final Status status;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The file descriptor associated with this UNIX domain socket.
|
* The file descriptor associated with this UNIX domain socket.
|
||||||
*/
|
*/
|
||||||
private final int fd;
|
final int fd;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The path associated with this UNIX domain socket.
|
* The path associated with this UNIX domain socket.
|
||||||
|
@ -252,13 +160,21 @@ public class DomainSocket implements Closeable {
|
||||||
private final DomainChannel channel = new DomainChannel();
|
private final DomainChannel channel = new DomainChannel();
|
||||||
|
|
||||||
private DomainSocket(String path, int fd) {
|
private DomainSocket(String path, int fd) {
|
||||||
this.status = new Status();
|
this.refCount = new CloseableReferenceCount();
|
||||||
this.fd = fd;
|
this.fd = fd;
|
||||||
this.path = path;
|
this.path = path;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static native int bind0(String path) throws IOException;
|
private static native int bind0(String path) throws IOException;
|
||||||
|
|
||||||
|
private void unreference(boolean checkClosed) throws ClosedChannelException {
|
||||||
|
if (checkClosed) {
|
||||||
|
refCount.unreferenceCheckClosed();
|
||||||
|
} else {
|
||||||
|
refCount.unreference();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new DomainSocket listening on the given path.
|
* Create a new DomainSocket listening on the given path.
|
||||||
*
|
*
|
||||||
|
@ -308,14 +224,14 @@ public class DomainSocket implements Closeable {
|
||||||
* @throws SocketTimeoutException If the accept timed out.
|
* @throws SocketTimeoutException If the accept timed out.
|
||||||
*/
|
*/
|
||||||
public DomainSocket accept() throws IOException {
|
public DomainSocket accept() throws IOException {
|
||||||
status.reference();
|
refCount.reference();
|
||||||
boolean exc = true;
|
boolean exc = true;
|
||||||
try {
|
try {
|
||||||
DomainSocket ret = new DomainSocket(path, accept0(fd));
|
DomainSocket ret = new DomainSocket(path, accept0(fd));
|
||||||
exc = false;
|
exc = false;
|
||||||
return ret;
|
return ret;
|
||||||
} finally {
|
} finally {
|
||||||
status.unreference(exc);
|
unreference(exc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -341,7 +257,7 @@ public class DomainSocket implements Closeable {
|
||||||
* @return True if the file descriptor is currently open.
|
* @return True if the file descriptor is currently open.
|
||||||
*/
|
*/
|
||||||
public boolean isOpen() {
|
public boolean isOpen() {
|
||||||
return status.isOpen();
|
return refCount.isOpen();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -381,20 +297,20 @@ public class DomainSocket implements Closeable {
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
public void setAttribute(int type, int size) throws IOException {
|
public void setAttribute(int type, int size) throws IOException {
|
||||||
status.reference();
|
refCount.reference();
|
||||||
boolean exc = true;
|
boolean exc = true;
|
||||||
try {
|
try {
|
||||||
setAttribute0(fd, type, size);
|
setAttribute0(fd, type, size);
|
||||||
exc = false;
|
exc = false;
|
||||||
} finally {
|
} finally {
|
||||||
status.unreference(exc);
|
unreference(exc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private native int getAttribute0(int fd, int type) throws IOException;
|
private native int getAttribute0(int fd, int type) throws IOException;
|
||||||
|
|
||||||
public int getAttribute(int type) throws IOException {
|
public int getAttribute(int type) throws IOException {
|
||||||
status.reference();
|
refCount.reference();
|
||||||
int attribute;
|
int attribute;
|
||||||
boolean exc = true;
|
boolean exc = true;
|
||||||
try {
|
try {
|
||||||
|
@ -402,7 +318,7 @@ public class DomainSocket implements Closeable {
|
||||||
exc = false;
|
exc = false;
|
||||||
return attribute;
|
return attribute;
|
||||||
} finally {
|
} finally {
|
||||||
status.unreference(exc);
|
unreference(exc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -419,9 +335,9 @@ public class DomainSocket implements Closeable {
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
// Set the closed bit on this DomainSocket
|
// Set the closed bit on this DomainSocket
|
||||||
int refCount;
|
int count;
|
||||||
try {
|
try {
|
||||||
refCount = status.setClosed();
|
count = refCount.setClosed();
|
||||||
} catch (ClosedChannelException e) {
|
} catch (ClosedChannelException e) {
|
||||||
// Someone else already closed the DomainSocket.
|
// Someone else already closed the DomainSocket.
|
||||||
return;
|
return;
|
||||||
|
@ -429,7 +345,7 @@ public class DomainSocket implements Closeable {
|
||||||
// Wait for all references to go away
|
// Wait for all references to go away
|
||||||
boolean didShutdown = false;
|
boolean didShutdown = false;
|
||||||
boolean interrupted = false;
|
boolean interrupted = false;
|
||||||
while (refCount > 0) {
|
while (count > 0) {
|
||||||
if (!didShutdown) {
|
if (!didShutdown) {
|
||||||
try {
|
try {
|
||||||
// Calling shutdown on the socket will interrupt blocking system
|
// Calling shutdown on the socket will interrupt blocking system
|
||||||
|
@ -446,7 +362,7 @@ public class DomainSocket implements Closeable {
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
interrupted = true;
|
interrupted = true;
|
||||||
}
|
}
|
||||||
refCount = status.getReferenceCount();
|
count = refCount.getReferenceCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
// At this point, nobody has a reference to the file descriptor,
|
// At this point, nobody has a reference to the file descriptor,
|
||||||
|
@ -478,13 +394,13 @@ public class DomainSocket implements Closeable {
|
||||||
*/
|
*/
|
||||||
public void sendFileDescriptors(FileDescriptor descriptors[],
|
public void sendFileDescriptors(FileDescriptor descriptors[],
|
||||||
byte jbuf[], int offset, int length) throws IOException {
|
byte jbuf[], int offset, int length) throws IOException {
|
||||||
status.reference();
|
refCount.reference();
|
||||||
boolean exc = true;
|
boolean exc = true;
|
||||||
try {
|
try {
|
||||||
sendFileDescriptors0(fd, descriptors, jbuf, offset, length);
|
sendFileDescriptors0(fd, descriptors, jbuf, offset, length);
|
||||||
exc = false;
|
exc = false;
|
||||||
} finally {
|
} finally {
|
||||||
status.unreference(exc);
|
unreference(exc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -515,14 +431,14 @@ public class DomainSocket implements Closeable {
|
||||||
*/
|
*/
|
||||||
public int receiveFileDescriptors(FileDescriptor[] descriptors,
|
public int receiveFileDescriptors(FileDescriptor[] descriptors,
|
||||||
byte jbuf[], int offset, int length) throws IOException {
|
byte jbuf[], int offset, int length) throws IOException {
|
||||||
status.reference();
|
refCount.reference();
|
||||||
boolean exc = true;
|
boolean exc = true;
|
||||||
try {
|
try {
|
||||||
int nBytes = receiveFileDescriptors0(fd, descriptors, jbuf, offset, length);
|
int nBytes = receiveFileDescriptors0(fd, descriptors, jbuf, offset, length);
|
||||||
exc = false;
|
exc = false;
|
||||||
return nBytes;
|
return nBytes;
|
||||||
} finally {
|
} finally {
|
||||||
status.unreference(exc);
|
unreference(exc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -539,7 +455,7 @@ public class DomainSocket implements Closeable {
|
||||||
for (int i = 0; i < streams.length; i++) {
|
for (int i = 0; i < streams.length; i++) {
|
||||||
streams[i] = null;
|
streams[i] = null;
|
||||||
}
|
}
|
||||||
status.reference();
|
refCount.reference();
|
||||||
try {
|
try {
|
||||||
int ret = receiveFileDescriptors0(fd, descriptors, buf, offset, length);
|
int ret = receiveFileDescriptors0(fd, descriptors, buf, offset, length);
|
||||||
for (int i = 0, j = 0; i < descriptors.length; i++) {
|
for (int i = 0, j = 0; i < descriptors.length; i++) {
|
||||||
|
@ -569,7 +485,7 @@ public class DomainSocket implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
status.unreference(!success);
|
unreference(!success);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -593,7 +509,7 @@ public class DomainSocket implements Closeable {
|
||||||
public class DomainInputStream extends InputStream {
|
public class DomainInputStream extends InputStream {
|
||||||
@Override
|
@Override
|
||||||
public int read() throws IOException {
|
public int read() throws IOException {
|
||||||
status.reference();
|
refCount.reference();
|
||||||
boolean exc = true;
|
boolean exc = true;
|
||||||
try {
|
try {
|
||||||
byte b[] = new byte[1];
|
byte b[] = new byte[1];
|
||||||
|
@ -601,33 +517,33 @@ public class DomainSocket implements Closeable {
|
||||||
exc = false;
|
exc = false;
|
||||||
return (ret >= 0) ? b[0] : -1;
|
return (ret >= 0) ? b[0] : -1;
|
||||||
} finally {
|
} finally {
|
||||||
status.unreference(exc);
|
unreference(exc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int read(byte b[], int off, int len) throws IOException {
|
public int read(byte b[], int off, int len) throws IOException {
|
||||||
status.reference();
|
refCount.reference();
|
||||||
boolean exc = true;
|
boolean exc = true;
|
||||||
try {
|
try {
|
||||||
int nRead = DomainSocket.readArray0(DomainSocket.this.fd, b, off, len);
|
int nRead = DomainSocket.readArray0(DomainSocket.this.fd, b, off, len);
|
||||||
exc = false;
|
exc = false;
|
||||||
return nRead;
|
return nRead;
|
||||||
} finally {
|
} finally {
|
||||||
status.unreference(exc);
|
unreference(exc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int available() throws IOException {
|
public int available() throws IOException {
|
||||||
status.reference();
|
refCount.reference();
|
||||||
boolean exc = true;
|
boolean exc = true;
|
||||||
try {
|
try {
|
||||||
int nAvailable = DomainSocket.available0(DomainSocket.this.fd);
|
int nAvailable = DomainSocket.available0(DomainSocket.this.fd);
|
||||||
exc = false;
|
exc = false;
|
||||||
return nAvailable;
|
return nAvailable;
|
||||||
} finally {
|
} finally {
|
||||||
status.unreference(exc);
|
unreference(exc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -649,7 +565,7 @@ public class DomainSocket implements Closeable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(int val) throws IOException {
|
public void write(int val) throws IOException {
|
||||||
status.reference();
|
refCount.reference();
|
||||||
boolean exc = true;
|
boolean exc = true;
|
||||||
try {
|
try {
|
||||||
byte b[] = new byte[1];
|
byte b[] = new byte[1];
|
||||||
|
@ -657,19 +573,19 @@ public class DomainSocket implements Closeable {
|
||||||
DomainSocket.writeArray0(DomainSocket.this.fd, b, 0, 1);
|
DomainSocket.writeArray0(DomainSocket.this.fd, b, 0, 1);
|
||||||
exc = false;
|
exc = false;
|
||||||
} finally {
|
} finally {
|
||||||
status.unreference(exc);
|
unreference(exc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(byte[] b, int off, int len) throws IOException {
|
public void write(byte[] b, int off, int len) throws IOException {
|
||||||
status.reference();
|
refCount.reference();
|
||||||
boolean exc = true;
|
boolean exc = true;
|
||||||
try {
|
try {
|
||||||
DomainSocket.writeArray0(DomainSocket.this.fd, b, off, len);
|
DomainSocket.writeArray0(DomainSocket.this.fd, b, off, len);
|
||||||
exc = false;
|
exc = false;
|
||||||
} finally {
|
} finally {
|
||||||
status.unreference(exc);
|
unreference(exc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -688,7 +604,7 @@ public class DomainSocket implements Closeable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int read(ByteBuffer dst) throws IOException {
|
public int read(ByteBuffer dst) throws IOException {
|
||||||
status.reference();
|
refCount.reference();
|
||||||
boolean exc = true;
|
boolean exc = true;
|
||||||
try {
|
try {
|
||||||
int nread = 0;
|
int nread = 0;
|
||||||
|
@ -710,7 +626,7 @@ public class DomainSocket implements Closeable {
|
||||||
exc = false;
|
exc = false;
|
||||||
return nread;
|
return nread;
|
||||||
} finally {
|
} finally {
|
||||||
status.unreference(exc);
|
unreference(exc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,478 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.net.unix;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.EOFException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.channels.ClosedChannelException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.locks.Condition;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.SystemUtils;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.util.NativeCodeLoader;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.util.concurrent.Uninterruptibles;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The DomainSocketWatcher watches a set of domain sockets to see when they
|
||||||
|
* become readable, or closed. When one of those events happens, it makes a
|
||||||
|
* callback.
|
||||||
|
*
|
||||||
|
* See {@link DomainSocket} for more information about UNIX domain sockets.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.LimitedPrivate("HDFS")
|
||||||
|
public final class DomainSocketWatcher extends Thread implements Closeable {
|
||||||
|
static {
|
||||||
|
if (SystemUtils.IS_OS_WINDOWS) {
|
||||||
|
loadingFailureReason = "UNIX Domain sockets are not available on Windows.";
|
||||||
|
} else if (!NativeCodeLoader.isNativeCodeLoaded()) {
|
||||||
|
loadingFailureReason = "libhadoop cannot be loaded.";
|
||||||
|
} else {
|
||||||
|
String problem;
|
||||||
|
try {
|
||||||
|
anchorNative();
|
||||||
|
problem = null;
|
||||||
|
} catch (Throwable t) {
|
||||||
|
problem = "DomainSocketWatcher#anchorNative got error: " +
|
||||||
|
t.getMessage();
|
||||||
|
}
|
||||||
|
loadingFailureReason = problem;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static Log LOG = LogFactory.getLog(DomainSocketWatcher.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The reason why DomainSocketWatcher is not available, or null if it is
|
||||||
|
* available.
|
||||||
|
*/
|
||||||
|
private final static String loadingFailureReason;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initializes the native library code.
|
||||||
|
*/
|
||||||
|
private static native void anchorNative();
|
||||||
|
|
||||||
|
interface Handler {
|
||||||
|
/**
|
||||||
|
* Handles an event on a socket. An event may be the socket becoming
|
||||||
|
* readable, or the remote end being closed.
|
||||||
|
*
|
||||||
|
* @param sock The socket that the event occurred on.
|
||||||
|
* @return Whether we should close the socket.
|
||||||
|
*/
|
||||||
|
boolean handle(DomainSocket sock);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handler for {DomainSocketWatcher#notificationSockets[1]}
|
||||||
|
*/
|
||||||
|
private class NotificationHandler implements Handler {
|
||||||
|
public boolean handle(DomainSocket sock) {
|
||||||
|
try {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace(this + ": NotificationHandler: doing a read on " +
|
||||||
|
sock.fd);
|
||||||
|
}
|
||||||
|
if (sock.getInputStream().read() == -1) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace(this + ": NotificationHandler: got EOF on " + sock.fd);
|
||||||
|
}
|
||||||
|
throw new EOFException();
|
||||||
|
}
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace(this + ": NotificationHandler: read succeeded on " +
|
||||||
|
sock.fd);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
} catch (IOException e) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace(this + ": NotificationHandler: setting closed to " +
|
||||||
|
"true for " + sock.fd);
|
||||||
|
}
|
||||||
|
closed = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class Entry {
|
||||||
|
final DomainSocket socket;
|
||||||
|
final Handler handler;
|
||||||
|
|
||||||
|
Entry(DomainSocket socket, Handler handler) {
|
||||||
|
this.socket = socket;
|
||||||
|
this.handler = handler;
|
||||||
|
}
|
||||||
|
|
||||||
|
DomainSocket getDomainSocket() {
|
||||||
|
return socket;
|
||||||
|
}
|
||||||
|
|
||||||
|
Handler getHandler() {
|
||||||
|
return handler;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The FdSet is a set of file descriptors that gets passed to poll(2).
|
||||||
|
* It contains a native memory segment, so that we don't have to copy
|
||||||
|
* in the poll0 function.
|
||||||
|
*/
|
||||||
|
private static class FdSet {
|
||||||
|
private long data;
|
||||||
|
|
||||||
|
private native static long alloc0();
|
||||||
|
|
||||||
|
FdSet() {
|
||||||
|
data = alloc0();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a file descriptor to the set.
|
||||||
|
*
|
||||||
|
* @param fd The file descriptor to add.
|
||||||
|
*/
|
||||||
|
native void add(int fd);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a file descriptor from the set.
|
||||||
|
*
|
||||||
|
* @param fd The file descriptor to remove.
|
||||||
|
*/
|
||||||
|
native void remove(int fd);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get an array containing all the FDs marked as readable.
|
||||||
|
* Also clear the state of all FDs.
|
||||||
|
*
|
||||||
|
* @return An array containing all of the currently readable file
|
||||||
|
* descriptors.
|
||||||
|
*/
|
||||||
|
native int[] getAndClearReadableFds();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close the object and de-allocate the memory used.
|
||||||
|
*/
|
||||||
|
native void close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lock which protects toAdd, toRemove, and closed.
|
||||||
|
*/
|
||||||
|
private final ReentrantLock lock = new ReentrantLock();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Condition variable which indicates that toAdd and toRemove have been
|
||||||
|
* processed.
|
||||||
|
*/
|
||||||
|
private final Condition processedCond = lock.newCondition();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Entries to add.
|
||||||
|
*/
|
||||||
|
private final LinkedList<Entry> toAdd =
|
||||||
|
new LinkedList<Entry>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Entries to remove.
|
||||||
|
*/
|
||||||
|
private final TreeMap<Integer, DomainSocket> toRemove =
|
||||||
|
new TreeMap<Integer, DomainSocket>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maximum length of time to go between checking whether the interrupted
|
||||||
|
* bit has been set for this thread.
|
||||||
|
*/
|
||||||
|
private final int interruptCheckPeriodMs;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A pair of sockets used to wake up the thread after it has called poll(2).
|
||||||
|
*/
|
||||||
|
private final DomainSocket notificationSockets[];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether or not this DomainSocketWatcher is closed.
|
||||||
|
*/
|
||||||
|
private boolean closed = false;
|
||||||
|
|
||||||
|
public DomainSocketWatcher(int interruptCheckPeriodMs) throws IOException {
|
||||||
|
if (loadingFailureReason != null) {
|
||||||
|
throw new UnsupportedOperationException(loadingFailureReason);
|
||||||
|
}
|
||||||
|
notificationSockets = DomainSocket.socketpair();
|
||||||
|
this.interruptCheckPeriodMs = interruptCheckPeriodMs;
|
||||||
|
Preconditions.checkArgument(interruptCheckPeriodMs > 0);
|
||||||
|
watcherThread.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close the DomainSocketWatcher and wait for its thread to terminate.
|
||||||
|
*
|
||||||
|
* If there is more than one close, all but the first will be ignored.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
try {
|
||||||
|
lock.lock();
|
||||||
|
if (closed) return;
|
||||||
|
LOG.info(this + ": closing");
|
||||||
|
closed = true;
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
// Close notificationSockets[0], so that notificationSockets[1] gets an EOF
|
||||||
|
// event. This will wake up the thread immediately if it is blocked inside
|
||||||
|
// the select() system call.
|
||||||
|
notificationSockets[0].close();
|
||||||
|
// Wait for the select thread to terminate.
|
||||||
|
Uninterruptibles.joinUninterruptibly(watcherThread);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a socket.
|
||||||
|
*
|
||||||
|
* @param sock The socket to add. It is an error to re-add a socket that
|
||||||
|
* we are already watching.
|
||||||
|
* @param handler The handler to associate with this socket. This may be
|
||||||
|
* called any time after this function is called.
|
||||||
|
*/
|
||||||
|
public void add(DomainSocket sock, Handler handler) {
|
||||||
|
try {
|
||||||
|
lock.lock();
|
||||||
|
checkNotClosed();
|
||||||
|
Entry entry = new Entry(sock, handler);
|
||||||
|
try {
|
||||||
|
sock.refCount.reference();
|
||||||
|
} catch (ClosedChannelException e) {
|
||||||
|
Preconditions.checkArgument(false,
|
||||||
|
"tried to add a closed DomainSocket to " + this);
|
||||||
|
}
|
||||||
|
toAdd.add(entry);
|
||||||
|
kick();
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
processedCond.await();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
this.interrupt();
|
||||||
|
}
|
||||||
|
if (!toAdd.contains(entry)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
checkNotClosed();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a socket. Its handler will be called.
|
||||||
|
*
|
||||||
|
* @param sock The socket to remove.
|
||||||
|
*/
|
||||||
|
public void remove(DomainSocket sock) {
|
||||||
|
try {
|
||||||
|
lock.lock();
|
||||||
|
checkNotClosed();
|
||||||
|
toRemove.put(sock.fd, sock);
|
||||||
|
kick();
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
processedCond.await();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
this.interrupt();
|
||||||
|
}
|
||||||
|
if (!toRemove.containsKey(sock.fd)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
checkNotClosed();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wake up the DomainSocketWatcher thread.
|
||||||
|
*/
|
||||||
|
private void kick() {
|
||||||
|
try {
|
||||||
|
notificationSockets[0].getOutputStream().write(0);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error(this + ": error writing to notificationSockets[0]", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check that the DomainSocketWatcher is not closed.
|
||||||
|
* Must be called while holding the lock.
|
||||||
|
*/
|
||||||
|
private void checkNotClosed() {
|
||||||
|
Preconditions.checkState(lock.isHeldByCurrentThread());
|
||||||
|
if (closed) {
|
||||||
|
throw new RuntimeException("DomainSocketWatcher is closed.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendCallback(String caller, TreeMap<Integer, Entry> entries,
|
||||||
|
FdSet fdSet, int fd) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace(this + ": " + caller + " starting sendCallback for fd " + fd);
|
||||||
|
}
|
||||||
|
Entry entry = entries.get(fd);
|
||||||
|
Preconditions.checkNotNull(entry,
|
||||||
|
this + ": fdSet contained " + fd + ", which we were " +
|
||||||
|
"not tracking.");
|
||||||
|
DomainSocket sock = entry.getDomainSocket();
|
||||||
|
if (entry.getHandler().handle(sock)) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace(this + ": " + caller + ": closing fd " + fd +
|
||||||
|
" at the request of the handler.");
|
||||||
|
}
|
||||||
|
if (toRemove.remove(fd) != null) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace(this + ": " + caller + " : sendCallback processed fd " +
|
||||||
|
fd + " in toRemove.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
sock.refCount.unreferenceCheckClosed();
|
||||||
|
} catch (IOException e) {
|
||||||
|
Preconditions.checkArgument(false,
|
||||||
|
this + ": file descriptor " + sock.fd + " was closed while " +
|
||||||
|
"still in the poll(2) loop.");
|
||||||
|
}
|
||||||
|
IOUtils.cleanup(LOG, sock);
|
||||||
|
entries.remove(fd);
|
||||||
|
fdSet.remove(fd);
|
||||||
|
} else {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace(this + ": " + caller + ": sendCallback not " +
|
||||||
|
"closing fd " + fd);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Thread watcherThread = new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
LOG.info(this + ": starting with interruptCheckPeriodMs = " +
|
||||||
|
interruptCheckPeriodMs);
|
||||||
|
final TreeMap<Integer, Entry> entries = new TreeMap<Integer, Entry>();
|
||||||
|
FdSet fdSet = new FdSet();
|
||||||
|
addNotificationSocket(entries, fdSet);
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
for (int fd : fdSet.getAndClearReadableFds()) {
|
||||||
|
sendCallback("getAndClearReadableFds", entries, fdSet, fd);
|
||||||
|
}
|
||||||
|
if (!(toAdd.isEmpty() && toRemove.isEmpty())) {
|
||||||
|
// Handle pending additions (before pending removes).
|
||||||
|
for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext(); ) {
|
||||||
|
Entry entry = iter.next();
|
||||||
|
DomainSocket sock = entry.getDomainSocket();
|
||||||
|
Entry prevEntry = entries.put(sock.fd, entry);
|
||||||
|
Preconditions.checkState(prevEntry == null,
|
||||||
|
this + ": tried to watch a file descriptor that we " +
|
||||||
|
"were already watching: " + sock);
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace(this + ": adding fd " + sock.fd);
|
||||||
|
}
|
||||||
|
fdSet.add(sock.fd);
|
||||||
|
iter.remove();
|
||||||
|
}
|
||||||
|
// Handle pending removals
|
||||||
|
while (true) {
|
||||||
|
Map.Entry<Integer, DomainSocket> entry = toRemove.firstEntry();
|
||||||
|
if (entry == null) break;
|
||||||
|
sendCallback("handlePendingRemovals",
|
||||||
|
entries, fdSet, entry.getValue().fd);
|
||||||
|
}
|
||||||
|
processedCond.signalAll();
|
||||||
|
}
|
||||||
|
// Check if the thread should terminate. Doing this check now is
|
||||||
|
// easier than at the beginning of the loop, since we know toAdd and
|
||||||
|
// toRemove are now empty and processedCond has been notified if it
|
||||||
|
// needed to be.
|
||||||
|
if (closed) {
|
||||||
|
LOG.info(toString() + " thread terminating.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Check if someone sent our thread an InterruptedException while we
|
||||||
|
// were waiting in poll().
|
||||||
|
if (Thread.interrupted()) {
|
||||||
|
throw new InterruptedException();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
doPoll0(interruptCheckPeriodMs, fdSet);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.info(toString() + " terminating on InterruptedException");
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error(toString() + " terminating on IOException", e);
|
||||||
|
} finally {
|
||||||
|
for (Entry entry : entries.values()) {
|
||||||
|
sendCallback("close", entries, fdSet, entry.getDomainSocket().fd);
|
||||||
|
}
|
||||||
|
entries.clear();
|
||||||
|
fdSet.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
private void addNotificationSocket(final TreeMap<Integer, Entry> entries,
|
||||||
|
FdSet fdSet) {
|
||||||
|
entries.put(notificationSockets[1].fd,
|
||||||
|
new Entry(notificationSockets[1], new NotificationHandler()));
|
||||||
|
try {
|
||||||
|
notificationSockets[1].refCount.reference();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
fdSet.add(notificationSockets[1].fd);
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace(this + ": adding notificationSocket " +
|
||||||
|
notificationSockets[1].fd + ", connected to " +
|
||||||
|
notificationSockets[0].fd);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return "DomainSocketWatcher(" + System.identityHashCode(this) + ")";
|
||||||
|
}
|
||||||
|
|
||||||
|
private static native int doPoll0(int maxWaitMs, FdSet readFds)
|
||||||
|
throws IOException;
|
||||||
|
}
|
|
@ -0,0 +1,125 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.util;
|
||||||
|
|
||||||
|
import java.nio.channels.AsynchronousCloseException;
|
||||||
|
import java.nio.channels.ClosedChannelException;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A closeable object that maintains a reference count.
|
||||||
|
*
|
||||||
|
* Once the object is closed, attempting to take a new reference will throw
|
||||||
|
* ClosedChannelException.
|
||||||
|
*/
|
||||||
|
public class CloseableReferenceCount {
|
||||||
|
/**
|
||||||
|
* Bit mask representing a closed domain socket.
|
||||||
|
*/
|
||||||
|
private static final int STATUS_CLOSED_MASK = 1 << 30;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The status bits.
|
||||||
|
*
|
||||||
|
* Bit 30: 0 = open, 1 = closed.
|
||||||
|
* Bits 29 to 0: the reference count.
|
||||||
|
*/
|
||||||
|
private final AtomicInteger status = new AtomicInteger(0);
|
||||||
|
|
||||||
|
public CloseableReferenceCount() { }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increment the reference count.
|
||||||
|
*
|
||||||
|
* @throws ClosedChannelException If the status is closed.
|
||||||
|
*/
|
||||||
|
public void reference() throws ClosedChannelException {
|
||||||
|
int curBits = status.incrementAndGet();
|
||||||
|
if ((curBits & STATUS_CLOSED_MASK) != 0) {
|
||||||
|
status.decrementAndGet();
|
||||||
|
throw new ClosedChannelException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decrement the reference count.
|
||||||
|
*
|
||||||
|
* @return True if the object is closed and has no outstanding
|
||||||
|
* references.
|
||||||
|
*/
|
||||||
|
public boolean unreference() {
|
||||||
|
int newVal = status.decrementAndGet();
|
||||||
|
Preconditions.checkState(newVal != 0xffffffff,
|
||||||
|
"called unreference when the reference count was already at 0.");
|
||||||
|
return newVal == STATUS_CLOSED_MASK;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decrement the reference count, checking to make sure that the
|
||||||
|
* CloseableReferenceCount is not closed.
|
||||||
|
*
|
||||||
|
* @throws AsynchronousCloseException If the status is closed.
|
||||||
|
*/
|
||||||
|
public void unreferenceCheckClosed() throws ClosedChannelException {
|
||||||
|
int newVal = status.decrementAndGet();
|
||||||
|
if ((newVal & STATUS_CLOSED_MASK) != 0) {
|
||||||
|
throw new AsynchronousCloseException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return true if the status is currently open.
|
||||||
|
*
|
||||||
|
* @return True if the status is currently open.
|
||||||
|
*/
|
||||||
|
public boolean isOpen() {
|
||||||
|
return ((status.get() & STATUS_CLOSED_MASK) == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mark the status as closed.
|
||||||
|
*
|
||||||
|
* Once the status is closed, it cannot be reopened.
|
||||||
|
*
|
||||||
|
* @return The current reference count.
|
||||||
|
* @throws ClosedChannelException If someone else closes the object
|
||||||
|
* before we do.
|
||||||
|
*/
|
||||||
|
public int setClosed() throws ClosedChannelException {
|
||||||
|
while (true) {
|
||||||
|
int curBits = status.get();
|
||||||
|
if ((curBits & STATUS_CLOSED_MASK) != 0) {
|
||||||
|
throw new ClosedChannelException();
|
||||||
|
}
|
||||||
|
if (status.compareAndSet(curBits, curBits | STATUS_CLOSED_MASK)) {
|
||||||
|
return curBits & (~STATUS_CLOSED_MASK);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the current reference count.
|
||||||
|
*
|
||||||
|
* @return The current reference count.
|
||||||
|
*/
|
||||||
|
public int getReferenceCount() {
|
||||||
|
return status.get() & (~STATUS_CLOSED_MASK);
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
#include "org_apache_hadoop.h"
|
#include "org_apache_hadoop.h"
|
||||||
#include "org_apache_hadoop_io_nativeio_NativeIO.h"
|
#include "org_apache_hadoop_io_nativeio_NativeIO.h"
|
||||||
|
#include "org_apache_hadoop_io_nativeio_NativeIO_POSIX.h"
|
||||||
|
|
||||||
#ifdef UNIX
|
#ifdef UNIX
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
|
@ -49,6 +50,10 @@
|
||||||
#include "file_descriptor.h"
|
#include "file_descriptor.h"
|
||||||
#include "errno_enum.h"
|
#include "errno_enum.h"
|
||||||
|
|
||||||
|
#define MMAP_PROT_READ org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_READ
|
||||||
|
#define MMAP_PROT_WRITE org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_WRITE
|
||||||
|
#define MMAP_PROT_EXEC org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_EXEC
|
||||||
|
|
||||||
// the NativeIO$POSIX$Stat inner class and its constructor
|
// the NativeIO$POSIX$Stat inner class and its constructor
|
||||||
static jclass stat_clazz;
|
static jclass stat_clazz;
|
||||||
static jmethodID stat_ctor;
|
static jmethodID stat_ctor;
|
||||||
|
@ -661,6 +666,39 @@ cleanup:
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
JNIEXPORT jlong JNICALL
|
||||||
|
Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_mmap(
|
||||||
|
JNIEnv *env, jclass clazz, jobject jfd, jint jprot,
|
||||||
|
jboolean jshared, jlong length)
|
||||||
|
{
|
||||||
|
void *addr = 0;
|
||||||
|
int prot, flags, fd;
|
||||||
|
|
||||||
|
prot = ((jprot & MMAP_PROT_READ) ? PROT_READ : 0) |
|
||||||
|
((jprot & MMAP_PROT_WRITE) ? PROT_WRITE : 0) |
|
||||||
|
((jprot & MMAP_PROT_EXEC) ? PROT_EXEC : 0);
|
||||||
|
flags = (jshared == JNI_TRUE) ? MAP_SHARED : MAP_PRIVATE;
|
||||||
|
fd = fd_get(env, jfd);
|
||||||
|
addr = mmap(NULL, length, prot, flags, fd, 0);
|
||||||
|
if (addr == MAP_FAILED) {
|
||||||
|
throw_ioe(env, errno);
|
||||||
|
}
|
||||||
|
return (jlong)(intptr_t)addr;
|
||||||
|
}
|
||||||
|
|
||||||
|
JNIEXPORT void JNICALL
|
||||||
|
Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_munmap(
|
||||||
|
JNIEnv *env, jclass clazz, jlong jaddr, jlong length)
|
||||||
|
{
|
||||||
|
void *addr;
|
||||||
|
|
||||||
|
addr = (void*)(intptr_t)jaddr;
|
||||||
|
if (munmap(addr, length) < 0) {
|
||||||
|
throw_ioe(env, errno);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* static native String getGroupName(int gid);
|
* static native String getGroupName(int gid);
|
||||||
*
|
*
|
||||||
|
|
|
@ -0,0 +1,162 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "org_apache_hadoop.h"
|
||||||
|
|
||||||
|
#ifdef UNIX
|
||||||
|
|
||||||
|
#include "exception.h"
|
||||||
|
#include "file_descriptor.h"
|
||||||
|
#include "org_apache_hadoop.h"
|
||||||
|
#include "org_apache_hadoop_io_nativeio_SharedFileDescriptorFactory.h"
|
||||||
|
|
||||||
|
#include <dirent.h>
|
||||||
|
#include <errno.h>
|
||||||
|
#include <fcntl.h>
|
||||||
|
#include <limits.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <sys/stat.h>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
static pthread_mutex_t g_rand_lock = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
|
||||||
|
JNIEXPORT void JNICALL
|
||||||
|
Java_org_apache_hadoop_io_nativeio_SharedFileDescriptorFactory_deleteStaleTemporaryFiles0(
|
||||||
|
JNIEnv *env, jclass clazz, jstring jprefix, jstring jpath)
|
||||||
|
{
|
||||||
|
const char *prefix = NULL, *path = NULL;
|
||||||
|
char target[PATH_MAX];
|
||||||
|
jthrowable jthr;
|
||||||
|
DIR *dp = NULL;
|
||||||
|
struct dirent *de;
|
||||||
|
|
||||||
|
prefix = (*env)->GetStringUTFChars(env, jprefix, NULL);
|
||||||
|
if (!prefix) goto done; // exception raised
|
||||||
|
path = (*env)->GetStringUTFChars(env, jpath, NULL);
|
||||||
|
if (!path) goto done; // exception raised
|
||||||
|
|
||||||
|
dp = opendir(path);
|
||||||
|
if (!dp) {
|
||||||
|
int ret = errno;
|
||||||
|
jthr = newIOException(env, "opendir(%s) error %d: %s",
|
||||||
|
path, ret, terror(ret));
|
||||||
|
(*env)->Throw(env, jthr);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
while ((de = readdir(dp))) {
|
||||||
|
if (strncmp(prefix, de->d_name, strlen(prefix)) == 0) {
|
||||||
|
int ret = snprintf(target, PATH_MAX, "%s/%s", path, de->d_name);
|
||||||
|
if ((0 < ret) && (ret < PATH_MAX)) {
|
||||||
|
unlink(target);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
done:
|
||||||
|
if (dp) {
|
||||||
|
closedir(dp);
|
||||||
|
}
|
||||||
|
if (prefix) {
|
||||||
|
(*env)->ReleaseStringUTFChars(env, jprefix, prefix);
|
||||||
|
}
|
||||||
|
if (path) {
|
||||||
|
(*env)->ReleaseStringUTFChars(env, jpath, path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
JNIEXPORT jobject JNICALL
|
||||||
|
Java_org_apache_hadoop_io_nativeio_SharedFileDescriptorFactory_createDescriptor0(
|
||||||
|
JNIEnv *env, jclass clazz, jstring jprefix, jstring jpath, jint length)
|
||||||
|
{
|
||||||
|
const char *prefix = NULL, *path = NULL;
|
||||||
|
char target[PATH_MAX];
|
||||||
|
int ret, fd = -1, rnd;
|
||||||
|
jthrowable jthr;
|
||||||
|
jobject jret = NULL;
|
||||||
|
|
||||||
|
prefix = (*env)->GetStringUTFChars(env, jprefix, NULL);
|
||||||
|
if (!prefix) goto done; // exception raised
|
||||||
|
path = (*env)->GetStringUTFChars(env, jpath, NULL);
|
||||||
|
if (!path) goto done; // exception raised
|
||||||
|
|
||||||
|
pthread_mutex_lock(&g_rand_lock);
|
||||||
|
rnd = rand();
|
||||||
|
pthread_mutex_unlock(&g_rand_lock);
|
||||||
|
while (1) {
|
||||||
|
ret = snprintf(target, PATH_MAX, "%s/%s_%d",
|
||||||
|
path, prefix, rnd);
|
||||||
|
if (ret < 0) {
|
||||||
|
jthr = newIOException(env, "snprintf error");
|
||||||
|
(*env)->Throw(env, jthr);
|
||||||
|
goto done;
|
||||||
|
} else if (ret >= PATH_MAX) {
|
||||||
|
jthr = newIOException(env, "computed path was too long.");
|
||||||
|
(*env)->Throw(env, jthr);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
fd = open(target, O_CREAT | O_EXCL | O_RDWR, 0700);
|
||||||
|
if (fd >= 0) break; // success
|
||||||
|
ret = errno;
|
||||||
|
if (ret == EEXIST) {
|
||||||
|
// Bad luck -- we got a very rare collision here between us and
|
||||||
|
// another DataNode (or process). Try again.
|
||||||
|
continue;
|
||||||
|
} else if (ret == EINTR) {
|
||||||
|
// Most of the time, this error is only possible when opening FIFOs.
|
||||||
|
// But let's be thorough.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
jthr = newIOException(env, "open(%s, O_CREAT | O_EXCL | O_RDWR) "
|
||||||
|
"failed: error %d (%s)", target, ret, terror(ret));
|
||||||
|
(*env)->Throw(env, jthr);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
if (unlink(target) < 0) {
|
||||||
|
jthr = newIOException(env, "unlink(%s) failed: error %d (%s)",
|
||||||
|
path, ret, terror(ret));
|
||||||
|
(*env)->Throw(env, jthr);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
if (ftruncate(fd, length) < 0) {
|
||||||
|
jthr = newIOException(env, "ftruncate(%s, %d) failed: error %d (%s)",
|
||||||
|
path, length, ret, terror(ret));
|
||||||
|
(*env)->Throw(env, jthr);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
jret = fd_create(env, fd); // throws exception on error.
|
||||||
|
|
||||||
|
done:
|
||||||
|
if (prefix) {
|
||||||
|
(*env)->ReleaseStringUTFChars(env, jprefix, prefix);
|
||||||
|
}
|
||||||
|
if (path) {
|
||||||
|
(*env)->ReleaseStringUTFChars(env, jpath, path);
|
||||||
|
}
|
||||||
|
if (!jret) {
|
||||||
|
if (fd >= 0) {
|
||||||
|
close(fd);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return jret;
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
|
@ -0,0 +1,247 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "config.h"
|
||||||
|
#include "exception.h"
|
||||||
|
#include "org_apache_hadoop.h"
|
||||||
|
#include "org_apache_hadoop_net_unix_DomainSocketWatcher.h"
|
||||||
|
|
||||||
|
#include <errno.h>
|
||||||
|
#include <fcntl.h>
|
||||||
|
#include <jni.h>
|
||||||
|
#include <poll.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <sys/select.h>
|
||||||
|
#include <sys/time.h>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
static jfieldID fd_set_data_fid;
|
||||||
|
|
||||||
|
#define FD_SET_DATA_MIN_SIZE 2
|
||||||
|
|
||||||
|
struct fd_set_data {
|
||||||
|
/**
|
||||||
|
* Number of fds we have allocated space for.
|
||||||
|
*/
|
||||||
|
int alloc_size;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of fds actually in use.
|
||||||
|
*/
|
||||||
|
int used_size;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Beginning of pollfd data.
|
||||||
|
*/
|
||||||
|
struct pollfd pollfd[0];
|
||||||
|
};
|
||||||
|
|
||||||
|
JNIEXPORT void JNICALL
|
||||||
|
Java_org_apache_hadoop_net_unix_DomainSocketWatcher_anchorNative(
|
||||||
|
JNIEnv *env, jclass clazz)
|
||||||
|
{
|
||||||
|
jclass fd_set_class;
|
||||||
|
|
||||||
|
fd_set_class = (*env)->FindClass(env,
|
||||||
|
"org/apache/hadoop/net/unix/DomainSocketWatcher$FdSet");
|
||||||
|
if (!fd_set_class) return; // exception raised
|
||||||
|
fd_set_data_fid = (*env)->GetFieldID(env, fd_set_class, "data", "J");
|
||||||
|
if (!fd_set_data_fid) return; // exception raised
|
||||||
|
}
|
||||||
|
|
||||||
|
JNIEXPORT jlong JNICALL
|
||||||
|
Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_alloc0(
|
||||||
|
JNIEnv *env, jclass clazz)
|
||||||
|
{
|
||||||
|
struct fd_set_data *sd;
|
||||||
|
|
||||||
|
sd = calloc(1, sizeof(struct fd_set_data) +
|
||||||
|
(sizeof(struct pollfd) * FD_SET_DATA_MIN_SIZE));
|
||||||
|
if (!sd) {
|
||||||
|
(*env)->Throw(env, newRuntimeException(env, "out of memory allocating "
|
||||||
|
"DomainSocketWatcher#FdSet"));
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
|
sd->alloc_size = FD_SET_DATA_MIN_SIZE;
|
||||||
|
sd->used_size = 0;
|
||||||
|
return (jlong)(intptr_t)sd;
|
||||||
|
}
|
||||||
|
|
||||||
|
JNIEXPORT void JNICALL
|
||||||
|
Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_add(
|
||||||
|
JNIEnv *env, jobject obj, jint fd)
|
||||||
|
{
|
||||||
|
struct fd_set_data *sd, *nd;
|
||||||
|
struct pollfd *pollfd;
|
||||||
|
|
||||||
|
sd = (struct fd_set_data*)(intptr_t)(*env)->
|
||||||
|
GetLongField(env, obj, fd_set_data_fid);
|
||||||
|
if (sd->used_size + 1 > sd->alloc_size) {
|
||||||
|
nd = realloc(sd, sizeof(struct fd_set_data) +
|
||||||
|
(sizeof(struct pollfd) * sd->alloc_size * 2));
|
||||||
|
if (!nd) {
|
||||||
|
(*env)->Throw(env, newRuntimeException(env, "out of memory adding "
|
||||||
|
"another fd to DomainSocketWatcher#FdSet. we have %d already",
|
||||||
|
sd->alloc_size));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
nd->alloc_size = nd->alloc_size * 2;
|
||||||
|
(*env)->SetLongField(env, obj, fd_set_data_fid, (jlong)(intptr_t)nd);
|
||||||
|
sd = nd;
|
||||||
|
}
|
||||||
|
pollfd = &sd->pollfd[sd->used_size];
|
||||||
|
sd->used_size++;
|
||||||
|
pollfd->fd = fd;
|
||||||
|
pollfd->events = POLLIN;
|
||||||
|
pollfd->revents = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
JNIEXPORT void JNICALL
|
||||||
|
Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_remove(
|
||||||
|
JNIEnv *env, jobject obj, jint fd)
|
||||||
|
{
|
||||||
|
struct fd_set_data *sd;
|
||||||
|
struct pollfd *pollfd, *last_pollfd;
|
||||||
|
int used_size, i;
|
||||||
|
|
||||||
|
sd = (struct fd_set_data*)(intptr_t)(*env)->
|
||||||
|
GetLongField(env, obj, fd_set_data_fid);
|
||||||
|
used_size = sd->used_size;
|
||||||
|
for (i = 0; i < used_size; i++) {
|
||||||
|
pollfd = sd->pollfd + i;
|
||||||
|
if (pollfd->fd == fd) break;
|
||||||
|
}
|
||||||
|
if (i == used_size) {
|
||||||
|
(*env)->Throw(env, newRuntimeException(env, "failed to remove fd %d "
|
||||||
|
"from the FdSet because it was never present.", fd));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
last_pollfd = sd->pollfd + (used_size - 1);
|
||||||
|
if (used_size > 1) {
|
||||||
|
// Move last pollfd to the new empty slot if needed
|
||||||
|
pollfd->fd = last_pollfd->fd;
|
||||||
|
pollfd->events = last_pollfd->events;
|
||||||
|
pollfd->revents = last_pollfd->revents;
|
||||||
|
}
|
||||||
|
memset(last_pollfd, 0, sizeof(struct pollfd));
|
||||||
|
sd->used_size--;
|
||||||
|
}
|
||||||
|
|
||||||
|
JNIEXPORT jobject JNICALL
|
||||||
|
Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_getAndClearReadableFds(
|
||||||
|
JNIEnv *env, jobject obj)
|
||||||
|
{
|
||||||
|
int *carr = NULL;
|
||||||
|
jobject jarr = NULL;
|
||||||
|
struct fd_set_data *sd;
|
||||||
|
int used_size, num_readable = 0, i, j;
|
||||||
|
jthrowable jthr = NULL;
|
||||||
|
|
||||||
|
sd = (struct fd_set_data*)(intptr_t)(*env)->
|
||||||
|
GetLongField(env, obj, fd_set_data_fid);
|
||||||
|
used_size = sd->used_size;
|
||||||
|
for (i = 0; i < used_size; i++) {
|
||||||
|
if (sd->pollfd[i].revents & POLLIN) {
|
||||||
|
num_readable++;
|
||||||
|
} else {
|
||||||
|
sd->pollfd[i].revents = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (num_readable > 0) {
|
||||||
|
carr = malloc(sizeof(int) * num_readable);
|
||||||
|
if (!carr) {
|
||||||
|
jthr = newRuntimeException(env, "failed to allocate a temporary array "
|
||||||
|
"of %d ints", num_readable);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
j = 0;
|
||||||
|
for (i = 0; ((i < used_size) && (j < num_readable)); i++) {
|
||||||
|
if (sd->pollfd[i].revents & POLLIN) {
|
||||||
|
carr[j] = sd->pollfd[i].fd;
|
||||||
|
j++;
|
||||||
|
sd->pollfd[i].revents = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (j != num_readable) {
|
||||||
|
jthr = newRuntimeException(env, "failed to fill entire carr "
|
||||||
|
"array of size %d: only filled %d elements", num_readable, j);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jarr = (*env)->NewIntArray(env, num_readable);
|
||||||
|
if (!jarr) {
|
||||||
|
jthr = (*env)->ExceptionOccurred(env);
|
||||||
|
(*env)->ExceptionClear(env);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
if (num_readable > 0) {
|
||||||
|
(*env)->SetIntArrayRegion(env, jarr, 0, num_readable, carr);
|
||||||
|
jthr = (*env)->ExceptionOccurred(env);
|
||||||
|
if (jthr) {
|
||||||
|
(*env)->ExceptionClear(env);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
done:
|
||||||
|
free(carr);
|
||||||
|
if (jthr) {
|
||||||
|
(*env)->DeleteLocalRef(env, jarr);
|
||||||
|
jarr = NULL;
|
||||||
|
}
|
||||||
|
return jarr;
|
||||||
|
}
|
||||||
|
|
||||||
|
JNIEXPORT void JNICALL
|
||||||
|
Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_close(
|
||||||
|
JNIEnv *env, jobject obj)
|
||||||
|
{
|
||||||
|
struct fd_set_data *sd;
|
||||||
|
|
||||||
|
sd = (struct fd_set_data*)(intptr_t)(*env)->
|
||||||
|
GetLongField(env, obj, fd_set_data_fid);
|
||||||
|
if (sd) {
|
||||||
|
free(sd);
|
||||||
|
(*env)->SetLongField(env, obj, fd_set_data_fid, 0L);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
JNIEXPORT jint JNICALL
|
||||||
|
Java_org_apache_hadoop_net_unix_DomainSocketWatcher_doPoll0(
|
||||||
|
JNIEnv *env, jclass clazz, jint checkMs, jobject fdSet)
|
||||||
|
{
|
||||||
|
struct fd_set_data *sd;
|
||||||
|
int ret, err;
|
||||||
|
|
||||||
|
sd = (struct fd_set_data*)(intptr_t)(*env)->
|
||||||
|
GetLongField(env, fdSet, fd_set_data_fid);
|
||||||
|
ret = poll(sd->pollfd, sd->used_size, checkMs);
|
||||||
|
if (ret >= 0) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
err = errno;
|
||||||
|
if (err != EINTR) { // treat EINTR as 0 fds ready
|
||||||
|
(*env)->Throw(env, newIOException(env,
|
||||||
|
"poll(2) failed with error code %d: %s", err, terror(err)));
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -0,0 +1,82 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.io.nativeio;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Assume;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.apache.commons.lang.SystemUtils;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
public class TestSharedFileDescriptorFactory {
|
||||||
|
static final Log LOG = LogFactory.getLog(TestSharedFileDescriptorFactory.class);
|
||||||
|
|
||||||
|
private static final File TEST_BASE =
|
||||||
|
new File(System.getProperty("test.build.data", "/tmp"));
|
||||||
|
|
||||||
|
@Test(timeout=10000)
|
||||||
|
public void testReadAndWrite() throws Exception {
|
||||||
|
Assume.assumeTrue(NativeIO.isAvailable());
|
||||||
|
Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
|
||||||
|
File path = new File(TEST_BASE, "testReadAndWrite");
|
||||||
|
path.mkdirs();
|
||||||
|
SharedFileDescriptorFactory factory =
|
||||||
|
new SharedFileDescriptorFactory("woot_", path.getAbsolutePath());
|
||||||
|
FileInputStream inStream = factory.createDescriptor(4096);
|
||||||
|
FileOutputStream outStream = new FileOutputStream(inStream.getFD());
|
||||||
|
outStream.write(101);
|
||||||
|
inStream.getChannel().position(0);
|
||||||
|
Assert.assertEquals(101, inStream.read());
|
||||||
|
inStream.close();
|
||||||
|
outStream.close();
|
||||||
|
FileUtil.fullyDelete(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
static private void createTempFile(String path) throws Exception {
|
||||||
|
FileOutputStream fos = new FileOutputStream(path);
|
||||||
|
fos.write(101);
|
||||||
|
fos.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=10000)
|
||||||
|
public void testCleanupRemainders() throws Exception {
|
||||||
|
Assume.assumeTrue(NativeIO.isAvailable());
|
||||||
|
Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
|
||||||
|
File path = new File(TEST_BASE, "testCleanupRemainders");
|
||||||
|
path.mkdirs();
|
||||||
|
String remainder1 = path.getAbsolutePath() +
|
||||||
|
Path.SEPARATOR + "woot2_remainder1";
|
||||||
|
String remainder2 = path.getAbsolutePath() +
|
||||||
|
Path.SEPARATOR + "woot2_remainder2";
|
||||||
|
createTempFile(remainder1);
|
||||||
|
createTempFile(remainder2);
|
||||||
|
new SharedFileDescriptorFactory("woot2_", path.getAbsolutePath());
|
||||||
|
// creating the SharedFileDescriptorFactory should have removed
|
||||||
|
// the remainders
|
||||||
|
Assert.assertFalse(new File(remainder1).exists());
|
||||||
|
Assert.assertFalse(new File(remainder2).exists());
|
||||||
|
FileUtil.fullyDelete(path);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,150 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.net.unix;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.junit.Assume;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.Uninterruptibles;
|
||||||
|
|
||||||
|
public class TestDomainSocketWatcher {
|
||||||
|
static final Log LOG = LogFactory.getLog(TestDomainSocketWatcher.class);
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void before() {
|
||||||
|
Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that we can create a DomainSocketWatcher and then shut it down.
|
||||||
|
*/
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testCreateShutdown() throws Exception {
|
||||||
|
DomainSocketWatcher watcher = new DomainSocketWatcher(10000000);
|
||||||
|
watcher.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that we can get notifications out a DomainSocketWatcher.
|
||||||
|
*/
|
||||||
|
@Test(timeout=180000)
|
||||||
|
public void testDeliverNotifications() throws Exception {
|
||||||
|
DomainSocketWatcher watcher = new DomainSocketWatcher(10000000);
|
||||||
|
DomainSocket pair[] = DomainSocket.socketpair();
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
watcher.add(pair[1], new DomainSocketWatcher.Handler() {
|
||||||
|
@Override
|
||||||
|
public boolean handle(DomainSocket sock) {
|
||||||
|
latch.countDown();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
pair[0].close();
|
||||||
|
latch.await();
|
||||||
|
watcher.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that a java interruption can stop the watcher thread
|
||||||
|
*/
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testInterruption() throws Exception {
|
||||||
|
DomainSocketWatcher watcher = new DomainSocketWatcher(10);
|
||||||
|
watcher.interrupt();
|
||||||
|
Uninterruptibles.joinUninterruptibly(watcher);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testStress() throws Exception {
|
||||||
|
final int SOCKET_NUM = 250;
|
||||||
|
final ReentrantLock lock = new ReentrantLock();
|
||||||
|
final DomainSocketWatcher watcher = new DomainSocketWatcher(10000000);
|
||||||
|
final ArrayList<DomainSocket[]> pairs = new ArrayList<DomainSocket[]>();
|
||||||
|
final AtomicInteger handled = new AtomicInteger(0);
|
||||||
|
|
||||||
|
final Thread adderThread = new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < SOCKET_NUM; i++) {
|
||||||
|
DomainSocket pair[] = DomainSocket.socketpair();
|
||||||
|
watcher.add(pair[1], new DomainSocketWatcher.Handler() {
|
||||||
|
@Override
|
||||||
|
public boolean handle(DomainSocket sock) {
|
||||||
|
handled.incrementAndGet();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
pairs.add(pair);
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
LOG.error(e);
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
final Thread removerThread = new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
final Random random = new Random();
|
||||||
|
try {
|
||||||
|
while (handled.get() != SOCKET_NUM) {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
if (!pairs.isEmpty()) {
|
||||||
|
int idx = random.nextInt(pairs.size());
|
||||||
|
DomainSocket pair[] = pairs.remove(idx);
|
||||||
|
if (random.nextBoolean()) {
|
||||||
|
pair[0].close();
|
||||||
|
} else {
|
||||||
|
watcher.remove(pair[1]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
LOG.error(e);
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
adderThread.start();
|
||||||
|
removerThread.start();
|
||||||
|
Uninterruptibles.joinUninterruptibly(adderThread);
|
||||||
|
Uninterruptibles.joinUninterruptibly(removerThread);
|
||||||
|
watcher.close();
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,6 +20,8 @@ Release 2.4.0 - UNRELEASED
|
||||||
HDFS-5859. DataNode#checkBlockToken should check block tokens even if
|
HDFS-5859. DataNode#checkBlockToken should check block tokens even if
|
||||||
security is not enabled. (cmccabe)
|
security is not enabled. (cmccabe)
|
||||||
|
|
||||||
|
HDFS-5746. Add ShortCircuitSharedMemorySegment (cmccabe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
|
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
|
||||||
|
|
|
@ -0,0 +1,302 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.client;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||||
|
import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
|
||||||
|
import org.apache.hadoop.util.CloseableReferenceCount;
|
||||||
|
import org.apache.hadoop.util.Shell;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.primitives.Ints;
|
||||||
|
|
||||||
|
import sun.misc.Unsafe;
|
||||||
|
|
||||||
|
public class ShortCircuitSharedMemorySegment implements Closeable {
|
||||||
|
private static final Log LOG =
|
||||||
|
LogFactory.getLog(ShortCircuitSharedMemorySegment.class);
|
||||||
|
|
||||||
|
private static final int BYTES_PER_SLOT = 64;
|
||||||
|
|
||||||
|
private static final Unsafe unsafe;
|
||||||
|
|
||||||
|
static {
|
||||||
|
Unsafe theUnsafe = null;
|
||||||
|
try {
|
||||||
|
Field f = Unsafe.class.getDeclaredField("theUnsafe");
|
||||||
|
f.setAccessible(true);
|
||||||
|
theUnsafe = (Unsafe)f.get(null);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
LOG.error("failed to load misc.Unsafe", e);
|
||||||
|
}
|
||||||
|
unsafe = theUnsafe;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A slot containing information about a replica.
|
||||||
|
*
|
||||||
|
* The format is:
|
||||||
|
* word 0
|
||||||
|
* bit 0:32 Slot flags (see below).
|
||||||
|
* bit 33:63 Anchor count.
|
||||||
|
* word 1:7
|
||||||
|
* Reserved for future use, such as statistics.
|
||||||
|
* Padding is also useful for avoiding false sharing.
|
||||||
|
*
|
||||||
|
* Little-endian versus big-endian is not relevant here since both the client
|
||||||
|
* and the server reside on the same computer and use the same orientation.
|
||||||
|
*/
|
||||||
|
public class Slot implements Closeable {
|
||||||
|
/**
|
||||||
|
* Flag indicating that the slot is in use.
|
||||||
|
*/
|
||||||
|
private static final long SLOT_IN_USE_FLAG = 1L<<63;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Flag indicating that the slot can be anchored.
|
||||||
|
*/
|
||||||
|
private static final long ANCHORABLE_FLAG = 1L<<62;
|
||||||
|
|
||||||
|
private long slotAddress;
|
||||||
|
|
||||||
|
Slot(long slotAddress) {
|
||||||
|
this.slotAddress = slotAddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make a given slot anchorable.
|
||||||
|
*/
|
||||||
|
public void makeAnchorable() {
|
||||||
|
Preconditions.checkState(slotAddress != 0,
|
||||||
|
"Called makeAnchorable on a slot that was closed.");
|
||||||
|
long prev;
|
||||||
|
do {
|
||||||
|
prev = unsafe.getLongVolatile(null, this.slotAddress);
|
||||||
|
if ((prev & ANCHORABLE_FLAG) != 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} while (!unsafe.compareAndSwapLong(null, this.slotAddress,
|
||||||
|
prev, prev | ANCHORABLE_FLAG));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make a given slot unanchorable.
|
||||||
|
*/
|
||||||
|
public void makeUnanchorable() {
|
||||||
|
Preconditions.checkState(slotAddress != 0,
|
||||||
|
"Called makeUnanchorable on a slot that was closed.");
|
||||||
|
long prev;
|
||||||
|
do {
|
||||||
|
prev = unsafe.getLongVolatile(null, this.slotAddress);
|
||||||
|
if ((prev & ANCHORABLE_FLAG) == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} while (!unsafe.compareAndSwapLong(null, this.slotAddress,
|
||||||
|
prev, prev & (~ANCHORABLE_FLAG)));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Try to add an anchor for a given slot.
|
||||||
|
*
|
||||||
|
* When a slot is anchored, we know that the block it refers to is resident
|
||||||
|
* in memory.
|
||||||
|
*
|
||||||
|
* @return True if the slot is anchored.
|
||||||
|
*/
|
||||||
|
public boolean addAnchor() {
|
||||||
|
long prev;
|
||||||
|
do {
|
||||||
|
prev = unsafe.getLongVolatile(null, this.slotAddress);
|
||||||
|
if ((prev & 0x7fffffff) == 0x7fffffff) {
|
||||||
|
// Too many other threads have anchored the slot (2 billion?)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if ((prev & ANCHORABLE_FLAG) == 0) {
|
||||||
|
// Slot can't be anchored right now.
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} while (!unsafe.compareAndSwapLong(null, this.slotAddress,
|
||||||
|
prev, prev + 1));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove an anchor for a given slot.
|
||||||
|
*/
|
||||||
|
public void removeAnchor() {
|
||||||
|
long prev;
|
||||||
|
do {
|
||||||
|
prev = unsafe.getLongVolatile(null, this.slotAddress);
|
||||||
|
Preconditions.checkState((prev & 0x7fffffff) != 0,
|
||||||
|
"Tried to remove anchor for slot " + slotAddress +", which was " +
|
||||||
|
"not anchored.");
|
||||||
|
} while (!unsafe.compareAndSwapLong(null, this.slotAddress,
|
||||||
|
prev, prev - 1));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return The index of this slot.
|
||||||
|
*/
|
||||||
|
public int getIndex() {
|
||||||
|
Preconditions.checkState(slotAddress != 0);
|
||||||
|
return Ints.checkedCast(
|
||||||
|
(slotAddress - baseAddress) / BYTES_PER_SLOT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
if (slotAddress == 0) return;
|
||||||
|
long prev;
|
||||||
|
do {
|
||||||
|
prev = unsafe.getLongVolatile(null, this.slotAddress);
|
||||||
|
Preconditions.checkState((prev & SLOT_IN_USE_FLAG) != 0,
|
||||||
|
"tried to close slot that wasn't open");
|
||||||
|
} while (!unsafe.compareAndSwapLong(null, this.slotAddress,
|
||||||
|
prev, 0));
|
||||||
|
slotAddress = 0;
|
||||||
|
if (ShortCircuitSharedMemorySegment.this.refCount.unreference()) {
|
||||||
|
ShortCircuitSharedMemorySegment.this.free();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The stream that we're going to use to create this shared memory segment.
|
||||||
|
*
|
||||||
|
* Although this is a FileInputStream, we are going to assume that the
|
||||||
|
* underlying file descriptor is writable as well as readable.
|
||||||
|
* It would be more appropriate to use a RandomAccessFile here, but that class
|
||||||
|
* does not have any public accessor which returns a FileDescriptor, unlike
|
||||||
|
* FileInputStream.
|
||||||
|
*/
|
||||||
|
private final FileInputStream stream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Length of the shared memory segment.
|
||||||
|
*/
|
||||||
|
private final int length;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The base address of the memory-mapped file.
|
||||||
|
*/
|
||||||
|
private final long baseAddress;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reference count and 'closed' status.
|
||||||
|
*/
|
||||||
|
private final CloseableReferenceCount refCount = new CloseableReferenceCount();
|
||||||
|
|
||||||
|
public ShortCircuitSharedMemorySegment(FileInputStream stream)
|
||||||
|
throws IOException {
|
||||||
|
if (!NativeIO.isAvailable()) {
|
||||||
|
throw new UnsupportedOperationException("NativeIO is not available.");
|
||||||
|
}
|
||||||
|
if (Shell.WINDOWS) {
|
||||||
|
throw new UnsupportedOperationException(
|
||||||
|
"ShortCircuitSharedMemorySegment is not yet implemented " +
|
||||||
|
"for Windows.");
|
||||||
|
}
|
||||||
|
if (unsafe == null) {
|
||||||
|
throw new UnsupportedOperationException(
|
||||||
|
"can't use ShortCircuitSharedMemorySegment because we failed to " +
|
||||||
|
"load misc.Unsafe.");
|
||||||
|
}
|
||||||
|
this.refCount.reference();
|
||||||
|
this.stream = stream;
|
||||||
|
this.length = getEffectiveLength(stream);
|
||||||
|
this.baseAddress = POSIX.mmap(this.stream.getFD(),
|
||||||
|
POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, this.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculate the effective usable size of the shared memory segment.
|
||||||
|
* We round down to a multiple of the slot size and do some validation.
|
||||||
|
*
|
||||||
|
* @param stream The stream we're using.
|
||||||
|
* @return The effective usable size of the shared memory segment.
|
||||||
|
*/
|
||||||
|
private static int getEffectiveLength(FileInputStream stream)
|
||||||
|
throws IOException {
|
||||||
|
int intSize = Ints.checkedCast(stream.getChannel().size());
|
||||||
|
int slots = intSize / BYTES_PER_SLOT;
|
||||||
|
Preconditions.checkState(slots > 0, "size of shared memory segment was " +
|
||||||
|
intSize + ", but that is not enough to hold even one slot.");
|
||||||
|
return slots * BYTES_PER_SLOT;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean allocateSlot(long address) {
|
||||||
|
long prev;
|
||||||
|
do {
|
||||||
|
prev = unsafe.getLongVolatile(null, address);
|
||||||
|
if ((prev & Slot.SLOT_IN_USE_FLAG) != 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} while (!unsafe.compareAndSwapLong(null, address,
|
||||||
|
prev, prev | Slot.SLOT_IN_USE_FLAG));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocate a new Slot in this shared memory segment.
|
||||||
|
*
|
||||||
|
* @return A newly allocated Slot, or null if there were no available
|
||||||
|
* slots.
|
||||||
|
*/
|
||||||
|
public Slot allocateNextSlot() throws IOException {
|
||||||
|
ShortCircuitSharedMemorySegment.this.refCount.reference();
|
||||||
|
Slot slot = null;
|
||||||
|
try {
|
||||||
|
final int numSlots = length / BYTES_PER_SLOT;
|
||||||
|
for (int i = 0; i < numSlots; i++) {
|
||||||
|
long address = this.baseAddress + (i * BYTES_PER_SLOT);
|
||||||
|
if (allocateSlot(address)) {
|
||||||
|
slot = new Slot(address);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (slot == null) {
|
||||||
|
if (refCount.unreference()) {
|
||||||
|
free();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return slot;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
refCount.setClosed();
|
||||||
|
if (refCount.unreference()) {
|
||||||
|
free();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void free() throws IOException {
|
||||||
|
IOUtils.cleanup(LOG, stream);
|
||||||
|
POSIX.munmap(baseAddress, length);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,104 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.client;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.SystemUtils;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||||
|
import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory;
|
||||||
|
import org.apache.hadoop.hdfs.client.ShortCircuitSharedMemorySegment.Slot;
|
||||||
|
import org.junit.Assume;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.Assert;
|
||||||
|
|
||||||
|
public class TestShortCircuitSharedMemorySegment {
|
||||||
|
public static final Log LOG =
|
||||||
|
LogFactory.getLog(TestShortCircuitSharedMemorySegment.class);
|
||||||
|
|
||||||
|
private static final File TEST_BASE =
|
||||||
|
new File(System.getProperty("test.build.data", "/tmp"));
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void before() {
|
||||||
|
Assume.assumeTrue(NativeIO.isAvailable());
|
||||||
|
Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testStartupShutdown() throws Exception {
|
||||||
|
File path = new File(TEST_BASE, "testStartupShutdown");
|
||||||
|
path.mkdirs();
|
||||||
|
SharedFileDescriptorFactory factory =
|
||||||
|
new SharedFileDescriptorFactory("shm_", path.getAbsolutePath());
|
||||||
|
FileInputStream stream = factory.createDescriptor(4096);
|
||||||
|
ShortCircuitSharedMemorySegment shm =
|
||||||
|
new ShortCircuitSharedMemorySegment(stream);
|
||||||
|
shm.close();
|
||||||
|
stream.close();
|
||||||
|
FileUtil.fullyDelete(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testAllocateSlots() throws Exception {
|
||||||
|
File path = new File(TEST_BASE, "testAllocateSlots");
|
||||||
|
path.mkdirs();
|
||||||
|
SharedFileDescriptorFactory factory =
|
||||||
|
new SharedFileDescriptorFactory("shm_", path.getAbsolutePath());
|
||||||
|
FileInputStream stream = factory.createDescriptor(4096);
|
||||||
|
ShortCircuitSharedMemorySegment shm =
|
||||||
|
new ShortCircuitSharedMemorySegment(stream);
|
||||||
|
int numSlots = 0;
|
||||||
|
ArrayList<Slot> slots = new ArrayList<Slot>();
|
||||||
|
while (true) {
|
||||||
|
Slot slot = shm.allocateNextSlot();
|
||||||
|
if (slot == null) {
|
||||||
|
LOG.info("allocated " + numSlots + " slots before running out.");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
slots.add(slot);
|
||||||
|
numSlots++;
|
||||||
|
}
|
||||||
|
int slotIdx = 0;
|
||||||
|
for (Slot slot : slots) {
|
||||||
|
Assert.assertFalse(slot.addAnchor());
|
||||||
|
Assert.assertEquals(slotIdx++, slot.getIndex());
|
||||||
|
}
|
||||||
|
for (Slot slot : slots) {
|
||||||
|
slot.makeAnchorable();
|
||||||
|
}
|
||||||
|
for (Slot slot : slots) {
|
||||||
|
Assert.assertTrue(slot.addAnchor());
|
||||||
|
}
|
||||||
|
for (Slot slot : slots) {
|
||||||
|
slot.removeAnchor();
|
||||||
|
}
|
||||||
|
shm.close();
|
||||||
|
for (Slot slot : slots) {
|
||||||
|
slot.close();
|
||||||
|
}
|
||||||
|
stream.close();
|
||||||
|
FileUtil.fullyDelete(path);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue