HADOOP-14698. Make copyFromLocals -t option available for put as well. Contributed by Andras Bokor.
This commit is contained in:
parent
e2713d5788
commit
5b6c2b366e
|
@ -238,26 +238,35 @@ class CopyCommands {
|
|||
* Copy local files to a remote filesystem
|
||||
*/
|
||||
public static class Put extends CommandWithDestination {
|
||||
private ThreadPoolExecutor executor = null;
|
||||
private int numThreads = 1;
|
||||
|
||||
private static final int MAX_THREADS =
|
||||
Runtime.getRuntime().availableProcessors() * 2;
|
||||
|
||||
public static final String NAME = "put";
|
||||
public static final String USAGE =
|
||||
"[-f] [-p] [-l] [-d] <localsrc> ... <dst>";
|
||||
"[-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>";
|
||||
public static final String DESCRIPTION =
|
||||
"Copy files from the local file system " +
|
||||
"into fs. Copying fails if the file already " +
|
||||
"exists, unless the -f flag is given.\n" +
|
||||
"Flags:\n" +
|
||||
" -p : Preserves access and modification times, ownership and the mode.\n" +
|
||||
" -f : Overwrites the destination if it already exists.\n" +
|
||||
" -l : Allow DataNode to lazily persist the file to disk. Forces\n" +
|
||||
" replication factor of 1. This flag will result in reduced\n" +
|
||||
" durability. Use with care.\n" +
|
||||
"Copy files from the local file system " +
|
||||
"into fs. Copying fails if the file already " +
|
||||
"exists, unless the -f flag is given.\n" +
|
||||
"Flags:\n" +
|
||||
" -p : Preserves timestamps, ownership and the mode.\n" +
|
||||
" -f : Overwrites the destination if it already exists.\n" +
|
||||
" -t <thread count> : Number of threads to be used, default is 1.\n" +
|
||||
" -l : Allow DataNode to lazily persist the file to disk. Forces" +
|
||||
" replication factor of 1. This flag will result in reduced" +
|
||||
" durability. Use with care.\n" +
|
||||
" -d : Skip creation of temporary file(<dst>._COPYING_).\n";
|
||||
|
||||
@Override
|
||||
protected void processOptions(LinkedList<String> args) throws IOException {
|
||||
CommandFormat cf =
|
||||
new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d");
|
||||
cf.addOptionWithValue("t");
|
||||
cf.parse(args);
|
||||
setNumberThreads(cf.getOptValue("t"));
|
||||
setOverwrite(cf.getOpt("f"));
|
||||
setPreserve(cf.getOpt("p"));
|
||||
setLazyPersist(cf.getOpt("l"));
|
||||
|
@ -287,32 +296,22 @@ class CopyCommands {
|
|||
copyStreamToTarget(System.in, getTargetPath(args.get(0)));
|
||||
return;
|
||||
}
|
||||
|
||||
executor = new ThreadPoolExecutor(numThreads, numThreads, 1,
|
||||
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024),
|
||||
new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
super.processArguments(args);
|
||||
|
||||
// issue the command and then wait for it to finish
|
||||
executor.shutdown();
|
||||
try {
|
||||
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
|
||||
} catch (InterruptedException e) {
|
||||
executor.shutdownNow();
|
||||
displayError(e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class CopyFromLocal extends Put {
|
||||
private ThreadPoolExecutor executor = null;
|
||||
private int numThreads = 1;
|
||||
|
||||
private static final int MAX_THREADS =
|
||||
Runtime.getRuntime().availableProcessors() * 2;
|
||||
public static final String NAME = "copyFromLocal";
|
||||
public static final String USAGE =
|
||||
"[-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>";
|
||||
public static final String DESCRIPTION =
|
||||
"Copy files from the local file system " +
|
||||
"into fs. Copying fails if the file already " +
|
||||
"exists, unless the -f flag is given.\n" +
|
||||
"Flags:\n" +
|
||||
" -p : Preserves access and modification times, ownership and the" +
|
||||
" mode.\n" +
|
||||
" -f : Overwrites the destination if it already exists.\n" +
|
||||
" -t <thread count> : Number of threads to be used, default is 1.\n" +
|
||||
" -l : Allow DataNode to lazily persist the file to disk. Forces" +
|
||||
" replication factor of 1. This flag will result in reduced" +
|
||||
" durability. Use with care.\n" +
|
||||
" -d : Skip creation of temporary file(<dst>._COPYING_).\n";
|
||||
|
||||
private void setNumberThreads(String numberThreadsString) {
|
||||
if (numberThreadsString == null) {
|
||||
|
@ -329,22 +328,6 @@ class CopyCommands {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processOptions(LinkedList<String> args) throws IOException {
|
||||
CommandFormat cf =
|
||||
new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d");
|
||||
cf.addOptionWithValue("t");
|
||||
cf.parse(args);
|
||||
setNumberThreads(cf.getOptValue("t"));
|
||||
setOverwrite(cf.getOpt("f"));
|
||||
setPreserve(cf.getOpt("p"));
|
||||
setLazyPersist(cf.getOpt("l"));
|
||||
setDirectWrite(cf.getOpt("d"));
|
||||
getRemoteDestination(args);
|
||||
// should have a -r option
|
||||
setRecursive(true);
|
||||
}
|
||||
|
||||
private void copyFile(PathData src, PathData target) throws IOException {
|
||||
if (isPathRecursable(src)) {
|
||||
throw new PathIsDirectoryException(src.toString());
|
||||
|
@ -371,25 +354,6 @@ class CopyCommands {
|
|||
executor.submit(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processArguments(LinkedList<PathData> args)
|
||||
throws IOException {
|
||||
executor = new ThreadPoolExecutor(numThreads, numThreads, 1,
|
||||
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024),
|
||||
new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
super.processArguments(args);
|
||||
|
||||
// issue the command and then wait for it to finish
|
||||
executor.shutdown();
|
||||
try {
|
||||
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
|
||||
} catch (InterruptedException e) {
|
||||
executor.shutdownNow();
|
||||
displayError(e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getNumThreads() {
|
||||
return numThreads;
|
||||
|
@ -400,6 +364,12 @@ class CopyCommands {
|
|||
return executor;
|
||||
}
|
||||
}
|
||||
|
||||
public static class CopyFromLocal extends Put {
|
||||
public static final String NAME = "copyFromLocal";
|
||||
public static final String USAGE = Put.USAGE;
|
||||
public static final String DESCRIPTION = "Identical to the -put command.";
|
||||
}
|
||||
|
||||
public static class CopyToLocal extends Get {
|
||||
public static final String NAME = "copyToLocal";
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.PathIOException;
|
||||
import org.apache.hadoop.fs.PathExistsException;
|
||||
import org.apache.hadoop.fs.shell.CopyCommands.Put;
|
||||
import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal;
|
||||
|
||||
/** Various commands for moving files */
|
||||
@InterfaceAudience.Private
|
||||
|
@ -41,12 +41,22 @@ class MoveCommands {
|
|||
/**
|
||||
* Move local files to a remote filesystem
|
||||
*/
|
||||
public static class MoveFromLocal extends Put {
|
||||
public static class MoveFromLocal extends CopyFromLocal {
|
||||
public static final String NAME = "moveFromLocal";
|
||||
public static final String USAGE = "<localsrc> ... <dst>";
|
||||
public static final String USAGE =
|
||||
"[-f] [-p] [-l] [-d] <localsrc> ... <dst>";
|
||||
public static final String DESCRIPTION =
|
||||
"Same as -put, except that the source is " +
|
||||
"deleted after it's copied.";
|
||||
"Same as -put, except that the source is " +
|
||||
"deleted after it's copied\n" +
|
||||
"and -t option has not yet implemented.";
|
||||
|
||||
@Override
|
||||
protected void processOptions(LinkedList<String> args) throws IOException {
|
||||
if(args.contains("-t")) {
|
||||
throw new CommandFormat.UnknownOptionException("-t");
|
||||
}
|
||||
super.processOptions(args);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processPath(PathData src, PathData target) throws IOException {
|
||||
|
|
|
@ -522,7 +522,7 @@ Returns 0 on success and -1 on error.
|
|||
put
|
||||
---
|
||||
|
||||
Usage: `hadoop fs -put [-f] [-p] [-l] [-d] [ - | <localsrc1> .. ]. <dst>`
|
||||
Usage: `hadoop fs -put [-f] [-p] [-l] [-d] [-t <thread count>] [ - | <localsrc1> .. ]. <dst>`
|
||||
|
||||
Copy single src, or multiple srcs from local file system to the destination file system.
|
||||
Also reads input from stdin and writes to destination file system if the source is set to "-"
|
||||
|
@ -534,6 +534,8 @@ Options:
|
|||
* `-p` : Preserves access and modification times, ownership and the permissions.
|
||||
(assuming the permissions can be propagated across filesystems)
|
||||
* `-f` : Overwrites the destination if it already exists.
|
||||
* `-t <thread count>` : Number of threads to be used, default is 1. Useful
|
||||
when uploading a directory containing more than 1 file.
|
||||
* `-l` : Allow DataNode to lazily persist the file to disk, Forces a replication
|
||||
factor of 1. This flag will result in reduced durability. Use with care.
|
||||
* `-d` : Skip creation of temporary file with the suffix `._COPYING_`.
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileStatus;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FilterFileSystem;
|
||||
import org.apache.hadoop.fs.PathExistsException;
|
||||
import org.apache.hadoop.fs.shell.CommandFormat.UnknownOptionException;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
@ -93,6 +94,12 @@ public class TestMove {
|
|||
assertTrue("Rename should have failed with path exists exception",
|
||||
cmd.error instanceof PathExistsException);
|
||||
}
|
||||
|
||||
@Test(expected = UnknownOptionException.class)
|
||||
public void testMoveFromLocalDoesNotAllowTOption() {
|
||||
new MoveCommands.MoveFromLocal().run("-t", "2",
|
||||
null, null);
|
||||
}
|
||||
|
||||
static class MockFileSystem extends FilterFileSystem {
|
||||
Configuration conf;
|
||||
|
|
|
@ -496,7 +496,10 @@
|
|||
<comparators>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^-put \[-f\] \[-p\] \[-l\] \[-d\] <localsrc> \.\.\. <dst> :( )*</expected-output>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^-put \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] <localsrc> \.\.\. <dst> :\s*</expected-output>
|
||||
</comparator>
|
||||
</comparator>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
|
@ -512,66 +515,11 @@
|
|||
</comparator>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^\s*-p Preserves access and modification times, ownership and the mode.( )*</expected-output>
|
||||
<expected-output>^\s*-p Preserves timestamps, ownership and the mode.( )*</expected-output>
|
||||
</comparator>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^\s*-f Overwrites the destination if it already exists.( )*</expected-output>
|
||||
</comparator>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^\s*-l Allow DataNode to lazily persist the file to disk. Forces( )*</expected-output>
|
||||
</comparator>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^\s*replication factor of 1. This flag will result in reduced( )*</expected-output>
|
||||
</comparator>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^\s*durability. Use with care.( )*</expected-output>
|
||||
</comparator>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^\s*-d Skip creation of temporary file\(<dst>\._COPYING_\).( )*</expected-output>
|
||||
</comparator>
|
||||
</comparators>
|
||||
</test>
|
||||
|
||||
<test> <!-- TESTED -->
|
||||
<description>help: help for copyFromLocal</description>
|
||||
<test-commands>
|
||||
<command>-help copyFromLocal</command>
|
||||
</test-commands>
|
||||
<cleanup-commands>
|
||||
</cleanup-commands>
|
||||
<comparators>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] <localsrc> \.\.\. <dst> :\s*</expected-output>
|
||||
</comparator>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^\s*Copy files from the local file system into fs.( )*Copying fails if the file already( )*</expected-output>
|
||||
</comparator>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^\s*exists, unless the -f flag is given.( )*</expected-output>
|
||||
</comparator>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^\s*Flags:( )*</expected-output>
|
||||
</comparator>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^\s*-p Preserves access and modification times, ownership and the( )*</expected-output>
|
||||
</comparator>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^\s*mode.( )*</expected-output>
|
||||
</comparator>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^\s*-f Overwrites the destination if it already exists.( )*</expected-output>
|
||||
<expected-output>^\s*-f Overwrites the destination if it already exists.( )*</expected-output>
|
||||
</comparator>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
|
@ -596,6 +544,25 @@
|
|||
</comparators>
|
||||
</test>
|
||||
|
||||
<test> <!-- TESTED -->
|
||||
<description>help: help for copyFromLocal</description>
|
||||
<test-commands>
|
||||
<command>-help copyFromLocal</command>
|
||||
</test-commands>
|
||||
<cleanup-commands>
|
||||
</cleanup-commands>
|
||||
<comparators>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] <localsrc> \.\.\. <dst> :\s*</expected-output>
|
||||
</comparator>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^\s*Identical to the -put command\.\s*</expected-output>
|
||||
</comparator>
|
||||
</comparators>
|
||||
</test>
|
||||
|
||||
<test> <!-- TESTED -->
|
||||
<description>help: help for moveFromLocal</description>
|
||||
<test-commands>
|
||||
|
@ -606,11 +573,14 @@
|
|||
<comparators>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^-moveFromLocal <localsrc> \.\.\. <dst> :\s*</expected-output>
|
||||
<expected-output>^-moveFromLocal \[-f\] \[-p\] \[-l\] \[-d\] <localsrc> \.\.\. <dst> :\s*</expected-output>
|
||||
</comparator>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^( |\t)*Same as -put, except that the source is deleted after it's copied.</expected-output>
|
||||
<expected-output>^( |\t)*Same as -put, except that the source is deleted after it's copied</expected-output>
|
||||
</comparator><comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^\s* and -t option has not yet implemented.</expected-output>
|
||||
</comparator>
|
||||
</comparators>
|
||||
</test>
|
||||
|
|
Loading…
Reference in New Issue