From 6f8ee865debd830d2b800de88dd150fc049d1e42 Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Sat, 12 Jan 2013 00:12:30 +0000 Subject: [PATCH] HDFS-4388. DomainSocket should throw AsynchronousCloseException when appropriate. Contributed by Colin Patrick McCabe. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-347@1432339 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/hadoop/net/unix/DomainSocket.java | 75 +++++++--- .../hadoop/net/unix/TestDomainSocket.java | 139 +++++++++++++++++- .../hadoop-hdfs/CHANGES.HDFS-347.txt | 3 + 3 files changed, 193 insertions(+), 24 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java index b1f8749f581..9c37db3d4fb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java @@ -25,6 +25,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.SocketException; +import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.ClosedChannelException; import java.nio.channels.ReadableByteChannel; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; @@ -207,10 +209,13 @@ public class DomainSocket implements Closeable { */ public DomainSocket accept() throws IOException { fdRef(); + boolean exc = true; try { - return new DomainSocket(path, accept0(fd)); + DomainSocket ret = new DomainSocket(path, accept0(fd)); + exc = false; + return ret; } finally { - fdUnref(); + fdUnref(exc); } } @@ -235,20 +240,23 @@ public class DomainSocket implements Closeable { * * @throws SocketException If the file descriptor is closed. */ - private void fdRef() throws SocketException { + private void fdRef() throws ClosedChannelException { int bits = status.incrementAndGet(); if ((bits & STATUS_CLOSED_MASK) != 0) { status.decrementAndGet(); - throw new SocketException("Socket is closed."); + throw new ClosedChannelException(); } } /** * Decrement the reference count of the underlying file descriptor. */ - private void fdUnref() { + private void fdUnref(boolean checkClosed) throws AsynchronousCloseException { int newCount = status.decrementAndGet(); - assert newCount >= 0; + assert (newCount & ~STATUS_CLOSED_MASK) >= 0; + if (checkClosed & ((newCount & STATUS_CLOSED_MASK) != 0)) { + throw new AsynchronousCloseException(); + } } /** @@ -298,10 +306,12 @@ public class DomainSocket implements Closeable { public void setAttribute(int type, int size) throws IOException { fdRef(); + boolean exc = true; try { setAttribute0(fd, type, size); + exc = false; } finally { - fdUnref(); + fdUnref(exc); } } @@ -309,10 +319,14 @@ public class DomainSocket implements Closeable { public int getAttribute(int type) throws IOException { fdRef(); + int attribute; + boolean exc = true; try { - return getAttribute0(fd, type); + attribute = getAttribute0(fd, type); + exc = false; + return attribute; } finally { - fdUnref(); + fdUnref(exc); } } @@ -396,10 +410,12 @@ public class DomainSocket implements Closeable { public void sendFileDescriptors(FileDescriptor jfds[], byte jbuf[], int offset, int length) throws IOException { fdRef(); + boolean exc = true; try { sendFileDescriptors0(fd, jfds, jbuf, offset, length); + exc = false; } finally { - fdUnref(); + fdUnref(exc); } } @@ -430,10 +446,13 @@ public class DomainSocket implements Closeable { public int receiveFileDescriptors(FileDescriptor[] jfds, byte jbuf[], int offset, int length) throws IOException { fdRef(); + boolean exc = true; try { - return receiveFileDescriptors0(fd, jfds, jbuf, offset, length); + int nBytes = receiveFileDescriptors0(fd, jfds, jbuf, offset, length); + exc = false; + return nBytes; } finally { - fdUnref(); + fdUnref(exc); } } @@ -462,7 +481,6 @@ public class DomainSocket implements Closeable { success = true; return ret; } finally { - fdUnref(); if (!success) { for (int i = 0; i < fds.length; i++) { if (fds[i] != null) { @@ -481,6 +499,7 @@ public class DomainSocket implements Closeable { } } } + fdUnref(!success); } } @@ -505,32 +524,40 @@ public class DomainSocket implements Closeable { @Override public int read() throws IOException { fdRef(); + boolean exc = true; try { byte b[] = new byte[1]; int ret = DomainSocket.readArray0(DomainSocket.this.fd, b, 0, 1); + exc = false; return (ret >= 0) ? b[0] : -1; } finally { - fdUnref(); + fdUnref(exc); } } @Override public int read(byte b[], int off, int len) throws IOException { fdRef(); + boolean exc = true; try { - return DomainSocket.readArray0(DomainSocket.this.fd, b, off, len); + int nRead = DomainSocket.readArray0(DomainSocket.this.fd, b, off, len); + exc = false; + return nRead; } finally { - fdUnref(); + fdUnref(exc); } } @Override public int available() throws IOException { fdRef(); + boolean exc = true; try { - return DomainSocket.available0(DomainSocket.this.fd); + int nAvailable = DomainSocket.available0(DomainSocket.this.fd); + exc = false; + return nAvailable; } finally { - fdUnref(); + fdUnref(exc); } } @@ -553,22 +580,26 @@ public class DomainSocket implements Closeable { @Override public void write(int val) throws IOException { fdRef(); + boolean exc = true; try { byte b[] = new byte[1]; b[0] = (byte)val; DomainSocket.writeArray0(DomainSocket.this.fd, b, 0, 1); + exc = false; } finally { - fdUnref(); + fdUnref(exc); } } @Override public void write(byte[] b, int off, int len) throws IOException { fdRef(); + boolean exc = true; try { DomainSocket.writeArray0(DomainSocket.this.fd, b, off, len); + exc = false; } finally { - fdUnref(); + fdUnref(exc); } } } @@ -588,6 +619,7 @@ public class DomainSocket implements Closeable { @Override public int read(ByteBuffer dst) throws IOException { fdRef(); + boolean exc = true; try { int nread = 0; if (dst.isDirect()) { @@ -605,9 +637,10 @@ public class DomainSocket implements Closeable { if (nread > 0) { dst.position(dst.position() + nread); } + exc = false; return nread; } finally { - fdUnref(); + fdUnref(exc); } } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java index ab293ff5138..3bba0b1e256 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java @@ -25,6 +25,8 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.ClosedChannelException; import java.util.Arrays; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; @@ -96,8 +98,47 @@ public class TestDomainSocket { } /** - * Test that if one thread is blocking in accept(), another thread - * can close the socket and stop the accept. + * Test that we get a read result of -1 on EOF. + * + * @throws IOException + */ + @Test(timeout=180000) + public void testSocketReadEof() throws Exception { + final String TEST_PATH = new File(sockDir.getDir(), + "testSocketReadEof").getAbsolutePath(); + final DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH); + ExecutorService exeServ = Executors.newSingleThreadExecutor(); + Callable callable = new Callable() { + public Void call(){ + DomainSocket conn; + try { + conn = serv.accept(); + } catch (IOException e) { + throw new RuntimeException("unexpected IOException", e); + } + byte buf[] = new byte[100]; + for (int i = 0; i < buf.length; i++) { + buf[i] = 0; + } + try { + Assert.assertEquals(-1, conn.getInputStream().read()); + } catch (IOException e) { + throw new RuntimeException("unexpected IOException", e); + } + return null; + } + }; + Future future = exeServ.submit(callable); + DomainSocket conn = DomainSocket.connect(serv.getPath()); + Thread.sleep(50); + conn.close(); + serv.close(); + future.get(2, TimeUnit.MINUTES); + } + + /** + * Test that if one thread is blocking in a read or write operation, another + * thread can close the socket and stop the accept. * * @throws IOException */ @@ -113,8 +154,10 @@ public class TestDomainSocket { serv.accept(); throw new RuntimeException("expected the accept() to be " + "interrupted and fail"); - } catch (IOException e) { + } catch (AsynchronousCloseException e) { return null; + } catch (IOException e) { + throw new RuntimeException("unexpected IOException", e); } } }; @@ -124,6 +167,96 @@ public class TestDomainSocket { future.get(2, TimeUnit.MINUTES); } + /** + * Test that we get an AsynchronousCloseException when the DomainSocket + * we're using is closed during a read or write operation. + * + * @throws IOException + */ + private void testAsyncCloseDuringIO(final boolean closeDuringWrite) + throws Exception { + final String TEST_PATH = new File(sockDir.getDir(), + "testAsyncCloseDuringIO(" + closeDuringWrite + ")").getAbsolutePath(); + final DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH); + ExecutorService exeServ = Executors.newFixedThreadPool(2); + Callable serverCallable = new Callable() { + public Void call() { + DomainSocket serverConn = null; + try { + serverConn = serv.accept(); + byte buf[] = new byte[100]; + for (int i = 0; i < buf.length; i++) { + buf[i] = 0; + } + // The server just continues either writing or reading until someone + // asynchronously closes the client's socket. At that point, all our + // reads return EOF, and writes get a socket error. + if (closeDuringWrite) { + try { + while (true) { + serverConn.getOutputStream().write(buf); + } + } catch (IOException e) { + } + } else { + do { ; } while + (serverConn.getInputStream().read(buf, 0, buf.length) != -1); + } + } catch (IOException e) { + throw new RuntimeException("unexpected IOException", e); + } finally { + IOUtils.cleanup(DomainSocket.LOG, serverConn); + } + return null; + } + }; + Future serverFuture = exeServ.submit(serverCallable); + final DomainSocket clientConn = DomainSocket.connect(serv.getPath()); + Callable clientCallable = new Callable() { + public Void call(){ + // The client writes or reads until another thread + // asynchronously closes the socket. At that point, we should + // get ClosedChannelException, or possibly its subclass + // AsynchronousCloseException. + byte buf[] = new byte[100]; + for (int i = 0; i < buf.length; i++) { + buf[i] = 0; + } + try { + if (closeDuringWrite) { + while (true) { + clientConn.getOutputStream().write(buf); + } + } else { + while (true) { + clientConn.getInputStream().read(buf, 0, buf.length); + } + } + } catch (ClosedChannelException e) { + return null; + } catch (IOException e) { + throw new RuntimeException("unexpected IOException", e); + } + } + }; + Future clientFuture = exeServ.submit(clientCallable); + Thread.sleep(500); + clientConn.close(); + serv.close(); + clientFuture.get(2, TimeUnit.MINUTES); + serverFuture.get(2, TimeUnit.MINUTES); + } + + @Test(timeout=180000) + public void testAsyncCloseDuringWrite() throws Exception { + testAsyncCloseDuringIO(true); + } + + @Test(timeout=180000) + public void testAsyncCloseDuringRead() throws Exception { + testAsyncCloseDuringIO(false); + } + /** * Test that attempting to connect to an invalid path doesn't work. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt index af90e837028..b5dce88312b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt @@ -10,3 +10,6 @@ HDFS-4354. Create DomainSocket and DomainPeer and associated unit tests. HDFS-4356. BlockReaderLocal should use passed file descriptors rather than paths. (Colin Patrick McCabe via todd) + +HDFS-4388. DomainSocket should throw AsynchronousCloseException when appropriate. +(Colin Patrick McCabe via todd)