diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index f0a70602bc8..c842dc900ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -477,6 +477,9 @@ Release 2.8.0 - UNRELEASED HDFS-8884. Fail-fast check in BlockPlacementPolicyDefault#chooseTarget. (yliu) + HDFS-8828. Utilize Snapshot diff report to build diff copy list in distcp. + (Yufei Gu via Yongjun Zhang) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java index e3c58e9a226..1efc56ca1ba 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java @@ -27,6 +27,8 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.tools.util.DistCpUtils; import org.apache.hadoop.security.Credentials; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import java.io.IOException; import java.lang.reflect.Constructor; @@ -46,7 +48,7 @@ public abstract class CopyListing extends Configured { private Credentials credentials; - + static final Log LOG = LogFactory.getLog(DistCp.class); /** * Build listing function creates the input listing that distcp uses to * perform the copy. @@ -89,6 +91,7 @@ public final void buildListing(Path pathToListFile, config.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, getNumberOfPaths()); validateFinalListing(pathToListFile, options); + LOG.info("Number of paths in the copy list: " + this.getNumberOfPaths()); } /** @@ -153,6 +156,7 @@ private void validateFinalListing(Path pathToListFile, DistCpOptions options) Text currentKey = new Text(); Set aclSupportCheckFsSet = Sets.newHashSet(); Set xAttrSupportCheckFsSet = Sets.newHashSet(); + long idx = 0; while (reader.next(currentKey)) { if (currentKey.equals(lastKey)) { CopyListingFileStatus currentFileStatus = new CopyListingFileStatus(); @@ -178,6 +182,12 @@ private void validateFinalListing(Path pathToListFile, DistCpOptions options) } } lastKey.set(currentKey); + + if (options.shouldUseDiff() && LOG.isDebugEnabled()) { + LOG.debug("Copy list entry " + idx + ": " + + lastFileStatus.getPath().toUri().getPath()); + idx++; + } } } finally { IOUtils.closeStream(reader); @@ -224,9 +234,6 @@ public static CopyListing getCopyListing(Configuration configuration, Credentials credentials, DistCpOptions options) throws IOException { - if (options.shouldUseDiff()) { - return new GlobbedCopyListing(configuration, credentials); - } String copyListingClassName = configuration.get(DistCpConstants. CONF_LABEL_COPY_LISTING_CLASS, ""); Class copyListingClass; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DiffInfo.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DiffInfo.java index b617de78803..10a8b7e25c2 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DiffInfo.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DiffInfo.java @@ -17,12 +17,9 @@ */ package org.apache.hadoop.tools; -import java.util.ArrayList; import java.util.Comparator; -import java.util.List; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; /** @@ -54,12 +51,19 @@ public int compare(DiffInfo d1, DiffInfo d2) { */ private Path tmp; /** The target file/dir of the rename op. Null means the op is deletion. */ - final Path target; + Path target; - DiffInfo(Path source, Path target) { + private final SnapshotDiffReport.DiffType type; + + public SnapshotDiffReport.DiffType getType(){ + return this.type; + } + + DiffInfo(Path source, Path target, SnapshotDiffReport.DiffType type) { assert source != null; this.source = source; this.target= target; + this.type = type; } void setTmp(Path tmp) { @@ -69,22 +73,4 @@ void setTmp(Path tmp) { Path getTmp() { return tmp; } - - static DiffInfo[] getDiffs(SnapshotDiffReport report, Path targetDir) { - List diffs = new ArrayList<>(); - for (SnapshotDiffReport.DiffReportEntry entry : report.getDiffList()) { - if (entry.getType() == SnapshotDiffReport.DiffType.DELETE) { - final Path source = new Path(targetDir, - DFSUtil.bytes2String(entry.getSourcePath())); - diffs.add(new DiffInfo(source, null)); - } else if (entry.getType() == SnapshotDiffReport.DiffType.RENAME) { - final Path source = new Path(targetDir, - DFSUtil.bytes2String(entry.getSourcePath())); - final Path target = new Path(targetDir, - DFSUtil.bytes2String(entry.getTargetPath())); - diffs.add(new DiffInfo(source, target)); - } - } - return diffs.toArray(new DiffInfo[diffs.size()]); - } } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java index 2385df3d90e..f919d211524 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java @@ -175,11 +175,18 @@ public Job createAndSubmitJob() throws Exception { job = createJob(); } if (inputOptions.shouldUseDiff()) { - if (!DistCpSync.sync(inputOptions, getConf())) { + DistCpSync distCpSync = new DistCpSync(inputOptions, getConf()); + if (distCpSync.sync()) { + createInputFileListingWithDiff(job, distCpSync); + } else { inputOptions.disableUsingDiff(); } } - createInputFileListing(job); + + // Fallback to default DistCp if without "diff" option or sync failed. + if (!inputOptions.shouldUseDiff()) { + createInputFileListing(job); + } job.submit(); submitted = true; @@ -384,6 +391,22 @@ protected Path createInputFileListing(Job job) throws IOException { return fileListingPath; } + /** + * Create input listing based on snapshot diff report. + * @param job - Handle to job + * @param distCpSync the class wraps the snapshot diff report + * @return Returns the path where the copy listing is created + * @throws IOException - If any + */ + private Path createInputFileListingWithDiff(Job job, DistCpSync distCpSync) + throws IOException { + Path fileListingPath = getFileListingPath(); + CopyListing copyListing = new SimpleCopyListing(job.getConfiguration(), + job.getCredentials(), distCpSync); + copyListing.buildListing(fileListingPath, inputOptions); + return fileListingPath; + } + /** * Get default name of the copy listing file. Use the meta folder * to create the copy listing file diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index 302b626b8af..71c4ae90eae 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -614,9 +614,9 @@ public void validate(DistCpOptionSwitch option, boolean value) { throw new IllegalArgumentException( "Append is disallowed when skipping CRC"); } - if ((!syncFolder || !deleteMissing) && useDiff) { + if ((!syncFolder || deleteMissing) && useDiff) { throw new IllegalArgumentException( - "Diff is valid only with update and delete options"); + "Diff is valid only with update options"); } } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java index 5bf638d5b3e..47a28a6f927 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java @@ -17,10 +17,10 @@ */ package org.apache.hadoop.tools; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; @@ -29,6 +29,9 @@ import java.util.Arrays; import java.util.List; import java.util.Random; +import java.util.EnumMap; +import java.util.ArrayList; +import java.util.HashSet; /** * This class provides the basic functionality to sync two FileSystems based on @@ -41,9 +44,26 @@ * source.s1 */ class DistCpSync { + private DistCpOptions inputOptions; + private Configuration conf; + private EnumMap> diffMap; + private DiffInfo[] renameDiffs; - static boolean sync(DistCpOptions inputOptions, Configuration conf) - throws IOException { + DistCpSync(DistCpOptions options, Configuration conf) { + this.inputOptions = options; + this.conf = conf; + } + + /** + * Check if three conditions are met before sync. + * 1. Only one source directory. + * 2. Both source and target file system are DFS. + * 3. There is no change between from and the current status in target + * file system. + * Throw exceptions if first two aren't met, and return false to fallback to + * default distcp if the third condition isn't met. + */ + private boolean preSyncCheck() throws IOException { List sourcePaths = inputOptions.getSourcePaths(); if (sourcePaths.size() != 1) { // we only support one source dir which must be a snapshottable directory @@ -62,26 +82,41 @@ static boolean sync(DistCpOptions inputOptions, Configuration conf) throw new IllegalArgumentException("The FileSystems needs to" + " be DistributedFileSystem for using snapshot-diff-based distcp"); } - final DistributedFileSystem sourceFs = (DistributedFileSystem) sfs; - final DistributedFileSystem targetFs= (DistributedFileSystem) tfs; + final DistributedFileSystem targetFs = (DistributedFileSystem) tfs; // make sure targetFS has no change between from and the current states - if (!checkNoChange(inputOptions, targetFs, targetDir)) { + if (!checkNoChange(targetFs, targetDir)) { // set the source path using the snapshot path inputOptions.setSourcePaths(Arrays.asList(getSourceSnapshotPath(sourceDir, inputOptions.getToSnapshot()))); return false; } + return true; + } + + public boolean sync() throws IOException { + if (!preSyncCheck()) { + return false; + } + + if (!getAllDiffs()) { + return false; + } + + List sourcePaths = inputOptions.getSourcePaths(); + final Path sourceDir = sourcePaths.get(0); + final Path targetDir = inputOptions.getTargetPath(); + final FileSystem tfs = targetDir.getFileSystem(conf); + final DistributedFileSystem targetFs = (DistributedFileSystem) tfs; Path tmpDir = null; try { tmpDir = createTargetTmpDir(targetFs, targetDir); - DiffInfo[] diffs = getDiffs(inputOptions, sourceFs, sourceDir, targetDir); - if (diffs == null) { - return false; + DiffInfo[] renameAndDeleteDiffs = getRenameAndDeleteDiffs(targetDir); + if (renameAndDeleteDiffs.length > 0) { + // do the real sync work: deletion and rename + syncDiff(renameAndDeleteDiffs, targetFs, tmpDir); } - // do the real sync work: deletion and rename - syncDiff(diffs, targetFs, tmpDir); return true; } catch (Exception e) { DistCp.LOG.warn("Failed to use snapshot diff for distcp", e); @@ -95,11 +130,64 @@ static boolean sync(DistCpOptions inputOptions, Configuration conf) } } - private static String getSnapshotName(String name) { + /** + * Get all diffs from source directory snapshot diff report, put them into an + * EnumMap whose key is DiffType, and value is a DiffInfo list. If there is + * no entry for a given DiffType, the associated value will be an empty list. + */ + private boolean getAllDiffs() throws IOException { + List sourcePaths = inputOptions.getSourcePaths(); + final Path sourceDir = sourcePaths.get(0); + try { + DistributedFileSystem fs = + (DistributedFileSystem) sourceDir.getFileSystem(conf); + final String from = getSnapshotName(inputOptions.getFromSnapshot()); + final String to = getSnapshotName(inputOptions.getToSnapshot()); + SnapshotDiffReport report = fs.getSnapshotDiffReport(sourceDir, + from, to); + + this.diffMap = new EnumMap<>(SnapshotDiffReport.DiffType.class); + for (SnapshotDiffReport.DiffType type : + SnapshotDiffReport.DiffType.values()) { + diffMap.put(type, new ArrayList()); + } + + for (SnapshotDiffReport.DiffReportEntry entry : report.getDiffList()) { + // If the entry is the snapshot root, usually a item like "M\t." + // in the diff report. We don't need to handle it and cannot handle it, + // since its sourcepath is empty. + if (entry.getSourcePath().length <= 0) { + continue; + } + List list = diffMap.get(entry.getType()); + + if (entry.getType() == SnapshotDiffReport.DiffType.MODIFY || + entry.getType() == SnapshotDiffReport.DiffType.CREATE || + entry.getType() == SnapshotDiffReport.DiffType.DELETE) { + final Path source = + new Path(DFSUtil.bytes2String(entry.getSourcePath())); + list.add(new DiffInfo(source, null, entry.getType())); + } else if (entry.getType() == SnapshotDiffReport.DiffType.RENAME) { + final Path source = + new Path(DFSUtil.bytes2String(entry.getSourcePath())); + final Path target = + new Path(DFSUtil.bytes2String(entry.getTargetPath())); + list.add(new DiffInfo(source, target, entry.getType())); + } + } + return true; + } catch (IOException e) { + DistCp.LOG.warn("Failed to compute snapshot diff on " + sourceDir, e); + } + this.diffMap = null; + return false; + } + + private String getSnapshotName(String name) { return Path.CUR_DIR.equals(name) ? "" : name; } - private static Path getSourceSnapshotPath(Path sourceDir, String snapshotName) { + private Path getSourceSnapshotPath(Path sourceDir, String snapshotName) { if (Path.CUR_DIR.equals(snapshotName)) { return sourceDir; } else { @@ -108,8 +196,8 @@ private static Path getSourceSnapshotPath(Path sourceDir, String snapshotName) { } } - private static Path createTargetTmpDir(DistributedFileSystem targetFs, - Path targetDir) throws IOException { + private Path createTargetTmpDir(DistributedFileSystem targetFs, + Path targetDir) throws IOException { final Path tmp = new Path(targetDir, DistCpConstants.HDFS_DISTCP_DIFF_DIRECTORY_NAME + DistCp.rand.nextInt()); if (!targetFs.mkdirs(tmp)) { @@ -118,8 +206,8 @@ private static Path createTargetTmpDir(DistributedFileSystem targetFs, return tmp; } - private static void deleteTargetTmpDir(DistributedFileSystem targetFs, - Path tmpDir) { + private void deleteTargetTmpDir(DistributedFileSystem targetFs, + Path tmpDir) { try { if (tmpDir != null) { targetFs.delete(tmpDir, true); @@ -133,8 +221,7 @@ private static void deleteTargetTmpDir(DistributedFileSystem targetFs, * Compute the snapshot diff on the given file system. Return true if the diff * is empty, i.e., no changes have happened in the FS. */ - private static boolean checkNoChange(DistCpOptions inputOptions, - DistributedFileSystem fs, Path path) { + private boolean checkNoChange(DistributedFileSystem fs, Path path) { try { SnapshotDiffReport targetDiff = fs.getSnapshotDiffReport(path, inputOptions.getFromSnapshot(), ""); @@ -151,22 +238,7 @@ private static boolean checkNoChange(DistCpOptions inputOptions, return false; } - @VisibleForTesting - static DiffInfo[] getDiffs(DistCpOptions inputOptions, - DistributedFileSystem fs, Path sourceDir, Path targetDir) { - try { - final String from = getSnapshotName(inputOptions.getFromSnapshot()); - final String to = getSnapshotName(inputOptions.getToSnapshot()); - SnapshotDiffReport sourceDiff = fs.getSnapshotDiffReport(sourceDir, - from, to); - return DiffInfo.getDiffs(sourceDiff, targetDir); - } catch (IOException e) { - DistCp.LOG.warn("Failed to compute snapshot diff on " + sourceDir, e); - } - return null; - } - - private static void syncDiff(DiffInfo[] diffs, + private void syncDiff(DiffInfo[] diffs, DistributedFileSystem targetFs, Path tmpDir) throws IOException { moveToTmpDir(diffs, targetFs, tmpDir); moveToTarget(diffs, targetFs); @@ -176,7 +248,7 @@ private static void syncDiff(DiffInfo[] diffs, * Move all the source files that should be renamed or deleted to the tmp * directory. */ - private static void moveToTmpDir(DiffInfo[] diffs, + private void moveToTmpDir(DiffInfo[] diffs, DistributedFileSystem targetFs, Path tmpDir) throws IOException { // sort the diffs based on their source paths to make sure the files and // subdirs are moved before moving their parents/ancestors. @@ -196,7 +268,7 @@ private static void moveToTmpDir(DiffInfo[] diffs, * Finish the rename operations: move all the intermediate files/directories * from the tmp dir to the final targets. */ - private static void moveToTarget(DiffInfo[] diffs, + private void moveToTarget(DiffInfo[] diffs, DistributedFileSystem targetFs) throws IOException { // sort the diffs based on their target paths to make sure the parent // directories are created first. @@ -210,4 +282,166 @@ private static void moveToTarget(DiffInfo[] diffs, } } } + + /** + * Get rename and delete diffs and add the targetDir as the prefix of their + * source and target paths. + */ + private DiffInfo[] getRenameAndDeleteDiffs(Path targetDir) { + List renameAndDeleteDiff = new ArrayList<>(); + for (DiffInfo diff : diffMap.get(SnapshotDiffReport.DiffType.DELETE)) { + Path source = new Path(targetDir, diff.source); + renameAndDeleteDiff.add(new DiffInfo(source, diff.target, + diff.getType())); + } + + for (DiffInfo diff : diffMap.get(SnapshotDiffReport.DiffType.RENAME)) { + Path source = new Path(targetDir, diff.source); + Path target = new Path(targetDir, diff.target); + renameAndDeleteDiff.add(new DiffInfo(source, target, diff.getType())); + } + + return renameAndDeleteDiff.toArray( + new DiffInfo[renameAndDeleteDiff.size()]); + } + + private DiffInfo[] getCreateAndModifyDiffs() { + List createDiff = + diffMap.get(SnapshotDiffReport.DiffType.CREATE); + List modifyDiff = + diffMap.get(SnapshotDiffReport.DiffType.MODIFY); + List diffs = + new ArrayList<>(createDiff.size() + modifyDiff.size()); + diffs.addAll(createDiff); + diffs.addAll(modifyDiff); + return diffs.toArray(new DiffInfo[diffs.size()]); + } + + /** + * Probe for a path being a parent of another. + * @return true if the parent's path matches the start of the child's + */ + private boolean isParentOf(Path parent, Path child) { + String parentPath = parent.toString(); + String childPath = child.toString(); + if (!parentPath.endsWith(Path.SEPARATOR)) { + parentPath += Path.SEPARATOR; + } + + return childPath.length() > parentPath.length() && + childPath.startsWith(parentPath); + } + + /** + * Find the possible rename item which equals to the parent or self of + * a created/modified file/directory. + * @param diff a modify/create diff item + * @param renameDiffArray all rename diffs + * @return possible rename item + */ + private DiffInfo getRenameItem(DiffInfo diff, DiffInfo[] renameDiffArray) { + for (DiffInfo renameItem : renameDiffArray) { + if (diff.source.equals(renameItem.source)) { + // The same path string may appear in: + // 1. both renamed and modified snapshot diff entries. + // 2. both renamed and created snapshot diff entries. + // Case 1 is the about same file/directory, whereas case 2 + // is about two different files/directories. + // We are finding case 1 here, thus we check against DiffType.MODIFY. + if (diff.getType() == SnapshotDiffReport.DiffType.MODIFY) { + return renameItem; + } + } else if (isParentOf(renameItem.source, diff.source)) { + // If rename entry is the parent of diff entry, then both MODIFY and + // CREATE diff entries should be handled. + return renameItem; + } + } + return null; + } + + /** + * For a given source path, get its target path based on the rename item. + * @return target path + */ + private Path getTargetPath(Path sourcePath, DiffInfo renameItem) { + if (sourcePath.equals(renameItem.source)) { + return renameItem.target; + } + StringBuffer sb = new StringBuffer(sourcePath.toString()); + String remain = sb.substring(renameItem.source.toString().length() + 1); + return new Path(renameItem.target, remain); + } + + /** + * Prepare the diff list. + * This diff list only includes created or modified files/directories, since + * delete and rename items are synchronized already. + * + * If the parent or self of a source path is renamed, we need to change its + * target path according the correspondent rename item. + * @return a diff list + */ + public ArrayList prepareDiffList() { + DiffInfo[] modifyAndCreateDiffs = getCreateAndModifyDiffs(); + + List renameDiffsList = + diffMap.get(SnapshotDiffReport.DiffType.RENAME); + DiffInfo[] renameDiffArray = + renameDiffsList.toArray(new DiffInfo[renameDiffsList.size()]); + Arrays.sort(renameDiffArray, DiffInfo.sourceComparator); + + ArrayList finalListWithTarget = new ArrayList<>(); + for (DiffInfo diff : modifyAndCreateDiffs) { + DiffInfo renameItem = getRenameItem(diff, renameDiffArray); + if (renameItem == null) { + diff.target = diff.source; + } else { + diff.target = getTargetPath(diff.source, renameItem); + } + finalListWithTarget.add(diff); + } + return finalListWithTarget; + } + + /** + * This method returns a list of items to be excluded when recursively + * traversing newDir to build the copy list. + * + * Specifically, given a newly created directory newDir (a CREATE entry in + * the snapshot diff), if a previously copied file/directory itemX is moved + * (a RENAME entry in the snapshot diff) into newDir, itemX should be + * excluded when recursively traversing newDir in caller method so that it + * will not to be copied again. + * If the same itemX also has a MODIFY entry in the snapshot diff report, + * meaning it was modified after it was previously copied, it will still + * be added to the copy list in caller method. + * @return the exclude list + */ + public HashSet getTraverseExcludeList(Path newDir, Path prefix) { + if (renameDiffs == null) { + List renameList = + diffMap.get(SnapshotDiffReport.DiffType.RENAME); + renameDiffs = renameList.toArray(new DiffInfo[renameList.size()]); + Arrays.sort(renameDiffs, DiffInfo.targetComparator); + } + + if (renameDiffs.length <= 0) { + return null; + } + + boolean foundChild = false; + HashSet excludeList = new HashSet<>(); + for (DiffInfo diff : renameDiffs) { + if (isParentOf(newDir, diff.target)) { + foundChild = true; + excludeList.add(new Path(prefix, diff.target).toUri().getPath()); + } else if (foundChild) { + // The renameDiffs was sorted, the matching section should be + // contiguous. + break; + } + } + return excludeList; + } } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java index 8f509133494..77743ebe1c3 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; @@ -40,6 +41,7 @@ import java.io.*; import java.util.ArrayList; +import java.util.HashSet; import static org.apache.hadoop.tools.DistCpConstants .HDFS_RESERVED_RAW_DIRECTORY_NAME; @@ -59,6 +61,7 @@ public class SimpleCopyListing extends CopyListing { private int numListstatusThreads = 1; private final int maxRetries = 3; private CopyFilter copyFilter; + private DistCpSync distCpSync; /** * Protected constructor, to initialize configuration. @@ -77,12 +80,20 @@ protected SimpleCopyListing(Configuration configuration, Credentials credentials } @VisibleForTesting - protected SimpleCopyListing(Configuration configuration, Credentials credentials, + protected SimpleCopyListing(Configuration configuration, + Credentials credentials, int numListstatusThreads) { super(configuration, credentials); this.numListstatusThreads = numListstatusThreads; } + protected SimpleCopyListing(Configuration configuration, + Credentials credentials, + DistCpSync distCpSync) { + this(configuration, credentials); + this.distCpSync = distCpSync; + } + @Override protected void validatePaths(DistCpOptions options) throws IOException, InvalidInputException { @@ -157,8 +168,106 @@ protected void validatePaths(DistCpOptions options) /** {@inheritDoc} */ @Override public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException { - doBuildListing(getWriter(pathToListingFile), options); + if(options.shouldUseDiff()) { + doBuildListingWithSnapshotDiff(getWriter(pathToListingFile), options); + }else { + doBuildListing(getWriter(pathToListingFile), options); + } } + + /** + * Get a path with its scheme and authority. + */ + private Path getPathWithSchemeAndAuthority(Path path) throws IOException { + FileSystem fs= path.getFileSystem(getConf()); + String scheme = path.toUri().getScheme(); + if (scheme == null) { + scheme = fs.getUri().getScheme(); + } + + String authority = path.toUri().getAuthority(); + if (authority == null) { + authority = fs.getUri().getAuthority(); + } + + return new Path(scheme, authority, path.toUri().getPath()); + } + + /** + * Write a single file/directory to the sequence file. + * @throws IOException + */ + private void addToFileListing(SequenceFile.Writer fileListWriter, + Path sourceRoot, Path path, DistCpOptions options) throws IOException { + sourceRoot = getPathWithSchemeAndAuthority(sourceRoot); + path = getPathWithSchemeAndAuthority(path); + path = makeQualified(path); + + FileSystem sourceFS = sourceRoot.getFileSystem(getConf()); + FileStatus fileStatus = sourceFS.getFileStatus(path); + final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL); + final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR); + final boolean preserveRawXAttrs = options.shouldPreserveRawXattrs(); + CopyListingFileStatus fileCopyListingStatus = + DistCpUtils.toCopyListingFileStatus(sourceFS, fileStatus, + preserveAcls, preserveXAttrs, preserveRawXAttrs); + + writeToFileListingRoot(fileListWriter, fileCopyListingStatus, + sourceRoot, options); + } + + /** + * Build a copy list based on the snapshot diff report. + * + * Any file/directory changed or created will be in the list. Deleted + * files/directories will not be in the list, since they are handled by + * {@link org.apache.hadoop.tools.DistCpSync#sync}. An item can be + * created/modified and renamed, in which case, the target path is put + * into the list. + * @throws IOException + */ + @VisibleForTesting + public void doBuildListingWithSnapshotDiff(SequenceFile.Writer fileListWriter, + DistCpOptions options) throws IOException { + ArrayList diffList = distCpSync.prepareDiffList(); + Path sourceRoot = options.getSourcePaths().get(0); + FileSystem sourceFS = sourceRoot.getFileSystem(getConf()); + + try { + for (DiffInfo diff : diffList) { + // add snapshot paths prefix + diff.target = new Path(options.getSourcePaths().get(0), diff.target); + if (diff.getType() == SnapshotDiffReport.DiffType.MODIFY) { + addToFileListing(fileListWriter, sourceRoot, diff.target, options); + } else if (diff.getType() == SnapshotDiffReport.DiffType.CREATE) { + addToFileListing(fileListWriter, sourceRoot, diff.target, options); + + FileStatus sourceStatus = sourceFS.getFileStatus(diff.target); + if (sourceStatus.isDirectory()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding source dir for traverse: " + + sourceStatus.getPath()); + } + + HashSet excludeList = + distCpSync.getTraverseExcludeList(diff.source, + options.getSourcePaths().get(0)); + + ArrayList sourceDirs = new ArrayList<>(); + sourceDirs.add(sourceStatus); + + traverseDirectory(fileListWriter, sourceFS, sourceDirs, + sourceRoot, options, excludeList); + } + } + } + fileListWriter.close(); + fileListWriter = null; + } finally { + IOUtils.cleanup(LOG, fileListWriter); + } + } + /** * Collect the list of * {@literal } @@ -226,7 +335,7 @@ public void doBuildListing(SequenceFile.Writer fileListWriter, } } traverseDirectory(fileListWriter, sourceFS, sourceDirs, - sourcePathRoot, options); + sourcePathRoot, options, null); } } fileListWriter.close(); @@ -312,9 +421,33 @@ private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException { private static class FileStatusProcessor implements WorkRequestProcessor { private FileSystem fileSystem; + private HashSet excludeList; - public FileStatusProcessor(FileSystem fileSystem) { + public FileStatusProcessor(FileSystem fileSystem, + HashSet excludeList) { this.fileSystem = fileSystem; + this.excludeList = excludeList; + } + + /** + * Get FileStatuses for a given path. + * Exclude the some renamed FileStatuses since they are already handled by + * {@link org.apache.hadoop.tools.DistCpSync#sync}. + * @return an array of file status + */ + private FileStatus[] getFileStatus(Path path) throws IOException { + FileStatus[] fileStatuses = fileSystem.listStatus(path); + if (excludeList != null && excludeList.size() > 0) { + ArrayList fileStatusList = new ArrayList<>(); + for(FileStatus status : fileStatuses) { + if (!excludeList.contains(status.getPath().toUri().getPath())) { + fileStatusList.add(status); + } + } + fileStatuses = fileStatusList.toArray( + new FileStatus[fileStatusList.size()]); + } + return fileStatuses; } /* @@ -344,8 +477,8 @@ public WorkReport processItem( LOG.debug("Interrupted while sleeping in exponential backoff."); } } - result = new WorkReport( - fileSystem.listStatus(parent.getPath()), retry, true); + result = new WorkReport(getFileStatus(parent.getPath()), + retry, true); } catch (FileNotFoundException fnf) { LOG.error("FileNotFoundException exception in listStatus: " + fnf.getMessage()); @@ -376,7 +509,8 @@ private void traverseDirectory(SequenceFile.Writer fileListWriter, FileSystem sourceFS, ArrayList sourceDirs, Path sourcePathRoot, - DistCpOptions options) + DistCpOptions options, + HashSet excludeList) throws IOException { final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL); final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR); @@ -389,7 +523,8 @@ private void traverseDirectory(SequenceFile.Writer fileListWriter, new ProducerConsumer(numListstatusThreads); for (int i = 0; i < numListstatusThreads; i++) { workers.addWorker( - new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()))); + new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()), + excludeList)); } for (FileStatus status : sourceDirs) { diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java index 0a9a11fb664..04de8e4d6d9 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java @@ -62,7 +62,6 @@ public void setUp() throws Exception { options = new DistCpOptions(Arrays.asList(source), target); options.setSyncFolder(true); - options.setDeleteMissing(true); options.setUseDiff(true, "s1", "s2"); options.appendToConf(conf); @@ -87,7 +86,7 @@ public void tearDown() throws Exception { @Test public void testFallback() throws Exception { // the source/target dir are not snapshottable dir - Assert.assertFalse(DistCpSync.sync(options, conf)); + Assert.assertFalse(sync()); // make sure the source path has been updated to the snapshot path final Path spath = new Path(source, HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2"); @@ -98,7 +97,7 @@ public void testFallback() throws Exception { // the source/target does not have the given snapshots dfs.allowSnapshot(source); dfs.allowSnapshot(target); - Assert.assertFalse(DistCpSync.sync(options, conf)); + Assert.assertFalse(sync()); Assert.assertEquals(spath, options.getSourcePaths().get(0)); // reset source path in options @@ -106,21 +105,38 @@ public void testFallback() throws Exception { dfs.createSnapshot(source, "s1"); dfs.createSnapshot(source, "s2"); dfs.createSnapshot(target, "s1"); - Assert.assertTrue(DistCpSync.sync(options, conf)); + Assert.assertTrue(sync()); // reset source paths in options options.setSourcePaths(Arrays.asList(source)); // changes have been made in target final Path subTarget = new Path(target, "sub"); dfs.mkdirs(subTarget); - Assert.assertFalse(DistCpSync.sync(options, conf)); + Assert.assertFalse(sync()); // make sure the source path has been updated to the snapshot path Assert.assertEquals(spath, options.getSourcePaths().get(0)); // reset source paths in options options.setSourcePaths(Arrays.asList(source)); dfs.delete(subTarget, true); - Assert.assertTrue(DistCpSync.sync(options, conf)); + Assert.assertTrue(sync()); + } + + private void enableAndCreateFirstSnapshot() throws Exception { + dfs.allowSnapshot(source); + dfs.allowSnapshot(target); + dfs.createSnapshot(source, "s1"); + dfs.createSnapshot(target, "s1"); + } + + private void syncAndVerify() throws Exception { + Assert.assertTrue(sync()); + verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false); + } + + private boolean sync() throws Exception { + DistCpSync distCpSync = new DistCpSync(options, conf); + return distCpSync.sync(); } /** @@ -164,23 +180,30 @@ private void initData(Path dir) throws Exception { * foo/ f4 * f1(new) */ - private void changeData(Path dir) throws Exception { + private int changeData(Path dir) throws Exception { final Path foo = new Path(dir, "foo"); final Path bar = new Path(dir, "bar"); final Path d1 = new Path(foo, "d1"); final Path f2 = new Path(bar, "f2"); final Path bar_d1 = new Path(bar, "d1"); + int numCreatedModified = 0; dfs.rename(d1, bar_d1); + numCreatedModified += 1; // modify ./foo + numCreatedModified += 1; // modify ./bar final Path f3 = new Path(bar_d1, "f3"); dfs.delete(f3, true); final Path newfoo = new Path(bar_d1, "foo"); dfs.rename(foo, newfoo); + numCreatedModified += 1; // modify ./foo/d1 final Path f1 = new Path(newfoo, "f1"); dfs.delete(f1, true); DFSTestUtil.createFile(dfs, f1, 2 * BLOCK_SIZE, DATA_NUM, 0); + numCreatedModified += 1; // create ./foo/f1 DFSTestUtil.appendFile(dfs, f2, (int) BLOCK_SIZE); + numCreatedModified += 1; // modify ./bar/f2 dfs.rename(bar, new Path(dir, "foo")); + return numCreatedModified; } /** @@ -190,13 +213,10 @@ private void changeData(Path dir) throws Exception { public void testSync() throws Exception { initData(source); initData(target); - dfs.allowSnapshot(source); - dfs.allowSnapshot(target); - dfs.createSnapshot(source, "s1"); - dfs.createSnapshot(target, "s1"); + enableAndCreateFirstSnapshot(); // make changes under source - changeData(source); + int numCreatedModified = changeData(source); dfs.createSnapshot(source, "s2"); // before sync, make some further changes on source. this should not affect @@ -206,17 +226,22 @@ public void testSync() throws Exception { final Path newdir = new Path(source, "foo/d1/foo/newdir"); dfs.mkdirs(newdir); + SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2"); + System.out.println(report); + + DistCpSync distCpSync = new DistCpSync(options, conf); + // do the sync - Assert.assertTrue(DistCpSync.sync(options, conf)); + Assert.assertTrue(distCpSync.sync()); // make sure the source path has been updated to the snapshot path final Path spath = new Path(source, - HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2"); + HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2"); Assert.assertEquals(spath, options.getSourcePaths().get(0)); // build copy listing final Path listingPath = new Path("/tmp/META/fileList.seq"); - CopyListing listing = new GlobbedCopyListing(conf, new Credentials()); + CopyListing listing = new SimpleCopyListing(conf, new Credentials(), distCpSync); listing.buildListing(listingPath, options); Map copyListing = getListing(listingPath); @@ -232,6 +257,9 @@ public void testSync() throws Exception { copyMapper.map(entry.getKey(), entry.getValue(), context); } + // verify that we only list modified and created files/directories + Assert.assertEquals(numCreatedModified, copyListing.size()); + // verify that we only copied new appended data of f2 and the new file f1 Assert.assertEquals(BLOCK_SIZE * 3, stubContext.getReporter() .getCounter(CopyMapper.Counter.BYTESCOPIED).getValue()); @@ -285,16 +313,13 @@ public void testSyncWithCurrent() throws Exception { options.setUseDiff(true, "s1", "."); initData(source); initData(target); - dfs.allowSnapshot(source); - dfs.allowSnapshot(target); - dfs.createSnapshot(source, "s1"); - dfs.createSnapshot(target, "s1"); + enableAndCreateFirstSnapshot(); // make changes under source changeData(source); // do the sync - Assert.assertTrue(DistCpSync.sync(options, conf)); + sync(); // make sure the source path is still unchanged Assert.assertEquals(source, options.getSourcePaths().get(0)); } @@ -328,10 +353,7 @@ private void changeData2(Path dir) throws Exception { public void testSync2() throws Exception { initData2(source); initData2(target); - dfs.allowSnapshot(source); - dfs.allowSnapshot(target); - dfs.createSnapshot(source, "s1"); - dfs.createSnapshot(target, "s1"); + enableAndCreateFirstSnapshot(); // make changes under source changeData2(source); @@ -340,9 +362,7 @@ public void testSync2() throws Exception { SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2"); System.out.println(report); - // do the sync - Assert.assertTrue(DistCpSync.sync(options, conf)); - verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false); + syncAndVerify(); } private void initData3(Path dir) throws Exception { @@ -375,16 +395,13 @@ private void changeData3(Path dir) throws Exception { } /** - * Test a case where there are multiple source files with the same name + * Test a case where there are multiple source files with the same name. */ @Test public void testSync3() throws Exception { initData3(source); initData3(target); - dfs.allowSnapshot(source); - dfs.allowSnapshot(target); - dfs.createSnapshot(source, "s1"); - dfs.createSnapshot(target, "s1"); + enableAndCreateFirstSnapshot(); // make changes under source changeData3(source); @@ -393,8 +410,268 @@ public void testSync3() throws Exception { SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2"); System.out.println(report); + syncAndVerify(); + } + + private void initData4(Path dir) throws Exception { + final Path d1 = new Path(dir, "d1"); + final Path d2 = new Path(d1, "d2"); + final Path f1 = new Path(d2, "f1"); + + DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0L); + } + + private void changeData4(Path dir) throws Exception { + final Path d1 = new Path(dir, "d1"); + final Path d11 = new Path(dir, "d11"); + final Path d2 = new Path(d1, "d2"); + final Path d21 = new Path(d1, "d21"); + final Path f1 = new Path(d2, "f1"); + + dfs.delete(f1, false); + dfs.rename(d2, d21); + dfs.rename(d1, d11); + } + + /** + * Test a case where multiple level dirs are renamed. + */ + @Test + public void testSync4() throws Exception { + initData4(source); + initData4(target); + enableAndCreateFirstSnapshot(); + + // make changes under source + changeData4(source); + dfs.createSnapshot(source, "s2"); + + SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2"); + System.out.println(report); + + syncAndVerify(); + } + + private void initData5(Path dir) throws Exception { + final Path d1 = new Path(dir, "d1"); + final Path d2 = new Path(dir, "d2"); + final Path f1 = new Path(d1, "f1"); + final Path f2 = new Path(d2, "f2"); + + DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0L); + DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE, DATA_NUM, 0L); + } + + private void changeData5(Path dir) throws Exception { + final Path d1 = new Path(dir, "d1"); + final Path d2 = new Path(dir, "d2"); + final Path f1 = new Path(d1, "f1"); + final Path tmp = new Path(dir, "tmp"); + + dfs.delete(f1, false); + dfs.rename(d1, tmp); + dfs.rename(d2, d1); + final Path f2 = new Path(d1, "f2"); + dfs.delete(f2, false); + } + + /** + * Test a case with different delete and rename sequences. + */ + @Test + public void testSync5() throws Exception { + initData5(source); + initData5(target); + enableAndCreateFirstSnapshot(); + + // make changes under source + changeData5(source); + dfs.createSnapshot(source, "s2"); + + SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2"); + System.out.println(report); + + syncAndVerify(); + } + + private void testAndVerify(int numCreatedModified) + throws Exception{ + SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2"); + System.out.println(report); + + DistCpSync distCpSync = new DistCpSync(options, conf); // do the sync - Assert.assertTrue(DistCpSync.sync(options, conf)); - verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false); + Assert.assertTrue(distCpSync.sync()); + + // make sure the source path has been updated to the snapshot path + final Path spath = new Path(source, + HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2"); + Assert.assertEquals(spath, options.getSourcePaths().get(0)); + + // build copy listing + final Path listingPath = new Path("/tmp/META/fileList.seq"); + CopyListing listing = new SimpleCopyListing(conf, new Credentials(), distCpSync); + listing.buildListing(listingPath, options); + + Map copyListing = getListing(listingPath); + CopyMapper copyMapper = new CopyMapper(); + StubContext stubContext = new StubContext(conf, null, 0); + Mapper.Context context = + stubContext.getContext(); + // Enable append + context.getConfiguration().setBoolean( + DistCpOptionSwitch.APPEND.getConfigLabel(), true); + copyMapper.setup(context); + for (Map.Entry entry : + copyListing.entrySet()) { + copyMapper.map(entry.getKey(), entry.getValue(), context); + } + + // verify that we only list modified and created files/directories + Assert.assertEquals(numCreatedModified, copyListing.size()); + + // verify the source and target now has the same structure + verifyCopy(dfs.getFileStatus(spath), dfs.getFileStatus(target), false); + } + + private void initData6(Path dir) throws Exception { + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + final Path foo_f1 = new Path(foo, "f1"); + final Path bar_f1 = new Path(bar, "f1"); + + DFSTestUtil.createFile(dfs, foo_f1, BLOCK_SIZE, DATA_NUM, 0L); + DFSTestUtil.createFile(dfs, bar_f1, BLOCK_SIZE, DATA_NUM, 0L); + } + + private int changeData6(Path dir) throws Exception { + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + final Path foo2 = new Path(dir, "foo2"); + final Path foo_f1 = new Path(foo, "f1"); + + int numCreatedModified = 0; + dfs.rename(foo, foo2); + dfs.rename(bar, foo); + dfs.rename(foo2, bar); + DFSTestUtil.appendFile(dfs, foo_f1, (int) BLOCK_SIZE); + numCreatedModified += 1; // modify ./bar/f1 + return numCreatedModified; + } + + /** + * Test a case where there is a cycle in renaming dirs. + */ + @Test + public void testSync6() throws Exception { + initData6(source); + initData6(target); + enableAndCreateFirstSnapshot(); + int numCreatedModified = changeData6(source); + dfs.createSnapshot(source, "s2"); + + testAndVerify(numCreatedModified); + } + + private void initData7(Path dir) throws Exception { + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + final Path foo_f1 = new Path(foo, "f1"); + final Path bar_f1 = new Path(bar, "f1"); + + DFSTestUtil.createFile(dfs, foo_f1, BLOCK_SIZE, DATA_NUM, 0L); + DFSTestUtil.createFile(dfs, bar_f1, BLOCK_SIZE, DATA_NUM, 0L); + } + + private int changeData7(Path dir) throws Exception { + final Path foo = new Path(dir, "foo"); + final Path foo2 = new Path(dir, "foo2"); + final Path foo_f1 = new Path(foo, "f1"); + final Path foo2_f2 = new Path(foo2, "f2"); + final Path foo_d1 = new Path(foo, "d1"); + final Path foo_d1_f3 = new Path(foo_d1, "f3"); + + int numCreatedModified = 0; + dfs.rename(foo, foo2); + DFSTestUtil.createFile(dfs, foo_f1, BLOCK_SIZE, DATA_NUM, 0L); + numCreatedModified += 2; // create ./foo and ./foo/f1 + DFSTestUtil.appendFile(dfs, foo_f1, (int) BLOCK_SIZE); + dfs.rename(foo_f1, foo2_f2); + numCreatedModified -= 1; // mv ./foo/f1 + numCreatedModified += 2; // "M ./foo" and "+ ./foo/f2" + DFSTestUtil.createFile(dfs, foo_d1_f3, BLOCK_SIZE, DATA_NUM, 0L); + numCreatedModified += 2; // create ./foo/d1 and ./foo/d1/f3 + return numCreatedModified; + } + + /** + * Test a case where rename a dir, then create a new dir with the same name + * and sub dir. + */ + @Test + public void testSync7() throws Exception { + initData7(source); + initData7(target); + enableAndCreateFirstSnapshot(); + int numCreatedModified = changeData7(source); + dfs.createSnapshot(source, "s2"); + + testAndVerify(numCreatedModified); + } + + private void initData8(Path dir) throws Exception { + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + final Path d1 = new Path(dir, "d1"); + final Path foo_f1 = new Path(foo, "f1"); + final Path bar_f1 = new Path(bar, "f1"); + final Path d1_f1 = new Path(d1, "f1"); + + DFSTestUtil.createFile(dfs, foo_f1, BLOCK_SIZE, DATA_NUM, 0L); + DFSTestUtil.createFile(dfs, bar_f1, BLOCK_SIZE, DATA_NUM, 0L); + DFSTestUtil.createFile(dfs, d1_f1, BLOCK_SIZE, DATA_NUM, 0L); + } + + private int changeData8(Path dir) throws Exception { + final Path foo = new Path(dir, "foo"); + final Path createdDir = new Path(dir, "c"); + final Path d1 = new Path(dir, "d1"); + final Path d1_f1 = new Path(d1, "f1"); + final Path createdDir_f1 = new Path(createdDir, "f1"); + final Path foo_f3 = new Path(foo, "f3"); + final Path new_foo = new Path(createdDir, "foo"); + final Path foo_f4 = new Path(foo, "f4"); + final Path foo_d1 = new Path(foo, "d1"); + final Path bar = new Path(dir, "bar"); + final Path bar1 = new Path(dir, "bar1"); + + int numCreatedModified = 0; + DFSTestUtil.createFile(dfs, foo_f3, BLOCK_SIZE, DATA_NUM, 0L); + numCreatedModified += 1; // create ./c/foo/f3 + DFSTestUtil.createFile(dfs, createdDir_f1, BLOCK_SIZE, DATA_NUM, 0L); + numCreatedModified += 1; // create ./c + dfs.rename(createdDir_f1, foo_f4); + numCreatedModified += 1; // create ./c/foo/f4 + dfs.rename(d1_f1, createdDir_f1); // rename ./d1/f1 -> ./c/f1 + numCreatedModified += 1; // modify ./c/foo/d1 + dfs.rename(d1, foo_d1); + numCreatedModified += 1; // modify ./c/foo + dfs.rename(foo, new_foo); + dfs.rename(bar, bar1); + return numCreatedModified; + } + + /** + * Test a case where create a dir, then mv a existed dir into it. + */ + @Test + public void testSync8() throws Exception { + initData8(source); + initData8(target); + enableAndCreateFirstSnapshot(); + int numCreatedModified = changeData8(source); + dfs.createSnapshot(source, "s2"); + + testAndVerify(numCreatedModified); } } diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java index b9d9ada066d..4dc08fb8591 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java @@ -655,7 +655,7 @@ public void testDiffOption() { false)); DistCpOptions options = OptionsParser.parse(new String[] { "-update", - "-delete", "-diff", "s1", "s2", + "-diff", "s1", "s2", "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/" }); options.appendToConf(conf); @@ -665,7 +665,7 @@ public void testDiffOption() { Assert.assertEquals("s2", options.getToSnapshot()); options = OptionsParser.parse(new String[] { - "-delete", "-diff", "s1", ".", "-update", + "-diff", "s1", ".", "-update", "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/" }); options.appendToConf(conf); @@ -677,7 +677,7 @@ public void testDiffOption() { // -diff requires two option values try { - OptionsParser.parse(new String[] {"-diff", "s1", "-delete", "-update", + OptionsParser.parse(new String[] {"-diff", "s1", "-update", "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/" }); fail("-diff should fail with only one snapshot name"); @@ -686,25 +686,25 @@ public void testDiffOption() { "Must provide both the starting and ending snapshot names", e); } - // make sure -diff is only valid when -update and -delete is specified + // make sure -diff is only valid when -update is specified try { OptionsParser.parse(new String[] { "-diff", "s1", "s2", "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/" }); - fail("-diff should fail if -update or -delete option is not specified"); + fail("-diff should fail if -update option is not specified"); } catch (IllegalArgumentException e) { GenericTestUtils.assertExceptionContains( - "Diff is valid only with update and delete options", e); + "Diff is valid only with update options", e); } try { - OptionsParser.parse(new String[] { "-diff", "s1", "s2", "-update", + OptionsParser.parse(new String[] { "-diff", "s1", "s2", "-update", "-delete", "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/" }); - fail("-diff should fail if -update or -delete option is not specified"); + fail("-diff should fail if -delete option is specified"); } catch (IllegalArgumentException e) { GenericTestUtils.assertExceptionContains( - "Diff is valid only with update and delete options", e); + "Diff is valid only with update options", e); } try { @@ -712,10 +712,10 @@ public void testDiffOption() { "-delete", "-overwrite", "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/" }); - fail("-diff should fail if -update or -delete option is not specified"); + fail("-diff should fail if -update option is not specified"); } catch (IllegalArgumentException e) { GenericTestUtils.assertExceptionContains( - "Diff is valid only with update and delete options", e); + "Diff is valid only with update options", e); } }