HDFS-16173. Improve CopyCommands#Put#executor queue configurability. (#3302)

Co-authored-by: zhujianghua <zhujianghua@zhujianghuadeMacBook-Pro.local>
Reviewed-by: Hui Fei <ferhui@apache.org>
Reviewed-by: Viraj Jasani <vjasani@apache.org>
(cherry picked from commit 4c94831364)
(cherry picked from commit 7c663043b2)
This commit is contained in:
jianghuazhu 2021-08-27 11:41:44 +08:00 committed by Wei-Chiu Chuang
parent 3ea74033bf
commit dac74b0e11
No known key found for this signature in database
GPG Key ID: B362E1C021854B9D
4 changed files with 62 additions and 10 deletions

View File

@ -37,6 +37,8 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsDirectoryException; import org.apache.hadoop.fs.PathIsDirectoryException;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Various commands for copy files */ /** Various commands for copy files */
@InterfaceAudience.Private @InterfaceAudience.Private
@ -238,7 +240,11 @@ class CopyCommands {
* Copy local files to a remote filesystem * Copy local files to a remote filesystem
*/ */
public static class Put extends CommandWithDestination { public static class Put extends CommandWithDestination {
public static final Logger LOG = LoggerFactory.getLogger(Put.class);
private ThreadPoolExecutor executor = null; private ThreadPoolExecutor executor = null;
private int threadPoolQueueSize = 1024;
private int numThreads = 1; private int numThreads = 1;
private static final int MAX_THREADS = private static final int MAX_THREADS =
@ -246,7 +252,8 @@ class CopyCommands {
public static final String NAME = "put"; public static final String NAME = "put";
public static final String USAGE = public static final String USAGE =
"[-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>"; "[-f] [-p] [-l] [-d] [-t <thread count>] [-q <threadPool queue size>] " +
"<localsrc> ... <dst>";
public static final String DESCRIPTION = public static final String DESCRIPTION =
"Copy files from the local file system " + "Copy files from the local file system " +
"into fs. Copying fails if the file already " + "into fs. Copying fails if the file already " +
@ -255,6 +262,8 @@ class CopyCommands {
" -p : Preserves timestamps, ownership and the mode.\n" + " -p : Preserves timestamps, ownership and the mode.\n" +
" -f : Overwrites the destination if it already exists.\n" + " -f : Overwrites the destination if it already exists.\n" +
" -t <thread count> : Number of threads to be used, default is 1.\n" + " -t <thread count> : Number of threads to be used, default is 1.\n" +
" -q <threadPool size> : ThreadPool queue size to be used, " +
"default is 1024.\n" +
" -l : Allow DataNode to lazily persist the file to disk. Forces" + " -l : Allow DataNode to lazily persist the file to disk. Forces" +
" replication factor of 1. This flag will result in reduced" + " replication factor of 1. This flag will result in reduced" +
" durability. Use with care.\n" + " durability. Use with care.\n" +
@ -265,8 +274,10 @@ class CopyCommands {
CommandFormat cf = CommandFormat cf =
new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d"); new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d");
cf.addOptionWithValue("t"); cf.addOptionWithValue("t");
cf.addOptionWithValue("q");
cf.parse(args); cf.parse(args);
setNumberThreads(cf.getOptValue("t")); setNumberThreads(cf.getOptValue("t"));
setThreadPoolQueueSize(cf.getOptValue("q"));
setOverwrite(cf.getOpt("f")); setOverwrite(cf.getOpt("f"));
setPreserve(cf.getOpt("p")); setPreserve(cf.getOpt("p"));
setLazyPersist(cf.getOpt("l")); setLazyPersist(cf.getOpt("l"));
@ -298,7 +309,7 @@ class CopyCommands {
} }
executor = new ThreadPoolExecutor(numThreads, numThreads, 1, executor = new ThreadPoolExecutor(numThreads, numThreads, 1,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), TimeUnit.SECONDS, new ArrayBlockingQueue<>(threadPoolQueueSize),
new ThreadPoolExecutor.CallerRunsPolicy()); new ThreadPoolExecutor.CallerRunsPolicy());
super.processArguments(args); super.processArguments(args);
@ -328,6 +339,25 @@ class CopyCommands {
} }
} }
/**
 * Stores the capacity of the executor's work queue from the raw value of
 * the {@code -q} option.
 *
 * <p>A {@code null} argument (option not supplied) leaves the current
 * setting untouched. A parsed value below 1 is rejected with a warning and
 * replaced by 1024.
 *
 * @param numThreadPoolQueueSize textual queue size, or {@code null} when
 *     the option was not given
 * @throws NumberFormatException if the text is not a parsable integer
 */
private void setThreadPoolQueueSize(String numThreadPoolQueueSize) {
  // Option absent: keep whatever is already configured.
  if (numThreadPoolQueueSize == null) {
    return;
  }

  final int requestedSize = Integer.parseInt(numThreadPoolQueueSize);
  if (requestedSize >= 1) {
    threadPoolQueueSize = requestedSize;
    return;
  }

  // Non-positive sizes are not usable as a queue capacity; warn and fall
  // back instead of failing the whole command.
  LOG.warn("The value of the thread pool queue size cannot be " +
      "less than 1, and the default value is used here. " +
      "The default size is 1024.");
  threadPoolQueueSize = 1024;
}
/**
 * Exposes the queue capacity that will be handed to the copy executor's
 * {@code ArrayBlockingQueue}.
 *
 * @return the currently configured thread pool queue size
 */
@VisibleForTesting
protected int getThreadPoolQueueSize() {
  return this.threadPoolQueueSize;
}
private void copyFile(PathData src, PathData target) throws IOException { private void copyFile(PathData src, PathData target) throws IOException {
if (isPathRecursable(src)) { if (isPathRecursable(src)) {
throw new PathIsDirectoryException(src.toString()); throw new PathIsDirectoryException(src.toString());

View File

@ -509,7 +509,7 @@ Returns 0 on success and -1 on error.
put put
--- ---
Usage: `hadoop fs -put [-f] [-p] [-l] [-d] [-t <thread count>] [ - | <localsrc1> .. ]. <dst>` Usage: `hadoop fs -put [-f] [-p] [-l] [-d] [-t <thread count>] [-q <threadPool queue size>] [ - | <localsrc1> .. ]. <dst>`
Copy single src, or multiple srcs from local file system to the destination file system. Copy single src, or multiple srcs from local file system to the destination file system.
Also reads input from stdin and writes to destination file system if the source is set to "-" Also reads input from stdin and writes to destination file system if the source is set to "-"
@ -526,6 +526,7 @@ Options:
* `-l` : Allow DataNode to lazily persist the file to disk, Forces a replication * `-l` : Allow DataNode to lazily persist the file to disk, Forces a replication
factor of 1. This flag will result in reduced durability. Use with care. factor of 1. This flag will result in reduced durability. Use with care.
* `-d` : Skip creation of temporary file with the suffix `._COPYING_`. * `-d` : Skip creation of temporary file with the suffix `._COPYING_`.
* `-q <threadPool queue size>` : ThreadPool queue size to be used, default is 1024.
Examples: Examples:
@ -534,6 +535,7 @@ Examples:
* `hadoop fs -put -f localfile1 localfile2 /user/hadoop/hadoopdir` * `hadoop fs -put -f localfile1 localfile2 /user/hadoop/hadoopdir`
* `hadoop fs -put -d localfile hdfs://nn.example.com/hadoop/hadoopfile` * `hadoop fs -put -d localfile hdfs://nn.example.com/hadoop/hadoopfile`
* `hadoop fs -put - hdfs://nn.example.com/hadoop/hadoopfile` Reads the input from stdin. * `hadoop fs -put - hdfs://nn.example.com/hadoop/hadoopfile` Reads the input from stdin.
* `hadoop fs -put -q 500 localfile3 hdfs://nn.example.com/hadoop/hadoopfile3`
Exit Code: Exit Code:

View File

@ -122,6 +122,22 @@ public class TestCopyPreserveFlag {
assertAttributesChanged(TO); assertAttributesChanged(TO);
} }
/**
 * Runs {@code put} with both {@code -p} and {@code -q 100} and checks that
 * the queue size was picked up and that the attributes of the destination
 * are reported as preserved.
 */
@Test(timeout = 10000)
public void testPutWithPQ() throws Exception {
  Put put = new Put();
  run(put, "-p", "-q", "100", FROM.toString(), TO.toString());
  // Expected value goes first so a failure reads "expected:<100> but was:<...>".
  assertEquals(100, put.getThreadPoolQueueSize());
  assertAttributesPreserved(TO);
}
/**
 * Runs {@code put} with only {@code -q 100} (no {@code -p}) and checks that
 * the queue size was picked up and that the attributes of the destination
 * are reported as changed.
 */
@Test(timeout = 10000)
public void testPutWithQ() throws Exception {
  Put put = new Put();
  run(put, "-q", "100", FROM.toString(), TO.toString());
  // Expected value goes first so a failure reads "expected:<100> but was:<...>".
  assertEquals(100, put.getThreadPoolQueueSize());
  assertAttributesChanged(TO);
}
@Test(timeout = 10000) @Test(timeout = 10000)
public void testPutWithSplCharacter() throws Exception { public void testPutWithSplCharacter() throws Exception {
fs.mkdirs(DIR_FROM_SPL); fs.mkdirs(DIR_FROM_SPL);

View File

@ -498,7 +498,7 @@
<type>RegexpComparator</type> <type>RegexpComparator</type>
<comparator> <comparator>
<type>RegexpComparator</type> <type>RegexpComparator</type>
<expected-output>^-put \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output> <expected-output>^-put \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] \[-q &lt;threadPool queue size&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
</comparator> </comparator>
</comparator> </comparator>
<comparator> <comparator>
@ -525,6 +525,10 @@
<type>RegexpComparator</type> <type>RegexpComparator</type>
<expected-output>^\s*-t &lt;thread count&gt; Number of threads to be used, default is 1.( )*</expected-output> <expected-output>^\s*-t &lt;thread count&gt; Number of threads to be used, default is 1.( )*</expected-output>
</comparator> </comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^\s*-q &lt;threadPool size&gt; ThreadPool queue size to be used, default is 1024.( )*</expected-output>
</comparator>
<comparator> <comparator>
<type>RegexpComparator</type> <type>RegexpComparator</type>
<expected-output>^\s*-l Allow DataNode to lazily persist the file to disk. Forces( )*</expected-output> <expected-output>^\s*-l Allow DataNode to lazily persist the file to disk. Forces( )*</expected-output>
@ -554,7 +558,7 @@
<comparators> <comparators>
<comparator> <comparator>
<type>RegexpComparator</type> <type>RegexpComparator</type>
<expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output> <expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] \[-q &lt;threadPool queue size&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
</comparator> </comparator>
<comparator> <comparator>
<type>RegexpComparator</type> <type>RegexpComparator</type>