YARN-7984. Improved YARN service stop/destroy and clean up.
Contributed by Billie Rinaldi
(cherry picked from commit d553799030
)
This commit is contained in:
parent
035e0f97ea
commit
a375fe8263
|
@ -231,30 +231,40 @@ public class ApiServer {
|
|||
e.getCause().getMessage());
|
||||
} catch (YarnException | FileNotFoundException e) {
|
||||
return formatResponse(Status.NOT_FOUND, e.getMessage());
|
||||
} catch (IOException | InterruptedException e) {
|
||||
} catch (Exception e) {
|
||||
LOG.error("Fail to stop service: {}", e);
|
||||
return formatResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private Response stopService(String appName, boolean destroy,
|
||||
final UserGroupInformation ugi) throws IOException,
|
||||
InterruptedException, YarnException, FileNotFoundException {
|
||||
final UserGroupInformation ugi) throws Exception {
|
||||
int result = ugi.doAs(new PrivilegedExceptionAction<Integer>() {
|
||||
@Override
|
||||
public Integer run() throws IOException, YarnException,
|
||||
FileNotFoundException {
|
||||
public Integer run() throws Exception {
|
||||
int result = 0;
|
||||
ServiceClient sc = getServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
result = sc.actionStop(appName, destroy);
|
||||
if (result == EXIT_SUCCESS) {
|
||||
LOG.info("Successfully stopped service {}", appName);
|
||||
Exception stopException = null;
|
||||
try {
|
||||
result = sc.actionStop(appName, destroy);
|
||||
if (result == EXIT_SUCCESS) {
|
||||
LOG.info("Successfully stopped service {}", appName);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.info("Got exception stopping service", e);
|
||||
stopException = e;
|
||||
}
|
||||
if (destroy) {
|
||||
result = sc.actionDestroy(appName);
|
||||
LOG.info("Successfully deleted service {}", appName);
|
||||
if (result == EXIT_SUCCESS) {
|
||||
LOG.info("Successfully deleted service {}", appName);
|
||||
}
|
||||
} else {
|
||||
if (stopException != null) {
|
||||
throw stopException;
|
||||
}
|
||||
}
|
||||
sc.close();
|
||||
return result;
|
||||
|
@ -262,8 +272,21 @@ public class ApiServer {
|
|||
});
|
||||
ServiceStatus serviceStatus = new ServiceStatus();
|
||||
if (destroy) {
|
||||
serviceStatus.setDiagnostics("Successfully destroyed service " +
|
||||
appName);
|
||||
if (result == EXIT_SUCCESS) {
|
||||
serviceStatus.setDiagnostics("Successfully destroyed service " +
|
||||
appName);
|
||||
} else {
|
||||
if (result == EXIT_NOT_FOUND) {
|
||||
serviceStatus
|
||||
.setDiagnostics("Service " + appName + " doesn't exist");
|
||||
return formatResponse(Status.BAD_REQUEST, serviceStatus);
|
||||
} else {
|
||||
serviceStatus
|
||||
.setDiagnostics("Service " + appName + " error cleaning up " +
|
||||
"registry");
|
||||
return formatResponse(Status.INTERNAL_SERVER_ERROR, serviceStatus);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (result == EXIT_COMMAND_ARGUMENT_ERROR) {
|
||||
serviceStatus
|
||||
|
@ -394,7 +417,7 @@ public class ApiServer {
|
|||
String message = "Service is not found in hdfs: " + appName;
|
||||
LOG.error(message, e);
|
||||
return formatResponse(Status.NOT_FOUND, e.getMessage());
|
||||
} catch (IOException | InterruptedException e) {
|
||||
} catch (Exception e) {
|
||||
String message = "Error while performing operation for app: " + appName;
|
||||
LOG.error(message, e);
|
||||
return formatResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage());
|
||||
|
|
|
@ -103,6 +103,12 @@ public class ServiceClientTest extends ServiceClient {
|
|||
}
|
||||
if (serviceName.equals("jenkins")) {
|
||||
return EXIT_SUCCESS;
|
||||
} else if (serviceName.equals("jenkins-already-stopped")) {
|
||||
return EXIT_SUCCESS;
|
||||
} else if (serviceName.equals("jenkins-doesn't-exist")) {
|
||||
return EXIT_NOT_FOUND;
|
||||
} else if (serviceName.equals("jenkins-error-cleaning-registry")) {
|
||||
return EXIT_OTHER_FAILURE;
|
||||
} else {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
|
|
|
@ -154,6 +154,24 @@ public class TestApiServer {
|
|||
actual.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBadDeleteService3() {
|
||||
final Response actual = apiServer.deleteService(request,
|
||||
"jenkins-doesn't-exist");
|
||||
assertEquals("Delete service is ",
|
||||
Response.status(Status.BAD_REQUEST).build().getStatus(),
|
||||
actual.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBadDeleteService4() {
|
||||
final Response actual = apiServer.deleteService(request,
|
||||
"jenkins-error-cleaning-registry");
|
||||
assertEquals("Delete service is ",
|
||||
Response.status(Status.INTERNAL_SERVER_ERROR).build().getStatus(),
|
||||
actual.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGoodDeleteService() {
|
||||
final Response actual = apiServer.deleteService(request, "jenkins");
|
||||
|
@ -161,6 +179,14 @@ public class TestApiServer {
|
|||
Response.status(Status.OK).build().getStatus(), actual.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteStoppedService() {
|
||||
final Response actual = apiServer.deleteService(request,
|
||||
"jenkins-already-stopped");
|
||||
assertEquals("Delete service is ",
|
||||
Response.status(Status.OK).build().getStatus(), actual.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDecreaseContainerAndStop() {
|
||||
Service service = new Service();
|
||||
|
|
|
@ -375,8 +375,14 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
.save(fs.getFileSystem(), ServiceApiUtil.getServiceJsonPath(fs, serviceName),
|
||||
persistedService, true);
|
||||
|
||||
ApplicationId appId = getAppId(serviceName);
|
||||
if (appId == null) {
|
||||
String message = "Application ID doesn't exist for " + serviceName;
|
||||
LOG.error(message);
|
||||
throw new YarnException(message);
|
||||
}
|
||||
ApplicationReport appReport =
|
||||
yarnClient.getApplicationReport(getAppId(serviceName));
|
||||
yarnClient.getApplicationReport(appId);
|
||||
if (appReport.getYarnApplicationState() != RUNNING) {
|
||||
String message =
|
||||
serviceName + " is at " + appReport.getYarnApplicationState()
|
||||
|
@ -408,10 +414,16 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
throws YarnException, IOException {
|
||||
ServiceApiUtil.validateNameFormat(serviceName, getConfig());
|
||||
ApplicationId currentAppId = getAppId(serviceName);
|
||||
if (currentAppId == null) {
|
||||
LOG.info("Application ID doesn't exist for service {}", serviceName);
|
||||
cleanUpRegistry(serviceName);
|
||||
return EXIT_COMMAND_ARGUMENT_ERROR;
|
||||
}
|
||||
ApplicationReport report = yarnClient.getApplicationReport(currentAppId);
|
||||
if (terminatedStates.contains(report.getYarnApplicationState())) {
|
||||
LOG.info("Service {} is already in a terminated state {}", serviceName,
|
||||
report.getYarnApplicationState());
|
||||
cleanUpRegistry(serviceName);
|
||||
return EXIT_COMMAND_ARGUMENT_ERROR;
|
||||
}
|
||||
if (preRunningStates.contains(report.getYarnApplicationState())) {
|
||||
|
@ -419,6 +431,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
+ ", forcefully killed by user!";
|
||||
yarnClient.killApplication(currentAppId, msg);
|
||||
LOG.info(msg);
|
||||
cleanUpRegistry(serviceName);
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
if (StringUtils.isEmpty(report.getHost())) {
|
||||
|
@ -438,10 +451,12 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
yarnClient.killApplication(currentAppId,
|
||||
serviceName + " is forcefully killed by user!");
|
||||
LOG.info("Forcefully kill the service: " + serviceName);
|
||||
cleanUpRegistry(serviceName);
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
|
||||
if (!waitForAppStopped) {
|
||||
cleanUpRegistry(serviceName);
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
// Wait until the app is killed.
|
||||
|
@ -471,6 +486,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
+ e.getMessage() + ", forcefully kill the app.");
|
||||
yarnClient.killApplication(currentAppId, "Forcefully kill the app");
|
||||
}
|
||||
cleanUpRegistry(serviceName);
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -484,7 +500,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
FileSystem fileSystem = fs.getFileSystem();
|
||||
// remove from the appId cache
|
||||
cachedAppInfo.remove(serviceName);
|
||||
boolean destroySucceed = true;
|
||||
int ret = EXIT_SUCCESS;
|
||||
if (fileSystem.exists(appDir)) {
|
||||
if (fileSystem.delete(appDir, true)) {
|
||||
LOG.info("Successfully deleted service dir for " + serviceName + ": "
|
||||
|
@ -498,13 +514,34 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
} else {
|
||||
LOG.info("Service '" + serviceName + "' doesn't exist at hdfs path: "
|
||||
+ appDir);
|
||||
destroySucceed = false;
|
||||
ret = EXIT_NOT_FOUND;
|
||||
}
|
||||
try {
|
||||
deleteZKNode(serviceName);
|
||||
// don't set destroySucceed to false if no ZK node exists because not
|
||||
// all services use a ZK node
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Could not delete zk node for " + serviceName, e);
|
||||
}
|
||||
if (!cleanUpRegistry(serviceName)) {
|
||||
if (ret == EXIT_SUCCESS) {
|
||||
ret = EXIT_OTHER_FAILURE;
|
||||
}
|
||||
}
|
||||
if (ret == EXIT_SUCCESS) {
|
||||
LOG.info("Successfully destroyed service {}", serviceName);
|
||||
return ret;
|
||||
} else if (ret == EXIT_NOT_FOUND) {
|
||||
LOG.error("Error on destroy '" + serviceName + "': not found.");
|
||||
return ret;
|
||||
} else {
|
||||
LOG.error("Error on destroy '" + serviceName + "': error cleaning up " +
|
||||
"registry.");
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean cleanUpRegistry(String serviceName) throws SliderException {
|
||||
String registryPath =
|
||||
ServiceRegistryUtils.registryPathForInstance(serviceName);
|
||||
try {
|
||||
|
@ -514,18 +551,13 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
LOG.info(
|
||||
"Service '" + serviceName + "' doesn't exist at ZK registry path: "
|
||||
+ registryPath);
|
||||
destroySucceed = false;
|
||||
// not counted as a failure if the registry entries don't exist
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Error deleting registry entry {}", registryPath, e);
|
||||
return false;
|
||||
}
|
||||
if (destroySucceed) {
|
||||
LOG.info("Successfully destroyed service {}", serviceName);
|
||||
return EXIT_SUCCESS;
|
||||
} else {
|
||||
LOG.error("Error on destroy '" + serviceName + "': not found.");
|
||||
return -1;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private synchronized RegistryOperations getRegistryClient()
|
||||
|
@ -540,17 +572,25 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
return registryClient;
|
||||
}
|
||||
|
||||
private boolean deleteZKNode(String clusterName) throws Exception {
|
||||
/**
|
||||
* Delete service's ZK node. This is a different node from the service's
|
||||
* registry entry and is set aside for the service to use for its own ZK data.
|
||||
*
|
||||
* @param serviceName service name
|
||||
* @return true if the node was deleted, false if the node doesn't exist
|
||||
* @throws Exception if the node couldn't be deleted
|
||||
*/
|
||||
private boolean deleteZKNode(String serviceName) throws Exception {
|
||||
CuratorFramework curatorFramework = getCuratorClient();
|
||||
String user = RegistryUtils.currentUser();
|
||||
String zkPath = ServiceRegistryUtils.mkServiceHomePath(user, clusterName);
|
||||
String zkPath = ServiceRegistryUtils.mkServiceHomePath(user, serviceName);
|
||||
if (curatorFramework.checkExists().forPath(zkPath) != null) {
|
||||
curatorFramework.delete().deletingChildrenIfNeeded().forPath(zkPath);
|
||||
LOG.info("Deleted zookeeper path: " + zkPath);
|
||||
return true;
|
||||
} else {
|
||||
LOG.info(
|
||||
"Service '" + clusterName + "' doesn't exist at ZK path: " + zkPath);
|
||||
"Service '" + serviceName + "' doesn't exist at ZK path: " + zkPath);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -968,6 +1008,9 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
public String updateLifetime(String serviceName, long lifetime)
|
||||
throws YarnException, IOException {
|
||||
ApplicationId currentAppId = getAppId(serviceName);
|
||||
if (currentAppId == null) {
|
||||
throw new YarnException("Application ID not found for " + serviceName);
|
||||
}
|
||||
ApplicationReport report = yarnClient.getApplicationReport(currentAppId);
|
||||
if (report == null) {
|
||||
throw new YarnException("Service not found for " + serviceName);
|
||||
|
@ -1013,8 +1056,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
throws IOException, YarnException {
|
||||
try {
|
||||
// try parsing appIdOrName, if it succeeds, it means it's appId
|
||||
ApplicationId.fromString(appIdOrName);
|
||||
return getStatusByAppId(appIdOrName);
|
||||
ApplicationId appId = ApplicationId.fromString(appIdOrName);
|
||||
return getStatusByAppId(appId);
|
||||
} catch (IllegalArgumentException e) {
|
||||
// not appId format, it could be appName.
|
||||
Service status = getStatus(appIdOrName);
|
||||
|
@ -1022,10 +1065,10 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
}
|
||||
}
|
||||
|
||||
private String getStatusByAppId(String appId)
|
||||
private String getStatusByAppId(ApplicationId appId)
|
||||
throws IOException, YarnException {
|
||||
ApplicationReport appReport =
|
||||
yarnClient.getApplicationReport(ApplicationId.fromString(appId));
|
||||
yarnClient.getApplicationReport(appId);
|
||||
|
||||
if (appReport.getYarnApplicationState() != RUNNING) {
|
||||
return "";
|
||||
|
@ -1042,10 +1085,14 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
public Service getStatus(String serviceName)
|
||||
throws IOException, YarnException {
|
||||
ServiceApiUtil.validateNameFormat(serviceName, getConfig());
|
||||
ApplicationId currentAppId = getAppId(serviceName);
|
||||
ApplicationReport appReport = yarnClient.getApplicationReport(currentAppId);
|
||||
Service appSpec = new Service();
|
||||
appSpec.setName(serviceName);
|
||||
ApplicationId currentAppId = getAppId(serviceName);
|
||||
if (currentAppId == null) {
|
||||
LOG.info("Service {} does not have an application ID", serviceName);
|
||||
return appSpec;
|
||||
}
|
||||
ApplicationReport appReport = yarnClient.getApplicationReport(currentAppId);
|
||||
appSpec.setState(convertState(appReport.getYarnApplicationState()));
|
||||
ApplicationTimeout lifetime =
|
||||
appReport.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME);
|
||||
|
@ -1172,7 +1219,11 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
throw new YarnException("Service " + serviceName
|
||||
+ " doesn't exist on hdfs. Please check if the app exists in RM");
|
||||
}
|
||||
ApplicationId currentAppId = ApplicationId.fromString(persistedService.getId());
|
||||
if (persistedService.getId() == null) {
|
||||
return null;
|
||||
}
|
||||
ApplicationId currentAppId = ApplicationId.fromString(persistedService
|
||||
.getId());
|
||||
cachedAppInfo.put(serviceName, new AppInfo(currentAppId, persistedService
|
||||
.getKerberosPrincipal().getPrincipalName()));
|
||||
return currentAppId;
|
||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.http.HttpServer2;
|
||||
import org.apache.hadoop.registry.client.impl.zk.CuratorService;
|
||||
import org.apache.hadoop.service.ServiceOperations;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||
|
@ -74,7 +76,8 @@ public class ServiceTestUtils {
|
|||
|
||||
private MiniYARNCluster yarnCluster = null;
|
||||
private MiniDFSCluster hdfsCluster = null;
|
||||
TestingCluster zkCluster;
|
||||
private TestingCluster zkCluster;
|
||||
private CuratorService curatorService;
|
||||
private FileSystem fs = null;
|
||||
private Configuration conf = null;
|
||||
public static final int NUM_NMS = 1;
|
||||
|
@ -186,6 +189,10 @@ public class ServiceTestUtils {
|
|||
conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString());
|
||||
LOG.info("ZK cluster: " + zkCluster.getConnectString());
|
||||
|
||||
curatorService = new CuratorService("testCuratorService");
|
||||
curatorService.init(conf);
|
||||
curatorService.start();
|
||||
|
||||
fs = FileSystem.get(conf);
|
||||
basedir = new File("target", "apps");
|
||||
if (basedir.exists()) {
|
||||
|
@ -253,6 +260,9 @@ public class ServiceTestUtils {
|
|||
hdfsCluster = null;
|
||||
}
|
||||
}
|
||||
if (curatorService != null) {
|
||||
ServiceOperations.stop(curatorService);
|
||||
}
|
||||
if (zkCluster != null) {
|
||||
zkCluster.stop();
|
||||
}
|
||||
|
@ -295,6 +305,9 @@ public class ServiceTestUtils {
|
|||
return client;
|
||||
}
|
||||
|
||||
protected CuratorService getCuratorService() throws IOException {
|
||||
return curatorService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Watcher to initialize yarn service base path under target and deletes the
|
||||
|
|
|
@ -22,6 +22,8 @@ import com.google.common.collect.HashMultimap;
|
|||
import com.google.common.collect.Multimap;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
|
||||
import org.apache.hadoop.registry.client.binding.RegistryUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.api.records.*;
|
||||
|
@ -33,6 +35,7 @@ import org.apache.hadoop.yarn.service.api.records.Component;
|
|||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.service.client.ServiceClient;
|
||||
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
|
||||
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
|
||||
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
|
||||
import org.hamcrest.CoreMatchers;
|
||||
|
@ -52,6 +55,8 @@ import java.util.concurrent.TimeoutException;
|
|||
|
||||
import static org.apache.hadoop.yarn.api.records.YarnApplicationState.FINISHED;
|
||||
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.YARN_SERVICE_BASE_PATH;
|
||||
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_COMMAND_ARGUMENT_ERROR;
|
||||
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_NOT_FOUND;
|
||||
|
||||
/**
|
||||
* End to end tests to test deploying services with MiniYarnCluster and a in-JVM
|
||||
|
@ -124,6 +129,10 @@ public class TestYarnNativeServices extends ServiceTestUtils {
|
|||
Assert.assertEquals(FINISHED, report.getYarnApplicationState());
|
||||
Assert.assertEquals(FinalApplicationStatus.ENDED,
|
||||
report.getFinalApplicationStatus());
|
||||
String serviceZKPath = RegistryUtils.servicePath(RegistryUtils
|
||||
.currentUser(), YarnServiceConstants.APP_TYPE, exampleApp.getName());
|
||||
Assert.assertFalse("Registry ZK service path still exists after stop",
|
||||
getCuratorService().zkPathExists(serviceZKPath));
|
||||
|
||||
LOG.info("Destroy the service");
|
||||
// destroy the service and check the app dir is deleted from fs.
|
||||
|
@ -132,7 +141,20 @@ public class TestYarnNativeServices extends ServiceTestUtils {
|
|||
Assert.assertFalse(getFS().exists(appDir));
|
||||
|
||||
// check that destroying again does not succeed
|
||||
Assert.assertEquals(-1, client.actionDestroy(exampleApp.getName()));
|
||||
Assert.assertEquals(EXIT_NOT_FOUND, client.actionDestroy(exampleApp.getName()));
|
||||
}
|
||||
|
||||
// Save a service without starting it and ensure that stop does not NPE and
|
||||
// that service can be successfully destroyed
|
||||
@Test (timeout = 200000)
|
||||
public void testStopDestroySavedService() throws Exception {
|
||||
setupInternal(NUM_NMS);
|
||||
ServiceClient client = createClient(getConf());
|
||||
Service exampleApp = createExampleApplication();
|
||||
client.actionBuild(exampleApp);
|
||||
Assert.assertEquals(EXIT_COMMAND_ARGUMENT_ERROR, client.actionStop(
|
||||
exampleApp.getName()));
|
||||
Assert.assertEquals(0, client.actionDestroy(exampleApp.getName()));
|
||||
}
|
||||
|
||||
// Create compa with 2 containers
|
||||
|
@ -411,19 +433,31 @@ public class TestYarnNativeServices extends ServiceTestUtils {
|
|||
// When flex up to 4 instances, it should be compA-1 , compA-2, compA-3, compA-4
|
||||
// When flex down to 3 instances, it should be compA-1 , compA-2, compA-3.
|
||||
private void checkCompInstancesInOrder(ServiceClient client,
|
||||
Service exampleApp) throws IOException, YarnException {
|
||||
Service exampleApp) throws IOException, YarnException,
|
||||
TimeoutException, InterruptedException {
|
||||
Service service = client.getStatus(exampleApp.getName());
|
||||
for (Component comp : service.getComponents()) {
|
||||
checkEachCompInstancesInOrder(comp);
|
||||
checkEachCompInstancesInOrder(comp, exampleApp.getName());
|
||||
}
|
||||
}
|
||||
|
||||
private void checkEachCompInstancesInOrder(Component component) {
|
||||
private void checkEachCompInstancesInOrder(Component component, String
|
||||
serviceName) throws TimeoutException, InterruptedException {
|
||||
long expectedNumInstances = component.getNumberOfContainers();
|
||||
Assert.assertEquals(expectedNumInstances, component.getContainers().size());
|
||||
TreeSet<String> instances = new TreeSet<>();
|
||||
for (Container container : component.getContainers()) {
|
||||
instances.add(container.getComponentInstanceName());
|
||||
String componentZKPath = RegistryUtils.componentPath(RegistryUtils
|
||||
.currentUser(), YarnServiceConstants.APP_TYPE, serviceName,
|
||||
RegistryPathUtils.encodeYarnID(container.getId()));
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
try {
|
||||
return getCuratorService().zkPathExists(componentZKPath);
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}, 1000, 60000);
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
|
|
Loading…
Reference in New Issue