MAPREDUCE-6618. YarnClientProtocolProvider leaking the YarnClient thread. Contributed by Xuan Gong

This commit is contained in:
Jason Lowe 2016-02-01 16:05:06 +00:00
parent af2dccbca5
commit 59a212b6e1
6 changed files with 63 additions and 5 deletions

View File

@ -736,6 +736,9 @@ Release 2.7.3 - UNRELEASED
MAPREDUCE-6554. MRAppMaster servicestart failing with NPE in
MRAppMaster#parsePreviousJobHistory (Bibin A Chundatt via jlowe)
MAPREDUCE-6618. YarnClientProtocolProvider leaking the YarnClient thread.
(Xuan Gong via jlowe)
Release 2.7.2 - 2016-01-25
INCOMPATIBLE CHANGES
@ -1041,6 +1044,9 @@ Release 2.6.4 - UNRELEASED
MAPREDUCE-6554. MRAppMaster servicestart failing with NPE in
MRAppMaster#parsePreviousJobHistory (Bibin A Chundatt via jlowe)
MAPREDUCE-6618. YarnClientProtocolProvider leaking the YarnClient thread.
(Xuan Gong via jlowe)
Release 2.6.3 - 2015-12-17
INCOMPATIBLE CHANGES

View File

@ -22,11 +22,11 @@
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
@ -97,4 +97,26 @@ public MRClientProtocol run() {
}
});
}
public void close() throws IOException {
if (rm != null) {
rm.close();
}
if (hsProxy != null) {
RPC.stopProxy(hsProxy);
hsProxy = null;
}
if (cache != null && !cache.isEmpty()) {
for (ClientServiceDelegate delegate : cache.values()) {
if (delegate != null) {
delegate.close();
delegate = null;
}
}
cache.clear();
cache = null;
}
}
}

View File

@ -33,6 +33,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
@ -532,4 +533,19 @@ public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
throw new IOException("Cannot get log path for a in-progress job");
}
}
public void close() throws IOException {
if (rm != null) {
rm.close();
}
if (historyServerProxy != null) {
RPC.stopProxy(historyServerProxy);
}
if (realProxy != null) {
RPC.stopProxy(realProxy);
realProxy = null;
}
}
}

View File

@ -756,4 +756,15 @@ private static void warnForJavaLibPath(String opts, String component,
envConf + " config settings.");
}
}
public void close() throws IOException {
if (resMgrDelegate != null) {
resMgrDelegate.close();
resMgrDelegate = null;
}
if (clientCache != null) {
clientCache.close();
clientCache = null;
}
}
}

View File

@ -44,7 +44,8 @@ public ClientProtocol create(InetSocketAddress addr, Configuration conf)
@Override
public void close(ClientProtocol clientProtocol) throws IOException {
// nothing to do
if (clientProtocol instanceof YARNRunner) {
((YARNRunner)clientProtocol).close();
}
}
}

View File

@ -21,10 +21,11 @@
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.doNothing;
import java.io.IOException;
import java.nio.ByteBuffer;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
@ -113,6 +114,8 @@ public void testClusterGetDelegationToken() throws Exception {
@Override
protected void serviceStart() throws Exception {
assertTrue(this.client instanceof YarnClientImpl);
this.client = spy(this.client);
doNothing().when(this.client).close();
((YarnClientImpl) this.client).setRMClient(cRMProtocol);
}
};
@ -126,5 +129,4 @@ protected void serviceStart() throws Exception {
}
}
}
}