From 2316f526902e827b6c1b92a5bddef72d211bc742 Mon Sep 17 00:00:00 2001 From: Robert Kanter Date: Mon, 11 Dec 2017 14:00:42 -0800 Subject: [PATCH] MAPREDUCE-7018. Apply erasure coding properly to framework tarball and support plain tar (miklos.szegedi@cloudera.com via rkanter) --- .../mapred/uploader/FrameworkUploader.java | 59 +++++++------ .../uploader/TestFrameworkUploader.java | 83 ++++++++++--------- 2 files changed, 81 insertions(+), 61 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java index d1cd7401a50..a374262e82a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java @@ -81,7 +81,6 @@ public class FrameworkUploader implements Runnable { @VisibleForTesting OutputStream targetStream = null; - private Path targetPath = null; private String alias = null; private void printHelp(Options options) { @@ -140,11 +139,12 @@ public class FrameworkUploader implements Runnable { } } - private void beginUpload() throws IOException, UploaderException { + @VisibleForTesting + void beginUpload() throws IOException, UploaderException { if (targetStream == null) { validateTargetPath(); int lastIndex = target.indexOf('#'); - targetPath = + Path targetPath = new Path( target.substring( 0, lastIndex == -1 ? target.length() : lastIndex)); @@ -153,7 +153,37 @@ public class FrameworkUploader implements Runnable { targetPath.getName(); LOG.info("Target " + targetPath); FileSystem fileSystem = targetPath.getFileSystem(new Configuration()); - targetStream = fileSystem.create(targetPath, true); + + targetStream = null; + if (fileSystem instanceof DistributedFileSystem) { + LOG.info("Set replication to " + + replication + " for path: " + targetPath); + LOG.info("Disabling Erasure Coding for path: " + targetPath); + DistributedFileSystem dfs = (DistributedFileSystem)fileSystem; + DistributedFileSystem.HdfsDataOutputStreamBuilder builder = + dfs.createFile(targetPath) + .overwrite(true) + .ecPolicyName( + SystemErasureCodingPolicies.getReplicationPolicy().getName()); + if (replication > 0) { + builder.replication(replication); + } + targetStream = builder.build(); + } else { + LOG.warn("Cannot set replication to " + + replication + " for path: " + targetPath + + " on a non-distributed fileystem " + + fileSystem.getClass().getName()); + } + if (targetStream == null) { + targetStream = fileSystem.create(targetPath, true); + } + + if (targetPath.getName().endsWith("gz") || + targetPath.getName().endsWith("tgz")) { + LOG.info("Creating GZip"); + targetStream = new GZIPOutputStream(targetStream); + } } } @@ -162,7 +192,7 @@ public class FrameworkUploader implements Runnable { beginUpload(); LOG.info("Compressing tarball"); try (TarArchiveOutputStream out = new TarArchiveOutputStream( - new GZIPOutputStream(targetStream))) { + targetStream)) { for (String fullPath : filteredInputFiles) { LOG.info("Adding " + fullPath); File file = new File(fullPath); @@ -178,25 +208,6 @@ public class FrameworkUploader implements Runnable { targetStream.close(); } } - - if (targetPath == null) { - return; - } - - // Set file attributes - FileSystem fileSystem = targetPath.getFileSystem(new Configuration()); - if (fileSystem instanceof DistributedFileSystem) { - LOG.info("Disabling Erasure Coding for path: " + targetPath); - DistributedFileSystem dfs = (DistributedFileSystem) fileSystem; - dfs.setErasureCodingPolicy(targetPath, - SystemErasureCodingPolicies.getReplicationPolicy().getName()); - } - - if (replication > 0) { - LOG.info("Set replication to " + - replication + " for path: " + targetPath); - fileSystem.setReplication(targetPath, replication); - } } private void parseLists() throws UploaderException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java index 9d03165a4e8..f3e4fc586d5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java @@ -30,6 +30,7 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.PrintStream; import java.util.HashMap; import java.util.HashSet; @@ -171,46 +172,54 @@ public class TestFrameworkUploader { */ @Test public void testBuildTarBall() throws IOException, UploaderException { - File parent = new File(testDir); - try { - parent.deleteOnExit(); - FrameworkUploader uploader = prepareTree(parent); - - File gzipFile = new File("upload.tar.gz"); - gzipFile.deleteOnExit(); - Assert.assertTrue("Creating output", gzipFile.createNewFile()); - uploader.targetStream = new FileOutputStream(gzipFile); - - uploader.buildPackage(); - - TarArchiveInputStream result = null; + String[] testFiles = {"upload.tar", "upload.tar.gz"}; + for (String testFile: testFiles) { + File parent = new File(testDir); try { - result = - new TarArchiveInputStream( - new GZIPInputStream(new FileInputStream(gzipFile))); - Set fileNames = new HashSet<>(); - Set sizes = new HashSet<>(); - TarArchiveEntry entry1 = result.getNextTarEntry(); - fileNames.add(entry1.getName()); - sizes.add(entry1.getSize()); - TarArchiveEntry entry2 = result.getNextTarEntry(); - fileNames.add(entry2.getName()); - sizes.add(entry2.getSize()); - Assert.assertTrue( - "File name error", fileNames.contains("a.jar")); - Assert.assertTrue( - "File size error", sizes.contains((long) 13)); - Assert.assertTrue( - "File name error", fileNames.contains("b.jar")); - Assert.assertTrue( - "File size error", sizes.contains((long) 14)); - } finally { - if (result != null) { - result.close(); + parent.deleteOnExit(); + FrameworkUploader uploader = prepareTree(parent); + + File gzipFile = + new File(parent.getAbsolutePath() + "/" + testFile); + gzipFile.deleteOnExit(); + + uploader.target = + "file:///" + gzipFile.getAbsolutePath(); + uploader.beginUpload(); + uploader.buildPackage(); + InputStream stream = new FileInputStream(gzipFile); + if (gzipFile.getName().endsWith(".gz")) { + stream = new GZIPInputStream(stream); } + + TarArchiveInputStream result = null; + try { + result = + new TarArchiveInputStream(stream); + Set fileNames = new HashSet<>(); + Set sizes = new HashSet<>(); + TarArchiveEntry entry1 = result.getNextTarEntry(); + fileNames.add(entry1.getName()); + sizes.add(entry1.getSize()); + TarArchiveEntry entry2 = result.getNextTarEntry(); + fileNames.add(entry2.getName()); + sizes.add(entry2.getSize()); + Assert.assertTrue( + "File name error", fileNames.contains("a.jar")); + Assert.assertTrue( + "File size error", sizes.contains((long) 13)); + Assert.assertTrue( + "File name error", fileNames.contains("b.jar")); + Assert.assertTrue( + "File size error", sizes.contains((long) 14)); + } finally { + if (result != null) { + result.close(); + } + } + } finally { + FileUtils.deleteDirectory(parent); } - } finally { - FileUtils.deleteDirectory(parent); } }