From 49dc5472529ecf58cadfd041b378b9f99b4b979f Mon Sep 17 00:00:00 2001 From: Yongjun Zhang Date: Fri, 14 Apr 2017 10:14:02 -0700 Subject: [PATCH] HADOOP-11794. Enable distcp to copy blocks in parallel. Contributed by Yongjun Zhang, Wei-Chiu Chuang, Xiao Chen, Rosie Li. --- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 22 +- .../org/apache/hadoop/tools/CopyListing.java | 37 +- .../hadoop/tools/CopyListingFileStatus.java | 87 ++++- .../java/org/apache/hadoop/tools/DistCp.java | 52 +++ .../hadoop/tools/DistCpOptionSwitch.java | 10 + .../apache/hadoop/tools/DistCpOptions.java | 22 +- .../apache/hadoop/tools/OptionsParser.java | 36 +- .../hadoop/tools/SimpleCopyListing.java | 83 ++-- .../hadoop/tools/mapred/CopyCommitter.java | 174 ++++++++- .../hadoop/tools/mapred/CopyMapper.java | 40 +- .../mapred/RetriableFileCopyCommand.java | 26 +- .../tools/mapred/UniformSizeInputFormat.java | 5 +- .../apache/hadoop/tools/util/DistCpUtils.java | 111 +++++- .../src/site/markdown/DistCp.md.vm | 1 + .../hadoop/tools/TestDistCpOptions.java | 2 +- .../apache/hadoop/tools/TestDistCpSystem.java | 368 ++++++++++++++++-- .../hadoop/tools/TestOptionsParser.java | 2 +- .../tools/mapred/TestCopyCommitter.java | 5 +- 18 files changed, 972 insertions(+), 111 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 033b81db736..087d8f4ab07 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -841,7 +841,27 @@ public class DFSTestUtil { out.write(toAppend); } } - + + /** + * Append specified length of bytes to a given file, starting with new block. 
+ * @param fs The file system + * @param p Path of the file to append + * @param length Length of bytes to append to the file + * @throws IOException + */ + public static void appendFileNewBlock(DistributedFileSystem fs, + Path p, int length) throws IOException { + assert fs.exists(p); + assert length >= 0; + byte[] toAppend = new byte[length]; + Random random = new Random(); + random.nextBytes(toAppend); + try (FSDataOutputStream out = fs.append(p, + EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)) { + out.write(toAppend); + } + } + /** * @return url content as string (UTF-8 encoding assumed) */ diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java index 481aa61b0f1..9ebf9d29b3b 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java @@ -145,12 +145,22 @@ public abstract class CopyListing extends Configured { Configuration config = getConf(); FileSystem fs = pathToListFile.getFileSystem(config); - Path sortedList = DistCpUtils.sortListing(fs, config, pathToListFile); + final boolean splitLargeFile = options.splitLargeFile(); + + // When splitLargeFile is enabled, we don't randomize the copylist + // earlier, so we don't do the sorting here. For a file that has + // multiple entries due to split, we check here that their + // chunk (offset, length) ranges are continuous. + // + Path checkPath = splitLargeFile? 
+ pathToListFile : DistCpUtils.sortListing(fs, config, pathToListFile); SequenceFile.Reader reader = new SequenceFile.Reader( - config, SequenceFile.Reader.file(sortedList)); + config, SequenceFile.Reader.file(checkPath)); try { Text lastKey = new Text("*"); //source relative path can never hold * + long lastChunkOffset = -1; + long lastChunkLength = -1; CopyListingFileStatus lastFileStatus = new CopyListingFileStatus(); Text currentKey = new Text(); @@ -161,8 +171,21 @@ public abstract class CopyListing extends Configured { if (currentKey.equals(lastKey)) { CopyListingFileStatus currentFileStatus = new CopyListingFileStatus(); reader.getCurrentValue(currentFileStatus); - throw new DuplicateFileException("File " + lastFileStatus.getPath() + " and " + - currentFileStatus.getPath() + " would cause duplicates. Aborting"); + if (!splitLargeFile) { + throw new DuplicateFileException("File " + lastFileStatus.getPath() + + " and " + currentFileStatus.getPath() + + " would cause duplicates. Aborting"); + } else { + if (lastChunkOffset + lastChunkLength != + currentFileStatus.getChunkOffset()) { + throw new InvalidInputException("File " + lastFileStatus.getPath() + + " " + lastChunkOffset + "," + lastChunkLength + + " and " + currentFileStatus.getPath() + + " " + currentFileStatus.getChunkOffset() + "," + + currentFileStatus.getChunkLength() + + " are not continuous. 
Aborting"); + } + } } reader.getCurrentValue(lastFileStatus); if (options.shouldPreserve(DistCpOptions.FileAttribute.ACL)) { @@ -181,8 +204,12 @@ public abstract class CopyListing extends Configured { xAttrSupportCheckFsSet.add(lastFsUri); } } - lastKey.set(currentKey); + lastKey.set(currentKey); + if (splitLargeFile) { + lastChunkOffset = lastFileStatus.getChunkOffset(); + lastChunkLength = lastFileStatus.getChunkLength(); + } if (options.shouldUseDiff() && LOG.isDebugEnabled()) { LOG.debug("Copy list entry " + idx + ": " + lastFileStatus.getPath().toUri().getPath()); diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java index 2b1e7e4ce47..5395fa9d36f 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java @@ -74,6 +74,14 @@ public final class CopyListingFileStatus implements Writable { private List aclEntries; private Map xAttrs; + // represents the offset and length of a file + // chunk in number of bytes. + // used when splitting a large file to chunks to copy in parallel. + // If a file is not large enough to split, chunkOffset would be 0 and + // chunkLength would be the length of the file. + private long chunkOffset = 0; + private long chunkLength = Long.MAX_VALUE; + /** * Default constructor. 
*/ @@ -96,11 +104,32 @@ public final class CopyListingFileStatus implements Writable { fileStatus.getPath()); } + public CopyListingFileStatus(FileStatus fileStatus, + long chunkOffset, long chunkLength) { + this(fileStatus.getLen(), fileStatus.isDirectory(), + fileStatus.getReplication(), fileStatus.getBlockSize(), + fileStatus.getModificationTime(), fileStatus.getAccessTime(), + fileStatus.getPermission(), fileStatus.getOwner(), + fileStatus.getGroup(), + fileStatus.getPath()); + this.chunkOffset = chunkOffset; + this.chunkLength = chunkLength; + } + @SuppressWarnings("checkstyle:parameternumber") public CopyListingFileStatus(long length, boolean isdir, int blockReplication, long blocksize, long modificationTime, long accessTime, FsPermission permission, String owner, String group, Path path) { + this(length, isdir, blockReplication, blocksize, modificationTime, + accessTime, permission, owner, group, path, 0, Long.MAX_VALUE); + } + + @SuppressWarnings("checkstyle:parameternumber") + public CopyListingFileStatus(long length, boolean isdir, + int blockReplication, long blocksize, long modificationTime, + long accessTime, FsPermission permission, String owner, String group, + Path path, long chunkOffset, long chunkLength) { this.length = length; this.isdir = isdir; this.blockReplication = (short)blockReplication; @@ -117,6 +146,23 @@ public final class CopyListingFileStatus implements Writable { this.owner = (owner == null) ? "" : owner; this.group = (group == null) ? 
"" : group; this.path = path; + this.chunkOffset = chunkOffset; + this.chunkLength = chunkLength; + } + + public CopyListingFileStatus(CopyListingFileStatus other) { + this.length = other.length; + this.isdir = other.isdir; + this.blockReplication = other.blockReplication; + this.blocksize = other.blocksize; + this.modificationTime = other.modificationTime; + this.accessTime = other.accessTime; + this.permission = other.permission; + this.owner = other.owner; + this.group = other.group; + this.path = new Path(other.path.toUri()); + this.chunkOffset = other.chunkOffset; + this.chunkLength = other.chunkLength; } public Path getPath() { @@ -196,6 +242,31 @@ public final class CopyListingFileStatus implements Writable { this.xAttrs = xAttrs; } + public long getChunkOffset() { + return chunkOffset; + } + + public void setChunkOffset(long offset) { + this.chunkOffset = offset; + } + + public long getChunkLength() { + return chunkLength; + } + + public void setChunkLength(long chunkLength) { + this.chunkLength = chunkLength; + } + + public boolean isSplit() { + return getChunkLength() != Long.MAX_VALUE && + getChunkLength() != getLen(); + } + + public long getSizeToCopy() { + return isSplit()? 
getChunkLength() : getLen(); + } + @Override public void write(DataOutput out) throws IOException { Text.writeString(out, getPath().toString(), Text.DEFAULT_MAX_LEN); @@ -240,6 +311,9 @@ public final class CopyListingFileStatus implements Writable { } else { out.writeInt(NO_XATTRS); } + + out.writeLong(chunkOffset); + out.writeLong(chunkLength); } @Override @@ -288,6 +362,9 @@ public final class CopyListingFileStatus implements Writable { } else { xAttrs = null; } + + chunkOffset = in.readLong(); + chunkLength = in.readLong(); } @Override @@ -313,8 +390,14 @@ public final class CopyListingFileStatus implements Writable { public String toString() { StringBuilder sb = new StringBuilder(super.toString()); sb.append('{'); - sb.append("aclEntries = " + aclEntries); - sb.append(", xAttrs = " + xAttrs); + sb.append(this.getPath().toString()); + sb.append(" length = ").append(this.getLen()); + sb.append(" aclEntries = ").append(aclEntries); + sb.append(", xAttrs = ").append(xAttrs); + if (isSplit()) { + sb.append(", chunkOffset = ").append(this.getChunkOffset()); + sb.append(", chunkLength = ").append(this.getChunkLength()); + } sb.append('}'); return sb.toString(); } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java index 7b0d9f2452d..ddf67ff41ae 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java @@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobSubmissionFiles; +import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.apache.hadoop.tools.CopyListing.*; import org.apache.hadoop.tools.mapred.CopyMapper; import org.apache.hadoop.tools.mapred.CopyOutputFormat; @@ -133,6 +134,7 @@ public 
class DistCp extends Configured implements Tool { try { inputOptions = (OptionsParser.parse(argv)); + setOptionsForSplitLargeFile(); setTargetPathExists(); LOG.info("Input Options: " + inputOptions); } catch (Throwable e) { @@ -234,6 +236,56 @@ public class DistCp extends Configured implements Tool { getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, targetExists); } + + /** + * Check if concat is supported by fs. + * Throws UnsupportedOperationException if not. + */ + private void checkConcatSupport(FileSystem fs) { + try { + Path[] src = null; + Path tgt = null; + fs.concat(tgt, src); + } catch (UnsupportedOperationException use) { + throw new UnsupportedOperationException( + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + + " is not supported since the target file system doesn't" + + " support concat.", use); + } catch (Exception e) { + // Ignore other exception + } + } + + /** + * Set up needed options for splitting large files. + */ + private void setOptionsForSplitLargeFile() throws IOException { + if (!inputOptions.splitLargeFile()) { + return; + } + Path target = inputOptions.getTargetPath(); + FileSystem targetFS = target.getFileSystem(getConf()); + checkConcatSupport(targetFS); + + LOG.info("Enabling preserving blocksize since " + + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " is passed."); + inputOptions.preserve(FileAttribute.BLOCKSIZE); + + LOG.info("Set " + + DistCpOptionSwitch.APPEND.getSwitch() + + " to false since " + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + + " is passed."); + inputOptions.setAppend(false); + + LOG.info("Set " + + DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES + + " to false since " + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + + " is passed."); + getConf().setBoolean( + DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false); + } + + /** * Create Job object for submitting it, with all the configuration * diff --git 
a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index b0007917e85..e76a48e805f 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -178,6 +178,16 @@ public enum DistCpOptionSwitch { new Option("sizelimit", true, "(Deprecated!) Limit number of files " + "copied to <= n bytes")), + BLOCKS_PER_CHUNK("", + new Option("blocksperchunk", true, "If set to a positive value, files " + + "with more blocks than this value will be split into chunks of " + + "blocksperchunk blocks to be transferred in parallel, and " + + "reassembled on the destination. By default, blocksperchunk is " + + "0 and the files will be transmitted in their entirety without " + + "splitting. This switch is only applicable when the source file " + + "system implements getBlockLocations method and the target file " + + "system implements concat method")), + /** * Specify bandwidth per map in MB */ diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index c61816aa0e9..2efb96b2d95 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -99,7 +99,11 @@ public class DistCpOptions { // targetPathExist is a derived field, it's initialized in the // beginning of distcp. private boolean targetPathExists = true; - + + // Size of chunk in number of blocks when splitting large file into chunks + // to copy in parallel. Default is 0 and files are not split. 
+ private int blocksPerChunk = 0; + public static enum FileAttribute{ REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL, XATTR, TIMES; @@ -169,6 +173,7 @@ public class DistCpOptions { this.targetPath = that.getTargetPath(); this.targetPathExists = that.getTargetPathExists(); this.filtersFile = that.getFiltersFile(); + this.blocksPerChunk = that.blocksPerChunk; } } @@ -623,6 +628,18 @@ public class DistCpOptions { this.filtersFile = filtersFilename; } + public final void setBlocksPerChunk(int csize) { + this.blocksPerChunk = csize; + } + + public final int getBlocksPerChunk() { + return blocksPerChunk; + } + + public final boolean splitLargeFile() { + return blocksPerChunk > 0; + } + public void validate(DistCpOptionSwitch option, boolean value) { boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ? @@ -717,6 +734,8 @@ public class DistCpOptions { DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.FILTERS, filtersFile); } + DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BLOCKS_PER_CHUNK, + String.valueOf(blocksPerChunk)); } /** @@ -753,6 +772,7 @@ public class DistCpOptions { ", targetPath=" + targetPath + ", targetPathExists=" + targetPathExists + ", filtersFile='" + filtersFile + '\'' + + ", blocksPerChunk=" + blocksPerChunk + '}'; } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index af3cb92cc92..c68102df329 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -184,9 +184,42 @@ public class OptionsParser { DistCpOptionSwitch.FILTERS.getSwitch())); } + parseBlocksPerChunk(command, option); + return option; } + + /** + * A helper method to parse chunk size in number of blocks. + * Used when breaking large file into chunks to copy in parallel. 
+ * + * @param command command line arguments + */ + private static void parseBlocksPerChunk(CommandLine command, + DistCpOptions option) { + boolean hasOption = + command.hasOption(DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch()); + LOG.info("parseChunkSize: " + + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " " + hasOption); + if (hasOption) { + String chunkSizeString = getVal(command, + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch().trim()); + try { + int csize = Integer.parseInt(chunkSizeString); + if (csize < 0) { + csize = 0; + } + LOG.info("Set distcp blocksPerChunk to " + csize); + option.setBlocksPerChunk(csize); + } + catch (NumberFormatException e) { + throw new IllegalArgumentException("blocksPerChunk is invalid: " + + chunkSizeString, e); + } + } + } + /** * parseSizeLimit is a helper method for parsing the deprecated * argument SIZE_LIMIT. @@ -221,8 +254,7 @@ public class OptionsParser { DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim()); try { Integer.parseInt(fileLimitString); - } - catch (NumberFormatException e) { + } catch (NumberFormatException e) { throw new IllegalArgumentException("File-limit is invalid: " + fileLimitString, e); } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java index 105e4f2fe17..af913474550 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java @@ -19,6 +19,7 @@ package org.apache.hadoop.tools; import com.google.common.collect.Lists; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; @@ -47,6 +48,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Random; +import java.util.LinkedList; import static 
org.apache.hadoop.tools.DistCpConstants .HDFS_RESERVED_RAW_DIRECTORY_NAME; @@ -240,10 +242,10 @@ public class SimpleCopyListing extends CopyListing { final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL); final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR); final boolean preserveRawXAttrs = options.shouldPreserveRawXattrs(); - CopyListingFileStatus fileCopyListingStatus = + LinkedList fileCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, fileStatus, - preserveAcls, preserveXAttrs, preserveRawXAttrs); - + preserveAcls, preserveXAttrs, preserveRawXAttrs, + options.getBlocksPerChunk()); writeToFileListingRoot(fileListWriter, fileCopyListingStatus, sourceRoot, options); } @@ -348,9 +350,10 @@ public class SimpleCopyListing extends CopyListing { FileStatus[] sourceFiles = sourceFS.listStatus(path); boolean explore = (sourceFiles != null && sourceFiles.length > 0); if (!explore || rootStatus.isDirectory()) { - CopyListingFileStatus rootCopyListingStatus = - DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus, - preserveAcls, preserveXAttrs, preserveRawXAttrs); + LinkedList rootCopyListingStatus = + DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus, + preserveAcls, preserveXAttrs, preserveRawXAttrs, + options.getBlocksPerChunk()); writeToFileListingRoot(fileListWriter, rootCopyListingStatus, sourcePathRoot, options); } @@ -360,20 +363,20 @@ public class SimpleCopyListing extends CopyListing { if (LOG.isDebugEnabled()) { LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy."); } - CopyListingFileStatus sourceCopyListingStatus = - DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus, - preserveAcls && sourceStatus.isDirectory(), - preserveXAttrs && sourceStatus.isDirectory(), - preserveRawXAttrs && sourceStatus.isDirectory()); - if (randomizeFileListing) { - addToFileListing(statusList, - new FileStatusInfo(sourceCopyListingStatus, sourcePathRoot), - fileListWriter); - } else { - 
writeToFileListing(fileListWriter, sourceCopyListingStatus, - sourcePathRoot); + LinkedList sourceCopyListingStatus = + DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus, + preserveAcls && sourceStatus.isDirectory(), + preserveXAttrs && sourceStatus.isDirectory(), + preserveRawXAttrs && sourceStatus.isDirectory(), + options.getBlocksPerChunk()); + for (CopyListingFileStatus fs : sourceCopyListingStatus) { + if (randomizeFileListing) { + addToFileListing(statusList, + new FileStatusInfo(fs, sourcePathRoot), fileListWriter); + } else { + writeToFileListing(fileListWriter, fs, sourcePathRoot); + } } - if (sourceStatus.isDirectory()) { if (LOG.isDebugEnabled()) { LOG.debug("Adding source dir for traverse: " + sourceStatus.getPath()); @@ -641,18 +644,20 @@ public class SimpleCopyListing extends CopyListing { LOG.debug("Recording source-path: " + child.getPath() + " for copy."); } if (workResult.getSuccess()) { - CopyListingFileStatus childCopyListingStatus = + LinkedList childCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, child, preserveAcls && child.isDirectory(), preserveXAttrs && child.isDirectory(), - preserveRawXattrs && child.isDirectory()); - if (randomizeFileListing) { - addToFileListing(fileStatuses, - new FileStatusInfo(childCopyListingStatus, sourcePathRoot), - fileListWriter); - } else { - writeToFileListing(fileListWriter, childCopyListingStatus, - sourcePathRoot); + preserveRawXattrs && child.isDirectory(), + options.getBlocksPerChunk()); + + for (CopyListingFileStatus fs : childCopyListingStatus) { + if (randomizeFileListing) { + addToFileListing(fileStatuses, + new FileStatusInfo(fs, sourcePathRoot), fileListWriter); + } else { + writeToFileListing(fileListWriter, fs, sourcePathRoot); + } } } if (retry < maxRetries) { @@ -675,19 +680,21 @@ public class SimpleCopyListing extends CopyListing { } private void writeToFileListingRoot(SequenceFile.Writer fileListWriter, - CopyListingFileStatus fileStatus, Path sourcePathRoot, + 
LinkedList fileStatus, Path sourcePathRoot, DistCpOptions options) throws IOException { boolean syncOrOverwrite = options.shouldSyncFolder() || options.shouldOverwrite(); - if (fileStatus.getPath().equals(sourcePathRoot) && - fileStatus.isDirectory() && syncOrOverwrite) { - // Skip the root-paths when syncOrOverwrite - if (LOG.isDebugEnabled()) { - LOG.debug("Skip " + fileStatus.getPath()); - } - return; + for (CopyListingFileStatus fs : fileStatus) { + if (fs.getPath().equals(sourcePathRoot) && + fs.isDirectory() && syncOrOverwrite) { + // Skip the root-paths when syncOrOverwrite + if (LOG.isDebugEnabled()) { + LOG.debug("Skip " + fs.getPath()); + } + return; + } + writeToFileListing(fileListWriter, fs, sourcePathRoot); } - writeToFileListing(fileListWriter, fileStatus, sourcePathRoot); } private void writeToFileListing(SequenceFile.Writer fileListWriter, @@ -707,7 +714,7 @@ public class SimpleCopyListing extends CopyListing { fileListWriter.sync(); if (!fileStatus.isDirectory()) { - totalBytesToCopy += fileStatus.getLen(); + totalBytesToCopy += fileStatus.getSizeToCopy(); } else { totalDirs++; } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java index 75cefb488ae..6ddaab99c3d 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -34,14 +35,17 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import 
org.apache.hadoop.tools.CopyListing; import org.apache.hadoop.tools.CopyListingFileStatus; import org.apache.hadoop.tools.DistCpConstants; +import org.apache.hadoop.tools.DistCpOptionSwitch; import org.apache.hadoop.tools.DistCpOptions; import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.apache.hadoop.tools.GlobbedCopyListing; import org.apache.hadoop.tools.util.DistCpUtils; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.EnumSet; +import java.util.LinkedList; import java.util.List; /** @@ -63,7 +67,8 @@ public class CopyCommitter extends FileOutputCommitter { private boolean syncFolder = false; private boolean overwrite = false; private boolean targetPathExists = true; - + private boolean ignoreFailures = false; + /** * Create a output committer * @@ -82,8 +87,13 @@ public class CopyCommitter extends FileOutputCommitter { Configuration conf = jobContext.getConfiguration(); syncFolder = conf.getBoolean(DistCpConstants.CONF_LABEL_SYNC_FOLDERS, false); overwrite = conf.getBoolean(DistCpConstants.CONF_LABEL_OVERWRITE, false); - targetPathExists = conf.getBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, true); - + targetPathExists = conf.getBoolean( + DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, true); + ignoreFailures = conf.getBoolean( + DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false); + + concatFileChunks(conf); + super.commitJob(jobContext); cleanupTempFiles(jobContext); @@ -169,9 +179,112 @@ public class CopyCommitter extends FileOutputCommitter { } } + private boolean isFileNotFoundException(IOException e) { + if (e instanceof FileNotFoundException) { + return true; + } + + if (e instanceof RemoteException) { + return ((RemoteException)e).unwrapRemoteException() + instanceof FileNotFoundException; + } + + return false; + } + + /** + * Concat chunk files for the same file into one. 
+ * Iterate through copy listing, identify chunk files for the same file, + * concat them into one. + */ + private void concatFileChunks(Configuration conf) throws IOException { + + LOG.info("concat file chunks ..."); + + String spath = conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH); + if (spath == null || spath.isEmpty()) { + return; + } + Path sourceListing = new Path(spath); + SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf, + SequenceFile.Reader.file(sourceListing)); + Path targetRoot = + new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); + + try { + CopyListingFileStatus srcFileStatus = new CopyListingFileStatus(); + Text srcRelPath = new Text(); + CopyListingFileStatus lastFileStatus = null; + LinkedList allChunkPaths = new LinkedList(); + + // Iterate over every source path that was copied. + while (sourceReader.next(srcRelPath, srcFileStatus)) { + if (srcFileStatus.isDirectory()) { + continue; + } + Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath); + Path targetFileChunkPath = + DistCpUtils.getSplitChunkPath(targetFile, srcFileStatus); + if (LOG.isDebugEnabled()) { + LOG.debug(" add " + targetFileChunkPath + " to concat."); + } + allChunkPaths.add(targetFileChunkPath); + if (srcFileStatus.getChunkOffset() + srcFileStatus.getChunkLength() + == srcFileStatus.getLen()) { + // This is the last chunk of the splits, consolidate allChunkPaths + try { + concatFileChunks(conf, targetFile, allChunkPaths); + } catch (IOException e) { + // If the concat failed because a chunk file doesn't exist, + // then we assume that the CopyMapper has skipped copying this + // file, and we ignore the exception here. + // If a chunk file should have been created but it was not, then + // the CopyMapper would have failed. 
+ if (!isFileNotFoundException(e)) { + String emsg = "Failed to concat chunk files for " + targetFile; + if (!ignoreFailures) { + throw new IOException(emsg, e); + } else { + LOG.warn(emsg, e); + } + } + } + allChunkPaths.clear(); + lastFileStatus = null; + } else { + if (lastFileStatus == null) { + lastFileStatus = new CopyListingFileStatus(srcFileStatus); + } else { + // Two neighboring chunks have to be consecutive ones for the same + // file, for them to be merged + if (!srcFileStatus.getPath().equals(lastFileStatus.getPath()) || + srcFileStatus.getChunkOffset() != + (lastFileStatus.getChunkOffset() + + lastFileStatus.getChunkLength())) { + String emsg = "Inconsistent sequence file: current " + + "chunk file " + srcFileStatus + " doesnt match prior " + + "entry " + lastFileStatus; + if (!ignoreFailures) { + throw new IOException(emsg); + } else { + LOG.warn(emsg + ", skipping concat this set."); + } + } else { + lastFileStatus.setChunkOffset(srcFileStatus.getChunkOffset()); + lastFileStatus.setChunkLength(srcFileStatus.getChunkLength()); + } + } + } + } + } finally { + IOUtils.closeStream(sourceReader); + } + } + // This method changes the target-directories' file-attributes (owner, // user/group permissions, etc.) based on the corresponding source directories. - private void preserveFileAttributesForDirectories(Configuration conf) throws IOException { + private void preserveFileAttributesForDirectories(Configuration conf) + throws IOException { String attrSymbols = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS); final boolean syncOrOverwrite = syncFolder || overwrite; @@ -325,4 +438,57 @@ public class CopyCommitter extends FileOutputCommitter { ", Unable to move to " + finalDir); } } + + /** + * Concat the passed chunk files into one and rename it the targetFile. 
+ */ + private void concatFileChunks(Configuration conf, Path targetFile, + LinkedList allChunkPaths) throws IOException { + if (allChunkPaths.size() == 1) { + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("concat " + targetFile + " allChunkSize+ " + + allChunkPaths.size()); + } + FileSystem dstfs = targetFile.getFileSystem(conf); + + Path firstChunkFile = allChunkPaths.removeFirst(); + Path[] restChunkFiles = new Path[allChunkPaths.size()]; + allChunkPaths.toArray(restChunkFiles); + if (LOG.isDebugEnabled()) { + LOG.debug("concat: firstchunk: " + dstfs.getFileStatus(firstChunkFile)); + int i = 0; + for (Path f : restChunkFiles) { + LOG.debug("concat: other chunk: " + i + ": " + dstfs.getFileStatus(f)); + ++i; + } + } + dstfs.concat(firstChunkFile, restChunkFiles); + if (LOG.isDebugEnabled()) { + LOG.debug("concat: result: " + dstfs.getFileStatus(firstChunkFile)); + } + rename(dstfs, firstChunkFile, targetFile); + } + + /** + * Rename tmp to dst on destFileSys. + * @param destFileSys the file system + * @param tmp the source path + * @param dst the destination path + * @throws IOException if renaming failed + */ + private static void rename(FileSystem destFileSys, Path tmp, Path dst) + throws IOException { + try { + if (destFileSys.exists(dst)) { + destFileSys.delete(dst, true); + } + destFileSys.rename(tmp, dst); + } catch (IOException ioe) { + throw new IOException("Fail to rename tmp file (=" + tmp + + ") to destination file (=" + dst + ")", ioe); + } + } + } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index 41c5d78cb50..53a95eeae4e 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -219,10 +219,12 @@ public class CopyMapper extends Mapper sourceFS = 
sourcePath.getFileSystem(conf); final boolean preserveXAttrs = fileAttributes.contains(FileAttribute.XATTR); - sourceCurrStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, - sourceFS.getFileStatus(sourcePath), - fileAttributes.contains(FileAttribute.ACL), - preserveXAttrs, preserveRawXattrs); + sourceCurrStatus = DistCpUtils.toCopyListingFileStatusHelper(sourceFS, + sourceFS.getFileStatus(sourcePath), + fileAttributes.contains(FileAttribute.ACL), + preserveXAttrs, preserveRawXattrs, + sourceFileStatus.getChunkOffset(), + sourceFileStatus.getChunkLength()); } catch (FileNotFoundException e) { throw new IOException(new RetriableFileCopyCommand.CopyReadException(e)); } @@ -236,7 +238,8 @@ public class CopyMapper extends Mapper LOG.debug("Path could not be found: " + target, ignore); } - if (targetStatus != null && (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) { + if (targetStatus != null && + (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) { throw new IOException("Can't replace " + target + ". 
Target is " + getFileType(targetStatus) + ", Source is " + getFileType(sourceCurrStatus)); } @@ -246,19 +249,28 @@ public class CopyMapper extends Mapper return; } - FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target, targetStatus); + FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target, + targetStatus); + + Path tmpTarget = target; if (action == FileAction.SKIP) { LOG.info("Skipping copy of " + sourceCurrStatus.getPath() + " to " + target); updateSkipCounters(context, sourceCurrStatus); context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath())); + } else { - copyFileWithRetry(description, sourceCurrStatus, target, context, + if (sourceCurrStatus.isSplit()) { + tmpTarget = DistCpUtils.getSplitChunkPath(target, sourceCurrStatus); + } + if (LOG.isDebugEnabled()) { + LOG.debug("copying " + sourceCurrStatus + " " + tmpTarget); + } + copyFileWithRetry(description, sourceCurrStatus, tmpTarget, context, action, fileAttributes); } - - DistCpUtils.preserve(target.getFileSystem(conf), target, sourceCurrStatus, - fileAttributes, preserveRawXattrs); + DistCpUtils.preserve(target.getFileSystem(conf), tmpTarget, + sourceCurrStatus, fileAttributes, preserveRawXattrs); } catch (IOException exception) { handleFailures(exception, sourceFileStatus, target, context); } @@ -323,8 +335,12 @@ public class CopyMapper extends Mapper private void handleFailures(IOException exception, CopyListingFileStatus sourceFileStatus, Path target, Context context) throws IOException, InterruptedException { - LOG.error("Failure in copying " + sourceFileStatus.getPath() + " to " + - target, exception); + LOG.error("Failure in copying " + sourceFileStatus.getPath() + + (sourceFileStatus.isSplit()? 
"," + + " offset=" + sourceFileStatus.getChunkOffset() + + " chunkLength=" + sourceFileStatus.getChunkLength() + : "") + + " to " + target, exception); if (ignoreFailures && ExceptionUtils.indexOfType(exception, CopyReadException.class) != -1) { diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index 17773642b67..58a51af15c5 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -118,17 +118,21 @@ public class RetriableFileCopyCommand extends RetriableCommand { .contains(FileAttribute.CHECKSUMTYPE) ? sourceFS .getFileChecksum(sourcePath) : null; - final long offset = action == FileAction.APPEND ? targetFS.getFileStatus( - target).getLen() : 0; + long offset = (action == FileAction.APPEND) ? + targetFS.getFileStatus(target).getLen() : source.getChunkOffset(); long bytesRead = copyToFile(targetPath, targetFS, source, offset, context, fileAttributes, sourceChecksum); - compareFileLengths(source, targetPath, configuration, bytesRead - + offset); + if (!source.isSplit()) { + compareFileLengths(source, targetPath, configuration, bytesRead + + offset); + } //At this point, src&dest lengths are same. if length==0, we skip checksum if ((bytesRead != 0) && (!skipCrc)) { - compareCheckSums(sourceFS, source.getPath(), sourceChecksum, - targetFS, targetPath); + if (!source.isSplit()) { + compareCheckSums(sourceFS, source.getPath(), sourceChecksum, + targetFS, targetPath); + } } // it's not append case, thus we first write to a temporary file, rename // it to the target path. 
@@ -246,16 +250,26 @@ public class RetriableFileCopyCommand extends RetriableCommand { ThrottledInputStream inStream = null; long totalBytesRead = 0; + long chunkLength = source2.getChunkLength(); + boolean finished = false; try { inStream = getInputStream(source, context.getConfiguration()); int bytesRead = readBytes(inStream, buf, sourceOffset); while (bytesRead >= 0) { + if (chunkLength > 0 && + (totalBytesRead + bytesRead) >= chunkLength) { + bytesRead = (int)(chunkLength - totalBytesRead); + finished = true; + } totalBytesRead += bytesRead; if (action == FileAction.APPEND) { sourceOffset += bytesRead; } outStream.write(buf, 0, bytesRead); updateContextStatus(totalBytesRead, context, source2); + if (finished) { + break; + } bytesRead = readBytes(inStream, buf, sourceOffset); } outStream.close(); diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java index 3e86d0931bc..d1c18ea8d16 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java @@ -99,7 +99,8 @@ public class UniformSizeInputFormat while (reader.next(srcRelPath, srcFileStatus)) { // If adding the current file would cause the bytes per map to exceed // limit. 
Add the current file to new split - if (currentSplitSize + srcFileStatus.getLen() > nBytesPerSplit && lastPosition != 0) { + if (currentSplitSize + srcFileStatus.getChunkLength() > nBytesPerSplit + && lastPosition != 0) { FileSplit split = new FileSplit(listingFilePath, lastSplitStart, lastPosition - lastSplitStart, null); if (LOG.isDebugEnabled()) { @@ -109,7 +110,7 @@ public class UniformSizeInputFormat lastSplitStart = lastPosition; currentSplitSize = 0; } - currentSplitSize += srcFileStatus.getLen(); + currentSplitSize += srcFileStatus.getChunkLength(); lastPosition = reader.getPosition(); } if (lastPosition > lastSplitStart) { diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java index c308e6f1f90..29715bb326a 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java @@ -19,9 +19,11 @@ package org.apache.hadoop.tools.util; import com.google.common.collect.Maps; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -30,6 +32,7 @@ import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclUtil; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; @@ -44,6 +47,7 @@ import org.apache.hadoop.util.StringUtils; import java.io.IOException; import java.text.DecimalFormat; import java.util.EnumSet; +import 
java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -116,7 +120,7 @@ public class DistCpUtils { * @return Class implementing the strategy specified in options. */ public static Class getStrategy(Configuration conf, - DistCpOptions options) { + DistCpOptions options) { String confLabel = "distcp." + StringUtils.toLowerCase(options.getCopyStrategy()) + ".strategy" + ".impl"; @@ -292,6 +296,86 @@ public class DistCpUtils { return fileSystem.getXAttrs(path); } + /** + * Converts FileStatus to a list of CopyListingFileStatus. + * The resulted list contains either one CopyListingFileStatus per chunk of + * file-blocks (if file-size exceeds blockSize * blocksPerChunk, and there + * are more blocks in the file than blocksperChunk), or a single + * CopyListingFileStatus for the entire file (if file-size is too small to + * split). + * If preserving ACLs, populates the CopyListingFileStatus with the ACLs. + * If preserving XAttrs, populates the CopyListingFileStatus with the XAttrs. 
+ * + * @param fileSystem FileSystem containing the file + * @param fileStatus FileStatus of file + * @param preserveAcls boolean true if preserving ACLs + * @param preserveXAttrs boolean true if preserving XAttrs + * @param preserveRawXAttrs boolean true if preserving raw.* XAttrs + * @param blocksPerChunk size of chunks when copying chunks in parallel + * @return list of CopyListingFileStatus + * @throws IOException if there is an I/O error + */ + public static LinkedList toCopyListingFileStatus( + FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls, + boolean preserveXAttrs, boolean preserveRawXAttrs, int blocksPerChunk) + throws IOException { + LinkedList copyListingFileStatus = + new LinkedList(); + + final CopyListingFileStatus clfs = toCopyListingFileStatusHelper( + fileSystem, fileStatus, preserveAcls, + preserveXAttrs, preserveRawXAttrs, + 0, fileStatus.getLen()); + final long blockSize = fileStatus.getBlockSize(); + if (LOG.isDebugEnabled()) { + LOG.debug("toCopyListing: " + fileStatus + " chunkSize: " + + blocksPerChunk + " isDFS: " + + (fileSystem instanceof DistributedFileSystem)); + } + if ((blocksPerChunk > 0) && + !fileStatus.isDirectory() && + (fileStatus.getLen() > blockSize * blocksPerChunk)) { + // split only when the file size is larger than the intended chunk size + final BlockLocation[] blockLocations; + blockLocations = fileSystem.getFileBlockLocations(fileStatus, 0, + fileStatus.getLen()); + + int numBlocks = blockLocations.length; + long curPos = 0; + if (numBlocks <= blocksPerChunk) { + if (LOG.isDebugEnabled()) { + LOG.debug(" add file " + clfs); + } + copyListingFileStatus.add(clfs); + } else { + int i = 0; + while (i < numBlocks) { + long curLength = 0; + for (int j = 0; j < blocksPerChunk && i < numBlocks; ++j, ++i) { + curLength += blockLocations[i].getLength(); + } + if (curLength > 0) { + CopyListingFileStatus clfs1 = new CopyListingFileStatus(clfs); + clfs1.setChunkOffset(curPos); + clfs1.setChunkLength(curLength); 
+ if (LOG.isDebugEnabled()) { + LOG.debug(" add file chunk " + clfs1); + } + copyListingFileStatus.add(clfs1); + curPos += curLength; + } + } + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug(" add file/dir " + clfs); + } + copyListingFileStatus.add(clfs); + } + + return copyListingFileStatus; + } + /** * Converts a FileStatus to a CopyListingFileStatus. If preserving ACLs, * populates the CopyListingFileStatus with the ACLs. If preserving XAttrs, @@ -302,13 +386,17 @@ public class DistCpUtils { * @param preserveAcls boolean true if preserving ACLs * @param preserveXAttrs boolean true if preserving XAttrs * @param preserveRawXAttrs boolean true if preserving raw.* XAttrs + * @param chunkOffset chunk offset in bytes + * @param chunkLength chunk length in bytes + * @return CopyListingFileStatus * @throws IOException if there is an I/O error */ - public static CopyListingFileStatus toCopyListingFileStatus( + public static CopyListingFileStatus toCopyListingFileStatusHelper( FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls, - boolean preserveXAttrs, boolean preserveRawXAttrs) throws IOException { + boolean preserveXAttrs, boolean preserveRawXAttrs, + long chunkOffset, long chunkLength) throws IOException { CopyListingFileStatus copyListingFileStatus = - new CopyListingFileStatus(fileStatus); + new CopyListingFileStatus(fileStatus, chunkOffset, chunkLength); if (preserveAcls) { FsPermission perm = fileStatus.getPermission(); if (perm.getAclBit()) { @@ -465,4 +553,19 @@ public class DistCpUtils { return (sourceChecksum == null || targetChecksum == null || sourceChecksum.equals(targetChecksum)); } + + /* + * Return the Path for a given chunk. + * Used when splitting large file into chunks to copy in parallel. 
+ * @param targetFile path to target file + * @param srcFileStatus source file status in copy listing + * @return path to the chunk specified by the parameters to store + * in target cluster temporarily + */ + public static Path getSplitChunkPath(Path targetFile, + CopyListingFileStatus srcFileStatus) { + return new Path(targetFile.toString() + + ".____distcpSplit____" + srcFileStatus.getChunkOffset() + + "." + srcFileStatus.getChunkLength()); + } } diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm index d153485b3df..e6cff10b39d 100644 --- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm +++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm @@ -239,6 +239,7 @@ Flag | Description | Notes `-rdiff ` | Use snapshot diff report between given two snapshots to identify what has been changed on the target since the snapshot `` was created on the target, and apply the diff reversely to the target, and copy modified files from the source's ``, to make the target the same as ``. | This option is valid only with `-update` option and the following conditions should be satisfied.
  1. Both the source and the target FileSystem must be DistributedFileSystem. The source and the target can be two different clusters/paths, or they can be exactly the same cluster/path. In the latter case, modified files are copied from target's `<oldSnapshot>` to target's current state.
  2. Two snapshots `<newSnapshot>` and `<oldSnapshot>` have been created on the target FS, and `<oldSnapshot>` is older than `<newSnapshot>`. No change has been made on target since `<newSnapshot>` was created on the target.
  3. The source has the same snapshot `<oldSnapshot>`, which has the same content as the `<oldSnapshot>` on the target. All the files/directories in the target's `<oldSnapshot>` are the same as the source's `<oldSnapshot>`.
| `-numListstatusThreads` | Number of threads to use for building file listing | At most 40 threads. `-skipcrccheck` | Whether to skip CRC checks between source and target paths. | +`-blocksperchunk ` | Number of blocks per chunk. When specified, split files into chunks to copy in parallel | If set to a positive value, files with more blocks than this value will be split into chunks of `` blocks to be transferred in parallel, and reassembled on the destination. By default, `` is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements getBlockLocations method and the target file system implements concat method. | Architecture of DistCp ---------------------- diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java index 74a100c7afa..df36c262703 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java @@ -312,7 +312,7 @@ public class TestDistCpOptions { + "copyStrategy='uniformsize', preserveStatus=[], " + "preserveRawXattrs=false, atomicWorkPath=null, logPath=null, " + "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, " - + "targetPathExists=true, filtersFile='null'}"; + + "targetPathExists=true, filtersFile='null', blocksPerChunk=0}"; String optionString = option.toString(); Assert.assertEquals(val, optionString); Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(), diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java index e3018a07730..b2266b3344d 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java +++ 
b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java @@ -23,17 +23,27 @@ import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.OutputStream; +import java.io.PrintStream; import java.net.URI; import java.util.ArrayList; import java.util.List; +import java.util.Random; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsShell; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.util.ToolRunner; import org.junit.AfterClass; import org.junit.Assert; @@ -47,11 +57,15 @@ import org.junit.rules.Timeout; */ public class TestDistCpSystem { + private static final Log LOG = + LogFactory.getLog(TestDistCpSystem.class); + @Rule public Timeout globalTimeout = new Timeout(30000); private static final String SRCDAT = "srcdat"; private static final String DSTDAT = "dstdat"; + private static final long BLOCK_SIZE = 1024; private static MiniDFSCluster cluster; private static Configuration conf; @@ -63,27 +77,76 @@ public class TestDistCpSystem { this.path = path; this.isDir = isDir; } - String getPath() { return path; } - boolean isDirectory() { return isDir; } + + String getPath() { + return path; + } + + boolean isDirectory() { + return isDir; + } + } + + @BeforeClass + public static void beforeClass() throws IOException { + conf = new Configuration(); + 
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); + cluster.waitActive(); + } + + @AfterClass + public static void afterClass() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + } + + static String execCmd(FsShell shell, String... args) throws Exception { + ByteArrayOutputStream baout = new ByteArrayOutputStream(); + PrintStream out = new PrintStream(baout, true); + PrintStream old = System.out; + System.setOut(out); + shell.run(args); + out.close(); + System.setOut(old); + return baout.toString(); } - private void createFiles(FileSystem fs, String topdir, - FileEntry[] entries) throws IOException { + private void createFiles(DistributedFileSystem fs, String topdir, + FileEntry[] entries, long chunkSize) throws IOException { + long seed = System.currentTimeMillis(); + Random rand = new Random(seed); + short replicationFactor = 2; for (FileEntry entry : entries) { - Path newpath = new Path(topdir + "/" + entry.getPath()); + Path newPath = new Path(topdir + "/" + entry.getPath()); if (entry.isDirectory()) { - fs.mkdirs(newpath); + fs.mkdirs(newPath); } else { - OutputStream out = fs.create(newpath); - try { - out.write((topdir + "/" + entry).getBytes()); - out.write("\n".getBytes()); - } finally { - out.close(); + long fileSize = BLOCK_SIZE *100; + int bufSize = 128; + if (chunkSize == -1) { + DFSTestUtil.createFile(fs, newPath, bufSize, + fileSize, BLOCK_SIZE, replicationFactor, seed); + } else { + // Create a variable length block file, by creating + // one block of half block size at the chunk boundary + long seg1 = chunkSize * BLOCK_SIZE - BLOCK_SIZE / 2; + long seg2 = fileSize - seg1; + DFSTestUtil.createFile(fs, newPath, bufSize, + seg1, BLOCK_SIZE, replicationFactor, seed); + DFSTestUtil.appendFileNewBlock(fs, newPath, (int)seg2); } } + seed = System.currentTimeMillis() + 
rand.nextLong(); } } + + private void createFiles(DistributedFileSystem fs, String topdir, + FileEntry[] entries) throws IOException { + createFiles(fs, topdir, entries, -1); + } private static FileStatus[] getFileStatus(FileSystem fs, String topdir, FileEntry[] files) throws IOException { @@ -104,18 +167,19 @@ public class TestDistCpSystem { } private void testPreserveUserHelper(String testRoot, - FileEntry[] srcEntries, - FileEntry[] dstEntries, - boolean createSrcDir, - boolean createTgtDir, - boolean update) throws Exception { + FileEntry[] srcEntries, + FileEntry[] dstEntries, + boolean createSrcDir, + boolean createTgtDir, + boolean update) throws Exception { final String testSrcRel = SRCDAT; final String testSrc = testRoot + "/" + testSrcRel; final String testDstRel = DSTDAT; final String testDst = testRoot + "/" + testDstRel; String nnUri = FileSystem.getDefaultUri(conf).toString(); - FileSystem fs = FileSystem.get(URI.create(nnUri), conf); + DistributedFileSystem fs = (DistributedFileSystem) + FileSystem.get(URI.create(nnUri), conf); fs.mkdirs(new Path(testRoot)); if (createSrcDir) { fs.mkdirs(new Path(testSrc)); @@ -129,8 +193,8 @@ public class TestDistCpSystem { for(int i = 0; i < srcEntries.length; i++) { fs.setOwner(srcstats[i].getPath(), "u" + i, null); } - String[] args = update? new String[]{"-pu", "-update", nnUri+testSrc, - nnUri+testDst} : new String[]{"-pu", nnUri+testSrc, nnUri+testDst}; + String[] args = update? 
new String[]{"-pub", "-update", nnUri+testSrc, + nnUri+testDst} : new String[]{"-pub", nnUri+testSrc, nnUri+testDst}; ToolRunner.run(conf, new DistCp(), args); @@ -145,20 +209,263 @@ public class TestDistCpSystem { deldir(fs, testRoot); } - @BeforeClass - public static void beforeClass() throws IOException { - conf = new Configuration(); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); - cluster.waitActive(); + private void compareFiles(FileSystem fs, FileStatus srcStat, + FileStatus dstStat) throws Exception { + LOG.info("Comparing " + srcStat + " and " + dstStat); + assertEquals(srcStat.isDirectory(), dstStat.isDirectory()); + assertEquals(srcStat.getReplication(), dstStat.getReplication()); + assertEquals("File POSIX permission should match", + srcStat.getPermission(), dstStat.getPermission()); + assertEquals("File user ownership should match", + srcStat.getOwner(), dstStat.getOwner()); + assertEquals("File group ownership should match", + srcStat.getGroup(), dstStat.getGroup()); + // TODO; check ACL attributes + + if (srcStat.isDirectory()) { + return; + } + + assertEquals("File length should match (" + srcStat.getPath() + ")", + srcStat.getLen(), dstStat.getLen()); + + FSDataInputStream srcIn = fs.open(srcStat.getPath()); + FSDataInputStream dstIn = fs.open(dstStat.getPath()); + try { + byte[] readSrc = new byte[(int) + HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT]; + byte[] readDst = new byte[(int) + HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT]; + + int srcBytesRead = 0, tgtBytesRead = 0; + int srcIdx = 0, tgtIdx = 0; + long totalComparedBytes = 0; + while (true) { + if (srcBytesRead == 0) { + srcBytesRead = srcIn.read(readSrc); + srcIdx = 0; + } + if (tgtBytesRead == 0) { + tgtBytesRead = dstIn.read(readDst); + tgtIdx = 0; + } + if (srcBytesRead == 0 || tgtBytesRead == 0) { + LOG.info("______ compared src and dst files for " + + totalComparedBytes + " bytes, content match."); + if (srcBytesRead != tgtBytesRead) { + Assert.fail("Read 
mismatching size, compared " + + totalComparedBytes + " bytes between src and dst file " + + srcStat + " and " + dstStat); + } + if (totalComparedBytes != srcStat.getLen()) { + Assert.fail("Only read/compared " + totalComparedBytes + + " bytes between src and dst file " + srcStat + + " and " + dstStat); + } else { + // success + break; + } + } + for (; srcIdx < srcBytesRead && tgtIdx < tgtBytesRead; + ++srcIdx, ++tgtIdx) { + if (readSrc[srcIdx] != readDst[tgtIdx]) { + Assert.fail("src and dst file does not match at " + + totalComparedBytes + " between " + + srcStat + " and " + dstStat); + } + ++totalComparedBytes; + } + LOG.info("______ compared src and dst files for " + + totalComparedBytes + " bytes, content match. FileLength: " + + srcStat.getLen()); + if (totalComparedBytes == srcStat.getLen()) { + LOG.info("______ Final:" + srcIdx + " " + + srcBytesRead + " " + tgtIdx + " " + tgtBytesRead); + break; + } + if (srcIdx == srcBytesRead) { + srcBytesRead = 0; + } + if (tgtIdx == tgtBytesRead) { + tgtBytesRead = 0; + } + } + } finally { + if (srcIn != null) { + srcIn.close(); + } + if (dstIn != null) { + dstIn.close(); + } + } } - @AfterClass - public static void afterClass() throws IOException { - if (cluster != null) { - cluster.shutdown(); + // WC: needed because the current distcp does not create target dirs + private void createDestDir(FileSystem fs, String testDst, + FileStatus[] srcStats, FileEntry[] srcFiles) throws IOException { + fs.mkdirs(new Path(testDst)); + + for (int i=0; i=0; --i) { + if (!srcFiles[i].isDirectory()) { + LOG.info("Modifying " + srcStats[i].getPath()); + DFSTestUtil.appendFileNewBlock(fs, srcStats[i].getPath(), + (int)BLOCK_SIZE * 3); + break; + } + } + // get file status after modifying file + srcStats = getFileStatus(fs, testRoot, srcFiles); + + args = new String[] {"-pugp", "-update", "-blocksperchunk", + String.valueOf(chunkSize), + nnUri + testSrc, nnUri + testDst + "/" + testSrcRel}; + + copyAndVerify(fs, srcFiles, srcStats, 
testDst, args); + + deldir(fs, testRoot); + } + + @Test + public void testRecursiveChunkCopy() throws Exception { + FileEntry[] srcFiles = { + new FileEntry(SRCDAT, true), + new FileEntry(SRCDAT + "/file0", false), + new FileEntry(SRCDAT + "/dir1", true), + new FileEntry(SRCDAT + "/dir2", true), + new FileEntry(SRCDAT + "/dir1/file1", false) + }; + chunkCopy(srcFiles); + } + + @Test + public void testChunkCopyOneFile() throws Exception { + FileEntry[] srcFiles = { + new FileEntry(SRCDAT, true), + new FileEntry(SRCDAT + "/file0", false) + }; + chunkCopy(srcFiles); + } + + @Test + public void testDistcpLargeFile() throws Exception { + FileEntry[] srcfiles = { + new FileEntry(SRCDAT, true), + new FileEntry(SRCDAT + "/file", false) + }; + + final String testRoot = "/testdir"; + final String testSrcRel = SRCDAT; + final String testSrc = testRoot + "/" + testSrcRel; + final String testDstRel = DSTDAT; + final String testDst = testRoot + "/" + testDstRel; + + String nnUri = FileSystem.getDefaultUri(conf).toString(); + DistributedFileSystem fs = + (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf); + fs.mkdirs(new Path(testRoot)); + fs.mkdirs(new Path(testSrc)); + fs.mkdirs(new Path(testDst)); + long chunkSize = 6; + createFiles(fs, testRoot, srcfiles, chunkSize); + + String srcFileName = testRoot + Path.SEPARATOR + srcfiles[1].getPath(); + Path srcfile = new Path(srcFileName); + + if(!cluster.getFileSystem().exists(srcfile)){ + throw new Exception("src not exist"); + } + + final long srcLen = fs.getFileStatus(srcfile).getLen(); + + FileStatus[] srcstats = getFileStatus(fs, testRoot, srcfiles); + for (int i = 0; i < srcfiles.length; i++) { + fs.setOwner(srcstats[i].getPath(), "u" + i, null); + } + String[] args = new String[] { + "-blocksperchunk", + String.valueOf(chunkSize), + nnUri + testSrc, + nnUri + testDst + }; + + LOG.info("_____ running distcp: " + args[0] + " " + args[1]); + ToolRunner.run(conf, new DistCp(), args); + + String realTgtPath = testDst; 
+ FileStatus[] dststat = getFileStatus(fs, realTgtPath, srcfiles); + assertEquals("File length should match", srcLen, + dststat[dststat.length - 1].getLen()); + + this.compareFiles(fs, srcstats[srcstats.length-1], + dststat[dststat.length-1]); + deldir(fs, testRoot); + } + @Test public void testPreserveUseNonEmptyDir() throws Exception { String testRoot = "/testdir." + getMethodName(); @@ -180,7 +487,6 @@ public class TestDistCpSystem { testPreserveUserHelper(testRoot, srcfiles, dstfiles, false, false, false); } - @Test public void testPreserveUserEmptyDir() throws Exception { String testRoot = "/testdir." + getMethodName(); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java index 35778d29f57..acffb76a46e 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java @@ -407,7 +407,7 @@ public class TestOptionsParser { + "copyStrategy='uniformsize', preserveStatus=[], " + "preserveRawXattrs=false, atomicWorkPath=null, logPath=null, " + "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, " - + "targetPathExists=true, filtersFile='null'}"; + + "targetPathExists=true, filtersFile='null', blocksPerChunk=0}"; String optionString = option.toString(); Assert.assertEquals(val, optionString); Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(), diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index 2e9a350b1ca..2452d6fee3d 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ 
-81,6 +81,10 @@ public class TestCopyCommitter { @Before public void createMetaFolder() { config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta"); + // Unset listing file path since the config is shared by + // multiple tests, and some test doesn't set it, such as + // testNoCommitAction, but the distcp code will check it. + config.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, ""); Path meta = new Path("/meta"); try { cluster.getFileSystem().mkdirs(meta); @@ -326,7 +330,6 @@ public class TestCopyCommitter { committer.commitJob(jobContext); Assert.assertFalse(fs.exists(new Path(workPath))); Assert.assertTrue(fs.exists(new Path(finalPath))); - } catch (IOException e) { LOG.error("Exception encountered while testing for preserve status", e); Assert.fail("Atomic commit failure");