From 1b731de94403c05743f698b6fee3dbaf1a2540a3 Mon Sep 17 00:00:00 2001 From: Eric Yang Date: Thu, 29 Nov 2018 13:50:06 -0500 Subject: [PATCH] YARN-9067. Fixed Resource Manager resource leak via YARN service. Contributed by Eric Yang --- .../hadoop/yarn/service/webapp/ApiServer.java | 196 +++++++++++------- .../yarn/service/ServiceClientTest.java | 2 +- .../yarn/service/client/ServiceClient.java | 1 + 3 files changed, 118 insertions(+), 81 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java index c4e33171d48..51ad00a9a25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java @@ -117,10 +117,13 @@ public class ApiServer { @Override public Void run() throws YarnException, IOException { ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - sc.actionBuild(service); - sc.close(); + try { + sc.init(YARN_CONFIG); + sc.start(); + sc.actionBuild(service); + } finally { + sc.close(); + } return null; } }); @@ -132,11 +135,14 @@ public class ApiServer { @Override public ApplicationId run() throws IOException, YarnException { ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - ApplicationId applicationId = sc.actionCreate(service); - sc.close(); - return applicationId; + try { + sc.init(YARN_CONFIG); + sc.start(); + ApplicationId applicationId = sc.actionCreate(service); + return applicationId; + } finally { + sc.close(); + } } }); serviceStatus.setDiagnostics("Application ID: " + applicationId); @@ -244,29 +250,32 @@ public class ApiServer { public Integer run() throws Exception { int result = 0; ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - Exception stopException = null; try { - result = sc.actionStop(appName, destroy); - if (result == EXIT_SUCCESS) { - LOG.info("Successfully stopped service {}", appName); + sc.init(YARN_CONFIG); + sc.start(); + Exception stopException = null; + try { + result = sc.actionStop(appName, destroy); + if (result == EXIT_SUCCESS) { + LOG.info("Successfully stopped service {}", appName); + } + } catch (Exception e) { + LOG.info("Got exception stopping service", e); + stopException = e; } - } catch (Exception e) { - LOG.info("Got exception stopping service", e); - stopException = e; + if (destroy) { + result = sc.actionDestroy(appName); + if (result == EXIT_SUCCESS) { + LOG.info("Successfully deleted service {}", appName); + } + } else { + if (stopException != null) { + throw stopException; + } + } + } finally { + sc.close(); } - if (destroy) { - result = sc.actionDestroy(appName); - if (result == EXIT_SUCCESS) { - LOG.info("Successfully deleted service {}", appName); - } - } else { - if (stopException != null) { - throw stopException; - } - } - sc.close(); return result; } }); @@ -377,13 +386,16 @@ public class ApiServer { @Override public Map run() throws YarnException, IOException { ServiceClient sc = new ServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - Map original = sc.flexByRestService(appName, - Collections.singletonMap(componentName, - component.getNumberOfContainers())); - sc.close(); - return original; + try { + sc.init(YARN_CONFIG); + sc.start(); + Map original = sc.flexByRestService(appName, + Collections.singletonMap(componentName, + component.getNumberOfContainers())); + return original; + } finally { + sc.close(); + } } }); ServiceStatus status = new ServiceStatus(); @@ -625,12 +637,15 @@ public class ApiServer { public Integer run() throws YarnException, IOException { int result = 0; ServiceClient sc = new ServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - result = sc - .actionFlex(appName, componentCountStrings); - sc.close(); - return Integer.valueOf(result); + try { + sc.init(YARN_CONFIG); + sc.start(); + result = sc + .actionFlex(appName, componentCountStrings); + return Integer.valueOf(result); + } finally { + sc.close(); + } } }); if (result == EXIT_SUCCESS) { @@ -651,12 +666,15 @@ public class ApiServer { @Override public String run() throws YarnException, IOException { ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - String newLifeTime = sc.updateLifetime(appName, - updateAppData.getLifetime()); - sc.close(); - return newLifeTime; + try { + sc.init(YARN_CONFIG); + sc.start(); + String newLifeTime = sc.updateLifetime(appName, + updateAppData.getLifetime()); + return newLifeTime; + } finally { + sc.close(); + } } }); ServiceStatus status = new ServiceStatus(); @@ -674,11 +692,14 @@ public class ApiServer { @Override public ApplicationId run() throws YarnException, IOException { ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - ApplicationId appId = sc.actionStartAndGetId(appName); - sc.close(); - return appId; + try { + sc.init(YARN_CONFIG); + sc.start(); + ApplicationId appId = sc.actionStartAndGetId(appName); + return appId; + } finally { + sc.close(); + } } }); LOG.info("Successfully started service " + appName); @@ -695,14 +716,17 @@ public class ApiServer { ServiceStatus status = new ServiceStatus(); ugi.doAs((PrivilegedExceptionAction) () -> { ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) { - sc.actionUpgradeExpress(service); - } else { - sc.initiateUpgrade(service); + try { + sc.init(YARN_CONFIG); + sc.start(); + if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) { + sc.actionUpgradeExpress(service); + } else { + sc.initiateUpgrade(service); + } + } finally { + sc.close(); } - sc.close(); return null; }); LOG.info("Service {} version {} upgrade initialized", service.getName(), @@ -717,11 +741,14 @@ public class ApiServer { final UserGroupInformation ugi) throws IOException, InterruptedException { int result = ugi.doAs((PrivilegedExceptionAction) () -> { ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - int exitCode = sc.actionCancelUpgrade(serviceName); - sc.close(); - return exitCode; + try { + sc.init(YARN_CONFIG); + sc.start(); + int exitCode = sc.actionCancelUpgrade(serviceName); + return exitCode; + } finally { + sc.close(); + } }); if (result == EXIT_SUCCESS) { ServiceStatus status = new ServiceStatus(); @@ -786,10 +813,13 @@ public class ApiServer { return ugi.doAs((PrivilegedExceptionAction) () -> { int result1; ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - result1 = sc.actionUpgrade(service, containers); - sc.close(); + try { + sc.init(YARN_CONFIG); + sc.start(); + result1 = sc.actionUpgrade(service, containers); + } finally { + sc.close(); + } return result1; }); } @@ -799,11 +829,14 @@ public class ApiServer { return ugi.doAs((PrivilegedExceptionAction) () -> { ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - Service app1 = sc.getStatus(serviceName); - sc.close(); - return app1; + try { + sc.init(YARN_CONFIG); + sc.start(); + Service app1 = sc.getStatus(serviceName); + return app1; + } finally { + sc.close(); + } }); } @@ -814,12 +847,15 @@ public class ApiServer { return ugi.doAs((PrivilegedExceptionAction) () -> { Container[] result; ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - result = sc.getContainers(serviceName, componentNames, version, - containerStates); - sc.close(); - return result; + try { + sc.init(YARN_CONFIG); + sc.start(); + result = sc.getContainers(serviceName, componentNames, version, + containerStates); + return result; + } finally { + sc.close(); + } }); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java index d022614e708..89366b43889 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java @@ -75,7 +75,7 @@ public class ServiceClientTest extends ServiceClient { public void forceStop() { expectedInstances.clear(); - super.stop(); + stop(); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index b79d771d260..80eb67dc021 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -149,6 +149,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, if (registryClient != null) { registryClient.stop(); } + fs.getFileSystem().close(); super.serviceStop(); }