HADOOP-7238. Refactor the cat and text commands to conform to new FsCommand class. Contributed by Daryn Sharp

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1101199 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2011-05-09 20:08:51 +00:00
parent a0391cddd3
commit 3337cdb312
5 changed files with 211 additions and 160 deletions

View File

@ -127,6 +127,9 @@ Trunk (unreleased changes)
HADOOP-7265. Keep track of relative paths in PathData. (Daryn Sharp
via szetszwo)
HADOOP-7238. Refactor the cat and text commands to conform to new FsCommand
class. (Daryn Sharp via szetszwo)
OPTIMIZATIONS
BUG FIXES

View File

@ -28,7 +28,6 @@ import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import java.util.zip.GZIPInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -39,17 +38,9 @@ import org.apache.hadoop.fs.shell.Command;
import org.apache.hadoop.fs.shell.CommandFactory;
import org.apache.hadoop.fs.shell.CommandFormat;
import org.apache.hadoop.fs.shell.FsCommand;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@ -173,12 +164,12 @@ public class FsShell extends Configured implements Tool {
* and copy them to the local name. srcf is kept.
* When copying multiple files, the destination must be a directory.
* Otherwise, IOException is thrown.
* @param argv: arguments
* @param pos: Ignore everything before argv[pos]
* @exception: IOException
* @param argv : arguments
* @param pos : Ignore everything before argv[pos]
* @throws Exception
* @see org.apache.hadoop.fs.FileSystem.globStatus
*/
void copyToLocal(String[]argv, int pos) throws IOException {
void copyToLocal(String[]argv, int pos) throws Exception {
CommandFormat cf = new CommandFormat("copyToLocal", 2,2,"crc","ignoreCrc");
String srcstr = null;
@ -199,7 +190,12 @@ public class FsShell extends Configured implements Tool {
if (copyCrc) {
System.err.println("-crc option is not valid when destination is stdout.");
}
cat(srcstr, verifyChecksum);
List<String> catArgv = new ArrayList<String>();
catArgv.add("-cat");
if (cf.getOpt("ignoreCrc")) catArgv.add("-ignoreCrc");
catArgv.add(srcstr);
run(catArgv.toArray(new String[0]));
} else {
File dst = new File(dststr);
Path srcpath = new Path(srcstr);
@ -315,115 +311,6 @@ public class FsShell extends Configured implements Tool {
void moveToLocal(String srcf, Path dst) throws IOException {
System.err.println("Option '-moveToLocal' is not implemented yet.");
}
/**
* Fetch all files that match the file pattern <i>srcf</i> and display
* their content on stdout.
* @param srcf: a file pattern specifying source files
* @exception: IOException
* @see org.apache.hadoop.fs.FileSystem.globStatus
*/
void cat(String src, boolean verifyChecksum) throws IOException {
//cat behavior in Linux
// [~/1207]$ ls ?.txt
// x.txt z.txt
// [~/1207]$ cat x.txt y.txt z.txt
// xxx
// cat: y.txt: No such file or directory
// zzz
Path srcPattern = new Path(src);
new DelayedExceptionThrowing() {
@Override
void process(Path p, FileSystem srcFs) throws IOException {
printToStdout(srcFs.open(p));
}
}.globAndProcess(srcPattern, getSrcFileSystem(srcPattern, verifyChecksum));
}
private class TextRecordInputStream extends InputStream {
SequenceFile.Reader r;
WritableComparable<?> key;
Writable val;
DataInputBuffer inbuf;
DataOutputBuffer outbuf;
public TextRecordInputStream(FileStatus f) throws IOException {
final Path fpath = f.getPath();
final Configuration lconf = getConf();
r = new SequenceFile.Reader(lconf,
SequenceFile.Reader.file(fpath));
key = ReflectionUtils.newInstance(
r.getKeyClass().asSubclass(WritableComparable.class), lconf);
val = ReflectionUtils.newInstance(
r.getValueClass().asSubclass(Writable.class), lconf);
inbuf = new DataInputBuffer();
outbuf = new DataOutputBuffer();
}
public int read() throws IOException {
int ret;
if (null == inbuf || -1 == (ret = inbuf.read())) {
if (!r.next(key, val)) {
return -1;
}
byte[] tmp = key.toString().getBytes();
outbuf.write(tmp, 0, tmp.length);
outbuf.write('\t');
tmp = val.toString().getBytes();
outbuf.write(tmp, 0, tmp.length);
outbuf.write('\n');
inbuf.reset(outbuf.getData(), outbuf.getLength());
outbuf.reset();
ret = inbuf.read();
}
return ret;
}
public void close() throws IOException {
r.close();
super.close();
}
}
private InputStream forMagic(Path p, FileSystem srcFs) throws IOException {
FSDataInputStream i = srcFs.open(p);
// check codecs
CompressionCodecFactory cf = new CompressionCodecFactory(getConf());
CompressionCodec codec = cf.getCodec(p);
if (codec != null) {
return codec.createInputStream(i);
}
switch(i.readShort()) {
case 0x1f8b: // RFC 1952
i.seek(0);
return new GZIPInputStream(i);
case 0x5345: // 'S' 'E'
if (i.readByte() == 'Q') {
i.close();
return new TextRecordInputStream(srcFs.getFileStatus(p));
}
break;
}
i.seek(0);
return i;
}
void text(String srcf) throws IOException {
Path srcPattern = new Path(srcf);
new DelayedExceptionThrowing() {
@Override
void process(Path p, FileSystem srcFs) throws IOException {
if (srcFs.isDirectory(p)) {
throw new IOException("Source must be a file.");
}
printToStdout(forMagic(p, srcFs));
}
}.globAndProcess(srcPattern, srcPattern.getFileSystem(getConf()));
}
/**
* Show the size of a partition in the filesystem that contains
@ -953,11 +840,9 @@ public class FsShell extends Configured implements Tool {
"[-rmr [-skipTrash] <src>] [-put <localsrc> ... <dst>] [-copyFromLocal <localsrc> ... <dst>]\n\t" +
"[-moveFromLocal <localsrc> ... <dst>] [" +
GET_SHORT_USAGE + "\n\t" +
"[-cat <src>]\n\t" +
"[" + COPYTOLOCAL_SHORT_USAGE + "] [-moveToLocal <src> <localdst>]\n\t" +
"[-report]\n\t" +
"[-touchz <path>] [-test -[ezd] <path>] [-stat [format] <path>]\n\t" +
"[-text <path>]";
"[-touchz <path>] [-test -[ezd] <path>] [-stat [format] <path>]";
String conf ="-conf <configuration file>: Specify an application configuration file.";
@ -1025,14 +910,6 @@ public class FsShell extends Configured implements Tool {
"\t\tto the local name. <src> is kept. When copying mutiple, \n" +
"\t\tfiles, the destination must be a directory. \n";
String cat = "-cat <src>: \tFetch all files that match the file pattern <src> \n" +
"\t\tand display their content on stdout.\n";
String text = "-text <src>: \tTakes a source file and outputs the file in text format.\n" +
"\t\tThe allowed formats are zip and TextRecordInputStream.\n";
String copyToLocal = COPYTOLOCAL_SHORT_USAGE
+ ": Identical to the -get command.\n";
@ -1091,16 +968,12 @@ public class FsShell extends Configured implements Tool {
System.out.println(copyToLocal);
} else if ("moveToLocal".equals(cmd)) {
System.out.println(moveToLocal);
} else if ("cat".equals(cmd)) {
System.out.println(cat);
} else if ("get".equals(cmd)) {
System.out.println(get);
} else if ("touchz".equals(cmd)) {
System.out.println(touchz);
} else if ("test".equals(cmd)) {
System.out.println(test);
} else if ("text".equals(cmd)) {
System.out.println(text);
} else if ("stat".equals(cmd)) {
System.out.println(stat);
} else if ("help".equals(cmd)) {
@ -1125,12 +998,10 @@ public class FsShell extends Configured implements Tool {
System.out.println(copyFromLocal);
System.out.println(moveFromLocal);
System.out.println(get);
System.out.println(cat);
System.out.println(copyToLocal);
System.out.println(moveToLocal);
System.out.println(touchz);
System.out.println(test);
System.out.println(text);
System.out.println(stat);
for (String thisCmdName : commandFactory.getNames()) {
@ -1181,9 +1052,7 @@ public class FsShell extends Configured implements Tool {
//
// issue the command to the fs
//
if ("-cat".equals(cmd)) {
cat(argv[i], true);
} else if ("-rm".equals(cmd)) {
if ("-rm".equals(cmd)) {
delete(argv[i], false, rmSkipTrash);
} else if ("-rmr".equals(cmd)) {
delete(argv[i], true, rmSkipTrash);
@ -1191,8 +1060,6 @@ public class FsShell extends Configured implements Tool {
df(argv[i]);
} else if ("-touchz".equals(cmd)) {
touchz(argv[i]);
} else if ("-text".equals(cmd)) {
text(argv[i]);
}
} catch (RemoteException e) {
LOG.debug("Error", e);
@ -1247,8 +1114,7 @@ public class FsShell extends Configured implements Tool {
System.err.println("Usage: java FsShell" +
" [-D <[property=value>]");
} else if ("-du".equals(cmd) || "-dus".equals(cmd) ||
"-touchz".equals(cmd) ||
"-text".equals(cmd)) {
"-touchz".equals(cmd)) {
System.err.println("Usage: java FsShell" +
" [" + cmd + " <path>]");
} else if ("-df".equals(cmd) ) {
@ -1271,9 +1137,6 @@ public class FsShell extends Configured implements Tool {
} else if ("-moveToLocal".equals(cmd)) {
System.err.println("Usage: java FsShell" +
" [" + cmd + " [-crc] <src> <localdst>]");
} else if ("-cat".equals(cmd)) {
System.err.println("Usage: java FsShell" +
" [" + cmd + " <src>]");
} else if ("-test".equals(cmd)) {
System.err.println("Usage: java FsShell" +
" [-test -[ezd] <path>]");
@ -1294,8 +1157,6 @@ public class FsShell extends Configured implements Tool {
System.err.println(" [-copyFromLocal <localsrc> ... <dst>]");
System.err.println(" [-moveFromLocal <localsrc> ... <dst>]");
System.err.println(" [" + GET_SHORT_USAGE + "]");
System.err.println(" [-cat <src>]");
System.err.println(" [-text <src>]");
System.err.println(" [" + COPYTOLOCAL_SHORT_USAGE + "]");
System.err.println(" [-moveToLocal [-crc] <src> <localdst>]");
System.err.println(" [-touchz <path>]");
@ -1350,9 +1211,7 @@ public class FsShell extends Configured implements Tool {
return exitCode;
}
} else if ("-rm".equals(cmd) || "-rmr".equals(cmd) ||
"-cat".equals(cmd) ||
"-touchz".equals(cmd) || "-stat".equals(cmd) ||
"-text".equals(cmd)) {
"-touchz".equals(cmd) || "-stat".equals(cmd)) {
if (argv.length < 2) {
printUsage(cmd);
return exitCode;
@ -1399,10 +1258,6 @@ public class FsShell extends Configured implements Tool {
moveFromLocal(srcs, argv[i++]);
} else if ("-get".equals(cmd) || "-copyToLocal".equals(cmd)) {
copyToLocal(argv, i);
} else if ("-cat".equals(cmd)) {
exitCode = doall(cmd, argv, i);
} else if ("-text".equals(cmd)) {
exitCode = doall(cmd, argv, i);
} else if ("-moveToLocal".equals(cmd)) {
moveToLocal(argv[i++], new Path(argv[i++]));
} else if ("-mv".equals(cmd)) {

View File

@ -0,0 +1,192 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.shell;
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.ReflectionUtils;
/**
* Display contents of files
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
class Display extends FsCommand {
public static void registerCommands(CommandFactory factory) {
factory.addClass(Cat.class, "-cat");
factory.addClass(Text.class, "-text");
}
@Override
protected String getFnfText(Path path) {
// TODO: this is a pretty inconsistent way to output the path...!!
// but, it's backwards compatible
try {
FileSystem fs = path.getFileSystem(getConf());
path = fs.makeQualified(path);
} catch (IOException e) {
// shouldn't happen, so just use path as-is
displayWarning("can't fully qualify "+path);
}
return "File does not exist: " + path.toUri().getPath();
}
/**
* Displays file content to stdout
*/
public static class Cat extends Display {
public static final String NAME = "cat";
public static final String USAGE = "[-ignoreCrc] <src> ...";
public static final String DESCRIPTION =
"Fetch all files that match the file pattern <src> \n" +
"and display their content on stdout.\n";
private boolean verifyChecksum = true;
@Override
protected void processOptions(LinkedList<String> args)
throws IOException {
CommandFormat cf = new CommandFormat(null, 1, Integer.MAX_VALUE, "ignoreCrc");
cf.parse(args);
verifyChecksum = !cf.getOpt("ignoreCrc");
}
@Override
protected void processPath(PathData item) throws IOException {
item.fs.setVerifyChecksum(verifyChecksum);
printToStdout(getInputStream(item));
}
private void printToStdout(InputStream in) throws IOException {
try {
IOUtils.copyBytes(in, out, getConf(), false);
} finally {
in.close();
}
}
protected InputStream getInputStream(PathData item) throws IOException {
return item.fs.open(item.path);
}
}
/**
* Same behavior as "-cat", but handles zip and TextRecordInputStream
* encodings.
*/
public static class Text extends Cat {
public static final String NAME = "text";
public static final String SHORT_USAGE = Cat.USAGE;
public static final String DESCRIPTION =
"Takes a source file and outputs the file in text format.\n" +
"The allowed formats are zip and TextRecordInputStream.";
@Override
protected InputStream getInputStream(PathData item) throws IOException {
FSDataInputStream i = (FSDataInputStream)super.getInputStream(item);
// check codecs
CompressionCodecFactory cf = new CompressionCodecFactory(getConf());
CompressionCodec codec = cf.getCodec(item.path);
if (codec != null) {
return codec.createInputStream(i);
}
switch(i.readShort()) {
case 0x1f8b: { // RFC 1952
i.seek(0);
return new GZIPInputStream(i);
}
case 0x5345: { // 'S' 'E'
if (i.readByte() == 'Q') {
i.close();
return new TextRecordInputStream(item.stat);
}
break;
}
}
i.seek(0);
return i;
}
}
protected class TextRecordInputStream extends InputStream {
SequenceFile.Reader r;
WritableComparable<?> key;
Writable val;
DataInputBuffer inbuf;
DataOutputBuffer outbuf;
public TextRecordInputStream(FileStatus f) throws IOException {
final Path fpath = f.getPath();
final Configuration lconf = getConf();
r = new SequenceFile.Reader(lconf,
SequenceFile.Reader.file(fpath));
key = ReflectionUtils.newInstance(
r.getKeyClass().asSubclass(WritableComparable.class), lconf);
val = ReflectionUtils.newInstance(
r.getValueClass().asSubclass(Writable.class), lconf);
inbuf = new DataInputBuffer();
outbuf = new DataOutputBuffer();
}
public int read() throws IOException {
int ret;
if (null == inbuf || -1 == (ret = inbuf.read())) {
if (!r.next(key, val)) {
return -1;
}
byte[] tmp = key.toString().getBytes();
outbuf.write(tmp, 0, tmp.length);
outbuf.write('\t');
tmp = val.toString().getBytes();
outbuf.write(tmp, 0, tmp.length);
outbuf.write('\n');
inbuf.reset(outbuf.getData(), outbuf.getLength());
outbuf.reset();
ret = inbuf.read();
}
return ret;
}
public void close() throws IOException {
r.close();
super.close();
}
}
}

View File

@ -45,6 +45,7 @@ abstract public class FsCommand extends Command {
public static void registerCommands(CommandFactory factory) {
factory.registerCommands(Copy.class);
factory.registerCommands(Count.class);
factory.registerCommands(Display.class);
factory.registerCommands(FsShellPermissions.class);
factory.registerCommands(Ls.class);
factory.registerCommands(Mkdir.class);

View File

@ -454,7 +454,7 @@
<comparators>
<comparator>
<type>RegexpComparator</type>
<expected-output>^-cat &lt;src&gt;:( |\t)*Fetch all files that match the file pattern &lt;src&gt;( )*</expected-output>
<expected-output>^-cat \[-ignoreCrc\] &lt;src&gt; \.\.\.:( |\t)*Fetch all files that match the file pattern &lt;src&gt;( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>