From a375fe82631920c98fb1d45b13e3e82108365baf Mon Sep 17 00:00:00 2001 From: Eric Yang Date: Tue, 10 Apr 2018 17:40:49 -0400 Subject: [PATCH] YARN-7984. Improved YARN service stop/destroy and clean up. Contributed by Billie Rinaldi (cherry picked from commit d553799030a5a64df328319aceb35734d0b2de20) --- .../hadoop/yarn/service/webapp/ApiServer.java | 47 +++++++--- .../yarn/service/ServiceClientTest.java | 6 ++ .../hadoop/yarn/service/TestApiServer.java | 26 ++++++ .../yarn/service/client/ServiceClient.java | 93 ++++++++++++++----- .../hadoop/yarn/service/ServiceTestUtils.java | 15 ++- .../yarn/service/TestYarnNativeServices.java | 42 ++++++++- 6 files changed, 191 insertions(+), 38 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java index 59ee05d7ab2..14c77f6a2c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java @@ -231,30 +231,40 @@ public class ApiServer { e.getCause().getMessage()); } catch (YarnException | FileNotFoundException e) { return formatResponse(Status.NOT_FOUND, e.getMessage()); - } catch (IOException | InterruptedException e) { + } catch (Exception e) { LOG.error("Fail to stop service: {}", e); return formatResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage()); } } private Response stopService(String appName, boolean destroy, - final UserGroupInformation ugi) throws IOException, - InterruptedException, YarnException, FileNotFoundException { + final UserGroupInformation ugi) throws Exception { int result = ugi.doAs(new PrivilegedExceptionAction() { @Override - public Integer run() throws IOException, YarnException, - FileNotFoundException { + public Integer run() throws Exception { int result = 0; ServiceClient sc = getServiceClient(); sc.init(YARN_CONFIG); sc.start(); - result = sc.actionStop(appName, destroy); - if (result == EXIT_SUCCESS) { - LOG.info("Successfully stopped service {}", appName); + Exception stopException = null; + try { + result = sc.actionStop(appName, destroy); + if (result == EXIT_SUCCESS) { + LOG.info("Successfully stopped service {}", appName); + } + } catch (Exception e) { + LOG.info("Got exception stopping service", e); + stopException = e; } if (destroy) { result = sc.actionDestroy(appName); - LOG.info("Successfully deleted service {}", appName); + if (result == EXIT_SUCCESS) { + LOG.info("Successfully deleted service {}", appName); + } + } else { + if (stopException != null) { + throw stopException; + } } sc.close(); return result; @@ -262,8 +272,21 @@ public class ApiServer { }); ServiceStatus serviceStatus = new ServiceStatus(); if (destroy) { - serviceStatus.setDiagnostics("Successfully destroyed service " + - appName); + if (result == EXIT_SUCCESS) { + serviceStatus.setDiagnostics("Successfully destroyed service " + + appName); + } else { + if (result == EXIT_NOT_FOUND) { + serviceStatus + .setDiagnostics("Service " + appName + " doesn't exist"); + return formatResponse(Status.BAD_REQUEST, serviceStatus); + } else { + serviceStatus + .setDiagnostics("Service " + appName + " error cleaning up " + + "registry"); + return formatResponse(Status.INTERNAL_SERVER_ERROR, serviceStatus); + } + } } else { if (result == EXIT_COMMAND_ARGUMENT_ERROR) { serviceStatus @@ -394,7 +417,7 @@ public class ApiServer { String message = "Service is not found in hdfs: " + appName; LOG.error(message, e); return formatResponse(Status.NOT_FOUND, e.getMessage()); - } catch (IOException | InterruptedException e) { + } catch (Exception e) { String message = "Error while performing operation for app: " + appName; LOG.error(message, e); return formatResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java index 5d959da3779..543c5833b45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java @@ -103,6 +103,12 @@ public class ServiceClientTest extends ServiceClient { } if (serviceName.equals("jenkins")) { return EXIT_SUCCESS; + } else if (serviceName.equals("jenkins-already-stopped")) { + return EXIT_SUCCESS; + } else if (serviceName.equals("jenkins-doesn't-exist")) { + return EXIT_NOT_FOUND; + } else if (serviceName.equals("jenkins-error-cleaning-registry")) { + return EXIT_OTHER_FAILURE; } else { throw new IllegalArgumentException(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java index 7db6be24cb2..9e28c9632af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java @@ -154,6 +154,24 @@ public class TestApiServer { actual.getStatus()); } + @Test + public void testBadDeleteService3() { + final Response actual = apiServer.deleteService(request, + "jenkins-doesn't-exist"); + assertEquals("Delete service is ", + Response.status(Status.BAD_REQUEST).build().getStatus(), + actual.getStatus()); + } + + @Test + public void testBadDeleteService4() { + final Response actual = apiServer.deleteService(request, + "jenkins-error-cleaning-registry"); + assertEquals("Delete service is ", + Response.status(Status.INTERNAL_SERVER_ERROR).build().getStatus(), + actual.getStatus()); + } + @Test public void testGoodDeleteService() { final Response actual = apiServer.deleteService(request, "jenkins"); @@ -161,6 +179,14 @@ public class TestApiServer { Response.status(Status.OK).build().getStatus(), actual.getStatus()); } + @Test + public void testDeleteStoppedService() { + final Response actual = apiServer.deleteService(request, + "jenkins-already-stopped"); + assertEquals("Delete service is ", + Response.status(Status.OK).build().getStatus(), actual.getStatus()); + } + @Test public void testDecreaseContainerAndStop() { Service service = new Service(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index 1ea20afbb0b..21fb075551a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -375,8 +375,14 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, .save(fs.getFileSystem(), ServiceApiUtil.getServiceJsonPath(fs, serviceName), persistedService, true); + ApplicationId appId = getAppId(serviceName); + if (appId == null) { + String message = "Application ID doesn't exist for " + serviceName; + LOG.error(message); + throw new YarnException(message); + } ApplicationReport appReport = - yarnClient.getApplicationReport(getAppId(serviceName)); + yarnClient.getApplicationReport(appId); if (appReport.getYarnApplicationState() != RUNNING) { String message = serviceName + " is at " + appReport.getYarnApplicationState() @@ -408,10 +414,16 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, throws YarnException, IOException { ServiceApiUtil.validateNameFormat(serviceName, getConfig()); ApplicationId currentAppId = getAppId(serviceName); + if (currentAppId == null) { + LOG.info("Application ID doesn't exist for service {}", serviceName); + cleanUpRegistry(serviceName); + return EXIT_COMMAND_ARGUMENT_ERROR; + } ApplicationReport report = yarnClient.getApplicationReport(currentAppId); if (terminatedStates.contains(report.getYarnApplicationState())) { LOG.info("Service {} is already in a terminated state {}", serviceName, report.getYarnApplicationState()); + cleanUpRegistry(serviceName); return EXIT_COMMAND_ARGUMENT_ERROR; } if (preRunningStates.contains(report.getYarnApplicationState())) { @@ -419,6 +431,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, + ", forcefully killed by user!"; yarnClient.killApplication(currentAppId, msg); LOG.info(msg); + cleanUpRegistry(serviceName); return EXIT_SUCCESS; } if (StringUtils.isEmpty(report.getHost())) { @@ -438,10 +451,12 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, yarnClient.killApplication(currentAppId, serviceName + " is forcefully killed by user!"); LOG.info("Forcefully kill the service: " + serviceName); + cleanUpRegistry(serviceName); return EXIT_SUCCESS; } if (!waitForAppStopped) { + cleanUpRegistry(serviceName); return EXIT_SUCCESS; } // Wait until the app is killed. @@ -471,6 +486,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, + e.getMessage() + ", forcefully kill the app."); yarnClient.killApplication(currentAppId, "Forcefully kill the app"); } + cleanUpRegistry(serviceName); return EXIT_SUCCESS; } @@ -484,7 +500,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, FileSystem fileSystem = fs.getFileSystem(); // remove from the appId cache cachedAppInfo.remove(serviceName); - boolean destroySucceed = true; + int ret = EXIT_SUCCESS; if (fileSystem.exists(appDir)) { if (fileSystem.delete(appDir, true)) { LOG.info("Successfully deleted service dir for " + serviceName + ": " @@ -498,13 +514,34 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, } else { LOG.info("Service '" + serviceName + "' doesn't exist at hdfs path: " + appDir); - destroySucceed = false; + ret = EXIT_NOT_FOUND; } try { deleteZKNode(serviceName); + // don't set destroySucceed to false if no ZK node exists because not + // all services use a ZK node } catch (Exception e) { throw new IOException("Could not delete zk node for " + serviceName, e); } + if (!cleanUpRegistry(serviceName)) { + if (ret == EXIT_SUCCESS) { + ret = EXIT_OTHER_FAILURE; + } + } + if (ret == EXIT_SUCCESS) { + LOG.info("Successfully destroyed service {}", serviceName); + return ret; + } else if (ret == EXIT_NOT_FOUND) { + LOG.error("Error on destroy '" + serviceName + "': not found."); + return ret; + } else { + LOG.error("Error on destroy '" + serviceName + "': error cleaning up " + + "registry."); + return ret; + } + } + + private boolean cleanUpRegistry(String serviceName) throws SliderException { String registryPath = ServiceRegistryUtils.registryPathForInstance(serviceName); try { @@ -514,18 +551,13 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, LOG.info( "Service '" + serviceName + "' doesn't exist at ZK registry path: " + registryPath); - destroySucceed = false; + // not counted as a failure if the registry entries don't exist } } catch (IOException e) { LOG.warn("Error deleting registry entry {}", registryPath, e); + return false; } - if (destroySucceed) { - LOG.info("Successfully destroyed service {}", serviceName); - return EXIT_SUCCESS; - } else { - LOG.error("Error on destroy '" + serviceName + "': not found."); - return -1; - } + return true; } private synchronized RegistryOperations getRegistryClient() @@ -540,17 +572,25 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, return registryClient; } - private boolean deleteZKNode(String clusterName) throws Exception { + /** + * Delete service's ZK node. This is a different node from the service's + * registry entry and is set aside for the service to use for its own ZK data. + * + * @param serviceName service name + * @return true if the node was deleted, false if the node doesn't exist + * @throws Exception if the node couldn't be deleted + */ + private boolean deleteZKNode(String serviceName) throws Exception { CuratorFramework curatorFramework = getCuratorClient(); String user = RegistryUtils.currentUser(); - String zkPath = ServiceRegistryUtils.mkServiceHomePath(user, clusterName); + String zkPath = ServiceRegistryUtils.mkServiceHomePath(user, serviceName); if (curatorFramework.checkExists().forPath(zkPath) != null) { curatorFramework.delete().deletingChildrenIfNeeded().forPath(zkPath); LOG.info("Deleted zookeeper path: " + zkPath); return true; } else { LOG.info( - "Service '" + clusterName + "' doesn't exist at ZK path: " + zkPath); + "Service '" + serviceName + "' doesn't exist at ZK path: " + zkPath); return false; } } @@ -968,6 +1008,9 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, public String updateLifetime(String serviceName, long lifetime) throws YarnException, IOException { ApplicationId currentAppId = getAppId(serviceName); + if (currentAppId == null) { + throw new YarnException("Application ID not found for " + serviceName); + } ApplicationReport report = yarnClient.getApplicationReport(currentAppId); if (report == null) { throw new YarnException("Service not found for " + serviceName); @@ -1013,8 +1056,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, throws IOException, YarnException { try { // try parsing appIdOrName, if it succeeds, it means it's appId - ApplicationId.fromString(appIdOrName); - return getStatusByAppId(appIdOrName); + ApplicationId appId = ApplicationId.fromString(appIdOrName); + return getStatusByAppId(appId); } catch (IllegalArgumentException e) { // not appId format, it could be appName. Service status = getStatus(appIdOrName); @@ -1022,10 +1065,10 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, } } - private String getStatusByAppId(String appId) + private String getStatusByAppId(ApplicationId appId) throws IOException, YarnException { ApplicationReport appReport = - yarnClient.getApplicationReport(ApplicationId.fromString(appId)); + yarnClient.getApplicationReport(appId); if (appReport.getYarnApplicationState() != RUNNING) { return ""; @@ -1042,10 +1085,14 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, public Service getStatus(String serviceName) throws IOException, YarnException { ServiceApiUtil.validateNameFormat(serviceName, getConfig()); - ApplicationId currentAppId = getAppId(serviceName); - ApplicationReport appReport = yarnClient.getApplicationReport(currentAppId); Service appSpec = new Service(); appSpec.setName(serviceName); + ApplicationId currentAppId = getAppId(serviceName); + if (currentAppId == null) { + LOG.info("Service {} does not have an application ID", serviceName); + return appSpec; + } + ApplicationReport appReport = yarnClient.getApplicationReport(currentAppId); appSpec.setState(convertState(appReport.getYarnApplicationState())); ApplicationTimeout lifetime = appReport.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME); @@ -1172,7 +1219,11 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, throw new YarnException("Service " + serviceName + " doesn't exist on hdfs. Please check if the app exists in RM"); } - ApplicationId currentAppId = ApplicationId.fromString(persistedService.getId()); + if (persistedService.getId() == null) { + return null; + } + ApplicationId currentAppId = ApplicationId.fromString(persistedService + .getId()); cachedAppInfo.put(serviceName, new AppInfo(currentAppId, persistedService .getKerberosPrincipal().getPrincipalName())); return currentAppId; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index 8347eb35d91..d84f05fc6bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -27,6 +27,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.http.HttpServer2; +import org.apache.hadoop.registry.client.impl.zk.CuratorService; +import org.apache.hadoop.service.ServiceOperations; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; @@ -74,7 +76,8 @@ public class ServiceTestUtils { private MiniYARNCluster yarnCluster = null; private MiniDFSCluster hdfsCluster = null; - TestingCluster zkCluster; + private TestingCluster zkCluster; + private CuratorService curatorService; private FileSystem fs = null; private Configuration conf = null; public static final int NUM_NMS = 1; @@ -186,6 +189,10 @@ public class ServiceTestUtils { conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString()); LOG.info("ZK cluster: " + zkCluster.getConnectString()); + curatorService = new CuratorService("testCuratorService"); + curatorService.init(conf); + curatorService.start(); + fs = FileSystem.get(conf); basedir = new File("target", "apps"); if (basedir.exists()) { @@ -253,6 +260,9 @@ public class ServiceTestUtils { hdfsCluster = null; } } + if (curatorService != null) { + ServiceOperations.stop(curatorService); + } if (zkCluster != null) { zkCluster.stop(); } @@ -295,6 +305,9 @@ public class ServiceTestUtils { return client; } + protected CuratorService getCuratorService() throws IOException { + return curatorService; + } /** * Watcher to initialize yarn service base path under target and deletes the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 2b40e494596..fac282bc3c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -22,6 +22,8 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.records.*; @@ -33,6 +35,7 @@ import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.client.ServiceClient; +import org.apache.hadoop.yarn.service.conf.YarnServiceConstants; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.hamcrest.CoreMatchers; @@ -52,6 +55,8 @@ import java.util.concurrent.TimeoutException; import static org.apache.hadoop.yarn.api.records.YarnApplicationState.FINISHED; import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.YARN_SERVICE_BASE_PATH; +import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_COMMAND_ARGUMENT_ERROR; +import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_NOT_FOUND; /** * End to end tests to test deploying services with MiniYarnCluster and a in-JVM @@ -124,6 +129,10 @@ public class TestYarnNativeServices extends ServiceTestUtils { Assert.assertEquals(FINISHED, report.getYarnApplicationState()); Assert.assertEquals(FinalApplicationStatus.ENDED, report.getFinalApplicationStatus()); + String serviceZKPath = RegistryUtils.servicePath(RegistryUtils + .currentUser(), YarnServiceConstants.APP_TYPE, exampleApp.getName()); + Assert.assertFalse("Registry ZK service path still exists after stop", + getCuratorService().zkPathExists(serviceZKPath)); LOG.info("Destroy the service"); // destroy the service and check the app dir is deleted from fs. @@ -132,7 +141,20 @@ public class TestYarnNativeServices extends ServiceTestUtils { Assert.assertFalse(getFS().exists(appDir)); // check that destroying again does not succeed - Assert.assertEquals(-1, client.actionDestroy(exampleApp.getName())); + Assert.assertEquals(EXIT_NOT_FOUND, client.actionDestroy(exampleApp.getName())); + } + + // Save a service without starting it and ensure that stop does not NPE and + // that service can be successfully destroyed + @Test (timeout = 200000) + public void testStopDestroySavedService() throws Exception { + setupInternal(NUM_NMS); + ServiceClient client = createClient(getConf()); + Service exampleApp = createExampleApplication(); + client.actionBuild(exampleApp); + Assert.assertEquals(EXIT_COMMAND_ARGUMENT_ERROR, client.actionStop( + exampleApp.getName())); + Assert.assertEquals(0, client.actionDestroy(exampleApp.getName())); } // Create compa with 2 containers @@ -411,19 +433,31 @@ public class TestYarnNativeServices extends ServiceTestUtils { // When flex up to 4 instances, it should be compA-1 , compA-2, compA-3, compA-4 // When flex down to 3 instances, it should be compA-1 , compA-2, compA-3. private void checkCompInstancesInOrder(ServiceClient client, - Service exampleApp) throws IOException, YarnException { + Service exampleApp) throws IOException, YarnException, + TimeoutException, InterruptedException { Service service = client.getStatus(exampleApp.getName()); for (Component comp : service.getComponents()) { - checkEachCompInstancesInOrder(comp); + checkEachCompInstancesInOrder(comp, exampleApp.getName()); } } - private void checkEachCompInstancesInOrder(Component component) { + private void checkEachCompInstancesInOrder(Component component, String + serviceName) throws TimeoutException, InterruptedException { long expectedNumInstances = component.getNumberOfContainers(); Assert.assertEquals(expectedNumInstances, component.getContainers().size()); TreeSet instances = new TreeSet<>(); for (Container container : component.getContainers()) { instances.add(container.getComponentInstanceName()); + String componentZKPath = RegistryUtils.componentPath(RegistryUtils + .currentUser(), YarnServiceConstants.APP_TYPE, serviceName, + RegistryPathUtils.encodeYarnID(container.getId())); + GenericTestUtils.waitFor(() -> { + try { + return getCuratorService().zkPathExists(componentZKPath); + } catch (IOException e) { + return false; + } + }, 1000, 60000); } int i = 0;