HDFS-4388. DomainSocket should throw AsynchronousCloseException when appropriate. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-347@1432339 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2013-01-12 00:12:30 +00:00
parent 9a4030e0e8
commit 6f8ee865de
3 changed files with 193 additions and 24 deletions

View File

@ -25,6 +25,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
@ -207,10 +209,13 @@ public class DomainSocket implements Closeable {
*/
public DomainSocket accept() throws IOException {
fdRef();
boolean exc = true;
try {
return new DomainSocket(path, accept0(fd));
DomainSocket ret = new DomainSocket(path, accept0(fd));
exc = false;
return ret;
} finally {
fdUnref();
fdUnref(exc);
}
}
@ -235,20 +240,23 @@ public class DomainSocket implements Closeable {
*
* @throws SocketException If the file descriptor is closed.
*/
private void fdRef() throws SocketException {
private void fdRef() throws ClosedChannelException {
int bits = status.incrementAndGet();
if ((bits & STATUS_CLOSED_MASK) != 0) {
status.decrementAndGet();
throw new SocketException("Socket is closed.");
throw new ClosedChannelException();
}
}
/**
* Decrement the reference count of the underlying file descriptor.
*/
private void fdUnref() {
private void fdUnref(boolean checkClosed) throws AsynchronousCloseException {
int newCount = status.decrementAndGet();
assert newCount >= 0;
assert (newCount & ~STATUS_CLOSED_MASK) >= 0;
if (checkClosed & ((newCount & STATUS_CLOSED_MASK) != 0)) {
throw new AsynchronousCloseException();
}
}
/**
@ -298,10 +306,12 @@ public class DomainSocket implements Closeable {
public void setAttribute(int type, int size) throws IOException {
fdRef();
boolean exc = true;
try {
setAttribute0(fd, type, size);
exc = false;
} finally {
fdUnref();
fdUnref(exc);
}
}
@ -309,10 +319,14 @@ public class DomainSocket implements Closeable {
public int getAttribute(int type) throws IOException {
fdRef();
int attribute;
boolean exc = true;
try {
return getAttribute0(fd, type);
attribute = getAttribute0(fd, type);
exc = false;
return attribute;
} finally {
fdUnref();
fdUnref(exc);
}
}
@ -396,10 +410,12 @@ public class DomainSocket implements Closeable {
public void sendFileDescriptors(FileDescriptor jfds[],
byte jbuf[], int offset, int length) throws IOException {
fdRef();
boolean exc = true;
try {
sendFileDescriptors0(fd, jfds, jbuf, offset, length);
exc = false;
} finally {
fdUnref();
fdUnref(exc);
}
}
@ -430,10 +446,13 @@ public class DomainSocket implements Closeable {
public int receiveFileDescriptors(FileDescriptor[] jfds,
byte jbuf[], int offset, int length) throws IOException {
fdRef();
boolean exc = true;
try {
return receiveFileDescriptors0(fd, jfds, jbuf, offset, length);
int nBytes = receiveFileDescriptors0(fd, jfds, jbuf, offset, length);
exc = false;
return nBytes;
} finally {
fdUnref();
fdUnref(exc);
}
}
@ -462,7 +481,6 @@ public class DomainSocket implements Closeable {
success = true;
return ret;
} finally {
fdUnref();
if (!success) {
for (int i = 0; i < fds.length; i++) {
if (fds[i] != null) {
@ -481,6 +499,7 @@ public class DomainSocket implements Closeable {
}
}
}
fdUnref(!success);
}
}
@ -505,32 +524,40 @@ public class DomainSocket implements Closeable {
@Override
public int read() throws IOException {
fdRef();
boolean exc = true;
try {
byte b[] = new byte[1];
int ret = DomainSocket.readArray0(DomainSocket.this.fd, b, 0, 1);
exc = false;
return (ret >= 0) ? b[0] : -1;
} finally {
fdUnref();
fdUnref(exc);
}
}
@Override
public int read(byte b[], int off, int len) throws IOException {
fdRef();
boolean exc = true;
try {
return DomainSocket.readArray0(DomainSocket.this.fd, b, off, len);
int nRead = DomainSocket.readArray0(DomainSocket.this.fd, b, off, len);
exc = false;
return nRead;
} finally {
fdUnref();
fdUnref(exc);
}
}
@Override
public int available() throws IOException {
fdRef();
boolean exc = true;
try {
return DomainSocket.available0(DomainSocket.this.fd);
int nAvailable = DomainSocket.available0(DomainSocket.this.fd);
exc = false;
return nAvailable;
} finally {
fdUnref();
fdUnref(exc);
}
}
@ -553,22 +580,26 @@ public class DomainSocket implements Closeable {
@Override
public void write(int val) throws IOException {
fdRef();
boolean exc = true;
try {
byte b[] = new byte[1];
b[0] = (byte)val;
DomainSocket.writeArray0(DomainSocket.this.fd, b, 0, 1);
exc = false;
} finally {
fdUnref();
fdUnref(exc);
}
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
fdRef();
boolean exc = true;
try {
DomainSocket.writeArray0(DomainSocket.this.fd, b, off, len);
exc = false;
} finally {
fdUnref();
fdUnref(exc);
}
}
}
@ -588,6 +619,7 @@ public class DomainSocket implements Closeable {
@Override
public int read(ByteBuffer dst) throws IOException {
fdRef();
boolean exc = true;
try {
int nread = 0;
if (dst.isDirect()) {
@ -605,9 +637,10 @@ public class DomainSocket implements Closeable {
if (nread > 0) {
dst.position(dst.position() + nread);
}
exc = false;
return nread;
} finally {
fdUnref();
fdUnref(exc);
}
}
}

View File

@ -25,6 +25,8 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
@ -96,8 +98,47 @@ public class TestDomainSocket {
}
/**
* Test that if one thread is blocking in accept(), another thread
* can close the socket and stop the accept.
* Test that we get a read result of -1 on EOF.
*
* @throws IOException
*/
@Test(timeout=180000)
public void testSocketReadEof() throws Exception {
final String TEST_PATH = new File(sockDir.getDir(),
"testSocketReadEof").getAbsolutePath();
final DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH);
ExecutorService exeServ = Executors.newSingleThreadExecutor();
Callable<Void> callable = new Callable<Void>() {
public Void call(){
DomainSocket conn;
try {
conn = serv.accept();
} catch (IOException e) {
throw new RuntimeException("unexpected IOException", e);
}
byte buf[] = new byte[100];
for (int i = 0; i < buf.length; i++) {
buf[i] = 0;
}
try {
Assert.assertEquals(-1, conn.getInputStream().read());
} catch (IOException e) {
throw new RuntimeException("unexpected IOException", e);
}
return null;
}
};
Future<Void> future = exeServ.submit(callable);
DomainSocket conn = DomainSocket.connect(serv.getPath());
Thread.sleep(50);
conn.close();
serv.close();
future.get(2, TimeUnit.MINUTES);
}
/**
* Test that if one thread is blocking in a read or write operation, another
* thread can close the socket and stop the accept.
*
* @throws IOException
*/
@ -113,8 +154,10 @@ public class TestDomainSocket {
serv.accept();
throw new RuntimeException("expected the accept() to be " +
"interrupted and fail");
} catch (IOException e) {
} catch (AsynchronousCloseException e) {
return null;
} catch (IOException e) {
throw new RuntimeException("unexpected IOException", e);
}
}
};
@ -124,6 +167,96 @@ public class TestDomainSocket {
future.get(2, TimeUnit.MINUTES);
}
/**
* Test that we get an AsynchronousCloseException when the DomainSocket
* we're using is closed during a read or write operation.
*
* @throws IOException
*/
private void testAsyncCloseDuringIO(final boolean closeDuringWrite)
throws Exception {
final String TEST_PATH = new File(sockDir.getDir(),
"testAsyncCloseDuringIO(" + closeDuringWrite + ")").getAbsolutePath();
final DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH);
ExecutorService exeServ = Executors.newFixedThreadPool(2);
Callable<Void> serverCallable = new Callable<Void>() {
public Void call() {
DomainSocket serverConn = null;
try {
serverConn = serv.accept();
byte buf[] = new byte[100];
for (int i = 0; i < buf.length; i++) {
buf[i] = 0;
}
// The server just continues either writing or reading until someone
// asynchronously closes the client's socket. At that point, all our
// reads return EOF, and writes get a socket error.
if (closeDuringWrite) {
try {
while (true) {
serverConn.getOutputStream().write(buf);
}
} catch (IOException e) {
}
} else {
do { ; } while
(serverConn.getInputStream().read(buf, 0, buf.length) != -1);
}
} catch (IOException e) {
throw new RuntimeException("unexpected IOException", e);
} finally {
IOUtils.cleanup(DomainSocket.LOG, serverConn);
}
return null;
}
};
Future<Void> serverFuture = exeServ.submit(serverCallable);
final DomainSocket clientConn = DomainSocket.connect(serv.getPath());
Callable<Void> clientCallable = new Callable<Void>() {
public Void call(){
// The client writes or reads until another thread
// asynchronously closes the socket. At that point, we should
// get ClosedChannelException, or possibly its subclass
// AsynchronousCloseException.
byte buf[] = new byte[100];
for (int i = 0; i < buf.length; i++) {
buf[i] = 0;
}
try {
if (closeDuringWrite) {
while (true) {
clientConn.getOutputStream().write(buf);
}
} else {
while (true) {
clientConn.getInputStream().read(buf, 0, buf.length);
}
}
} catch (ClosedChannelException e) {
return null;
} catch (IOException e) {
throw new RuntimeException("unexpected IOException", e);
}
}
};
Future<Void> clientFuture = exeServ.submit(clientCallable);
Thread.sleep(500);
clientConn.close();
serv.close();
clientFuture.get(2, TimeUnit.MINUTES);
serverFuture.get(2, TimeUnit.MINUTES);
}
@Test(timeout=180000)
public void testAsyncCloseDuringWrite() throws Exception {
testAsyncCloseDuringIO(true);
}
@Test(timeout=180000)
public void testAsyncCloseDuringRead() throws Exception {
testAsyncCloseDuringIO(false);
}
/**
* Test that attempting to connect to an invalid path doesn't work.
*

View File

@ -10,3 +10,6 @@ HDFS-4354. Create DomainSocket and DomainPeer and associated unit tests.
HDFS-4356. BlockReaderLocal should use passed file descriptors rather than paths.
(Colin Patrick McCabe via todd)
HDFS-4388. DomainSocket should throw AsynchronousCloseException when appropriate.
(Colin Patrick McCabe via todd)