diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java index b3d511ed6af..740f256b62e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.contract.s3a; +import java.io.FileNotFoundException; import java.io.IOException; import static org.apache.hadoop.fs.s3a.Constants.*; @@ -26,6 +27,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageStatistics; import org.apache.hadoop.fs.s3a.FailureInjectionPolicy; import org.apache.hadoop.tools.contract.AbstractContractDistCpTest; @@ -74,4 +76,35 @@ public class ITestS3AContractDistCp extends AbstractContractDistCpTest { Path path = super.path(filepath); return new Path(path, FailureInjectionPolicy.DEFAULT_DELAY_KEY_SUBSTRING); } + + @Override + public void testDirectWrite() throws Exception { + resetStorageStatistics(); + super.testDirectWrite(); + assertEquals("Expected no renames for a direct write distcp", 0L, + getRenameOperationCount()); + } + + @Override + public void testNonDirectWrite() throws Exception { + resetStorageStatistics(); + try { + super.testNonDirectWrite(); + } catch (FileNotFoundException e) { + // We may get this exception when data is written to a DELAY_LISTING_ME + // directory causing verification of the distcp success to fail if + // S3Guard is not enabled + } + assertEquals("Expected 2 renames for a non-direct write distcp", 2L, + getRenameOperationCount()); + } + + private void resetStorageStatistics() { + getFileSystem().getStorageStatistics().reset(); + } + + private long getRenameOperationCount() { + return getFileSystem().getStorageStatistics() + .getLong(StorageStatistics.CommonStatisticNames.OP_RENAME); + } } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 494609144b2..e20f20626a6 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -85,7 +85,8 @@ public final class DistCpConstants { "distcp.dynamic.min.records_per_chunk"; public static final String CONF_LABEL_SPLIT_RATIO = "distcp.dynamic.split.ratio"; - + public static final String CONF_LABEL_DIRECT_WRITE = "distcp.direct.write"; + /* Total bytes to be copied. Updated by copylisting. Unfiltered count */ public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected"; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java index fc047cadadc..1e63d802e87 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java @@ -179,6 +179,10 @@ public class DistCpContext { return options.getCopyBufferSize(); } + public boolean shouldDirectWrite() { + return options.shouldDirectWrite(); + } + public void setTargetPathExists(boolean targetPathExists) { this.targetPathExists = targetPathExists; } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index e57e413de33..49ffc59344e 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -223,7 +223,19 @@ public enum DistCpOptionSwitch { */ FILTERS(DistCpConstants.CONF_LABEL_FILTERS_FILE, new Option("filters", true, "The path to a file containing a list of" - + " strings for paths to be excluded from the copy.")); + + " strings for paths to be excluded from the copy.")), + + /** + * Write directly to the final location, avoiding the creation and rename + * of temporary files. + * This is typically useful in cases where the target filesystem + * implementation does not support atomic rename operations, such as with + * the S3AFileSystem which translates file renames to potentially very + * expensive copy-then-delete operations. + */ + DIRECT_WRITE(DistCpConstants.CONF_LABEL_DIRECT_WRITE, + new Option("direct", false, "Write files directly to the" + + " target location, avoiding temporary file rename.")); public static final String PRESERVE_STATUS_DEFAULT = "-prbugpct"; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index aca5d0e414d..4a6552fed6b 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -155,6 +155,9 @@ public final class DistCpOptions { private final int copyBufferSize; + /** Whether data should be written directly to the target paths. */ + private final boolean directWrite; + /** * File attributes for preserve. * @@ -216,6 +219,8 @@ public final class DistCpOptions { this.copyBufferSize = builder.copyBufferSize; this.verboseLog = builder.verboseLog; this.trackPath = builder.trackPath; + + this.directWrite = builder.directWrite; } public Path getSourceFileListing() { @@ -343,6 +348,10 @@ public final class DistCpOptions { return trackPath; } + public boolean shouldDirectWrite() { + return directWrite; + } + /** * Add options to configuration. These will be used in the Mapper/committer * @@ -391,6 +400,8 @@ public final class DistCpOptions { DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.NUM_LISTSTATUS_THREADS, Integer.toString(numListstatusThreads)); } + DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIRECT_WRITE, + String.valueOf(directWrite)); } /** @@ -427,6 +438,7 @@ public final class DistCpOptions { ", blocksPerChunk=" + blocksPerChunk + ", copyBufferSize=" + copyBufferSize + ", verboseLog=" + verboseLog + + ", directWrite=" + directWrite + '}'; } @@ -476,6 +488,8 @@ public final class DistCpOptions { private int copyBufferSize = DistCpConstants.COPY_BUFFER_SIZE_DEFAULT; + private boolean directWrite = false; + public Builder(List sourcePaths, Path targetPath) { Preconditions.checkArgument(sourcePaths != null && !sourcePaths.isEmpty(), "Source paths should not be null or empty!"); @@ -728,6 +742,11 @@ public final class DistCpOptions { this.verboseLog = newVerboseLog; return this; } + + public Builder withDirectWrite(boolean newDirectWrite) { + this.directWrite = newDirectWrite; + return this; + } } } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index 83c6ff3e40a..3b9d13b3b03 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -113,7 +113,9 @@ public class OptionsParser { .withBlocking( !command.hasOption(DistCpOptionSwitch.BLOCKING.getSwitch())) .withVerboseLog( - command.hasOption(DistCpOptionSwitch.VERBOSE_LOG.getSwitch())); + command.hasOption(DistCpOptionSwitch.VERBOSE_LOG.getSwitch())) + .withDirectWrite( + command.hasOption(DistCpOptionSwitch.DIRECT_WRITE.getSwitch())); if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) { String[] snapshots = getVals(command, diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index 63a61b861f9..336779eef23 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -84,6 +84,7 @@ public class CopyMapper extends Mapper private boolean overWrite = false; private boolean append = false; private boolean verboseLog = false; + private boolean directWrite = false; private EnumSet preserve = EnumSet.noneOf(FileAttribute.class); private FileSystem targetFS = null; @@ -111,6 +112,8 @@ public class CopyMapper extends Mapper DistCpOptionSwitch.VERBOSE_LOG.getConfigLabel(), false); preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch. PRESERVE_STATUS.getConfigLabel())); + directWrite = conf.getBoolean( + DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false); targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); Path targetFinalPath = new Path(conf.get( @@ -253,7 +256,8 @@ public class CopyMapper extends Mapper long bytesCopied; try { bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description, - action).execute(sourceFileStatus, target, context, fileAttributes); + action, directWrite).execute(sourceFileStatus, target, context, + fileAttributes); } catch (Exception e) { context.setStatus("Copy Failure: " + sourceFileStatus.getPath()); throw new IOException("File copy failed: " + sourceFileStatus.getPath() + diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index 51579bc437c..db21f64d72a 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -55,6 +55,7 @@ public class RetriableFileCopyCommand extends RetriableCommand { private static Logger LOG = LoggerFactory.getLogger(RetriableFileCopyCommand.class); private boolean skipCrc = false; + private boolean directWrite = false; private FileAction action; /** @@ -79,6 +80,21 @@ public class RetriableFileCopyCommand extends RetriableCommand { this.skipCrc = skipCrc; } + /** + * Create a RetriableFileCopyCommand. + * + * @param skipCrc Whether to skip the crc check. + * @param description A verbose description of the copy operation. + * @param action We should overwrite the target file or append new data to it. + * @param directWrite Whether to write directly to the target path, avoiding a + * temporary file rename. + */ + public RetriableFileCopyCommand(boolean skipCrc, String description, + FileAction action, boolean directWrite) { + this(skipCrc, description, action); + this.directWrite = directWrite; + } + /** * Implementation of RetriableCommand::doExecute(). * This is the actual copy-implementation. @@ -102,16 +118,19 @@ public class RetriableFileCopyCommand extends RetriableCommand { private long doCopy(CopyListingFileStatus source, Path target, Mapper.Context context, EnumSet fileAttributes) throws IOException { + LOG.info("Copying {} to {}", source.getPath(), target); + final boolean toAppend = action == FileAction.APPEND; - Path targetPath = toAppend ? target : getTmpFile(target, context); + final boolean useTempTarget = !toAppend && !directWrite; + Path targetPath = useTempTarget ? getTempFile(target, context) : target; + + LOG.info("Writing to {} target file path {}", useTempTarget ? "temporary" + : "direct", targetPath); + final Configuration configuration = context.getConfiguration(); FileSystem targetFS = target.getFileSystem(configuration); try { - if (LOG.isDebugEnabled()) { - LOG.debug("Copying " + source.getPath() + " to " + target); - LOG.debug("Target file path: " + targetPath); - } final Path sourcePath = source.getPath(); final FileSystem sourceFS = sourcePath.getFileSystem(configuration); final FileChecksum sourceChecksum = fileAttributes @@ -134,17 +153,20 @@ public class RetriableFileCopyCommand extends RetriableCommand { targetFS, targetPath); } } - // it's not append case, thus we first write to a temporary file, rename - // it to the target path. - if (!toAppend) { + // it's not append or direct write (preferred for s3a) case, thus we first + // write to a temporary file, then rename it to the target path. + if (useTempTarget) { + LOG.info("Renaming temporary target file path {} to {}", targetPath, + target); promoteTmpToTarget(targetPath, target, targetFS); } + LOG.info("Completed writing {} ({} bytes)", target, bytesRead); return bytesRead; } finally { // note that for append case, it is possible that we append partial data // and then fail. In that case, for the next retry, we either reuse the // partial appended data if it is good or we overwrite the whole file - if (!toAppend) { + if (useTempTarget) { targetFS.delete(targetPath, false); } } @@ -252,14 +274,16 @@ public class RetriableFileCopyCommand extends RetriableCommand { } } - private Path getTmpFile(Path target, Mapper.Context context) { + private Path getTempFile(Path target, Mapper.Context context) { Path targetWorkPath = new Path(context.getConfiguration(). get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); - Path root = target.equals(targetWorkPath)? targetWorkPath.getParent() : targetWorkPath; - LOG.info("Creating temp file: " + - new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString())); - return new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString()); + Path root = target.equals(targetWorkPath) ? targetWorkPath.getParent() + : targetWorkPath; + Path tempFile = new Path(root, ".distcp.tmp." + + context.getTaskAttemptID().toString()); + LOG.info("Creating temp file: {}", tempFile); + return tempFile; } @VisibleForTesting diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm index b855422d968..25ea7e28fe9 100644 --- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm +++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm @@ -241,6 +241,7 @@ Flag | Description | Notes `-blocksperchunk ` | Number of blocks per chunk. When specified, split files into chunks to copy in parallel | If set to a positive value, files with more blocks than this value will be split into chunks of `` blocks to be transferred in parallel, and reassembled on the destination. By default, `` is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements getBlockLocations method and the target file system implements concat method. | `-copybuffersize ` | Size of the copy buffer to use. By default, `` is set to 8192B | `-xtrack ` | Save information about missing source files to the specified path. | This option is only valid with `-update` option. This is an experimental property and it cannot be used with `-atomic` option. +`-direct` | Write directly to destination paths | Useful for avoiding potentially very expensive temporary file rename operations when the destination is an object store Architecture of DistCp ---------------------- @@ -455,7 +456,7 @@ configuration, or be otherwise available in all cluster hosts. DistCp can be used to upload data ```bash -hadoop distcp hdfs://nn1:8020/datasets/set1 s3a://bucket/datasets/set1 +hadoop distcp -direct hdfs://nn1:8020/datasets/set1 s3a://bucket/datasets/set1 ``` To download data @@ -535,6 +536,9 @@ rely on disk buffering. Copies each byte down to the Hadoop worker nodes and back to the bucket. As well as being slow, it means that charges may be incurred. +* The `-direct` option can be used to write to object store target paths directly, +avoiding the potentially very expensive temporary file rename operations that would +otherwise occur. Frequently Asked Questions -------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java index 62a2e6d7514..7382795dd90 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java @@ -287,8 +287,9 @@ public class TestDistCpOptions { "skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, " + "mapBandwidth=0.0, copyStrategy='uniformsize', preserveStatus=[], " + "atomicWorkPath=null, logPath=null, sourceFileListing=abc, " + - "sourcePaths=null, targetPath=xyz, filtersFile='null'," + - " blocksPerChunk=0, copyBufferSize=8192, verboseLog=false}"; + "sourcePaths=null, targetPath=xyz, filtersFile='null', " + + "blocksPerChunk=0, copyBufferSize=8192, verboseLog=false, " + + "directWrite=false}"; String optionString = option.toString(); Assert.assertEquals(val, optionString); Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(), diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java index 0757a66223e..eeaf30a9299 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java @@ -552,7 +552,7 @@ public abstract class AbstractContractDistCpTest /** * Run the distcp job. - * @param optons distcp options + * @param options distcp options * @return the job. It will have already completed. * @throws Exception failure */ @@ -586,4 +586,68 @@ public abstract class AbstractContractDistCpTest private static void mkdirs(FileSystem fs, Path dir) throws Exception { assertTrue("Failed to mkdir " + dir, fs.mkdirs(dir)); } -} + + @Test + public void testDirectWrite() throws Exception { + describe("copy file from local to remote using direct write option"); + directWrite(localFS, localDir, remoteFS, remoteDir, true); + } + + @Test + public void testNonDirectWrite() throws Exception { + describe("copy file from local to remote without using direct write " + + "option"); + directWrite(localFS, localDir, remoteFS, remoteDir, false); + } + + /** + * Executes a test with support for using direct write option. + * + * @param srcFS source FileSystem + * @param srcDir source directory + * @param dstFS destination FileSystem + * @param dstDir destination directory + * @param directWrite whether to use -directwrite option + * @throws Exception if there is a failure + */ + private void directWrite(FileSystem srcFS, Path srcDir, FileSystem dstFS, + Path dstDir, boolean directWrite) throws Exception { + initPathFields(srcDir, dstDir); + + // Create 2 test files + mkdirs(srcFS, inputSubDir1); + byte[] data1 = dataset(64, 33, 43); + createFile(srcFS, inputFile1, true, data1); + byte[] data2 = dataset(200, 43, 53); + createFile(srcFS, inputFile2, true, data2); + Path target = new Path(dstDir, "outputDir"); + if (directWrite) { + runDistCpDirectWrite(inputDir, target); + } else { + runDistCp(inputDir, target); + } + ContractTestUtils.assertIsDirectory(dstFS, target); + lsR("Destination tree after distcp", dstFS, target); + + // Verify copied file contents + verifyFileContents(dstFS, new Path(target, "inputDir/file1"), data1); + verifyFileContents(dstFS, new Path(target, "inputDir/subDir1/file2"), + data2); + } + + /** + * Run distcp -direct srcDir destDir. + * @param srcDir local source directory + * @param destDir remote destination directory + * @return the completed job + * @throws Exception any failure. + */ + private Job runDistCpDirectWrite(final Path srcDir, final Path destDir) + throws Exception { + describe("\nDistcp -direct from " + srcDir + " to " + destDir); + return runDistCp(buildWithStandardOptions( + new DistCpOptions.Builder( + Collections.singletonList(srcDir), destDir) + .withDirectWrite(true))); + } +} \ No newline at end of file