MAPREDUCE-6618. YarnClientProtocolProvider leaking the YarnClient thread. Contributed by Xuan Gong

(cherry picked from commit 59a212b6e1)
This commit is contained in:
Jason Lowe 2016-02-01 16:05:06 +00:00
parent dcf2c8b3c8
commit 7bb48ed162
6 changed files with 63 additions and 5 deletions

View File

@ -433,6 +433,9 @@ Release 2.7.3 - UNRELEASED
MAPREDUCE-6554. MRAppMaster servicestart failing with NPE in MAPREDUCE-6554. MRAppMaster servicestart failing with NPE in
MRAppMaster#parsePreviousJobHistory (Bibin A Chundatt via jlowe) MRAppMaster#parsePreviousJobHistory (Bibin A Chundatt via jlowe)
MAPREDUCE-6618. YarnClientProtocolProvider leaking the YarnClient thread.
(Xuan Gong via jlowe)
Release 2.7.2 - 2016-01-25 Release 2.7.2 - 2016-01-25
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -735,6 +738,9 @@ Release 2.6.4 - UNRELEASED
MAPREDUCE-6554. MRAppMaster servicestart failing with NPE in MAPREDUCE-6554. MRAppMaster servicestart failing with NPE in
MRAppMaster#parsePreviousJobHistory (Bibin A Chundatt via jlowe) MRAppMaster#parsePreviousJobHistory (Bibin A Chundatt via jlowe)
MAPREDUCE-6618. YarnClientProtocolProvider leaking the YarnClient thread.
(Xuan Gong via jlowe)
Release 2.6.3 - 2015-12-17 Release 2.6.3 - 2015-12-17
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -22,11 +22,11 @@ import java.io.IOException;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol; import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
@ -97,4 +97,26 @@ public class ClientCache {
} }
}); });
} }
public void close() throws IOException {
if (rm != null) {
rm.close();
}
if (hsProxy != null) {
RPC.stopProxy(hsProxy);
hsProxy = null;
}
if (cache != null && !cache.isEmpty()) {
for (ClientServiceDelegate delegate : cache.values()) {
if (delegate != null) {
delegate.close();
delegate = null;
}
}
cache.clear();
cache = null;
}
}
} }

View File

@ -33,6 +33,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
@ -532,4 +533,19 @@ public class ClientServiceDelegate {
throw new IOException("Cannot get log path for a in-progress job"); throw new IOException("Cannot get log path for a in-progress job");
} }
} }
public void close() throws IOException {
if (rm != null) {
rm.close();
}
if (historyServerProxy != null) {
RPC.stopProxy(historyServerProxy);
}
if (realProxy != null) {
RPC.stopProxy(realProxy);
realProxy = null;
}
}
} }

View File

@ -756,4 +756,15 @@ public class YARNRunner implements ClientProtocol {
envConf + " config settings."); envConf + " config settings.");
} }
} }
public void close() throws IOException {
if (resMgrDelegate != null) {
resMgrDelegate.close();
resMgrDelegate = null;
}
if (clientCache != null) {
clientCache.close();
clientCache = null;
}
}
} }

View File

@ -44,7 +44,8 @@ public class YarnClientProtocolProvider extends ClientProtocolProvider {
@Override @Override
public void close(ClientProtocol clientProtocol) throws IOException { public void close(ClientProtocol clientProtocol) throws IOException {
// nothing to do if (clientProtocol instanceof YARNRunner) {
((YARNRunner)clientProtocol).close();
}
} }
} }

View File

@ -21,10 +21,11 @@ package org.apache.hadoop.mapreduce;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.doNothing;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -113,6 +114,8 @@ public class TestYarnClientProtocolProvider extends TestCase {
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
assertTrue(this.client instanceof YarnClientImpl); assertTrue(this.client instanceof YarnClientImpl);
this.client = spy(this.client);
doNothing().when(this.client).close();
((YarnClientImpl) this.client).setRMClient(cRMProtocol); ((YarnClientImpl) this.client).setRMClient(cRMProtocol);
} }
}; };
@ -126,5 +129,4 @@ public class TestYarnClientProtocolProvider extends TestCase {
} }
} }
} }
} }