HADOOP-15281. Distcp to add no-rename copy option.

Contributed by Andrew Olson.

(cherry picked from commit de804e53b9)
Andrew Olson 2019-02-07 10:09:55 +00:00 committed by Steve Loughran
parent c25b641cec
commit d2765ffc2e
11 changed files with 191 additions and 23 deletions


@ -18,6 +18,7 @@
package org.apache.hadoop.fs.contract.s3a;
import java.io.FileNotFoundException;
import java.io.IOException;
import static org.apache.hadoop.fs.s3a.Constants.*;
@ -26,6 +27,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.s3a.FailureInjectionPolicy;
import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
@ -74,4 +76,35 @@ public class ITestS3AContractDistCp extends AbstractContractDistCpTest {
Path path = super.path(filepath);
return new Path(path, FailureInjectionPolicy.DEFAULT_DELAY_KEY_SUBSTRING);
}
@Override
public void testDirectWrite() throws Exception {
resetStorageStatistics();
super.testDirectWrite();
assertEquals("Expected no renames for a direct write distcp", 0L,
getRenameOperationCount());
}
@Override
public void testNonDirectWrite() throws Exception {
resetStorageStatistics();
try {
super.testNonDirectWrite();
} catch (FileNotFoundException e) {
// We may get this exception when data is written to a DELAY_LISTING_ME
// directory, causing verification of the distcp success to fail if
// S3Guard is not enabled.
}
assertEquals("Expected 2 renames for a non-direct write distcp", 2L,
getRenameOperationCount());
}
private void resetStorageStatistics() {
getFileSystem().getStorageStatistics().reset();
}
private long getRenameOperationCount() {
return getFileSystem().getStorageStatistics()
.getLong(StorageStatistics.CommonStatisticNames.OP_RENAME);
}
}


@ -85,7 +85,8 @@ public final class DistCpConstants {
"distcp.dynamic.min.records_per_chunk";
public static final String CONF_LABEL_SPLIT_RATIO =
"distcp.dynamic.split.ratio";
public static final String CONF_LABEL_DIRECT_WRITE = "distcp.direct.write";
/* Total bytes to be copied. Updated by copylisting. Unfiltered count */
public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected";


@ -179,6 +179,10 @@ public class DistCpContext {
return options.getCopyBufferSize();
}
public boolean shouldDirectWrite() {
return options.shouldDirectWrite();
}
public void setTargetPathExists(boolean targetPathExists) {
this.targetPathExists = targetPathExists;
}


@ -223,7 +223,19 @@ public enum DistCpOptionSwitch {
*/
FILTERS(DistCpConstants.CONF_LABEL_FILTERS_FILE,
new Option("filters", true, "The path to a file containing a list of"
+ " strings for paths to be excluded from the copy."));
+ " strings for paths to be excluded from the copy.")),
/**
* Write directly to the final location, avoiding the creation and rename
* of temporary files.
* This is typically useful in cases where the target filesystem
* implementation does not support atomic rename operations, such as with
* the S3AFileSystem which translates file renames to potentially very
* expensive copy-then-delete operations.
*/
DIRECT_WRITE(DistCpConstants.CONF_LABEL_DIRECT_WRITE,
new Option("direct", false, "Write files directly to the"
+ " target location, avoiding temporary file rename."));
public static final String PRESERVE_STATUS_DEFAULT = "-prbugpct";


@ -155,6 +155,9 @@ public final class DistCpOptions {
private final int copyBufferSize;
/** Whether data should be written directly to the target paths. */
private final boolean directWrite;
/**
* File attributes for preserve.
*
@ -216,6 +219,8 @@ public final class DistCpOptions {
this.copyBufferSize = builder.copyBufferSize;
this.verboseLog = builder.verboseLog;
this.trackPath = builder.trackPath;
this.directWrite = builder.directWrite;
}
public Path getSourceFileListing() {
@ -343,6 +348,10 @@ public final class DistCpOptions {
return trackPath;
}
public boolean shouldDirectWrite() {
return directWrite;
}
/**
* Add options to configuration. These will be used in the Mapper/committer
*
@ -391,6 +400,8 @@ public final class DistCpOptions {
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.NUM_LISTSTATUS_THREADS,
Integer.toString(numListstatusThreads));
}
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIRECT_WRITE,
String.valueOf(directWrite));
}
/**
@ -427,6 +438,7 @@ public final class DistCpOptions {
", blocksPerChunk=" + blocksPerChunk +
", copyBufferSize=" + copyBufferSize +
", verboseLog=" + verboseLog +
", directWrite=" + directWrite +
'}';
}
@ -476,6 +488,8 @@ public final class DistCpOptions {
private int copyBufferSize =
DistCpConstants.COPY_BUFFER_SIZE_DEFAULT;
private boolean directWrite = false;
public Builder(List<Path> sourcePaths, Path targetPath) {
Preconditions.checkArgument(sourcePaths != null && !sourcePaths.isEmpty(),
"Source paths should not be null or empty!");
@ -728,6 +742,11 @@ public final class DistCpOptions {
this.verboseLog = newVerboseLog;
return this;
}
public Builder withDirectWrite(boolean newDirectWrite) {
this.directWrite = newDirectWrite;
return this;
}
}
}


@ -113,7 +113,9 @@ public class OptionsParser {
.withBlocking(
!command.hasOption(DistCpOptionSwitch.BLOCKING.getSwitch()))
.withVerboseLog(
command.hasOption(DistCpOptionSwitch.VERBOSE_LOG.getSwitch()));
command.hasOption(DistCpOptionSwitch.VERBOSE_LOG.getSwitch()))
.withDirectWrite(
command.hasOption(DistCpOptionSwitch.DIRECT_WRITE.getSwitch()));
if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) {
String[] snapshots = getVals(command,


@ -84,6 +84,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
private boolean overWrite = false;
private boolean append = false;
private boolean verboseLog = false;
private boolean directWrite = false;
private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
private FileSystem targetFS = null;
@ -111,6 +112,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
DistCpOptionSwitch.VERBOSE_LOG.getConfigLabel(), false);
preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
PRESERVE_STATUS.getConfigLabel()));
directWrite = conf.getBoolean(
DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false);
targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
Path targetFinalPath = new Path(conf.get(
@ -253,7 +256,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
long bytesCopied;
try {
bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description,
action).execute(sourceFileStatus, target, context, fileAttributes);
action, directWrite).execute(sourceFileStatus, target, context,
fileAttributes);
} catch (Exception e) {
context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
throw new IOException("File copy failed: " + sourceFileStatus.getPath() +


@ -55,6 +55,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
private boolean skipCrc = false;
private boolean directWrite = false;
private FileAction action;
/**
@ -79,6 +80,21 @@ public class RetriableFileCopyCommand extends RetriableCommand {
this.skipCrc = skipCrc;
}
/**
* Create a RetriableFileCopyCommand.
*
* @param skipCrc Whether to skip the crc check.
* @param description A verbose description of the copy operation.
* @param action Whether to overwrite the target file or to append new data to it.
* @param directWrite Whether to write directly to the target path, avoiding a
* temporary file rename.
*/
public RetriableFileCopyCommand(boolean skipCrc, String description,
FileAction action, boolean directWrite) {
this(skipCrc, description, action);
this.directWrite = directWrite;
}
/**
* Implementation of RetriableCommand::doExecute().
* This is the actual copy-implementation.
@ -102,16 +118,19 @@ public class RetriableFileCopyCommand extends RetriableCommand {
private long doCopy(CopyListingFileStatus source, Path target,
Mapper.Context context, EnumSet<FileAttribute> fileAttributes)
throws IOException {
LOG.info("Copying {} to {}", source.getPath(), target);
final boolean toAppend = action == FileAction.APPEND;
Path targetPath = toAppend ? target : getTmpFile(target, context);
final boolean useTempTarget = !toAppend && !directWrite;
Path targetPath = useTempTarget ? getTempFile(target, context) : target;
LOG.info("Writing to {} target file path {}", useTempTarget ? "temporary"
: "direct", targetPath);
final Configuration configuration = context.getConfiguration();
FileSystem targetFS = target.getFileSystem(configuration);
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Copying " + source.getPath() + " to " + target);
LOG.debug("Target file path: " + targetPath);
}
final Path sourcePath = source.getPath();
final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
final FileChecksum sourceChecksum = fileAttributes
@ -134,17 +153,20 @@ public class RetriableFileCopyCommand extends RetriableCommand {
targetFS, targetPath);
}
}
// it's not append case, thus we first write to a temporary file, rename
// it to the target path.
if (!toAppend) {
// it's not append or direct write (preferred for s3a) case, thus we first
// write to a temporary file, then rename it to the target path.
if (useTempTarget) {
LOG.info("Renaming temporary target file path {} to {}", targetPath,
target);
promoteTmpToTarget(targetPath, target, targetFS);
}
LOG.info("Completed writing {} ({} bytes)", target, bytesRead);
return bytesRead;
} finally {
// note that for append case, it is possible that we append partial data
// and then fail. In that case, for the next retry, we either reuse the
// partial appended data if it is good or we overwrite the whole file
if (!toAppend) {
if (useTempTarget) {
targetFS.delete(targetPath, false);
}
}
@ -252,14 +274,16 @@ public class RetriableFileCopyCommand extends RetriableCommand {
}
}
private Path getTmpFile(Path target, Mapper.Context context) {
private Path getTempFile(Path target, Mapper.Context context) {
Path targetWorkPath = new Path(context.getConfiguration().
get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
Path root = target.equals(targetWorkPath)? targetWorkPath.getParent() : targetWorkPath;
LOG.info("Creating temp file: " +
new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString()));
return new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString());
Path root = target.equals(targetWorkPath) ? targetWorkPath.getParent()
: targetWorkPath;
Path tempFile = new Path(root, ".distcp.tmp." +
context.getTaskAttemptID().toString());
LOG.info("Creating temp file: {}", tempFile);
return tempFile;
}
@VisibleForTesting


@ -241,6 +241,7 @@ Flag | Description | Notes
`-blocksperchunk <blocksperchunk>` | Number of blocks per chunk. When specified, split files into chunks to copy in parallel | If set to a positive value, files with more blocks than this value will be split into chunks of `<blocksperchunk>` blocks to be transferred in parallel, and reassembled on the destination. By default, `<blocksperchunk>` is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements getBlockLocations method and the target file system implements concat method. |
`-copybuffersize <copybuffersize>` | Size of the copy buffer to use. By default, `<copybuffersize>` is set to 8192B |
`-xtrack <path>` | Save information about missing source files to the specified path. | This option is only valid with `-update` option. This is an experimental property and it cannot be used with `-atomic` option.
`-direct` | Write directly to destination paths | Useful for avoiding potentially very expensive temporary file rename operations when the destination is an object store
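The following is a brief usage sketch of the new switch; the namenode address and bucket name are illustrative placeholders, and pairing `-direct` with `-update` is shown only as an example of combining it with an existing option.

```bash
# Incremental copy from HDFS to an S3A bucket (placeholder paths), writing each
# file directly to its final destination rather than to a temporary file that
# is then renamed into place.
hadoop distcp -direct -update \
  hdfs://nn1:8020/datasets/set1 \
  s3a://bucket/datasets/set1
```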
Architecture of DistCp
----------------------
@ -455,7 +456,7 @@ configuration, or be otherwise available in all cluster hosts.
DistCp can be used to upload data
```bash
hadoop distcp hdfs://nn1:8020/datasets/set1 s3a://bucket/datasets/set1
hadoop distcp -direct hdfs://nn1:8020/datasets/set1 s3a://bucket/datasets/set1
```
To download data
@ -535,6 +536,9 @@ rely on disk buffering.
Copies each byte down to the Hadoop worker nodes and back to the
bucket. As well as being slow, it means that charges may be incurred.
* The `-direct` option can be used to write to object store target paths directly,
avoiding the potentially very expensive temporary file rename operations that would
otherwise occur.
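As a further illustrative sketch (the bucket names are placeholders), the switch also applies when both source and destination are object store paths, since only how data is written to the destination changes:

```bash
# Copy between two S3A buckets; -direct avoids the copy-then-delete work that
# renaming each temporary file in the destination bucket would otherwise incur.
hadoop distcp -direct \
  s3a://source-bucket/datasets/set1 \
  s3a://dest-bucket/datasets/set1
```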
Frequently Asked Questions
--------------------------


@ -287,8 +287,9 @@ public class TestDistCpOptions {
"skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, " +
"mapBandwidth=0.0, copyStrategy='uniformsize', preserveStatus=[], " +
"atomicWorkPath=null, logPath=null, sourceFileListing=abc, " +
"sourcePaths=null, targetPath=xyz, filtersFile='null'," +
" blocksPerChunk=0, copyBufferSize=8192, verboseLog=false}";
"sourcePaths=null, targetPath=xyz, filtersFile='null', " +
"blocksPerChunk=0, copyBufferSize=8192, verboseLog=false, " +
"directWrite=false}";
String optionString = option.toString();
Assert.assertEquals(val, optionString);
Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),


@ -552,7 +552,7 @@ public abstract class AbstractContractDistCpTest
/**
* Run the distcp job.
* @param optons distcp options
* @param options distcp options
* @return the job. It will have already completed.
* @throws Exception failure
*/
@ -586,4 +586,68 @@ public abstract class AbstractContractDistCpTest
private static void mkdirs(FileSystem fs, Path dir) throws Exception {
assertTrue("Failed to mkdir " + dir, fs.mkdirs(dir));
}
}
@Test
public void testDirectWrite() throws Exception {
describe("copy file from local to remote using direct write option");
directWrite(localFS, localDir, remoteFS, remoteDir, true);
}
@Test
public void testNonDirectWrite() throws Exception {
describe("copy file from local to remote without using direct write " +
"option");
directWrite(localFS, localDir, remoteFS, remoteDir, false);
}
/**
* Executes a test with support for using direct write option.
*
* @param srcFS source FileSystem
* @param srcDir source directory
* @param dstFS destination FileSystem
* @param dstDir destination directory
* @param directWrite whether to use -directwrite option
* @throws Exception if there is a failure
*/
private void directWrite(FileSystem srcFS, Path srcDir, FileSystem dstFS,
Path dstDir, boolean directWrite) throws Exception {
initPathFields(srcDir, dstDir);
// Create 2 test files
mkdirs(srcFS, inputSubDir1);
byte[] data1 = dataset(64, 33, 43);
createFile(srcFS, inputFile1, true, data1);
byte[] data2 = dataset(200, 43, 53);
createFile(srcFS, inputFile2, true, data2);
Path target = new Path(dstDir, "outputDir");
if (directWrite) {
runDistCpDirectWrite(inputDir, target);
} else {
runDistCp(inputDir, target);
}
ContractTestUtils.assertIsDirectory(dstFS, target);
lsR("Destination tree after distcp", dstFS, target);
// Verify copied file contents
verifyFileContents(dstFS, new Path(target, "inputDir/file1"), data1);
verifyFileContents(dstFS, new Path(target, "inputDir/subDir1/file2"),
data2);
}
/**
* Run distcp -direct srcDir destDir.
* @param srcDir local source directory
* @param destDir remote destination directory
* @return the completed job
* @throws Exception any failure.
*/
private Job runDistCpDirectWrite(final Path srcDir, final Path destDir)
throws Exception {
describe("\nDistcp -direct from " + srcDir + " to " + destDir);
return runDistCp(buildWithStandardOptions(
new DistCpOptions.Builder(
Collections.singletonList(srcDir), destDir)
.withDirectWrite(true)));
}
}