diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 3bb21f8c438..6c7f59abde8 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -305,6 +305,9 @@ Release 2.2.0 - UNRELEASED HADOOP-9417. Support for symlink resolution in LocalFileSystem / RawLocalFileSystem. (Andrew Wang via Colin Patrick McCabe) + HADOOP-9703. org.apache.hadoop.ipc.Client leaks threads on stop. + (Tsuyoshi OZAWA vi Colin Patrick McCabe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 7d66066d103..377c70bf08b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -118,17 +118,70 @@ public class Client { private final byte[] clientId; /** - * Executor on which IPC calls' parameters are sent. Deferring - * the sending of parameters to a separate thread isolates them - * from thread interruptions in the calling code. + * Executor on which IPC calls' parameters are sent. + * Deferring the sending of parameters to a separate + * thread isolates them from thread interruptions in the + * calling code. */ - private static final ExecutorService SEND_PARAMS_EXECUTOR = - Executors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("IPC Parameter Sending Thread #%d") - .build()); + private final ExecutorService sendParamsExecutor; + private final static ClientExecutorServiceFactory clientExcecutorFactory = + new ClientExecutorServiceFactory(); + private static class ClientExecutorServiceFactory { + private int executorRefCount = 0; + private ExecutorService clientExecutor = null; + + /** + * Get Executor on which IPC calls' parameters are sent. + * If the internal reference counter is zero, this method + * creates the instance of Executor. If not, this method + * just returns the reference of clientExecutor. + * + * @return An ExecutorService instance + */ + synchronized ExecutorService refAndGetInstance() { + if (executorRefCount == 0) { + clientExecutor = Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("IPC Parameter Sending Thread #%d") + .build()); + } + executorRefCount++; + + return clientExecutor; + } + + /** + * Cleanup Executor on which IPC calls' parameters are sent. + * If reference counter is zero, this method discards the + * instance of the Executor. If not, this method + * just decrements the internal reference counter. + * + * @return An ExecutorService instance if it exists. + * Null is returned if not. + */ + synchronized ExecutorService unrefAndCleanup() { + executorRefCount--; + assert(executorRefCount >= 0); + + if (executorRefCount == 0) { + clientExecutor.shutdown(); + try { + if (!clientExecutor.awaitTermination(1, TimeUnit.MINUTES)) { + clientExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for clientExecutor" + + "to stop", e); + clientExecutor.shutdownNow(); + } + clientExecutor = null; + } + + return clientExecutor; + } + }; /** * set the ping interval value in configuration @@ -201,7 +254,7 @@ public class Client { synchronized boolean isZeroReference() { return refCount==0; } - + /** * Class that represents an RPC call */ @@ -879,7 +932,8 @@ public class Client { } // Serialize the call to be sent. This is done from the actual - // caller thread, rather than the SEND_PARAMS_EXECUTOR thread, + // caller thread, rather than the sendParamsExecutor thread, + // so that if the serialization throws an error, it is reported // properly. This also parallelizes the serialization. // @@ -896,7 +950,7 @@ public class Client { call.rpcRequest.write(d); synchronized (sendRpcRequestLock) { - Future senderFuture = SEND_PARAMS_EXECUTOR.submit(new Runnable() { + Future senderFuture = sendParamsExecutor.submit(new Runnable() { @Override public void run() { try { @@ -1092,6 +1146,7 @@ public class Client { this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); this.clientId = StringUtils.getUuidBytes(); + this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance(); } /** @@ -1136,6 +1191,8 @@ public class Client { } catch (InterruptedException e) { } } + + clientExcecutorFactory.unrefAndCleanup(); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 535897afcff..3e24c38da55 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -572,6 +572,24 @@ public class TestIPC { assertFalse(noChanged ^ serviceClass == serviceClass2); client.stop(); } + + @Test(timeout=30000, expected=IOException.class) + public void testIpcAfterStopping() throws IOException, InterruptedException { + // start server + Server server = new TestServer(5, false); + InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + + // start client + Client client = new Client(LongWritable.class, conf); + client.call(new LongWritable(RANDOM.nextLong()), + addr, null, null, MIN_SLEEP_TIME, 0, conf); + client.stop(); + + // This call should throw IOException. + client.call(new LongWritable(RANDOM.nextLong()), + addr, null, null, MIN_SLEEP_TIME, 0, conf); + } /** * Check that file descriptors aren't leaked by starting