HADOOP-9703. org.apache.hadoop.ipc.Client leaks threads on stop (Tsuyoshi OZAWA via Colin Patrick McCabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1502704 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2013-07-12 21:55:15 +00:00
parent ce5acc180b
commit d93e7ca15d
3 changed files with 90 additions and 12 deletions

View File

@ -31,6 +31,9 @@ Release 2.2.0 - UNRELEASED
HADOOP-9417. Support for symlink resolution in LocalFileSystem /
RawLocalFileSystem. (Andrew Wang via Colin Patrick McCabe)
HADOOP-9703. org.apache.hadoop.ipc.Client leaks threads on stop.
(Tsuyoshi OZAWA vi Colin Patrick McCabe)
OPTIMIZATIONS
BUG FIXES

View File

@ -115,17 +115,70 @@ public class Client {
final static int PING_CALL_ID = -1;
/**
* Executor on which IPC calls' parameters are sent. Deferring
* the sending of parameters to a separate thread isolates them
* from thread interruptions in the calling code.
* Executor on which IPC calls' parameters are sent.
* Deferring the sending of parameters to a separate
* thread isolates them from thread interruptions in the
* calling code.
*/
private static final ExecutorService SEND_PARAMS_EXECUTOR =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("IPC Parameter Sending Thread #%d")
.build());
private final ExecutorService sendParamsExecutor;
private final static ClientExecutorServiceFactory clientExcecutorFactory =
new ClientExecutorServiceFactory();
private static class ClientExecutorServiceFactory {
private int executorRefCount = 0;
private ExecutorService clientExecutor = null;
/**
* Get Executor on which IPC calls' parameters are sent.
* If the internal reference counter is zero, this method
* creates the instance of Executor. If not, this method
* just returns the reference of clientExecutor.
*
* @return An ExecutorService instance
*/
synchronized ExecutorService refAndGetInstance() {
if (executorRefCount == 0) {
clientExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("IPC Parameter Sending Thread #%d")
.build());
}
executorRefCount++;
return clientExecutor;
}
/**
* Cleanup Executor on which IPC calls' parameters are sent.
* If reference counter is zero, this method discards the
* instance of the Executor. If not, this method
* just decrements the internal reference counter.
*
* @return An ExecutorService instance if it exists.
* Null is returned if not.
*/
synchronized ExecutorService unrefAndCleanup() {
executorRefCount--;
assert(executorRefCount >= 0);
if (executorRefCount == 0) {
clientExecutor.shutdown();
try {
if (!clientExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
clientExecutor.shutdownNow();
}
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for clientExecutor" +
"to stop", e);
clientExecutor.shutdownNow();
}
clientExecutor = null;
}
return clientExecutor;
}
};
/**
* set the ping interval value in configuration
@ -198,7 +251,7 @@ public class Client {
synchronized boolean isZeroReference() {
return refCount==0;
}
/**
* Class that represents an RPC call
*/
@ -878,7 +931,8 @@ public class Client {
}
// Serialize the call to be sent. This is done from the actual
// caller thread, rather than the SEND_PARAMS_EXECUTOR thread,
// caller thread, rather than the sendParamsExecutor thread,
// so that if the serialization throws an error, it is reported
// properly. This also parallelizes the serialization.
//
@ -895,7 +949,7 @@ public class Client {
call.rpcRequest.write(d);
synchronized (sendRpcRequestLock) {
Future<?> senderFuture = SEND_PARAMS_EXECUTOR.submit(new Runnable() {
Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
@Override
public void run() {
try {
@ -1090,6 +1144,7 @@ public class Client {
CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
}
/**
@ -1134,6 +1189,8 @@ public class Client {
} catch (InterruptedException e) {
}
}
clientExcecutorFactory.unrefAndCleanup();
}
/**

View File

@ -562,6 +562,24 @@ public class TestIPC {
assertFalse(noChanged ^ serviceClass == serviceClass2);
client.stop();
}
@Test(timeout=30000, expected=IOException.class)
public void testIpcAfterStopping() throws IOException, InterruptedException {
// start server
Server server = new TestServer(5, false);
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
// start client
Client client = new Client(LongWritable.class, conf);
client.call(new LongWritable(RANDOM.nextLong()),
addr, null, null, MIN_SLEEP_TIME, 0, conf);
client.stop();
// This call should throw IOException.
client.call(new LongWritable(RANDOM.nextLong()),
addr, null, null, MIN_SLEEP_TIME, 0, conf);
}
/**
* Check that file descriptors aren't leaked by starting