HADOOP-12658. Clear javadoc and check style issues around DomainSocket. Contributed by Kai Zheng
This commit is contained in:
parent
1af2917856
commit
778146eaae
|
@ -1025,6 +1025,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HADOOP-12686. Update FileSystemShell documentation to mention the meaning
|
HADOOP-12686. Update FileSystemShell documentation to mention the meaning
|
||||||
of each columns of fs -du. (Daisuke Kobayashi via aajisaka)
|
of each columns of fs -du. (Daisuke Kobayashi via aajisaka)
|
||||||
|
|
||||||
|
HADOOP-12658. Clear javadoc and check style issues around DomainSocket
|
||||||
|
(Kai Zheng via umamahesh)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-11785. Reduce the number of listStatus operation in distcp
|
HADOOP-11785. Reduce the number of listStatus operation in distcp
|
||||||
|
|
|
@ -55,7 +55,8 @@ public class IOUtils {
|
||||||
* @param close whether or not close the InputStream and
|
* @param close whether or not close the InputStream and
|
||||||
* OutputStream at the end. The streams are closed in the finally clause.
|
* OutputStream at the end. The streams are closed in the finally clause.
|
||||||
*/
|
*/
|
||||||
public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close)
|
public static void copyBytes(InputStream in, OutputStream out,
|
||||||
|
int buffSize, boolean close)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try {
|
try {
|
||||||
copyBytes(in, out, buffSize);
|
copyBytes(in, out, buffSize);
|
||||||
|
@ -194,7 +195,7 @@ public class IOUtils {
|
||||||
* @throws IOException if it could not read requested number of bytes
|
* @throws IOException if it could not read requested number of bytes
|
||||||
* for any reason (including EOF)
|
* for any reason (including EOF)
|
||||||
*/
|
*/
|
||||||
public static void readFully(InputStream in, byte buf[],
|
public static void readFully(InputStream in, byte[] buf,
|
||||||
int off, int len) throws IOException {
|
int off, int len) throws IOException {
|
||||||
int toRead = len;
|
int toRead = len;
|
||||||
while (toRead > 0) {
|
while (toRead > 0) {
|
||||||
|
|
|
@ -63,7 +63,8 @@ public class DomainSocket implements Closeable {
|
||||||
static Log LOG = LogFactory.getLog(DomainSocket.class);
|
static Log LOG = LogFactory.getLog(DomainSocket.class);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* True only if we should validate the paths used in {@link DomainSocket#bind()}
|
* True only if we should validate the paths used in
|
||||||
|
* {@link DomainSocket#bindAndListen(String)}
|
||||||
*/
|
*/
|
||||||
private static boolean validateBindPaths = true;
|
private static boolean validateBindPaths = true;
|
||||||
|
|
||||||
|
@ -220,11 +221,11 @@ public class DomainSocket implements Closeable {
|
||||||
*
|
*
|
||||||
* This method can only be used on sockets that were bound with bind().
|
* This method can only be used on sockets that were bound with bind().
|
||||||
*
|
*
|
||||||
* @return The new connection.
|
* @return The new connection.
|
||||||
* @throws IOException If there was an I/O error
|
* @throws IOException If there was an I/O error performing the accept--
|
||||||
* performing the accept-- such as the
|
* such as the socket being closed from under us.
|
||||||
* socket being closed from under us.
|
* Particularly when the accept is timed out, it throws
|
||||||
* @throws SocketTimeoutException If the accept timed out.
|
* SocketTimeoutException.
|
||||||
*/
|
*/
|
||||||
public DomainSocket accept() throws IOException {
|
public DomainSocket accept() throws IOException {
|
||||||
refCount.reference();
|
refCount.reference();
|
||||||
|
@ -238,13 +239,15 @@ public class DomainSocket implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static native int connect0(String path);
|
private static native int connect0(String path) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new DomainSocket connected to the given path.
|
* Create a new DomainSocket connected to the given path.
|
||||||
*
|
*
|
||||||
* @param path The path to connect to.
|
* @param path The path to connect to.
|
||||||
* @return The new DomainSocket.
|
* @throws IOException If there was an I/O error performing the connect.
|
||||||
|
*
|
||||||
|
* @return The new DomainSocket.
|
||||||
*/
|
*/
|
||||||
public static DomainSocket connect(String path) throws IOException {
|
public static DomainSocket connect(String path) throws IOException {
|
||||||
if (loadingFailureReason != null) {
|
if (loadingFailureReason != null) {
|
||||||
|
@ -425,47 +428,11 @@ public class DomainSocket implements Closeable {
|
||||||
|
|
||||||
private static native int receiveFileDescriptors0(int fd,
|
private static native int receiveFileDescriptors0(int fd,
|
||||||
FileDescriptor[] descriptors,
|
FileDescriptor[] descriptors,
|
||||||
byte jbuf[], int offset, int length) throws IOException;
|
byte[] buf, int offset, int length) throws IOException;
|
||||||
|
|
||||||
/**
|
|
||||||
* Receive some FileDescriptor objects from the process on the other side of
|
|
||||||
* this socket.
|
|
||||||
*
|
|
||||||
* @param descriptors (output parameter) Array of FileDescriptors.
|
|
||||||
* We will fill as many slots as possible with file
|
|
||||||
* descriptors passed from the remote process. The
|
|
||||||
* other slots will contain NULL.
|
|
||||||
* @param jbuf (output parameter) Buffer to read into.
|
|
||||||
* The UNIX domain sockets API requires you to read
|
|
||||||
* at least one byte from the remote process, even
|
|
||||||
* if all you care about is the file descriptors
|
|
||||||
* you will receive.
|
|
||||||
* @param offset Offset into the byte buffer to load data
|
|
||||||
* @param length Length of the byte buffer to use for data
|
|
||||||
*
|
|
||||||
* @return The number of bytes read. This will be -1 if we
|
|
||||||
* reached EOF (similar to SocketInputStream);
|
|
||||||
* otherwise, it will be positive.
|
|
||||||
* @throws IOException if there was an I/O error.
|
|
||||||
*/
|
|
||||||
public int receiveFileDescriptors(FileDescriptor[] descriptors,
|
|
||||||
byte jbuf[], int offset, int length) throws IOException {
|
|
||||||
refCount.reference();
|
|
||||||
boolean exc = true;
|
|
||||||
try {
|
|
||||||
int nBytes = receiveFileDescriptors0(fd, descriptors, jbuf, offset, length);
|
|
||||||
exc = false;
|
|
||||||
return nBytes;
|
|
||||||
} finally {
|
|
||||||
unreference(exc);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Receive some FileDescriptor objects from the process on the other side of
|
* Receive some FileDescriptor objects from the process on the other side of
|
||||||
* this socket, and wrap them in FileInputStream objects.
|
* this socket, and wrap them in FileInputStream objects.
|
||||||
*
|
|
||||||
* See {@link DomainSocket#recvFileInputStreams(ByteBuffer)}
|
|
||||||
*/
|
*/
|
||||||
public int recvFileInputStreams(FileInputStream[] streams, byte buf[],
|
public int recvFileInputStreams(FileInputStream[] streams, byte buf[],
|
||||||
int offset, int length) throws IOException {
|
int offset, int length) throws IOException {
|
||||||
|
|
|
@ -591,7 +591,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
switch (resp.getStatus()) {
|
switch (resp.getStatus()) {
|
||||||
case SUCCESS:
|
case SUCCESS:
|
||||||
byte buf[] = new byte[1];
|
byte buf[] = new byte[1];
|
||||||
FileInputStream fis[] = new FileInputStream[2];
|
FileInputStream[] fis = new FileInputStream[2];
|
||||||
sock.recvFileInputStreams(fis, buf, 0, buf.length);
|
sock.recvFileInputStreams(fis, buf, 0, buf.length);
|
||||||
ShortCircuitReplica replica = null;
|
ShortCircuitReplica replica = null;
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -168,7 +168,7 @@ public class DfsClientShmManager implements Closeable {
|
||||||
case SUCCESS:
|
case SUCCESS:
|
||||||
DomainSocket sock = peer.getDomainSocket();
|
DomainSocket sock = peer.getDomainSocket();
|
||||||
byte buf[] = new byte[1];
|
byte buf[] = new byte[1];
|
||||||
FileInputStream fis[] = new FileInputStream[1];
|
FileInputStream[] fis = new FileInputStream[1];
|
||||||
if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) {
|
if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) {
|
||||||
throw new EOFException("got EOF while trying to transfer the " +
|
throw new EOFException("got EOF while trying to transfer the " +
|
||||||
"file descriptor for the shared memory segment.");
|
"file descriptor for the shared memory segment.");
|
||||||
|
|
|
@ -425,12 +425,12 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
DataNodeFaultInjector.get().sendShortCircuitShmResponse();
|
DataNodeFaultInjector.get().sendShortCircuitShmResponse();
|
||||||
ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS).
|
ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS).
|
||||||
setId(PBHelperClient.convert(shmInfo.shmId)).build().
|
setId(PBHelperClient.convert(shmInfo.getShmId())).build().
|
||||||
writeDelimitedTo(socketOut);
|
writeDelimitedTo(socketOut);
|
||||||
// Send the file descriptor for the shared memory segment.
|
// Send the file descriptor for the shared memory segment.
|
||||||
byte buf[] = new byte[] { (byte)0 };
|
byte buf[] = new byte[] { (byte)0 };
|
||||||
FileDescriptor shmFdArray[] =
|
FileDescriptor shmFdArray[] =
|
||||||
new FileDescriptor[] { shmInfo.stream.getFD() };
|
new FileDescriptor[] {shmInfo.getFileStream().getFD()};
|
||||||
sock.sendFileDescriptors(shmFdArray, buf, 0, buf.length);
|
sock.sendFileDescriptors(shmFdArray, buf, 0, buf.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -471,7 +471,8 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
"cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, " +
|
"cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, " +
|
||||||
"op: REQUEST_SHORT_CIRCUIT_SHM," +
|
"op: REQUEST_SHORT_CIRCUIT_SHM," +
|
||||||
" shmId: %016x%016x, srvID: %s, success: true",
|
" shmId: %016x%016x, srvID: %s, success: true",
|
||||||
clientName, shmInfo.shmId.getHi(), shmInfo.shmId.getLo(),
|
clientName, shmInfo.getShmId().getHi(),
|
||||||
|
shmInfo.getShmId().getLo(),
|
||||||
datanode.getDatanodeUuid()));
|
datanode.getDatanodeUuid()));
|
||||||
} else {
|
} else {
|
||||||
BlockSender.ClientTraceLog.info(String.format(
|
BlockSender.ClientTraceLog.info(String.format(
|
||||||
|
@ -490,7 +491,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
// bad behavior inside the poll() call. See HADOOP-11802 for details.
|
// bad behavior inside the poll() call. See HADOOP-11802 for details.
|
||||||
try {
|
try {
|
||||||
LOG.warn("Failed to send success response back to the client. " +
|
LOG.warn("Failed to send success response back to the client. " +
|
||||||
"Shutting down socket for " + shmInfo.shmId + ".");
|
"Shutting down socket for " + shmInfo.getShmId() + ".");
|
||||||
sock.shutdown();
|
sock.shutdown();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Failed to shut down socket in error handler", e);
|
LOG.warn("Failed to shut down socket in error handler", e);
|
||||||
|
|
|
@ -165,7 +165,7 @@ public class ShortCircuitRegistry {
|
||||||
DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS +
|
DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS +
|
||||||
" was set to " + interruptCheck);
|
" was set to " + interruptCheck);
|
||||||
}
|
}
|
||||||
String shmPaths[] =
|
String[] shmPaths =
|
||||||
conf.getTrimmedStrings(DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS);
|
conf.getTrimmedStrings(DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS);
|
||||||
if (shmPaths.length == 0) {
|
if (shmPaths.length == 0) {
|
||||||
shmPaths =
|
shmPaths =
|
||||||
|
@ -263,14 +263,22 @@ public class ShortCircuitRegistry {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class NewShmInfo implements Closeable {
|
public static class NewShmInfo implements Closeable {
|
||||||
public final ShmId shmId;
|
private final ShmId shmId;
|
||||||
public final FileInputStream stream;
|
private final FileInputStream stream;
|
||||||
|
|
||||||
NewShmInfo(ShmId shmId, FileInputStream stream) {
|
NewShmInfo(ShmId shmId, FileInputStream stream) {
|
||||||
this.shmId = shmId;
|
this.shmId = shmId;
|
||||||
this.stream = stream;
|
this.stream = stream;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ShmId getShmId() {
|
||||||
|
return shmId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FileInputStream getFileStream() {
|
||||||
|
return stream;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
stream.close();
|
stream.close();
|
||||||
|
|
Loading…
Reference in New Issue