From b803efbdced9d02d9caa24f967b6bf4ef85e9470 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Fri, 29 May 2020 10:32:37 +0100 Subject: [PATCH] HADOOP-14698. Make copyFromLocals -t option available for put as well. Contributed by Andras Bokor. --- .../apache/hadoop/fs/shell/CopyCommands.java | 108 +++++++----------- .../apache/hadoop/fs/shell/MoveCommands.java | 20 +++- .../src/site/markdown/FileSystemShell.md | 4 +- .../org/apache/hadoop/fs/shell/TestMove.java | 7 ++ .../src/test/resources/testConf.xml | 90 +++++---------- 5 files changed, 94 insertions(+), 135 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java index 4622c75fbd4..39958a9cb1c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java @@ -239,26 +239,35 @@ class CopyCommands { * Copy local files to a remote filesystem */ public static class Put extends CommandWithDestination { + private ThreadPoolExecutor executor = null; + private int numThreads = 1; + + private static final int MAX_THREADS = + Runtime.getRuntime().availableProcessors() * 2; + public static final String NAME = "put"; public static final String USAGE = - "[-f] [-p] [-l] [-d] ... "; + "[-f] [-p] [-l] [-d] [-t ] ... "; public static final String DESCRIPTION = - "Copy files from the local file system " + - "into fs. Copying fails if the file already " + - "exists, unless the -f flag is given.\n" + - "Flags:\n" + - " -p : Preserves access and modification times, ownership and the mode.\n" + - " -f : Overwrites the destination if it already exists.\n" + - " -l : Allow DataNode to lazily persist the file to disk. Forces\n" + - " replication factor of 1. This flag will result in reduced\n" + - " durability. Use with care.\n" + + "Copy files from the local file system " + + "into fs. Copying fails if the file already " + + "exists, unless the -f flag is given.\n" + + "Flags:\n" + + " -p : Preserves timestamps, ownership and the mode.\n" + + " -f : Overwrites the destination if it already exists.\n" + + " -t : Number of threads to be used, default is 1.\n" + + " -l : Allow DataNode to lazily persist the file to disk. Forces" + + " replication factor of 1. This flag will result in reduced" + + " durability. Use with care.\n" + " -d : Skip creation of temporary file(._COPYING_).\n"; @Override protected void processOptions(LinkedList args) throws IOException { CommandFormat cf = new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d"); + cf.addOptionWithValue("t"); cf.parse(args); + setNumberThreads(cf.getOptValue("t")); setOverwrite(cf.getOpt("f")); setPreserve(cf.getOpt("p")); setLazyPersist(cf.getOpt("l")); @@ -288,32 +297,22 @@ class CopyCommands { copyStreamToTarget(System.in, getTargetPath(args.get(0))); return; } + + executor = new ThreadPoolExecutor(numThreads, numThreads, 1, + TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), + new ThreadPoolExecutor.CallerRunsPolicy()); super.processArguments(args); + + // issue the command and then wait for it to finish + executor.shutdown(); + try { + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES); + } catch (InterruptedException e) { + executor.shutdownNow(); + displayError(e); + Thread.currentThread().interrupt(); + } } - } - - public static class CopyFromLocal extends Put { - private ThreadPoolExecutor executor = null; - private int numThreads = 1; - - private static final int MAX_THREADS = - Runtime.getRuntime().availableProcessors() * 2; - public static final String NAME = "copyFromLocal"; - public static final String USAGE = - "[-f] [-p] [-l] [-d] [-t ] ... "; - public static final String DESCRIPTION = - "Copy files from the local file system " + - "into fs. Copying fails if the file already " + - "exists, unless the -f flag is given.\n" + - "Flags:\n" + - " -p : Preserves access and modification times, ownership and the" + - " mode.\n" + - " -f : Overwrites the destination if it already exists.\n" + - " -t : Number of threads to be used, default is 1.\n" + - " -l : Allow DataNode to lazily persist the file to disk. Forces" + - " replication factor of 1. This flag will result in reduced" + - " durability. Use with care.\n" + - " -d : Skip creation of temporary file(._COPYING_).\n"; private void setNumberThreads(String numberThreadsString) { if (numberThreadsString == null) { @@ -330,22 +329,6 @@ class CopyCommands { } } - @Override - protected void processOptions(LinkedList args) throws IOException { - CommandFormat cf = - new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d"); - cf.addOptionWithValue("t"); - cf.parse(args); - setNumberThreads(cf.getOptValue("t")); - setOverwrite(cf.getOpt("f")); - setPreserve(cf.getOpt("p")); - setLazyPersist(cf.getOpt("l")); - setDirectWrite(cf.getOpt("d")); - getRemoteDestination(args); - // should have a -r option - setRecursive(true); - } - private void copyFile(PathData src, PathData target) throws IOException { if (isPathRecursable(src)) { throw new PathIsDirectoryException(src.toString()); @@ -372,25 +355,6 @@ class CopyCommands { executor.submit(task); } - @Override - protected void processArguments(LinkedList args) - throws IOException { - executor = new ThreadPoolExecutor(numThreads, numThreads, 1, - TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), - new ThreadPoolExecutor.CallerRunsPolicy()); - super.processArguments(args); - - // issue the command and then wait for it to finish - executor.shutdown(); - try { - executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES); - } catch (InterruptedException e) { - executor.shutdownNow(); - displayError(e); - Thread.currentThread().interrupt(); - } - } - @VisibleForTesting public int getNumThreads() { return numThreads; @@ -401,6 +365,12 @@ class CopyCommands { return executor; } } + + public static class CopyFromLocal extends Put { + public static final String NAME = "copyFromLocal"; + public static final String USAGE = Put.USAGE; + public static final String DESCRIPTION = "Identical to the -put command."; + } public static class CopyToLocal extends Get { public static final String NAME = "copyToLocal"; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java index 5ef42775ea5..c20293e1a5a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java @@ -25,7 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.PathExistsException; -import org.apache.hadoop.fs.shell.CopyCommands.Put; +import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal; /** Various commands for moving files */ @InterfaceAudience.Private @@ -41,12 +41,22 @@ class MoveCommands { /** * Move local files to a remote filesystem */ - public static class MoveFromLocal extends Put { + public static class MoveFromLocal extends CopyFromLocal { public static final String NAME = "moveFromLocal"; - public static final String USAGE = " ... "; + public static final String USAGE = + "[-f] [-p] [-l] [-d] ... "; public static final String DESCRIPTION = - "Same as -put, except that the source is " + - "deleted after it's copied."; + "Same as -put, except that the source is " + + "deleted after it's copied\n" + + "and -t option has not yet implemented."; + + @Override + protected void processOptions(LinkedList args) throws IOException { + if(args.contains("-t")) { + throw new CommandFormat.UnknownOptionException("-t"); + } + super.processOptions(args); + } @Override protected void processPath(PathData src, PathData target) throws IOException { diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md b/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md index 7df2cce574b..463aaa37240 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md @@ -535,7 +535,7 @@ Returns 0 on success and -1 on error. put --- -Usage: `hadoop fs -put [-f] [-p] [-l] [-d] [ - | .. ]. ` +Usage: `hadoop fs -put [-f] [-p] [-l] [-d] [-t ] [ - | .. ]. ` Copy single src, or multiple srcs from local file system to the destination file system. Also reads input from stdin and writes to destination file system if the source is set to "-" @@ -547,6 +547,8 @@ Options: * `-p` : Preserves access and modification times, ownership and the permissions. (assuming the permissions can be propagated across filesystems) * `-f` : Overwrites the destination if it already exists. +* `-t ` : Number of threads to be used, default is 1. Useful + when uploading a directory containing more than 1 file. * `-l` : Allow DataNode to lazily persist the file to disk, Forces a replication factor of 1. This flag will result in reduced durability. Use with care. * `-d` : Skip creation of temporary file with the suffix `._COPYING_`. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestMove.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestMove.java index 1f379448ee8..b9e87d3dace 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestMove.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestMove.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.fs.PathExistsException; +import org.apache.hadoop.fs.shell.CommandFormat.UnknownOptionException; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -93,6 +94,12 @@ public class TestMove { assertTrue("Rename should have failed with path exists exception", cmd.error instanceof PathExistsException); } + + @Test(expected = UnknownOptionException.class) + public void testMoveFromLocalDoesNotAllowTOption() { + new MoveCommands.MoveFromLocal().run("-t", "2", + null, null); + } static class MockFileSystem extends FilterFileSystem { Configuration conf; diff --git a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml index 392d39170d5..1874621e1b5 100644 --- a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml +++ b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml @@ -496,7 +496,10 @@ RegexpComparator - ^-put \[-f\] \[-p\] \[-l\] \[-d\] <localsrc> \.\.\. <dst> :( )* + + RegexpComparator + ^-put \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] <localsrc> \.\.\. <dst> :\s* + RegexpComparator @@ -512,66 +515,11 @@ RegexpComparator - ^\s*-p Preserves access and modification times, ownership and the mode.( )* + ^\s*-p Preserves timestamps, ownership and the mode.( )* RegexpComparator - ^\s*-f Overwrites the destination if it already exists.( )* - - - RegexpComparator - ^\s*-l Allow DataNode to lazily persist the file to disk. Forces( )* - - - RegexpComparator - ^\s*replication factor of 1. This flag will result in reduced( )* - - - RegexpComparator - ^\s*durability. Use with care.( )* - - - RegexpComparator - ^\s*-d Skip creation of temporary file\(<dst>\._COPYING_\).( )* - - - - - - help: help for copyFromLocal - - -help copyFromLocal - - - - - - RegexpComparator - ^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] <localsrc> \.\.\. <dst> :\s* - - - RegexpComparator - ^\s*Copy files from the local file system into fs.( )*Copying fails if the file already( )* - - - RegexpComparator - ^\s*exists, unless the -f flag is given.( )* - - - RegexpComparator - ^\s*Flags:( )* - - - RegexpComparator - ^\s*-p Preserves access and modification times, ownership and the( )* - - - RegexpComparator - ^\s*mode.( )* - - - RegexpComparator - ^\s*-f Overwrites the destination if it already exists.( )* + ^\s*-f Overwrites the destination if it already exists.( )* RegexpComparator @@ -596,6 +544,25 @@ + + help: help for copyFromLocal + + -help copyFromLocal + + + + + + RegexpComparator + ^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] <localsrc> \.\.\. <dst> :\s* + + + RegexpComparator + ^\s*Identical to the -put command\.\s* + + + + help: help for moveFromLocal @@ -606,11 +573,14 @@ RegexpComparator - ^-moveFromLocal <localsrc> \.\.\. <dst> :\s* + ^-moveFromLocal \[-f\] \[-p\] \[-l\] \[-d\] <localsrc> \.\.\. <dst> :\s* RegexpComparator - ^( |\t)*Same as -put, except that the source is deleted after it's copied. + ^( |\t)*Same as -put, except that the source is deleted after it's copied + + RegexpComparator + ^\s* and -t option has not yet implemented.