HADOOP-12658. Clear javadoc and check style issues around DomainSocket. Contributed by Kai Zheng

(cherry picked from commit 778146eaae)
This commit is contained in:
Uma Mahesh 2016-01-04 14:32:09 -08:00
parent 40b41ac8fc
commit cdd796d199
7 changed files with 37 additions and 57 deletions

View File

@ -386,6 +386,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12686. Update FileSystemShell documentation to mention the meaning HADOOP-12686. Update FileSystemShell documentation to mention the meaning
of each columns of fs -du. (Daisuke Kobayashi via aajisaka) of each columns of fs -du. (Daisuke Kobayashi via aajisaka)
HADOOP-12658. Clear javadoc and check style issues around DomainSocket
(Kai Zheng via umamahesh)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp HADOOP-11785. Reduce the number of listStatus operation in distcp

View File

@ -53,7 +53,8 @@ public class IOUtils {
* @param close whether or not close the InputStream and * @param close whether or not close the InputStream and
* OutputStream at the end. The streams are closed in the finally clause. * OutputStream at the end. The streams are closed in the finally clause.
*/ */
public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) public static void copyBytes(InputStream in, OutputStream out,
int buffSize, boolean close)
throws IOException { throws IOException {
try { try {
copyBytes(in, out, buffSize); copyBytes(in, out, buffSize);
@ -192,7 +193,7 @@ public class IOUtils {
* @throws IOException if it could not read requested number of bytes * @throws IOException if it could not read requested number of bytes
* for any reason (including EOF) * for any reason (including EOF)
*/ */
public static void readFully(InputStream in, byte buf[], public static void readFully(InputStream in, byte[] buf,
int off, int len) throws IOException { int off, int len) throws IOException {
int toRead = len; int toRead = len;
while (toRead > 0) { while (toRead > 0) {

View File

@ -63,7 +63,8 @@ public class DomainSocket implements Closeable {
static Log LOG = LogFactory.getLog(DomainSocket.class); static Log LOG = LogFactory.getLog(DomainSocket.class);
/** /**
* True only if we should validate the paths used in {@link DomainSocket#bind()} * True only if we should validate the paths used in
* {@link DomainSocket#bindAndListen(String)}
*/ */
private static boolean validateBindPaths = true; private static boolean validateBindPaths = true;
@ -220,11 +221,11 @@ public class DomainSocket implements Closeable {
* *
* This method can only be used on sockets that were bound with bind(). * This method can only be used on sockets that were bound with bind().
* *
* @return The new connection. * @return The new connection.
* @throws IOException If there was an I/O error * @throws IOException If there was an I/O error performing the accept--
* performing the accept-- such as the * such as the socket being closed from under us.
* socket being closed from under us. * Particularly when the accept is timed out, it throws
* @throws SocketTimeoutException If the accept timed out. * SocketTimeoutException.
*/ */
public DomainSocket accept() throws IOException { public DomainSocket accept() throws IOException {
refCount.reference(); refCount.reference();
@ -238,13 +239,15 @@ public class DomainSocket implements Closeable {
} }
} }
private static native int connect0(String path); private static native int connect0(String path) throws IOException;
/** /**
* Create a new DomainSocket connected to the given path. * Create a new DomainSocket connected to the given path.
* *
* @param path The path to connect to. * @param path The path to connect to.
* @return The new DomainSocket. * @throws IOException If there was an I/O error performing the connect.
*
* @return The new DomainSocket.
*/ */
public static DomainSocket connect(String path) throws IOException { public static DomainSocket connect(String path) throws IOException {
if (loadingFailureReason != null) { if (loadingFailureReason != null) {
@ -425,47 +428,11 @@ public class DomainSocket implements Closeable {
private static native int receiveFileDescriptors0(int fd, private static native int receiveFileDescriptors0(int fd,
FileDescriptor[] descriptors, FileDescriptor[] descriptors,
byte jbuf[], int offset, int length) throws IOException; byte[] buf, int offset, int length) throws IOException;
/**
* Receive some FileDescriptor objects from the process on the other side of
* this socket.
*
* @param descriptors (output parameter) Array of FileDescriptors.
* We will fill as many slots as possible with file
* descriptors passed from the remote process. The
* other slots will contain NULL.
* @param jbuf (output parameter) Buffer to read into.
* The UNIX domain sockets API requires you to read
* at least one byte from the remote process, even
* if all you care about is the file descriptors
* you will receive.
* @param offset Offset into the byte buffer to load data
* @param length Length of the byte buffer to use for data
*
* @return The number of bytes read. This will be -1 if we
* reached EOF (similar to SocketInputStream);
* otherwise, it will be positive.
* @throws IOException if there was an I/O error.
*/
public int receiveFileDescriptors(FileDescriptor[] descriptors,
byte jbuf[], int offset, int length) throws IOException {
refCount.reference();
boolean exc = true;
try {
int nBytes = receiveFileDescriptors0(fd, descriptors, jbuf, offset, length);
exc = false;
return nBytes;
} finally {
unreference(exc);
}
}
/** /**
* Receive some FileDescriptor objects from the process on the other side of * Receive some FileDescriptor objects from the process on the other side of
* this socket, and wrap them in FileInputStream objects. * this socket, and wrap them in FileInputStream objects.
*
* See {@link DomainSocket#recvFileInputStreams(ByteBuffer)}
*/ */
public int recvFileInputStreams(FileInputStream[] streams, byte buf[], public int recvFileInputStreams(FileInputStream[] streams, byte buf[],
int offset, int length) throws IOException { int offset, int length) throws IOException {

View File

@ -591,7 +591,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
switch (resp.getStatus()) { switch (resp.getStatus()) {
case SUCCESS: case SUCCESS:
byte buf[] = new byte[1]; byte buf[] = new byte[1];
FileInputStream fis[] = new FileInputStream[2]; FileInputStream[] fis = new FileInputStream[2];
sock.recvFileInputStreams(fis, buf, 0, buf.length); sock.recvFileInputStreams(fis, buf, 0, buf.length);
ShortCircuitReplica replica = null; ShortCircuitReplica replica = null;
try { try {

View File

@ -168,7 +168,7 @@ public class DfsClientShmManager implements Closeable {
case SUCCESS: case SUCCESS:
DomainSocket sock = peer.getDomainSocket(); DomainSocket sock = peer.getDomainSocket();
byte buf[] = new byte[1]; byte buf[] = new byte[1];
FileInputStream fis[] = new FileInputStream[1]; FileInputStream[] fis = new FileInputStream[1];
if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) { if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) {
throw new EOFException("got EOF while trying to transfer the " + throw new EOFException("got EOF while trying to transfer the " +
"file descriptor for the shared memory segment."); "file descriptor for the shared memory segment.");

View File

@ -425,12 +425,12 @@ class DataXceiver extends Receiver implements Runnable {
throws IOException { throws IOException {
DataNodeFaultInjector.get().sendShortCircuitShmResponse(); DataNodeFaultInjector.get().sendShortCircuitShmResponse();
ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS). ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS).
setId(PBHelperClient.convert(shmInfo.shmId)).build(). setId(PBHelperClient.convert(shmInfo.getShmId())).build().
writeDelimitedTo(socketOut); writeDelimitedTo(socketOut);
// Send the file descriptor for the shared memory segment. // Send the file descriptor for the shared memory segment.
byte buf[] = new byte[] { (byte)0 }; byte buf[] = new byte[] { (byte)0 };
FileDescriptor shmFdArray[] = FileDescriptor shmFdArray[] =
new FileDescriptor[] { shmInfo.stream.getFD() }; new FileDescriptor[] {shmInfo.getFileStream().getFD()};
sock.sendFileDescriptors(shmFdArray, buf, 0, buf.length); sock.sendFileDescriptors(shmFdArray, buf, 0, buf.length);
} }
@ -471,7 +471,8 @@ class DataXceiver extends Receiver implements Runnable {
"cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, " + "cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, " +
"op: REQUEST_SHORT_CIRCUIT_SHM," + "op: REQUEST_SHORT_CIRCUIT_SHM," +
" shmId: %016x%016x, srvID: %s, success: true", " shmId: %016x%016x, srvID: %s, success: true",
clientName, shmInfo.shmId.getHi(), shmInfo.shmId.getLo(), clientName, shmInfo.getShmId().getHi(),
shmInfo.getShmId().getLo(),
datanode.getDatanodeUuid())); datanode.getDatanodeUuid()));
} else { } else {
BlockSender.ClientTraceLog.info(String.format( BlockSender.ClientTraceLog.info(String.format(
@ -490,7 +491,7 @@ class DataXceiver extends Receiver implements Runnable {
// bad behavior inside the poll() call. See HADOOP-11802 for details. // bad behavior inside the poll() call. See HADOOP-11802 for details.
try { try {
LOG.warn("Failed to send success response back to the client. " + LOG.warn("Failed to send success response back to the client. " +
"Shutting down socket for " + shmInfo.shmId + "."); "Shutting down socket for " + shmInfo.getShmId() + ".");
sock.shutdown(); sock.shutdown();
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to shut down socket in error handler", e); LOG.warn("Failed to shut down socket in error handler", e);

View File

@ -165,7 +165,7 @@ public class ShortCircuitRegistry {
DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS + DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS +
" was set to " + interruptCheck); " was set to " + interruptCheck);
} }
String shmPaths[] = String[] shmPaths =
conf.getTrimmedStrings(DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS); conf.getTrimmedStrings(DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS);
if (shmPaths.length == 0) { if (shmPaths.length == 0) {
shmPaths = shmPaths =
@ -263,14 +263,22 @@ public class ShortCircuitRegistry {
} }
public static class NewShmInfo implements Closeable { public static class NewShmInfo implements Closeable {
public final ShmId shmId; private final ShmId shmId;
public final FileInputStream stream; private final FileInputStream stream;
NewShmInfo(ShmId shmId, FileInputStream stream) { NewShmInfo(ShmId shmId, FileInputStream stream) {
this.shmId = shmId; this.shmId = shmId;
this.stream = stream; this.stream = stream;
} }
public ShmId getShmId() {
return shmId;
}
public FileInputStream getFileStream() {
return stream;
}
@Override @Override
public void close() throws IOException { public void close() throws IOException {
stream.close(); stream.close();