YARN-9227. DistributedShell RelativePath is not removed at end. Contributed by Prabhu Joseph.
This commit is contained in:
parent
856cbf62d3
commit
b0d24ef39c
|
@ -389,8 +389,9 @@ public class ApplicationMaster {
|
||||||
*/
|
*/
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
boolean result = false;
|
boolean result = false;
|
||||||
|
ApplicationMaster appMaster = null;
|
||||||
try {
|
try {
|
||||||
ApplicationMaster appMaster = new ApplicationMaster();
|
appMaster = new ApplicationMaster();
|
||||||
LOG.info("Initializing ApplicationMaster");
|
LOG.info("Initializing ApplicationMaster");
|
||||||
boolean doRun = appMaster.init(args);
|
boolean doRun = appMaster.init(args);
|
||||||
if (!doRun) {
|
if (!doRun) {
|
||||||
|
@ -402,6 +403,10 @@ public class ApplicationMaster {
|
||||||
LOG.error("Error running ApplicationMaster", t);
|
LOG.error("Error running ApplicationMaster", t);
|
||||||
LogManager.shutdown();
|
LogManager.shutdown();
|
||||||
ExitUtil.terminate(1, t);
|
ExitUtil.terminate(1, t);
|
||||||
|
} finally {
|
||||||
|
if (appMaster != null) {
|
||||||
|
appMaster.cleanup();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (result) {
|
if (result) {
|
||||||
LOG.info("Application Master completed successfully. exiting");
|
LOG.info("Application Master completed successfully. exiting");
|
||||||
|
@ -768,6 +773,18 @@ public class ApplicationMaster {
|
||||||
new HelpFormatter().printHelp("ApplicationMaster", opts);
|
new HelpFormatter().printHelp("ApplicationMaster", opts);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void cleanup() {
|
||||||
|
Path dst = null;
|
||||||
|
try {
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
dst = new Path(fs.getHomeDirectory(), getRelativePath(appName,
|
||||||
|
appId.toString(), ""));
|
||||||
|
fs.delete(dst, true);
|
||||||
|
} catch(IOException e) {
|
||||||
|
LOG.warn("Failed to remove application staging directory {}", dst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Main run function for the application master
|
* Main run function for the application master
|
||||||
*
|
*
|
||||||
|
|
|
@ -96,6 +96,7 @@ import org.apache.hadoop.yarn.util.UnitsConversionUtil;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -147,6 +148,7 @@ public class Client {
|
||||||
private YarnClient yarnClient;
|
private YarnClient yarnClient;
|
||||||
// Application master specific info to register a new Application with RM/ASM
|
// Application master specific info to register a new Application with RM/ASM
|
||||||
private String appName = "";
|
private String appName = "";
|
||||||
|
private ApplicationId applicationId;
|
||||||
// App master priority
|
// App master priority
|
||||||
private int amPriority = 0;
|
private int amPriority = 0;
|
||||||
// Queue for App master
|
// Queue for App master
|
||||||
|
@ -759,7 +761,7 @@ public class Client {
|
||||||
|
|
||||||
// set the application name
|
// set the application name
|
||||||
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
|
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
|
||||||
ApplicationId appId = appContext.getApplicationId();
|
applicationId = appContext.getApplicationId();
|
||||||
|
|
||||||
// Set up resource type requirements
|
// Set up resource type requirements
|
||||||
// For now, both memory and vcores are supported, so we set memory and
|
// For now, both memory and vcores are supported, so we set memory and
|
||||||
|
@ -800,13 +802,13 @@ public class Client {
|
||||||
// Copy the application master jar to the filesystem
|
// Copy the application master jar to the filesystem
|
||||||
// Create a local resource to point to the destination jar path
|
// Create a local resource to point to the destination jar path
|
||||||
FileSystem fs = FileSystem.get(conf);
|
FileSystem fs = FileSystem.get(conf);
|
||||||
addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
|
addToLocalResources(fs, appMasterJar, appMasterJarPath,
|
||||||
localResources, null);
|
applicationId.toString(), localResources, null);
|
||||||
|
|
||||||
// Set the log4j properties if needed
|
// Set the log4j properties if needed
|
||||||
if (!log4jPropFile.isEmpty()) {
|
if (!log4jPropFile.isEmpty()) {
|
||||||
addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
|
addToLocalResources(fs, log4jPropFile, log4jPath,
|
||||||
localResources, null);
|
applicationId.toString(), localResources, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process local files for localization
|
// Process local files for localization
|
||||||
|
@ -833,7 +835,7 @@ public class Client {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
String fileName = f.getName();
|
String fileName = f.getName();
|
||||||
uploadFile(fs, path, fileName, appId.toString());
|
uploadFile(fs, path, fileName, applicationId.toString());
|
||||||
if (localizableFiles.length() == 0) {
|
if (localizableFiles.length() == 0) {
|
||||||
localizableFiles.append(fileName);
|
localizableFiles.append(fileName);
|
||||||
} else {
|
} else {
|
||||||
|
@ -857,7 +859,7 @@ public class Client {
|
||||||
Path shellSrc = new Path(shellScriptPath);
|
Path shellSrc = new Path(shellScriptPath);
|
||||||
String shellPathSuffix =
|
String shellPathSuffix =
|
||||||
ApplicationMaster.getRelativePath(appName,
|
ApplicationMaster.getRelativePath(appName,
|
||||||
appId.toString(),
|
applicationId.toString(),
|
||||||
SCRIPT_PATH);
|
SCRIPT_PATH);
|
||||||
Path shellDst =
|
Path shellDst =
|
||||||
new Path(fs.getHomeDirectory(), shellPathSuffix);
|
new Path(fs.getHomeDirectory(), shellPathSuffix);
|
||||||
|
@ -869,12 +871,12 @@ public class Client {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!shellCommand.isEmpty()) {
|
if (!shellCommand.isEmpty()) {
|
||||||
addToLocalResources(fs, null, shellCommandPath, appId.toString(),
|
addToLocalResources(fs, null, shellCommandPath, applicationId.toString(),
|
||||||
localResources, shellCommand);
|
localResources, shellCommand);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (shellArgs.length > 0) {
|
if (shellArgs.length > 0) {
|
||||||
addToLocalResources(fs, null, shellArgsPath, appId.toString(),
|
addToLocalResources(fs, null, shellArgsPath, applicationId.toString(),
|
||||||
localResources, StringUtils.join(shellArgs, " "));
|
localResources, StringUtils.join(shellArgs, " "));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1033,7 +1035,7 @@ public class Client {
|
||||||
if (dockerClientConfig != null) {
|
if (dockerClientConfig != null) {
|
||||||
dockerCredentials =
|
dockerCredentials =
|
||||||
DockerClientConfigHandler.readCredentialsFromConfigFile(
|
DockerClientConfigHandler.readCredentialsFromConfigFile(
|
||||||
new Path(dockerClientConfig), conf, appId.toString());
|
new Path(dockerClientConfig), conf, applicationId.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rmCredentials != null || dockerCredentials != null) {
|
if (rmCredentials != null || dockerCredentials != null) {
|
||||||
|
@ -1071,7 +1073,7 @@ public class Client {
|
||||||
// app submission failure?
|
// app submission failure?
|
||||||
|
|
||||||
// Monitor the application
|
// Monitor the application
|
||||||
return monitorApplication(appId);
|
return monitorApplication(applicationId);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1200,6 +1202,11 @@ public class Client {
|
||||||
fs.copyFromLocalFile(new Path(fileSrcPath), dst);
|
fs.copyFromLocalFile(new Path(fileSrcPath), dst);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
ApplicationId getAppId() {
|
||||||
|
return applicationId;
|
||||||
|
}
|
||||||
|
|
||||||
private void prepareTimelineDomain() {
|
private void prepareTimelineDomain() {
|
||||||
TimelineClient timelineClient = null;
|
TimelineClient timelineClient = null;
|
||||||
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
|
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.applications.distributedshell;
|
package org.apache.hadoop.yarn.applications.distributedshell;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
@ -1809,4 +1810,40 @@ public class TestDistributedShell {
|
||||||
client.init(args);
|
client.init(args);
|
||||||
client.run();
|
client.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDistributedShellCleanup()
|
||||||
|
throws Exception {
|
||||||
|
String appName = "DistributedShellCleanup";
|
||||||
|
String[] args = {
|
||||||
|
"--jar",
|
||||||
|
APPMASTER_JAR,
|
||||||
|
"--num_containers",
|
||||||
|
"1",
|
||||||
|
"--shell_command",
|
||||||
|
Shell.WINDOWS ? "dir" : "ls",
|
||||||
|
"--appname",
|
||||||
|
appName
|
||||||
|
};
|
||||||
|
Configuration config = new Configuration(yarnCluster.getConfig());
|
||||||
|
Client client = new Client(config);
|
||||||
|
client.init(args);
|
||||||
|
client.run();
|
||||||
|
ApplicationId appId = client.getAppId();
|
||||||
|
String relativePath =
|
||||||
|
ApplicationMaster.getRelativePath(appName, appId.toString(), "");
|
||||||
|
FileSystem fs1 = FileSystem.get(config);
|
||||||
|
Path path = new Path(fs1.getHomeDirectory(), relativePath);
|
||||||
|
|
||||||
|
GenericTestUtils.waitFor(() -> {
|
||||||
|
try {
|
||||||
|
return !fs1.exists(path);
|
||||||
|
} catch (IOException e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}, 10, 60000);
|
||||||
|
|
||||||
|
assertFalse("Distributed Shell Cleanup failed", fs1.exists(path));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue