CONTAINER_JSON_SERDE =
new JsonSerDeser<>(Container[].class,
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestCleanupAfterKill.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestCleanupAfterKill.java
new file mode 100644
index 00000000000..51e834a34d9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestCleanupAfterKill.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.service;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Minicluster test that verifies registry cleanup when app lifetime is
+ * exceeded.
+ */
+public class TestCleanupAfterKill extends ServiceTestUtils {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestCleanupAfterKill.class);
+
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Before
+ public void setup() throws Exception {
+ File tmpYarnDir = new File("target", "tmp");
+ FileUtils.deleteQuietly(tmpYarnDir);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ shutdown();
+ }
+
+ @Test(timeout = 200000)
+ public void testRegistryCleanedOnLifetimeExceeded() throws Exception {
+ setupInternal(NUM_NMS);
+ ServiceClient client = createClient(getConf());
+ Service exampleApp = createExampleApplication();
+ exampleApp.setLifetime(30L);
+ client.actionCreate(exampleApp);
+ waitForServiceToBeStable(client, exampleApp);
+ String serviceZKPath = RegistryUtils.servicePath(RegistryUtils
+ .currentUser(), YarnServiceConstants.APP_TYPE, exampleApp.getName());
+ Assert.assertTrue("Registry ZK service path doesn't exist",
+ getCuratorService().zkPathExists(serviceZKPath));
+
+ // wait for app to be killed by RM
+ ApplicationId exampleAppId = ApplicationId.fromString(exampleApp.getId());
+ GenericTestUtils.waitFor(() -> {
+ try {
+ ApplicationReport ar = client.getYarnClient()
+ .getApplicationReport(exampleAppId);
+ return ar.getYarnApplicationState() == YarnApplicationState.KILLED;
+ } catch (YarnException | IOException e) {
+ throw new RuntimeException("while waiting", e);
+ }
+ }, 2000, 200000);
+ Assert.assertFalse("Registry ZK service path still exists after killed",
+ getCuratorService().zkPathExists(serviceZKPath));
+
+ LOG.info("Destroy the service");
+ Assert.assertEquals(0, client.actionDestroy(exampleApp.getName()));
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/resources/yarn-site.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/resources/yarn-site.xml
new file mode 100644
index 00000000000..daac23adcd4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/resources/yarn-site.xml
@@ -0,0 +1,19 @@
+
+
+
+
+
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
index c86f5de5947..3f6e8966e4f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
@@ -308,6 +308,16 @@ public int actionUpgradeComponents(String appName,
return actionUpgrade(persistedService, containersToUpgrade);
}
+ @Override
+ public int actionCleanUp(String appName, String userName) throws
+ IOException, YarnException {
+ if (cleanUpRegistry(appName, userName)) {
+ return EXIT_SUCCESS;
+ } else {
+ return EXIT_FALSE;
+ }
+ }
+
public int actionUpgrade(Service service, List compInstances)
throws IOException, YarnException {
ApplicationReport appReport =
@@ -639,9 +649,23 @@ public int actionDestroy(String serviceName) throws YarnException,
}
}
+ private boolean cleanUpRegistry(String serviceName, String user) throws
+ SliderException {
+ String encodedName = RegistryUtils.registryUser(user);
+
+ String registryPath = RegistryUtils.servicePath(encodedName,
+ YarnServiceConstants.APP_TYPE, serviceName);
+ return cleanUpRegistryPath(registryPath, serviceName);
+ }
+
private boolean cleanUpRegistry(String serviceName) throws SliderException {
String registryPath =
ServiceRegistryUtils.registryPathForInstance(serviceName);
+ return cleanUpRegistryPath(registryPath, serviceName);
+ }
+
+ private boolean cleanUpRegistryPath(String registryPath, String
+ serviceName) throws SliderException {
try {
if (getRegistryClient().exists(registryPath)) {
getRegistryClient().delete(registryPath, true);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
index 86b4cea649c..3d1412dfe71 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.service;
import com.google.common.base.Throwables;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.hadoop.conf.Configuration;
@@ -29,13 +31,17 @@
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.registry.client.impl.zk.CuratorService;
import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Container;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
@@ -60,6 +66,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC;
@@ -418,4 +425,132 @@ public java.nio.file.Path getServiceBasePath() {
return serviceBasePath;
}
}
+
+ /**
+ * Wait until all the containers for all components become ready state.
+ *
+ * @param client
+ * @param exampleApp
+ * @return all ready containers of a service.
+ * @throws TimeoutException
+ * @throws InterruptedException
+ */
+ protected Multimap waitForAllCompToBeReady(ServiceClient
+ client, Service exampleApp) throws TimeoutException,
+ InterruptedException {
+ int expectedTotalContainers = countTotalContainers(exampleApp);
+
+ Multimap allContainers = HashMultimap.create();
+
+ GenericTestUtils.waitFor(() -> {
+ try {
+ Service retrievedApp = client.getStatus(exampleApp.getName());
+ int totalReadyContainers = 0;
+ allContainers.clear();
+ LOG.info("Num Components " + retrievedApp.getComponents().size());
+ for (Component component : retrievedApp.getComponents()) {
+ LOG.info("looking for " + component.getName());
+ LOG.info(component.toString());
+ if (component.getContainers() != null) {
+ if (component.getContainers().size() == exampleApp
+ .getComponent(component.getName()).getNumberOfContainers()) {
+ for (Container container : component.getContainers()) {
+ LOG.info(
+ "Container state " + container.getState() + ", component "
+ + component.getName());
+ if (container.getState() == ContainerState.READY) {
+ totalReadyContainers++;
+ allContainers.put(component.getName(), container.getId());
+ LOG.info("Found 1 ready container " + container.getId());
+ }
+ }
+ } else {
+ LOG.info(component.getName() + " Expected number of containers "
+ + exampleApp.getComponent(component.getName())
+ .getNumberOfContainers() + ", current = " + component
+ .getContainers());
+ }
+ }
+ }
+ LOG.info("Exit loop, totalReadyContainers= " + totalReadyContainers
+ + " expected = " + expectedTotalContainers);
+ return totalReadyContainers == expectedTotalContainers;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }, 2000, 200000);
+ return allContainers;
+ }
+
+ /**
+ * Wait until service state becomes stable. A service is stable when all
+ * requested containers of all components are running and in ready state.
+ *
+ * @param client
+ * @param exampleApp
+ * @throws TimeoutException
+ * @throws InterruptedException
+ */
+ protected void waitForServiceToBeStable(ServiceClient client,
+ Service exampleApp) throws TimeoutException, InterruptedException {
+ waitForServiceToBeStable(client, exampleApp, 200000);
+ }
+
+ protected void waitForServiceToBeStable(ServiceClient client,
+ Service exampleApp, int waitForMillis)
+ throws TimeoutException, InterruptedException {
+ waitForServiceToBeInState(client, exampleApp, ServiceState.STABLE,
+ waitForMillis);
+ }
+
+ /**
+ * Wait until service is started. It does not have to reach a stable state.
+ *
+ * @param client
+ * @param exampleApp
+ * @throws TimeoutException
+ * @throws InterruptedException
+ */
+ protected void waitForServiceToBeStarted(ServiceClient client,
+ Service exampleApp) throws TimeoutException, InterruptedException {
+ waitForServiceToBeInState(client, exampleApp, ServiceState.STARTED);
+ }
+
+ protected void waitForServiceToBeInState(ServiceClient client,
+ Service exampleApp, ServiceState desiredState) throws TimeoutException,
+ InterruptedException {
+ waitForServiceToBeInState(client, exampleApp, desiredState, 200000);
+ }
+
+ /**
+ * Wait until service is started. It does not have to reach a stable state.
+ *
+ * @param client
+ * @param exampleApp
+ * @throws TimeoutException
+ * @throws InterruptedException
+ */
+ protected void waitForServiceToBeInState(ServiceClient client,
+ Service exampleApp, ServiceState desiredState, int waitForMillis) throws
+ TimeoutException, InterruptedException {
+ GenericTestUtils.waitFor(() -> {
+ try {
+ Service retrievedApp = client.getStatus(exampleApp.getName());
+ System.out.println(retrievedApp);
+ return retrievedApp.getState() == desiredState;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }, 2000, waitForMillis);
+ }
+
+ private int countTotalContainers(Service service) {
+ int totalContainers = 0;
+ for (Component component : service.getComponents()) {
+ totalContainers += component.getNumberOfContainers();
+ }
+ return totalContainers;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
index ae209b929ed..8b13b2495b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.service;
-import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
@@ -36,7 +35,6 @@
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Configuration;
import org.apache.hadoop.yarn.service.api.records.Container;
-import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
import org.apache.hadoop.yarn.service.api.records.PlacementScope;
@@ -806,131 +804,4 @@ private void checkEachCompInstancesInOrder(Component component, String
i++;
}
}
-
- /**
- * Wait until all the containers for all components become ready state.
- *
- * @param client
- * @param exampleApp
- * @return all ready containers of a service.
- * @throws TimeoutException
- * @throws InterruptedException
- */
- private Multimap waitForAllCompToBeReady(ServiceClient client,
- Service exampleApp) throws TimeoutException, InterruptedException {
- int expectedTotalContainers = countTotalContainers(exampleApp);
-
- Multimap allContainers = HashMultimap.create();
-
- GenericTestUtils.waitFor(() -> {
- try {
- Service retrievedApp = client.getStatus(exampleApp.getName());
- int totalReadyContainers = 0;
- allContainers.clear();
- LOG.info("Num Components " + retrievedApp.getComponents().size());
- for (Component component : retrievedApp.getComponents()) {
- LOG.info("looking for " + component.getName());
- LOG.info(component.toString());
- if (component.getContainers() != null) {
- if (component.getContainers().size() == exampleApp
- .getComponent(component.getName()).getNumberOfContainers()) {
- for (Container container : component.getContainers()) {
- LOG.info(
- "Container state " + container.getState() + ", component "
- + component.getName());
- if (container.getState() == ContainerState.READY) {
- totalReadyContainers++;
- allContainers.put(component.getName(), container.getId());
- LOG.info("Found 1 ready container " + container.getId());
- }
- }
- } else {
- LOG.info(component.getName() + " Expected number of containers "
- + exampleApp.getComponent(component.getName())
- .getNumberOfContainers() + ", current = " + component
- .getContainers());
- }
- }
- }
- LOG.info("Exit loop, totalReadyContainers= " + totalReadyContainers
- + " expected = " + expectedTotalContainers);
- return totalReadyContainers == expectedTotalContainers;
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }, 2000, 200000);
- return allContainers;
- }
-
- /**
- * Wait until service state becomes stable. A service is stable when all
- * requested containers of all components are running and in ready state.
- *
- * @param client
- * @param exampleApp
- * @throws TimeoutException
- * @throws InterruptedException
- */
- private void waitForServiceToBeStable(ServiceClient client,
- Service exampleApp) throws TimeoutException, InterruptedException {
- waitForServiceToBeStable(client, exampleApp, 200000);
- }
-
- private void waitForServiceToBeStable(ServiceClient client,
- Service exampleApp, int waitForMillis)
- throws TimeoutException, InterruptedException {
- waitForServiceToBeInState(client, exampleApp, ServiceState.STABLE,
- waitForMillis);
- }
-
- /**
- * Wait until service is started. It does not have to reach a stable state.
- *
- * @param client
- * @param exampleApp
- * @throws TimeoutException
- * @throws InterruptedException
- */
- private void waitForServiceToBeStarted(ServiceClient client,
- Service exampleApp) throws TimeoutException, InterruptedException {
- waitForServiceToBeInState(client, exampleApp, ServiceState.STARTED);
- }
-
- private void waitForServiceToBeInState(ServiceClient client,
- Service exampleApp, ServiceState desiredState) throws TimeoutException,
- InterruptedException {
- waitForServiceToBeInState(client, exampleApp, desiredState, 200000);
- }
-
- /**
- * Wait until service is started. It does not have to reach a stable state.
- *
- * @param client
- * @param exampleApp
- * @throws TimeoutException
- * @throws InterruptedException
- */
- private void waitForServiceToBeInState(ServiceClient client,
- Service exampleApp, ServiceState desiredState, int waitForMillis) throws
- TimeoutException, InterruptedException {
- GenericTestUtils.waitFor(() -> {
- try {
- Service retrievedApp = client.getStatus(exampleApp.getName());
- System.out.println(retrievedApp);
- return retrievedApp.getState() == desiredState;
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }, 2000, waitForMillis);
- }
-
- private int countTotalContainers(Service service) {
- int totalContainers = 0;
- for (Component component : service.getComponents()) {
- totalContainers += component.getNumberOfContainers();
- }
- return totalContainers;
- }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java
similarity index 96%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java
index 91f899c82af..3cd1a787103 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java
@@ -270,4 +270,16 @@ public abstract int actionUpgradeInstances(String appName,
public abstract int actionUpgradeComponents(String appName,
List components) throws IOException, YarnException;
+ /**
+ * Operation to be performed by the RM after an application has completed.
+ *
+ * @param appName the name of the application.
+ * @param userName the name of the user.
+ * @return exit code
+ */
+ @Public
+ @Unstable
+ public abstract int actionCleanUp(String appName, String userName) throws
+ IOException, YarnException;
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java
index 4ef7b8d404b..fcfc5bf570f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java
@@ -296,6 +296,16 @@ public static String getCurrentUsernameUnencoded(String env_hadoop_username) {
*/
public static String currentUser() {
String shortUserName = currentUsernameUnencoded();
+ return registryUser(shortUserName);
+ }
+
+ /**
+ * Convert the given user name formatted for the registry.
+ *
+ * @param shortUserName
+ * @return converted user name
+ */
+ public static String registryUser(String shortUserName) {
String encodedName = encodeForRegistry(shortUserName);
// DNS name doesn't allow "_", replace it with "-"
encodedName = RegistryUtils.convertUsername(encodedName);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 6aee8132962..73191562c97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -65,6 +65,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.AppAdminClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -1470,6 +1471,33 @@ public void transition(RMAppImpl app, RMAppEvent event) {
};
}
+ /**
+ * Attempt to perform a type-specific cleanup after application has completed.
+ *
+ * @param app application to clean up
+ */
+ static void appAdminClientCleanUp(RMAppImpl app) {
+ try {
+ AppAdminClient client = AppAdminClient.createAppAdminClient(app
+ .applicationType, app.conf);
+ int result = client.actionCleanUp(app.name, app.user);
+ if (result == 0) {
+ LOG.info("Type-specific cleanup of application " + app.applicationId
+ + " of type " + app.applicationType + " succeeded");
+ } else {
+ LOG.warn("Type-specific cleanup of application " + app.applicationId
+ + " of type " + app.applicationType + " did not succeed with exit"
+ + " code " + result);
+ }
+ } catch (IllegalArgumentException e) {
+ // no AppAdminClient class has been specified for the application type,
+ // so this does not need to be logged
+ } catch (Exception e) {
+ LOG.warn("Could not run type-specific cleanup on application " +
+ app.applicationId + " of type " + app.applicationType, e);
+ }
+ }
+
private static class FinalTransition extends RMAppTransition {
private final RMAppState finalState;
@@ -1504,6 +1532,8 @@ public void transition(RMAppImpl app, RMAppEvent event) {
.appFinished(app, finalState, app.finishTime);
// set the memory free
app.clearUnusedFields();
+
+ appAdminClientCleanUp(app);
};
}