From d93e7ca15d4f1ea89acca40956372f85da269f88 Mon Sep 17 00:00:00 2001 From: Colin McCabe Date: Fri, 12 Jul 2013 21:55:15 +0000 Subject: [PATCH] HADOOP-9703. org.apache.hadoop.ipc.Client leaks threads on stop (Tsuyoshi OZAWA via Colin Patrick McCabe) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1502704 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 + .../java/org/apache/hadoop/ipc/Client.java | 81 ++++++++++++++++--- .../java/org/apache/hadoop/ipc/TestIPC.java | 18 +++++ 3 files changed, 90 insertions(+), 12 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 4ddf3513df9..9c44d76718c 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -31,6 +31,9 @@ Release 2.2.0 - UNRELEASED HADOOP-9417. Support for symlink resolution in LocalFileSystem / RawLocalFileSystem. (Andrew Wang via Colin Patrick McCabe) + HADOOP-9703. org.apache.hadoop.ipc.Client leaks threads on stop. + (Tsuyoshi OZAWA vi Colin Patrick McCabe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index b624c78d517..6265c92b326 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -115,17 +115,70 @@ public class Client { final static int PING_CALL_ID = -1; /** - * Executor on which IPC calls' parameters are sent. Deferring - * the sending of parameters to a separate thread isolates them - * from thread interruptions in the calling code. + * Executor on which IPC calls' parameters are sent. + * Deferring the sending of parameters to a separate + * thread isolates them from thread interruptions in the + * calling code. */ - private static final ExecutorService SEND_PARAMS_EXECUTOR = - Executors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("IPC Parameter Sending Thread #%d") - .build()); + private final ExecutorService sendParamsExecutor; + private final static ClientExecutorServiceFactory clientExcecutorFactory = + new ClientExecutorServiceFactory(); + private static class ClientExecutorServiceFactory { + private int executorRefCount = 0; + private ExecutorService clientExecutor = null; + + /** + * Get Executor on which IPC calls' parameters are sent. + * If the internal reference counter is zero, this method + * creates the instance of Executor. If not, this method + * just returns the reference of clientExecutor. + * + * @return An ExecutorService instance + */ + synchronized ExecutorService refAndGetInstance() { + if (executorRefCount == 0) { + clientExecutor = Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("IPC Parameter Sending Thread #%d") + .build()); + } + executorRefCount++; + + return clientExecutor; + } + + /** + * Cleanup Executor on which IPC calls' parameters are sent. + * If reference counter is zero, this method discards the + * instance of the Executor. If not, this method + * just decrements the internal reference counter. + * + * @return An ExecutorService instance if it exists. + * Null is returned if not. + */ + synchronized ExecutorService unrefAndCleanup() { + executorRefCount--; + assert(executorRefCount >= 0); + + if (executorRefCount == 0) { + clientExecutor.shutdown(); + try { + if (!clientExecutor.awaitTermination(1, TimeUnit.MINUTES)) { + clientExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for clientExecutor" + + "to stop", e); + clientExecutor.shutdownNow(); + } + clientExecutor = null; + } + + return clientExecutor; + } + }; /** * set the ping interval value in configuration @@ -198,7 +251,7 @@ public class Client { synchronized boolean isZeroReference() { return refCount==0; } - + /** * Class that represents an RPC call */ @@ -878,7 +931,8 @@ public class Client { } // Serialize the call to be sent. This is done from the actual - // caller thread, rather than the SEND_PARAMS_EXECUTOR thread, + // caller thread, rather than the sendParamsExecutor thread, + // so that if the serialization throws an error, it is reported // properly. This also parallelizes the serialization. // @@ -895,7 +949,7 @@ public class Client { call.rpcRequest.write(d); synchronized (sendRpcRequestLock) { - Future senderFuture = SEND_PARAMS_EXECUTOR.submit(new Runnable() { + Future senderFuture = sendParamsExecutor.submit(new Runnable() { @Override public void run() { try { @@ -1090,6 +1144,7 @@ public class Client { CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT); this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); + this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance(); } /** @@ -1134,6 +1189,8 @@ public class Client { } catch (InterruptedException e) { } } + + clientExcecutorFactory.unrefAndCleanup(); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 24b20f89efc..6078ae121f3 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -562,6 +562,24 @@ public class TestIPC { assertFalse(noChanged ^ serviceClass == serviceClass2); client.stop(); } + + @Test(timeout=30000, expected=IOException.class) + public void testIpcAfterStopping() throws IOException, InterruptedException { + // start server + Server server = new TestServer(5, false); + InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + + // start client + Client client = new Client(LongWritable.class, conf); + client.call(new LongWritable(RANDOM.nextLong()), + addr, null, null, MIN_SLEEP_TIME, 0, conf); + client.stop(); + + // This call should throw IOException. + client.call(new LongWritable(RANDOM.nextLong()), + addr, null, null, MIN_SLEEP_TIME, 0, conf); + } /** * Check that file descriptors aren't leaked by starting