HADOOP-8140. dfs -getmerge should process its argments better (Daryn Sharp via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1297771 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4ce5f6553f
commit
bb86825d49
|
@ -345,6 +345,9 @@ Release 0.23.2 - UNRELEASED
|
||||||
HADOOP-8064. Remove unnecessary dependency on w3c.org in document processing
|
HADOOP-8064. Remove unnecessary dependency on w3c.org in document processing
|
||||||
(Khiwal Lee via bobby)
|
(Khiwal Lee via bobby)
|
||||||
|
|
||||||
|
HADOOP-8140. dfs -getmerge should process its argments better (Daryn Sharp
|
||||||
|
via bobby)
|
||||||
|
|
||||||
Release 0.23.1 - 2012-02-17
|
Release 0.23.1 - 2012-02-17
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -25,7 +25,10 @@ import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.shell.PathExceptions.PathIsDirectoryException;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
|
||||||
/** Various commands for copy files */
|
/** Various commands for copy files */
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
@ -53,21 +56,69 @@ class CopyCommands {
|
||||||
|
|
||||||
protected PathData dst = null;
|
protected PathData dst = null;
|
||||||
protected String delimiter = null;
|
protected String delimiter = null;
|
||||||
|
protected List<PathData> srcs = null;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void processOptions(LinkedList<String> args) throws IOException {
|
protected void processOptions(LinkedList<String> args) throws IOException {
|
||||||
CommandFormat cf = new CommandFormat(2, 3, "nl");
|
CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "nl");
|
||||||
cf.parse(args);
|
cf.parse(args);
|
||||||
|
|
||||||
delimiter = cf.getOpt("nl") ? "\n" : null;
|
delimiter = cf.getOpt("nl") ? "\n" : null;
|
||||||
|
|
||||||
dst = new PathData(new File(args.removeLast()), getConf());
|
dst = new PathData(new File(args.removeLast()), getConf());
|
||||||
|
if (dst.exists && dst.stat.isDirectory()) {
|
||||||
|
throw new PathIsDirectoryException(dst.toString());
|
||||||
|
}
|
||||||
|
srcs = new LinkedList<PathData>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void processArguments(LinkedList<PathData> items)
|
||||||
|
throws IOException {
|
||||||
|
super.processArguments(items);
|
||||||
|
if (exitCode != 0) { // check for error collecting paths
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
FSDataOutputStream out = dst.fs.create(dst.path);
|
||||||
|
try {
|
||||||
|
FSDataInputStream in = null;
|
||||||
|
for (PathData src : srcs) {
|
||||||
|
try {
|
||||||
|
in = src.fs.open(src.path);
|
||||||
|
IOUtils.copyBytes(in, out, getConf(), false);
|
||||||
|
if (delimiter != null) {
|
||||||
|
out.write(delimiter.getBytes("UTF-8"));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
in.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void processNonexistentPath(PathData item) throws IOException {
|
||||||
|
exitCode = 1; // flag that a path is bad
|
||||||
|
super.processNonexistentPath(item);
|
||||||
|
}
|
||||||
|
|
||||||
|
// this command is handled a bit differently than others. the paths
|
||||||
|
// are batched up instead of actually being processed. this avoids
|
||||||
|
// unnecessarily streaming into the merge file and then encountering
|
||||||
|
// a path error that should abort the merge
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void processPath(PathData src) throws IOException {
|
protected void processPath(PathData src) throws IOException {
|
||||||
FileUtil.copyMerge(src.fs, src.path,
|
// for directories, recurse one level to get its files, else skip it
|
||||||
dst.fs, dst.path, false, getConf(), delimiter);
|
if (src.stat.isDirectory()) {
|
||||||
|
if (getDepth() == 0) {
|
||||||
|
recursePath(src);
|
||||||
|
} // skip subdirs
|
||||||
|
} else {
|
||||||
|
srcs.add(src);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
@ -35,6 +36,7 @@ import org.apache.hadoop.fs.shell.PathExceptions.PathIOException;
|
||||||
import org.apache.hadoop.fs.shell.PathExceptions.PathIsDirectoryException;
|
import org.apache.hadoop.fs.shell.PathExceptions.PathIsDirectoryException;
|
||||||
import org.apache.hadoop.fs.shell.PathExceptions.PathIsNotDirectoryException;
|
import org.apache.hadoop.fs.shell.PathExceptions.PathIsNotDirectoryException;
|
||||||
import org.apache.hadoop.fs.shell.PathExceptions.PathNotFoundException;
|
import org.apache.hadoop.fs.shell.PathExceptions.PathNotFoundException;
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Encapsulates a Path (path), its FileStatus (stat), and its FileSystem (fs).
|
* Encapsulates a Path (path), its FileStatus (stat), and its FileSystem (fs).
|
||||||
|
@ -43,7 +45,7 @@ import org.apache.hadoop.fs.shell.PathExceptions.PathNotFoundException;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
|
|
||||||
public class PathData {
|
public class PathData implements Comparable<PathData> {
|
||||||
protected final URI uri;
|
protected final URI uri;
|
||||||
public final FileSystem fs;
|
public final FileSystem fs;
|
||||||
public final Path path;
|
public final Path path;
|
||||||
|
@ -210,6 +212,7 @@ public class PathData {
|
||||||
String child = getStringForChildPath(stats[i].getPath());
|
String child = getStringForChildPath(stats[i].getPath());
|
||||||
items[i] = new PathData(fs, child, stats[i]);
|
items[i] = new PathData(fs, child, stats[i]);
|
||||||
}
|
}
|
||||||
|
Arrays.sort(items);
|
||||||
return items;
|
return items;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -303,6 +306,7 @@ public class PathData {
|
||||||
items[i++] = new PathData(fs, globMatch, stat);
|
items[i++] = new PathData(fs, globMatch, stat);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Arrays.sort(items);
|
||||||
return items;
|
return items;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -446,4 +450,20 @@ public class PathData {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(PathData o) {
|
||||||
|
return path.compareTo(((PathData)o).path);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
return (o != null) &&
|
||||||
|
(o instanceof PathData) &&
|
||||||
|
path.equals(((PathData)o).path);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return path.hashCode();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -225,6 +225,110 @@ public class TestFsShellCopy {
|
||||||
assertEquals(exitCode, gotExit);
|
assertEquals(exitCode, gotExit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCopyMerge() throws Exception {
|
||||||
|
Path root = new Path(testRootDir, "TestMerge");
|
||||||
|
Path f1 = new Path(root, "f1");
|
||||||
|
Path f2 = new Path(root, "f2");
|
||||||
|
Path f3 = new Path(root, "f3");
|
||||||
|
Path fnf = new Path(root, "fnf");
|
||||||
|
Path d = new Path(root, "dir");
|
||||||
|
Path df1 = new Path(d, "df1");
|
||||||
|
Path df2 = new Path(d, "df2");
|
||||||
|
Path df3 = new Path(d, "df3");
|
||||||
|
|
||||||
|
createFile(f1, f2, f3, df1, df2, df3);
|
||||||
|
|
||||||
|
int exit;
|
||||||
|
// one file, kind of silly
|
||||||
|
exit = shell.run(new String[]{
|
||||||
|
"-getmerge",
|
||||||
|
f1.toString(),
|
||||||
|
"out" });
|
||||||
|
assertEquals(0, exit);
|
||||||
|
assertEquals("f1", readFile("out"));
|
||||||
|
|
||||||
|
exit = shell.run(new String[]{
|
||||||
|
"-getmerge",
|
||||||
|
fnf.toString(),
|
||||||
|
"out" });
|
||||||
|
assertEquals(1, exit);
|
||||||
|
assertFalse(lfs.exists(new Path("out")));
|
||||||
|
|
||||||
|
// two files
|
||||||
|
exit = shell.run(new String[]{
|
||||||
|
"-getmerge",
|
||||||
|
f1.toString(), f2.toString(),
|
||||||
|
"out" });
|
||||||
|
assertEquals(0, exit);
|
||||||
|
assertEquals("f1f2", readFile("out"));
|
||||||
|
|
||||||
|
// two files, preserves order
|
||||||
|
exit = shell.run(new String[]{
|
||||||
|
"-getmerge",
|
||||||
|
f2.toString(), f1.toString(),
|
||||||
|
"out" });
|
||||||
|
assertEquals(0, exit);
|
||||||
|
assertEquals("f2f1", readFile("out"));
|
||||||
|
|
||||||
|
// two files
|
||||||
|
exit = shell.run(new String[]{
|
||||||
|
"-getmerge", "-nl",
|
||||||
|
f1.toString(), f2.toString(),
|
||||||
|
"out" });
|
||||||
|
assertEquals(0, exit);
|
||||||
|
assertEquals("f1\nf2\n", readFile("out"));
|
||||||
|
|
||||||
|
// glob three files
|
||||||
|
shell.run(new String[]{
|
||||||
|
"-getmerge", "-nl",
|
||||||
|
new Path(root, "f*").toString(),
|
||||||
|
"out" });
|
||||||
|
assertEquals(0, exit);
|
||||||
|
assertEquals("f1\nf2\nf3\n", readFile("out"));
|
||||||
|
|
||||||
|
// directory with 3 files, should skip subdir
|
||||||
|
shell.run(new String[]{
|
||||||
|
"-getmerge", "-nl",
|
||||||
|
root.toString(),
|
||||||
|
"out" });
|
||||||
|
assertEquals(0, exit);
|
||||||
|
assertEquals("f1\nf2\nf3\n", readFile("out"));
|
||||||
|
|
||||||
|
// subdir
|
||||||
|
shell.run(new String[]{
|
||||||
|
"-getmerge", "-nl",
|
||||||
|
d.toString(), "out"});
|
||||||
|
assertEquals(0, exit);
|
||||||
|
assertEquals("df1\ndf2\ndf3\n", readFile("out"));
|
||||||
|
|
||||||
|
// file, dir, file
|
||||||
|
shell.run(new String[]{
|
||||||
|
"-getmerge", "-nl",
|
||||||
|
f1.toString(), d.toString(), f2.toString(), "out" });
|
||||||
|
assertEquals(0, exit);
|
||||||
|
assertEquals("f1\ndf1\ndf2\ndf3\nf2\n", readFile("out"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createFile(Path ... paths) throws IOException {
|
||||||
|
for (Path path : paths) {
|
||||||
|
FSDataOutputStream out = lfs.create(path);
|
||||||
|
out.write(path.getName().getBytes());
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String readFile(String out) throws IOException {
|
||||||
|
Path path = new Path(out);
|
||||||
|
FileStatus stat = lfs.getFileStatus(path);
|
||||||
|
FSDataInputStream in = lfs.open(path);
|
||||||
|
byte[] buffer = new byte[(int)stat.getLen()];
|
||||||
|
in.readFully(buffer);
|
||||||
|
in.close();
|
||||||
|
lfs.delete(path, false);
|
||||||
|
return new String(buffer);
|
||||||
|
}
|
||||||
|
|
||||||
// path handles "." rather oddly
|
// path handles "." rather oddly
|
||||||
private String pathAsString(Path p) {
|
private String pathAsString(Path p) {
|
||||||
String s = (p == null) ? Path.CUR_DIR : p.toString();
|
String s = (p == null) ? Path.CUR_DIR : p.toString();
|
||||||
|
|
Loading…
Reference in New Issue