From 7bc170ba267c646a99aa102f7e388331a3ffa660 Mon Sep 17 00:00:00 2001 From: Yongjun Zhang Date: Mon, 17 Oct 2016 13:19:47 -0700 Subject: [PATCH] HDFS-9820. Improve distcp to support efficient restore to an earlier snapshot. Contributed by Yongjun Zhang. (cherry picked from commit 8650cc84f20e7d8c32dcdcd91c94372d476e2276) Conflicts hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java --- .../org/apache/hadoop/tools/DiffInfo.java | 47 +- .../java/org/apache/hadoop/tools/DistCp.java | 37 +- .../apache/hadoop/tools/DistCpConstants.java | 1 + .../hadoop/tools/DistCpOptionSwitch.java | 5 + .../apache/hadoop/tools/DistCpOptions.java | 83 +- .../org/apache/hadoop/tools/DistCpSync.java | 254 +++-- .../apache/hadoop/tools/OptionsParser.java | 27 +- .../hadoop/tools/SimpleCopyListing.java | 17 +- .../hadoop/tools/TestDistCpOptions.java | 21 +- .../apache/hadoop/tools/TestDistCpSync.java | 4 +- .../tools/TestDistCpSyncReverseBase.java | 868 ++++++++++++++++++ .../TestDistCpSyncReverseFromSource.java | 36 + .../TestDistCpSyncReverseFromTarget.java | 36 + .../hadoop/tools/TestOptionsParser.java | 79 +- 14 files changed, 1353 insertions(+), 162 deletions(-) create mode 100644 hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseBase.java create mode 100644 hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseFromSource.java create mode 100644 hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseFromTarget.java diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DiffInfo.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DiffInfo.java index 79bb7fe2671..7e563013471 100644 --- 
a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DiffInfo.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DiffInfo.java @@ -44,28 +44,49 @@ class DiffInfo { }; /** The source file/dir of the rename or deletion op */ - final Path source; + private Path source; + /** The target file/dir of the rename op. Null means the op is deletion. */ + private Path target; + + private SnapshotDiffReport.DiffType type; /** * The intermediate file/dir for the op. For a rename or a delete op, * we first rename the source to this tmp file/dir. */ private Path tmp; - /** The target file/dir of the rename op. Null means the op is deletion. */ - Path target; - private final SnapshotDiffReport.DiffType type; - - public SnapshotDiffReport.DiffType getType(){ - return this.type; - } - - DiffInfo(Path source, Path target, SnapshotDiffReport.DiffType type) { + DiffInfo(final Path source, final Path target, + SnapshotDiffReport.DiffType type) { assert source != null; this.source = source; this.target= target; this.type = type; } + void setSource(final Path source) { + this.source = source; + } + + Path getSource() { + return source; + } + + void setTarget(final Path target) { + this.target = target; + } + + Path getTarget() { + return target; + } + + public void setType(final SnapshotDiffReport.DiffType type){ + this.type = type; + } + + public SnapshotDiffReport.DiffType getType(){ + return type; + } + void setTmp(Path tmp) { this.tmp = tmp; } @@ -73,4 +94,10 @@ class DiffInfo { Path getTmp() { return tmp; } + + @Override + public String toString() { + return type + ": src=" + String.valueOf(source) + " tgt=" + + String.valueOf(target) + " tmp=" + String.valueOf(tmp); + } } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java index deecb3ac709..7b0d9f2452d 100644 --- 
a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java @@ -77,6 +77,24 @@ public class DistCp extends Configured implements Tool { private boolean submitted; private FileSystem jobFS; + private void prepareFileListing(Job job) throws Exception { + if (inputOptions.shouldUseSnapshotDiff()) { + // When "-diff" or "-rdiff" is passed, do sync() first, then + // create copyListing based on snapshot diff. + DistCpSync distCpSync = new DistCpSync(inputOptions, getConf()); + if (distCpSync.sync()) { + createInputFileListingWithDiff(job, distCpSync); + } else { + throw new Exception("DistCp sync failed, input options: " + + inputOptions); + } + } else { + // When no "-diff" or "-rdiff" is passed, create copyListing + // in regular way. + createInputFileListing(job); + } + } + /** * Public Constructor. Creates DistCp object with specified input-parameters. * (E.g. source-paths, target-location, etc.) @@ -175,21 +193,7 @@ public class DistCp extends Configured implements Tool { jobFS = metaFolder.getFileSystem(getConf()); job = createJob(); } - if (inputOptions.shouldUseDiff()) { - DistCpSync distCpSync = new DistCpSync(inputOptions, getConf()); - if (distCpSync.sync()) { - createInputFileListingWithDiff(job, distCpSync); - } else { - throw new Exception("DistCp sync failed, input options: " - + inputOptions); - } - } - - // Fallback to default DistCp if without "diff" option or sync failed. 
- if (!inputOptions.shouldUseDiff()) { - createInputFileListing(job); - } - + prepareFileListing(job); job.submit(); submitted = true; } finally { @@ -199,7 +203,8 @@ public class DistCp extends Configured implements Tool { } String jobID = job.getJobID().toString(); - job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID); + job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, + jobID); LOG.info("DistCp job-id: " + jobID); return job; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index a6eda53439d..ecf1859ae78 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -60,6 +60,7 @@ public class DistCpConstants { public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite"; public static final String CONF_LABEL_APPEND = "distcp.copy.append"; public static final String CONF_LABEL_DIFF = "distcp.copy.diff"; + public static final String CONF_LABEL_RDIFF = "distcp.copy.rdiff"; public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb"; public static final String CONF_LABEL_SIMPLE_LISTING_FILESTATUS_SIZE = "distcp.simplelisting.file.status.size"; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index ef6b2c1f93f..b0007917e85 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -159,6 +159,11 @@ public enum DistCpOptionSwitch { "Use snapshot diff report to identify the difference between source and target"), 2), + 
RDIFF(DistCpConstants.CONF_LABEL_RDIFF, + new Option("rdiff", false, + "Use target snapshot diff report to identify changes made on target"), + 2), + /** * Should DisctpExecution be blocking */ diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index a3af9176ecf..c61816aa0e9 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -18,6 +18,7 @@ package org.apache.hadoop.tools; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.tools.util.DistCpUtils; @@ -42,8 +43,29 @@ public class DistCpOptions { private boolean append = false; private boolean skipCRC = false; private boolean blocking = true; + // When "-diff s1 s2 src tgt" is passed, apply forward snapshot diff (from s1 + // to s2) of source cluster to the target cluster to sync target cluster with + // the source cluster. Referred to as "Fdiff" in the code. + // It's required that s2 is newer than s1. private boolean useDiff = false; + // When "-rdiff s2 s1 src tgt" is passed, apply reversed snapshot diff (from + // s2 to s1) of target cluster to the target cluster, so to make target + // cluster go back to s1. Referred to as "Rdiff" in the code. + // It's required that s2 is newer than s1, and src and tgt have exact same + // content at their s1, if src is not the same as tgt. + private boolean useRdiff = false; + + // For both -diff and -rdiff, given the example command line switches, two + // steps are taken: + // 1. Sync Step. This step does renaming/deletion ops in the snapshot diff, + // so to avoid copying files copied already but renamed later(HDFS-7535) + // 2. Copy Step. 
This step copy the necessary files from src to tgt + // 2.1 For -diff, it copies from snapshot s2 of src (HDFS-8828) + // 2.2 For -rdiff, it copies from snapshot s1 of src, where the src + // could be the tgt itself (HDFS-9820). + // + public static final int maxNumListstatusThreads = 40; private int numListstatusThreads = 0; // Indicates that flag is not set. private int maxMaps = DistCpConstants.DEFAULT_MAPS; @@ -131,6 +153,8 @@ public class DistCpOptions { this.overwrite = that.overwrite; this.skipCRC = that.skipCRC; this.blocking = that.blocking; + this.useDiff = that.useDiff; + this.useRdiff = that.useRdiff; this.numListstatusThreads = that.numListstatusThreads; this.maxMaps = that.maxMaps; this.mapBandwidth = that.mapBandwidth; @@ -203,7 +227,7 @@ public class DistCpOptions { public void setDeleteMissing(boolean deleteMissing) { validate(DistCpOptionSwitch.DELETE_MISSING, deleteMissing); this.deleteMissing = deleteMissing; - ignoreDeleteMissingIfUseDiff(); + ignoreDeleteMissingIfUseSnapshotDiff(); } /** @@ -211,9 +235,9 @@ public class DistCpOptions { * For backward compatibility, we ignore the -delete option here, instead of * throwing an IllegalArgumentException. See HDFS-10397 for more discussion. */ - private void ignoreDeleteMissingIfUseDiff() { - if (deleteMissing && useDiff) { - OptionsParser.LOG.warn("-delete and -diff are mutually exclusive. " + + private void ignoreDeleteMissingIfUseSnapshotDiff() { + if (deleteMissing && (useDiff || useRdiff)) { + OptionsParser.LOG.warn("-delete and -diff/-rdiff are mutually exclusive. 
" + "The -delete option will be ignored."); deleteMissing = false; } @@ -295,6 +319,14 @@ public class DistCpOptions { return this.useDiff; } + public boolean shouldUseRdiff() { + return this.useRdiff; + } + + public boolean shouldUseSnapshotDiff() { + return shouldUseDiff() || shouldUseRdiff(); + } + public String getFromSnapshot() { return this.fromSnapshot; } @@ -303,16 +335,20 @@ public class DistCpOptions { return this.toSnapshot; } - public void setUseDiff(boolean useDiff, String fromSnapshot, String toSnapshot) { - validate(DistCpOptionSwitch.DIFF, useDiff); - this.useDiff = useDiff; - this.fromSnapshot = fromSnapshot; - this.toSnapshot = toSnapshot; - ignoreDeleteMissingIfUseDiff(); + public void setUseDiff(String fromSS, String toSS) { + this.fromSnapshot = fromSS; + this.toSnapshot = toSS; + validate(DistCpOptionSwitch.DIFF, true); + this.useDiff = true; + ignoreDeleteMissingIfUseSnapshotDiff(); } - public void disableUsingDiff() { - this.useDiff = false; + public void setUseRdiff(String fromSS, String toSS) { + this.fromSnapshot = fromSS; + this.toSnapshot = toSS; + validate(DistCpOptionSwitch.RDIFF, true); + this.useRdiff = true; + ignoreDeleteMissingIfUseSnapshotDiff(); } /** @@ -601,6 +637,7 @@ public class DistCpOptions { value : this.skipCRC); boolean append = (option == DistCpOptionSwitch.APPEND ? value : this.append); boolean useDiff = (option == DistCpOptionSwitch.DIFF ? value : this.useDiff); + boolean useRdiff = (option == DistCpOptionSwitch.RDIFF ? 
value : this.useRdiff); if (syncFolder && atomicCommit) { throw new IllegalArgumentException("Atomic commit can't be used with " + @@ -629,16 +666,29 @@ public class DistCpOptions { throw new IllegalArgumentException( "Append is disallowed when skipping CRC"); } - if (!syncFolder && useDiff) { + if (!syncFolder && (useDiff || useRdiff)) { throw new IllegalArgumentException( - "Diff is valid only with update options"); + "-diff/-rdiff is valid only with -update option"); + } + + if (useDiff || useRdiff) { + if (StringUtils.isBlank(fromSnapshot) || + StringUtils.isBlank(toSnapshot)) { + throw new IllegalArgumentException( + "Must provide both the starting and ending " + + "snapshot names for -diff/-rdiff"); + } + } + if (useDiff && useRdiff) { + throw new IllegalArgumentException( + "-diff and -rdiff are mutually exclusive"); } } /** * Add options to configuration. These will be used in the Mapper/committer * - * @param conf - Configruation object to which the options need to be added + * @param conf - Configuration object to which the options need to be added */ public void appendToConf(Configuration conf) { DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT, @@ -655,6 +705,8 @@ public class DistCpOptions { String.valueOf(append)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIFF, String.valueOf(useDiff)); + DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.RDIFF, + String.valueOf(useRdiff)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC, String.valueOf(skipCRC)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH, @@ -682,6 +734,7 @@ public class DistCpOptions { ", overwrite=" + overwrite + ", append=" + append + ", useDiff=" + useDiff + + ", useRdiff=" + useRdiff + ", fromSnapshot=" + fromSnapshot + ", toSnapshot=" + toSnapshot + ", skipCRC=" + skipCRC + diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java 
b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java index 38a1bef5071..f1fae11bd22 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java @@ -50,6 +50,11 @@ import java.util.HashSet; class DistCpSync { private DistCpOptions inputOptions; private Configuration conf; + // diffMap maps snapshot diff op type to a list of diff ops. + // It's initially created based on the snapshot diff. Then the individual + // diff stored there maybe modified instead of copied by the distcp algorithm + // afterwards, for better performance. + // private EnumMap> diffMap; private DiffInfo[] renameDiffs; @@ -58,6 +63,10 @@ class DistCpSync { this.conf = conf; } + private boolean isRdiff() { + return inputOptions.shouldUseRdiff(); + } + /** * Check if three conditions are met before sync. * 1. Only one source directory. @@ -77,21 +86,25 @@ class DistCpSync { final Path sourceDir = sourcePaths.get(0); final Path targetDir = inputOptions.getTargetPath(); - final FileSystem sfs = sourceDir.getFileSystem(conf); - final FileSystem tfs = targetDir.getFileSystem(conf); + final FileSystem srcFs = sourceDir.getFileSystem(conf); + final FileSystem tgtFs = targetDir.getFileSystem(conf); + final FileSystem snapshotDiffFs = isRdiff() ? tgtFs : srcFs; + final Path snapshotDiffDir = isRdiff() ? targetDir : sourceDir; + // currently we require both the source and the target file system are // DistributedFileSystem. 
- if (!(sfs instanceof DistributedFileSystem) || - !(tfs instanceof DistributedFileSystem)) { + if (!(srcFs instanceof DistributedFileSystem) || + !(tgtFs instanceof DistributedFileSystem)) { throw new IllegalArgumentException("The FileSystems needs to" + " be DistributedFileSystem for using snapshot-diff-based distcp"); } - final DistributedFileSystem targetFs = (DistributedFileSystem) tfs; + + final DistributedFileSystem targetFs = (DistributedFileSystem) tgtFs; // make sure targetFS has no change between from and the current states if (!checkNoChange(targetFs, targetDir)) { // set the source path using the snapshot path - inputOptions.setSourcePaths(Arrays.asList(getSourceSnapshotPath(sourceDir, + inputOptions.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir, inputOptions.getToSnapshot()))); return false; } @@ -101,17 +114,27 @@ class DistCpSync { try { final FileStatus fromSnapshotStat = - sfs.getFileStatus(getSourceSnapshotPath(sourceDir, from)); + snapshotDiffFs.getFileStatus(getSnapshotPath(snapshotDiffDir, from)); final FileStatus toSnapshotStat = - sfs.getFileStatus(getSourceSnapshotPath(sourceDir, to)); + snapshotDiffFs.getFileStatus(getSnapshotPath(snapshotDiffDir, to)); - // If toSnapshot isn't current dir then do a time check - if (!to.equals("") - && fromSnapshotStat.getModificationTime() > toSnapshotStat - .getModificationTime()) { - throw new HadoopIllegalArgumentException("Snapshot " + to - + " should be newer than " + from); + if (isRdiff()) { + // If fromSnapshot isn't current dir then do a time check + if (!from.equals("") + && fromSnapshotStat.getModificationTime() < toSnapshotStat + .getModificationTime()) { + throw new HadoopIllegalArgumentException("Snapshot " + from + + " should be newer than " + to); + } + } else { + // If toSnapshot isn't current dir then do a time check + if(!to.equals("") + && fromSnapshotStat.getModificationTime() > toSnapshotStat + .getModificationTime()) { + throw new HadoopIllegalArgumentException("Snapshot " 
+ to + + " should be newer than " + from); + } } } catch (FileNotFoundException nfe) { throw new InvalidInputException("Input snapshot is not found", nfe); @@ -138,7 +161,8 @@ class DistCpSync { Path tmpDir = null; try { tmpDir = createTargetTmpDir(targetFs, targetDir); - DiffInfo[] renameAndDeleteDiffs = getRenameAndDeleteDiffs(targetDir); + DiffInfo[] renameAndDeleteDiffs = + getRenameAndDeleteDiffsForSync(targetDir); if (renameAndDeleteDiffs.length > 0) { // do the real sync work: deletion and rename syncDiff(renameAndDeleteDiffs, targetFs, tmpDir); @@ -151,7 +175,7 @@ class DistCpSync { deleteTargetTmpDir(targetFs, tmpDir); // TODO: since we have tmp directory, we can support "undo" with failures // set the source path using the snapshot path - inputOptions.setSourcePaths(Arrays.asList(getSourceSnapshotPath(sourceDir, + inputOptions.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir, inputOptions.getToSnapshot()))); } } @@ -162,16 +186,16 @@ class DistCpSync { * no entry for a given DiffType, the associated value will be an empty list. */ private boolean getAllDiffs() throws IOException { - List sourcePaths = inputOptions.getSourcePaths(); - final Path sourceDir = sourcePaths.get(0); + Path ssDir = isRdiff()? 
+ inputOptions.getTargetPath() : inputOptions.getSourcePaths().get(0); + try { DistributedFileSystem fs = - (DistributedFileSystem) sourceDir.getFileSystem(conf); + (DistributedFileSystem) ssDir.getFileSystem(conf); final String from = getSnapshotName(inputOptions.getFromSnapshot()); final String to = getSnapshotName(inputOptions.getToSnapshot()); - SnapshotDiffReport report = fs.getSnapshotDiffReport(sourceDir, + SnapshotDiffReport report = fs.getSnapshotDiffReport(ssDir, from, to); - this.diffMap = new EnumMap<>(SnapshotDiffReport.DiffType.class); for (SnapshotDiffReport.DiffType type : SnapshotDiffReport.DiffType.values()) { @@ -185,25 +209,25 @@ class DistCpSync { if (entry.getSourcePath().length <= 0) { continue; } - List list = diffMap.get(entry.getType()); - - if (entry.getType() == SnapshotDiffReport.DiffType.MODIFY || - entry.getType() == SnapshotDiffReport.DiffType.CREATE || - entry.getType() == SnapshotDiffReport.DiffType.DELETE) { + SnapshotDiffReport.DiffType dt = entry.getType(); + List list = diffMap.get(dt); + if (dt == SnapshotDiffReport.DiffType.MODIFY || + dt == SnapshotDiffReport.DiffType.CREATE || + dt == SnapshotDiffReport.DiffType.DELETE) { final Path source = new Path(DFSUtilClient.bytes2String(entry.getSourcePath())); - list.add(new DiffInfo(source, null, entry.getType())); - } else if (entry.getType() == SnapshotDiffReport.DiffType.RENAME) { + list.add(new DiffInfo(source, null, dt)); + } else if (dt == SnapshotDiffReport.DiffType.RENAME) { final Path source = new Path(DFSUtilClient.bytes2String(entry.getSourcePath())); final Path target = new Path(DFSUtilClient.bytes2String(entry.getTargetPath())); - list.add(new DiffInfo(source, target, entry.getType())); + list.add(new DiffInfo(source, target, dt)); } } return true; } catch (IOException e) { - DistCp.LOG.warn("Failed to compute snapshot diff on " + sourceDir, e); + DistCp.LOG.warn("Failed to compute snapshot diff on " + ssDir, e); } this.diffMap = null; return false; @@ -213,11 +237,11 
@@ class DistCpSync { return Path.CUR_DIR.equals(name) ? "" : name; } - private Path getSourceSnapshotPath(Path sourceDir, String snapshotName) { + private Path getSnapshotPath(Path inputDir, String snapshotName) { if (Path.CUR_DIR.equals(snapshotName)) { - return sourceDir; + return inputDir; } else { - return new Path(sourceDir, + return new Path(inputDir, HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + snapshotName); } } @@ -249,8 +273,9 @@ class DistCpSync { */ private boolean checkNoChange(DistributedFileSystem fs, Path path) { try { + final String from = getSnapshotName(inputOptions.getFromSnapshot()); SnapshotDiffReport targetDiff = - fs.getSnapshotDiffReport(path, inputOptions.getFromSnapshot(), ""); + fs.getSnapshotDiffReport(path, from, ""); if (!targetDiff.getDiffList().isEmpty()) { DistCp.LOG.warn("The target has been modified since snapshot " + inputOptions.getFromSnapshot()); @@ -259,7 +284,8 @@ class DistCpSync { return true; } } catch (IOException e) { - DistCp.LOG.warn("Failed to compute snapshot diff on " + path, e); + DistCp.LOG.warn("Failed to compute snapshot diff on " + path + + " at snapshot " + inputOptions.getFromSnapshot(), e); } return false; } @@ -281,12 +307,13 @@ class DistCpSync { Arrays.sort(diffs, DiffInfo.sourceComparator); Random random = new Random(); for (DiffInfo diff : diffs) { - Path tmpTarget = new Path(tmpDir, diff.source.getName()); + Path tmpTarget = new Path(tmpDir, diff.getSource().getName()); while (targetFs.exists(tmpTarget)) { - tmpTarget = new Path(tmpDir, diff.source.getName() + random.nextInt()); + tmpTarget = new Path(tmpDir, + diff.getSource().getName() + random.nextInt()); } diff.setTmp(tmpTarget); - targetFs.rename(diff.source, tmpTarget); + targetFs.rename(diff.getSource(), tmpTarget); } } @@ -300,11 +327,11 @@ class DistCpSync { // directories are created first. 
Arrays.sort(diffs, DiffInfo.targetComparator); for (DiffInfo diff : diffs) { - if (diff.target != null) { - if (!targetFs.exists(diff.target.getParent())) { - targetFs.mkdirs(diff.target.getParent()); + if (diff.getTarget() != null) { + if (!targetFs.exists(diff.getTarget().getParent())) { + targetFs.mkdirs(diff.getTarget().getParent()); } - targetFs.rename(diff.getTmp(), diff.target); + targetFs.rename(diff.getTmp(), diff.getTarget()); } } } @@ -313,17 +340,80 @@ class DistCpSync { * Get rename and delete diffs and add the targetDir as the prefix of their * source and target paths. */ - private DiffInfo[] getRenameAndDeleteDiffs(Path targetDir) { + private DiffInfo[] getRenameAndDeleteDiffsForSync(Path targetDir) { + // NOTE: when HDFS-10263 is done, getRenameAndDeleteDiffsRdiff + // should be the same as getRenameAndDeleteDiffsFdiff. Specifically, + // we should just move the body of getRenameAndDeleteDiffsFdiff + // to here and remove both getRenameAndDeleteDiffsFdiff + // and getRenameAndDeleteDiffsRdiff. + if (isRdiff()) { + return getRenameAndDeleteDiffsRdiff(targetDir); + } else { + return getRenameAndDeleteDiffsFdiff(targetDir); + } + } + + /** + * Get rename and delete diffs and add the targetDir as the prefix of their + * source and target paths. + */ + private DiffInfo[] getRenameAndDeleteDiffsRdiff(Path targetDir) { + List renameDiffsList = + diffMap.get(SnapshotDiffReport.DiffType.RENAME); + + // Prepare a renameDiffArray for translating deleted items below. + // Do a reversion here due to HDFS-10263.
+ List renameDiffsListReversed = + new ArrayList(renameDiffsList.size()); + for (DiffInfo diff : renameDiffsList) { + renameDiffsListReversed.add(new DiffInfo(diff.getTarget(), + diff.getSource(), diff.getType())); + } + DiffInfo[] renameDiffArray = + renameDiffsListReversed.toArray(new DiffInfo[renameDiffsList.size()]); + + Arrays.sort(renameDiffArray, DiffInfo.sourceComparator); + + List renameAndDeleteDiff = new ArrayList<>(); + // Traverse DELETE list, which we need to delete them in sync process. + // Use the renameDiffArray prepared to translate the path. + for (DiffInfo diff : diffMap.get(SnapshotDiffReport.DiffType.DELETE)) { + DiffInfo renameItem = getRenameItem(diff, renameDiffArray); + Path source; + if (renameItem != null) { + source = new Path(targetDir, + translateRenamedPath(diff.getSource(), renameItem)); + } else { + source = new Path(targetDir, diff.getSource()); + } + renameAndDeleteDiff.add(new DiffInfo(source, null, + SnapshotDiffReport.DiffType.DELETE)); + } + for (DiffInfo diff : diffMap.get(SnapshotDiffReport.DiffType.RENAME)) { + // swap target and source here for Rdiff + Path source = new Path(targetDir, diff.getTarget()); + Path target = new Path(targetDir, diff.getSource()); + renameAndDeleteDiff.add(new DiffInfo(source, target, diff.getType())); + } + return renameAndDeleteDiff.toArray( + new DiffInfo[renameAndDeleteDiff.size()]); + } + + /** + * Get rename and delete diffs and add the targetDir as the prefix of their + * source and target paths.
+ */ + private DiffInfo[] getRenameAndDeleteDiffsFdiff(Path targetDir) { List renameAndDeleteDiff = new ArrayList<>(); for (DiffInfo diff : diffMap.get(SnapshotDiffReport.DiffType.DELETE)) { - Path source = new Path(targetDir, diff.source); - renameAndDeleteDiff.add(new DiffInfo(source, diff.target, + Path source = new Path(targetDir, diff.getSource()); + renameAndDeleteDiff.add(new DiffInfo(source, diff.getTarget(), diff.getType())); } for (DiffInfo diff : diffMap.get(SnapshotDiffReport.DiffType.RENAME)) { - Path source = new Path(targetDir, diff.source); - Path target = new Path(targetDir, diff.target); + Path source = new Path(targetDir, diff.getSource()); + Path target = new Path(targetDir, diff.getTarget()); renameAndDeleteDiff.add(new DiffInfo(source, target, diff.getType())); } @@ -367,7 +457,7 @@ class DistCpSync { */ private DiffInfo getRenameItem(DiffInfo diff, DiffInfo[] renameDiffArray) { for (DiffInfo renameItem : renameDiffArray) { - if (diff.source.equals(renameItem.source)) { + if (diff.getSource().equals(renameItem.getSource())) { // The same path string may appear in: // 1. both renamed and modified snapshot diff entries. // 2. both renamed and created snapshot diff entries. @@ -377,7 +467,7 @@ class DistCpSync { if (diff.getType() == SnapshotDiffReport.DiffType.MODIFY) { return renameItem; } - } else if (isParentOf(renameItem.source, diff.source)) { + } else if (isParentOf(renameItem.getSource(), diff.getSource())) { // If rename entry is the parent of diff entry, then both MODIFY and // CREATE diff entries should be handled. return renameItem; @@ -387,16 +477,27 @@ class DistCpSync { } /** - * For a given source path, get its target path based on the rename item. + * For a given sourcePath, get its real path if it or its parent was renamed. 
+ * + * For example, if we renamed dirX to dirY, and created dirY/fileX, + * the initial snapshot diff would be a CREATE snapshot diff that looks like + * + dirX/fileX + * The rename snapshot diff looks like + * R dirX dirY + * + * We convert the sourcePath dirX/fileX to dirY/fileX here. + * * @return target path */ - private Path getTargetPath(Path sourcePath, DiffInfo renameItem) { - if (sourcePath.equals(renameItem.source)) { - return renameItem.target; + private Path translateRenamedPath(Path sourcePath, + DiffInfo renameItem) { + if (sourcePath.equals(renameItem.getSource())) { + return renameItem.getTarget(); } StringBuffer sb = new StringBuffer(sourcePath.toString()); - String remain = sb.substring(renameItem.source.toString().length() + 1); - return new Path(renameItem.target, remain); + String remain = + sb.substring(renameItem.getSource().toString().length() + 1); + return new Path(renameItem.getTarget(), remain); } /** @@ -406,26 +507,35 @@ class DistCpSync { * * If the parent or self of a source path is renamed, we need to change its * target path according the correspondent rename item. + * + * For RDiff usage, the diff.getSource() is what we will use as its target + * path.
+ * * @return a diff list */ - public ArrayList prepareDiffList() { + public ArrayList prepareDiffListForCopyListing() { DiffInfo[] modifyAndCreateDiffs = getCreateAndModifyDiffs(); - - List renameDiffsList = - diffMap.get(SnapshotDiffReport.DiffType.RENAME); - DiffInfo[] renameDiffArray = - renameDiffsList.toArray(new DiffInfo[renameDiffsList.size()]); - Arrays.sort(renameDiffArray, DiffInfo.sourceComparator); - ArrayList finalListWithTarget = new ArrayList<>(); - for (DiffInfo diff : modifyAndCreateDiffs) { - DiffInfo renameItem = getRenameItem(diff, renameDiffArray); - if (renameItem == null) { - diff.target = diff.source; - } else { - diff.target = getTargetPath(diff.source, renameItem); + if (isRdiff()) { + for (DiffInfo diff : modifyAndCreateDiffs) { + diff.setTarget(diff.getSource()); + finalListWithTarget.add(diff); + } + } else { + List renameDiffsList = + diffMap.get(SnapshotDiffReport.DiffType.RENAME); + DiffInfo[] renameDiffArray = + renameDiffsList.toArray(new DiffInfo[renameDiffsList.size()]); + Arrays.sort(renameDiffArray, DiffInfo.sourceComparator); + for (DiffInfo diff : modifyAndCreateDiffs) { + DiffInfo renameItem = getRenameItem(diff, renameDiffArray); + if (renameItem == null) { + diff.setTarget(diff.getSource()); + } else { + diff.setTarget(translateRenamedPath(diff.getSource(), renameItem)); + } + finalListWithTarget.add(diff); } - finalListWithTarget.add(diff); } return finalListWithTarget; } @@ -459,9 +569,9 @@ class DistCpSync { boolean foundChild = false; HashSet excludeList = new HashSet<>(); for (DiffInfo diff : renameDiffs) { - if (isParentOf(newDir, diff.target)) { + if (isParentOf(newDir, diff.getTarget())) { foundChild = true; - excludeList.add(new Path(prefix, diff.target).toUri().getPath()); + excludeList.add(new Path(prefix, diff.getTarget()).toUri().getPath()); } else if (foundChild) { // The renameDiffs was sorted, the matching section should be // contiguous. 
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index b3a40efa4e8..af3cb92cc92 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -28,6 +28,7 @@ import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; @@ -66,6 +67,13 @@ public class OptionsParser { } } + private static void checkSnapshotsArgs(final String[] snapshots) { + Preconditions.checkArgument(snapshots != null && snapshots.length == 2 + && !StringUtils.isBlank(snapshots[0]) + && !StringUtils.isBlank(snapshots[1]), + "Must provide both the starting and ending snapshot names"); + } + /** * The parse method parses the command-line options, and creates * a corresponding Options object. @@ -74,7 +82,8 @@ public class OptionsParser { * @return The Options object, corresponding to the specified command-line. * @throws IllegalArgumentException Thrown if the parse fails. 
*/ - public static DistCpOptions parse(String args[]) throws IllegalArgumentException { + public static DistCpOptions parse(String[] args) + throws IllegalArgumentException { CommandLineParser parser = new CustomParser(); @@ -154,10 +163,16 @@ public class OptionsParser { parsePreserveStatus(command, option); if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) { - String[] snapshots = getVals(command, DistCpOptionSwitch.DIFF.getSwitch()); - Preconditions.checkArgument(snapshots != null && snapshots.length == 2, - "Must provide both the starting and ending snapshot names"); - option.setUseDiff(true, snapshots[0], snapshots[1]); + String[] snapshots = getVals(command, + DistCpOptionSwitch.DIFF.getSwitch()); + checkSnapshotsArgs(snapshots); + option.setUseDiff(snapshots[0], snapshots[1]); + } + if (command.hasOption(DistCpOptionSwitch.RDIFF.getSwitch())) { + String[] snapshots = getVals(command, + DistCpOptionSwitch.RDIFF.getSwitch()); + checkSnapshotsArgs(snapshots); + option.setUseRdiff(snapshots[0], snapshots[1]); } parseFileLimit(command); @@ -342,7 +357,7 @@ public class OptionsParser { "source paths present"); } option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch. 
- SOURCE_FILE_LISTING.getSwitch())), targetPath); + SOURCE_FILE_LISTING.getSwitch())), targetPath); } else { if (sourcePaths.isEmpty()) { throw new IllegalArgumentException("Neither source file listing nor " + diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java index bc30aa16609..0002d4f56b0 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java @@ -194,7 +194,7 @@ public class SimpleCopyListing extends CopyListing { @Override protected void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException { - if(options.shouldUseDiff()) { + if(options.shouldUseSnapshotDiff()) { doBuildListingWithSnapshotDiff(getWriter(pathToListingFile), options); }else { doBuildListing(getWriter(pathToListingFile), options); @@ -256,7 +256,7 @@ public class SimpleCopyListing extends CopyListing { protected void doBuildListingWithSnapshotDiff( SequenceFile.Writer fileListWriter, DistCpOptions options) throws IOException { - ArrayList diffList = distCpSync.prepareDiffList(); + ArrayList diffList = distCpSync.prepareDiffListForCopyListing(); Path sourceRoot = options.getSourcePaths().get(0); FileSystem sourceFS = sourceRoot.getFileSystem(getConf()); @@ -264,13 +264,16 @@ public class SimpleCopyListing extends CopyListing { List fileStatuses = Lists.newArrayList(); for (DiffInfo diff : diffList) { // add snapshot paths prefix - diff.target = new Path(options.getSourcePaths().get(0), diff.target); + diff.setTarget( + new Path(options.getSourcePaths().get(0), diff.getTarget())); if (diff.getType() == SnapshotDiffReport.DiffType.MODIFY) { - addToFileListing(fileListWriter, sourceRoot, diff.target, options); + addToFileListing(fileListWriter, + sourceRoot, diff.getTarget(), options); } else if 
(diff.getType() == SnapshotDiffReport.DiffType.CREATE) { - addToFileListing(fileListWriter, sourceRoot, diff.target, options); + addToFileListing(fileListWriter, + sourceRoot, diff.getTarget(), options); - FileStatus sourceStatus = sourceFS.getFileStatus(diff.target); + FileStatus sourceStatus = sourceFS.getFileStatus(diff.getTarget()); if (sourceStatus.isDirectory()) { if (LOG.isDebugEnabled()) { LOG.debug("Adding source dir for traverse: " + @@ -278,7 +281,7 @@ public class SimpleCopyListing extends CopyListing { } HashSet excludeList = - distCpSync.getTraverseExcludeList(diff.source, + distCpSync.getTraverseExcludeList(diff.getSource(), options.getSourcePaths().get(0)); ArrayList sourceDirs = new ArrayList<>(); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java index 7f15a60334c..74a100c7afa 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java @@ -213,7 +213,7 @@ public class TestDistCpOptions { new Path("hdfs://localhost:8020/target/")); options.setSyncFolder(true); options.setDeleteMissing(true); - options.setUseDiff(true, "s1", "s2"); + options.setUseDiff("s1", "s2"); assertFalse("-delete should be ignored when -diff is specified", options.shouldDeleteMissing()); } catch (IllegalArgumentException e) { @@ -305,7 +305,8 @@ public class TestDistCpOptions { DistCpOptions option = new DistCpOptions(new Path("abc"), new Path("xyz")); final String val = "DistCpOptions{atomicCommit=false, syncFolder=false, " + "deleteMissing=false, ignoreFailures=false, overwrite=false, " - + "append=false, useDiff=false, fromSnapshot=null, toSnapshot=null, " + + "append=false, useDiff=false, useRdiff=false, " + + "fromSnapshot=null, toSnapshot=null, " + "skipCRC=false, blocking=true, 
numListstatusThreads=0, maxMaps=20, " + "mapBandwidth=100, sslConfigurationFile='null', " + "copyStrategy='uniformsize', preserveStatus=[], " @@ -432,7 +433,7 @@ public class TestDistCpOptions { new Path("hdfs://localhost:8020/source/first"), new Path("hdfs://localhost:8020/target/")); options.setSyncFolder(true); - options.setUseDiff(true, "s1", "s2"); + options.setUseDiff("s1", "s2"); Assert.assertTrue(options.shouldUseDiff()); Assert.assertEquals("s1", options.getFromSnapshot()); Assert.assertEquals("s2", options.getToSnapshot()); @@ -442,7 +443,7 @@ public class TestDistCpOptions { new Path("hdfs://localhost:8020/source/first"), new Path("hdfs://localhost:8020/target/")); options.setSyncFolder(true); - options.setUseDiff(true, "s1", "."); + options.setUseDiff("s1", "."); Assert.assertTrue(options.shouldUseDiff()); Assert.assertEquals("s1", options.getFromSnapshot()); Assert.assertEquals(".", options.getToSnapshot()); @@ -453,11 +454,11 @@ public class TestDistCpOptions { final DistCpOptions options = new DistCpOptions( new Path("hdfs://localhost:8020/source/first"), new Path("hdfs://localhost:8020/target/")); - options.setUseDiff(true, "s1", "s2"); + options.setUseDiff("s1", "s2"); fail("-diff should fail if -update option is not specified"); } catch (IllegalArgumentException e) { assertExceptionContains( - "Diff is valid only with update options", e); + "-diff/-rdiff is valid only with -update option", e); } try { @@ -465,7 +466,7 @@ public class TestDistCpOptions { new Path("hdfs://localhost:8020/source/first"), new Path("hdfs://localhost:8020/target/")); options.setSyncFolder(true); - options.setUseDiff(true, "s1", "s2"); + options.setUseDiff("s1", "s2"); options.setDeleteMissing(true); assertFalse("-delete should be ignored when -diff is specified", options.shouldDeleteMissing()); @@ -477,12 +478,12 @@ public class TestDistCpOptions { final DistCpOptions options = new DistCpOptions( new Path("hdfs://localhost:8020/source/first"), new 
Path("hdfs://localhost:8020/target/")); - options.setUseDiff(true, "s1", "s2"); + options.setUseDiff("s1", "s2"); options.setDeleteMissing(true); fail("-diff should fail if -update option is not specified"); } catch (IllegalArgumentException e) { assertExceptionContains( - "Diff is valid only with update options", e); + "-diff/-rdiff is valid only with -update option", e); } try { @@ -490,7 +491,7 @@ public class TestDistCpOptions { new Path("hdfs://localhost:8020/source/first"), new Path("hdfs://localhost:8020/target/")); options.setDeleteMissing(true); - options.setUseDiff(true, "s1", "s2"); + options.setUseDiff("s1", "s2"); fail("-delete should fail if -update option is not specified"); } catch (IllegalArgumentException e) { assertExceptionContains("Delete missing is applicable only with update " + diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java index 3419b2f07d7..94e860412d2 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java @@ -64,7 +64,7 @@ public class TestDistCpSync { options = new DistCpOptions(Arrays.asList(source), target); options.setSyncFolder(true); - options.setUseDiff(true, "s1", "s2"); + options.setUseDiff("s1", "s2"); options.appendToConf(conf); conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, target.toString()); @@ -312,7 +312,7 @@ public class TestDistCpSync { */ @Test public void testSyncWithCurrent() throws Exception { - options.setUseDiff(true, "s1", "."); + options.setUseDiff("s1", "."); initData(source); initData(target); enableAndCreateFirstSnapshot(); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseBase.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseBase.java new file 
mode 100644 index 00000000000..fea374ee1a7 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseBase.java @@ -0,0 +1,868 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FsShell; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.tools.mapred.CopyMapper; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Arrays; +import 
java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.StringTokenizer; + +/** + * Base class to test "-rdiff s2 s1". + * Shared by "-rdiff s2 s1 src tgt" and "-rdiff s2 s1 tgt tgt" + */ +public abstract class TestDistCpSyncReverseBase { + private MiniDFSCluster cluster; + private final Configuration conf = new HdfsConfiguration(); + private DistributedFileSystem dfs; + private DistCpOptions options; + private Path source; + private boolean isSrcNotSameAsTgt = true; + private final Path target = new Path("/target"); + private final long blockSize = 1024; + private final short dataNum = 1; + + abstract void initSourcePath(); + + private static List lsr(final String prefix, + final FsShell shell, Path rootDir) throws Exception { + return lsr(prefix, shell, rootDir.toString(), null); + } + + private List lsrSource(final String prefix, + final FsShell shell, Path rootDir) throws Exception { + final Path spath = isSrcNotSameAsTgt? rootDir : + new Path(rootDir.toString(), + HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1"); + return lsr(prefix, shell, spath.toString(), null); + } + + private static List lsr(final String prefix, + final FsShell shell, String rootDir, String glob) throws Exception { + final String dir = glob == null ? 
rootDir : glob; + System.out.println(prefix + " lsr root=" + rootDir); + final ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + final PrintStream out = new PrintStream(bytes); + final PrintStream oldOut = System.out; + final PrintStream oldErr = System.err; + System.setOut(out); + System.setErr(out); + final String results; + try { + Assert.assertEquals(0, shell.run(new String[] {"-lsr", dir })); + results = bytes.toString(); + } finally { + IOUtils.closeStream(out); + System.setOut(oldOut); + System.setErr(oldErr); + } + System.out.println("lsr results:\n" + results); + String dirname = rootDir; + if (rootDir.lastIndexOf(Path.SEPARATOR) != -1) { + dirname = rootDir.substring(rootDir.lastIndexOf(Path.SEPARATOR)); + } + + final List paths = new ArrayList(); + for (StringTokenizer t = new StringTokenizer(results, "\n"); t + .hasMoreTokens();) { + final String s = t.nextToken(); + final int i = s.indexOf(dirname); + if (i >= 0) { + paths.add(s.substring(i + dirname.length())); + } + } + Collections.sort(paths); + System.out + .println("lsr paths = " + paths.toString().replace(", ", ",\n ")); + return paths; + } + + public void setSource(final Path src) { + this.source = src; + } + + public void setSrcNotSameAsTgt(final boolean srcNotSameAsTgt) { + isSrcNotSameAsTgt = srcNotSameAsTgt; + } + + @Before + public void setUp() throws Exception { + initSourcePath(); + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dataNum).build(); + cluster.waitActive(); + + dfs = cluster.getFileSystem(); + if (isSrcNotSameAsTgt) { + dfs.mkdirs(source); + } + dfs.mkdirs(target); + + options = new DistCpOptions(Arrays.asList(source), target); + options.setSyncFolder(true); + options.setUseRdiff("s2", "s1"); + options.appendToConf(conf); + + conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, target.toString()); + conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, target.toString()); + } + + @After + public void tearDown() throws Exception { + 
IOUtils.cleanup(null, dfs); + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Test the sync returns false in the following scenarios: + * 1. the source/target dir are not snapshottable dir + * 2. the source/target does not have the given snapshots + * 3. changes have been made in target + */ + @Test + public void testFallback() throws Exception { + // the source/target dir are not snapshottable dir + Assert.assertFalse(sync()); + // make sure the source path has been updated to the snapshot path + final Path spath = new Path(source, + HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1"); + Assert.assertEquals(spath, options.getSourcePaths().get(0)); + + // reset source path in options + options.setSourcePaths(Arrays.asList(source)); + // the source/target does not have the given snapshots + dfs.allowSnapshot(source); + dfs.allowSnapshot(target); + Assert.assertFalse(sync()); + Assert.assertEquals(spath, options.getSourcePaths().get(0)); + + // reset source path in options + options.setSourcePaths(Arrays.asList(source)); + this.enableAndCreateFirstSnapshot(); + dfs.createSnapshot(target, "s2"); + Assert.assertTrue(sync()); + + // reset source paths in options + options.setSourcePaths(Arrays.asList(source)); + // changes have been made in target + final Path subTarget = new Path(target, "sub"); + dfs.mkdirs(subTarget); + Assert.assertFalse(sync()); + // make sure the source path has been updated to the snapshot path + Assert.assertEquals(spath, options.getSourcePaths().get(0)); + + // reset source paths in options + options.setSourcePaths(Arrays.asList(source)); + dfs.delete(subTarget, true); + Assert.assertTrue(sync()); + } + + private void syncAndVerify() throws Exception { + + final FsShell shell = new FsShell(conf); + lsrSource("Before sync source: ", shell, source); + lsr("Before sync target: ", shell, target); + + Assert.assertTrue(sync()); + + lsrSource("After sync source: ", shell, source); + lsr("After sync target: ", shell, target); + + 
verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false); + } + + private boolean sync() throws Exception { + DistCpSync distCpSync = new DistCpSync(options, conf); + return distCpSync.sync(); + } + + private void enableAndCreateFirstSnapshot() throws Exception { + if (isSrcNotSameAsTgt) { + dfs.allowSnapshot(source); + dfs.createSnapshot(source, "s1"); + } + dfs.allowSnapshot(target); + dfs.createSnapshot(target, "s1"); + } + + private void createSecondSnapshotAtTarget() throws Exception { + dfs.createSnapshot(target, "s2"); + } + + private void createMiddleSnapshotAtTarget() throws Exception { + dfs.createSnapshot(target, "s1.5"); + } + + /** + * create some files and directories under the given directory. + * the final subtree looks like this: + * dir/ + * foo/ bar/ + * d1/ f1 d2/ f2 + * f3 f4 + */ + private void initData(Path dir) throws Exception { + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + final Path d1 = new Path(foo, "d1"); + final Path f1 = new Path(foo, "f1"); + final Path d2 = new Path(bar, "d2"); + final Path f2 = new Path(bar, "f2"); + final Path f3 = new Path(d1, "f3"); + final Path f4 = new Path(d2, "f4"); + + DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0); + DFSTestUtil.createFile(dfs, f2, blockSize, dataNum, 0); + DFSTestUtil.createFile(dfs, f3, blockSize, dataNum, 0); + DFSTestUtil.createFile(dfs, f4, blockSize, dataNum, 0); + } + + /** + * make some changes under the given directory (created in the above way). + * 1. rename dir/foo/d1 to dir/bar/d1 + * 2. delete dir/bar/d1/f3 + * 3. rename dir/foo to /dir/bar/d1/foo + * 4. delete dir/bar/d1/foo/f1 + * 5. create file dir/bar/d1/foo/f1 whose size is 2*BLOCK_SIZE + * 6. append one BLOCK to file dir/bar/f2 + * 7. 
rename dir/bar to dir/foo + * + * Thus after all these ops the subtree looks like this: + * dir/ + * foo/ + * d1/ f2(A) d2/ + * foo/ f4 + * f1(new) + */ + private int changeData(Path dir) throws Exception { + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + final Path d1 = new Path(foo, "d1"); + final Path f2 = new Path(bar, "f2"); + + final Path bar_d1 = new Path(bar, "d1"); + int numDeletedModified = 0; + dfs.rename(d1, bar_d1); + numDeletedModified += 1; // modify ./foo + numDeletedModified += 1; // modify ./bar + final Path f3 = new Path(bar_d1, "f3"); + dfs.delete(f3, true); + numDeletedModified += 1; // delete f3 + final Path newfoo = new Path(bar_d1, "foo"); + dfs.rename(foo, newfoo); + numDeletedModified += 1; // modify ./foo/d1 + final Path f1 = new Path(newfoo, "f1"); + dfs.delete(f1, true); + numDeletedModified += 1; // delete ./foo/f1 + DFSTestUtil.createFile(dfs, f1, 2 * blockSize, dataNum, 0); + DFSTestUtil.appendFile(dfs, f2, (int) blockSize); + numDeletedModified += 1; // modify ./bar/f2 + dfs.rename(bar, new Path(dir, "foo")); + return numDeletedModified; + } + + /** + * Test the basic functionality. 
+ */ + @Test + public void testSync() throws Exception { + if (isSrcNotSameAsTgt) { + initData(source); + } + initData(target); + enableAndCreateFirstSnapshot(); + + final FsShell shell = new FsShell(conf); + + lsrSource("Before source: ", shell, source); + lsr("Before target: ", shell, target); + + // make changes under target + int numDeletedModified = changeData(target); + + createSecondSnapshotAtTarget(); + + SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1"); + System.out.println(report); + + DistCpSync distCpSync = new DistCpSync(options, conf); + + lsr("Before sync target: ", shell, target); + + // do the sync + Assert.assertTrue(distCpSync.sync()); + + lsr("After sync target: ", shell, target); + + // make sure the source path has been updated to the snapshot path + final Path spath = new Path(source, + HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1"); + Assert.assertEquals(spath, options.getSourcePaths().get(0)); + + // build copy listing + final Path listingPath = new Path("/tmp/META/fileList.seq"); + CopyListing listing = new SimpleCopyListing(conf, new Credentials(), + distCpSync); + listing.buildListing(listingPath, options); + + Map copyListing = getListing(listingPath); + CopyMapper copyMapper = new CopyMapper(); + StubContext stubContext = new StubContext(conf, null, 0); + Mapper.Context context = + stubContext.getContext(); + // Enable append + context.getConfiguration().setBoolean( + DistCpOptionSwitch.APPEND.getConfigLabel(), true); + copyMapper.setup(context); + for (Map.Entry entry : + copyListing.entrySet()) { + copyMapper.map(entry.getKey(), entry.getValue(), context); + } + + lsrSource("After mapper source: ", shell, source); + lsr("After mapper target: ", shell, target); + + // verify that we only list modified and created files/directories + Assert.assertEquals(numDeletedModified, copyListing.size()); + + // verify that we only copied new appended data of f2 and the new file f1 + Assert.assertEquals(blockSize 
* 3, stubContext.getReporter() + .getCounter(CopyMapper.Counter.BYTESCOPIED).getValue()); + + // verify the source and target now has the same structure + verifyCopy(dfs.getFileStatus(spath), dfs.getFileStatus(target), false); + } + + private Map getListing(Path listingPath) + throws Exception { + SequenceFile.Reader reader = null; + Map values = new HashMap<>(); + try { + reader = new SequenceFile.Reader(conf, + SequenceFile.Reader.file(listingPath)); + Text key = new Text(); + CopyListingFileStatus value = new CopyListingFileStatus(); + while (reader.next(key, value)) { + values.put(key, value); + key = new Text(); + value = new CopyListingFileStatus(); + } + } finally { + if (reader != null) { + reader.close(); + } + } + return values; + } + + private void verifyCopy(FileStatus s, FileStatus t, boolean compareName) + throws Exception { + Assert.assertEquals(s.isDirectory(), t.isDirectory()); + if (compareName) { + Assert.assertEquals(s.getPath().getName(), t.getPath().getName()); + } + if (!s.isDirectory()) { + // verify the file content is the same + byte[] sbytes = DFSTestUtil.readFileBuffer(dfs, s.getPath()); + byte[] tbytes = DFSTestUtil.readFileBuffer(dfs, t.getPath()); + Assert.assertArrayEquals(sbytes, tbytes); + } else { + FileStatus[] slist = dfs.listStatus(s.getPath()); + FileStatus[] tlist = dfs.listStatus(t.getPath()); + Assert.assertEquals(slist.length, tlist.length); + for (int i = 0; i < slist.length; i++) { + verifyCopy(slist[i], tlist[i], true); + } + } + } + + /** + * Test the case that "current" is snapshotted as "s2". 
+ * @throws Exception + */ + @Test + public void testSyncWithCurrent() throws Exception { + options.setUseRdiff(".", "s1"); + if (isSrcNotSameAsTgt) { + initData(source); + } + initData(target); + enableAndCreateFirstSnapshot(); + + // make changes under target + changeData(target); + + // do the sync + Assert.assertTrue(sync()); + final Path spath = new Path(source, + HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1"); + // make sure the source path is still unchanged + Assert.assertEquals(spath, options.getSourcePaths().get(0)); + } + + private void initData2(Path dir) throws Exception { + final Path test = new Path(dir, "test"); + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + final Path f1 = new Path(test, "f1"); + final Path f2 = new Path(foo, "f2"); + final Path f3 = new Path(bar, "f3"); + + DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L); + DFSTestUtil.createFile(dfs, f2, blockSize, dataNum, 1L); + DFSTestUtil.createFile(dfs, f3, blockSize, dataNum, 2L); + } + + private void changeData2(Path dir) throws Exception { + final Path tmpFoo = new Path(dir, "tmpFoo"); + final Path test = new Path(dir, "test"); + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + + dfs.rename(test, tmpFoo); + dfs.rename(foo, test); + dfs.rename(bar, foo); + dfs.rename(tmpFoo, bar); + } + + @Test + public void testSync2() throws Exception { + if (isSrcNotSameAsTgt) { + initData2(source); + } + initData2(target); + enableAndCreateFirstSnapshot(); + + // make changes under target + changeData2(target); + + createSecondSnapshotAtTarget(); + + SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1"); + System.out.println(report); + + syncAndVerify(); + } + + private void initData3(Path dir) throws Exception { + final Path test = new Path(dir, "test"); + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + final Path f1 = new Path(test, "file"); + final Path f2 = 
new Path(foo, "file"); + final Path f3 = new Path(bar, "file"); + + DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L); + DFSTestUtil.createFile(dfs, f2, blockSize * 2, dataNum, 1L); + DFSTestUtil.createFile(dfs, f3, blockSize * 3, dataNum, 2L); + } + + private void changeData3(Path dir) throws Exception { + final Path test = new Path(dir, "test"); + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + final Path f1 = new Path(test, "file"); + final Path f2 = new Path(foo, "file"); + final Path f3 = new Path(bar, "file"); + final Path newf1 = new Path(test, "newfile"); + final Path newf2 = new Path(foo, "newfile"); + final Path newf3 = new Path(bar, "newfile"); + + dfs.rename(f1, newf1); + dfs.rename(f2, newf2); + dfs.rename(f3, newf3); + } + + /** + * Test a case where there are multiple source files with the same name. + */ + @Test + public void testSync3() throws Exception { + if (isSrcNotSameAsTgt) { + initData3(source); + } + initData3(target); + enableAndCreateFirstSnapshot(); + + // make changes under target + changeData3(target); + + createSecondSnapshotAtTarget(); + + SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1"); + System.out.println(report); + + syncAndVerify(); + } + + private void initData4(Path dir) throws Exception { + final Path d1 = new Path(dir, "d1"); + final Path d2 = new Path(d1, "d2"); + final Path f1 = new Path(d2, "f1"); + + DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L); + } + + private int changeData4(Path dir) throws Exception { + final Path d1 = new Path(dir, "d1"); + final Path d11 = new Path(dir, "d11"); + final Path d2 = new Path(d1, "d2"); + final Path d21 = new Path(d1, "d21"); + final Path f1 = new Path(d2, "f1"); + + int numDeletedAndModified = 0; + dfs.delete(f1, false); + numDeletedAndModified += 1; + dfs.rename(d2, d21); + numDeletedAndModified += 1; + dfs.rename(d1, d11); + numDeletedAndModified += 1; + return numDeletedAndModified; + } + + /** + * Test a 
case where multiple level dirs are renamed. + */ + @Test + public void testSync4() throws Exception { + if (isSrcNotSameAsTgt) { + initData4(source); + } + initData4(target); + enableAndCreateFirstSnapshot(); + + final FsShell shell = new FsShell(conf); + lsr("Before change target: ", shell, target); + + // make changes under target + int numDeletedAndModified = changeData4(target); + + createSecondSnapshotAtTarget(); + + SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1"); + System.out.println(report); + + testAndVerify(numDeletedAndModified); + } + + private void initData5(Path dir) throws Exception { + final Path d1 = new Path(dir, "d1"); + final Path d2 = new Path(dir, "d2"); + final Path f1 = new Path(d1, "f1"); + final Path f2 = new Path(d2, "f2"); + + DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L); + DFSTestUtil.createFile(dfs, f2, blockSize, dataNum, 0L); + } + + private int changeData5(Path dir) throws Exception { + final Path d1 = new Path(dir, "d1"); + final Path d2 = new Path(dir, "d2"); + final Path f1 = new Path(d1, "f1"); + final Path tmp = new Path(dir, "tmp"); + + int numDeletedAndModified = 0; + dfs.delete(f1, false); + numDeletedAndModified += 1; + dfs.rename(d1, tmp); + numDeletedAndModified += 1; + dfs.rename(d2, d1); + numDeletedAndModified += 1; + final Path f2 = new Path(d1, "f2"); + dfs.delete(f2, false); + numDeletedAndModified += 1; + return numDeletedAndModified; + } + + /** + * Test a case with different delete and rename sequences. 
+ */ + @Test + public void testSync5() throws Exception { + if (isSrcNotSameAsTgt) { + initData5(source); + } + initData5(target); + enableAndCreateFirstSnapshot(); + + // make changes under target + int numDeletedAndModified = changeData5(target); + + createSecondSnapshotAtTarget(); + + SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1"); + System.out.println(report); + + testAndVerify(numDeletedAndModified); + } + + private void testAndVerify(int numDeletedAndModified) + throws Exception{ + SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1"); + System.out.println(report); + + final FsShell shell = new FsShell(conf); + + lsrSource("Before sync source: ", shell, source); + lsr("Before sync target: ", shell, target); + + DistCpSync distCpSync = new DistCpSync(options, conf); + // do the sync + distCpSync.sync(); + + lsr("After sync target: ", shell, target); + + // make sure the source path has been updated to the snapshot path + final Path spath = new Path(source, + HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1"); + Assert.assertEquals(spath, options.getSourcePaths().get(0)); + + // build copy listing + final Path listingPath = new Path("/tmp/META/fileList.seq"); + CopyListing listing = new SimpleCopyListing(conf, new Credentials(), distCpSync); + listing.buildListing(listingPath, options); + + Map copyListing = getListing(listingPath); + CopyMapper copyMapper = new CopyMapper(); + StubContext stubContext = new StubContext(conf, null, 0); + Mapper.Context context = + stubContext.getContext(); + // Enable append + context.getConfiguration().setBoolean( + DistCpOptionSwitch.APPEND.getConfigLabel(), true); + copyMapper.setup(context); + for (Map.Entry entry : + copyListing.entrySet()) { + copyMapper.map(entry.getKey(), entry.getValue(), context); + } + + // verify that we only list modified and created files/directories + Assert.assertEquals(numDeletedAndModified, copyListing.size()); + + lsr("After Copy target: ", 
shell, target); + + // verify the source and target now has the same structure + verifyCopy(dfs.getFileStatus(spath), dfs.getFileStatus(target), false); + } + + private void initData6(Path dir) throws Exception { + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + final Path foo_f1 = new Path(foo, "f1"); + final Path bar_f1 = new Path(bar, "f1"); + + DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L); + DFSTestUtil.createFile(dfs, bar_f1, blockSize, dataNum, 0L); + } + + private int changeData6(Path dir) throws Exception { + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + final Path foo2 = new Path(dir, "foo2"); + final Path foo_f1 = new Path(foo, "f1"); + + int numDeletedModified = 0; + dfs.rename(foo, foo2); + dfs.rename(bar, foo); + dfs.rename(foo2, bar); + DFSTestUtil.appendFile(dfs, foo_f1, (int) blockSize); + numDeletedModified += 1; // modify ./bar/f1 + return numDeletedModified; + } + + /** + * Test a case where there is a cycle in renaming dirs. 
+ */ + @Test + public void testSync6() throws Exception { + if (isSrcNotSameAsTgt) { + initData6(source); + } + initData6(target); + enableAndCreateFirstSnapshot(); + int numDeletedModified = changeData6(target); + + createSecondSnapshotAtTarget(); + + testAndVerify(numDeletedModified); + } + + private void initData7(Path dir) throws Exception { + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + final Path foo_f1 = new Path(foo, "f1"); + final Path bar_f1 = new Path(bar, "f1"); + + DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L); + DFSTestUtil.createFile(dfs, bar_f1, blockSize, dataNum, 0L); + } + + private int changeData7(Path dir) throws Exception { + final Path foo = new Path(dir, "foo"); + final Path foo2 = new Path(dir, "foo2"); + final Path foo_f1 = new Path(foo, "f1"); + final Path foo2_f2 = new Path(foo2, "f2"); + final Path foo_d1 = new Path(foo, "d1"); + final Path foo_d1_f3 = new Path(foo_d1, "f3"); + + int numDeletedAndModified = 0; + dfs.rename(foo, foo2); + DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L); + DFSTestUtil.appendFile(dfs, foo_f1, (int) blockSize); + dfs.rename(foo_f1, foo2_f2); + /* + * Difference between snapshot s1 and current directory under directory + /target: +M . ++ ./foo +R ./foo -> ./foo2 +M ./foo ++ ./foo/f2 + */ + numDeletedAndModified += 1; // "M ./foo" + DFSTestUtil.createFile(dfs, foo_d1_f3, blockSize, dataNum, 0L); + return numDeletedAndModified; + } + + /** + * Test a case where rename a dir, then create a new dir with the same name + * and sub dir. 
+ */ + @Test + public void testSync7() throws Exception { + if (isSrcNotSameAsTgt) { + initData7(source); + } + initData7(target); + enableAndCreateFirstSnapshot(); + int numDeletedAndModified = changeData7(target); + + createSecondSnapshotAtTarget(); + + testAndVerify(numDeletedAndModified); + } + + private void initData8(Path dir) throws Exception { + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + final Path d1 = new Path(dir, "d1"); + final Path foo_f1 = new Path(foo, "f1"); + final Path bar_f1 = new Path(bar, "f1"); + final Path d1_f1 = new Path(d1, "f1"); + + DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L); + DFSTestUtil.createFile(dfs, bar_f1, blockSize, dataNum, 0L); + DFSTestUtil.createFile(dfs, d1_f1, blockSize, dataNum, 0L); + } + + private int changeData8(Path dir, boolean createMiddleSnapshot) + throws Exception { + final Path foo = new Path(dir, "foo"); + final Path createdDir = new Path(dir, "c"); + final Path d1 = new Path(dir, "d1"); + final Path d1_f1 = new Path(d1, "f1"); + final Path createdDir_f1 = new Path(createdDir, "f1"); + final Path foo_f3 = new Path(foo, "f3"); + final Path new_foo = new Path(createdDir, "foo"); + final Path foo_f4 = new Path(foo, "f4"); + final Path foo_d1 = new Path(foo, "d1"); + final Path bar = new Path(dir, "bar"); + final Path bar1 = new Path(dir, "bar1"); + + int numDeletedAndModified = 0; + DFSTestUtil.createFile(dfs, foo_f3, blockSize, dataNum, 0L); + DFSTestUtil.createFile(dfs, createdDir_f1, blockSize, dataNum, 0L); + dfs.rename(createdDir_f1, foo_f4); + dfs.rename(d1_f1, createdDir_f1); // rename ./d1/f1 -> ./c/f1 + numDeletedAndModified += 1; // modify ./c/foo/d1 + + if (createMiddleSnapshot) { + this.createMiddleSnapshotAtTarget(); + } + + dfs.rename(d1, foo_d1); + numDeletedAndModified += 1; // modify ./c/foo + dfs.rename(foo, new_foo); + dfs.rename(bar, bar1); + return numDeletedAndModified; + } + + /** + * Test a case where a dir is created, then an existing dir is moved
into it. + */ + @Test + public void testSync8() throws Exception { + if (isSrcNotSameAsTgt) { + initData8(source); + } + initData8(target); + enableAndCreateFirstSnapshot(); + int numDeletedModified = changeData8(target, false); + + createSecondSnapshotAtTarget(); + + testAndVerify(numDeletedModified); + } + + /** + * Test a case where a dir is created, then an existing dir is moved into it. + * The difference between this one and testSync8 is that this one + * also creates a snapshot s1.5 in between s1 and s2. + */ + @Test + public void testSync9() throws Exception { + if (isSrcNotSameAsTgt) { + initData8(source); + } + initData8(target); + enableAndCreateFirstSnapshot(); + int numDeletedModified = changeData8(target, true); + + createSecondSnapshotAtTarget(); + + testAndVerify(numDeletedModified); + } +} diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseFromSource.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseFromSource.java new file mode 100644 index 00000000000..30cc9305623 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseFromSource.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools; + +import org.apache.hadoop.fs.Path; + +/** + * Test the case "-rdiff s2 s1 src tgt". + */ +public class TestDistCpSyncReverseFromSource + extends TestDistCpSyncReverseBase { + /* + * Initialize the source path to /source. + * @see org.apache.hadoop.tools.TestDistCpSyncReverseBase#initSourcePath() + */ + @Override + void initSourcePath() { + setSource(new Path("/source")); + setSrcNotSameAsTgt(true); + } +} diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseFromTarget.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseFromTarget.java new file mode 100644 index 00000000000..c1fb24b83b3 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseFromTarget.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools; + +import org.apache.hadoop.fs.Path; + +/** + * Test the case "-rdiff s2 s1 tgt tgt".
+ */ +public class TestDistCpSyncReverseFromTarget + extends TestDistCpSyncReverseBase { + /* + * Initialize the source path to /target. + * @see org.apache.hadoop.tools.TestDistCpSyncReverseBase#initSourcePath() + */ + @Override + void initSourcePath() { + setSource(new Path("/target")); + setSrcNotSameAsTgt(false); + } +} diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java index 907fc24c127..35778d29f57 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java @@ -400,7 +400,8 @@ public class TestOptionsParser { DistCpOptions option = new DistCpOptions(new Path("abc"), new Path("xyz")); String val = "DistCpOptions{atomicCommit=false, syncFolder=false, " + "deleteMissing=false, ignoreFailures=false, overwrite=false, " - + "append=false, useDiff=false, fromSnapshot=null, toSnapshot=null, " + + "append=false, useDiff=false, useRdiff=false, " + + "fromSnapshot=null, toSnapshot=null, " + "skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, " + "mapBandwidth=100, sslConfigurationFile='null', " + "copyStrategy='uniformsize', preserveStatus=[], " @@ -656,76 +657,106 @@ public class TestOptionsParser { } } - @Test - public void testDiffOption() { + // Test -diff or -rdiff + private void testSnapshotDiffOption(boolean isDiff) { + final String optionStr = isDiff? "-diff" : "-rdiff"; + final String optionLabel = isDiff? 
+ DistCpOptionSwitch.DIFF.getConfigLabel() : + DistCpOptionSwitch.RDIFF.getConfigLabel(); Configuration conf = new Configuration(); - Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.DIFF.getConfigLabel(), - false)); + Assert.assertFalse(conf.getBoolean(optionLabel, false)); DistCpOptions options = OptionsParser.parse(new String[] { "-update", - "-diff", "s1", "s2", + optionStr, "s1", "s2", "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/" }); options.appendToConf(conf); - Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DIFF.getConfigLabel(), false)); - Assert.assertTrue(options.shouldUseDiff()); + Assert.assertTrue(conf.getBoolean(optionLabel, false)); + Assert.assertTrue(isDiff? + options.shouldUseDiff() : options.shouldUseRdiff()); Assert.assertEquals("s1", options.getFromSnapshot()); Assert.assertEquals("s2", options.getToSnapshot()); options = OptionsParser.parse(new String[] { - "-diff", "s1", ".", "-update", + optionStr, "s1", ".", "-update", "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/" }); options.appendToConf(conf); - Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DIFF.getConfigLabel(), - false)); - Assert.assertTrue(options.shouldUseDiff()); + Assert.assertTrue(conf.getBoolean(optionLabel, false)); + Assert.assertTrue(isDiff? 
+ options.shouldUseDiff() : options.shouldUseRdiff()); Assert.assertEquals("s1", options.getFromSnapshot()); Assert.assertEquals(".", options.getToSnapshot()); - // -diff requires two option values + // -diff/-rdiff requires two option values try { - OptionsParser.parse(new String[] {"-diff", "s1", "-update", + OptionsParser.parse(new String[] {optionStr, "s1", "-update", "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/" }); - fail("-diff should fail with only one snapshot name"); + fail(optionStr + " should fail with only one snapshot name"); } catch (IllegalArgumentException e) { GenericTestUtils.assertExceptionContains( "Must provide both the starting and ending snapshot names", e); } - // make sure -diff is only valid when -update is specified + // make sure -diff/-rdiff is only valid when -update is specified try { - OptionsParser.parse(new String[] { "-diff", "s1", "s2", + OptionsParser.parse(new String[] {optionStr, "s1", "s2", "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/" }); - fail("-diff should fail if -update option is not specified"); + fail(optionStr + " should fail if -update option is not specified"); } catch (IllegalArgumentException e) { GenericTestUtils.assertExceptionContains( - "Diff is valid only with update options", e); + "-diff/-rdiff is valid only with -update option", e); } try { options = OptionsParser.parse(new String[] { - "-diff", "s1", "s2", "-update", "-delete", + optionStr, "s1", "s2", "-update", "-delete", "hdfs://localhost:9820/source/first", "hdfs://localhost:9820/target/" }); - assertFalse("-delete should be ignored when -diff is specified", + assertFalse("-delete should be ignored when " + + optionStr + " is specified", options.shouldDeleteMissing()); } catch (IllegalArgumentException e) { fail("Got unexpected IllegalArgumentException: " + e.getMessage()); } try { - OptionsParser.parse(new String[] { "-diff", "s1", "s2", + OptionsParser.parse(new String[] {optionStr, "s1", "s2", 
"-delete", "-overwrite", "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/" }); - fail("-diff should fail if -update option is not specified"); + fail(optionStr + " should fail if -update option is not specified"); } catch (IllegalArgumentException e) { GenericTestUtils.assertExceptionContains( - "Diff is valid only with update options", e); + "-diff/-rdiff is valid only with -update option", e); } + + final String optionStrOther = isDiff? "-rdiff" : "-diff"; + try { + OptionsParser.parse(new String[] { + optionStr, "s1", "s2", + optionStrOther, "s2", "s1", + "-update", + "hdfs://localhost:9820/source/first", + "hdfs://localhost:9820/target/" }); + fail(optionStr + " should fail if " + optionStrOther + + " is also specified"); + } catch (IllegalArgumentException e) { + GenericTestUtils.assertExceptionContains( + "-diff and -rdiff are mutually exclusive", e); + } + } + + @Test + public void testDiffOption() { + testSnapshotDiffOption(true); + } + + @Test + public void testRdiffOption() { + testSnapshotDiffOption(false); } @Test