diff --git a/CHANGES.txt b/CHANGES.txt index b5368dee9f6..5774b87e3cf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1569,6 +1569,9 @@ Release 0.21.0 - Unreleased despite failure at any level. (Contributed by Ravi Gummadi and Vinod Kumar Vavilapalli) + HADOOP-6723. Unchecked exceptions thrown in IPC Connection should not + orphan clients. (Todd Lipcon via tomwhite) + Release 0.20.3 - Unreleased NEW FEATURES diff --git a/src/java/org/apache/hadoop/ipc/Client.java b/src/java/org/apache/hadoop/ipc/Client.java index 06b3d0faaac..68d96e15b4d 100644 --- a/src/java/org/apache/hadoop/ipc/Client.java +++ b/src/java/org/apache/hadoop/ipc/Client.java @@ -602,8 +602,16 @@ public class Client { LOG.debug(getName() + ": starting, having connections " + connections.size()); - while (waitForWork()) {//wait here for work - read or close connection - receiveResponse(); + try { + while (waitForWork()) {//wait here for work - read or close connection + receiveResponse(); + } + } catch (Throwable t) { + // This truly is unexpected, since we catch IOException in receiveResponse + // -- this is only to be really sure that we don't leave a client hanging + // forever. + LOG.warn("Unexpected error reading responses on connection " + this, t); + markClosed(new IOException("Error reading responses", t)); } close(); diff --git a/src/test/core/org/apache/hadoop/ipc/TestIPC.java b/src/test/core/org/apache/hadoop/ipc/TestIPC.java index f41894df22e..f9b48e6a70d 100644 --- a/src/test/core/org/apache/hadoop/ipc/TestIPC.java +++ b/src/test/core/org/apache/hadoop/ipc/TestIPC.java @@ -249,6 +249,23 @@ public class TestIPC extends TestCase { throw new IOException(ERR_MSG); } } + + private static class LongRTEWritable extends LongWritable { + private final static String ERR_MSG = + "Come across an runtime exception while reading"; + + LongRTEWritable() {} + + LongRTEWritable(long longValue) { + super(longValue); + } + + public void readFields(DataInput in) throws IOException { + super.readFields(in); + throw new RuntimeException(ERR_MSG); + } + } + public void testErrorClient() throws Exception { // start server Server server = new TestServer(1, false); @@ -268,6 +285,30 @@ public class TestIPC extends TestCase { assertEquals(LongErrorWritable.ERR_MSG, cause.getMessage()); } } + + public void testRuntimeExceptionWritable() throws Exception { + // start server + Server server = new TestServer(1, false); + InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + + // start client + Client client = new Client(LongRTEWritable.class, conf); + try { + client.call(new LongRTEWritable(RANDOM.nextLong()), + addr, null, null); + fail("Expected an exception to have been thrown"); + } catch (IOException e) { + // check error + Throwable cause = e.getCause(); + assertTrue(cause instanceof IOException); + // it's double-wrapped + Throwable cause2 = cause.getCause(); + assertTrue(cause2 instanceof RuntimeException); + + assertEquals(LongRTEWritable.ERR_MSG, cause2.getMessage()); + } + } /** * Test that, if the socket factory throws an IOE, it properly propagates