diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 3e2cdb3c2e5..769e5569a2f 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -635,6 +635,9 @@ Release 2.7.2 - UNRELEASED HADOOP-10365. BufferedOutputStream in FileUtil#unpackEntries() should be closed in finally block. (Kiran Kumar M R and Sanghyun Yun via ozawa) + HADOOP-12213. Interrupted exception can occur when Client#stop is called. + (Kuhu Shukla via ozawa) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 8062dd60008..9c84773dca7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -197,9 +197,10 @@ public class Client { clientExecutor.shutdownNow(); } } catch (InterruptedException e) { - LOG.error("Interrupted while waiting for clientExecutor" + - "to stop", e); + LOG.warn("Interrupted while waiting for clientExecutor" + + " to stop"); clientExecutor.shutdownNow(); + Thread.currentThread().interrupt(); } clientExecutor = null; } @@ -256,6 +257,10 @@ public class Client { conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, timeout); } + @VisibleForTesting + public static final ExecutorService getClientExecutor() { + return Client.clientExcecutorFactory.clientExecutor; + } /** * Increment this client's reference count * diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 08508aee7a9..4e2e2f13f13 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -304,6 +304,8 @@ public class TestIPC { String causeText=cause.getMessage(); assertTrue("Did not find " + causeText + " in " + message, message.contains(causeText)); + } finally { + client.stop(); } } @@ -416,6 +418,7 @@ public class TestIPC { client.call(param, addr, null, null, 0, conf); } finally { + client.stop(); server.stop(); } } @@ -531,6 +534,8 @@ public class TestIPC { fail("Expected an exception to have been thrown"); } catch (IOException e) { assertTrue(e.getMessage().contains("Injected fault")); + } finally { + client.stop(); } } @@ -556,11 +561,11 @@ public class TestIPC { }).when(spyFactory).createSocket(); Server server = new TestServer(1, true); + Client client = new Client(LongWritable.class, conf, spyFactory); server.start(); try { // Call should fail due to injected exception. InetSocketAddress address = NetUtils.getConnectAddress(server); - Client client = new Client(LongWritable.class, conf, spyFactory); try { client.call(new LongWritable(RANDOM.nextLong()), address, null, null, 0, conf); @@ -577,6 +582,7 @@ public class TestIPC { client.call(new LongWritable(RANDOM.nextLong()), address, null, null, 0, conf); } finally { + client.stop(); server.stop(); } } @@ -601,6 +607,7 @@ public class TestIPC { // set timeout to be bigger than 3*ping interval client.call(new LongWritable(RANDOM.nextLong()), addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf); + client.stop(); } @Test(timeout=60000) @@ -621,6 +628,7 @@ public class TestIPC { } catch (SocketTimeoutException e) { LOG.info("Get a SocketTimeoutException ", e); } + client.stop(); } /** @@ -851,6 +859,8 @@ public class TestIPC { } catch (IOException e) { LOG.error(e); } catch (InterruptedException e) { + } finally { + client.stop(); } } }); @@ -952,6 +962,31 @@ public class TestIPC { endFds - startFds < 20); } + /** + * Check if Client is interrupted after handling + * InterruptedException during cleanup + */ + @Test(timeout=30000) + public void testInterrupted() { + Client client = new Client(LongWritable.class, conf); + client.getClientExecutor().submit(new Runnable() { + public void run() { + while(true); + } + }); + Thread.currentThread().interrupt(); + client.stop(); + try { + assertTrue(Thread.currentThread().isInterrupted()); + LOG.info("Expected thread interrupt during client cleanup"); + } catch (AssertionError e) { + LOG.error("The Client did not interrupt after handling an Interrupted Exception"); + Assert.fail("The Client did not interrupt after handling an Interrupted Exception"); + } + // Clear Thread interrupt + Thread.currentThread().interrupted(); + } + private long countOpenFileDescriptors() { return FD_DIR.list().length; } @@ -1315,6 +1350,7 @@ public class TestIPC { Mockito.verify(mockFactory, Mockito.times(maxTimeoutRetries)) .createSocket(); } + client.stop(); } private void doIpcVersionTest(