HDFS-11786. Add support to make copyFromLocal multi threaded. Contributed by Mukul Kumar Singh.
This commit is contained in:
parent
b778887af5
commit
02b141ac60
|
@ -26,7 +26,11 @@ import java.net.URISyntaxException;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
@ -288,9 +292,113 @@ class CopyCommands {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class CopyFromLocal extends Put {
|
public static class CopyFromLocal extends Put {
|
||||||
|
private ThreadPoolExecutor executor = null;
|
||||||
|
private int numThreads = 1;
|
||||||
|
|
||||||
|
private static final int MAX_THREADS =
|
||||||
|
Runtime.getRuntime().availableProcessors() * 2;
|
||||||
public static final String NAME = "copyFromLocal";
|
public static final String NAME = "copyFromLocal";
|
||||||
public static final String USAGE = Put.USAGE;
|
public static final String USAGE =
|
||||||
public static final String DESCRIPTION = "Identical to the -put command.";
|
"[-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>";
|
||||||
|
public static final String DESCRIPTION =
|
||||||
|
"Copy files from the local file system " +
|
||||||
|
"into fs. Copying fails if the file already " +
|
||||||
|
"exists, unless the -f flag is given.\n" +
|
||||||
|
"Flags:\n" +
|
||||||
|
" -p : Preserves access and modification times, ownership and the" +
|
||||||
|
" mode.\n" +
|
||||||
|
" -f : Overwrites the destination if it already exists.\n" +
|
||||||
|
" -t <thread count> : Number of threads to be used, default is 1.\n" +
|
||||||
|
" -l : Allow DataNode to lazily persist the file to disk. Forces" +
|
||||||
|
" replication factor of 1. This flag will result in reduced" +
|
||||||
|
" durability. Use with care.\n" +
|
||||||
|
" -d : Skip creation of temporary file(<dst>._COPYING_).\n";
|
||||||
|
|
||||||
|
private void setNumberThreads(String numberThreadsString) {
|
||||||
|
if (numberThreadsString == null) {
|
||||||
|
numThreads = 1;
|
||||||
|
} else {
|
||||||
|
int parsedValue = Integer.parseInt(numberThreadsString);
|
||||||
|
if (parsedValue <= 1) {
|
||||||
|
numThreads = 1;
|
||||||
|
} else if (parsedValue > MAX_THREADS) {
|
||||||
|
numThreads = MAX_THREADS;
|
||||||
|
} else {
|
||||||
|
numThreads = parsedValue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void processOptions(LinkedList<String> args) throws IOException {
|
||||||
|
CommandFormat cf =
|
||||||
|
new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d");
|
||||||
|
cf.addOptionWithValue("t");
|
||||||
|
cf.parse(args);
|
||||||
|
setNumberThreads(cf.getOptValue("t"));
|
||||||
|
setOverwrite(cf.getOpt("f"));
|
||||||
|
setPreserve(cf.getOpt("p"));
|
||||||
|
setLazyPersist(cf.getOpt("l"));
|
||||||
|
setDirectWrite(cf.getOpt("d"));
|
||||||
|
getRemoteDestination(args);
|
||||||
|
// should have a -r option
|
||||||
|
setRecursive(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void copyFile(PathData src, PathData target) throws IOException {
|
||||||
|
if (isPathRecursable(src)) {
|
||||||
|
throw new PathIsDirectoryException(src.toString());
|
||||||
|
}
|
||||||
|
super.copyFileToTarget(src, target);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void copyFileToTarget(PathData src, PathData target)
|
||||||
|
throws IOException {
|
||||||
|
// if number of thread is 1, mimic put and avoid threading overhead
|
||||||
|
if (numThreads == 1) {
|
||||||
|
copyFile(src, target);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Runnable task = () -> {
|
||||||
|
try {
|
||||||
|
copyFile(src, target);
|
||||||
|
} catch (IOException e) {
|
||||||
|
displayError(e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
executor.submit(task);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void processArguments(LinkedList<PathData> args)
|
||||||
|
throws IOException {
|
||||||
|
executor = new ThreadPoolExecutor(numThreads, numThreads, 1,
|
||||||
|
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024),
|
||||||
|
new ThreadPoolExecutor.CallerRunsPolicy());
|
||||||
|
super.processArguments(args);
|
||||||
|
|
||||||
|
// issue the command and then wait for it to finish
|
||||||
|
executor.shutdown();
|
||||||
|
try {
|
||||||
|
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
executor.shutdownNow();
|
||||||
|
displayError(e);
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public int getNumThreads() {
|
||||||
|
return numThreads;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public ThreadPoolExecutor getExecutor() {
|
||||||
|
return executor;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class CopyToLocal extends Get {
|
public static class CopyToLocal extends Get {
|
||||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.PathIOException;
|
import org.apache.hadoop.fs.PathIOException;
|
||||||
import org.apache.hadoop.fs.PathExistsException;
|
import org.apache.hadoop.fs.PathExistsException;
|
||||||
import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal;
|
import org.apache.hadoop.fs.shell.CopyCommands.Put;
|
||||||
|
|
||||||
/** Various commands for moving files */
|
/** Various commands for moving files */
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
@ -41,7 +41,7 @@ class MoveCommands {
|
||||||
/**
|
/**
|
||||||
* Move local files to a remote filesystem
|
* Move local files to a remote filesystem
|
||||||
*/
|
*/
|
||||||
public static class MoveFromLocal extends CopyFromLocal {
|
public static class MoveFromLocal extends Put {
|
||||||
public static final String NAME = "moveFromLocal";
|
public static final String NAME = "moveFromLocal";
|
||||||
public static final String USAGE = "<localsrc> ... <dst>";
|
public static final String USAGE = "<localsrc> ... <dst>";
|
||||||
public static final String DESCRIPTION =
|
public static final String DESCRIPTION =
|
||||||
|
|
|
@ -0,0 +1,173 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.fs.shell;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.RandomStringUtils;
|
||||||
|
import org.apache.commons.lang.math.RandomUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileSystemTestHelper;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.Assert;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for copyFromLocal.
|
||||||
|
*/
|
||||||
|
public class TestCopyFromLocal {
|
||||||
|
private static final String FROM_DIR_NAME = "fromDir";
|
||||||
|
private static final String TO_DIR_NAME = "toDir";
|
||||||
|
|
||||||
|
private static FileSystem fs;
|
||||||
|
private static Path testDir;
|
||||||
|
private static Configuration conf;
|
||||||
|
|
||||||
|
public static int initialize(Path dir) throws Exception {
|
||||||
|
fs.mkdirs(dir);
|
||||||
|
Path fromDirPath = new Path(dir, FROM_DIR_NAME);
|
||||||
|
fs.mkdirs(fromDirPath);
|
||||||
|
Path toDirPath = new Path(dir, TO_DIR_NAME);
|
||||||
|
fs.mkdirs(toDirPath);
|
||||||
|
|
||||||
|
int numTotalFiles = 0;
|
||||||
|
int numDirs = RandomUtils.nextInt(5);
|
||||||
|
for (int dirCount = 0; dirCount < numDirs; ++dirCount) {
|
||||||
|
Path subDirPath = new Path(fromDirPath, "subdir" + dirCount);
|
||||||
|
fs.mkdirs(subDirPath);
|
||||||
|
int numFiles = RandomUtils.nextInt(10);
|
||||||
|
for (int fileCount = 0; fileCount < numFiles; ++fileCount) {
|
||||||
|
numTotalFiles++;
|
||||||
|
Path subFile = new Path(subDirPath, "file" + fileCount);
|
||||||
|
fs.createNewFile(subFile);
|
||||||
|
FSDataOutputStream output = fs.create(subFile, true);
|
||||||
|
for(int i = 0; i < 100; ++i) {
|
||||||
|
output.writeInt(i);
|
||||||
|
output.writeChar('\n');
|
||||||
|
}
|
||||||
|
output.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return numTotalFiles;
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void init() throws Exception {
|
||||||
|
conf = new Configuration(false);
|
||||||
|
conf.set("fs.file.impl", LocalFileSystem.class.getName());
|
||||||
|
fs = FileSystem.getLocal(conf);
|
||||||
|
testDir = new FileSystemTestHelper().getTestRootPath(fs);
|
||||||
|
// don't want scheme on the path, just an absolute path
|
||||||
|
testDir = new Path(fs.makeQualified(testDir).toUri().getPath());
|
||||||
|
|
||||||
|
FileSystem.setDefaultUri(conf, fs.getUri());
|
||||||
|
fs.setWorkingDirectory(testDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void cleanup() throws Exception {
|
||||||
|
fs.delete(testDir, true);
|
||||||
|
fs.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void run(CommandWithDestination cmd, String... args) {
|
||||||
|
cmd.setConf(conf);
|
||||||
|
assertEquals(0, cmd.run(args));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testCopyFromLocal() throws Exception {
|
||||||
|
Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
|
||||||
|
TestCopyFromLocal.initialize(dir);
|
||||||
|
run(new TestMultiThreadedCopy(1, 0),
|
||||||
|
new Path(dir, FROM_DIR_NAME).toString(),
|
||||||
|
new Path(dir, TO_DIR_NAME).toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testCopyFromLocalWithThreads() throws Exception {
|
||||||
|
Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
|
||||||
|
int numFiles = TestCopyFromLocal.initialize(dir);
|
||||||
|
int maxThreads = Runtime.getRuntime().availableProcessors() * 2;
|
||||||
|
int randThreads = RandomUtils.nextInt(maxThreads);
|
||||||
|
int numActualThreads = randThreads == 0 ? 1 : randThreads;
|
||||||
|
String numThreads = Integer.toString(numActualThreads);
|
||||||
|
run(new TestMultiThreadedCopy(numActualThreads, numFiles), "-t", numThreads,
|
||||||
|
new Path(dir, FROM_DIR_NAME).toString(),
|
||||||
|
new Path(dir, TO_DIR_NAME).toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testCopyFromLocalWithThreadWrong() throws Exception {
|
||||||
|
Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
|
||||||
|
int numFiles = TestCopyFromLocal.initialize(dir);
|
||||||
|
int maxThreads = Runtime.getRuntime().availableProcessors() * 2;
|
||||||
|
String numThreads = Integer.toString(maxThreads * 2);
|
||||||
|
run(new TestMultiThreadedCopy(maxThreads, numFiles), "-t", numThreads,
|
||||||
|
new Path(dir, FROM_DIR_NAME).toString(),
|
||||||
|
new Path(dir, TO_DIR_NAME).toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testCopyFromLocalWithZeroThreads() throws Exception {
|
||||||
|
Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
|
||||||
|
TestCopyFromLocal.initialize(dir);
|
||||||
|
run(new TestMultiThreadedCopy(1, 0), "-t", "0",
|
||||||
|
new Path(dir, FROM_DIR_NAME).toString(),
|
||||||
|
new Path(dir, TO_DIR_NAME).toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
private class TestMultiThreadedCopy extends CopyFromLocal {
|
||||||
|
private int expectedThreads;
|
||||||
|
private int expectedCompletedTaskCount;
|
||||||
|
|
||||||
|
TestMultiThreadedCopy(int expectedThreads,
|
||||||
|
int expectedCompletedTaskCount) {
|
||||||
|
this.expectedThreads = expectedThreads;
|
||||||
|
this.expectedCompletedTaskCount = expectedCompletedTaskCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void processArguments(LinkedList<PathData> args)
|
||||||
|
throws IOException {
|
||||||
|
// Check if the correct number of threads are spawned
|
||||||
|
Assert.assertEquals(expectedThreads, getNumThreads());
|
||||||
|
super.processArguments(args);
|
||||||
|
// Once the copy is complete, check following
|
||||||
|
// 1) number of completed tasks are same as expected
|
||||||
|
// 2) There are no active tasks in the executor
|
||||||
|
// 3) Executor has shutdown correctly
|
||||||
|
ThreadPoolExecutor executor = getExecutor();
|
||||||
|
Assert.assertEquals(executor.getCompletedTaskCount(),
|
||||||
|
expectedCompletedTaskCount);
|
||||||
|
Assert.assertEquals(executor.getActiveCount(), 0);
|
||||||
|
Assert.assertTrue(executor.isTerminated());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.fs.shell.CopyCommands.Cp;
|
import org.apache.hadoop.fs.shell.CopyCommands.Cp;
|
||||||
import org.apache.hadoop.fs.shell.CopyCommands.Get;
|
import org.apache.hadoop.fs.shell.CopyCommands.Get;
|
||||||
import org.apache.hadoop.fs.shell.CopyCommands.Put;
|
import org.apache.hadoop.fs.shell.CopyCommands.Put;
|
||||||
|
import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -119,6 +120,24 @@ public class TestCopyPreserveFlag {
|
||||||
assertAttributesChanged(TO);
|
assertAttributesChanged(TO);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testCopyFromLocal() throws Exception {
|
||||||
|
run(new CopyFromLocal(), FROM.toString(), TO.toString());
|
||||||
|
assertAttributesChanged(TO);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testCopyFromLocalWithThreads() throws Exception {
|
||||||
|
run(new CopyFromLocal(), "-t", "10", FROM.toString(), TO.toString());
|
||||||
|
assertAttributesChanged(TO);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testCopyFromLocalWithThreadsPreserve() throws Exception {
|
||||||
|
run(new CopyFromLocal(), "-p", "-t", "10", FROM.toString(), TO.toString());
|
||||||
|
assertAttributesPreserved(TO);
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testGetWithP() throws Exception {
|
public void testGetWithP() throws Exception {
|
||||||
run(new Get(), "-p", FROM.toString(), TO.toString());
|
run(new Get(), "-p", FROM.toString(), TO.toString());
|
||||||
|
|
|
@ -547,11 +547,51 @@
|
||||||
<comparators>
|
<comparators>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] <localsrc> \.\.\. <dst> :\s*</expected-output>
|
<expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] <localsrc> \.\.\. <dst> :\s*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^\s*Identical to the -put command\.\s*</expected-output>
|
<expected-output>^\s*Copy files from the local file system into fs.( )*Copying fails if the file already( )*</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^\s*exists, unless the -f flag is given.( )*</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^\s*Flags:( )*</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^\s*-p Preserves access and modification times, ownership and the( )*</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^\s*mode.( )*</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^\s*-f Overwrites the destination if it already exists.( )*</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^\s*-t <thread count> Number of threads to be used, default is 1.( )*</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^\s*-l Allow DataNode to lazily persist the file to disk. Forces( )*</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^\s*replication factor of 1. This flag will result in reduced( )*</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^\s*durability. Use with care.( )*</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^\s*-d Skip creation of temporary file\(<dst>\._COPYING_\).( )*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
</comparators>
|
</comparators>
|
||||||
</test>
|
</test>
|
||||||
|
|
Loading…
Reference in New Issue