From 94c6a4aa85e7d98e9b532b330f30783315f4334b Mon Sep 17 00:00:00 2001
From: Vinayakumar B
Date: Wed, 22 Jul 2015 10:25:49 +0530
Subject: [PATCH] HADOOP-12017. Hadoop archives command should use
 configurable replication factor when closing (Contributed by Bibin A
 Chundatt)

---
 .../hadoop-common/CHANGES.txt                 |  3 +++
 .../apache/hadoop/tools/HadoopArchives.java   | 21 +++++++++------
 .../src/site/markdown/HadoopArchives.md.vm    |  2 +-
 .../hadoop/tools/TestHadoopArchives.java      | 26 ++++++++++++-------
 4 files changed, 33 insertions(+), 19 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 5b51bce6876..3d101d4f5ae 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -992,6 +992,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12051. ProtobufRpcEngine.invoke() should use Exception.toString()
     over getMessage() in logging/span events. (Varun Saxena via stevel)
 
+    HADOOP-12017. Hadoop archives command should use configurable replication
+    factor when closing (Bibin A Chundatt via vinayakumarb)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-tools/hadoop-archives/src/main/java/org/apache/hadoop/tools/HadoopArchives.java b/hadoop-tools/hadoop-archives/src/main/java/org/apache/hadoop/tools/HadoopArchives.java
index 330830b41f5..ee148503f10 100644
--- a/hadoop-tools/hadoop-archives/src/main/java/org/apache/hadoop/tools/HadoopArchives.java
+++ b/hadoop-tools/hadoop-archives/src/main/java/org/apache/hadoop/tools/HadoopArchives.java
@@ -100,15 +100,17 @@ public class HadoopArchives implements Tool {
   static final String SRC_PARENT_LABEL = NAME + ".parent.path";
   /** the size of the blocks that will be created when archiving **/
   static final String HAR_BLOCKSIZE_LABEL = NAME + ".block.size";
-  /**the size of the part files that will be created when archiving **/
+  /** the replication factor for the file in archiving. **/
+  static final String HAR_REPLICATION_LABEL = NAME + ".replication.factor";
+  /** the size of the part files that will be created when archiving **/
   static final String HAR_PARTSIZE_LABEL = NAME + ".partfile.size";
 
   /** size of each part file size **/
   long partSize = 2 * 1024 * 1024 * 1024l;
   /** size of blocks in hadoop archives **/
   long blockSize = 512 * 1024 * 1024l;
-  /** the desired replication degree; default is 10 **/
-  short repl = 10;
+  /** the desired replication degree; default is 3 **/
+  short repl = 3;
 
   private static final String usage = "archive"
   + " <-archiveName <NAME>.har> <-p <parent path>> [-r <replication factor>]" +
@@ -475,6 +477,7 @@ public class HadoopArchives implements Tool {
     conf.setLong(HAR_PARTSIZE_LABEL, partSize);
     conf.set(DST_HAR_LABEL, archiveName);
     conf.set(SRC_PARENT_LABEL, parentPath.makeQualified(fs).toString());
+    conf.setInt(HAR_REPLICATION_LABEL, repl);
     Path outputPath = new Path(dest, archiveName);
     FileOutputFormat.setOutputPath(conf, outputPath);
     FileSystem outFs = outputPath.getFileSystem(conf);
@@ -549,8 +552,6 @@ public class HadoopArchives implements Tool {
     } finally {
       srcWriter.close();
     }
-    //increase the replication of src files
-    jobfs.setReplication(srcFiles, repl);
     conf.setInt(SRC_COUNT_LABEL, numFiles);
     conf.setLong(TOTAL_SIZE_LABEL, totalSize);
     int numMaps = (int)(totalSize/partSize);
@@ -587,6 +588,7 @@ public class HadoopArchives implements Tool {
     FileSystem destFs = null;
     byte[] buffer;
     int buf_size = 128 * 1024;
+    private int replication = 3;
     long blockSize = 512 * 1024 * 1024l;
 
     // configure the mapper and create
@@ -595,7 +597,7 @@ public class HadoopArchives implements Tool {
     // tmp files.
     public void configure(JobConf conf) {
       this.conf = conf;
-
+      replication = conf.getInt(HAR_REPLICATION_LABEL, 3);
       // this is tightly tied to map reduce
       // since it does not expose an api
       // to get the partition
@@ -712,6 +714,7 @@ public class HadoopArchives implements Tool {
     public void close() throws IOException {
       // close the part files.
       partStream.close();
+      destFs.setReplication(tmpOutput, (short) replication);
     }
   }
 
@@ -732,6 +735,7 @@ public class HadoopArchives implements Tool {
     private int numIndexes = 1000;
     private Path tmpOutputDir = null;
     private int written = 0;
+    private int replication = 3;
     private int keyVal = 0;
 
     // configure
@@ -740,6 +744,7 @@ public class HadoopArchives implements Tool {
       tmpOutputDir = FileOutputFormat.getWorkOutputPath(this.conf);
       masterIndex = new Path(tmpOutputDir, "_masterindex");
       index = new Path(tmpOutputDir, "_index");
+      replication = conf.getInt(HAR_REPLICATION_LABEL, 3);
       try {
         fs = masterIndex.getFileSystem(conf);
         if (fs.exists(masterIndex)) {
@@ -798,8 +803,8 @@ public class HadoopArchives implements Tool {
       outStream.close();
       indexStream.close();
       // try increasing the replication
-      fs.setReplication(index, (short) 5);
-      fs.setReplication(masterIndex, (short) 5);
+      fs.setReplication(index, (short) replication);
+      fs.setReplication(masterIndex, (short) replication);
     }
   }
 
diff --git a/hadoop-tools/hadoop-archives/src/site/markdown/HadoopArchives.md.vm b/hadoop-tools/hadoop-archives/src/site/markdown/HadoopArchives.md.vm
index be557a73293..8bbb1ea596d 100644
--- a/hadoop-tools/hadoop-archives/src/site/markdown/HadoopArchives.md.vm
+++ b/hadoop-tools/hadoop-archives/src/site/markdown/HadoopArchives.md.vm
@@ -53,7 +53,7 @@ How to Create an Archive
   sections.
 
   -r indicates the desired replication factor; if this optional argument is
-  not specified, a replication factor of 10 will be used.
+  not specified, a replication factor of 3 will be used.
 
   If you just want to archive a single directory /foo/bar then you can just use
 
diff --git a/hadoop-tools/hadoop-archives/src/test/java/org/apache/hadoop/tools/TestHadoopArchives.java b/hadoop-tools/hadoop-archives/src/test/java/org/apache/hadoop/tools/TestHadoopArchives.java
index d8222dc20f2..165c51559ed 100644
--- a/hadoop-tools/hadoop-archives/src/test/java/org/apache/hadoop/tools/TestHadoopArchives.java
+++ b/hadoop-tools/hadoop-archives/src/test/java/org/apache/hadoop/tools/TestHadoopArchives.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.tools;
 import java.io.ByteArrayOutputStream;
 import java.io.FilterInputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.URI;
 import java.util.ArrayList;
@@ -39,7 +38,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.HarFileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.JarFinder;
@@ -110,13 +111,9 @@ public class TestHadoopArchives {
     conf.set(CapacitySchedulerConfiguration.PREFIX
         + CapacitySchedulerConfiguration.ROOT + ".default."
         + CapacitySchedulerConfiguration.CAPACITY, "100");
-    dfscluster = new MiniDFSCluster
-      .Builder(conf)
-      .checkExitOnShutdown(true)
-      .numDataNodes(2)
-      .format(true)
-      .racks(null)
-      .build();
+    dfscluster =
+        new MiniDFSCluster.Builder(conf).checkExitOnShutdown(true)
+            .numDataNodes(3).format(true).racks(null).build();
 
     fs = dfscluster.getFileSystem();
 
@@ -753,12 +750,21 @@ public class TestHadoopArchives {
     final String harName = "foo.har";
     final String fullHarPathStr = prefix + harName;
 
-    final String[] args = { "-archiveName", harName, "-p", inputPathStr, "-r",
-        "3", "*", archivePath.toString() };
+    final String[] args =
+        { "-archiveName", harName, "-p", inputPathStr, "-r", "2", "*",
+            archivePath.toString() };
     System.setProperty(HadoopArchives.TEST_HADOOP_ARCHIVES_JAR_PATH,
         HADOOP_ARCHIVES_JAR);
     final HadoopArchives har = new HadoopArchives(conf);
     assertEquals(0, ToolRunner.run(har, args));
+    RemoteIterator<LocatedFileStatus> listFiles =
+        fs.listFiles(new Path(archivePath.toString() + "/" + harName), false);
+    while (listFiles.hasNext()) {
+      LocatedFileStatus next = listFiles.next();
+      if (!next.getPath().toString().endsWith("_SUCCESS")) {
+        assertEquals(next.getPath().toString(), 2, next.getReplication());
+      }
+    }
     return fullHarPathStr;
   }
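
Usage note (illustration only, not part of the patch): a minimal sketch of how
the configurable replication factor can be exercised once this change is in,
modeled on the test change above. The archive name and the input/output paths
are made-up placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.tools.HadoopArchives;
    import org.apache.hadoop.util.ToolRunner;

    public class HarReplicationExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "-r 2" asks the tool to write the part files, _index and _masterindex
        // with replication 2; without -r, the new default of 3 (read back via the
        // HAR_REPLICATION_LABEL key added above) applies instead of the old
        // hard-coded values of 10 and 5.
        String[] harArgs = { "-archiveName", "foo.har", "-p", "/user/hadoop/in",
            "-r", "2", "*", "/user/hadoop/archives" };
        int exitCode = ToolRunner.run(new HadoopArchives(conf), harArgs);
        System.exit(exitCode);
      }
    }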