From 4e9b9adb964935b4737ee877ae737dd2c46d15ed Mon Sep 17 00:00:00 2001 From: smarthan <1139557635@qq.com> Date: Mon, 29 Nov 2021 20:45:08 +0800 Subject: [PATCH] HADOOP-18023. Allow cp command to run with multi threads. (#3721) (cherry picked from commit 932a78fe38b34a923f6852a1a19482075806ecba) (cherry picked from commit 2f3e18697802cce81eb3c976088583ef365daea0) --- .../apache/hadoop/fs/shell/CopyCommands.java | 39 ++-- .../src/site/markdown/FileSystemShell.md | 11 +- .../apache/hadoop/fs/shell/TestCpCommand.java | 210 ++++++++++++++++++ .../src/test/resources/testConf.xml | 47 ++-- 4 files changed, 275 insertions(+), 32 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCpCommand.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java index 73729927f7a..7b2e83543e9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java @@ -146,33 +146,40 @@ class CopyCommands { } } - static class Cp extends CommandWithDestination { + static class Cp extends CopyCommandWithMultiThread { public static final String NAME = "cp"; public static final String USAGE = - "[-f] [-p | -p[topax]] [-d] ... "; + "[-f] [-p | -p[topax]] [-d] [-t ]" + + " [-q ] ... "; public static final String DESCRIPTION = - "Copy files that match the file pattern to a " + - "destination. When copying multiple files, the destination " + - "must be a directory. Passing -p preserves status " + - "[topax] (timestamps, ownership, permission, ACLs, XAttr). " + - "If -p is specified with no , then preserves " + - "timestamps, ownership, permission. If -pa is specified, " + - "then preserves permission also because ACL is a super-set of " + - "permission. Passing -f overwrites the destination if it " + - "already exists. raw namespace extended attributes are preserved " + - "if (1) they are supported (HDFS only) and, (2) all of the source and " + - "target pathnames are in the /.reserved/raw hierarchy. raw namespace " + - "xattr preservation is determined solely by the presence (or absence) " + - "of the /.reserved/raw prefix and not by the -p option. Passing -d "+ - "will skip creation of temporary file(._COPYING_).\n"; + "Copy files that match the file pattern to a destination." + + " When copying multiple files, the destination must be a " + + "directory.\nFlags :\n" + + " -p[topax] : Preserve file attributes [topx] (timestamps, " + + "ownership, permission, ACL, XAttr). If -p is specified with " + + "no arg, then preserves timestamps, ownership, permission. " + + "If -pa is specified, then preserves permission also because " + + "ACL is a super-set of permission. Determination of whether raw " + + "namespace extended attributes are preserved is independent of " + + "the -p flag.\n" + + " -f : Overwrite the destination if it already exists.\n" + + " -d : Skip creation of temporary file(._COPYING_).\n" + + " -t : Number of threads to be used, " + + "default is 1.\n" + + " -q : Thread pool queue size to be " + + "used, default is 1024.\n"; @Override protected void processOptions(LinkedList args) throws IOException { popPreserveOption(args); CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "f", "d"); + cf.addOptionWithValue("t"); + cf.addOptionWithValue("q"); cf.parse(args); setDirectWrite(cf.getOpt("d")); setOverwrite(cf.getOpt("f")); + setThreadCount(cf.getOptValue("t")); + setThreadPoolQueueSize(cf.getOptValue("q")); // should have a -r option setRecursive(true); getRemoteDestination(args); diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md b/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md index bb4ce455bf9..946516a0f4f 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md @@ -170,7 +170,7 @@ Returns 0 on success and -1 on error. cp ---- -Usage: `hadoop fs -cp [-f] [-p | -p[topax]] URI [URI ...] ` +Usage: `hadoop fs -cp [-f] [-p | -p[topax]] [-t ] [-q ] URI [URI ...] ` Copy files from source to destination. This command allows multiple sources as well in which case the destination must be a directory. @@ -178,13 +178,18 @@ Copy files from source to destination. This command allows multiple sources as w Options: -* The -f option will overwrite the destination if it already exists. -* The -p option will preserve file attributes [topx] (timestamps, ownership, permission, ACL, XAttr). If -p is specified with no *arg*, then preserves timestamps, ownership, permission. If -pa is specified, then preserves permission also because ACL is a super-set of permission. Determination of whether raw namespace extended attributes are preserved is independent of the -p flag. +* `-f` : Overwrite the destination if it already exists. +* `-d` : Skip creation of temporary file with the suffix `._COPYING_`. +* `-p` : Preserve file attributes [topx] (timestamps, ownership, permission, ACL, XAttr). If -p is specified with no *arg*, then preserves timestamps, ownership, permission. If -pa is specified, then preserves permission also because ACL is a super-set of permission. Determination of whether raw namespace extended attributes are preserved is independent of the -p flag. +* `-t ` : Number of threads to be used, default is 1. Useful when copying directories containing more than 1 file. +* `-q ` : Thread pool queue size to be used, default is 1024. It takes effect only when thread count greater than 1. Example: * `hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2` * `hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir` +* `hadoop fs -cp -t 5 /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir` +* `hadoop fs -cp -t 10 -q 2048 /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir` Exit Code: diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCpCommand.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCpCommand.java new file mode 100644 index 00000000000..214f1a0686c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCpCommand.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.shell; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.concurrent.ThreadPoolExecutor; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemTestHelper; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.shell.CopyCommands.Cp; + +import static org.apache.hadoop.fs.shell.CopyCommandWithMultiThread.DEFAULT_QUEUE_SIZE; +import static org.junit.Assert.assertEquals; + +public class TestCpCommand { + + private static final String FROM_DIR_NAME = "fromDir"; + private static final String TO_DIR_NAME = "toDir"; + + private static FileSystem fs; + private static Path testDir; + private static Configuration conf; + + private Path dir = null; + private int numFiles = 0; + + private static int initialize(Path dir) throws Exception { + fs.mkdirs(dir); + Path fromDirPath = new Path(dir, FROM_DIR_NAME); + fs.mkdirs(fromDirPath); + Path toDirPath = new Path(dir, TO_DIR_NAME); + fs.mkdirs(toDirPath); + + int numTotalFiles = 0; + int numDirs = RandomUtils.nextInt(0, 5); + for (int dirCount = 0; dirCount < numDirs; ++dirCount) { + Path subDirPath = new Path(fromDirPath, "subdir" + dirCount); + fs.mkdirs(subDirPath); + int numFiles = RandomUtils.nextInt(0, 10); + for (int fileCount = 0; fileCount < numFiles; ++fileCount) { + numTotalFiles++; + Path subFile = new Path(subDirPath, "file" + fileCount); + fs.createNewFile(subFile); + FSDataOutputStream output = fs.create(subFile, true); + for (int i = 0; i < 100; ++i) { + output.writeInt(i); + output.writeChar('\n'); + } + output.close(); + } + } + + return numTotalFiles; + } + + @BeforeClass + public static void init() throws Exception { + conf = new Configuration(false); + conf.set("fs.file.impl", LocalFileSystem.class.getName()); + fs = FileSystem.getLocal(conf); + testDir = new FileSystemTestHelper().getTestRootPath(fs); + // don't want scheme on the path, just an absolute path + testDir = new Path(fs.makeQualified(testDir).toUri().getPath()); + + FileSystem.setDefaultUri(conf, fs.getUri()); + fs.setWorkingDirectory(testDir); + } + + @AfterClass + public static void cleanup() throws Exception { + fs.delete(testDir, true); + fs.close(); + } + + private void run(CopyCommandWithMultiThread cmd, String... args) { + cmd.setConf(conf); + assertEquals(0, cmd.run(args)); + } + + @Before + public void initDirectory() throws Exception { + dir = new Path("dir" + RandomStringUtils.randomNumeric(4)); + numFiles = initialize(dir); + } + + @Test(timeout = 10000) + public void testCp() throws Exception { + MultiThreadedCp copy = new MultiThreadedCp(1, DEFAULT_QUEUE_SIZE, 0); + run(copy, new Path(dir, FROM_DIR_NAME).toString(), + new Path(dir, TO_DIR_NAME).toString()); + assert copy.getExecutor() == null; + } + + @Test(timeout = 10000) + public void testCpWithThreads() { + run(new MultiThreadedCp(5, DEFAULT_QUEUE_SIZE, numFiles), "-t", "5", + new Path(dir, FROM_DIR_NAME).toString(), + new Path(dir, TO_DIR_NAME).toString()); + } + + @Test(timeout = 10000) + public void testCpWithThreadWrong() { + run(new MultiThreadedCp(1, DEFAULT_QUEUE_SIZE, 0), "-t", "0", + new Path(dir, FROM_DIR_NAME).toString(), + new Path(dir, TO_DIR_NAME).toString()); + } + + @Test(timeout = 10000) + public void testCpWithThreadsAndQueueSize() { + int queueSize = 256; + run(new MultiThreadedCp(5, queueSize, numFiles), "-t", "5", "-q", + Integer.toString(queueSize), + new Path(dir, FROM_DIR_NAME).toString(), + new Path(dir, TO_DIR_NAME).toString()); + } + + @Test(timeout = 10000) + public void testCpWithThreadsAndQueueSizeWrong() { + int queueSize = 0; + run(new MultiThreadedCp(5, DEFAULT_QUEUE_SIZE, numFiles), "-t", "5", "-q", + Integer.toString(queueSize), + new Path(dir, FROM_DIR_NAME).toString(), + new Path(dir, TO_DIR_NAME).toString()); + } + + @Test(timeout = 10000) + public void testCpSingleFile() throws Exception { + Path fromDirPath = new Path(dir, FROM_DIR_NAME); + Path subFile = new Path(fromDirPath, "file0"); + fs.createNewFile(subFile); + FSDataOutputStream output = fs.create(subFile, true); + for (int i = 0; i < 100; ++i) { + output.writeInt(i); + output.writeChar('\n'); + } + output.close(); + + MultiThreadedCp copy = new MultiThreadedCp(5, DEFAULT_QUEUE_SIZE, 0); + run(copy, "-t", "5", subFile.toString(), + new Path(dir, TO_DIR_NAME).toString()); + assert copy.getExecutor() == null; + } + + private static class MultiThreadedCp extends Cp { + public static final String NAME = "multiThreadCp"; + private final int expectedThreads; + private final int expectedQueuePoolSize; + private final int expectedCompletedTaskCount; + + MultiThreadedCp(int expectedThreads, int expectedQueuePoolSize, + int expectedCompletedTaskCount) { + this.expectedThreads = expectedThreads; + this.expectedQueuePoolSize = expectedQueuePoolSize; + this.expectedCompletedTaskCount = expectedCompletedTaskCount; + } + + @Override + protected void processArguments(LinkedList args) + throws IOException { + // Check if the number of threads are same as expected + Assert.assertEquals(expectedThreads, getThreadCount()); + // Check if the queue pool size of executor is same as expected + Assert.assertEquals(expectedQueuePoolSize, getThreadPoolQueueSize()); + + super.processArguments(args); + + if (isMultiThreadNecessary(args)) { + // Once the copy is complete, check following + // 1) number of completed tasks are same as expected + // 2) There are no active tasks in the executor + // 3) Executor has shutdown correctly + ThreadPoolExecutor executor = getExecutor(); + Assert.assertEquals(expectedCompletedTaskCount, + executor.getCompletedTaskCount()); + Assert.assertEquals(0, executor.getActiveCount()); + Assert.assertTrue(executor.isTerminated()); + } else { + assert getExecutor() == null; + } + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml index cc827cca391..34f7e14ba4d 100644 --- a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml +++ b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml @@ -324,51 +324,72 @@ RegexpComparator - ^-cp \[-f\] \[-p \| -p\[topax\]\] \[-d\] <src> \.\.\. <dst> :\s* + ^-cp \[-f\] \[-p \| -p\[topax\]\] \[-d\] \[-t <thread count>\] \[-q <thread pool queue size>\] <src> \.\.\. <dst> :\s* RegexpComparator - ^\s*Copy files that match the file pattern <src> to a destination. When copying( )* + ^\s*Copy files that match the file pattern <src> to a destination. When copying( )* RegexpComparator - ^( |\t)*multiple files, the destination must be a directory.( )*Passing -p preserves status( )* + ^( |\t)*multiple files, the destination must be a directory.( )* RegexpComparator - ^( |\t)*\[topax\] \(timestamps, ownership, permission, ACLs, XAttr\). If -p is specified( )* + ^( |\t)*Flags :( )* RegexpComparator - ^( |\t)*with no <arg>, then preserves timestamps, ownership, permission. If -pa is( )* + ^( |\t)*-p\[topax\]\s+Preserve file attributes \[topx\] \(timestamps,( )* RegexpComparator - ^( |\t)*specified, then preserves permission also because ACL is a super-set of( )* + ^( |\t)*ownership, permission, ACL, XAttr\). If -p is( )* RegexpComparator - ^( |\t)*permission. Passing -f overwrites the destination if it already exists. raw( )* + ^( |\t)*specified with no arg, then preserves timestamps,( )* RegexpComparator - ^( |\t)*namespace extended attributes are preserved if \(1\) they are supported \(HDFS( )* + ^( |\t)*ownership, permission. If -pa is specified, then( )* RegexpComparator - ^( |\t)*only\) and, \(2\) all of the source and target pathnames are in the \/\.reserved\/raw( )* + ^( |\t)*preserves permission also because ACL is a( )* RegexpComparator - ^( |\t)*hierarchy. raw namespace xattr preservation is determined solely by the presence( )* + ^( |\t)*super-set of permission. Determination of whether( )* + + + + RegexpComparator + ^( |\t)*raw namespace extended attributes are preserved is( )* + + + RegexpComparator + ^( |\t)*independent of the -p flag.( )* RegexpComparator - ^\s*\(or absence\) of the \/\.reserved\/raw prefix and not by the -p option\. Passing -d( )* + ^\s*-f\s+Overwrite the destination if it already exists.( )* - RegexpComparator - ^\s*will skip creation of temporary file\(<dst>\._COPYING_\)\.( )* + RegexpComparator + ^\s*-d\s+Skip creation of temporary file\(<dst>\._COPYING_\).( )* + + + RegexpComparator + ^\s*-t <thread count>\s+Number of threads to be used, default is 1.( )* + + + RegexpComparator + ^\s*-q <thread pool queue size>\s+Thread pool queue size to be used, default is( )* + + + RegexpComparator + ^( |\t)*1024.\s*