HADOOP-7250. Refactor the setrep command to conform to new FsCommand class. Contributed by Daryn Sharp
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1099633 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
369a203915
commit
b9599b26dc
|
@ -111,6 +111,9 @@ Trunk (unreleased changes)
|
|||
HADOOP-7236. Refactor the mkdir command to conform to new FsCommand class.
|
||||
(Daryn Sharp via szetszwo)
|
||||
|
||||
HADOOP-7250. Refactor the setrep command to conform to new FsCommand class.
|
||||
(Daryn Sharp via szetszwo)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -72,7 +72,6 @@ public class FsShell extends Configured implements Tool {
|
|||
static {
|
||||
modifFmt.setTimeZone(TimeZone.getTimeZone("UTC"));
|
||||
}
|
||||
static final String SETREP_SHORT_USAGE="-setrep [-R] [-w] <rep> <path/file>";
|
||||
static final String GET_SHORT_USAGE = "-get [-ignoreCrc] [-crc] <src> <localdst>";
|
||||
static final String COPYTOLOCAL_SHORT_USAGE = GET_SHORT_USAGE.replace(
|
||||
"-get", "-copyToLocal");
|
||||
|
@ -467,150 +466,6 @@ public class FsShell extends Configured implements Tool {
|
|||
}
|
||||
}.globAndProcess(srcPattern, srcPattern.getFileSystem(getConf()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the incoming command string
|
||||
* @param cmd
|
||||
* @param pos ignore anything before this pos in cmd
|
||||
* @throws IOException
|
||||
*/
|
||||
private void setReplication(String[] cmd, int pos) throws IOException {
|
||||
CommandFormat c = new CommandFormat("setrep", 2, 2, "R", "w");
|
||||
String dst = null;
|
||||
short rep = 0;
|
||||
|
||||
try {
|
||||
List<String> parameters = c.parse(cmd, pos);
|
||||
rep = Short.parseShort(parameters.get(0));
|
||||
dst = parameters.get(1);
|
||||
} catch (NumberFormatException nfe) {
|
||||
System.err.println("Illegal replication, a positive integer expected");
|
||||
throw nfe;
|
||||
}
|
||||
catch(IllegalArgumentException iae) {
|
||||
System.err.println("Usage: java FsShell " + SETREP_SHORT_USAGE);
|
||||
throw iae;
|
||||
}
|
||||
|
||||
if (rep < 1) {
|
||||
System.err.println("Cannot set replication to: " + rep);
|
||||
throw new IllegalArgumentException("replication must be >= 1");
|
||||
}
|
||||
|
||||
List<Path> waitList = c.getOpt("w")? new ArrayList<Path>(): null;
|
||||
setReplication(rep, dst, c.getOpt("R"), waitList);
|
||||
|
||||
if (waitList != null) {
|
||||
waitForReplication(waitList, rep);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for all files in waitList to have replication number equal to rep.
|
||||
* @param waitList The files are waited for.
|
||||
* @param rep The new replication number.
|
||||
* @throws IOException IOException
|
||||
*/
|
||||
void waitForReplication(List<Path> waitList, int rep) throws IOException {
|
||||
for(Path f : waitList) {
|
||||
System.out.print("Waiting for " + f + " ...");
|
||||
System.out.flush();
|
||||
|
||||
boolean printWarning = false;
|
||||
FileSystem pFS = f.getFileSystem(getConf());
|
||||
FileStatus status = pFS.getFileStatus(f);
|
||||
long len = status.getLen();
|
||||
|
||||
for(boolean done = false; !done; ) {
|
||||
BlockLocation[] locations = pFS.getFileBlockLocations(status, 0, len);
|
||||
int i = 0;
|
||||
for(; i < locations.length &&
|
||||
locations[i].getHosts().length == rep; i++)
|
||||
if (!printWarning && locations[i].getHosts().length > rep) {
|
||||
System.out.println("\nWARNING: the waiting time may be long for "
|
||||
+ "DECREASING the number of replication.");
|
||||
printWarning = true;
|
||||
}
|
||||
done = i == locations.length;
|
||||
|
||||
if (!done) {
|
||||
System.out.print(".");
|
||||
System.out.flush();
|
||||
try {Thread.sleep(10000);} catch (InterruptedException e) {}
|
||||
}
|
||||
}
|
||||
|
||||
System.out.println(" done");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the replication for files that match file pattern <i>srcf</i>
|
||||
* if it's a directory and recursive is true,
|
||||
* set replication for all the subdirs and those files too.
|
||||
* @param newRep new replication factor
|
||||
* @param srcf a file pattern specifying source files
|
||||
* @param recursive if need to set replication factor for files in subdirs
|
||||
* @throws IOException
|
||||
* @see org.apache.hadoop.fs.FileSystem#globStatus(Path)
|
||||
*/
|
||||
void setReplication(short newRep, String srcf, boolean recursive,
|
||||
List<Path> waitingList)
|
||||
throws IOException {
|
||||
Path srcPath = new Path(srcf);
|
||||
FileSystem srcFs = srcPath.getFileSystem(getConf());
|
||||
Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(srcPath),
|
||||
srcPath);
|
||||
for(int i=0; i<srcs.length; i++) {
|
||||
setReplication(newRep, srcFs, srcs[i], recursive, waitingList);
|
||||
}
|
||||
}
|
||||
|
||||
private void setReplication(short newRep, FileSystem srcFs,
|
||||
Path src, boolean recursive,
|
||||
List<Path> waitingList)
|
||||
throws IOException {
|
||||
if (srcFs.getFileStatus(src).isFile()) {
|
||||
setFileReplication(src, srcFs, newRep, waitingList);
|
||||
return;
|
||||
}
|
||||
FileStatus items[];
|
||||
try {
|
||||
items = srcFs.listStatus(src);
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
throw new IOException("Could not get listing for " + src);
|
||||
}
|
||||
|
||||
for (int i = 0; i < items.length; i++) {
|
||||
if (items[i].isFile()) {
|
||||
setFileReplication(items[i].getPath(), srcFs, newRep, waitingList);
|
||||
} else if (items[i].isSymlink()) {
|
||||
throw new AssertionError("Symlinks unsupported");
|
||||
} else if (recursive) {
|
||||
setReplication(newRep, srcFs, items[i].getPath(), recursive,
|
||||
waitingList);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Actually set the replication for this file
|
||||
* If it fails either throw IOException or print an error msg
|
||||
* @param file: a file/directory
|
||||
* @param newRep: new replication factor
|
||||
* @throws IOException
|
||||
*/
|
||||
private void setFileReplication(Path file, FileSystem srcFs, short newRep, List<Path> waitList)
|
||||
throws IOException {
|
||||
if (srcFs.setReplication(file, newRep)) {
|
||||
if (waitList != null) {
|
||||
waitList.add(file);
|
||||
}
|
||||
System.out.println("Replication " + newRep + " set: " + file);
|
||||
} else {
|
||||
System.err.println("Could not set replication for: " + file);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Show the size of a partition in the filesystem that contains
|
||||
|
@ -1250,7 +1105,7 @@ public class FsShell extends Configured implements Tool {
|
|||
GET_SHORT_USAGE + "\n\t" +
|
||||
"[-getmerge <src> <localdst> [addnl]] [-cat <src>]\n\t" +
|
||||
"[" + COPYTOLOCAL_SHORT_USAGE + "] [-moveToLocal <src> <localdst>]\n\t" +
|
||||
"[-report] [" + SETREP_SHORT_USAGE + "]\n\t" +
|
||||
"[-report]\n\t" +
|
||||
"[-touchz <path>] [-test -[ezd] <path>] [-stat [format] <path>]\n\t" +
|
||||
"[-text <path>]\n\t" +
|
||||
"[" + FsShellPermissions.CHMOD_USAGE + "]\n\t" +
|
||||
|
@ -1340,11 +1195,6 @@ public class FsShell extends Configured implements Tool {
|
|||
|
||||
String moveToLocal = "-moveToLocal <src> <localdst>: Not implemented yet \n";
|
||||
|
||||
String setrep = SETREP_SHORT_USAGE
|
||||
+ ": Set the replication level of a file. \n"
|
||||
+ "\t\tThe -R flag requests a recursive change of replication level \n"
|
||||
+ "\t\tfor an entire tree.\n";
|
||||
|
||||
String touchz = "-touchz <path>: Creates a file of zero length\n"
|
||||
+ "\t\t at <path> with current time as the timestamp of that <path>.\n"
|
||||
+ "\t\t An error is returned if the file exists with non-zero length\n";
|
||||
|
@ -1436,8 +1286,6 @@ public class FsShell extends Configured implements Tool {
|
|||
System.out.println(cat);
|
||||
} else if ("get".equals(cmd)) {
|
||||
System.out.println(get);
|
||||
} else if ("setrep".equals(cmd)) {
|
||||
System.out.println(setrep);
|
||||
} else if ("touchz".equals(cmd)) {
|
||||
System.out.println(touchz);
|
||||
} else if ("test".equals(cmd)) {
|
||||
|
@ -1478,7 +1326,6 @@ public class FsShell extends Configured implements Tool {
|
|||
System.out.println(cat);
|
||||
System.out.println(copyToLocal);
|
||||
System.out.println(moveToLocal);
|
||||
System.out.println(setrep);
|
||||
System.out.println(touchz);
|
||||
System.out.println(test);
|
||||
System.out.println(text);
|
||||
|
@ -1628,8 +1475,6 @@ public class FsShell extends Configured implements Tool {
|
|||
} else if ("-cat".equals(cmd)) {
|
||||
System.err.println("Usage: java FsShell" +
|
||||
" [" + cmd + " <src>]");
|
||||
} else if ("-setrep".equals(cmd)) {
|
||||
System.err.println("Usage: java FsShell [" + SETREP_SHORT_USAGE + "]");
|
||||
} else if ("-test".equals(cmd)) {
|
||||
System.err.println("Usage: java FsShell" +
|
||||
" [-test -[ezd] <path>]");
|
||||
|
@ -1655,7 +1500,6 @@ public class FsShell extends Configured implements Tool {
|
|||
System.err.println(" [-text <src>]");
|
||||
System.err.println(" [" + COPYTOLOCAL_SHORT_USAGE + "]");
|
||||
System.err.println(" [-moveToLocal [-crc] <src> <localdst>]");
|
||||
System.err.println(" [" + SETREP_SHORT_USAGE + "]");
|
||||
System.err.println(" [-touchz <path>]");
|
||||
System.err.println(" [-test -[ezd] <path>]");
|
||||
System.err.println(" [-stat [format] <path>]");
|
||||
|
@ -1771,8 +1615,6 @@ public class FsShell extends Configured implements Tool {
|
|||
exitCode = doall(cmd, argv, i);
|
||||
} else if ("-moveToLocal".equals(cmd)) {
|
||||
moveToLocal(argv[i++], new Path(argv[i++]));
|
||||
} else if ("-setrep".equals(cmd)) {
|
||||
setReplication(argv, i);
|
||||
} else if ("-chmod".equals(cmd) ||
|
||||
"-chown".equals(cmd) ||
|
||||
"-chgrp".equals(cmd)) {
|
||||
|
|
|
@ -45,6 +45,7 @@ abstract public class FsCommand extends Command {
|
|||
factory.registerCommands(Count.class);
|
||||
factory.registerCommands(Ls.class);
|
||||
factory.registerCommands(Mkdir.class);
|
||||
factory.registerCommands(SetReplication.class);
|
||||
factory.registerCommands(Tail.class);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,132 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.shell;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
/**
|
||||
* Modifies the replication factor
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
|
||||
public class SetReplication extends FsCommand {
|
||||
public static void registerCommands(CommandFactory factory) {
|
||||
factory.addClass(SetReplication.class, "-setrep");
|
||||
}
|
||||
|
||||
public static final String NAME = "setrep";
|
||||
public static final String USAGE = "[-R] [-w] <rep> <path/file> ...";
|
||||
public static final String DESCRIPTION =
|
||||
"Set the replication level of a file.\n" +
|
||||
"The -R flag requests a recursive change of replication level\n" +
|
||||
"for an entire tree.";
|
||||
|
||||
protected short newRep = 0;
|
||||
protected List<PathData> waitList = new LinkedList<PathData>();
|
||||
protected boolean waitOpt = false;
|
||||
|
||||
@Override
|
||||
protected void processOptions(LinkedList<String> args) throws IOException {
|
||||
CommandFormat cf = new CommandFormat(null, 2, Integer.MAX_VALUE, "R", "w");
|
||||
cf.parse(args);
|
||||
waitOpt = cf.getOpt("w");
|
||||
setRecursive(cf.getOpt("R"));
|
||||
|
||||
try {
|
||||
newRep = Short.parseShort(args.removeFirst());
|
||||
} catch (NumberFormatException nfe) {
|
||||
displayWarning("Illegal replication, a positive integer expected");
|
||||
throw nfe;
|
||||
}
|
||||
if (newRep < 1) {
|
||||
throw new IllegalArgumentException("replication must be >= 1");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processArguments(LinkedList<PathData> args)
|
||||
throws IOException {
|
||||
super.processArguments(args);
|
||||
if (waitOpt) waitForReplication();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processPath(PathData item) throws IOException {
|
||||
if (item.stat.isSymlink()) throw new IOException("Symlinks unsupported");
|
||||
|
||||
if (item.stat.isFile()) {
|
||||
if (!item.fs.setReplication(item.path, newRep)) {
|
||||
throw new IOException("Could not set replication for: " + item);
|
||||
}
|
||||
out.println("Replication " + newRep + " set: " + item);
|
||||
if (waitOpt) waitList.add(item);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for all files in waitList to have replication number equal to rep.
|
||||
*/
|
||||
private void waitForReplication() throws IOException {
|
||||
for (PathData item : waitList) {
|
||||
out.print("Waiting for " + item + " ...");
|
||||
out.flush();
|
||||
|
||||
boolean printedWarning = false;
|
||||
boolean done = false;
|
||||
while (!done) {
|
||||
item.refreshStatus();
|
||||
BlockLocation[] locations =
|
||||
item.fs.getFileBlockLocations(item.stat, 0, item.stat.getLen());
|
||||
|
||||
int i = 0;
|
||||
for(; i < locations.length; i++) {
|
||||
int currentRep = locations[i].getHosts().length;
|
||||
if (currentRep != newRep) {
|
||||
if (!printedWarning && currentRep > newRep) {
|
||||
out.println("\nWARNING: the waiting time may be long for "
|
||||
+ "DECREASING the number of replications.");
|
||||
printedWarning = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
done = i == locations.length;
|
||||
if (done) break;
|
||||
|
||||
out.print(".");
|
||||
out.flush();
|
||||
try {Thread.sleep(10000);} catch (InterruptedException e) {}
|
||||
}
|
||||
out.println(" done");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getFnfText(Path path) {
|
||||
return "File does not exist: " + path;
|
||||
}
|
||||
}
|
|
@ -518,7 +518,7 @@
|
|||
<comparators>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^-setrep \[-R\] \[-w\] <rep> <path/file>:( )*Set the replication level of a file.( )*</expected-output>
|
||||
<expected-output>^-setrep \[-R\] \[-w\] <rep> <path/file> \.\.\.:( |\t)*Set the replication level of a file.( )*</expected-output>
|
||||
</comparator>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
|
|
Loading…
Reference in New Issue