diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index d3d8c7be3bd..50106a7b1d0 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -41,6 +41,9 @@ Release 2.7.2 - UNRELEASED HADOOP-12006. Remove unimplemented option for `hadoop fs -ls` from document in branch-2.7. (Akira AJISAKA via ozawa) + HADOOP-12213. Interrupted exception can occur when Client#stop is called. + (Kuhu Shukla via ozawa) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 0b66cd63e9c..fc4855ee72f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -194,9 +194,10 @@ public class Client { clientExecutor.shutdownNow(); } } catch (InterruptedException e) { - LOG.error("Interrupted while waiting for clientExecutor" + - "to stop", e); + LOG.warn("Interrupted while waiting for clientExecutor" + + " to stop"); clientExecutor.shutdownNow(); + Thread.currentThread().interrupt(); } clientExecutor = null; } @@ -253,6 +254,10 @@ public class Client { conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, timeout); } + @VisibleForTesting + public static final ExecutorService getClientExecutor() { + return Client.clientExcecutorFactory.clientExecutor; + } /** * Increment this client's reference count * diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index b44301127dd..7e31dc4d940 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -278,6 +278,8 @@ public class TestIPC { String causeText=cause.getMessage(); assertTrue("Did not find " + causeText + " in " + message, message.contains(causeText)); + } finally { + client.stop(); } } @@ -390,6 +392,7 @@ public class TestIPC { client.call(param, addr, null, null, 0, conf); } finally { + client.stop(); server.stop(); } } @@ -505,6 +508,8 @@ public class TestIPC { fail("Expected an exception to have been thrown"); } catch (IOException e) { assertTrue(e.getMessage().contains("Injected fault")); + } finally { + client.stop(); } } @@ -530,11 +535,11 @@ public class TestIPC { }).when(spyFactory).createSocket(); Server server = new TestServer(1, true); + Client client = new Client(LongWritable.class, conf, spyFactory); server.start(); try { // Call should fail due to injected exception. InetSocketAddress address = NetUtils.getConnectAddress(server); - Client client = new Client(LongWritable.class, conf, spyFactory); try { client.call(new LongWritable(RANDOM.nextLong()), address, null, null, 0, conf); @@ -551,6 +556,7 @@ public class TestIPC { client.call(new LongWritable(RANDOM.nextLong()), address, null, null, 0, conf); } finally { + client.stop(); server.stop(); } } @@ -575,6 +581,7 @@ public class TestIPC { // set timeout to be bigger than 3*ping interval client.call(new LongWritable(RANDOM.nextLong()), addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf); + client.stop(); } @Test(timeout=60000) @@ -595,6 +602,7 @@ public class TestIPC { } catch (SocketTimeoutException e) { LOG.info("Get a SocketTimeoutException ", e); } + client.stop(); } /** @@ -825,6 +833,8 @@ public class TestIPC { } catch (IOException e) { LOG.error(e); } catch (InterruptedException e) { + } finally { + client.stop(); } } }); @@ -926,6 +936,31 @@ public class TestIPC { endFds - startFds < 20); } + /** + * Check if Client is interrupted after handling + * InterruptedException during cleanup + */ + @Test(timeout=30000) + public void testInterrupted() { + Client client = new Client(LongWritable.class, conf); + client.getClientExecutor().submit(new Runnable() { + public void run() { + while(true); + } + }); + Thread.currentThread().interrupt(); + client.stop(); + try { + assertTrue(Thread.currentThread().isInterrupted()); + LOG.info("Expected thread interrupt during client cleanup"); + } catch (AssertionError e) { + LOG.error("The Client did not interrupt after handling an Interrupted Exception"); + Assert.fail("The Client did not interrupt after handling an Interrupted Exception"); + } + // Clear Thread interrupt + Thread.currentThread().interrupted(); + } + private long countOpenFileDescriptors() { return FD_DIR.list().length; } @@ -1255,6 +1290,7 @@ public class TestIPC { Mockito.verify(mockFactory, Mockito.times(maxTimeoutRetries)) .createSocket(); } + client.stop(); } private void doIpcVersionTest(