From 7288b08330001102f944124d18e02bf0585d3e96 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 31 Jan 2018 09:39:43 -0600 Subject: [PATCH] YARN-2185. Use pipes when localizing archives. Contributed by Miklos Szegedi --- .../java/org/apache/hadoop/fs/FileUtil.java | 251 +++++++++++++++++- .../java/org/apache/hadoop/util/RunJar.java | 65 +++++ .../apache/hadoop/yarn/util/FSDownload.java | 220 +++++++++------ .../hadoop/yarn/util/TestFSDownload.java | 30 ++- 4 files changed, 466 insertions(+), 100 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java index 4d971aa8156..bf9b1463782 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java @@ -20,27 +20,35 @@ package org.apache.hadoop.fs; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; import java.net.InetAddress; import java.net.URI; import java.net.UnknownHostException; +import java.nio.charset.Charset; import java.nio.file.AccessDeniedException; import java.util.ArrayList; import java.util.Enumeration; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.jar.Attributes; import java.util.jar.JarOutputStream; import java.util.jar.Manifest; import java.util.zip.GZIPInputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; +import java.util.zip.ZipInputStream; import org.apache.commons.collections.map.CaseInsensitiveMap; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; @@ -74,6 +82,11 @@ public class FileUtil { * */ public static final int SYMLINK_NO_PRIVILEGE = 2; + /** + * Buffer size for copy the content of compressed file to new file. + */ + private static final int BUFFER_SIZE = 8_192; + /** * convert an array of FileStatus to an array of Path * @@ -525,6 +538,22 @@ public class FileUtil { return makeShellPath(file, false); } + /** + * Convert a os-native filename to a path that works for the shell + * and avoids script injection attacks. + * @param file The filename to convert + * @return The unix pathname + * @throws IOException on windows, there can be problems with the subprocess + */ + public static String makeSecureShellPath(File file) throws IOException { + if (Shell.WINDOWS) { + // Currently it is never called, but it might be helpful in the future. + throw new UnsupportedOperationException("Not implemented for Windows"); + } else { + return makeShellPath(file, false).replace("'", "'\\''"); + } + } + /** * Convert a os-native filename to a path that works for the shell. * @param file The filename to convert @@ -576,11 +605,48 @@ public class FileUtil { } /** - * Given a File input it will unzip the file in a the unzip directory + * Given a stream input it will unzip the it in the unzip directory. + * passed as the second parameter + * @param inputStream The zip file as input + * @param toDir The unzip directory where to unzip the zip file. + * @throws IOException an exception occurred + */ + public static void unZip(InputStream inputStream, File toDir) + throws IOException { + try (ZipInputStream zip = new ZipInputStream(inputStream)) { + int numOfFailedLastModifiedSet = 0; + for(ZipEntry entry = zip.getNextEntry(); + entry != null; + entry = zip.getNextEntry()) { + if (!entry.isDirectory()) { + File file = new File(toDir, entry.getName()); + File parent = file.getParentFile(); + if (!parent.mkdirs() && + !parent.isDirectory()) { + throw new IOException("Mkdirs failed to create " + + parent.getAbsolutePath()); + } + try (OutputStream out = new FileOutputStream(file)) { + IOUtils.copyBytes(zip, out, BUFFER_SIZE); + } + if (!file.setLastModified(entry.getTime())) { + numOfFailedLastModifiedSet++; + } + } + } + if (numOfFailedLastModifiedSet > 0) { + LOG.warn("Could not set last modfied time for {} file(s)", + numOfFailedLastModifiedSet); + } + } + } + + /** + * Given a File input it will unzip it in the unzip directory. * passed as the second parameter * @param inFile The zip file as input * @param unzipDir The unzip directory where to unzip the zip file. - * @throws IOException + * @throws IOException An I/O exception has occurred */ public static void unZip(File inFile, File unzipDir) throws IOException { Enumeration entries; @@ -620,6 +686,138 @@ public class FileUtil { } } + /** + * Run a command and send the contents of an input stream to it. + * @param inputStream Input stream to forward to the shell command + * @param command shell command to run + * @throws IOException read or write failed + * @throws InterruptedException command interrupted + * @throws ExecutionException task submit failed + */ + private static void runCommandOnStream( + InputStream inputStream, String command) + throws IOException, InterruptedException, ExecutionException { + ExecutorService executor = null; + ProcessBuilder builder = new ProcessBuilder(); + builder.command( + Shell.WINDOWS ? "cmd" : "bash", + Shell.WINDOWS ? "/c" : "-c", + command); + Process process = builder.start(); + int exitCode; + try { + // Consume stdout and stderr, to avoid blocking the command + executor = Executors.newFixedThreadPool(2); + Future output = executor.submit(() -> { + try { + // Read until the output stream receives an EOF and closed. + if (LOG.isDebugEnabled()) { + // Log directly to avoid out of memory errors + try (BufferedReader reader = + new BufferedReader( + new InputStreamReader(process.getInputStream(), + Charset.forName("UTF-8")))) { + String line; + while((line = reader.readLine()) != null) { + LOG.debug(line); + } + } + } else { + org.apache.commons.io.IOUtils.copy( + process.getInputStream(), + new IOUtils.NullOutputStream()); + } + } catch (IOException e) { + LOG.debug(e.getMessage()); + } + }); + Future error = executor.submit(() -> { + try { + // Read until the error stream receives an EOF and closed. + if (LOG.isDebugEnabled()) { + // Log directly to avoid out of memory errors + try (BufferedReader reader = + new BufferedReader( + new InputStreamReader(process.getErrorStream(), + Charset.forName("UTF-8")))) { + String line; + while((line = reader.readLine()) != null) { + LOG.debug(line); + } + } + } else { + org.apache.commons.io.IOUtils.copy( + process.getErrorStream(), + new IOUtils.NullOutputStream()); + } + } catch (IOException e) { + LOG.debug(e.getMessage()); + } + }); + + // Pass the input stream to the command to process + try { + org.apache.commons.io.IOUtils.copy( + inputStream, process.getOutputStream()); + } finally { + process.getOutputStream().close(); + } + + // Wait for both stdout and stderr futures to finish + error.get(); + output.get(); + } finally { + // Clean up the threads + if (executor != null) { + executor.shutdown(); + } + // Wait to avoid leaking the child process + exitCode = process.waitFor(); + } + + if (exitCode != 0) { + throw new IOException( + String.format( + "Error executing command. %s " + + "Process exited with exit code %d.", + command, exitCode)); + } + } + + /** + * Given a Tar File as input it will untar the file in a the untar directory + * passed as the second parameter + * + * This utility will untar ".tar" files and ".tar.gz","tgz" files. + * + * @param inputStream The tar file as input. + * @param untarDir The untar directory where to untar the tar file. + * @param gzipped The input stream is gzipped + * TODO Use magic number and PusbackInputStream to identify + * @throws IOException an exception occurred + * @throws InterruptedException command interrupted + * @throws ExecutionException task submit failed + */ + public static void unTar(InputStream inputStream, File untarDir, + boolean gzipped) + throws IOException, InterruptedException, ExecutionException { + if (!untarDir.mkdirs()) { + if (!untarDir.isDirectory()) { + throw new IOException("Mkdirs failed to create " + untarDir); + } + } + + if(Shell.WINDOWS) { + // Tar is not native to Windows. Use simple Java based implementation for + // tests and simple tar archives + unTarUsingJava(inputStream, untarDir, gzipped); + } else { + // spawn tar utility to untar archive for full fledged unix behavior such + // as resolving symlinks in tar archives + unTarUsingTar(inputStream, untarDir, gzipped); + } + } + /** * Given a Tar File as input it will untar the file in a the untar directory * passed as the second parameter @@ -650,23 +848,41 @@ public class FileUtil { } } + private static void unTarUsingTar(InputStream inputStream, File untarDir, + boolean gzipped) + throws IOException, InterruptedException, ExecutionException { + StringBuilder untarCommand = new StringBuilder(); + if (gzipped) { + untarCommand.append("gzip -dc | ("); + } + untarCommand.append("cd '"); + untarCommand.append(FileUtil.makeSecureShellPath(untarDir)); + untarCommand.append("' && "); + untarCommand.append("tar -x "); + + if (gzipped) { + untarCommand.append(")"); + } + runCommandOnStream(inputStream, untarCommand.toString()); + } + private static void unTarUsingTar(File inFile, File untarDir, boolean gzipped) throws IOException { StringBuffer untarCommand = new StringBuffer(); if (gzipped) { untarCommand.append(" gzip -dc '"); - untarCommand.append(FileUtil.makeShellPath(inFile)); + untarCommand.append(FileUtil.makeSecureShellPath(inFile)); untarCommand.append("' | ("); } untarCommand.append("cd '"); - untarCommand.append(FileUtil.makeShellPath(untarDir)); - untarCommand.append("' ; "); + untarCommand.append(FileUtil.makeSecureShellPath(untarDir)); + untarCommand.append("' && "); untarCommand.append("tar -xf "); if (gzipped) { untarCommand.append(" -)"); } else { - untarCommand.append(FileUtil.makeShellPath(inFile)); + untarCommand.append(FileUtil.makeSecureShellPath(inFile)); } String[] shellCmd = { "bash", "-c", untarCommand.toString() }; ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd); @@ -701,6 +917,29 @@ public class FileUtil { } } + private static void unTarUsingJava(InputStream inputStream, File untarDir, + boolean gzipped) throws IOException { + TarArchiveInputStream tis = null; + try { + if (gzipped) { + inputStream = new BufferedInputStream(new GZIPInputStream( + inputStream)); + } else { + inputStream = + new BufferedInputStream(inputStream); + } + + tis = new TarArchiveInputStream(inputStream); + + for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null;) { + unpackEntries(tis, entry, untarDir); + entry = tis.getNextTarEntry(); + } + } finally { + IOUtils.cleanupWithLogger(LOG, tis, inputStream); + } + } + private static void unpackEntries(TarArchiveInputStream tis, TarArchiveEntry entry, File outputDir) throws IOException { if (entry.isDirectory()) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java index 19b51ad692a..89b7d763ec1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java @@ -34,9 +34,11 @@ import java.util.Enumeration; import java.util.List; import java.util.jar.JarEntry; import java.util.jar.JarFile; +import java.util.jar.JarInputStream; import java.util.jar.Manifest; import java.util.regex.Pattern; +import org.apache.commons.io.input.TeeInputStream; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileUtil; @@ -94,6 +96,69 @@ public class RunJar { unJar(jarFile, toDir, MATCH_ANY); } + /** + * Unpack matching files from a jar. Entries inside the jar that do + * not match the given pattern will be skipped. + * + * @param inputStream the jar stream to unpack + * @param toDir the destination directory into which to unpack the jar + * @param unpackRegex the pattern to match jar entries against + * + * @throws IOException if an I/O error has occurred or toDir + * cannot be created and does not already exist + */ + public static void unJar(InputStream inputStream, File toDir, + Pattern unpackRegex) + throws IOException { + try (JarInputStream jar = new JarInputStream(inputStream)) { + int numOfFailedLastModifiedSet = 0; + for (JarEntry entry = jar.getNextJarEntry(); + entry != null; + entry = jar.getNextJarEntry()) { + if (!entry.isDirectory() && + unpackRegex.matcher(entry.getName()).matches()) { + File file = new File(toDir, entry.getName()); + ensureDirectory(file.getParentFile()); + try (OutputStream out = new FileOutputStream(file)) { + IOUtils.copyBytes(jar, out, BUFFER_SIZE); + } + if (!file.setLastModified(entry.getTime())) { + numOfFailedLastModifiedSet++; + } + } + } + if (numOfFailedLastModifiedSet > 0) { + LOG.warn("Could not set last modfied time for {} file(s)", + numOfFailedLastModifiedSet); + } + } + } + + /** + * Unpack matching files from a jar. Entries inside the jar that do + * not match the given pattern will be skipped. Keep also a copy + * of the entire jar in the same directory for backward compatibility. + * TODO remove this feature in a new release and do only unJar + * + * @param inputStream the jar stream to unpack + * @param toDir the destination directory into which to unpack the jar + * @param unpackRegex the pattern to match jar entries against + * + * @throws IOException if an I/O error has occurred or toDir + * cannot be created and does not already exist + */ + @Deprecated + public static void unJarAndSave(InputStream inputStream, File toDir, + String name, Pattern unpackRegex) + throws IOException{ + File file = new File(toDir, name); + ensureDirectory(toDir); + try (OutputStream jar = new FileOutputStream(file); + TeeInputStream teeInputStream = new TeeInputStream(inputStream, jar)) { + unJar(teeInputStream, toDir, unpackRegex); + } + } + /** * Unpack matching files from a jar. Entries inside the jar that do * not match the given pattern will be skipped. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java index 6e5957449f2..508440a5f36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java @@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.util; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; import java.util.concurrent.Callable; @@ -29,6 +31,7 @@ import java.util.concurrent.Future; import java.util.regex.Pattern; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; @@ -54,6 +57,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.util.concurrent.Futures; +import org.apache.hadoop.yarn.exceptions.YarnException; /** * Download a single URL to the local disk. @@ -247,9 +251,21 @@ public class FSDownload implements Callable { } } - private Path copy(Path sCopy, Path dstdir) throws IOException { + /** + * Localize files. + * @param destination destination directory + * @throws IOException cannot read or write file + * @throws YarnException subcommand returned an error + */ + private void verifyAndCopy(Path destination) + throws IOException, YarnException { + final Path sCopy; + try { + sCopy = resource.getResource().toPath(); + } catch (URISyntaxException e) { + throw new IOException("Invalid resource", e); + } FileSystem sourceFs = sCopy.getFileSystem(conf); - Path dCopy = new Path(dstdir, "tmp_"+sCopy.getName()); FileStatus sStat = sourceFs.getFileStatus(sCopy); if (sStat.getModificationTime() != resource.getTimestamp()) { throw new IOException("Resource " + sCopy + @@ -264,82 +280,108 @@ public class FSDownload implements Callable { } } - FileUtil.copy(sourceFs, sStat, FileSystem.getLocal(conf), dCopy, false, - true, conf); - return dCopy; + downloadAndUnpack(sCopy, destination); } - private long unpack(File localrsrc, File dst) throws IOException { - switch (resource.getType()) { - case ARCHIVE: { - String lowerDst = StringUtils.toLowerCase(dst.getName()); - if (lowerDst.endsWith(".jar")) { - RunJar.unJar(localrsrc, dst); - } else if (lowerDst.endsWith(".zip")) { - FileUtil.unZip(localrsrc, dst); - } else if (lowerDst.endsWith(".tar.gz") || - lowerDst.endsWith(".tgz") || - lowerDst.endsWith(".tar")) { - FileUtil.unTar(localrsrc, dst); + /** + * Copy source path to destination with localization rules. + * @param source source path to copy. Typically HDFS + * @param destination destination path. Typically local filesystem + * @exception YarnException Any error has occurred + */ + private void downloadAndUnpack(Path source, Path destination) + throws YarnException { + try { + FileSystem sourceFileSystem = source.getFileSystem(conf); + FileSystem destinationFileSystem = destination.getFileSystem(conf); + if (sourceFileSystem.getFileStatus(source).isDirectory()) { + FileUtil.copy( + sourceFileSystem, source, + destinationFileSystem, destination, false, + true, conf); } else { - LOG.warn("Cannot unpack " + localrsrc); - if (!localrsrc.renameTo(dst)) { - throw new IOException("Unable to rename file: [" + localrsrc - + "] to [" + dst + "]"); - } + unpack(source, destination, sourceFileSystem, destinationFileSystem); } + } catch (Exception e) { + throw new YarnException("Download and unpack failed", e); } - break; - case PATTERN: { + } + + /** + * Do the localization action on the input stream. + * We use the deprecated method RunJar.unJarAndSave for compatibility reasons. + * We should use the more efficient RunJar.unJar in the future. + * @param source Source path + * @param destination Destination pth + * @param sourceFileSystem Source filesystem + * @param destinationFileSystem Destination filesystem + * @throws IOException Could not read or write stream + * @throws InterruptedException Operation interrupted by caller + * @throws ExecutionException Could not create thread pool execution + */ + @SuppressWarnings("deprecation") + private void unpack(Path source, Path destination, + FileSystem sourceFileSystem, + FileSystem destinationFileSystem) + throws IOException, InterruptedException, ExecutionException { + try (InputStream inputStream = sourceFileSystem.open(source)) { + File dst = new File(destination.toUri()); String lowerDst = StringUtils.toLowerCase(dst.getName()); - if (lowerDst.endsWith(".jar")) { - String p = resource.getPattern(); - RunJar.unJar(localrsrc, dst, - p == null ? RunJar.MATCH_ANY : Pattern.compile(p)); - File newDst = new File(dst, dst.getName()); - if (!dst.exists() && !dst.mkdir()) { - throw new IOException("Unable to create directory: [" + dst + "]"); + switch (resource.getType()) { + case ARCHIVE: + if (lowerDst.endsWith(".jar")) { + RunJar.unJar(inputStream, dst, RunJar.MATCH_ANY); + } else if (lowerDst.endsWith(".zip")) { + FileUtil.unZip(inputStream, dst); + } else if (lowerDst.endsWith(".tar.gz") || + lowerDst.endsWith(".tgz") || + lowerDst.endsWith(".tar")) { + FileUtil.unTar(inputStream, dst, lowerDst.endsWith("gz")); + } else { + LOG.warn("Cannot unpack " + source); + try (OutputStream outputStream = + destinationFileSystem.create(destination, true)) { + IOUtils.copy(inputStream, outputStream); + } } - if (!localrsrc.renameTo(newDst)) { - throw new IOException("Unable to rename file: [" + localrsrc - + "] to [" + newDst + "]"); + break; + case PATTERN: + if (lowerDst.endsWith(".jar")) { + String p = resource.getPattern(); + if (!dst.exists() && !dst.mkdir()) { + throw new IOException("Unable to create directory: [" + dst + "]"); + } + RunJar.unJarAndSave(inputStream, dst, source.getName(), + p == null ? RunJar.MATCH_ANY : Pattern.compile(p)); + } else if (lowerDst.endsWith(".zip")) { + LOG.warn("Treating [" + source + "] as an archive even though it " + + "was specified as PATTERN"); + FileUtil.unZip(inputStream, dst); + } else if (lowerDst.endsWith(".tar.gz") || + lowerDst.endsWith(".tgz") || + lowerDst.endsWith(".tar")) { + LOG.warn("Treating [" + source + "] as an archive even though it " + + "was specified as PATTERN"); + FileUtil.unTar(inputStream, dst, lowerDst.endsWith("gz")); + } else { + LOG.warn("Cannot unpack " + source); + try (OutputStream outputStream = + destinationFileSystem.create(destination, true)) { + IOUtils.copy(inputStream, outputStream); + } } - } else if (lowerDst.endsWith(".zip")) { - LOG.warn("Treating [" + localrsrc + "] as an archive even though it " + - "was specified as PATTERN"); - FileUtil.unZip(localrsrc, dst); - } else if (lowerDst.endsWith(".tar.gz") || - lowerDst.endsWith(".tgz") || - lowerDst.endsWith(".tar")) { - LOG.warn("Treating [" + localrsrc + "] as an archive even though it " + - "was specified as PATTERN"); - FileUtil.unTar(localrsrc, dst); - } else { - LOG.warn("Cannot unpack " + localrsrc); - if (!localrsrc.renameTo(dst)) { - throw new IOException("Unable to rename file: [" + localrsrc - + "] to [" + dst + "]"); + break; + case FILE: + default: + try (OutputStream outputStream = + destinationFileSystem.create(destination, true)) { + IOUtils.copy(inputStream, outputStream); } + break; } + // TODO Should calculate here before returning + //return FileUtil.getDU(destDir); } - break; - case FILE: - default: - if (!localrsrc.renameTo(dst)) { - throw new IOException("Unable to rename file: [" + localrsrc - + "] to [" + dst + "]"); - } - break; - } - if(localrsrc.isFile()){ - try { - files.delete(new Path(localrsrc.toString()), false); - } catch (IOException ignore) { - } - } - return 0; - // TODO Should calculate here before returning - //return FileUtil.getDU(destDir); } @Override @@ -352,27 +394,37 @@ public class FSDownload implements Callable { } if (LOG.isDebugEnabled()) { - LOG.debug("Starting to download " + sCopy); + LOG.debug(String.format("Starting to download %s %s %s", + sCopy, + resource.getType(), + resource.getPattern())); } - createDir(destDirPath, cachePerms); - final Path dst_work = new Path(destDirPath + "_tmp"); - createDir(dst_work, cachePerms); - Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName())); + final Path destinationTmp = new Path(destDirPath + "_tmp"); + createDir(destinationTmp, PRIVATE_DIR_PERMS); + Path dFinal = + files.makeQualified(new Path(destinationTmp, sCopy.getName())); try { - Path dTmp = null == userUgi ? files.makeQualified(copy(sCopy, dst_work)) - : userUgi.doAs(new PrivilegedExceptionAction() { - public Path run() throws Exception { - return files.makeQualified(copy(sCopy, dst_work)); - }; - }); - unpack(new File(dTmp.toUri()), new File(dFinal.toUri())); - changePermissions(dFinal.getFileSystem(conf), dFinal); - files.rename(dst_work, destDirPath, Rename.OVERWRITE); + if (userUgi == null) { + verifyAndCopy(dFinal); + } else { + userUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + verifyAndCopy(dFinal); + return null; + } + }); + } + Path destinationTmpfilesQualified = files.makeQualified(destinationTmp); + changePermissions( + destinationTmpfilesQualified.getFileSystem(conf), + destinationTmpfilesQualified); + files.rename(destinationTmp, destDirPath, Rename.OVERWRITE); if (LOG.isDebugEnabled()) { - LOG.debug("File has been downloaded to " + - new Path(destDirPath, sCopy.getName())); + LOG.debug(String.format("File has been downloaded to %s from %s", + new Path(destDirPath, sCopy.getName()), sCopy)); } } catch (Exception e) { try { @@ -382,7 +434,7 @@ public class FSDownload implements Callable { throw e; } finally { try { - files.delete(dst_work, true); + files.delete(destinationTmp, true); } catch (FileNotFoundException ignore) { } conf = null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java index 877dd080afb..fa8c0398cf3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java @@ -82,6 +82,9 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +/** + * Unit test for the FSDownload class. + */ public class TestFSDownload { private static final Log LOG = LogFactory.getLog(TestFSDownload.class); @@ -90,7 +93,8 @@ public class TestFSDownload { private enum TEST_FILE_TYPE { TAR, JAR, ZIP, TGZ }; - + private Configuration conf = new Configuration(); + @AfterClass public static void deleteTestDir() throws IOException { FileContext fs = FileContext.getLocalFSFileContext(); @@ -132,6 +136,18 @@ public class TestFSDownload { FileOutputStream stream = new FileOutputStream(jarFile); LOG.info("Create jar out stream "); JarOutputStream out = new JarOutputStream(stream, new Manifest()); + ZipEntry entry = new ZipEntry("classes/1.class"); + out.putNextEntry(entry); + out.write(1); + out.write(2); + out.write(3); + out.closeEntry(); + ZipEntry entry2 = new ZipEntry("classes/2.class"); + out.putNextEntry(entry2); + out.write(1); + out.write(2); + out.write(3); + out.closeEntry(); LOG.info("Done writing jar stream "); out.close(); LocalResource ret = recordFactory.newRecordInstance(LocalResource.class); @@ -256,7 +272,6 @@ public class TestFSDownload { @Test (timeout=10000) public void testDownloadBadPublic() throws IOException, URISyntaxException, InterruptedException { - Configuration conf = new Configuration(); conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); FileContext files = FileContext.getLocalFSFileContext(conf); final Path basedir = files.makeQualified(new Path("target", @@ -307,7 +322,6 @@ public class TestFSDownload { @Test (timeout=60000) public void testDownloadPublicWithStatCache() throws IOException, URISyntaxException, InterruptedException, ExecutionException { - final Configuration conf = new Configuration(); FileContext files = FileContext.getLocalFSFileContext(conf); Path basedir = files.makeQualified(new Path("target", TestFSDownload.class.getSimpleName())); @@ -382,7 +396,6 @@ public class TestFSDownload { @Test (timeout=10000) public void testDownload() throws IOException, URISyntaxException, InterruptedException { - Configuration conf = new Configuration(); conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); FileContext files = FileContext.getLocalFSFileContext(conf); final Path basedir = files.makeQualified(new Path("target", @@ -438,7 +451,7 @@ public class TestFSDownload { FileStatus status = files.getFileStatus(localized.getParent()); FsPermission perm = status.getPermission(); assertEquals("Cache directory permissions are incorrect", - new FsPermission((short)0755), perm); + new FsPermission((short)0700), perm); status = files.getFileStatus(localized); perm = status.getPermission(); @@ -455,7 +468,6 @@ public class TestFSDownload { private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException, URISyntaxException, InterruptedException{ - Configuration conf = new Configuration(); conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); FileContext files = FileContext.getLocalFSFileContext(conf); final Path basedir = files.makeQualified(new Path("target", @@ -530,7 +542,7 @@ public class TestFSDownload { } } - @Test (timeout=10000) + @Test (timeout=10000) public void testDownloadArchive() throws IOException, URISyntaxException, InterruptedException { downloadWithFileType(TEST_FILE_TYPE.TAR); @@ -542,7 +554,7 @@ public class TestFSDownload { downloadWithFileType(TEST_FILE_TYPE.JAR); } - @Test (timeout=10000) + @Test (timeout=10000) public void testDownloadArchiveZip() throws IOException, URISyntaxException, InterruptedException { downloadWithFileType(TEST_FILE_TYPE.ZIP); @@ -603,7 +615,6 @@ public class TestFSDownload { @Test (timeout=10000) public void testDirDownload() throws IOException, InterruptedException { - Configuration conf = new Configuration(); FileContext files = FileContext.getLocalFSFileContext(conf); final Path basedir = files.makeQualified(new Path("target", TestFSDownload.class.getSimpleName())); @@ -668,7 +679,6 @@ public class TestFSDownload { @Test (timeout=10000) public void testUniqueDestinationPath() throws Exception { - Configuration conf = new Configuration(); FileContext files = FileContext.getLocalFSFileContext(conf); final Path basedir = files.makeQualified(new Path("target", TestFSDownload.class.getSimpleName()));