YARN-8349. Remove YARN registry entries when a service is killed by the RM. (Billie Rinaldi via wangda)

Change-Id: Ia58db3637789a8921482f564aa9bdf99c45cc36c
This commit is contained in:
Wangda Tan 2018-06-01 14:07:23 -07:00
parent 8956e5b8db
commit ff583d3fa3
11 changed files with 358 additions and 129 deletions

View File

@ -450,6 +450,13 @@
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-services-core</artifactId>
<version>${hadoop.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>

View File

@ -139,6 +139,22 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-services-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -588,6 +588,17 @@ public int actionUpgradeComponents(String appName, List<String> components)
return result;
}
/**
 * Runs the service-specific cleanup for an application by delegating to a
 * short-lived {@link ServiceClient} configured from this client's
 * configuration.
 *
 * @param appName the name of the application
 * @param userName the name of the user
 * @return the exit code returned by the delegated
 *         {@code ServiceClient.actionCleanUp} call
 * @throws IOException if the delegated cleanup or closing the client fails
 * @throws YarnException if the delegated cleanup fails
 */
@Override
public int actionCleanUp(String appName, String userName) throws
    IOException, YarnException {
  ServiceClient sc = new ServiceClient();
  try {
    sc.init(getConfig());
    sc.start();
    return sc.actionCleanUp(appName, userName);
  } finally {
    // Release the client even when init, start or the cleanup itself throws;
    // previously a failure left the started client unclosed.
    sc.close();
  }
}
// JSON serializer/deserializer for arrays of Container, configured with the
// CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES property naming strategy.
private static final JsonSerDeser<Container[]> CONTAINER_JSON_SERDE =
new JsonSerDeser<>(Container[].class,
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);

View File

@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
/**
 * Minicluster test: starts a service with a short lifetime, waits for the
 * application to reach the KILLED state and checks that the service's
 * registry ZK path is gone afterwards.
 */
public class TestCleanupAfterKill extends ServiceTestUtils {
  private static final Logger LOG =
      LoggerFactory.getLogger(TestCleanupAfterKill.class);

  @Rule
  public TemporaryFolder tmpFolder = new TemporaryFolder();

  /** Removes the {@code target/tmp} directory before each test. */
  @Before
  public void setup() throws Exception {
    FileUtils.deleteQuietly(new File("target", "tmp"));
  }

  /** Shuts down whatever the test set up. */
  @After
  public void tearDown() throws IOException {
    shutdown();
  }

  @Test(timeout = 200000)
  public void testRegistryCleanedOnLifetimeExceeded() throws Exception {
    setupInternal(NUM_NMS);
    ServiceClient serviceClient = createClient(getConf());

    // Launch a service that is only allowed to live for 30 seconds.
    Service shortLivedApp = createExampleApplication();
    shortLivedApp.setLifetime(30L);
    serviceClient.actionCreate(shortLivedApp);
    waitForServiceToBeStable(serviceClient, shortLivedApp);

    // While the service is running its registry entry must be present.
    String registryUser = RegistryUtils.currentUser();
    String registryPath = RegistryUtils.servicePath(registryUser,
        YarnServiceConstants.APP_TYPE, shortLivedApp.getName());
    boolean presentWhileRunning =
        getCuratorService().zkPathExists(registryPath);
    Assert.assertTrue("Registry ZK service path doesn't exist",
        presentWhileRunning);

    // Block until the RM reports the application as killed.
    ApplicationId appId = ApplicationId.fromString(shortLivedApp.getId());
    GenericTestUtils.waitFor(() -> isKilled(serviceClient, appId),
        2000, 200000);

    boolean presentAfterKill =
        getCuratorService().zkPathExists(registryPath);
    Assert.assertFalse("Registry ZK service path still exists after killed",
        presentAfterKill);

    LOG.info("Destroy the service");
    int destroyResult = serviceClient.actionDestroy(shortLivedApp.getName());
    Assert.assertEquals(0, destroyResult);
  }

  /**
   * Tells whether the application report currently shows the KILLED state.
   * Failures to fetch the report are rethrown unchecked so that the
   * surrounding wait aborts.
   */
  private static boolean isKilled(ServiceClient serviceClient,
      ApplicationId appId) {
    try {
      ApplicationReport report =
          serviceClient.getYarnClient().getApplicationReport(appId);
      return report.getYarnApplicationState() == YarnApplicationState.KILLED;
    } catch (YarnException | IOException e) {
      throw new RuntimeException("while waiting", e);
    }
  }
}

View File

@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<configuration>
<!-- Dummy (invalid) config file to be overwritten by ServiceTestUtils with MiniCluster configuration. -->
</configuration>

View File

@ -308,6 +308,16 @@ public int actionUpgradeComponents(String appName,
return actionUpgrade(persistedService, containersToUpgrade);
}
/**
 * Cleans up the registry for the given service and user.
 *
 * @param appName the name of the application
 * @param userName the name of the user
 * @return {@code EXIT_SUCCESS} when {@code cleanUpRegistry} reports true,
 *         {@code EXIT_FALSE} otherwise
 */
@Override
public int actionCleanUp(String appName, String userName) throws
    IOException, YarnException {
  boolean cleaned = cleanUpRegistry(appName, userName);
  return cleaned ? EXIT_SUCCESS : EXIT_FALSE;
}
public int actionUpgrade(Service service, List<Container> compInstances)
throws IOException, YarnException {
ApplicationReport appReport =
@ -639,9 +649,23 @@ public int actionDestroy(String serviceName) throws YarnException,
}
}
/**
 * Cleans up the registry path of a service owned by an explicitly named
 * user. The path is built from the registry form of the user name, the
 * service application type and the service name.
 */
private boolean cleanUpRegistry(String serviceName, String user) throws
    SliderException {
  String servicePath = RegistryUtils.servicePath(
      RegistryUtils.registryUser(user), YarnServiceConstants.APP_TYPE,
      serviceName);
  return cleanUpRegistryPath(servicePath, serviceName);
}
/**
 * Cleans up the registry path that {@code ServiceRegistryUtils} computes
 * for the given service instance.
 */
private boolean cleanUpRegistry(String serviceName) throws SliderException {
  return cleanUpRegistryPath(
      ServiceRegistryUtils.registryPathForInstance(serviceName), serviceName);
}
private boolean cleanUpRegistryPath(String registryPath, String
serviceName) throws SliderException {
try {
if (getRegistryClient().exists(registryPath)) {
getRegistryClient().delete(registryPath, true);

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.service;
import com.google.common.base.Throwables;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.hadoop.conf.Configuration;
@ -29,13 +31,17 @@
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.registry.client.impl.zk.CuratorService;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
@ -60,6 +66,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC;
@ -418,4 +425,132 @@ public java.nio.file.Path getServiceBasePath() {
return serviceBasePath;
}
}
/**
 * Wait until all the containers for all components become ready state.
 * The service status is re-fetched on every check; a component only
 * contributes containers once it reports exactly the number of containers
 * requested for it in {@code exampleApp}.
 *
 * @param client service client used to fetch the current service status
 * @param exampleApp service whose components define the expected number of
 *                   containers
 * @return all ready containers of a service, as component name to
 *         container id
 * @throws TimeoutException if the wait ends before every expected container
 *                          is ready
 * @throws InterruptedException if the waiting thread is interrupted
 */
protected Multimap<String, String> waitForAllCompToBeReady(ServiceClient
    client, Service exampleApp) throws TimeoutException,
    InterruptedException {
  int expectedTotalContainers = countTotalContainers(exampleApp);
  Multimap<String, String> allContainers = HashMultimap.create();
  GenericTestUtils.waitFor(() -> {
    try {
      Service retrievedApp = client.getStatus(exampleApp.getName());
      int totalReadyContainers = 0;
      // Start from scratch on every poll so stale containers are dropped.
      allContainers.clear();
      LOG.info("Num Components " + retrievedApp.getComponents().size());
      for (Component component : retrievedApp.getComponents()) {
        LOG.info("looking for " + component.getName());
        LOG.info(component.toString());
        if (component.getContainers() != null) {
          if (component.getContainers().size() == exampleApp
              .getComponent(component.getName()).getNumberOfContainers()) {
            for (Container container : component.getContainers()) {
              LOG.info(
                  "Container state " + container.getState() + ", component "
                      + component.getName());
              if (container.getState() == ContainerState.READY) {
                totalReadyContainers++;
                allContainers.put(component.getName(), container.getId());
                LOG.info("Found 1 ready container " + container.getId());
              }
            }
          } else {
            LOG.info(component.getName() + " Expected number of containers "
                + exampleApp.getComponent(component.getName())
                .getNumberOfContainers() + ", current = " + component
                .getContainers());
          }
        }
      }
      LOG.info("Exit loop, totalReadyContainers= " + totalReadyContainers
          + " expected = " + expectedTotalContainers);
      return totalReadyContainers == expectedTotalContainers;
    } catch (Exception e) {
      // A failed poll is not fatal: record it through the logger (instead
      // of printing the stack trace to stderr) and try again next round.
      LOG.warn("Failed to check the ready containers of the service", e);
      return false;
    }
  }, 2000, 200000);
  return allContainers;
}
/**
 * Block until the service is stable, i.e. all requested containers of all
 * components are running and in ready state, waiting at most 200000
 * milliseconds.
 *
 * @param client service client used to query the service
 * @param exampleApp service to wait for
 * @throws TimeoutException
 * @throws InterruptedException
 */
protected void waitForServiceToBeStable(ServiceClient client,
    Service exampleApp) throws TimeoutException, InterruptedException {
  final int defaultWaitMillis = 200000;
  waitForServiceToBeStable(client, exampleApp, defaultWaitMillis);
}
/**
 * Block until the service reports the STABLE state, waiting at most the
 * given number of milliseconds.
 */
protected void waitForServiceToBeStable(ServiceClient client,
    Service exampleApp, int waitForMillis)
    throws TimeoutException, InterruptedException {
  final ServiceState targetState = ServiceState.STABLE;
  waitForServiceToBeInState(client, exampleApp, targetState, waitForMillis);
}
/**
 * Block until the service reports the STARTED state; reaching a stable
 * state is not required.
 *
 * @param client service client used to query the service
 * @param exampleApp service to wait for
 * @throws TimeoutException
 * @throws InterruptedException
 */
protected void waitForServiceToBeStarted(ServiceClient client,
    Service exampleApp) throws TimeoutException, InterruptedException {
  final ServiceState targetState = ServiceState.STARTED;
  waitForServiceToBeInState(client, exampleApp, targetState);
}
/**
 * Block until the service reports the desired state, waiting at most
 * 200000 milliseconds.
 */
protected void waitForServiceToBeInState(ServiceClient client,
    Service exampleApp, ServiceState desiredState) throws TimeoutException,
    InterruptedException {
  final int defaultWaitMillis = 200000;
  waitForServiceToBeInState(client, exampleApp, desiredState,
      defaultWaitMillis);
}
/**
 * Wait until the service reaches the desired state. The service status is
 * re-fetched on every check and compared against {@code desiredState}.
 *
 * @param client service client used to fetch the current service status
 * @param exampleApp service to wait for
 * @param desiredState state the service is expected to reach
 * @param waitForMillis wait limit handed to
 *                      {@code GenericTestUtils.waitFor}
 * @throws TimeoutException if the wait ends before the desired state is
 *                          reached
 * @throws InterruptedException if the waiting thread is interrupted
 */
protected void waitForServiceToBeInState(ServiceClient client,
    Service exampleApp, ServiceState desiredState, int waitForMillis) throws
    TimeoutException, InterruptedException {
  GenericTestUtils.waitFor(() -> {
    try {
      Service retrievedApp = client.getStatus(exampleApp.getName());
      // Report the fetched status through the logger rather than stdout.
      LOG.info(String.valueOf(retrievedApp));
      return retrievedApp.getState() == desiredState;
    } catch (Exception e) {
      // A failed poll is not fatal: log it and try again on the next round.
      LOG.warn("Failed to retrieve the state of the service", e);
      return false;
    }
  }, 2000, waitForMillis);
}
/**
 * Adds up the number of containers requested by every component of the
 * given service.
 */
private int countTotalContainers(Service service) {
  return (int) service.getComponents().stream()
      .mapToLong(component -> component.getNumberOfContainers())
      .sum();
}
}

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.service;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
@ -36,7 +35,6 @@
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Configuration;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
import org.apache.hadoop.yarn.service.api.records.PlacementScope;
@ -806,131 +804,4 @@ private void checkEachCompInstancesInOrder(Component component, String
i++;
}
}
/**
 * Wait until all the containers for all components become ready state.
 * The service status is re-fetched on every check; a component only
 * contributes containers once it reports exactly the number of containers
 * requested for it in {@code exampleApp}.
 *
 * @param client service client used to fetch the current service status
 * @param exampleApp service whose components define the expected number of
 *                   containers
 * @return all ready containers of a service, as component name to
 *         container id
 * @throws TimeoutException if the wait ends before every expected container
 *                          is ready
 * @throws InterruptedException if the waiting thread is interrupted
 */
private Multimap<String, String> waitForAllCompToBeReady(ServiceClient client,
    Service exampleApp) throws TimeoutException, InterruptedException {
  int expectedTotalContainers = countTotalContainers(exampleApp);
  Multimap<String, String> allContainers = HashMultimap.create();
  GenericTestUtils.waitFor(() -> {
    try {
      Service retrievedApp = client.getStatus(exampleApp.getName());
      int totalReadyContainers = 0;
      // Start from scratch on every poll so stale containers are dropped.
      allContainers.clear();
      LOG.info("Num Components " + retrievedApp.getComponents().size());
      for (Component component : retrievedApp.getComponents()) {
        LOG.info("looking for " + component.getName());
        LOG.info(component.toString());
        if (component.getContainers() != null) {
          if (component.getContainers().size() == exampleApp
              .getComponent(component.getName()).getNumberOfContainers()) {
            for (Container container : component.getContainers()) {
              LOG.info(
                  "Container state " + container.getState() + ", component "
                      + component.getName());
              if (container.getState() == ContainerState.READY) {
                totalReadyContainers++;
                allContainers.put(component.getName(), container.getId());
                LOG.info("Found 1 ready container " + container.getId());
              }
            }
          } else {
            LOG.info(component.getName() + " Expected number of containers "
                + exampleApp.getComponent(component.getName())
                .getNumberOfContainers() + ", current = " + component
                .getContainers());
          }
        }
      }
      LOG.info("Exit loop, totalReadyContainers= " + totalReadyContainers
          + " expected = " + expectedTotalContainers);
      return totalReadyContainers == expectedTotalContainers;
    } catch (Exception e) {
      // A failed poll is not fatal: record it through the logger (instead
      // of printing the stack trace to stderr) and try again next round.
      LOG.warn("Failed to check the ready containers of the service", e);
      return false;
    }
  }, 2000, 200000);
  return allContainers;
}
/**
 * Block until the service is stable, i.e. all requested containers of all
 * components are running and in ready state, waiting at most 200000
 * milliseconds.
 *
 * @param client service client used to query the service
 * @param exampleApp service to wait for
 * @throws TimeoutException
 * @throws InterruptedException
 */
private void waitForServiceToBeStable(ServiceClient client,
    Service exampleApp) throws TimeoutException, InterruptedException {
  final int defaultWaitMillis = 200000;
  waitForServiceToBeStable(client, exampleApp, defaultWaitMillis);
}
/**
 * Block until the service reports the STABLE state, waiting at most the
 * given number of milliseconds.
 */
private void waitForServiceToBeStable(ServiceClient client,
    Service exampleApp, int waitForMillis)
    throws TimeoutException, InterruptedException {
  final ServiceState targetState = ServiceState.STABLE;
  waitForServiceToBeInState(client, exampleApp, targetState, waitForMillis);
}
/**
 * Block until the service reports the STARTED state; reaching a stable
 * state is not required.
 *
 * @param client service client used to query the service
 * @param exampleApp service to wait for
 * @throws TimeoutException
 * @throws InterruptedException
 */
private void waitForServiceToBeStarted(ServiceClient client,
    Service exampleApp) throws TimeoutException, InterruptedException {
  final ServiceState targetState = ServiceState.STARTED;
  waitForServiceToBeInState(client, exampleApp, targetState);
}
/**
 * Block until the service reports the desired state, waiting at most
 * 200000 milliseconds.
 */
private void waitForServiceToBeInState(ServiceClient client,
    Service exampleApp, ServiceState desiredState) throws TimeoutException,
    InterruptedException {
  final int defaultWaitMillis = 200000;
  waitForServiceToBeInState(client, exampleApp, desiredState,
      defaultWaitMillis);
}
/**
 * Wait until the service reaches the desired state. The service status is
 * re-fetched on every check and compared against {@code desiredState}.
 *
 * @param client service client used to fetch the current service status
 * @param exampleApp service to wait for
 * @param desiredState state the service is expected to reach
 * @param waitForMillis wait limit handed to
 *                      {@code GenericTestUtils.waitFor}
 * @throws TimeoutException if the wait ends before the desired state is
 *                          reached
 * @throws InterruptedException if the waiting thread is interrupted
 */
private void waitForServiceToBeInState(ServiceClient client,
    Service exampleApp, ServiceState desiredState, int waitForMillis) throws
    TimeoutException, InterruptedException {
  GenericTestUtils.waitFor(() -> {
    try {
      Service retrievedApp = client.getStatus(exampleApp.getName());
      // Report the fetched status through the logger rather than stdout.
      LOG.info(String.valueOf(retrievedApp));
      return retrievedApp.getState() == desiredState;
    } catch (Exception e) {
      // A failed poll is not fatal: log it and try again on the next round.
      LOG.warn("Failed to retrieve the state of the service", e);
      return false;
    }
  }, 2000, waitForMillis);
}
/**
 * Adds up the number of containers requested by every component of the
 * given service.
 */
private int countTotalContainers(Service service) {
  return (int) service.getComponents().stream()
      .mapToLong(component -> component.getNumberOfContainers())
      .sum();
}
}

View File

@ -270,4 +270,16 @@ public abstract int actionUpgradeInstances(String appName,
public abstract int actionUpgradeComponents(String appName,
List<String> components) throws IOException, YarnException;
/**
 * Operation to be performed by the RM after an application has completed.
 *
 * @param appName the name of the application.
 * @param userName the name of the user.
 * @return exit code
 * @throws IOException declared for implementations whose cleanup performs
 *                     I/O
 * @throws YarnException declared for implementations whose cleanup fails
 *                       with a YARN-level error
 */
@Public
@Unstable
public abstract int actionCleanUp(String appName, String userName) throws
IOException, YarnException;
}

View File

@ -296,6 +296,16 @@ public static String getCurrentUsernameUnencoded(String env_hadoop_username) {
*/
public static String currentUser() {
  // The current user goes through the same registry conversion as any
  // explicitly supplied user name.
  return registryUser(currentUsernameUnencoded());
}
/**
* Convert the given user name to the format used by the registry.
*
* @param shortUserName
* @return converted user name
*/
public static String registryUser(String shortUserName) {
String encodedName = encodeForRegistry(shortUserName);
// DNS name doesn't allow "_", replace it with "-"
encodedName = RegistryUtils.convertUsername(encodedName);

View File

@ -65,6 +65,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AppAdminClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@ -1470,6 +1471,33 @@ public void transition(RMAppImpl app, RMAppEvent event) {
};
}
/**
 * Attempt to perform a type-specific cleanup after application has completed.
 * The cleanup is silently skipped when creating the AppAdminClient for the
 * application type fails with an IllegalArgumentException; every other
 * failure is logged as a warning and never propagated to the caller.
 *
 * @param app application to clean up
 */
static void appAdminClientCleanUp(RMAppImpl app) {
  try {
    AppAdminClient client;
    try {
      client = AppAdminClient.createAppAdminClient(app.applicationType,
          app.conf);
    } catch (IllegalArgumentException e) {
      // no AppAdminClient class has been specified for the application type,
      // so this does not need to be logged
      return;
    }
    // Only client creation is covered by the silent catch above: an
    // IllegalArgumentException raised by the cleanup itself is a real
    // failure and falls through to the warning below.
    int result = client.actionCleanUp(app.name, app.user);
    if (result == 0) {
      LOG.info("Type-specific cleanup of application " + app.applicationId
          + " of type " + app.applicationType + " succeeded");
    } else {
      LOG.warn("Type-specific cleanup of application " + app.applicationId
          + " of type " + app.applicationType + " did not succeed with exit"
          + " code " + result);
    }
  } catch (Exception e) {
    LOG.warn("Could not run type-specific cleanup on application " +
        app.applicationId + " of type " + app.applicationType, e);
  }
}
private static class FinalTransition extends RMAppTransition {
private final RMAppState finalState;
@ -1504,6 +1532,8 @@ public void transition(RMAppImpl app, RMAppEvent event) {
.appFinished(app, finalState, app.finishTime);
// set the memory free
app.clearUnusedFields();
appAdminClientCleanUp(app);
};
}