YARN-9067. Fixed Resource Manager resource leak via YARN service.

Contributed by Eric Yang
This commit is contained in:
Eric Yang 2018-11-29 13:50:06 -05:00
parent 65e0d6ff46
commit 1b731de944
3 changed files with 118 additions and 81 deletions

View File

@ -117,10 +117,13 @@ public class ApiServer {
@Override
public Void run() throws YarnException, IOException {
ServiceClient sc = getServiceClient();
try {
sc.init(YARN_CONFIG);
sc.start();
sc.actionBuild(service);
} finally {
sc.close();
}
return null;
}
});
@ -132,11 +135,14 @@ public class ApiServer {
@Override
public ApplicationId run() throws IOException, YarnException {
ServiceClient sc = getServiceClient();
try {
sc.init(YARN_CONFIG);
sc.start();
ApplicationId applicationId = sc.actionCreate(service);
sc.close();
return applicationId;
} finally {
sc.close();
}
}
});
serviceStatus.setDiagnostics("Application ID: " + applicationId);
@ -244,6 +250,7 @@ public class ApiServer {
public Integer run() throws Exception {
int result = 0;
ServiceClient sc = getServiceClient();
try {
sc.init(YARN_CONFIG);
sc.start();
Exception stopException = null;
@ -266,7 +273,9 @@ public class ApiServer {
throw stopException;
}
}
} finally {
sc.close();
}
return result;
}
});
@ -377,13 +386,16 @@ public class ApiServer {
@Override
public Map<String, Long> run() throws YarnException, IOException {
ServiceClient sc = new ServiceClient();
try {
sc.init(YARN_CONFIG);
sc.start();
Map<String, Long> original = sc.flexByRestService(appName,
Collections.singletonMap(componentName,
component.getNumberOfContainers()));
sc.close();
return original;
} finally {
sc.close();
}
}
});
ServiceStatus status = new ServiceStatus();
@ -625,12 +637,15 @@ public class ApiServer {
public Integer run() throws YarnException, IOException {
int result = 0;
ServiceClient sc = new ServiceClient();
try {
sc.init(YARN_CONFIG);
sc.start();
result = sc
.actionFlex(appName, componentCountStrings);
sc.close();
return Integer.valueOf(result);
} finally {
sc.close();
}
}
});
if (result == EXIT_SUCCESS) {
@ -651,12 +666,15 @@ public class ApiServer {
@Override
public String run() throws YarnException, IOException {
ServiceClient sc = getServiceClient();
try {
sc.init(YARN_CONFIG);
sc.start();
String newLifeTime = sc.updateLifetime(appName,
updateAppData.getLifetime());
sc.close();
return newLifeTime;
} finally {
sc.close();
}
}
});
ServiceStatus status = new ServiceStatus();
@ -674,11 +692,14 @@ public class ApiServer {
@Override public ApplicationId run()
throws YarnException, IOException {
ServiceClient sc = getServiceClient();
try {
sc.init(YARN_CONFIG);
sc.start();
ApplicationId appId = sc.actionStartAndGetId(appName);
sc.close();
return appId;
} finally {
sc.close();
}
}
});
LOG.info("Successfully started service " + appName);
@ -695,6 +716,7 @@ public class ApiServer {
ServiceStatus status = new ServiceStatus();
ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
ServiceClient sc = getServiceClient();
try {
sc.init(YARN_CONFIG);
sc.start();
if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) {
@ -702,7 +724,9 @@ public class ApiServer {
} else {
sc.initiateUpgrade(service);
}
} finally {
sc.close();
}
return null;
});
LOG.info("Service {} version {} upgrade initialized", service.getName(),
@ -717,11 +741,14 @@ public class ApiServer {
final UserGroupInformation ugi) throws IOException, InterruptedException {
int result = ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
ServiceClient sc = getServiceClient();
try {
sc.init(YARN_CONFIG);
sc.start();
int exitCode = sc.actionCancelUpgrade(serviceName);
sc.close();
return exitCode;
} finally {
sc.close();
}
});
if (result == EXIT_SUCCESS) {
ServiceStatus status = new ServiceStatus();
@ -786,10 +813,13 @@ public class ApiServer {
return ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
int result1;
ServiceClient sc = getServiceClient();
try {
sc.init(YARN_CONFIG);
sc.start();
result1 = sc.actionUpgrade(service, containers);
} finally {
sc.close();
}
return result1;
});
}
@ -799,11 +829,14 @@ public class ApiServer {
return ugi.doAs((PrivilegedExceptionAction<Service>) () -> {
ServiceClient sc = getServiceClient();
try {
sc.init(YARN_CONFIG);
sc.start();
Service app1 = sc.getStatus(serviceName);
sc.close();
return app1;
} finally {
sc.close();
}
});
}
@ -814,12 +847,15 @@ public class ApiServer {
return ugi.doAs((PrivilegedExceptionAction<Container[]>) () -> {
Container[] result;
ServiceClient sc = getServiceClient();
try {
sc.init(YARN_CONFIG);
sc.start();
result = sc.getContainers(serviceName, componentNames, version,
containerStates);
sc.close();
return result;
} finally {
sc.close();
}
});
}

View File

@ -75,7 +75,7 @@ public class ServiceClientTest extends ServiceClient {
public void forceStop() {
expectedInstances.clear();
super.stop();
stop();
}
@Override

View File

@ -149,6 +149,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
if (registryClient != null) {
registryClient.stop();
}
fs.getFileSystem().close();
super.serviceStop();
}