YARN-7766. Introduce a new config property for YARN Service dependency tarball location. Contributed by Gour Saha

This commit is contained in:
Jian He 2018-01-23 10:53:27 -08:00
parent f63d13f10d
commit a72cdcc47a
7 changed files with 68 additions and 39 deletions

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.AppAdminClient; import org.apache.hadoop.yarn.client.api.AppAdminClient;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.client.cli.ApplicationCLI;
import org.apache.hadoop.yarn.client.util.YarnClientUtils; import org.apache.hadoop.yarn.client.util.YarnClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -718,14 +719,20 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
libPath, "lib", false); libPath, "lib", false);
Path dependencyLibTarGzip = fs.getDependencyTarGzip(); Path dependencyLibTarGzip = fs.getDependencyTarGzip();
if (fs.isFile(dependencyLibTarGzip)) { if (fs.isFile(dependencyLibTarGzip)) {
LOG.debug("Loading lib tar from " + fs.getFileSystem().getScheme() + ":/" LOG.info("Loading lib tar from " + dependencyLibTarGzip);
+ dependencyLibTarGzip);
fs.submitTarGzipAndUpdate(localResources); fs.submitTarGzipAndUpdate(localResources);
} else { } else {
if (dependencyLibTarGzip != null) {
LOG.warn("Property {} has a value {}, but is not a valid file",
YarnServiceConf.DEPENDENCY_TARBALL_PATH, dependencyLibTarGzip);
}
String[] libs = ServiceUtils.getLibDirs(); String[] libs = ServiceUtils.getLibDirs();
LOG.info("Uploading all dependency jars to HDFS. For faster submission of" + LOG.info("Uploading all dependency jars to HDFS. For faster submission of"
" apps, pre-upload dependency jars to HDFS " + " apps, set config property {} to the dependency tarball location."
+ "using command: yarn app -enableFastLaunch"); + " Dependency tarball can be uploaded to any HDFS path directly"
+ " or by using command: yarn app -{} [<Destination Folder>]",
YarnServiceConf.DEPENDENCY_TARBALL_PATH,
ApplicationCLI.ENABLE_FAST_LAUNCH);
for (String libDirProp : libs) { for (String libDirProp : libs) {
ProviderUtils.addAllDependencyJars(localResources, fs, libPath, "lib", ProviderUtils.addAllDependencyJars(localResources, fs, libPath, "lib",
libDirProp); libDirProp);
@ -988,16 +995,23 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
return this.yarnClient; return this.yarnClient;
} }
public int enableFastLaunch() throws IOException, YarnException { public int enableFastLaunch(String destinationFolder)
return actionDependency(true); throws IOException, YarnException {
return actionDependency(destinationFolder, true);
} }
public int actionDependency(boolean overwrite) public int actionDependency(String destinationFolder, boolean overwrite)
throws IOException, YarnException { throws IOException, YarnException {
String currentUser = RegistryUtils.currentUser(); String currentUser = RegistryUtils.currentUser();
LOG.info("Running command as user {}", currentUser); LOG.info("Running command as user {}", currentUser);
Path dependencyLibTarGzip = fs.getDependencyTarGzip(); if (destinationFolder == null) {
destinationFolder = String.format(YarnServiceConstants.DEPENDENCY_DIR,
VersionInfo.getVersion());
}
Path dependencyLibTarGzip = new Path(destinationFolder,
YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_NAME
+ YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_EXT);
// Check if dependency has already been uploaded, in which case log // Check if dependency has already been uploaded, in which case log
// appropriately and exit success (unless overwrite has been requested) // appropriately and exit success (unless overwrite has been requested)
@ -1019,6 +1033,9 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
LOG.info("Version Info: " + VersionInfo.getBuildVersion()); LOG.info("Version Info: " + VersionInfo.getBuildVersion());
fs.copyLocalFileToHdfs(tempLibTarGzipFile, dependencyLibTarGzip, fs.copyLocalFileToHdfs(tempLibTarGzipFile, dependencyLibTarGzip,
new FsPermission(YarnServiceConstants.DEPENDENCY_DIR_PERMISSIONS)); new FsPermission(YarnServiceConstants.DEPENDENCY_DIR_PERMISSIONS));
LOG.info("To let apps use this tarball, in yarn-site set config property "
+ "{} to {}", YarnServiceConf.DEPENDENCY_TARBALL_PATH,
dependencyLibTarGzip);
return EXIT_SUCCESS; return EXIT_SUCCESS;
} else { } else {
return EXIT_FALSE; return EXIT_FALSE;

View File

@ -93,6 +93,12 @@ public class YarnServiceConf {
public static final int DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS = 120000; public static final int DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS = 120000;
/**
* The dependency tarball file location.
*/
public static final String DEPENDENCY_TARBALL_PATH = YARN_SERVICE_PREFIX
+ "framework.path";
/** /**
* Get long value for the property. First get from the userConf, if not * Get long value for the property. First get from the userConf, if not
* present, get from systemConf. * present, get from systemConf.

View File

@ -38,9 +38,9 @@ public class TarballProviderService extends AbstractProviderService {
Path artifact = new Path(instance.getCompSpec().getArtifact().getId()); Path artifact = new Path(instance.getCompSpec().getArtifact().getId());
if (!fileSystem.isFile(artifact)) { if (!fileSystem.isFile(artifact)) {
throw new IOException( throw new IOException(
"Package doesn't exist as a resource: " + artifact.toString()); "Package doesn't exist as a resource: " + artifact);
} }
log.info("Adding resource {}", artifact.toString()); log.info("Adding resource {}", artifact);
LocalResourceType type = LocalResourceType.ARCHIVE; LocalResourceType type = LocalResourceType.ARCHIVE;
LocalResource packageResource = fileSystem.createAmResource(artifact, type); LocalResource packageResource = fileSystem.createAmResource(artifact, type);
launcher.addLocalResource(APP_LIB_DIR, packageResource); launcher.addLocalResource(APP_LIB_DIR, packageResource);

View File

@ -295,6 +295,9 @@ public class CoreFileSystem {
* reasons including if file check throws IOException * reasons including if file check throws IOException
*/ */
public boolean isFile(Path path) { public boolean isFile(Path path) {
if (path == null) {
return false;
}
boolean isFile = false; boolean isFile = false;
try { try {
FileStatus status = fileSystem.getFileStatus(path); FileStatus status = fileSystem.getFileStatus(path);
@ -321,26 +324,18 @@ public class CoreFileSystem {
} }
/** /**
* Get slider dependency parent dir in HDFS * Get service dependency absolute filepath in HDFS used for application
* submission.
* *
* @return the parent dir path of slider.tar.gz in HDFS * @return the absolute path to service dependency tarball in HDFS
*/
public Path getDependencyPath() {
String parentDir = YarnServiceConstants.DEPENDENCY_DIR;
return new Path(String.format(parentDir, VersionInfo.getVersion()));
}
/**
* Get slider.tar.gz absolute filepath in HDFS
*
* @return the absolute path to slider.tar.gz in HDFS
*/ */
public Path getDependencyTarGzip() { public Path getDependencyTarGzip() {
Path dependencyLibAmPath = getDependencyPath(); Path dependencyLibTarGzip = null;
Path dependencyLibTarGzip = new Path( String configuredDependencyTarballPath = configuration
dependencyLibAmPath.toUri().toString(), .get(YarnServiceConf.DEPENDENCY_TARBALL_PATH);
YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_NAME if (configuredDependencyTarballPath != null) {
+ YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_EXT); dependencyLibTarGzip = new Path(configuredDependencyTarballPath);
}
return dependencyLibTarGzip; return dependencyLibTarGzip;
} }
@ -467,8 +462,7 @@ public class CoreFileSystem {
fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,
"000"); "000");
fileSystem.mkdirs(destPath.getParent(), fp); fileSystem.mkdirs(destPath.getParent(), fp);
log.info("Copying file {} to {}", localPath.toURI(), log.info("Copying file {} to {}", localPath.toURI(), destPath);
fileSystem.getScheme() + ":/" + destPath.toUri());
fileSystem.copyFromLocalFile(false, true, new Path(localPath.getPath()), fileSystem.copyFromLocalFile(false, true, new Path(localPath.getPath()),
destPath); destPath);

View File

@ -196,14 +196,18 @@ public abstract class AppAdminClient extends CompositeService {
* faster since the dependencies do not have to be uploaded on each launch. * faster since the dependencies do not have to be uploaded on each launch.
* </p> * </p>
* *
* @param destinationFolder
* an optional HDFS folder where dependency tarball will be uploaded
* @return exit code * @return exit code
* @throws IOException IOException * @throws IOException
* @throws YarnException exception in client or server * IOException
* @throws YarnException
* exception in client or server
*/ */
@Public @Public
@Unstable @Unstable
public abstract int enableFastLaunch() throws IOException, public abstract int enableFastLaunch(String destinationFolder)
YarnException; throws IOException, YarnException;
/** /**
* <p> * <p>

View File

@ -232,9 +232,10 @@ public class ApplicationCLI extends YarnCLI {
"the number of components/containers running for an application / " + "the number of components/containers running for an application / " +
"long-running service. Supports absolute or relative changes, such " + "long-running service. Supports absolute or relative changes, such " +
"as +1, 2, or -3."); "as +1, 2, or -3.");
opts.addOption(ENABLE_FAST_LAUNCH, false, "Uploads AM dependencies " + opts.addOption(ENABLE_FAST_LAUNCH, true, "Uploads AM dependencies " +
"to HDFS to make future launches faster. Supports -appTypes option " + "to HDFS to make future launches faster. Supports -appTypes option " +
" to specify which client implementation to use."); "to specify which client implementation to use. Optionally a " +
"destination folder for the tarball can be specified.");
opts.getOption(LAUNCH_CMD).setArgName("Application Name> <File Name"); opts.getOption(LAUNCH_CMD).setArgName("Application Name> <File Name");
opts.getOption(LAUNCH_CMD).setArgs(2); opts.getOption(LAUNCH_CMD).setArgs(2);
opts.getOption(START_CMD).setArgName("Application Name"); opts.getOption(START_CMD).setArgName("Application Name");
@ -245,6 +246,8 @@ public class ApplicationCLI extends YarnCLI {
opts.getOption(FLEX_CMD).setArgName("Application Name or ID"); opts.getOption(FLEX_CMD).setArgName("Application Name or ID");
opts.getOption(COMPONENT).setArgName("Component Name> <Count"); opts.getOption(COMPONENT).setArgName("Component Name> <Count");
opts.getOption(COMPONENT).setArgs(2); opts.getOption(COMPONENT).setArgs(2);
opts.getOption(ENABLE_FAST_LAUNCH).setOptionalArg(true);
opts.getOption(ENABLE_FAST_LAUNCH).setArgName("Destination Folder");
} else if (title != null && title.equalsIgnoreCase(APPLICATION_ATTEMPT)) { } else if (title != null && title.equalsIgnoreCase(APPLICATION_ATTEMPT)) {
opts.addOption(STATUS_CMD, true, opts.addOption(STATUS_CMD, true,
"Prints the status of the application attempt."); "Prints the status of the application attempt.");
@ -513,13 +516,15 @@ public class ApplicationCLI extends YarnCLI {
.actionFlex(appNameAndType[0], counts); .actionFlex(appNameAndType[0], counts);
} else if (cliParser.hasOption(ENABLE_FAST_LAUNCH)) { } else if (cliParser.hasOption(ENABLE_FAST_LAUNCH)) {
String appType = getSingleAppTypeFromCLI(cliParser); String appType = getSingleAppTypeFromCLI(cliParser);
String uploadDestinationFolder = cliParser
.getOptionValue(ENABLE_FAST_LAUNCH);
if (hasAnyOtherCLIOptions(cliParser, opts, ENABLE_FAST_LAUNCH, if (hasAnyOtherCLIOptions(cliParser, opts, ENABLE_FAST_LAUNCH,
APP_TYPE_CMD)) { APP_TYPE_CMD)) {
printUsage(title, opts); printUsage(title, opts);
return exitCode; return exitCode;
} }
return AppAdminClient.createAppAdminClient(appType, getConf()) return AppAdminClient.createAppAdminClient(appType, getConf())
.enableFastLaunch(); .enableFastLaunch(uploadDestinationFolder);
} else if (cliParser.hasOption(UPDATE_LIFETIME)) { } else if (cliParser.hasOption(UPDATE_LIFETIME)) {
if (!cliParser.hasOption(APP_ID)) { if (!cliParser.hasOption(APP_ID)) {
printUsage(title, opts); printUsage(title, opts);

View File

@ -2064,11 +2064,14 @@ public class TestYarnCLI {
pw.println(" Supports -appTypes option to"); pw.println(" Supports -appTypes option to");
pw.println(" specify which client"); pw.println(" specify which client");
pw.println(" implementation to use."); pw.println(" implementation to use.");
pw.println(" -enableFastLaunch Uploads AM dependencies to HDFS"); pw.println(" -enableFastLaunch <Destination Folder> Uploads AM dependencies to HDFS");
pw.println(" to make future launches faster."); pw.println(" to make future launches faster.");
pw.println(" Supports -appTypes option to"); pw.println(" Supports -appTypes option to");
pw.println(" specify which client"); pw.println(" specify which client");
pw.println(" implementation to use."); pw.println(" implementation to use.");
pw.println(" Optionally a destination folder");
pw.println(" for the tarball can be");
pw.println(" specified.");
pw.println(" -flex <Application Name or ID> Changes number of running"); pw.println(" -flex <Application Name or ID> Changes number of running");
pw.println(" containers for a component of an"); pw.println(" containers for a component of an");
pw.println(" application / long-running"); pw.println(" application / long-running");