From 3a417cbf1d4bfc249f1f9fbd3c2b792c5e78bf5f Mon Sep 17 00:00:00 2001 From: Aaron Myers Date: Sat, 16 Feb 2013 00:59:01 +0000 Subject: [PATCH] HDFS-347: style cleanups. Contributed by Colin Patrick McCabe. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-347@1446830 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/hadoop/net/unix/DomainSocket.java | 282 +++++++++++------- .../org/apache/hadoop/net/unix/DomainSocket.c | 32 +- .../hadoop/net/unix/TestDomainSocket.java | 10 +- .../hadoop-hdfs/CHANGES.HDFS-347.txt | 2 + .../hadoop/hdfs/DomainSocketFactory.java | 2 +- .../apache/hadoop/hdfs/net/DomainPeer.java | 6 +- .../hadoop/hdfs/net/DomainPeerServer.java | 2 +- 7 files changed, 195 insertions(+), 141 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java index aa4b952e8d1..4c6ae0592c2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java @@ -51,7 +51,7 @@ public class DomainSocket implements Closeable { } else if (!NativeCodeLoader.isNativeCodeLoaded()) { loadingFailureReason = "libhadoop cannot be loaded."; } else { - String problem = "DomainSocket#anchorNative got error: unknown"; + String problem; try { anchorNative(); problem = null; @@ -132,17 +132,99 @@ public class DomainSocket implements Closeable { } /** - * Status bits - * - * Bit 30: 0 = DomainSocket open, 1 = DomainSocket closed - * Bits 29 to 0: the reference count. + * Tracks the reference count of the file descriptor, and also whether it is + * open or closed. */ - private final AtomicInteger status; + private static class Status { + /** + * Bit mask representing a closed domain socket. + */ + private static final int STATUS_CLOSED_MASK = 1 << 30; + + /** + * Status bits + * + * Bit 30: 0 = DomainSocket open, 1 = DomainSocket closed + * Bits 29 to 0: the reference count. + */ + private final AtomicInteger bits = new AtomicInteger(0); + + Status() { } + + /** + * Increment the reference count of the underlying file descriptor. + * + * @throws ClosedChannelException If the file descriptor is closed. + */ + void reference() throws ClosedChannelException { + int curBits = bits.incrementAndGet(); + if ((curBits & STATUS_CLOSED_MASK) != 0) { + bits.decrementAndGet(); + throw new ClosedChannelException(); + } + } + + /** + * Decrement the reference count of the underlying file descriptor. + * + * @param checkClosed Whether to throw an exception if the file + * descriptor is closed. + * + * @throws AsynchronousCloseException If the file descriptor is closed and + * checkClosed is set. + */ + void unreference(boolean checkClosed) throws AsynchronousCloseException { + int newCount = bits.decrementAndGet(); + assert (newCount & ~STATUS_CLOSED_MASK) >= 0; + if (checkClosed && ((newCount & STATUS_CLOSED_MASK) != 0)) { + throw new AsynchronousCloseException(); + } + } + + /** + * Return true if the file descriptor is currently open. + * + * @return True if the file descriptor is currently open. + */ + boolean isOpen() { + return ((bits.get() & STATUS_CLOSED_MASK) == 0); + } + + /** + * Mark the file descriptor as closed. + * + * Once the file descriptor is closed, it cannot be reopened. + * + * @return The current reference count. + * @throws ClosedChannelException If someone else closes the file + * descriptor before we do. + */ + int setClosed() throws ClosedChannelException { + while (true) { + int curBits = bits.get(); + if ((curBits & STATUS_CLOSED_MASK) != 0) { + throw new ClosedChannelException(); + } + if (bits.compareAndSet(curBits, curBits | STATUS_CLOSED_MASK)) { + return curBits & (~STATUS_CLOSED_MASK); + } + } + } + + /** + * Get the current reference count. + * + * @return The current reference count. + */ + int getReferenceCount() { + return bits.get() & (~STATUS_CLOSED_MASK); + } + } /** - * Bit mask representing a closed domain socket. + * The socket status. */ - private static final int STATUS_CLOSED_MASK = 1 << 30; + private final Status status; /** * The file descriptor associated with this UNIX domain socket. @@ -170,7 +252,7 @@ public class DomainSocket implements Closeable { private final DomainChannel channel = new DomainChannel(); private DomainSocket(String path, int fd) { - this.status = new AtomicInteger(0); + this.status = new Status(); this.fd = fd; this.path = path; } @@ -208,14 +290,14 @@ public class DomainSocket implements Closeable { * @throws SocketTimeoutException If the accept timed out. */ public DomainSocket accept() throws IOException { - fdRef(); + status.reference(); boolean exc = true; try { DomainSocket ret = new DomainSocket(path, accept0(fd)); exc = false; return ret; } finally { - fdUnref(exc); + status.unreference(exc); } } @@ -235,38 +317,14 @@ public class DomainSocket implements Closeable { return new DomainSocket(path, fd); } - /** - * Increment the reference count of the underlying file descriptor. - * - * @throws SocketException If the file descriptor is closed. - */ - private void fdRef() throws ClosedChannelException { - int bits = status.incrementAndGet(); - if ((bits & STATUS_CLOSED_MASK) != 0) { - status.decrementAndGet(); - throw new ClosedChannelException(); - } - } - - /** - * Decrement the reference count of the underlying file descriptor. - */ - private void fdUnref(boolean checkClosed) throws AsynchronousCloseException { - int newCount = status.decrementAndGet(); - assert (newCount & ~STATUS_CLOSED_MASK) >= 0; - if (checkClosed && ((newCount & STATUS_CLOSED_MASK) != 0)) { - throw new AsynchronousCloseException(); - } - } - - /** - * Return true if the file descriptor is currently open. - * - * @return True if the file descriptor is currently open. - */ - public boolean isOpen() { - return ((status.get() & STATUS_CLOSED_MASK) == 0); - } + /** + * Return true if the file descriptor is currently open. + * + * @return True if the file descriptor is currently open. + */ + public boolean isOpen() { + return status.isOpen(); + } /** * @return The socket path. @@ -296,29 +354,29 @@ public class DomainSocket implements Closeable { return channel; } - public static final int SND_BUF_SIZE = 1; - public static final int RCV_BUF_SIZE = 2; - public static final int SND_TIMEO = 3; - public static final int RCV_TIMEO = 4; + public static final int SEND_BUFFER_SIZE = 1; + public static final int RECEIVE_BUFFER_SIZE = 2; + public static final int SEND_TIMEOUT = 3; + public static final int RECEIVE_TIMEOUT = 4; private static native void setAttribute0(int fd, int type, int val) throws IOException; public void setAttribute(int type, int size) throws IOException { - fdRef(); + status.reference(); boolean exc = true; try { setAttribute0(fd, type, size); exc = false; } finally { - fdUnref(exc); + status.unreference(exc); } } private native int getAttribute0(int fd, int type) throws IOException; public int getAttribute(int type) throws IOException { - fdRef(); + status.reference(); int attribute; boolean exc = true; try { @@ -326,7 +384,7 @@ public class DomainSocket implements Closeable { exc = false; return attribute; } finally { - fdUnref(exc); + status.unreference(exc); } } @@ -343,20 +401,17 @@ public class DomainSocket implements Closeable { @Override public void close() throws IOException { // Set the closed bit on this DomainSocket - int bits; - while (true) { - bits = status.get(); - if ((bits & STATUS_CLOSED_MASK) != 0) { - return; // already closed - } - if (status.compareAndSet(bits, bits | STATUS_CLOSED_MASK)) { - break; - } + int refCount; + try { + refCount = status.setClosed(); + } catch (ClosedChannelException e) { + // Someone else already closed the DomainSocket. + return; } // Wait for all references to go away boolean didShutdown = false; boolean interrupted = false; - while ((bits & (~STATUS_CLOSED_MASK)) > 0) { + while (refCount > 0) { if (!didShutdown) { try { // Calling shutdown on the socket will interrupt blocking system @@ -373,60 +428,57 @@ public class DomainSocket implements Closeable { } catch (InterruptedException e) { interrupted = true; } - bits = status.get(); + refCount = status.getReferenceCount(); } - // Close the file descriptor. After this point, the file descriptor - // number will be reused by something else. Although this DomainSocket - // object continues to hold the old file descriptor number (it's a final - // field), we never use it again because we look at the closed bit and - // realize that this DomainSocket is not usable. + // At this point, nobody has a reference to the file descriptor, + // and nobody will be able to get one in the future either. + // We now call close(2) on the file descriptor. + // After this point, the file descriptor number will be reused by + // something else. Although this DomainSocket object continues to hold + // the old file descriptor number (it's a final field), we never use it + // again because this DomainSocket is closed. close0(fd); if (interrupted) { Thread.currentThread().interrupt(); } } - /* - * Clean up if the user forgets to close the socket. - */ - protected void finalize() throws IOException { - close(); - } - - private native static void sendFileDescriptors0(int fd, FileDescriptor jfds[], + private native static void sendFileDescriptors0(int fd, + FileDescriptor descriptors[], byte jbuf[], int offset, int length) throws IOException; /** * Send some FileDescriptor objects to the process on the other side of this * socket. * - * @param jfds The file descriptors to send. + * @param descriptors The file descriptors to send. * @param jbuf Some bytes to send. You must send at least * one byte. * @param offset The offset in the jbuf array to start at. * @param length Length of the jbuf array to use. */ - public void sendFileDescriptors(FileDescriptor jfds[], + public void sendFileDescriptors(FileDescriptor descriptors[], byte jbuf[], int offset, int length) throws IOException { - fdRef(); + status.reference(); boolean exc = true; try { - sendFileDescriptors0(fd, jfds, jbuf, offset, length); + sendFileDescriptors0(fd, descriptors, jbuf, offset, length); exc = false; } finally { - fdUnref(exc); + status.unreference(exc); } } - private static native int receiveFileDescriptors0(int fd, FileDescriptor[] jfds, + private static native int receiveFileDescriptors0(int fd, + FileDescriptor[] descriptors, byte jbuf[], int offset, int length) throws IOException; /** * Receive some FileDescriptor objects from the process on the other side of * this socket. * - * @param jfds (output parameter) Array of FileDescriptors. + * @param descriptors (output parameter) Array of FileDescriptors. * We will fill as many slots as possible with file * descriptors passed from the remote process. The * other slots will contain NULL. @@ -443,16 +495,16 @@ public class DomainSocket implements Closeable { * otherwise, it will be positive. * @throws IOException if there was an I/O error. */ - public int receiveFileDescriptors(FileDescriptor[] jfds, + public int receiveFileDescriptors(FileDescriptor[] descriptors, byte jbuf[], int offset, int length) throws IOException { - fdRef(); + status.reference(); boolean exc = true; try { - int nBytes = receiveFileDescriptors0(fd, jfds, jbuf, offset, length); + int nBytes = receiveFileDescriptors0(fd, descriptors, jbuf, offset, length); exc = false; return nBytes; } finally { - fdUnref(exc); + status.unreference(exc); } } @@ -462,44 +514,44 @@ public class DomainSocket implements Closeable { * * See {@link DomainSocket#recvFileInputStreams(ByteBuffer)} */ - public int recvFileInputStreams(FileInputStream[] fis, byte buf[], + public int recvFileInputStreams(FileInputStream[] streams, byte buf[], int offset, int length) throws IOException { - FileDescriptor fds[] = new FileDescriptor[fis.length]; + FileDescriptor descriptors[] = new FileDescriptor[streams.length]; boolean success = false; - for (int i = 0; i < fis.length; i++) { - fis[i] = null; + for (int i = 0; i < streams.length; i++) { + streams[i] = null; } - fdRef(); + status.reference(); try { - int ret = receiveFileDescriptors0(fd, fds, buf, offset, length); - for (int i = 0, j = 0; i < fds.length; i++) { - if (fds[i] != null) { - fis[j++] = new FileInputStream(fds[i]); - fds[i] = null; + int ret = receiveFileDescriptors0(fd, descriptors, buf, offset, length); + for (int i = 0, j = 0; i < descriptors.length; i++) { + if (descriptors[i] != null) { + streams[j++] = new FileInputStream(descriptors[i]); + descriptors[i] = null; } } success = true; return ret; } finally { if (!success) { - for (int i = 0; i < fds.length; i++) { - if (fds[i] != null) { + for (int i = 0; i < descriptors.length; i++) { + if (descriptors[i] != null) { try { - closeFileDescriptor0(fds[i]); + closeFileDescriptor0(descriptors[i]); } catch (Throwable t) { LOG.warn(t); } - } else if (fis[i] != null) { + } else if (streams[i] != null) { try { - fis[i].close(); + streams[i].close(); } catch (Throwable t) { LOG.warn(t); } finally { - fis[i] = null; } + streams[i] = null; } } } } - fdUnref(!success); + status.unreference(!success); } } @@ -523,7 +575,7 @@ public class DomainSocket implements Closeable { public class DomainInputStream extends InputStream { @Override public int read() throws IOException { - fdRef(); + status.reference(); boolean exc = true; try { byte b[] = new byte[1]; @@ -531,33 +583,33 @@ public class DomainSocket implements Closeable { exc = false; return (ret >= 0) ? b[0] : -1; } finally { - fdUnref(exc); + status.unreference(exc); } } @Override public int read(byte b[], int off, int len) throws IOException { - fdRef(); + status.reference(); boolean exc = true; try { int nRead = DomainSocket.readArray0(DomainSocket.this.fd, b, off, len); exc = false; return nRead; } finally { - fdUnref(exc); + status.unreference(exc); } } @Override public int available() throws IOException { - fdRef(); + status.reference(); boolean exc = true; try { int nAvailable = DomainSocket.available0(DomainSocket.this.fd); exc = false; return nAvailable; } finally { - fdUnref(exc); + status.unreference(exc); } } @@ -579,7 +631,7 @@ public class DomainSocket implements Closeable { @Override public void write(int val) throws IOException { - fdRef(); + status.reference(); boolean exc = true; try { byte b[] = new byte[1]; @@ -587,19 +639,19 @@ public class DomainSocket implements Closeable { DomainSocket.writeArray0(DomainSocket.this.fd, b, 0, 1); exc = false; } finally { - fdUnref(exc); + status.unreference(exc); } } @Override public void write(byte[] b, int off, int len) throws IOException { - fdRef(); + status.reference(); boolean exc = true; try { DomainSocket.writeArray0(DomainSocket.this.fd, b, off, len); exc = false; } finally { - fdUnref(exc); + status.unreference(exc); } } } @@ -618,7 +670,7 @@ public class DomainSocket implements Closeable { @Override public int read(ByteBuffer dst) throws IOException { - fdRef(); + status.reference(); boolean exc = true; try { int nread = 0; @@ -640,7 +692,7 @@ public class DomainSocket implements Closeable { exc = false; return nread; } finally { - fdUnref(exc); + status.unreference(exc); } } } diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c index b8056b654a9..1c2ea698a00 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c @@ -38,13 +38,13 @@ #include #include -#define SND_BUF_SIZE org_apache_hadoop_net_unix_DomainSocket_SND_BUF_SIZE -#define RCV_BUF_SIZE org_apache_hadoop_net_unix_DomainSocket_RCV_BUF_SIZE -#define SND_TIMEO org_apache_hadoop_net_unix_DomainSocket_SND_TIMEO -#define RCV_TIMEO org_apache_hadoop_net_unix_DomainSocket_RCV_TIMEO +#define SEND_BUFFER_SIZE org_apache_hadoop_net_unix_DomainSocket_SEND_BUFFER_SIZE +#define RECEIVE_BUFFER_SIZE org_apache_hadoop_net_unix_DomainSocket_RECEIVE_BUFFER_SIZE +#define SEND_TIMEOUT org_apache_hadoop_net_unix_DomainSocket_SEND_TIMEOUT +#define RECEIVE_TIMEOUT org_apache_hadoop_net_unix_DomainSocket_RECEIVE_TIMEOUT -#define DEFAULT_RCV_TIMEO 120000 -#define DEFAULT_SND_TIMEO 120000 +#define DEFAULT_RECEIVE_TIMEOUT 120000 +#define DEFAULT_SEND_TIMEOUT 120000 #define LISTEN_BACKLOG 128 /** @@ -391,8 +391,8 @@ JNIEnv *env, jclass clazz, jstring path) (*env)->Throw(env, jthr); return -1; } - if (((jthr = setAttribute0(env, fd, SND_TIMEO, DEFAULT_SND_TIMEO))) || - ((jthr = setAttribute0(env, fd, RCV_TIMEO, DEFAULT_RCV_TIMEO)))) { + if (((jthr = setAttribute0(env, fd, SEND_TIMEOUT, DEFAULT_SEND_TIMEOUT))) || + ((jthr = setAttribute0(env, fd, RECEIVE_TIMEOUT, DEFAULT_RECEIVE_TIMEOUT)))) { RETRY_ON_EINTR(ret, close(fd)); (*env)->Throw(env, jthr); return -1; @@ -412,7 +412,7 @@ static jthrowable setAttribute0(JNIEnv *env, jint fd, jint type, jint val) int ret, buf; switch (type) { - case SND_BUF_SIZE: + case SEND_BUFFER_SIZE: buf = val; if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buf, sizeof(buf))) { ret = errno; @@ -420,7 +420,7 @@ static jthrowable setAttribute0(JNIEnv *env, jint fd, jint type, jint val) "setsockopt(SO_SNDBUF) error: %s", terror(ret)); } return NULL; - case RCV_BUF_SIZE: + case RECEIVE_BUFFER_SIZE: buf = val; if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buf, sizeof(buf))) { ret = errno; @@ -428,7 +428,7 @@ static jthrowable setAttribute0(JNIEnv *env, jint fd, jint type, jint val) "setsockopt(SO_RCVBUF) error: %s", terror(ret)); } return NULL; - case SND_TIMEO: + case SEND_TIMEOUT: javaMillisToTimeVal(val, &tv); if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (struct timeval *)&tv, sizeof(tv))) { @@ -437,7 +437,7 @@ static jthrowable setAttribute0(JNIEnv *env, jint fd, jint type, jint val) "setsockopt(SO_SNDTIMEO) error: %s", terror(ret)); } return NULL; - case RCV_TIMEO: + case RECEIVE_TIMEOUT: javaMillisToTimeVal(val, &tv); if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (struct timeval *)&tv, sizeof(tv))) { @@ -487,7 +487,7 @@ JNIEnv *env, jclass clazz, jint fd, jint type) int ret, rval = 0; switch (type) { - case SND_BUF_SIZE: + case SEND_BUFFER_SIZE: len = sizeof(rval); if (getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &rval, &len)) { ret = errno; @@ -496,7 +496,7 @@ JNIEnv *env, jclass clazz, jint fd, jint type) return -1; } return getSockOptBufSizeToJavaBufSize(rval); - case RCV_BUF_SIZE: + case RECEIVE_BUFFER_SIZE: len = sizeof(rval); if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rval, &len)) { ret = errno; @@ -505,7 +505,7 @@ JNIEnv *env, jclass clazz, jint fd, jint type) return -1; } return getSockOptBufSizeToJavaBufSize(rval); - case SND_TIMEO: + case SEND_TIMEOUT: memset(&tv, 0, sizeof(tv)); len = sizeof(struct timeval); if (getsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, &len)) { @@ -515,7 +515,7 @@ JNIEnv *env, jclass clazz, jint fd, jint type) return -1; } return timeValToJavaMillis(&tv); - case RCV_TIMEO: + case RECEIVE_TIMEOUT: memset(&tv, 0, sizeof(tv)); len = sizeof(struct timeval); if (getsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, &len)) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java index 3347655cf6f..d512027d45d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java @@ -282,15 +282,15 @@ public class TestDomainSocket { DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH); try { // Let's set a new receive buffer size - int bufSize = serv.getAttribute(DomainSocket.RCV_BUF_SIZE); + int bufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE); int newBufSize = bufSize / 2; - serv.setAttribute(DomainSocket.RCV_BUF_SIZE, newBufSize); - int nextBufSize = serv.getAttribute(DomainSocket.RCV_BUF_SIZE); + serv.setAttribute(DomainSocket.RECEIVE_BUFFER_SIZE, newBufSize); + int nextBufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE); Assert.assertEquals(newBufSize, nextBufSize); // Let's set a server timeout int newTimeout = 1000; - serv.setAttribute(DomainSocket.RCV_TIMEO, newTimeout); - int nextTimeout = serv.getAttribute(DomainSocket.RCV_TIMEO); + serv.setAttribute(DomainSocket.RECEIVE_TIMEOUT, newTimeout); + int nextTimeout = serv.getAttribute(DomainSocket.RECEIVE_TIMEOUT); Assert.assertEquals(newTimeout, nextTimeout); try { serv.accept(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt index 94b5c0ce949..343dcb18767 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt @@ -47,3 +47,5 @@ HDFS-4485. DN should chmod socket path a+w. (Colin Patrick McCabe via atm) HDFS-4453. Make a simple doc to describe the usage and design of the shortcircuit read feature. (Colin Patrick McCabe via atm) HDFS-4496. DFSClient: don't create a domain socket unless we need it (Colin Patrick McCabe via todd) + +HDFS-347: style cleanups (Colin Patrick McCabe via atm) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java index 0e2025bea21..a248f0b6718 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java @@ -114,7 +114,7 @@ class DomainSocketFactory { DomainSocket sock = null; try { sock = DomainSocket.connect(escapedPath); - sock.setAttribute(DomainSocket.RCV_TIMEO, conf.socketTimeout); + sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, conf.socketTimeout); success = true; } catch (IOException e) { LOG.warn("error creating DomainSocket", e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java index abe9b7d52df..46279b62e66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java @@ -50,12 +50,12 @@ public class DomainPeer implements Peer { @Override public void setReadTimeout(int timeoutMs) throws IOException { - socket.setAttribute(DomainSocket.RCV_TIMEO, timeoutMs); + socket.setAttribute(DomainSocket.RECEIVE_TIMEOUT, timeoutMs); } @Override public int getReceiveBufferSize() throws IOException { - return socket.getAttribute(DomainSocket.RCV_BUF_SIZE); + return socket.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE); } @Override @@ -66,7 +66,7 @@ public class DomainPeer implements Peer { @Override public void setWriteTimeout(int timeoutMs) throws IOException { - socket.setAttribute(DomainSocket.SND_TIMEO, timeoutMs); + socket.setAttribute(DomainSocket.SEND_TIMEOUT, timeoutMs); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java index bded892e8ac..dce64621482 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java @@ -48,7 +48,7 @@ public class DomainPeerServer implements PeerServer { @Override public void setReceiveBufferSize(int size) throws IOException { - sock.setAttribute(DomainSocket.RCV_BUF_SIZE, size); + sock.setAttribute(DomainSocket.RECEIVE_BUFFER_SIZE, size); } @Override