YARN-8893. [AMRMProxy] Fix thread leak in AMRMClientRelayer and UAM client. Contributed by Botong Huang.

This commit is contained in:
Giovanni Matteo Fumarola 2018-11-02 15:30:08 -07:00
parent aed836efbf
commit 989715ec50
9 changed files with 103 additions and 77 deletions

View File

@ -27,9 +27,8 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -47,8 +46,6 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.AMRMClientUtils;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -66,8 +63,7 @@ import com.google.common.annotations.VisibleForTesting;
* pending requests similar to AMRMClient, and handles RM re-sync automatically
* without propagate the re-sync exception back to AMRMClient.
*/
public class AMRMClientRelayer extends AbstractService
implements ApplicationMasterProtocol {
public class AMRMClientRelayer implements ApplicationMasterProtocol {
private static final Logger LOG =
LoggerFactory.getLogger(AMRMClientRelayer.class);
@ -136,51 +132,16 @@ public class AMRMClientRelayer extends AbstractService
private AMRMClientRelayerMetrics metrics;
public AMRMClientRelayer() {
super(AMRMClientRelayer.class.getName());
this.resetResponseId = -1;
this.metrics = AMRMClientRelayerMetrics.getInstance();
this.rmClient = null;
this.appId = null;
this.rmId = "";
}
public AMRMClientRelayer(ApplicationMasterProtocol rmClient,
ApplicationId appId, String rmId) {
this();
this.resetResponseId = -1;
this.metrics = AMRMClientRelayerMetrics.getInstance();
this.rmId = "";
this.rmClient = rmClient;
this.appId = appId;
this.rmId = rmId;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
final YarnConfiguration conf = new YarnConfiguration(getConfig());
try {
if (this.rmClient == null) {
this.rmClient =
ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
}
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
if (this.rmClient != null) {
RPC.stopProxy(this.rmClient);
}
shutdown();
super.serviceStop();
}
public void setAMRegistrationRequest(
RegisterApplicationMasterRequest registerRequest) {
this.amRegistrationRequest = registerRequest;
@ -231,6 +192,14 @@ public class AMRMClientRelayer extends AbstractService
.decrClientPending(rmId, req.getContainerUpdateType(), 1);
}
}
if (this.rmClient != null) {
try {
RPC.stopProxy(this.rmClient);
this.rmClient = null;
} catch (HadoopIllegalArgumentException e) {
}
}
}
@Override

View File

@ -371,6 +371,34 @@ public class UnmanagedAMPoolManager extends AbstractService {
return response;
}
/**
* Shutdown an UAM client without killing it in YarnRM.
*
* @param uamId uam Id
* @throws YarnException if fails
*/
public void shutDownConnections(String uamId)
throws YarnException {
if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " does not exist");
}
LOG.info(
"Shutting down UAM id {} for application {} without killing the UAM",
uamId, this.appIdMap.get(uamId));
this.unmanagedAppMasterMap.remove(uamId).shutDownConnections();
}
/**
* Shutdown all UAM clients without killing them in YarnRM.
*
* @throws YarnException if fails
*/
public void shutDownConnections() throws YarnException {
for (String uamId : this.unmanagedAppMasterMap.keySet()) {
shutDownConnections(uamId);
}
}
/**
* Get the id of all running UAMs.
*

View File

@ -255,9 +255,6 @@ public class UnmanagedApplicationManager {
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request)
throws YarnException, IOException {
this.heartbeatHandler.shutdown();
if (this.userUgi == null) {
if (this.connectionInitiated) {
// This is possible if the async launchUAM is still
@ -270,7 +267,12 @@ public class UnmanagedApplicationManager {
+ "be called before createAndRegister");
}
}
return this.rmProxyRelayer.finishApplicationMaster(request);
FinishApplicationMasterResponse response =
this.rmProxyRelayer.finishApplicationMaster(request);
if (response.getIsUnregistered()) {
shutDownConnections();
}
return response;
}
/**
@ -282,11 +284,10 @@ public class UnmanagedApplicationManager {
*/
public KillApplicationResponse forceKillApplication()
throws IOException, YarnException {
shutDownConnections();
KillApplicationRequest request =
KillApplicationRequest.newInstance(this.applicationId);
this.heartbeatHandler.shutdown();
if (this.rmClient == null) {
this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf,
UserGroupInformation.createRemoteUser(this.submitter), null);
@ -323,6 +324,14 @@ public class UnmanagedApplicationManager {
}
}
/**
* Shutdown this UAM client, without killing the UAM in the YarnRM side.
*/
public void shutDownConnections() {
this.heartbeatHandler.shutdown();
this.rmProxyRelayer.shutdown();
}
/**
* Returns the application id of the UAM.
*
@ -532,4 +541,9 @@ public class UnmanagedApplicationManager {
protected void drainHeartbeatThread() {
this.heartbeatHandler.drainHeartbeatThread();
}
@VisibleForTesting
protected boolean isHeartbeatThreadAlive() {
return this.heartbeatHandler.isAlive();
}
}

View File

@ -120,7 +120,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
@ -338,9 +337,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
applicationContainerIdMap.remove(appId);
}
return FinishApplicationMasterResponse.newInstance(
request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED
? true : false);
return FinishApplicationMasterResponse.newInstance(true);
}
protected ApplicationId getApplicationId(int id) {

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestExceptio
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -155,16 +156,17 @@ public class TestAMRMClientRelayer {
this.mockAMS = new MockApplicationMasterService();
this.relayer = new AMRMClientRelayer(this.mockAMS, null, "TEST");
this.relayer.init(conf);
this.relayer.start();
this.relayer.registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance("", 0, ""));
clearAllocateRequestLists();
}
@After
public void cleanup() {
this.relayer.shutdown();
}
private void assertAsksAndReleases(int expectedAsk, int expectedRelease) {
Assert.assertEquals(expectedAsk, this.mockAMS.lastAsk.size());
Assert.assertEquals(expectedRelease, this.mockAMS.lastRelease.size());

View File

@ -141,17 +141,11 @@ public class TestAMRMClientRelayerMetrics {
this.homeRelayer = new AMRMClientRelayer(this.mockAMS,
ApplicationId.newInstance(0, 0), this.homeID);
this.homeRelayer.init(conf);
this.homeRelayer.start();
this.homeRelayer.registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance("", 0, ""));
this.uamRelayer = new AMRMClientRelayer(this.mockAMS,
ApplicationId.newInstance(0, 0), this.uamID);
this.uamRelayer.init(conf);
this.uamRelayer.start();
this.uamRelayer.registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance("", 0, ""));

View File

@ -87,7 +87,7 @@ public class TestUnmanagedApplicationManager {
}
}
@Test(timeout = 5000)
@Test(timeout = 10000)
public void testBasicUsage()
throws YarnException, IOException, InterruptedException {
@ -104,6 +104,11 @@ public class TestUnmanagedApplicationManager {
finishApplicationMaster(
FinishApplicationMasterRequest.newInstance(null, null, null),
attemptId);
while (uam.isHeartbeatThreadAlive()) {
LOG.info("waiting for heartbeat thread to finish");
Thread.sleep(100);
}
}
/*
@ -261,7 +266,7 @@ public class TestUnmanagedApplicationManager {
attemptId);
}
@Test
@Test(timeout = 10000)
public void testForceKill()
throws YarnException, IOException, InterruptedException {
launchUAM(attemptId);
@ -269,6 +274,11 @@ public class TestUnmanagedApplicationManager {
RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
uam.forceKillApplication();
while (uam.isHeartbeatThreadAlive()) {
LOG.info("waiting for heartbeat thread to finish");
Thread.sleep(100);
}
try {
uam.forceKillApplication();
Assert.fail("Should fail because application is already killed");
@ -276,6 +286,19 @@ public class TestUnmanagedApplicationManager {
}
}
@Test(timeout = 10000)
public void testShutDownConnections()
throws YarnException, IOException, InterruptedException {
launchUAM(attemptId);
registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
uam.shutDownConnections();
while (uam.isHeartbeatThreadAlive()) {
LOG.info("waiting for heartbeat thread to finish");
Thread.sleep(100);
}
}
protected UserGroupInformation getUGIWithToken(
ApplicationAttemptId appAttemptId) {
UserGroupInformation ugi =

View File

@ -716,12 +716,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
uamPool.finishApplicationMaster(subClusterId, finishRequest);
if (uamResponse.getIsUnregistered()) {
AMRMClientRelayer relayer =
secondaryRelayers.remove(subClusterId);
if(relayer != null) {
relayer.shutdown();
}
secondaryRelayers.remove(subClusterId);
if (getNMStateStore() != null) {
getNMStateStore().removeAMRMProxyAppContextEntry(attemptId,
NMSS_SECONDARY_SC_PREFIX + subClusterId);
@ -801,8 +796,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
@Override
public void shutdown() {
LOG.info("Shutting down FederationInterceptor for {}", this.attemptId);
// Do not stop uamPool service and kill UAMs here because of possible second
// app attempt
try {
this.uamPool.shutDownConnections();
} catch (YarnException e) {
LOG.error("Error shutting down all UAM clients without killing them", e);
}
if (this.threadpool != null) {
try {
this.threadpool.shutdown();
@ -814,9 +817,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// Stop the home heartbeat thread
this.homeHeartbeartHandler.shutdown();
this.homeRMRelayer.shutdown();
for (AMRMClientRelayer relayer : this.secondaryRelayers.values()) {
relayer.shutdown();
}
super.shutdown();
}

View File

@ -206,7 +206,6 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
finishApplicationMaster(testAppId, FinalApplicationStatus.FAILED);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(false, finshResponse.getIsUnregistered());
try {
// Try to finish an application master that is already finished.