From c553e0ed7ca159ab3921e70eea6bc0e4d2584d00 Mon Sep 17 00:00:00 2001 From: Robert Joseph Evans Date: Mon, 25 Mar 2013 15:48:46 +0000 Subject: [PATCH] svn merge -c 1460723 FIXES: YARN-109. .tmp file is not deleted for localized archives (Mayank Bansal via bobby) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1460728 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../apache/hadoop/yarn/util/FSDownload.java | 24 +- .../hadoop/yarn/util/TestFSDownload.java | 298 +++++++++++++++++- 3 files changed, 313 insertions(+), 12 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c12f71d62c2..67eb74a7f3f 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -404,6 +404,9 @@ Release 0.23.7 - UNRELEASED YARN-345. Many InvalidStateTransitonException errors for ApplicationImpl in Node Manager (Robert Parker via jlowe) + YARN-109. .tmp file is not deleted for localized archives (Mayank Bansal + via bobby) + Release 0.23.6 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java index 627d565d4d0..5253f49e87d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java @@ -251,6 +251,12 @@ public class FSDownload implements Callable { } break; } + if(localrsrc.isFile()){ + try { + files.delete(new Path(localrsrc.toString()), false); + } catch (IOException ignore) { + } + } return 0; // TODO Should calculate here before returning //return FileUtil.getDU(destDir); @@ -264,41 +270,41 @@ public class FSDownload implements Callable { } catch (URISyntaxException e) { throw new IOException("Invalid resource", e); } - Path tmp; do { tmp = new Path(destDirPath, String.valueOf(rand.nextLong())); } while (files.util().exists(tmp)); destDirPath = tmp; - createDir(destDirPath, cachePerms); final Path dst_work = new Path(destDirPath + "_tmp"); createDir(dst_work, cachePerms); - Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName())); try { - Path dTmp = null == userUgi - ? files.makeQualified(copy(sCopy, dst_work)) - : userUgi.doAs(new PrivilegedExceptionAction() { + Path dTmp = null == userUgi ? files.makeQualified(copy(sCopy, dst_work)) + : userUgi.doAs(new PrivilegedExceptionAction() { public Path run() throws Exception { return files.makeQualified(copy(sCopy, dst_work)); }; }); Pattern pattern = null; String p = resource.getPattern(); - if(p != null) { + if (p != null) { pattern = Pattern.compile(p); } unpack(new File(dTmp.toUri()), new File(dFinal.toUri()), pattern); changePermissions(dFinal.getFileSystem(conf), dFinal); files.rename(dst_work, destDirPath, Rename.OVERWRITE); } catch (Exception e) { - try { files.delete(destDirPath, true); } catch (IOException ignore) { } + try { + files.delete(destDirPath, true); + } catch (IOException ignore) { + } throw e; } finally { try { files.delete(dst_work, true); - } catch (FileNotFoundException ignore) { } + } catch (FileNotFoundException ignore) { + } // clear ref to internal var rand = null; conf = null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java index c29b69f1148..912c7248ca5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertEquals; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; import java.net.URISyntaxException; import java.util.EnumSet; import java.util.HashMap; @@ -47,10 +48,12 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -113,7 +116,127 @@ public class TestFSDownload { return ret; } - @Test + static LocalResource createTarFile(FileContext files, Path p, int len, + Random r, LocalResourceVisibility vis) throws IOException, + URISyntaxException { + + FSDataOutputStream outFile = null; + try { + byte[] bytes = new byte[len]; + Path tarPath = new Path(p.toString()); + outFile = files.create(tarPath, EnumSet.of(CREATE, OVERWRITE)); + r.nextBytes(bytes); + outFile.write(bytes); + } finally { + if (outFile != null) + outFile.close(); + } + StringBuffer tarCommand = new StringBuffer(); + URI u = new URI(p.getParent().toString()); + tarCommand.append("cd '"); + tarCommand.append(FileUtil.makeShellPath(u.getPath().toString())); + tarCommand.append("' ; "); + tarCommand.append("tar -czf " + p.getName() + ".tar " + p.getName()); + String[] shellCmd = { "bash", "-c", tarCommand.toString() }; + ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd); + shexec.execute(); + int exitcode = shexec.getExitCode(); + if (exitcode != 0) { + throw new IOException("Error untarring file " + p + + ". Tar process exited with exit code " + exitcode); + } + LocalResource ret = recordFactory.newRecordInstance(LocalResource.class); + ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString() + + ".tar"))); + ret.setSize(len); + ret.setType(LocalResourceType.ARCHIVE); + ret.setVisibility(vis); + ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".tar")) + .getModificationTime()); + return ret; + } + + static LocalResource createJarFile(FileContext files, Path p, int len, + Random r, LocalResourceVisibility vis) throws IOException, + URISyntaxException { + + FSDataOutputStream outFile = null; + try { + byte[] bytes = new byte[len]; + Path tarPath = new Path(p.toString()); + outFile = files.create(tarPath, EnumSet.of(CREATE, OVERWRITE)); + r.nextBytes(bytes); + outFile.write(bytes); + } finally { + if (outFile != null) + outFile.close(); + } + StringBuffer tarCommand = new StringBuffer(); + URI u = new URI(p.getParent().toString()); + tarCommand.append("cd '"); + tarCommand.append(FileUtil.makeShellPath(u.getPath().toString())); + tarCommand.append("' ; "); + tarCommand.append("jar cf " + p.getName() + ".jar " + p.getName()); + String[] shellCmd = { "bash", "-c", tarCommand.toString() }; + ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd); + shexec.execute(); + int exitcode = shexec.getExitCode(); + if (exitcode != 0) { + throw new IOException("Error untarring file " + p + + ". Tar process exited with exit code " + exitcode); + } + LocalResource ret = recordFactory.newRecordInstance(LocalResource.class); + ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString() + + ".jar"))); + ret.setSize(len); + ret.setType(LocalResourceType.ARCHIVE); + ret.setVisibility(vis); + ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".jar")) + .getModificationTime()); + return ret; + } + + static LocalResource createZipFile(FileContext files, Path p, int len, + Random r, LocalResourceVisibility vis) throws IOException, + URISyntaxException { + + FSDataOutputStream outFile = null; + try { + byte[] bytes = new byte[len]; + Path tarPath = new Path(p.toString()); + outFile = files.create(tarPath, EnumSet.of(CREATE, OVERWRITE)); + r.nextBytes(bytes); + outFile.write(bytes); + } finally { + if (outFile != null) + outFile.close(); + } + StringBuffer zipCommand = new StringBuffer(); + URI u = new URI(p.getParent().toString()); + zipCommand.append("cd '"); + zipCommand.append(FileUtil.makeShellPath(u.getPath().toString())); + zipCommand.append("' ; "); + zipCommand.append("gzip " + p.getName()); + String[] shellCmd = { "bash", "-c", zipCommand.toString() }; + ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd); + shexec.execute(); + int exitcode = shexec.getExitCode(); + if (exitcode != 0) { + throw new IOException("Error untarring file " + p + + ". Tar process exited with exit code " + exitcode); + } + LocalResource ret = recordFactory.newRecordInstance(LocalResource.class); + ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString() + + ".zip"))); + ret.setSize(len); + ret.setType(LocalResourceType.ARCHIVE); + ret.setVisibility(vis); + ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".gz")) + .getModificationTime()); + return ret; + } + + @Test (timeout=10000) public void testDownloadBadPublic() throws IOException, URISyntaxException, InterruptedException { Configuration conf = new Configuration(); @@ -161,7 +284,7 @@ public class TestFSDownload { } } - @Test + @Test (timeout=10000) public void testDownload() throws IOException, URISyntaxException, InterruptedException { Configuration conf = new Configuration(); @@ -229,6 +352,175 @@ public class TestFSDownload { } } + @SuppressWarnings("deprecation") + @Test (timeout=10000) + public void testDownloadArchive() throws IOException, URISyntaxException, + InterruptedException { + Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); + FileContext files = FileContext.getLocalFSFileContext(conf); + final Path basedir = files.makeQualified(new Path("target", + TestFSDownload.class.getSimpleName())); + files.mkdir(basedir, null, true); + conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); + + Random rand = new Random(); + long sharedSeed = rand.nextLong(); + rand.setSeed(sharedSeed); + System.out.println("SEED: " + sharedSeed); + + Map> pending = new HashMap>(); + ExecutorService exec = Executors.newSingleThreadExecutor(); + LocalDirAllocator dirs = new LocalDirAllocator( + TestFSDownload.class.getName()); + + int size = rand.nextInt(512) + 512; + LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE; + + Path p = new Path(basedir, "" + 1); + LocalResource rsrc = createTarFile(files, p, size, rand, vis); + Path destPath = dirs.getLocalPathForWrite(basedir.toString(), size, conf); + FSDownload fsd = new FSDownload(files, + UserGroupInformation.getCurrentUser(), conf, destPath, rsrc, + new Random(sharedSeed)); + pending.put(rsrc, exec.submit(fsd)); + + try { + FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus( + basedir); + for (FileStatus filestatus : filesstatus) { + if (filestatus.isDir()) { + FileStatus[] childFiles = files.getDefaultFileSystem().listStatus( + filestatus.getPath()); + for (FileStatus childfile : childFiles) { + if (childfile.getPath().getName().equalsIgnoreCase("1.tar.tmp")) { + Assert.fail("Tmp File should not have been there " + + childfile.getPath()); + } + } + } + } + }catch (Exception e) { + throw new IOException("Failed exec", e); + } + finally { + exec.shutdown(); + } + } + + @SuppressWarnings("deprecation") + @Test (timeout=10000) + public void testDownloadPatternJar() throws IOException, URISyntaxException, + InterruptedException { + Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); + FileContext files = FileContext.getLocalFSFileContext(conf); + final Path basedir = files.makeQualified(new Path("target", + TestFSDownload.class.getSimpleName())); + files.mkdir(basedir, null, true); + conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); + + Random rand = new Random(); + long sharedSeed = rand.nextLong(); + rand.setSeed(sharedSeed); + System.out.println("SEED: " + sharedSeed); + + Map> pending = new HashMap>(); + ExecutorService exec = Executors.newSingleThreadExecutor(); + LocalDirAllocator dirs = new LocalDirAllocator( + TestFSDownload.class.getName()); + + int size = rand.nextInt(512) + 512; + LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE; + + Path p = new Path(basedir, "" + 1); + LocalResource rsrcjar = createJarFile(files, p, size, rand, vis); + rsrcjar.setType(LocalResourceType.PATTERN); + Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf); + FSDownload fsdjar = new FSDownload(files, + UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar, + new Random(sharedSeed)); + pending.put(rsrcjar, exec.submit(fsdjar)); + + try { + FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus( + basedir); + for (FileStatus filestatus : filesstatus) { + if (filestatus.isDir()) { + FileStatus[] childFiles = files.getDefaultFileSystem().listStatus( + filestatus.getPath()); + for (FileStatus childfile : childFiles) { + if (childfile.getPath().getName().equalsIgnoreCase("1.jar.tmp")) { + Assert.fail("Tmp File should not have been there " + + childfile.getPath()); + } + } + } + } + }catch (Exception e) { + throw new IOException("Failed exec", e); + } + finally { + exec.shutdown(); + } + } + + @SuppressWarnings("deprecation") + @Test (timeout=10000) + public void testDownloadArchiveZip() throws IOException, URISyntaxException, + InterruptedException { + Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); + FileContext files = FileContext.getLocalFSFileContext(conf); + final Path basedir = files.makeQualified(new Path("target", + TestFSDownload.class.getSimpleName())); + files.mkdir(basedir, null, true); + conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); + + Random rand = new Random(); + long sharedSeed = rand.nextLong(); + rand.setSeed(sharedSeed); + System.out.println("SEED: " + sharedSeed); + + Map> pending = new HashMap>(); + ExecutorService exec = Executors.newSingleThreadExecutor(); + LocalDirAllocator dirs = new LocalDirAllocator( + TestFSDownload.class.getName()); + + int size = rand.nextInt(512) + 512; + LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE; + + Path p = new Path(basedir, "" + 1); + LocalResource rsrczip = createZipFile(files, p, size, rand, vis); + Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf); + FSDownload fsdzip = new FSDownload(files, + UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip, + new Random(sharedSeed)); + pending.put(rsrczip, exec.submit(fsdzip)); + + try { + FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus( + basedir); + for (FileStatus filestatus : filesstatus) { + if (filestatus.isDir()) { + FileStatus[] childFiles = files.getDefaultFileSystem().listStatus( + filestatus.getPath()); + for (FileStatus childfile : childFiles) { + if (childfile.getPath().getName().equalsIgnoreCase("1.gz.tmp")) { + Assert.fail("Tmp File should not have been there " + + childfile.getPath()); + } + } + } + } + }catch (Exception e) { + throw new IOException("Failed exec", e); + } + finally { + exec.shutdown(); + } + } + private void verifyPermsRecursively(FileSystem fs, FileContext files, Path p, LocalResourceVisibility vis) throws IOException { @@ -261,7 +553,7 @@ public class TestFSDownload { } } - @Test + @Test (timeout=10000) public void testDirDownload() throws IOException, InterruptedException { Configuration conf = new Configuration(); FileContext files = FileContext.getLocalFSFileContext(conf);