HADOOP-17998. Allow get command to run with multi threads. (#3645)
(cherry picked from commit 63018dc73f
)
Conflicts:
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
This commit is contained in:
parent
90085eb4cb
commit
cbb3ba135c
|
@ -397,10 +397,10 @@ abstract class CommandWithDestination extends FsCommand {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If direct write is disabled ,copies the stream contents to a temporary
|
* If direct write is disabled ,copies the stream contents to a temporary
|
||||||
* file "<target>._COPYING_". If the copy is
|
* file "target._COPYING_". If the copy is successful, the temporary file
|
||||||
* successful, the temporary file will be renamed to the real path,
|
* will be renamed to the real path, else the temporary file will be deleted.
|
||||||
* else the temporary file will be deleted.
|
|
||||||
* if direct write is enabled , then creation temporary file is skipped.
|
* if direct write is enabled , then creation temporary file is skipped.
|
||||||
|
*
|
||||||
* @param in the input stream for the copy
|
* @param in the input stream for the copy
|
||||||
* @param target where to store the contents of the stream
|
* @param target where to store the contents of the stream
|
||||||
* @throws IOException if copy fails
|
* @throws IOException if copy fails
|
||||||
|
|
|
@ -0,0 +1,155 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.shell;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Abstract command to enable sub copy commands run with multi-thread.
|
||||||
|
*/
|
||||||
|
public abstract class CopyCommandWithMultiThread
|
||||||
|
extends CommandWithDestination {
|
||||||
|
|
||||||
|
private int threadCount = 1;
|
||||||
|
private ThreadPoolExecutor executor = null;
|
||||||
|
private int threadPoolQueueSize = DEFAULT_QUEUE_SIZE;
|
||||||
|
|
||||||
|
public static final int DEFAULT_QUEUE_SIZE = 1024;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* set thread count by option value, if the value less than 1,
|
||||||
|
* use 1 instead.
|
||||||
|
*
|
||||||
|
* @param optValue option value
|
||||||
|
*/
|
||||||
|
protected void setThreadCount(String optValue) {
|
||||||
|
if (optValue != null) {
|
||||||
|
threadCount = Math.max(Integer.parseInt(optValue), 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* set thread pool queue size by option value, if the value less than 1,
|
||||||
|
* use DEFAULT_QUEUE_SIZE instead.
|
||||||
|
*
|
||||||
|
* @param optValue option value
|
||||||
|
*/
|
||||||
|
protected void setThreadPoolQueueSize(String optValue) {
|
||||||
|
if (optValue != null) {
|
||||||
|
int size = Integer.parseInt(optValue);
|
||||||
|
threadPoolQueueSize = size < 1 ? DEFAULT_QUEUE_SIZE : size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected int getThreadCount() {
|
||||||
|
return this.threadCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected int getThreadPoolQueueSize() {
|
||||||
|
return this.threadPoolQueueSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected ThreadPoolExecutor getExecutor() {
|
||||||
|
return this.executor;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void processArguments(LinkedList<PathData> args)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
if (isMultiThreadNecessary(args)) {
|
||||||
|
initThreadPoolExecutor();
|
||||||
|
}
|
||||||
|
|
||||||
|
super.processArguments(args);
|
||||||
|
|
||||||
|
if (executor != null) {
|
||||||
|
waitForCompletion();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// if thread count is 1 or the source is only one single file,
|
||||||
|
// don't init executor to avoid threading overhead.
|
||||||
|
@VisibleForTesting
|
||||||
|
protected boolean isMultiThreadNecessary(LinkedList<PathData> args)
|
||||||
|
throws IOException {
|
||||||
|
return this.threadCount > 1 && hasMoreThanOneSourcePaths(args);
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if source is only one single file.
|
||||||
|
private boolean hasMoreThanOneSourcePaths(LinkedList<PathData> args)
|
||||||
|
throws IOException {
|
||||||
|
if (args.size() > 1) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (args.size() == 1) {
|
||||||
|
PathData src = args.get(0);
|
||||||
|
if (src.stat == null) {
|
||||||
|
src.refreshStatus();
|
||||||
|
}
|
||||||
|
return isPathRecursable(src);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initThreadPoolExecutor() {
|
||||||
|
executor =
|
||||||
|
new ThreadPoolExecutor(threadCount, threadCount, 1, TimeUnit.SECONDS,
|
||||||
|
new ArrayBlockingQueue<>(threadPoolQueueSize),
|
||||||
|
new ThreadPoolExecutor.CallerRunsPolicy());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void waitForCompletion() {
|
||||||
|
if (executor != null) {
|
||||||
|
executor.shutdown();
|
||||||
|
try {
|
||||||
|
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
executor.shutdownNow();
|
||||||
|
displayError(e);
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void copyFileToTarget(PathData src, PathData target)
|
||||||
|
throws IOException {
|
||||||
|
if (executor == null) {
|
||||||
|
super.copyFileToTarget(src, target);
|
||||||
|
} else {
|
||||||
|
executor.submit(() -> {
|
||||||
|
try {
|
||||||
|
super.copyFileToTarget(src, target);
|
||||||
|
} catch (IOException e) {
|
||||||
|
displayError(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -26,11 +26,7 @@ import java.nio.file.Files;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
@ -38,8 +34,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.PathIsDirectoryException;
|
import org.apache.hadoop.fs.PathIsDirectoryException;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/** Various commands for copy files */
|
/** Various commands for copy files */
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
@ -210,28 +204,37 @@ class CopyCommands {
|
||||||
/**
|
/**
|
||||||
* Copy local files to a remote filesystem
|
* Copy local files to a remote filesystem
|
||||||
*/
|
*/
|
||||||
public static class Get extends CommandWithDestination {
|
public static class Get extends CopyCommandWithMultiThread {
|
||||||
public static final String NAME = "get";
|
public static final String NAME = "get";
|
||||||
public static final String USAGE =
|
public static final String USAGE =
|
||||||
"[-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>";
|
"[-f] [-p] [-crc] [-ignoreCrc] [-t <thread count>]"
|
||||||
|
+ " [-q <thread pool queue size>] <src> ... <localdst>";
|
||||||
public static final String DESCRIPTION =
|
public static final String DESCRIPTION =
|
||||||
"Copy files that match the file pattern <src> " +
|
"Copy files that match the file pattern <src> to the local name. "
|
||||||
"to the local name. <src> is kept. When copying multiple " +
|
+ "<src> is kept.\nWhen copying multiple files, the destination"
|
||||||
"files, the destination must be a directory. Passing " +
|
+ " must be a directory.\nFlags:\n"
|
||||||
"-f overwrites the destination if it already exists and " +
|
+ " -p : Preserves timestamps, ownership and the mode.\n"
|
||||||
"-p preserves access and modification times, " +
|
+ " -f : Overwrites the destination if it already exists.\n"
|
||||||
"ownership and the mode.\n";
|
+ " -crc : write CRC checksums for the files downloaded.\n"
|
||||||
|
+ " -ignoreCrc : Skip CRC checks on the file(s) downloaded.\n"
|
||||||
|
+ " -t <thread count> : Number of threads to be used,"
|
||||||
|
+ " default is 1.\n"
|
||||||
|
+ " -q <thread pool queue size> : Thread pool queue size to be"
|
||||||
|
+ " used, default is 1024.\n";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void processOptions(LinkedList<String> args)
|
protected void processOptions(LinkedList<String> args) throws IOException {
|
||||||
throws IOException {
|
CommandFormat cf =
|
||||||
CommandFormat cf = new CommandFormat(
|
new CommandFormat(1, Integer.MAX_VALUE, "crc", "ignoreCrc", "p", "f");
|
||||||
1, Integer.MAX_VALUE, "crc", "ignoreCrc", "p", "f");
|
cf.addOptionWithValue("t");
|
||||||
|
cf.addOptionWithValue("q");
|
||||||
cf.parse(args);
|
cf.parse(args);
|
||||||
setWriteChecksum(cf.getOpt("crc"));
|
setWriteChecksum(cf.getOpt("crc"));
|
||||||
setVerifyChecksum(!cf.getOpt("ignoreCrc"));
|
setVerifyChecksum(!cf.getOpt("ignoreCrc"));
|
||||||
setPreserve(cf.getOpt("p"));
|
setPreserve(cf.getOpt("p"));
|
||||||
setOverwrite(cf.getOpt("f"));
|
setOverwrite(cf.getOpt("f"));
|
||||||
|
setThreadCount(cf.getOptValue("t"));
|
||||||
|
setThreadPoolQueueSize(cf.getOptValue("q"));
|
||||||
setRecursive(true);
|
setRecursive(true);
|
||||||
getLocalDestination(args);
|
getLocalDestination(args);
|
||||||
}
|
}
|
||||||
|
@ -240,21 +243,12 @@ class CopyCommands {
|
||||||
/**
|
/**
|
||||||
* Copy local files to a remote filesystem
|
* Copy local files to a remote filesystem
|
||||||
*/
|
*/
|
||||||
public static class Put extends CommandWithDestination {
|
public static class Put extends CopyCommandWithMultiThread {
|
||||||
|
|
||||||
public static final Logger LOG = LoggerFactory.getLogger(Put.class);
|
|
||||||
|
|
||||||
private ThreadPoolExecutor executor = null;
|
|
||||||
private int threadPoolQueueSize = 1024;
|
|
||||||
private int numThreads = 1;
|
|
||||||
|
|
||||||
private static final int MAX_THREADS =
|
|
||||||
Runtime.getRuntime().availableProcessors() * 2;
|
|
||||||
|
|
||||||
public static final String NAME = "put";
|
public static final String NAME = "put";
|
||||||
public static final String USAGE =
|
public static final String USAGE =
|
||||||
"[-f] [-p] [-l] [-d] [-t <thread count>] [-q <threadPool queue size>] " +
|
"[-f] [-p] [-l] [-d] [-t <thread count>] [-q <thread pool queue size>]"
|
||||||
"<localsrc> ... <dst>";
|
+ " <localsrc> ... <dst>";
|
||||||
public static final String DESCRIPTION =
|
public static final String DESCRIPTION =
|
||||||
"Copy files from the local file system " +
|
"Copy files from the local file system " +
|
||||||
"into fs. Copying fails if the file already " +
|
"into fs. Copying fails if the file already " +
|
||||||
|
@ -263,7 +257,7 @@ class CopyCommands {
|
||||||
" -p : Preserves timestamps, ownership and the mode.\n" +
|
" -p : Preserves timestamps, ownership and the mode.\n" +
|
||||||
" -f : Overwrites the destination if it already exists.\n" +
|
" -f : Overwrites the destination if it already exists.\n" +
|
||||||
" -t <thread count> : Number of threads to be used, default is 1.\n" +
|
" -t <thread count> : Number of threads to be used, default is 1.\n" +
|
||||||
" -q <threadPool size> : ThreadPool queue size to be used, " +
|
" -q <thread pool queue size> : Thread pool queue size to be used, " +
|
||||||
"default is 1024.\n" +
|
"default is 1024.\n" +
|
||||||
" -l : Allow DataNode to lazily persist the file to disk. Forces " +
|
" -l : Allow DataNode to lazily persist the file to disk. Forces " +
|
||||||
"replication factor of 1. This flag will result in reduced " +
|
"replication factor of 1. This flag will result in reduced " +
|
||||||
|
@ -277,7 +271,7 @@ class CopyCommands {
|
||||||
cf.addOptionWithValue("t");
|
cf.addOptionWithValue("t");
|
||||||
cf.addOptionWithValue("q");
|
cf.addOptionWithValue("q");
|
||||||
cf.parse(args);
|
cf.parse(args);
|
||||||
setNumberThreads(cf.getOptValue("t"));
|
setThreadCount(cf.getOptValue("t"));
|
||||||
setThreadPoolQueueSize(cf.getOptValue("q"));
|
setThreadPoolQueueSize(cf.getOptValue("q"));
|
||||||
setOverwrite(cf.getOpt("f"));
|
setOverwrite(cf.getOpt("f"));
|
||||||
setPreserve(cf.getOpt("p"));
|
setPreserve(cf.getOpt("p"));
|
||||||
|
@ -308,92 +302,9 @@ class CopyCommands {
|
||||||
copyStreamToTarget(System.in, getTargetPath(args.get(0)));
|
copyStreamToTarget(System.in, getTargetPath(args.get(0)));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
executor = new ThreadPoolExecutor(numThreads, numThreads, 1,
|
|
||||||
TimeUnit.SECONDS, new ArrayBlockingQueue<>(threadPoolQueueSize),
|
|
||||||
new ThreadPoolExecutor.CallerRunsPolicy());
|
|
||||||
super.processArguments(args);
|
super.processArguments(args);
|
||||||
|
|
||||||
// issue the command and then wait for it to finish
|
|
||||||
executor.shutdown();
|
|
||||||
try {
|
|
||||||
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
executor.shutdownNow();
|
|
||||||
displayError(e);
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setNumberThreads(String numberThreadsString) {
|
|
||||||
if (numberThreadsString == null) {
|
|
||||||
numThreads = 1;
|
|
||||||
} else {
|
|
||||||
int parsedValue = Integer.parseInt(numberThreadsString);
|
|
||||||
if (parsedValue <= 1) {
|
|
||||||
numThreads = 1;
|
|
||||||
} else if (parsedValue > MAX_THREADS) {
|
|
||||||
numThreads = MAX_THREADS;
|
|
||||||
} else {
|
|
||||||
numThreads = parsedValue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setThreadPoolQueueSize(String numThreadPoolQueueSize) {
|
|
||||||
if (numThreadPoolQueueSize != null) {
|
|
||||||
int parsedValue = Integer.parseInt(numThreadPoolQueueSize);
|
|
||||||
if (parsedValue < 1) {
|
|
||||||
LOG.warn("The value of the thread pool queue size cannot be " +
|
|
||||||
"less than 1, and the default value is used here. " +
|
|
||||||
"The default size is 1024.");
|
|
||||||
threadPoolQueueSize = 1024;
|
|
||||||
} else {
|
|
||||||
threadPoolQueueSize = parsedValue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
protected int getThreadPoolQueueSize() {
|
|
||||||
return threadPoolQueueSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void copyFile(PathData src, PathData target) throws IOException {
|
|
||||||
if (isPathRecursable(src)) {
|
|
||||||
throw new PathIsDirectoryException(src.toString());
|
|
||||||
}
|
|
||||||
super.copyFileToTarget(src, target);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void copyFileToTarget(PathData src, PathData target)
|
|
||||||
throws IOException {
|
|
||||||
// if number of thread is 1, mimic put and avoid threading overhead
|
|
||||||
if (numThreads == 1) {
|
|
||||||
copyFile(src, target);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
Runnable task = () -> {
|
|
||||||
try {
|
|
||||||
copyFile(src, target);
|
|
||||||
} catch (IOException e) {
|
|
||||||
displayError(e);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
executor.submit(task);
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public int getNumThreads() {
|
|
||||||
return numThreads;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public ThreadPoolExecutor getExecutor() {
|
|
||||||
return executor;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class CopyFromLocal extends Put {
|
public static class CopyFromLocal extends Put {
|
||||||
|
|
|
@ -323,19 +323,10 @@ Returns 0 on success and -1 on error.
|
||||||
get
|
get
|
||||||
---
|
---
|
||||||
|
|
||||||
Usage: `hadoop fs -get [-ignorecrc] [-crc] [-p] [-f] <src> <localdst> `
|
Usage: `hadoop fs -get [-ignorecrc] [-crc] [-p] [-f] [-t <thread count>] [-q <thread pool queue size>] <src> ... <localdst> `
|
||||||
|
|
||||||
Copy files to the local file system. Files that fail the CRC check may be copied with the -ignorecrc option. Files and CRCs may be copied using the -crc option.
|
Copy files to the local file system. Files that fail the CRC check may be copied with the -ignorecrc option. Files and CRCs may be copied using the -crc option.
|
||||||
|
|
||||||
Example:
|
|
||||||
|
|
||||||
* `hadoop fs -get /user/hadoop/file localfile`
|
|
||||||
* `hadoop fs -get hdfs://nn.example.com/user/hadoop/file localfile`
|
|
||||||
|
|
||||||
Exit Code:
|
|
||||||
|
|
||||||
Returns 0 on success and -1 on error.
|
|
||||||
|
|
||||||
Options:
|
Options:
|
||||||
|
|
||||||
* `-p` : Preserves access and modification times, ownership and the permissions.
|
* `-p` : Preserves access and modification times, ownership and the permissions.
|
||||||
|
@ -343,6 +334,21 @@ Options:
|
||||||
* `-f` : Overwrites the destination if it already exists.
|
* `-f` : Overwrites the destination if it already exists.
|
||||||
* `-ignorecrc` : Skip CRC checks on the file(s) downloaded.
|
* `-ignorecrc` : Skip CRC checks on the file(s) downloaded.
|
||||||
* `-crc`: write CRC checksums for the files downloaded.
|
* `-crc`: write CRC checksums for the files downloaded.
|
||||||
|
* `-t <thread count>` : Number of threads to be used, default is 1.
|
||||||
|
Useful when downloading directories containing more than 1 file.
|
||||||
|
* `-q <thread pool queue size>` : Thread pool queue size to be used, default is 1024.
|
||||||
|
It takes effect only when thread count greater than 1.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
|
||||||
|
* `hadoop fs -get /user/hadoop/file localfile`
|
||||||
|
* `hadoop fs -get hdfs://nn.example.com/user/hadoop/file localfile`
|
||||||
|
* `hadoop fs -get -t 10 hdfs://nn.example.com/user/hadoop/dir1 localdir`
|
||||||
|
* `hadoop fs -get -t 10 -q 2048 hdfs://nn.example.com/user/hadoop/dir* localdir`
|
||||||
|
|
||||||
|
Exit Code:
|
||||||
|
|
||||||
|
Returns 0 on success and -1 on error.
|
||||||
|
|
||||||
getfacl
|
getfacl
|
||||||
-------
|
-------
|
||||||
|
@ -525,7 +531,7 @@ Returns 0 on success and -1 on error.
|
||||||
put
|
put
|
||||||
---
|
---
|
||||||
|
|
||||||
Usage: `hadoop fs -put [-f] [-p] [-l] [-d] [-t <thread count>] [-q <threadPool queue size>] [ - | <localsrc1> .. ]. <dst>`
|
Usage: `hadoop fs -put [-f] [-p] [-l] [-d] [-t <thread count>] [-q <thread pool queue size>] [ - | <localsrc> ...] <dst>`
|
||||||
|
|
||||||
Copy single src, or multiple srcs from local file system to the destination file system.
|
Copy single src, or multiple srcs from local file system to the destination file system.
|
||||||
Also reads input from stdin and writes to destination file system if the source is set to "-"
|
Also reads input from stdin and writes to destination file system if the source is set to "-"
|
||||||
|
@ -537,12 +543,13 @@ Options:
|
||||||
* `-p` : Preserves access and modification times, ownership and the permissions.
|
* `-p` : Preserves access and modification times, ownership and the permissions.
|
||||||
(assuming the permissions can be propagated across filesystems)
|
(assuming the permissions can be propagated across filesystems)
|
||||||
* `-f` : Overwrites the destination if it already exists.
|
* `-f` : Overwrites the destination if it already exists.
|
||||||
* `-t <thread count>` : Number of threads to be used, default is 1. Useful
|
|
||||||
when uploading a directory containing more than 1 file.
|
|
||||||
* `-l` : Allow DataNode to lazily persist the file to disk, Forces a replication
|
* `-l` : Allow DataNode to lazily persist the file to disk, Forces a replication
|
||||||
factor of 1. This flag will result in reduced durability. Use with care.
|
factor of 1. This flag will result in reduced durability. Use with care.
|
||||||
* `-d` : Skip creation of temporary file with the suffix `._COPYING_`.
|
* `-d` : Skip creation of temporary file with the suffix `._COPYING_`.
|
||||||
* `-q <threadPool queue size>` : ThreadPool queue size to be used, default is 1024.
|
* `-t <thread count>` : Number of threads to be used, default is 1.
|
||||||
|
Useful when uploading directories containing more than 1 file.
|
||||||
|
* `-q <thread pool queue size>` : Thread pool queue size to be used, default is 1024.
|
||||||
|
It takes effect only when thread count greater than 1.
|
||||||
|
|
||||||
|
|
||||||
Examples:
|
Examples:
|
||||||
|
@ -551,7 +558,8 @@ Examples:
|
||||||
* `hadoop fs -put -f localfile1 localfile2 /user/hadoop/hadoopdir`
|
* `hadoop fs -put -f localfile1 localfile2 /user/hadoop/hadoopdir`
|
||||||
* `hadoop fs -put -d localfile hdfs://nn.example.com/hadoop/hadoopfile`
|
* `hadoop fs -put -d localfile hdfs://nn.example.com/hadoop/hadoopfile`
|
||||||
* `hadoop fs -put - hdfs://nn.example.com/hadoop/hadoopfile` Reads the input from stdin.
|
* `hadoop fs -put - hdfs://nn.example.com/hadoop/hadoopfile` Reads the input from stdin.
|
||||||
* `hadoop fs -put -q 500 localfile3 hdfs://nn.example.com/hadoop/hadoopfile3`
|
* `hadoop fs -put -t 5 localdir hdfs://nn.example.com/hadoop/hadoopdir`
|
||||||
|
* `hadoop fs -put -t 10 -q 2048 localdir1 localdir2 hdfs://nn.example.com/hadoop/hadoopdir`
|
||||||
|
|
||||||
Exit Code:
|
Exit Code:
|
||||||
|
|
||||||
|
|
|
@ -17,24 +17,26 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.fs.shell;
|
package org.apache.hadoop.fs.shell;
|
||||||
|
|
||||||
import org.apache.commons.lang3.RandomStringUtils;
|
|
||||||
import org.apache.commons.lang3.RandomUtils;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.LocalFileSystem;
|
|
||||||
import org.apache.hadoop.fs.FileSystemTestHelper;
|
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
||||||
import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.Assert;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
|
import org.apache.commons.lang3.RandomUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileSystemTestHelper;
|
||||||
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -48,6 +50,9 @@ public class TestCopyFromLocal {
|
||||||
private static Path testDir;
|
private static Path testDir;
|
||||||
private static Configuration conf;
|
private static Configuration conf;
|
||||||
|
|
||||||
|
private Path dir = null;
|
||||||
|
private int numFiles = 0;
|
||||||
|
|
||||||
public static int initialize(Path dir) throws Exception {
|
public static int initialize(Path dir) throws Exception {
|
||||||
fs.mkdirs(dir);
|
fs.mkdirs(dir);
|
||||||
Path fromDirPath = new Path(dir, FROM_DIR_NAME);
|
Path fromDirPath = new Path(dir, FROM_DIR_NAME);
|
||||||
|
@ -96,48 +101,36 @@ public class TestCopyFromLocal {
|
||||||
fs.close();
|
fs.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void initDirectory() throws Exception {
|
||||||
|
dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
|
||||||
|
numFiles = initialize(dir);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private void run(CommandWithDestination cmd, String... args) {
|
private void run(CommandWithDestination cmd, String... args) {
|
||||||
cmd.setConf(conf);
|
cmd.setConf(conf);
|
||||||
assertEquals(0, cmd.run(args));
|
assertEquals(0, cmd.run(args));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testCopyFromLocal() throws Exception {
|
public void testCopyFromLocal() {
|
||||||
Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
|
|
||||||
TestCopyFromLocal.initialize(dir);
|
|
||||||
run(new TestMultiThreadedCopy(1, 0),
|
run(new TestMultiThreadedCopy(1, 0),
|
||||||
new Path(dir, FROM_DIR_NAME).toString(),
|
new Path(dir, FROM_DIR_NAME).toString(),
|
||||||
new Path(dir, TO_DIR_NAME).toString());
|
new Path(dir, TO_DIR_NAME).toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testCopyFromLocalWithThreads() throws Exception {
|
public void testCopyFromLocalWithThreads(){
|
||||||
Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
|
int threads = Runtime.getRuntime().availableProcessors() * 2 + 1;
|
||||||
int numFiles = TestCopyFromLocal.initialize(dir);
|
run(new TestMultiThreadedCopy(threads, numFiles),
|
||||||
int maxThreads = Runtime.getRuntime().availableProcessors() * 2;
|
"-t", Integer.toString(threads),
|
||||||
int randThreads = RandomUtils.nextInt(0, maxThreads - 1) + 1;
|
|
||||||
String numThreads = Integer.toString(randThreads);
|
|
||||||
run(new TestMultiThreadedCopy(randThreads,
|
|
||||||
randThreads == 1 ? 0 : numFiles), "-t", numThreads,
|
|
||||||
new Path(dir, FROM_DIR_NAME).toString(),
|
new Path(dir, FROM_DIR_NAME).toString(),
|
||||||
new Path(dir, TO_DIR_NAME).toString());
|
new Path(dir, TO_DIR_NAME).toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testCopyFromLocalWithThreadWrong() throws Exception {
|
public void testCopyFromLocalWithThreadWrong(){
|
||||||
Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
|
|
||||||
int numFiles = TestCopyFromLocal.initialize(dir);
|
|
||||||
int maxThreads = Runtime.getRuntime().availableProcessors() * 2;
|
|
||||||
String numThreads = Integer.toString(maxThreads * 2);
|
|
||||||
run(new TestMultiThreadedCopy(maxThreads, numFiles), "-t", numThreads,
|
|
||||||
new Path(dir, FROM_DIR_NAME).toString(),
|
|
||||||
new Path(dir, TO_DIR_NAME).toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
|
||||||
public void testCopyFromLocalWithZeroThreads() throws Exception {
|
|
||||||
Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
|
|
||||||
TestCopyFromLocal.initialize(dir);
|
|
||||||
run(new TestMultiThreadedCopy(1, 0), "-t", "0",
|
run(new TestMultiThreadedCopy(1, 0), "-t", "0",
|
||||||
new Path(dir, FROM_DIR_NAME).toString(),
|
new Path(dir, FROM_DIR_NAME).toString(),
|
||||||
new Path(dir, TO_DIR_NAME).toString());
|
new Path(dir, TO_DIR_NAME).toString());
|
||||||
|
@ -148,8 +141,7 @@ public class TestCopyFromLocal {
|
||||||
private int expectedThreads;
|
private int expectedThreads;
|
||||||
private int expectedCompletedTaskCount;
|
private int expectedCompletedTaskCount;
|
||||||
|
|
||||||
TestMultiThreadedCopy(int expectedThreads,
|
TestMultiThreadedCopy(int expectedThreads, int expectedCompletedTaskCount) {
|
||||||
int expectedCompletedTaskCount) {
|
|
||||||
this.expectedThreads = expectedThreads;
|
this.expectedThreads = expectedThreads;
|
||||||
this.expectedCompletedTaskCount = expectedCompletedTaskCount;
|
this.expectedCompletedTaskCount = expectedCompletedTaskCount;
|
||||||
}
|
}
|
||||||
|
@ -158,8 +150,10 @@ public class TestCopyFromLocal {
|
||||||
protected void processArguments(LinkedList<PathData> args)
|
protected void processArguments(LinkedList<PathData> args)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// Check if the correct number of threads are spawned
|
// Check if the correct number of threads are spawned
|
||||||
Assert.assertEquals(expectedThreads, getNumThreads());
|
Assert.assertEquals(expectedThreads, getThreadCount());
|
||||||
super.processArguments(args);
|
super.processArguments(args);
|
||||||
|
|
||||||
|
if (isMultiThreadNecessary(args)) {
|
||||||
// Once the copy is complete, check following
|
// Once the copy is complete, check following
|
||||||
// 1) number of completed tasks are same as expected
|
// 1) number of completed tasks are same as expected
|
||||||
// 2) There are no active tasks in the executor
|
// 2) There are no active tasks in the executor
|
||||||
|
@ -169,6 +163,9 @@ public class TestCopyFromLocal {
|
||||||
executor.getCompletedTaskCount());
|
executor.getCompletedTaskCount());
|
||||||
Assert.assertEquals(0, executor.getActiveCount());
|
Assert.assertEquals(0, executor.getActiveCount());
|
||||||
Assert.assertTrue(executor.isTerminated());
|
Assert.assertTrue(executor.isTerminated());
|
||||||
|
} else {
|
||||||
|
assert getExecutor() == null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,11 +17,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.fs.shell;
|
package org.apache.hadoop.fs.shell;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertNotEquals;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
@ -31,13 +32,13 @@ import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsAction;
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal;
|
||||||
import org.apache.hadoop.fs.shell.CopyCommands.Cp;
|
import org.apache.hadoop.fs.shell.CopyCommands.Cp;
|
||||||
import org.apache.hadoop.fs.shell.CopyCommands.Get;
|
import org.apache.hadoop.fs.shell.CopyCommands.Get;
|
||||||
import org.apache.hadoop.fs.shell.CopyCommands.Put;
|
import org.apache.hadoop.fs.shell.CopyCommands.Put;
|
||||||
import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal;
|
|
||||||
import org.junit.After;
|
import static org.junit.Assert.assertEquals;
|
||||||
import org.junit.Before;
|
import static org.junit.Assert.assertNotEquals;
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
public class TestCopyPreserveFlag {
|
public class TestCopyPreserveFlag {
|
||||||
private static final int MODIFICATION_TIME = 12345000;
|
private static final int MODIFICATION_TIME = 12345000;
|
||||||
|
@ -176,6 +177,34 @@ public class TestCopyPreserveFlag {
|
||||||
assertAttributesChanged(TO);
|
assertAttributesChanged(TO);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testGetWithPQ() throws Exception {
|
||||||
|
Get get = new Get();
|
||||||
|
run(get, "-p", "-q", "100", FROM.toString(), TO.toString());
|
||||||
|
assertEquals(get.getThreadPoolQueueSize(), 100);
|
||||||
|
assertAttributesPreserved(TO);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testGetWithQ() throws Exception {
|
||||||
|
Get get = new Get();
|
||||||
|
run(get, "-q", "100", FROM.toString(), TO.toString());
|
||||||
|
assertEquals(get.getThreadPoolQueueSize(), 100);
|
||||||
|
assertAttributesChanged(TO);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testGetWithThreads() throws Exception {
|
||||||
|
run(new Get(), "-t", "10", FROM.toString(), TO.toString());
|
||||||
|
assertAttributesChanged(TO);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testGetWithThreadsPreserve() throws Exception {
|
||||||
|
run(new Get(), "-p", "-t", "10", FROM.toString(), TO.toString());
|
||||||
|
assertAttributesPreserved(TO);
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testCpWithP() throws Exception {
|
public void testCpWithP() throws Exception {
|
||||||
run(new Cp(), "-p", FROM.toString(), TO.toString());
|
run(new Cp(), "-p", FROM.toString(), TO.toString());
|
||||||
|
|
|
@ -0,0 +1,210 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.fs.shell;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
|
import org.apache.commons.lang3.RandomUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileSystemTestHelper;
|
||||||
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.shell.CopyCommands.CopyToLocal;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.shell.CopyCommandWithMultiThread.DEFAULT_QUEUE_SIZE;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
public class TestCopyToLocal {
|
||||||
|
|
||||||
|
private static final String FROM_DIR_NAME = "fromDir";
|
||||||
|
private static final String TO_DIR_NAME = "toDir";
|
||||||
|
|
||||||
|
private static FileSystem fs;
|
||||||
|
private static Path testDir;
|
||||||
|
private static Configuration conf;
|
||||||
|
|
||||||
|
private Path dir = null;
|
||||||
|
private int numFiles = 0;
|
||||||
|
|
||||||
|
private static int initialize(Path dir) throws Exception {
|
||||||
|
fs.mkdirs(dir);
|
||||||
|
Path fromDirPath = new Path(dir, FROM_DIR_NAME);
|
||||||
|
fs.mkdirs(fromDirPath);
|
||||||
|
Path toDirPath = new Path(dir, TO_DIR_NAME);
|
||||||
|
fs.mkdirs(toDirPath);
|
||||||
|
|
||||||
|
int numTotalFiles = 0;
|
||||||
|
int numDirs = RandomUtils.nextInt(0, 5);
|
||||||
|
for (int dirCount = 0; dirCount < numDirs; ++dirCount) {
|
||||||
|
Path subDirPath = new Path(fromDirPath, "subdir" + dirCount);
|
||||||
|
fs.mkdirs(subDirPath);
|
||||||
|
int numFiles = RandomUtils.nextInt(0, 10);
|
||||||
|
for (int fileCount = 0; fileCount < numFiles; ++fileCount) {
|
||||||
|
numTotalFiles++;
|
||||||
|
Path subFile = new Path(subDirPath, "file" + fileCount);
|
||||||
|
fs.createNewFile(subFile);
|
||||||
|
FSDataOutputStream output = fs.create(subFile, true);
|
||||||
|
for (int i = 0; i < 100; ++i) {
|
||||||
|
output.writeInt(i);
|
||||||
|
output.writeChar('\n');
|
||||||
|
}
|
||||||
|
output.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return numTotalFiles;
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void init() throws Exception {
|
||||||
|
conf = new Configuration(false);
|
||||||
|
conf.set("fs.file.impl", LocalFileSystem.class.getName());
|
||||||
|
fs = FileSystem.getLocal(conf);
|
||||||
|
testDir = new FileSystemTestHelper().getTestRootPath(fs);
|
||||||
|
// don't want scheme on the path, just an absolute path
|
||||||
|
testDir = new Path(fs.makeQualified(testDir).toUri().getPath());
|
||||||
|
|
||||||
|
FileSystem.setDefaultUri(conf, fs.getUri());
|
||||||
|
fs.setWorkingDirectory(testDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void cleanup() throws Exception {
|
||||||
|
fs.delete(testDir, true);
|
||||||
|
fs.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void run(CopyCommandWithMultiThread cmd, String... args) {
|
||||||
|
cmd.setConf(conf);
|
||||||
|
assertEquals(0, cmd.run(args));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void initDirectory() throws Exception {
|
||||||
|
dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
|
||||||
|
numFiles = initialize(dir);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testCopy() throws Exception {
|
||||||
|
MultiThreadedCopy copy = new MultiThreadedCopy(1, DEFAULT_QUEUE_SIZE, 0);
|
||||||
|
run(copy, new Path(dir, FROM_DIR_NAME).toString(),
|
||||||
|
new Path(dir, TO_DIR_NAME).toString());
|
||||||
|
assert copy.getExecutor() == null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testCopyWithThreads() {
|
||||||
|
run(new MultiThreadedCopy(5, DEFAULT_QUEUE_SIZE, numFiles), "-t", "5",
|
||||||
|
new Path(dir, FROM_DIR_NAME).toString(),
|
||||||
|
new Path(dir, TO_DIR_NAME).toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testCopyWithThreadWrong() {
|
||||||
|
run(new MultiThreadedCopy(1, DEFAULT_QUEUE_SIZE, 0), "-t", "0",
|
||||||
|
new Path(dir, FROM_DIR_NAME).toString(),
|
||||||
|
new Path(dir, TO_DIR_NAME).toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testCopyWithThreadsAndQueueSize() {
|
||||||
|
int queueSize = 256;
|
||||||
|
run(new MultiThreadedCopy(5, queueSize, numFiles), "-t", "5", "-q",
|
||||||
|
Integer.toString(queueSize),
|
||||||
|
new Path(dir, FROM_DIR_NAME).toString(),
|
||||||
|
new Path(dir, TO_DIR_NAME).toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testCopyWithThreadsAndQueueSizeWrong() {
|
||||||
|
int queueSize = 0;
|
||||||
|
run(new MultiThreadedCopy(5, DEFAULT_QUEUE_SIZE, numFiles), "-t", "5", "-q",
|
||||||
|
Integer.toString(queueSize),
|
||||||
|
new Path(dir, FROM_DIR_NAME).toString(),
|
||||||
|
new Path(dir, TO_DIR_NAME).toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testCopySingleFile() throws Exception {
|
||||||
|
Path fromDirPath = new Path(dir, FROM_DIR_NAME);
|
||||||
|
Path subFile = new Path(fromDirPath, "file0");
|
||||||
|
fs.createNewFile(subFile);
|
||||||
|
FSDataOutputStream output = fs.create(subFile, true);
|
||||||
|
for (int i = 0; i < 100; ++i) {
|
||||||
|
output.writeInt(i);
|
||||||
|
output.writeChar('\n');
|
||||||
|
}
|
||||||
|
output.close();
|
||||||
|
|
||||||
|
MultiThreadedCopy copy = new MultiThreadedCopy(5, DEFAULT_QUEUE_SIZE, 0);
|
||||||
|
run(copy, "-t", "5", subFile.toString(),
|
||||||
|
new Path(dir, TO_DIR_NAME).toString());
|
||||||
|
assert copy.getExecutor() == null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class MultiThreadedCopy extends CopyToLocal {
|
||||||
|
public static final String NAME = "multiThreadCopy";
|
||||||
|
private final int expectedThreads;
|
||||||
|
private final int expectedQueuePoolSize;
|
||||||
|
private final int expectedCompletedTaskCount;
|
||||||
|
|
||||||
|
MultiThreadedCopy(int expectedThreads, int expectedQueuePoolSize,
|
||||||
|
int expectedCompletedTaskCount) {
|
||||||
|
this.expectedThreads = expectedThreads;
|
||||||
|
this.expectedQueuePoolSize = expectedQueuePoolSize;
|
||||||
|
this.expectedCompletedTaskCount = expectedCompletedTaskCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void processArguments(LinkedList<PathData> args)
|
||||||
|
throws IOException {
|
||||||
|
// Check if the number of threads are same as expected
|
||||||
|
Assert.assertEquals(expectedThreads, getThreadCount());
|
||||||
|
// Check if the queue pool size of executor is same as expected
|
||||||
|
Assert.assertEquals(expectedQueuePoolSize, getThreadPoolQueueSize());
|
||||||
|
|
||||||
|
super.processArguments(args);
|
||||||
|
|
||||||
|
if (isMultiThreadNecessary(args)) {
|
||||||
|
// Once the copy is complete, check following
|
||||||
|
// 1) number of completed tasks are same as expected
|
||||||
|
// 2) There are no active tasks in the executor
|
||||||
|
// 3) Executor has shutdown correctly
|
||||||
|
ThreadPoolExecutor executor = getExecutor();
|
||||||
|
Assert.assertEquals(expectedCompletedTaskCount,
|
||||||
|
executor.getCompletedTaskCount());
|
||||||
|
Assert.assertEquals(0, executor.getActiveCount());
|
||||||
|
Assert.assertTrue(executor.isTerminated());
|
||||||
|
} else {
|
||||||
|
assert getExecutor() == null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -162,38 +162,6 @@
|
||||||
</comparators>
|
</comparators>
|
||||||
</test>
|
</test>
|
||||||
|
|
||||||
<test> <!-- TESTED -->
|
|
||||||
<description>help: help for get</description>
|
|
||||||
<test-commands>
|
|
||||||
<command>-help get</command>
|
|
||||||
</test-commands>
|
|
||||||
<cleanup-commands>
|
|
||||||
<!-- No cleanup -->
|
|
||||||
</cleanup-commands>
|
|
||||||
<comparators>
|
|
||||||
<comparator>
|
|
||||||
<type>RegexpComparator</type>
|
|
||||||
<expected-output>^-get( )*\[-f\]( )*\[-p\]( )*\[-ignoreCrc\]( )*\[-crc\]( )*<src> \.\.\. <localdst> :\s*</expected-output>
|
|
||||||
</comparator>
|
|
||||||
<comparator>
|
|
||||||
<type>RegexpComparator</type>
|
|
||||||
<expected-output>\s*Copy files that match the file pattern <src> to the local name. <src> is kept.\s*</expected-output>
|
|
||||||
</comparator>
|
|
||||||
<comparator>
|
|
||||||
<type>RegexpComparator</type>
|
|
||||||
<expected-output>^( |\t)*When copying multiple files, the destination must be a directory. Passing -f( )*</expected-output>
|
|
||||||
</comparator>
|
|
||||||
<comparator>
|
|
||||||
<type>RegexpComparator</type>
|
|
||||||
<expected-output>^( |\t)*overwrites the destination if it already exists and -p preserves access and( )*</expected-output>
|
|
||||||
</comparator>
|
|
||||||
<comparator>
|
|
||||||
<type>RegexpComparator</type>
|
|
||||||
<expected-output>^( |\t)*modification times, ownership and the mode.*</expected-output>
|
|
||||||
</comparator>
|
|
||||||
</comparators>
|
|
||||||
</test>
|
|
||||||
|
|
||||||
<test> <!-- TESTED -->
|
<test> <!-- TESTED -->
|
||||||
<description>help: help for du</description>
|
<description>help: help for du</description>
|
||||||
<test-commands>
|
<test-commands>
|
||||||
|
@ -498,7 +466,7 @@
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^-put \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] \[-q <threadPool queue size>\] <localsrc> \.\.\. <dst> :\s*</expected-output>
|
<expected-output>^-put \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] \[-q <thread pool queue size>\] <localsrc> \.\.\. <dst> :\s*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
|
@ -515,35 +483,39 @@
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^\s*-p Preserves timestamps, ownership and the mode.( )*</expected-output>
|
<expected-output>^\s*-p\s+Preserves timestamps, ownership and the mode.( )*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^\s*-f Overwrites the destination if it already exists.( )*</expected-output>
|
<expected-output>^\s*-f\s+Overwrites the destination if it already exists.( )*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^\s*-t <thread count> Number of threads to be used, default is 1.( )*</expected-output>
|
<expected-output>^\s*-t <thread count>\s+Number of threads to be used, default is 1.( )*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^\s*-q <threadPool size> ThreadPool queue size to be used, default is 1024.( )*</expected-output>
|
<expected-output>^\s*-q <thread pool queue size>\s+Thread pool queue size to be used, default is( )*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^\s*-l Allow DataNode to lazily persist the file to disk. Forces( )*</expected-output>
|
<expected-output>^( |\t)*1024.\s*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^\s*replication factor of 1. This flag will result in reduced( )*</expected-output>
|
<expected-output>^\s*-l\s+Allow DataNode to lazily persist the file to disk.( )*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^\s*durability. Use with care.( )*</expected-output>
|
<expected-output>^\s*Forces replication factor of 1. This flag will( )*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^\s*-d Skip creation of temporary file\(<dst>\._COPYING_\).( )*</expected-output>
|
<expected-output>^\s*result in reduced durability. Use with care.( )*</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^\s*-d\s+Skip creation of temporary file\(<dst>\._COPYING_\).( )*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
</comparators>
|
</comparators>
|
||||||
</test>
|
</test>
|
||||||
|
@ -558,7 +530,7 @@
|
||||||
<comparators>
|
<comparators>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] \[-q <threadPool queue size>\] <localsrc> \.\.\. <dst> :\s*</expected-output>
|
<expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] \[-q <thread pool queue size>\] <localsrc> \.\.\. <dst> :\s*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
|
@ -600,7 +572,7 @@
|
||||||
<comparators>
|
<comparators>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^-get( )*\[-f\]( )*\[-p\]( )*\[-ignoreCrc\]( )*\[-crc\]( )*<src> \.\.\. <localdst> :\s*</expected-output>
|
<expected-output>^-get \[-f\] \[-p\] \[-crc\] \[-ignoreCrc\] \[-t <thread count>\] \[-q <thread pool queue size>\] <src> \.\.\. <localdst> :\s*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
|
@ -608,15 +580,39 @@
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^( |\t)*When copying multiple files, the destination must be a directory. Passing -f( )*</expected-output>
|
<expected-output>^( |\t)*When copying multiple files, the destination must be a directory.( )*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^( |\t)*overwrites the destination if it already exists and -p preserves access and( )*</expected-output>
|
<expected-output>^( |\t)*Flags:\s*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^( |\t)*modification times, ownership and the mode.*</expected-output>
|
<expected-output>^( |\t)*-p\s+Preserves timestamps, ownership and the mode.\s*</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^( |\t)*-f\s+Overwrites the destination if it already exists.\s*</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^( |\t)*-crc\s+ write CRC checksums for the files downloaded.\s*</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^( |\t)*-ignoreCrc\s+ Skip CRC checks on the file\(s\) downloaded.\s*</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^( |\t)*-t <thread count>\s+Number of threads to be used, default is 1.\s*</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^( |\t)*-q <thread pool queue size>\s+Thread pool queue size to be used, default is\s*</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^( |\t)*1024.\s*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
</comparators>
|
</comparators>
|
||||||
</test>
|
</test>
|
||||||
|
@ -723,7 +719,7 @@
|
||||||
<comparators>
|
<comparators>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^-copyToLocal \[-f\] \[-p\] \[-ignoreCrc\] \[-crc\] <src> \.\.\. <localdst> :\s*</expected-output>
|
<expected-output>^-copyToLocal \[-f\] \[-p\] \[-crc\] \[-ignoreCrc\] \[-t <thread count>\] \[-q <thread pool queue size>\] <src> \.\.\. <localdst> :\s*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
|
|
Loading…
Reference in New Issue