YARN-7984. Improved YARN service stop/destroy and clean up.

Contributed by Billie Rinaldi
This commit is contained in:
Eric Yang 2018-04-10 17:40:49 -04:00
parent 8ab776d61e
commit d553799030
6 changed files with 191 additions and 38 deletions

View File

@ -231,30 +231,40 @@ public class ApiServer {
e.getCause().getMessage());
} catch (YarnException | FileNotFoundException e) {
return formatResponse(Status.NOT_FOUND, e.getMessage());
} catch (IOException | InterruptedException e) {
} catch (Exception e) {
LOG.error("Fail to stop service: {}", e);
return formatResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
private Response stopService(String appName, boolean destroy,
final UserGroupInformation ugi) throws IOException,
InterruptedException, YarnException, FileNotFoundException {
final UserGroupInformation ugi) throws Exception {
int result = ugi.doAs(new PrivilegedExceptionAction<Integer>() {
@Override
public Integer run() throws IOException, YarnException,
FileNotFoundException {
public Integer run() throws Exception {
int result = 0;
ServiceClient sc = getServiceClient();
sc.init(YARN_CONFIG);
sc.start();
result = sc.actionStop(appName, destroy);
if (result == EXIT_SUCCESS) {
LOG.info("Successfully stopped service {}", appName);
Exception stopException = null;
try {
result = sc.actionStop(appName, destroy);
if (result == EXIT_SUCCESS) {
LOG.info("Successfully stopped service {}", appName);
}
} catch (Exception e) {
LOG.info("Got exception stopping service", e);
stopException = e;
}
if (destroy) {
result = sc.actionDestroy(appName);
LOG.info("Successfully deleted service {}", appName);
if (result == EXIT_SUCCESS) {
LOG.info("Successfully deleted service {}", appName);
}
} else {
if (stopException != null) {
throw stopException;
}
}
sc.close();
return result;
@ -262,8 +272,21 @@ public class ApiServer {
});
ServiceStatus serviceStatus = new ServiceStatus();
if (destroy) {
serviceStatus.setDiagnostics("Successfully destroyed service " +
appName);
if (result == EXIT_SUCCESS) {
serviceStatus.setDiagnostics("Successfully destroyed service " +
appName);
} else {
if (result == EXIT_NOT_FOUND) {
serviceStatus
.setDiagnostics("Service " + appName + " doesn't exist");
return formatResponse(Status.BAD_REQUEST, serviceStatus);
} else {
serviceStatus
.setDiagnostics("Service " + appName + " error cleaning up " +
"registry");
return formatResponse(Status.INTERNAL_SERVER_ERROR, serviceStatus);
}
}
} else {
if (result == EXIT_COMMAND_ARGUMENT_ERROR) {
serviceStatus
@ -394,7 +417,7 @@ public class ApiServer {
String message = "Service is not found in hdfs: " + appName;
LOG.error(message, e);
return formatResponse(Status.NOT_FOUND, e.getMessage());
} catch (IOException | InterruptedException e) {
} catch (Exception e) {
String message = "Error while performing operation for app: " + appName;
LOG.error(message, e);
return formatResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage());

View File

@ -103,6 +103,12 @@ public class ServiceClientTest extends ServiceClient {
}
if (serviceName.equals("jenkins")) {
return EXIT_SUCCESS;
} else if (serviceName.equals("jenkins-already-stopped")) {
return EXIT_SUCCESS;
} else if (serviceName.equals("jenkins-doesn't-exist")) {
return EXIT_NOT_FOUND;
} else if (serviceName.equals("jenkins-error-cleaning-registry")) {
return EXIT_OTHER_FAILURE;
} else {
throw new IllegalArgumentException();
}

View File

@ -154,6 +154,24 @@ public class TestApiServer {
actual.getStatus());
}
/**
 * Verifies that deleting the service named "jenkins-doesn't-exist" is
 * answered with a BAD_REQUEST status code.
 */
@Test
public void testBadDeleteService3() {
  final String serviceName = "jenkins-doesn't-exist";
  final Response response = apiServer.deleteService(request, serviceName);
  final int expectedStatus =
      Response.status(Status.BAD_REQUEST).build().getStatus();
  assertEquals("Delete service is ", expectedStatus, response.getStatus());
}
/**
 * Verifies that deleting the service named
 * "jenkins-error-cleaning-registry" is answered with an
 * INTERNAL_SERVER_ERROR status code.
 */
@Test
public void testBadDeleteService4() {
  final String serviceName = "jenkins-error-cleaning-registry";
  final Response response = apiServer.deleteService(request, serviceName);
  final int expectedStatus =
      Response.status(Status.INTERNAL_SERVER_ERROR).build().getStatus();
  assertEquals("Delete service is ", expectedStatus, response.getStatus());
}
@Test
public void testGoodDeleteService() {
final Response actual = apiServer.deleteService(request, "jenkins");
@ -161,6 +179,14 @@ public class TestApiServer {
Response.status(Status.OK).build().getStatus(), actual.getStatus());
}
/**
 * Verifies that deleting the service named "jenkins-already-stopped" is
 * answered with an OK status code.
 */
@Test
public void testDeleteStoppedService() {
  final String serviceName = "jenkins-already-stopped";
  final Response response = apiServer.deleteService(request, serviceName);
  final int expectedStatus = Response.status(Status.OK).build().getStatus();
  assertEquals("Delete service is ", expectedStatus, response.getStatus());
}
@Test
public void testDecreaseContainerAndStop() {
Service service = new Service();

View File

@ -375,8 +375,14 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
.save(fs.getFileSystem(), ServiceApiUtil.getServiceJsonPath(fs, serviceName),
persistedService, true);
ApplicationId appId = getAppId(serviceName);
if (appId == null) {
String message = "Application ID doesn't exist for " + serviceName;
LOG.error(message);
throw new YarnException(message);
}
ApplicationReport appReport =
yarnClient.getApplicationReport(getAppId(serviceName));
yarnClient.getApplicationReport(appId);
if (appReport.getYarnApplicationState() != RUNNING) {
String message =
serviceName + " is at " + appReport.getYarnApplicationState()
@ -408,10 +414,16 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
throws YarnException, IOException {
ServiceApiUtil.validateNameFormat(serviceName, getConfig());
ApplicationId currentAppId = getAppId(serviceName);
if (currentAppId == null) {
LOG.info("Application ID doesn't exist for service {}", serviceName);
cleanUpRegistry(serviceName);
return EXIT_COMMAND_ARGUMENT_ERROR;
}
ApplicationReport report = yarnClient.getApplicationReport(currentAppId);
if (terminatedStates.contains(report.getYarnApplicationState())) {
LOG.info("Service {} is already in a terminated state {}", serviceName,
report.getYarnApplicationState());
cleanUpRegistry(serviceName);
return EXIT_COMMAND_ARGUMENT_ERROR;
}
if (preRunningStates.contains(report.getYarnApplicationState())) {
@ -419,6 +431,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
+ ", forcefully killed by user!";
yarnClient.killApplication(currentAppId, msg);
LOG.info(msg);
cleanUpRegistry(serviceName);
return EXIT_SUCCESS;
}
if (StringUtils.isEmpty(report.getHost())) {
@ -438,10 +451,12 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
yarnClient.killApplication(currentAppId,
serviceName + " is forcefully killed by user!");
LOG.info("Forcefully kill the service: " + serviceName);
cleanUpRegistry(serviceName);
return EXIT_SUCCESS;
}
if (!waitForAppStopped) {
cleanUpRegistry(serviceName);
return EXIT_SUCCESS;
}
// Wait until the app is killed.
@ -471,6 +486,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
+ e.getMessage() + ", forcefully kill the app.");
yarnClient.killApplication(currentAppId, "Forcefully kill the app");
}
cleanUpRegistry(serviceName);
return EXIT_SUCCESS;
}
@ -484,7 +500,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
FileSystem fileSystem = fs.getFileSystem();
// remove from the appId cache
cachedAppInfo.remove(serviceName);
boolean destroySucceed = true;
int ret = EXIT_SUCCESS;
if (fileSystem.exists(appDir)) {
if (fileSystem.delete(appDir, true)) {
LOG.info("Successfully deleted service dir for " + serviceName + ": "
@ -498,13 +514,34 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
} else {
LOG.info("Service '" + serviceName + "' doesn't exist at hdfs path: "
+ appDir);
destroySucceed = false;
ret = EXIT_NOT_FOUND;
}
try {
deleteZKNode(serviceName);
// a missing ZK node is not counted as a destroy failure because not
// all services use a ZK node
} catch (Exception e) {
throw new IOException("Could not delete zk node for " + serviceName, e);
}
if (!cleanUpRegistry(serviceName)) {
if (ret == EXIT_SUCCESS) {
ret = EXIT_OTHER_FAILURE;
}
}
if (ret == EXIT_SUCCESS) {
LOG.info("Successfully destroyed service {}", serviceName);
return ret;
} else if (ret == EXIT_NOT_FOUND) {
LOG.error("Error on destroy '" + serviceName + "': not found.");
return ret;
} else {
LOG.error("Error on destroy '" + serviceName + "': error cleaning up " +
"registry.");
return ret;
}
}
private boolean cleanUpRegistry(String serviceName) throws SliderException {
String registryPath =
ServiceRegistryUtils.registryPathForInstance(serviceName);
try {
@ -514,18 +551,13 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
LOG.info(
"Service '" + serviceName + "' doesn't exist at ZK registry path: "
+ registryPath);
destroySucceed = false;
// not counted as a failure if the registry entries don't exist
}
} catch (IOException e) {
LOG.warn("Error deleting registry entry {}", registryPath, e);
return false;
}
if (destroySucceed) {
LOG.info("Successfully destroyed service {}", serviceName);
return EXIT_SUCCESS;
} else {
LOG.error("Error on destroy '" + serviceName + "': not found.");
return -1;
}
return true;
}
private synchronized RegistryOperations getRegistryClient()
@ -540,17 +572,25 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
return registryClient;
}
private boolean deleteZKNode(String clusterName) throws Exception {
/**
* Delete service's ZK node. This is a different node from the service's
* registry entry and is set aside for the service to use for its own ZK data.
*
* @param serviceName service name
* @return true if the node was deleted, false if the node doesn't exist
* @throws Exception if the node couldn't be deleted
*/
private boolean deleteZKNode(String serviceName) throws Exception {
CuratorFramework curatorFramework = getCuratorClient();
// The node path is derived from the current registry user and the service
// name.
String user = RegistryUtils.currentUser();
String zkPath = ServiceRegistryUtils.mkServiceHomePath(user, clusterName);
String zkPath = ServiceRegistryUtils.mkServiceHomePath(user, serviceName);
// Only issue the delete when the node is present; any children are removed
// together with it.
if (curatorFramework.checkExists().forPath(zkPath) != null) {
curatorFramework.delete().deletingChildrenIfNeeded().forPath(zkPath);
LOG.info("Deleted zookeeper path: " + zkPath);
return true;
} else {
// An absent node is only logged; the caller sees false rather than an
// exception.
LOG.info(
"Service '" + clusterName + "' doesn't exist at ZK path: " + zkPath);
"Service '" + serviceName + "' doesn't exist at ZK path: " + zkPath);
return false;
}
}
@ -963,6 +1003,9 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
public String updateLifetime(String serviceName, long lifetime)
throws YarnException, IOException {
ApplicationId currentAppId = getAppId(serviceName);
if (currentAppId == null) {
throw new YarnException("Application ID not found for " + serviceName);
}
ApplicationReport report = yarnClient.getApplicationReport(currentAppId);
if (report == null) {
throw new YarnException("Service not found for " + serviceName);
@ -1008,8 +1051,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
throws IOException, YarnException {
try {
// try parsing appIdOrName, if it succeeds, it means it's appId
ApplicationId.fromString(appIdOrName);
return getStatusByAppId(appIdOrName);
ApplicationId appId = ApplicationId.fromString(appIdOrName);
return getStatusByAppId(appId);
} catch (IllegalArgumentException e) {
// not appId format, it could be appName.
Service status = getStatus(appIdOrName);
@ -1017,10 +1060,10 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
}
}
private String getStatusByAppId(String appId)
private String getStatusByAppId(ApplicationId appId)
throws IOException, YarnException {
ApplicationReport appReport =
yarnClient.getApplicationReport(ApplicationId.fromString(appId));
yarnClient.getApplicationReport(appId);
if (appReport.getYarnApplicationState() != RUNNING) {
return "";
@ -1037,10 +1080,14 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
public Service getStatus(String serviceName)
throws IOException, YarnException {
ServiceApiUtil.validateNameFormat(serviceName, getConfig());
ApplicationId currentAppId = getAppId(serviceName);
ApplicationReport appReport = yarnClient.getApplicationReport(currentAppId);
Service appSpec = new Service();
appSpec.setName(serviceName);
ApplicationId currentAppId = getAppId(serviceName);
if (currentAppId == null) {
LOG.info("Service {} does not have an application ID", serviceName);
return appSpec;
}
ApplicationReport appReport = yarnClient.getApplicationReport(currentAppId);
appSpec.setState(convertState(appReport.getYarnApplicationState()));
ApplicationTimeout lifetime =
appReport.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME);
@ -1167,7 +1214,11 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
throw new YarnException("Service " + serviceName
+ " doesn't exist on hdfs. Please check if the app exists in RM");
}
ApplicationId currentAppId = ApplicationId.fromString(persistedService.getId());
if (persistedService.getId() == null) {
return null;
}
ApplicationId currentAppId = ApplicationId.fromString(persistedService
.getId());
cachedAppInfo.put(serviceName, new AppInfo(currentAppId, persistedService
.getKerberosPrincipal().getPrincipalName()));
return currentAppId;

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.registry.client.impl.zk.CuratorService;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
@ -74,7 +76,8 @@ public class ServiceTestUtils {
private MiniYARNCluster yarnCluster = null;
private MiniDFSCluster hdfsCluster = null;
TestingCluster zkCluster;
private TestingCluster zkCluster;
private CuratorService curatorService;
private FileSystem fs = null;
private Configuration conf = null;
public static final int NUM_NMS = 1;
@ -186,6 +189,10 @@ public class ServiceTestUtils {
conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString());
LOG.info("ZK cluster: " + zkCluster.getConnectString());
curatorService = new CuratorService("testCuratorService");
curatorService.init(conf);
curatorService.start();
fs = FileSystem.get(conf);
basedir = new File("target", "apps");
if (basedir.exists()) {
@ -253,6 +260,9 @@ public class ServiceTestUtils {
hdfsCluster = null;
}
}
if (curatorService != null) {
ServiceOperations.stop(curatorService);
}
if (zkCluster != null) {
zkCluster.stop();
}
@ -295,6 +305,9 @@ public class ServiceTestUtils {
return client;
}
/**
 * Accessor for the {@link CuratorService} held by this test utility.
 *
 * @return the curator service field; null when it has not been assigned
 * @throws IOException declared by the signature, never raised by the
 *         accessor body itself
 */
protected CuratorService getCuratorService() throws IOException {
  return this.curatorService;
}
/**
* Watcher to initialize yarn service base path under target and deletes the

View File

@ -22,6 +22,8 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
@ -40,6 +42,7 @@ import org.apache.hadoop.yarn.service.api.records.PlacementType;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.hamcrest.CoreMatchers;
@ -59,6 +62,8 @@ import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.yarn.api.records.YarnApplicationState.FINISHED;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.YARN_SERVICE_BASE_PATH;
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_COMMAND_ARGUMENT_ERROR;
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_NOT_FOUND;
/**
* End to end tests to test deploying services with MiniYarnCluster and an in-JVM
@ -131,6 +136,10 @@ public class TestYarnNativeServices extends ServiceTestUtils {
Assert.assertEquals(FINISHED, report.getYarnApplicationState());
Assert.assertEquals(FinalApplicationStatus.ENDED,
report.getFinalApplicationStatus());
String serviceZKPath = RegistryUtils.servicePath(RegistryUtils
.currentUser(), YarnServiceConstants.APP_TYPE, exampleApp.getName());
Assert.assertFalse("Registry ZK service path still exists after stop",
getCuratorService().zkPathExists(serviceZKPath));
LOG.info("Destroy the service");
// destroy the service and check the app dir is deleted from fs.
@ -139,7 +148,20 @@ public class TestYarnNativeServices extends ServiceTestUtils {
Assert.assertFalse(getFS().exists(appDir));
// check that destroying again does not succeed
Assert.assertEquals(-1, client.actionDestroy(exampleApp.getName()));
Assert.assertEquals(EXIT_NOT_FOUND, client.actionDestroy(exampleApp.getName()));
}
// Build (save) a service without ever starting it, then check that stopping
// it reports EXIT_COMMAND_ARGUMENT_ERROR instead of hitting an NPE and that
// the saved service can still be destroyed successfully.
@Test (timeout = 200000)
public void testStopDestroySavedService() throws Exception {
  setupInternal(NUM_NMS);
  final ServiceClient serviceClient = createClient(getConf());
  final Service savedService = createExampleApplication();
  serviceClient.actionBuild(savedService);

  final int stopResult = serviceClient.actionStop(savedService.getName());
  Assert.assertEquals(EXIT_COMMAND_ARGUMENT_ERROR, stopResult);

  final int destroyResult =
      serviceClient.actionDestroy(savedService.getName());
  Assert.assertEquals(0, destroyResult);
}
// Create compa with 2 containers
@ -507,19 +529,31 @@ public class TestYarnNativeServices extends ServiceTestUtils {
// When flex up to 4 instances, it should be compA-1 , compA-2, compA-3, compA-4
// When flex down to 3 instances, it should be compA-1 , compA-2, compA-3.
// Looks up the current status of the given service through the client and
// runs the per-component instance-order check on each component it reports.
private void checkCompInstancesInOrder(ServiceClient client,
Service exampleApp) throws IOException, YarnException {
Service exampleApp) throws IOException, YarnException,
TimeoutException, InterruptedException {
// Use the status returned by the client, not the submitted spec, as the
// source of components to inspect.
Service service = client.getStatus(exampleApp.getName());
for (Component comp : service.getComponents()) {
checkEachCompInstancesInOrder(comp);
checkEachCompInstancesInOrder(comp, exampleApp.getName());
}
}
private void checkEachCompInstancesInOrder(Component component) {
private void checkEachCompInstancesInOrder(Component component, String
serviceName) throws TimeoutException, InterruptedException {
long expectedNumInstances = component.getNumberOfContainers();
Assert.assertEquals(expectedNumInstances, component.getContainers().size());
TreeSet<String> instances = new TreeSet<>();
for (Container container : component.getContainers()) {
instances.add(container.getComponentInstanceName());
String componentZKPath = RegistryUtils.componentPath(RegistryUtils
.currentUser(), YarnServiceConstants.APP_TYPE, serviceName,
RegistryPathUtils.encodeYarnID(container.getId()));
GenericTestUtils.waitFor(() -> {
try {
return getCuratorService().zkPathExists(componentZKPath);
} catch (IOException e) {
return false;
}
}, 1000, 60000);
}
int i = 0;