From bb86825d49897cf95551ef67371f13760f2900bf Mon Sep 17 00:00:00 2001
From: Robert Joseph Evans
Date: Tue, 6 Mar 2012 22:44:26 +0000
Subject: [PATCH] HADOOP-8140. dfs -getmerge should process its arguments
 better (Daryn Sharp via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1297771 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop-common/CHANGES.txt                 |   3 +
 .../apache/hadoop/fs/shell/CopyCommands.java  |  59 +++++++++-
 .../org/apache/hadoop/fs/shell/PathData.java  |  22 +++-
 .../org/apache/hadoop/fs/TestFsShellCopy.java | 104 ++++++++++++++++++
 4 files changed, 183 insertions(+), 5 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 24a75059939..48052775fcf 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -345,6 +345,9 @@ Release 0.23.2 - UNRELEASED
     HADOOP-8064. Remove unnecessary dependency on w3c.org in document
     processing (Kihwal Lee via bobby)
 
+    HADOOP-8140. dfs -getmerge should process its arguments better (Daryn
+    Sharp via bobby)
+
 Release 0.23.1 - 2012-02-17
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
index a42d6a83fd2..3d999ed54dc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
@@ -25,7 +25,10 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.shell.PathExceptions.PathIsDirectoryException;
+import org.apache.hadoop.io.IOUtils;
 
 /** Various commands for copying files */
 @InterfaceAudience.Private
@@ -53,21 +56,69 @@ public static class Merge extends FsCommand {
     protected PathData dst = null;
     protected String delimiter = null;
+    protected List<PathData> srcs = null;
 
     @Override
     protected void processOptions(LinkedList<String> args) throws IOException {
-      CommandFormat cf = new CommandFormat(2, 3, "nl");
+      CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "nl");
       cf.parse(args);
       delimiter = cf.getOpt("nl") ? "\n" : null;
 
       dst = new PathData(new File(args.removeLast()), getConf());
+      if (dst.exists && dst.stat.isDirectory()) {
+        throw new PathIsDirectoryException(dst.toString());
+      }
+      srcs = new LinkedList<PathData>();
     }
 
+    @Override
+    protected void processArguments(LinkedList<PathData> items)
+    throws IOException {
+      super.processArguments(items);
+      if (exitCode != 0) { // check for error collecting paths
+        return;
+      }
+      FSDataOutputStream out = dst.fs.create(dst.path);
+      try {
+        for (PathData src : srcs) {
+          // open each source inside the loop so a failed open cannot
+          // leave a null stream to close in the finally block
+          FSDataInputStream in = src.fs.open(src.path);
+          try {
+            IOUtils.copyBytes(in, out, getConf(), false);
+            if (delimiter != null) {
+              out.write(delimiter.getBytes("UTF-8"));
+            }
+          } finally {
+            in.close();
+          }
+        }
+      } finally {
+        out.close();
+      }
+    }
+
+    @Override
+    protected void processNonexistentPath(PathData item) throws IOException {
+      exitCode = 1; // flag that a path is bad
+      super.processNonexistentPath(item);
+    }
+
+    // this command is handled a bit differently than others: the paths
+    // are batched up instead of being processed as they are seen. this
+    // avoids unnecessarily streaming into the merge file and then
+    // encountering a path error that should abort the merge
+
     @Override
     protected void processPath(PathData src) throws IOException {
-      FileUtil.copyMerge(src.fs, src.path,
-          dst.fs, dst.path, false, getConf(), delimiter);
+      // recurse one level into a top-level directory to collect its
+      // files; subdirectories below that are skipped
+      if (src.stat.isDirectory()) {
+        if (getDepth() == 0) {
+          recursePath(src);
+        } // skip subdirs
+      } else {
+        srcs.add(src);
+      }
     }
   }
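The reworked command streams the batched sources into the destination in
a second pass, so a bad path fails the whole run before any bytes are
written. A minimal sketch of driving it programmatically through FsShell
(the entry point the new tests exercise); the source paths and output
name here are hypothetical:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FsShell;

  public class GetMergeSketch {
    public static void main(String[] args) throws Exception {
      FsShell shell = new FsShell(new Configuration());
      // merge two files plus a directory's immediate children into the
      // local file "merged.out", appending "\n" after each source
      int exit = shell.run(new String[] {
          "-getmerge", "-nl", "f1", "dir", "f2", "merged.out" });
      System.exit(exit); // 1 if any source path was bad, 0 on success
    }
  }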
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
index 850193c2883..afcba24de57 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -35,6 +36,7 @@
 import org.apache.hadoop.fs.shell.PathExceptions.PathIsDirectoryException;
 import org.apache.hadoop.fs.shell.PathExceptions.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.shell.PathExceptions.PathNotFoundException;
+import org.apache.hadoop.io.Writable;
 
 /**
  * Encapsulates a Path (path), its FileStatus (stat), and its FileSystem (fs).
@@ -43,7 +45,7 @@
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 
-public class PathData {
+public class PathData implements Comparable<PathData> {
   protected final URI uri;
   public final FileSystem fs;
   public final Path path;
@@ -210,6 +212,7 @@ public PathData[] getDirectoryContents() throws IOException {
       String child = getStringForChildPath(stats[i].getPath());
       items[i] = new PathData(fs, child, stats[i]);
     }
+    Arrays.sort(items);
     return items;
   }
 
@@ -303,6 +306,7 @@ public static PathData[] expandAsGlob(String pattern, Configuration conf)
         items[i++] = new PathData(fs, globMatch, stat);
       }
     }
+    Arrays.sort(items);
     return items;
   }
 
@@ -446,4 +450,20 @@ private static URI stringToUri(String pathString) {
     }
   }
 
+  @Override
+  public int compareTo(PathData o) {
+    return path.compareTo(o.path);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    // instanceof is false for null, so no separate null check is needed
+    return (o instanceof PathData) &&
+           path.equals(((PathData)o).path);
+  }
+
+  @Override
+  public int hashCode() {
+    return path.hashCode();
+  }
 }
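With PathData comparable by path, both getDirectoryContents() and
expandAsGlob() now return their results in sorted order, which is what
makes multi-source commands like the reworked -getmerge deterministic.
An illustrative fragment (not part of the patch), assuming an in-scope
Configuration named conf and a hypothetical glob:

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.shell.PathData;

  static void printSorted(Configuration conf) throws IOException {
    PathData[] items = PathData.expandAsGlob("dir/f*", conf);
    // items arrive pre-sorted: compareTo() orders by path and is
    // consistent with equals() and hashCode()
    for (PathData item : items) {
      System.out.println(item);
    }
  }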
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFsShellCopy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFsShellCopy.java
index 796cec3d828..1ba1f915552 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFsShellCopy.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFsShellCopy.java
@@ -225,6 +225,110 @@ private void checkPut(int exitCode, Path src, Path dest) throws Exception {
     assertEquals(exitCode, gotExit);
   }
 
+  @Test
+  public void testCopyMerge() throws Exception {
+    Path root = new Path(testRootDir, "TestMerge");
+    Path f1 = new Path(root, "f1");
+    Path f2 = new Path(root, "f2");
+    Path f3 = new Path(root, "f3");
+    Path fnf = new Path(root, "fnf");
+    Path d = new Path(root, "dir");
+    Path df1 = new Path(d, "df1");
+    Path df2 = new Path(d, "df2");
+    Path df3 = new Path(d, "df3");
+
+    createFile(f1, f2, f3, df1, df2, df3);
+
+    int exit;
+    // one file, kind of silly
+    exit = shell.run(new String[]{
+        "-getmerge",
+        f1.toString(),
+        "out" });
+    assertEquals(0, exit);
+    assertEquals("f1", readFile("out"));
+
+    // nonexistent file: fails before writing anything
+    exit = shell.run(new String[]{
+        "-getmerge",
+        fnf.toString(),
+        "out" });
+    assertEquals(1, exit);
+    assertFalse(lfs.exists(new Path("out")));
+
+    // two files
+    exit = shell.run(new String[]{
+        "-getmerge",
+        f1.toString(), f2.toString(),
+        "out" });
+    assertEquals(0, exit);
+    assertEquals("f1f2", readFile("out"));
+
+    // two files, preserves argument order
+    exit = shell.run(new String[]{
+        "-getmerge",
+        f2.toString(), f1.toString(),
+        "out" });
+    assertEquals(0, exit);
+    assertEquals("f2f1", readFile("out"));
+
+    // two files with newline delimiter
+    exit = shell.run(new String[]{
+        "-getmerge", "-nl",
+        f1.toString(), f2.toString(),
+        "out" });
+    assertEquals(0, exit);
+    assertEquals("f1\nf2\n", readFile("out"));
+
+    // glob three files
+    exit = shell.run(new String[]{
+        "-getmerge", "-nl",
+        new Path(root, "f*").toString(),
+        "out" });
+    assertEquals(0, exit);
+    assertEquals("f1\nf2\nf3\n", readFile("out"));
+
+    // directory with 3 files, should skip subdir
+    exit = shell.run(new String[]{
+        "-getmerge", "-nl",
+        root.toString(),
+        "out" });
+    assertEquals(0, exit);
+    assertEquals("f1\nf2\nf3\n", readFile("out"));
+
+    // subdir
+    exit = shell.run(new String[]{
+        "-getmerge", "-nl",
+        d.toString(), "out"});
+    assertEquals(0, exit);
+    assertEquals("df1\ndf2\ndf3\n", readFile("out"));
+
+    // file, dir, file
+    exit = shell.run(new String[]{
+        "-getmerge", "-nl",
+        f1.toString(), d.toString(), f2.toString(), "out" });
+    assertEquals(0, exit);
+    assertEquals("f1\ndf1\ndf2\ndf3\nf2\n", readFile("out"));
+  }
+
+  private void createFile(Path ... paths) throws IOException {
+    for (Path path : paths) {
+      FSDataOutputStream out = lfs.create(path);
+      // each file contains exactly its own name, so merged output is
+      // easy to predict in the assertions above
+      out.write(path.getName().getBytes());
+      out.close();
+    }
+  }
+
+  private String readFile(String out) throws IOException {
+    Path path = new Path(out);
+    FileStatus stat = lfs.getFileStatus(path);
+    FSDataInputStream in = lfs.open(path);
+    byte[] buffer = new byte[(int)stat.getLen()];
+    in.readFully(buffer);
+    in.close();
+    lfs.delete(path, false);
+    return new String(buffer);
+  }
+
   // path handles "." rather oddly
   private String pathAsString(Path p) {
     String s = (p == null) ? Path.CUR_DIR : p.toString();
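Because each file written by createFile() contains exactly its own name,
the expected merge output is always the concatenation of the source
names in traversal order, plus a trailing "\n" per file under -nl. A
hypothetical extra case in the same style as the tests above, mixing a
glob with an explicit file, follows directly from that rule:

  // glob plus explicit file; glob matches arrive sorted, so the
  // expected output is deterministic
  exit = shell.run(new String[]{
      "-getmerge", "-nl",
      new Path(d, "df*").toString(), f3.toString(),
      "out" });
  assertEquals(0, exit);
  assertEquals("df1\ndf2\ndf3\nf3\n", readFile("out"));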