HDFS-9820. Improve distcp to support efficient restore to an earlier snapshot. Contributed by Yongjun Zhang.
(cherry picked from commit 8650cc84f2
)
Conflicts
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
This commit is contained in:
parent
590d19c0a8
commit
7bc170ba26
|
@ -44,28 +44,49 @@ class DiffInfo {
|
||||||
};
|
};
|
||||||
|
|
||||||
/** The source file/dir of the rename or deletion op */
|
/** The source file/dir of the rename or deletion op */
|
||||||
final Path source;
|
private Path source;
|
||||||
|
/** The target file/dir of the rename op. Null means the op is deletion. */
|
||||||
|
private Path target;
|
||||||
|
|
||||||
|
private SnapshotDiffReport.DiffType type;
|
||||||
/**
|
/**
|
||||||
* The intermediate file/dir for the op. For a rename or a delete op,
|
* The intermediate file/dir for the op. For a rename or a delete op,
|
||||||
* we first rename the source to this tmp file/dir.
|
* we first rename the source to this tmp file/dir.
|
||||||
*/
|
*/
|
||||||
private Path tmp;
|
private Path tmp;
|
||||||
/** The target file/dir of the rename op. Null means the op is deletion. */
|
|
||||||
Path target;
|
|
||||||
|
|
||||||
private final SnapshotDiffReport.DiffType type;
|
DiffInfo(final Path source, final Path target,
|
||||||
|
SnapshotDiffReport.DiffType type) {
|
||||||
public SnapshotDiffReport.DiffType getType(){
|
|
||||||
return this.type;
|
|
||||||
}
|
|
||||||
|
|
||||||
DiffInfo(Path source, Path target, SnapshotDiffReport.DiffType type) {
|
|
||||||
assert source != null;
|
assert source != null;
|
||||||
this.source = source;
|
this.source = source;
|
||||||
this.target= target;
|
this.target= target;
|
||||||
this.type = type;
|
this.type = type;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setSource(final Path source) {
|
||||||
|
this.source = source;
|
||||||
|
}
|
||||||
|
|
||||||
|
Path getSource() {
|
||||||
|
return source;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setTarget(final Path target) {
|
||||||
|
this.target = target;
|
||||||
|
}
|
||||||
|
|
||||||
|
Path getTarget() {
|
||||||
|
return target;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setType(final SnapshotDiffReport.DiffType type){
|
||||||
|
this.type = type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SnapshotDiffReport.DiffType getType(){
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
void setTmp(Path tmp) {
|
void setTmp(Path tmp) {
|
||||||
this.tmp = tmp;
|
this.tmp = tmp;
|
||||||
}
|
}
|
||||||
|
@ -73,4 +94,10 @@ class DiffInfo {
|
||||||
Path getTmp() {
|
Path getTmp() {
|
||||||
return tmp;
|
return tmp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return type + ": src=" + String.valueOf(source) + " tgt="
|
||||||
|
+ String.valueOf(target) + " tmp=" + String.valueOf(tmp);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,6 +77,24 @@ public class DistCp extends Configured implements Tool {
|
||||||
private boolean submitted;
|
private boolean submitted;
|
||||||
private FileSystem jobFS;
|
private FileSystem jobFS;
|
||||||
|
|
||||||
|
private void prepareFileListing(Job job) throws Exception {
|
||||||
|
if (inputOptions.shouldUseSnapshotDiff()) {
|
||||||
|
// When "-diff" or "-rdiff" is passed, do sync() first, then
|
||||||
|
// create copyListing based on snapshot diff.
|
||||||
|
DistCpSync distCpSync = new DistCpSync(inputOptions, getConf());
|
||||||
|
if (distCpSync.sync()) {
|
||||||
|
createInputFileListingWithDiff(job, distCpSync);
|
||||||
|
} else {
|
||||||
|
throw new Exception("DistCp sync failed, input options: "
|
||||||
|
+ inputOptions);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// When no "-diff" or "-rdiff" is passed, create copyListing
|
||||||
|
// in regular way.
|
||||||
|
createInputFileListing(job);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Public Constructor. Creates DistCp object with specified input-parameters.
|
* Public Constructor. Creates DistCp object with specified input-parameters.
|
||||||
* (E.g. source-paths, target-location, etc.)
|
* (E.g. source-paths, target-location, etc.)
|
||||||
|
@ -175,21 +193,7 @@ public class DistCp extends Configured implements Tool {
|
||||||
jobFS = metaFolder.getFileSystem(getConf());
|
jobFS = metaFolder.getFileSystem(getConf());
|
||||||
job = createJob();
|
job = createJob();
|
||||||
}
|
}
|
||||||
if (inputOptions.shouldUseDiff()) {
|
prepareFileListing(job);
|
||||||
DistCpSync distCpSync = new DistCpSync(inputOptions, getConf());
|
|
||||||
if (distCpSync.sync()) {
|
|
||||||
createInputFileListingWithDiff(job, distCpSync);
|
|
||||||
} else {
|
|
||||||
throw new Exception("DistCp sync failed, input options: "
|
|
||||||
+ inputOptions);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fallback to default DistCp if without "diff" option or sync failed.
|
|
||||||
if (!inputOptions.shouldUseDiff()) {
|
|
||||||
createInputFileListing(job);
|
|
||||||
}
|
|
||||||
|
|
||||||
job.submit();
|
job.submit();
|
||||||
submitted = true;
|
submitted = true;
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -199,7 +203,8 @@ public class DistCp extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
String jobID = job.getJobID().toString();
|
String jobID = job.getJobID().toString();
|
||||||
job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
|
job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID,
|
||||||
|
jobID);
|
||||||
LOG.info("DistCp job-id: " + jobID);
|
LOG.info("DistCp job-id: " + jobID);
|
||||||
|
|
||||||
return job;
|
return job;
|
||||||
|
|
|
@ -60,6 +60,7 @@ public class DistCpConstants {
|
||||||
public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
|
public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
|
||||||
public static final String CONF_LABEL_APPEND = "distcp.copy.append";
|
public static final String CONF_LABEL_APPEND = "distcp.copy.append";
|
||||||
public static final String CONF_LABEL_DIFF = "distcp.copy.diff";
|
public static final String CONF_LABEL_DIFF = "distcp.copy.diff";
|
||||||
|
public static final String CONF_LABEL_RDIFF = "distcp.copy.rdiff";
|
||||||
public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
|
public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
|
||||||
public static final String CONF_LABEL_SIMPLE_LISTING_FILESTATUS_SIZE =
|
public static final String CONF_LABEL_SIMPLE_LISTING_FILESTATUS_SIZE =
|
||||||
"distcp.simplelisting.file.status.size";
|
"distcp.simplelisting.file.status.size";
|
||||||
|
|
|
@ -159,6 +159,11 @@ public enum DistCpOptionSwitch {
|
||||||
"Use snapshot diff report to identify the difference between source and target"),
|
"Use snapshot diff report to identify the difference between source and target"),
|
||||||
2),
|
2),
|
||||||
|
|
||||||
|
RDIFF(DistCpConstants.CONF_LABEL_RDIFF,
|
||||||
|
new Option("rdiff", false,
|
||||||
|
"Use target snapshot diff report to identify changes made on target"),
|
||||||
|
2),
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Should DisctpExecution be blocking
|
* Should DisctpExecution be blocking
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.tools;
|
package org.apache.hadoop.tools;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.tools.util.DistCpUtils;
|
import org.apache.hadoop.tools.util.DistCpUtils;
|
||||||
|
@ -42,8 +43,29 @@ public class DistCpOptions {
|
||||||
private boolean append = false;
|
private boolean append = false;
|
||||||
private boolean skipCRC = false;
|
private boolean skipCRC = false;
|
||||||
private boolean blocking = true;
|
private boolean blocking = true;
|
||||||
|
// When "-diff s1 s2 src tgt" is passed, apply forward snapshot diff (from s1
|
||||||
|
// to s2) of source cluster to the target cluster to sync target cluster with
|
||||||
|
// the source cluster. Referred to as "Fdiff" in the code.
|
||||||
|
// It's required that s2 is newer than s1.
|
||||||
private boolean useDiff = false;
|
private boolean useDiff = false;
|
||||||
|
|
||||||
|
// When "-rdiff s2 s1 src tgt" is passed, apply reversed snapshot diff (from
|
||||||
|
// s2 to s1) of target cluster to the target cluster, so to make target
|
||||||
|
// cluster go back to s1. Referred to as "Rdiff" in the code.
|
||||||
|
// It's required that s2 is newer than s1, and src and tgt have exact same
|
||||||
|
// content at their s1, if src is not the same as tgt.
|
||||||
|
private boolean useRdiff = false;
|
||||||
|
|
||||||
|
// For both -diff and -rdiff, given the example command line switches, two
|
||||||
|
// steps are taken:
|
||||||
|
// 1. Sync Step. This step does renaming/deletion ops in the snapshot diff,
|
||||||
|
// so to avoid copying files copied already but renamed later(HDFS-7535)
|
||||||
|
// 2. Copy Step. This step copy the necessary files from src to tgt
|
||||||
|
// 2.1 For -diff, it copies from snapshot s2 of src (HDFS-8828)
|
||||||
|
// 2.2 For -rdiff, it copies from snapshot s1 of src, where the src
|
||||||
|
// could be the tgt itself (HDFS-9820).
|
||||||
|
//
|
||||||
|
|
||||||
public static final int maxNumListstatusThreads = 40;
|
public static final int maxNumListstatusThreads = 40;
|
||||||
private int numListstatusThreads = 0; // Indicates that flag is not set.
|
private int numListstatusThreads = 0; // Indicates that flag is not set.
|
||||||
private int maxMaps = DistCpConstants.DEFAULT_MAPS;
|
private int maxMaps = DistCpConstants.DEFAULT_MAPS;
|
||||||
|
@ -131,6 +153,8 @@ public class DistCpOptions {
|
||||||
this.overwrite = that.overwrite;
|
this.overwrite = that.overwrite;
|
||||||
this.skipCRC = that.skipCRC;
|
this.skipCRC = that.skipCRC;
|
||||||
this.blocking = that.blocking;
|
this.blocking = that.blocking;
|
||||||
|
this.useDiff = that.useDiff;
|
||||||
|
this.useRdiff = that.useRdiff;
|
||||||
this.numListstatusThreads = that.numListstatusThreads;
|
this.numListstatusThreads = that.numListstatusThreads;
|
||||||
this.maxMaps = that.maxMaps;
|
this.maxMaps = that.maxMaps;
|
||||||
this.mapBandwidth = that.mapBandwidth;
|
this.mapBandwidth = that.mapBandwidth;
|
||||||
|
@ -203,7 +227,7 @@ public class DistCpOptions {
|
||||||
public void setDeleteMissing(boolean deleteMissing) {
|
public void setDeleteMissing(boolean deleteMissing) {
|
||||||
validate(DistCpOptionSwitch.DELETE_MISSING, deleteMissing);
|
validate(DistCpOptionSwitch.DELETE_MISSING, deleteMissing);
|
||||||
this.deleteMissing = deleteMissing;
|
this.deleteMissing = deleteMissing;
|
||||||
ignoreDeleteMissingIfUseDiff();
|
ignoreDeleteMissingIfUseSnapshotDiff();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -211,9 +235,9 @@ public class DistCpOptions {
|
||||||
* For backward compatibility, we ignore the -delete option here, instead of
|
* For backward compatibility, we ignore the -delete option here, instead of
|
||||||
* throwing an IllegalArgumentException. See HDFS-10397 for more discussion.
|
* throwing an IllegalArgumentException. See HDFS-10397 for more discussion.
|
||||||
*/
|
*/
|
||||||
private void ignoreDeleteMissingIfUseDiff() {
|
private void ignoreDeleteMissingIfUseSnapshotDiff() {
|
||||||
if (deleteMissing && useDiff) {
|
if (deleteMissing && (useDiff || useRdiff)) {
|
||||||
OptionsParser.LOG.warn("-delete and -diff are mutually exclusive. " +
|
OptionsParser.LOG.warn("-delete and -diff/-rdiff are mutually exclusive. " +
|
||||||
"The -delete option will be ignored.");
|
"The -delete option will be ignored.");
|
||||||
deleteMissing = false;
|
deleteMissing = false;
|
||||||
}
|
}
|
||||||
|
@ -295,6 +319,14 @@ public class DistCpOptions {
|
||||||
return this.useDiff;
|
return this.useDiff;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean shouldUseRdiff() {
|
||||||
|
return this.useRdiff;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean shouldUseSnapshotDiff() {
|
||||||
|
return shouldUseDiff() || shouldUseRdiff();
|
||||||
|
}
|
||||||
|
|
||||||
public String getFromSnapshot() {
|
public String getFromSnapshot() {
|
||||||
return this.fromSnapshot;
|
return this.fromSnapshot;
|
||||||
}
|
}
|
||||||
|
@ -303,16 +335,20 @@ public class DistCpOptions {
|
||||||
return this.toSnapshot;
|
return this.toSnapshot;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setUseDiff(boolean useDiff, String fromSnapshot, String toSnapshot) {
|
public void setUseDiff(String fromSS, String toSS) {
|
||||||
validate(DistCpOptionSwitch.DIFF, useDiff);
|
this.fromSnapshot = fromSS;
|
||||||
this.useDiff = useDiff;
|
this.toSnapshot = toSS;
|
||||||
this.fromSnapshot = fromSnapshot;
|
validate(DistCpOptionSwitch.DIFF, true);
|
||||||
this.toSnapshot = toSnapshot;
|
this.useDiff = true;
|
||||||
ignoreDeleteMissingIfUseDiff();
|
ignoreDeleteMissingIfUseSnapshotDiff();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void disableUsingDiff() {
|
public void setUseRdiff(String fromSS, String toSS) {
|
||||||
this.useDiff = false;
|
this.fromSnapshot = fromSS;
|
||||||
|
this.toSnapshot = toSS;
|
||||||
|
validate(DistCpOptionSwitch.RDIFF, true);
|
||||||
|
this.useRdiff = true;
|
||||||
|
ignoreDeleteMissingIfUseSnapshotDiff();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -601,6 +637,7 @@ public class DistCpOptions {
|
||||||
value : this.skipCRC);
|
value : this.skipCRC);
|
||||||
boolean append = (option == DistCpOptionSwitch.APPEND ? value : this.append);
|
boolean append = (option == DistCpOptionSwitch.APPEND ? value : this.append);
|
||||||
boolean useDiff = (option == DistCpOptionSwitch.DIFF ? value : this.useDiff);
|
boolean useDiff = (option == DistCpOptionSwitch.DIFF ? value : this.useDiff);
|
||||||
|
boolean useRdiff = (option == DistCpOptionSwitch.RDIFF ? value : this.useRdiff);
|
||||||
|
|
||||||
if (syncFolder && atomicCommit) {
|
if (syncFolder && atomicCommit) {
|
||||||
throw new IllegalArgumentException("Atomic commit can't be used with " +
|
throw new IllegalArgumentException("Atomic commit can't be used with " +
|
||||||
|
@ -629,16 +666,29 @@ public class DistCpOptions {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"Append is disallowed when skipping CRC");
|
"Append is disallowed when skipping CRC");
|
||||||
}
|
}
|
||||||
if (!syncFolder && useDiff) {
|
if (!syncFolder && (useDiff || useRdiff)) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"Diff is valid only with update options");
|
"-diff/-rdiff is valid only with -update option");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (useDiff || useRdiff) {
|
||||||
|
if (StringUtils.isBlank(fromSnapshot) ||
|
||||||
|
StringUtils.isBlank(toSnapshot)) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Must provide both the starting and ending " +
|
||||||
|
"snapshot names for -diff/-rdiff");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (useDiff && useRdiff) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"-diff and -rdiff are mutually exclusive");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add options to configuration. These will be used in the Mapper/committer
|
* Add options to configuration. These will be used in the Mapper/committer
|
||||||
*
|
*
|
||||||
* @param conf - Configruation object to which the options need to be added
|
* @param conf - Configuration object to which the options need to be added
|
||||||
*/
|
*/
|
||||||
public void appendToConf(Configuration conf) {
|
public void appendToConf(Configuration conf) {
|
||||||
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT,
|
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT,
|
||||||
|
@ -655,6 +705,8 @@ public class DistCpOptions {
|
||||||
String.valueOf(append));
|
String.valueOf(append));
|
||||||
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIFF,
|
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIFF,
|
||||||
String.valueOf(useDiff));
|
String.valueOf(useDiff));
|
||||||
|
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.RDIFF,
|
||||||
|
String.valueOf(useRdiff));
|
||||||
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
|
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
|
||||||
String.valueOf(skipCRC));
|
String.valueOf(skipCRC));
|
||||||
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH,
|
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH,
|
||||||
|
@ -682,6 +734,7 @@ public class DistCpOptions {
|
||||||
", overwrite=" + overwrite +
|
", overwrite=" + overwrite +
|
||||||
", append=" + append +
|
", append=" + append +
|
||||||
", useDiff=" + useDiff +
|
", useDiff=" + useDiff +
|
||||||
|
", useRdiff=" + useRdiff +
|
||||||
", fromSnapshot=" + fromSnapshot +
|
", fromSnapshot=" + fromSnapshot +
|
||||||
", toSnapshot=" + toSnapshot +
|
", toSnapshot=" + toSnapshot +
|
||||||
", skipCRC=" + skipCRC +
|
", skipCRC=" + skipCRC +
|
||||||
|
|
|
@ -50,6 +50,11 @@ import java.util.HashSet;
|
||||||
class DistCpSync {
|
class DistCpSync {
|
||||||
private DistCpOptions inputOptions;
|
private DistCpOptions inputOptions;
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
// diffMap maps snapshot diff op type to a list of diff ops.
|
||||||
|
// It's initially created based on the snapshot diff. Then the individual
|
||||||
|
// diff stored there maybe modified instead of copied by the distcp algorithm
|
||||||
|
// afterwards, for better performance.
|
||||||
|
//
|
||||||
private EnumMap<SnapshotDiffReport.DiffType, List<DiffInfo>> diffMap;
|
private EnumMap<SnapshotDiffReport.DiffType, List<DiffInfo>> diffMap;
|
||||||
private DiffInfo[] renameDiffs;
|
private DiffInfo[] renameDiffs;
|
||||||
|
|
||||||
|
@ -58,6 +63,10 @@ class DistCpSync {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isRdiff() {
|
||||||
|
return inputOptions.shouldUseRdiff();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if three conditions are met before sync.
|
* Check if three conditions are met before sync.
|
||||||
* 1. Only one source directory.
|
* 1. Only one source directory.
|
||||||
|
@ -77,21 +86,25 @@ class DistCpSync {
|
||||||
final Path sourceDir = sourcePaths.get(0);
|
final Path sourceDir = sourcePaths.get(0);
|
||||||
final Path targetDir = inputOptions.getTargetPath();
|
final Path targetDir = inputOptions.getTargetPath();
|
||||||
|
|
||||||
final FileSystem sfs = sourceDir.getFileSystem(conf);
|
final FileSystem srcFs = sourceDir.getFileSystem(conf);
|
||||||
final FileSystem tfs = targetDir.getFileSystem(conf);
|
final FileSystem tgtFs = targetDir.getFileSystem(conf);
|
||||||
|
final FileSystem snapshotDiffFs = isRdiff() ? tgtFs : srcFs;
|
||||||
|
final Path snapshotDiffDir = isRdiff() ? targetDir : sourceDir;
|
||||||
|
|
||||||
// currently we require both the source and the target file system are
|
// currently we require both the source and the target file system are
|
||||||
// DistributedFileSystem.
|
// DistributedFileSystem.
|
||||||
if (!(sfs instanceof DistributedFileSystem) ||
|
if (!(srcFs instanceof DistributedFileSystem) ||
|
||||||
!(tfs instanceof DistributedFileSystem)) {
|
!(tgtFs instanceof DistributedFileSystem)) {
|
||||||
throw new IllegalArgumentException("The FileSystems needs to" +
|
throw new IllegalArgumentException("The FileSystems needs to" +
|
||||||
" be DistributedFileSystem for using snapshot-diff-based distcp");
|
" be DistributedFileSystem for using snapshot-diff-based distcp");
|
||||||
}
|
}
|
||||||
final DistributedFileSystem targetFs = (DistributedFileSystem) tfs;
|
|
||||||
|
final DistributedFileSystem targetFs = (DistributedFileSystem) tgtFs;
|
||||||
|
|
||||||
// make sure targetFS has no change between from and the current states
|
// make sure targetFS has no change between from and the current states
|
||||||
if (!checkNoChange(targetFs, targetDir)) {
|
if (!checkNoChange(targetFs, targetDir)) {
|
||||||
// set the source path using the snapshot path
|
// set the source path using the snapshot path
|
||||||
inputOptions.setSourcePaths(Arrays.asList(getSourceSnapshotPath(sourceDir,
|
inputOptions.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir,
|
||||||
inputOptions.getToSnapshot())));
|
inputOptions.getToSnapshot())));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -101,17 +114,27 @@ class DistCpSync {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final FileStatus fromSnapshotStat =
|
final FileStatus fromSnapshotStat =
|
||||||
sfs.getFileStatus(getSourceSnapshotPath(sourceDir, from));
|
snapshotDiffFs.getFileStatus(getSnapshotPath(snapshotDiffDir, from));
|
||||||
|
|
||||||
final FileStatus toSnapshotStat =
|
final FileStatus toSnapshotStat =
|
||||||
sfs.getFileStatus(getSourceSnapshotPath(sourceDir, to));
|
snapshotDiffFs.getFileStatus(getSnapshotPath(snapshotDiffDir, to));
|
||||||
|
|
||||||
// If toSnapshot isn't current dir then do a time check
|
if (isRdiff()) {
|
||||||
if (!to.equals("")
|
// If fromSnapshot isn't current dir then do a time check
|
||||||
&& fromSnapshotStat.getModificationTime() > toSnapshotStat
|
if (!from.equals("")
|
||||||
.getModificationTime()) {
|
&& fromSnapshotStat.getModificationTime() < toSnapshotStat
|
||||||
throw new HadoopIllegalArgumentException("Snapshot " + to
|
.getModificationTime()) {
|
||||||
+ " should be newer than " + from);
|
throw new HadoopIllegalArgumentException("Snapshot " + from
|
||||||
|
+ " should be newer than " + to);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// If toSnapshot isn't current dir then do a time check
|
||||||
|
if(!to.equals("")
|
||||||
|
&& fromSnapshotStat.getModificationTime() > toSnapshotStat
|
||||||
|
.getModificationTime()) {
|
||||||
|
throw new HadoopIllegalArgumentException("Snapshot " + to
|
||||||
|
+ " should be newer than " + from);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (FileNotFoundException nfe) {
|
} catch (FileNotFoundException nfe) {
|
||||||
throw new InvalidInputException("Input snapshot is not found", nfe);
|
throw new InvalidInputException("Input snapshot is not found", nfe);
|
||||||
|
@ -138,7 +161,8 @@ class DistCpSync {
|
||||||
Path tmpDir = null;
|
Path tmpDir = null;
|
||||||
try {
|
try {
|
||||||
tmpDir = createTargetTmpDir(targetFs, targetDir);
|
tmpDir = createTargetTmpDir(targetFs, targetDir);
|
||||||
DiffInfo[] renameAndDeleteDiffs = getRenameAndDeleteDiffs(targetDir);
|
DiffInfo[] renameAndDeleteDiffs =
|
||||||
|
getRenameAndDeleteDiffsForSync(targetDir);
|
||||||
if (renameAndDeleteDiffs.length > 0) {
|
if (renameAndDeleteDiffs.length > 0) {
|
||||||
// do the real sync work: deletion and rename
|
// do the real sync work: deletion and rename
|
||||||
syncDiff(renameAndDeleteDiffs, targetFs, tmpDir);
|
syncDiff(renameAndDeleteDiffs, targetFs, tmpDir);
|
||||||
|
@ -151,7 +175,7 @@ class DistCpSync {
|
||||||
deleteTargetTmpDir(targetFs, tmpDir);
|
deleteTargetTmpDir(targetFs, tmpDir);
|
||||||
// TODO: since we have tmp directory, we can support "undo" with failures
|
// TODO: since we have tmp directory, we can support "undo" with failures
|
||||||
// set the source path using the snapshot path
|
// set the source path using the snapshot path
|
||||||
inputOptions.setSourcePaths(Arrays.asList(getSourceSnapshotPath(sourceDir,
|
inputOptions.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir,
|
||||||
inputOptions.getToSnapshot())));
|
inputOptions.getToSnapshot())));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -162,16 +186,16 @@ class DistCpSync {
|
||||||
* no entry for a given DiffType, the associated value will be an empty list.
|
* no entry for a given DiffType, the associated value will be an empty list.
|
||||||
*/
|
*/
|
||||||
private boolean getAllDiffs() throws IOException {
|
private boolean getAllDiffs() throws IOException {
|
||||||
List<Path> sourcePaths = inputOptions.getSourcePaths();
|
Path ssDir = isRdiff()?
|
||||||
final Path sourceDir = sourcePaths.get(0);
|
inputOptions.getTargetPath() : inputOptions.getSourcePaths().get(0);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
DistributedFileSystem fs =
|
DistributedFileSystem fs =
|
||||||
(DistributedFileSystem) sourceDir.getFileSystem(conf);
|
(DistributedFileSystem) ssDir.getFileSystem(conf);
|
||||||
final String from = getSnapshotName(inputOptions.getFromSnapshot());
|
final String from = getSnapshotName(inputOptions.getFromSnapshot());
|
||||||
final String to = getSnapshotName(inputOptions.getToSnapshot());
|
final String to = getSnapshotName(inputOptions.getToSnapshot());
|
||||||
SnapshotDiffReport report = fs.getSnapshotDiffReport(sourceDir,
|
SnapshotDiffReport report = fs.getSnapshotDiffReport(ssDir,
|
||||||
from, to);
|
from, to);
|
||||||
|
|
||||||
this.diffMap = new EnumMap<>(SnapshotDiffReport.DiffType.class);
|
this.diffMap = new EnumMap<>(SnapshotDiffReport.DiffType.class);
|
||||||
for (SnapshotDiffReport.DiffType type :
|
for (SnapshotDiffReport.DiffType type :
|
||||||
SnapshotDiffReport.DiffType.values()) {
|
SnapshotDiffReport.DiffType.values()) {
|
||||||
|
@ -185,25 +209,25 @@ class DistCpSync {
|
||||||
if (entry.getSourcePath().length <= 0) {
|
if (entry.getSourcePath().length <= 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
List<DiffInfo> list = diffMap.get(entry.getType());
|
SnapshotDiffReport.DiffType dt = entry.getType();
|
||||||
|
List<DiffInfo> list = diffMap.get(dt);
|
||||||
if (entry.getType() == SnapshotDiffReport.DiffType.MODIFY ||
|
if (dt == SnapshotDiffReport.DiffType.MODIFY ||
|
||||||
entry.getType() == SnapshotDiffReport.DiffType.CREATE ||
|
dt == SnapshotDiffReport.DiffType.CREATE ||
|
||||||
entry.getType() == SnapshotDiffReport.DiffType.DELETE) {
|
dt == SnapshotDiffReport.DiffType.DELETE) {
|
||||||
final Path source =
|
final Path source =
|
||||||
new Path(DFSUtilClient.bytes2String(entry.getSourcePath()));
|
new Path(DFSUtilClient.bytes2String(entry.getSourcePath()));
|
||||||
list.add(new DiffInfo(source, null, entry.getType()));
|
list.add(new DiffInfo(source, null, dt));
|
||||||
} else if (entry.getType() == SnapshotDiffReport.DiffType.RENAME) {
|
} else if (dt == SnapshotDiffReport.DiffType.RENAME) {
|
||||||
final Path source =
|
final Path source =
|
||||||
new Path(DFSUtilClient.bytes2String(entry.getSourcePath()));
|
new Path(DFSUtilClient.bytes2String(entry.getSourcePath()));
|
||||||
final Path target =
|
final Path target =
|
||||||
new Path(DFSUtilClient.bytes2String(entry.getTargetPath()));
|
new Path(DFSUtilClient.bytes2String(entry.getTargetPath()));
|
||||||
list.add(new DiffInfo(source, target, entry.getType()));
|
list.add(new DiffInfo(source, target, dt));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
DistCp.LOG.warn("Failed to compute snapshot diff on " + sourceDir, e);
|
DistCp.LOG.warn("Failed to compute snapshot diff on " + ssDir, e);
|
||||||
}
|
}
|
||||||
this.diffMap = null;
|
this.diffMap = null;
|
||||||
return false;
|
return false;
|
||||||
|
@ -213,11 +237,11 @@ class DistCpSync {
|
||||||
return Path.CUR_DIR.equals(name) ? "" : name;
|
return Path.CUR_DIR.equals(name) ? "" : name;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Path getSourceSnapshotPath(Path sourceDir, String snapshotName) {
|
private Path getSnapshotPath(Path inputDir, String snapshotName) {
|
||||||
if (Path.CUR_DIR.equals(snapshotName)) {
|
if (Path.CUR_DIR.equals(snapshotName)) {
|
||||||
return sourceDir;
|
return inputDir;
|
||||||
} else {
|
} else {
|
||||||
return new Path(sourceDir,
|
return new Path(inputDir,
|
||||||
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + snapshotName);
|
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + snapshotName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -249,8 +273,9 @@ class DistCpSync {
|
||||||
*/
|
*/
|
||||||
private boolean checkNoChange(DistributedFileSystem fs, Path path) {
|
private boolean checkNoChange(DistributedFileSystem fs, Path path) {
|
||||||
try {
|
try {
|
||||||
|
final String from = getSnapshotName(inputOptions.getFromSnapshot());
|
||||||
SnapshotDiffReport targetDiff =
|
SnapshotDiffReport targetDiff =
|
||||||
fs.getSnapshotDiffReport(path, inputOptions.getFromSnapshot(), "");
|
fs.getSnapshotDiffReport(path, from, "");
|
||||||
if (!targetDiff.getDiffList().isEmpty()) {
|
if (!targetDiff.getDiffList().isEmpty()) {
|
||||||
DistCp.LOG.warn("The target has been modified since snapshot "
|
DistCp.LOG.warn("The target has been modified since snapshot "
|
||||||
+ inputOptions.getFromSnapshot());
|
+ inputOptions.getFromSnapshot());
|
||||||
|
@ -259,7 +284,8 @@ class DistCpSync {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
DistCp.LOG.warn("Failed to compute snapshot diff on " + path, e);
|
DistCp.LOG.warn("Failed to compute snapshot diff on " + path
|
||||||
|
+ " at snapshot " + inputOptions.getFromSnapshot(), e);
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -281,12 +307,13 @@ class DistCpSync {
|
||||||
Arrays.sort(diffs, DiffInfo.sourceComparator);
|
Arrays.sort(diffs, DiffInfo.sourceComparator);
|
||||||
Random random = new Random();
|
Random random = new Random();
|
||||||
for (DiffInfo diff : diffs) {
|
for (DiffInfo diff : diffs) {
|
||||||
Path tmpTarget = new Path(tmpDir, diff.source.getName());
|
Path tmpTarget = new Path(tmpDir, diff.getSource().getName());
|
||||||
while (targetFs.exists(tmpTarget)) {
|
while (targetFs.exists(tmpTarget)) {
|
||||||
tmpTarget = new Path(tmpDir, diff.source.getName() + random.nextInt());
|
tmpTarget = new Path(tmpDir,
|
||||||
|
diff.getSource().getName() + random.nextInt());
|
||||||
}
|
}
|
||||||
diff.setTmp(tmpTarget);
|
diff.setTmp(tmpTarget);
|
||||||
targetFs.rename(diff.source, tmpTarget);
|
targetFs.rename(diff.getSource(), tmpTarget);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -300,11 +327,11 @@ class DistCpSync {
|
||||||
// directories are created first.
|
// directories are created first.
|
||||||
Arrays.sort(diffs, DiffInfo.targetComparator);
|
Arrays.sort(diffs, DiffInfo.targetComparator);
|
||||||
for (DiffInfo diff : diffs) {
|
for (DiffInfo diff : diffs) {
|
||||||
if (diff.target != null) {
|
if (diff.getTarget() != null) {
|
||||||
if (!targetFs.exists(diff.target.getParent())) {
|
if (!targetFs.exists(diff.getTarget().getParent())) {
|
||||||
targetFs.mkdirs(diff.target.getParent());
|
targetFs.mkdirs(diff.getTarget().getParent());
|
||||||
}
|
}
|
||||||
targetFs.rename(diff.getTmp(), diff.target);
|
targetFs.rename(diff.getTmp(), diff.getTarget());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -313,17 +340,80 @@ class DistCpSync {
|
||||||
* Get rename and delete diffs and add the targetDir as the prefix of their
|
* Get rename and delete diffs and add the targetDir as the prefix of their
|
||||||
* source and target paths.
|
* source and target paths.
|
||||||
*/
|
*/
|
||||||
private DiffInfo[] getRenameAndDeleteDiffs(Path targetDir) {
|
private DiffInfo[] getRenameAndDeleteDiffsForSync(Path targetDir) {
|
||||||
|
// NOTE: when HDFS-10263 is done, getRenameAndDeleteDiffsRdiff
|
||||||
|
// should be the same as getRenameAndDeleteDiffsFdiff. Specifically,
|
||||||
|
// we should just move the body of getRenameAndDeleteDiffsFdiff
|
||||||
|
// to here and remove both getRenameAndDeleteDiffsFdiff
|
||||||
|
// and getRenameAndDeleteDiffsDdiff.
|
||||||
|
if (isRdiff()) {
|
||||||
|
return getRenameAndDeleteDiffsRdiff(targetDir);
|
||||||
|
} else {
|
||||||
|
return getRenameAndDeleteDiffsFdiff(targetDir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get rename and delete diffs and add the targetDir as the prefix of their
|
||||||
|
* source and target paths.
|
||||||
|
*/
|
||||||
|
private DiffInfo[] getRenameAndDeleteDiffsRdiff(Path targetDir) {
|
||||||
|
List<DiffInfo> renameDiffsList =
|
||||||
|
diffMap.get(SnapshotDiffReport.DiffType.RENAME);
|
||||||
|
|
||||||
|
// Prepare a renameDiffArray for translating deleted items below.
|
||||||
|
// Do a reversion here due to HDFS-10263.
|
||||||
|
List<DiffInfo> renameDiffsListReversed =
|
||||||
|
new ArrayList<DiffInfo>(renameDiffsList.size());
|
||||||
|
for (DiffInfo diff : renameDiffsList) {
|
||||||
|
renameDiffsListReversed.add(new DiffInfo(diff.getTarget(),
|
||||||
|
diff.getSource(), diff.getType()));
|
||||||
|
}
|
||||||
|
DiffInfo[] renameDiffArray =
|
||||||
|
renameDiffsListReversed.toArray(new DiffInfo[renameDiffsList.size()]);
|
||||||
|
|
||||||
|
Arrays.sort(renameDiffArray, DiffInfo.sourceComparator);
|
||||||
|
|
||||||
|
List<DiffInfo> renameAndDeleteDiff = new ArrayList<>();
|
||||||
|
// Traverse DELETE list, which we need to delete them in sync process.
|
||||||
|
// Use the renameDiffArray prepared to translate the path.
|
||||||
|
for (DiffInfo diff : diffMap.get(SnapshotDiffReport.DiffType.DELETE)) {
|
||||||
|
DiffInfo renameItem = getRenameItem(diff, renameDiffArray);
|
||||||
|
Path source;
|
||||||
|
if (renameItem != null) {
|
||||||
|
source = new Path(targetDir,
|
||||||
|
translateRenamedPath(diff.getSource(), renameItem));
|
||||||
|
} else {
|
||||||
|
source = new Path(targetDir, diff.getSource());
|
||||||
|
}
|
||||||
|
renameAndDeleteDiff.add(new DiffInfo(source, null,
|
||||||
|
SnapshotDiffReport.DiffType.DELETE));
|
||||||
|
}
|
||||||
|
for (DiffInfo diff : diffMap.get(SnapshotDiffReport.DiffType.RENAME)) {
|
||||||
|
// swap target and source here for Rdiff
|
||||||
|
Path source = new Path(targetDir, diff.getSource());
|
||||||
|
Path target = new Path(targetDir, diff.getTarget());
|
||||||
|
renameAndDeleteDiff.add(new DiffInfo(source, target, diff.getType()));
|
||||||
|
}
|
||||||
|
return renameAndDeleteDiff.toArray(
|
||||||
|
new DiffInfo[renameAndDeleteDiff.size()]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get rename and delete diffs and add the targetDir as the prefix of their
|
||||||
|
* source and target paths.
|
||||||
|
*/
|
||||||
|
private DiffInfo[] getRenameAndDeleteDiffsFdiff(Path targetDir) {
|
||||||
List<DiffInfo> renameAndDeleteDiff = new ArrayList<>();
|
List<DiffInfo> renameAndDeleteDiff = new ArrayList<>();
|
||||||
for (DiffInfo diff : diffMap.get(SnapshotDiffReport.DiffType.DELETE)) {
|
for (DiffInfo diff : diffMap.get(SnapshotDiffReport.DiffType.DELETE)) {
|
||||||
Path source = new Path(targetDir, diff.source);
|
Path source = new Path(targetDir, diff.getSource());
|
||||||
renameAndDeleteDiff.add(new DiffInfo(source, diff.target,
|
renameAndDeleteDiff.add(new DiffInfo(source, diff.getTarget(),
|
||||||
diff.getType()));
|
diff.getType()));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (DiffInfo diff : diffMap.get(SnapshotDiffReport.DiffType.RENAME)) {
|
for (DiffInfo diff : diffMap.get(SnapshotDiffReport.DiffType.RENAME)) {
|
||||||
Path source = new Path(targetDir, diff.source);
|
Path source = new Path(targetDir, diff.getSource());
|
||||||
Path target = new Path(targetDir, diff.target);
|
Path target = new Path(targetDir, diff.getTarget());
|
||||||
renameAndDeleteDiff.add(new DiffInfo(source, target, diff.getType()));
|
renameAndDeleteDiff.add(new DiffInfo(source, target, diff.getType()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -367,7 +457,7 @@ class DistCpSync {
|
||||||
*/
|
*/
|
||||||
private DiffInfo getRenameItem(DiffInfo diff, DiffInfo[] renameDiffArray) {
|
private DiffInfo getRenameItem(DiffInfo diff, DiffInfo[] renameDiffArray) {
|
||||||
for (DiffInfo renameItem : renameDiffArray) {
|
for (DiffInfo renameItem : renameDiffArray) {
|
||||||
if (diff.source.equals(renameItem.source)) {
|
if (diff.getSource().equals(renameItem.getSource())) {
|
||||||
// The same path string may appear in:
|
// The same path string may appear in:
|
||||||
// 1. both renamed and modified snapshot diff entries.
|
// 1. both renamed and modified snapshot diff entries.
|
||||||
// 2. both renamed and created snapshot diff entries.
|
// 2. both renamed and created snapshot diff entries.
|
||||||
|
@ -377,7 +467,7 @@ class DistCpSync {
|
||||||
if (diff.getType() == SnapshotDiffReport.DiffType.MODIFY) {
|
if (diff.getType() == SnapshotDiffReport.DiffType.MODIFY) {
|
||||||
return renameItem;
|
return renameItem;
|
||||||
}
|
}
|
||||||
} else if (isParentOf(renameItem.source, diff.source)) {
|
} else if (isParentOf(renameItem.getSource(), diff.getSource())) {
|
||||||
// If rename entry is the parent of diff entry, then both MODIFY and
|
// If rename entry is the parent of diff entry, then both MODIFY and
|
||||||
// CREATE diff entries should be handled.
|
// CREATE diff entries should be handled.
|
||||||
return renameItem;
|
return renameItem;
|
||||||
|
@ -387,16 +477,27 @@ class DistCpSync {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For a given source path, get its target path based on the rename item.
|
* For a given sourcePath, get its real path if it or its parent was renamed.
|
||||||
|
*
|
||||||
|
* For example, if we renamed dirX to dirY, and created dirY/fileX,
|
||||||
|
* the initial snapshot diff would be a CREATE snapshot diff that looks like
|
||||||
|
* + dirX/fileX
|
||||||
|
* The rename snapshot diff looks like
|
||||||
|
* R dirX dirY
|
||||||
|
*
|
||||||
|
* We convert the soucePath dirX/fileX to dirY/fileX here.
|
||||||
|
*
|
||||||
* @return target path
|
* @return target path
|
||||||
*/
|
*/
|
||||||
private Path getTargetPath(Path sourcePath, DiffInfo renameItem) {
|
private Path translateRenamedPath(Path sourcePath,
|
||||||
if (sourcePath.equals(renameItem.source)) {
|
DiffInfo renameItem) {
|
||||||
return renameItem.target;
|
if (sourcePath.equals(renameItem.getSource())) {
|
||||||
|
return renameItem.getTarget();
|
||||||
}
|
}
|
||||||
StringBuffer sb = new StringBuffer(sourcePath.toString());
|
StringBuffer sb = new StringBuffer(sourcePath.toString());
|
||||||
String remain = sb.substring(renameItem.source.toString().length() + 1);
|
String remain =
|
||||||
return new Path(renameItem.target, remain);
|
sb.substring(renameItem.getSource().toString().length() + 1);
|
||||||
|
return new Path(renameItem.getTarget(), remain);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -406,26 +507,35 @@ class DistCpSync {
|
||||||
*
|
*
|
||||||
* If the parent or self of a source path is renamed, we need to change its
|
* If the parent or self of a source path is renamed, we need to change its
|
||||||
* target path according the correspondent rename item.
|
* target path according the correspondent rename item.
|
||||||
|
*
|
||||||
|
* For RDiff usage, the diff.getSource() is what we will use as its target
|
||||||
|
* path.
|
||||||
|
*
|
||||||
* @return a diff list
|
* @return a diff list
|
||||||
*/
|
*/
|
||||||
public ArrayList<DiffInfo> prepareDiffList() {
|
public ArrayList<DiffInfo> prepareDiffListForCopyListing() {
|
||||||
DiffInfo[] modifyAndCreateDiffs = getCreateAndModifyDiffs();
|
DiffInfo[] modifyAndCreateDiffs = getCreateAndModifyDiffs();
|
||||||
|
|
||||||
List<DiffInfo> renameDiffsList =
|
|
||||||
diffMap.get(SnapshotDiffReport.DiffType.RENAME);
|
|
||||||
DiffInfo[] renameDiffArray =
|
|
||||||
renameDiffsList.toArray(new DiffInfo[renameDiffsList.size()]);
|
|
||||||
Arrays.sort(renameDiffArray, DiffInfo.sourceComparator);
|
|
||||||
|
|
||||||
ArrayList<DiffInfo> finalListWithTarget = new ArrayList<>();
|
ArrayList<DiffInfo> finalListWithTarget = new ArrayList<>();
|
||||||
for (DiffInfo diff : modifyAndCreateDiffs) {
|
if (isRdiff()) {
|
||||||
DiffInfo renameItem = getRenameItem(diff, renameDiffArray);
|
for (DiffInfo diff : modifyAndCreateDiffs) {
|
||||||
if (renameItem == null) {
|
diff.setTarget(diff.getSource());
|
||||||
diff.target = diff.source;
|
finalListWithTarget.add(diff);
|
||||||
} else {
|
}
|
||||||
diff.target = getTargetPath(diff.source, renameItem);
|
} else {
|
||||||
|
List<DiffInfo> renameDiffsList =
|
||||||
|
diffMap.get(SnapshotDiffReport.DiffType.RENAME);
|
||||||
|
DiffInfo[] renameDiffArray =
|
||||||
|
renameDiffsList.toArray(new DiffInfo[renameDiffsList.size()]);
|
||||||
|
Arrays.sort(renameDiffArray, DiffInfo.sourceComparator);
|
||||||
|
for (DiffInfo diff : modifyAndCreateDiffs) {
|
||||||
|
DiffInfo renameItem = getRenameItem(diff, renameDiffArray);
|
||||||
|
if (renameItem == null) {
|
||||||
|
diff.setTarget(diff.getSource());
|
||||||
|
} else {
|
||||||
|
diff.setTarget(translateRenamedPath(diff.getSource(), renameItem));
|
||||||
|
}
|
||||||
|
finalListWithTarget.add(diff);
|
||||||
}
|
}
|
||||||
finalListWithTarget.add(diff);
|
|
||||||
}
|
}
|
||||||
return finalListWithTarget;
|
return finalListWithTarget;
|
||||||
}
|
}
|
||||||
|
@ -459,9 +569,9 @@ class DistCpSync {
|
||||||
boolean foundChild = false;
|
boolean foundChild = false;
|
||||||
HashSet<String> excludeList = new HashSet<>();
|
HashSet<String> excludeList = new HashSet<>();
|
||||||
for (DiffInfo diff : renameDiffs) {
|
for (DiffInfo diff : renameDiffs) {
|
||||||
if (isParentOf(newDir, diff.target)) {
|
if (isParentOf(newDir, diff.getTarget())) {
|
||||||
foundChild = true;
|
foundChild = true;
|
||||||
excludeList.add(new Path(prefix, diff.target).toUri().getPath());
|
excludeList.add(new Path(prefix, diff.getTarget()).toUri().getPath());
|
||||||
} else if (foundChild) {
|
} else if (foundChild) {
|
||||||
// The renameDiffs was sorted, the matching section should be
|
// The renameDiffs was sorted, the matching section should be
|
||||||
// contiguous.
|
// contiguous.
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.commons.cli.GnuParser;
|
||||||
import org.apache.commons.cli.HelpFormatter;
|
import org.apache.commons.cli.HelpFormatter;
|
||||||
import org.apache.commons.cli.Options;
|
import org.apache.commons.cli.Options;
|
||||||
import org.apache.commons.cli.ParseException;
|
import org.apache.commons.cli.ParseException;
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -66,6 +67,13 @@ public class OptionsParser {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void checkSnapshotsArgs(final String[] snapshots) {
|
||||||
|
Preconditions.checkArgument(snapshots != null && snapshots.length == 2
|
||||||
|
&& !StringUtils.isBlank(snapshots[0])
|
||||||
|
&& !StringUtils.isBlank(snapshots[1]),
|
||||||
|
"Must provide both the starting and ending snapshot names");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The parse method parses the command-line options, and creates
|
* The parse method parses the command-line options, and creates
|
||||||
* a corresponding Options object.
|
* a corresponding Options object.
|
||||||
|
@ -74,7 +82,8 @@ public class OptionsParser {
|
||||||
* @return The Options object, corresponding to the specified command-line.
|
* @return The Options object, corresponding to the specified command-line.
|
||||||
* @throws IllegalArgumentException Thrown if the parse fails.
|
* @throws IllegalArgumentException Thrown if the parse fails.
|
||||||
*/
|
*/
|
||||||
public static DistCpOptions parse(String args[]) throws IllegalArgumentException {
|
public static DistCpOptions parse(String[] args)
|
||||||
|
throws IllegalArgumentException {
|
||||||
|
|
||||||
CommandLineParser parser = new CustomParser();
|
CommandLineParser parser = new CustomParser();
|
||||||
|
|
||||||
|
@ -154,10 +163,16 @@ public class OptionsParser {
|
||||||
parsePreserveStatus(command, option);
|
parsePreserveStatus(command, option);
|
||||||
|
|
||||||
if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) {
|
if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) {
|
||||||
String[] snapshots = getVals(command, DistCpOptionSwitch.DIFF.getSwitch());
|
String[] snapshots = getVals(command,
|
||||||
Preconditions.checkArgument(snapshots != null && snapshots.length == 2,
|
DistCpOptionSwitch.DIFF.getSwitch());
|
||||||
"Must provide both the starting and ending snapshot names");
|
checkSnapshotsArgs(snapshots);
|
||||||
option.setUseDiff(true, snapshots[0], snapshots[1]);
|
option.setUseDiff(snapshots[0], snapshots[1]);
|
||||||
|
}
|
||||||
|
if (command.hasOption(DistCpOptionSwitch.RDIFF.getSwitch())) {
|
||||||
|
String[] snapshots = getVals(command,
|
||||||
|
DistCpOptionSwitch.RDIFF.getSwitch());
|
||||||
|
checkSnapshotsArgs(snapshots);
|
||||||
|
option.setUseRdiff(snapshots[0], snapshots[1]);
|
||||||
}
|
}
|
||||||
|
|
||||||
parseFileLimit(command);
|
parseFileLimit(command);
|
||||||
|
@ -342,7 +357,7 @@ public class OptionsParser {
|
||||||
"source paths present");
|
"source paths present");
|
||||||
}
|
}
|
||||||
option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch.
|
option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch.
|
||||||
SOURCE_FILE_LISTING.getSwitch())), targetPath);
|
SOURCE_FILE_LISTING.getSwitch())), targetPath);
|
||||||
} else {
|
} else {
|
||||||
if (sourcePaths.isEmpty()) {
|
if (sourcePaths.isEmpty()) {
|
||||||
throw new IllegalArgumentException("Neither source file listing nor " +
|
throw new IllegalArgumentException("Neither source file listing nor " +
|
||||||
|
|
|
@ -194,7 +194,7 @@ public class SimpleCopyListing extends CopyListing {
|
||||||
@Override
|
@Override
|
||||||
protected void doBuildListing(Path pathToListingFile,
|
protected void doBuildListing(Path pathToListingFile,
|
||||||
DistCpOptions options) throws IOException {
|
DistCpOptions options) throws IOException {
|
||||||
if(options.shouldUseDiff()) {
|
if(options.shouldUseSnapshotDiff()) {
|
||||||
doBuildListingWithSnapshotDiff(getWriter(pathToListingFile), options);
|
doBuildListingWithSnapshotDiff(getWriter(pathToListingFile), options);
|
||||||
}else {
|
}else {
|
||||||
doBuildListing(getWriter(pathToListingFile), options);
|
doBuildListing(getWriter(pathToListingFile), options);
|
||||||
|
@ -256,7 +256,7 @@ public class SimpleCopyListing extends CopyListing {
|
||||||
protected void doBuildListingWithSnapshotDiff(
|
protected void doBuildListingWithSnapshotDiff(
|
||||||
SequenceFile.Writer fileListWriter, DistCpOptions options)
|
SequenceFile.Writer fileListWriter, DistCpOptions options)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
ArrayList<DiffInfo> diffList = distCpSync.prepareDiffList();
|
ArrayList<DiffInfo> diffList = distCpSync.prepareDiffListForCopyListing();
|
||||||
Path sourceRoot = options.getSourcePaths().get(0);
|
Path sourceRoot = options.getSourcePaths().get(0);
|
||||||
FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
|
FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
|
||||||
|
|
||||||
|
@ -264,13 +264,16 @@ public class SimpleCopyListing extends CopyListing {
|
||||||
List<FileStatusInfo> fileStatuses = Lists.newArrayList();
|
List<FileStatusInfo> fileStatuses = Lists.newArrayList();
|
||||||
for (DiffInfo diff : diffList) {
|
for (DiffInfo diff : diffList) {
|
||||||
// add snapshot paths prefix
|
// add snapshot paths prefix
|
||||||
diff.target = new Path(options.getSourcePaths().get(0), diff.target);
|
diff.setTarget(
|
||||||
|
new Path(options.getSourcePaths().get(0), diff.getTarget()));
|
||||||
if (diff.getType() == SnapshotDiffReport.DiffType.MODIFY) {
|
if (diff.getType() == SnapshotDiffReport.DiffType.MODIFY) {
|
||||||
addToFileListing(fileListWriter, sourceRoot, diff.target, options);
|
addToFileListing(fileListWriter,
|
||||||
|
sourceRoot, diff.getTarget(), options);
|
||||||
} else if (diff.getType() == SnapshotDiffReport.DiffType.CREATE) {
|
} else if (diff.getType() == SnapshotDiffReport.DiffType.CREATE) {
|
||||||
addToFileListing(fileListWriter, sourceRoot, diff.target, options);
|
addToFileListing(fileListWriter,
|
||||||
|
sourceRoot, diff.getTarget(), options);
|
||||||
|
|
||||||
FileStatus sourceStatus = sourceFS.getFileStatus(diff.target);
|
FileStatus sourceStatus = sourceFS.getFileStatus(diff.getTarget());
|
||||||
if (sourceStatus.isDirectory()) {
|
if (sourceStatus.isDirectory()) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Adding source dir for traverse: " +
|
LOG.debug("Adding source dir for traverse: " +
|
||||||
|
@ -278,7 +281,7 @@ public class SimpleCopyListing extends CopyListing {
|
||||||
}
|
}
|
||||||
|
|
||||||
HashSet<String> excludeList =
|
HashSet<String> excludeList =
|
||||||
distCpSync.getTraverseExcludeList(diff.source,
|
distCpSync.getTraverseExcludeList(diff.getSource(),
|
||||||
options.getSourcePaths().get(0));
|
options.getSourcePaths().get(0));
|
||||||
|
|
||||||
ArrayList<FileStatus> sourceDirs = new ArrayList<>();
|
ArrayList<FileStatus> sourceDirs = new ArrayList<>();
|
||||||
|
|
|
@ -213,7 +213,7 @@ public class TestDistCpOptions {
|
||||||
new Path("hdfs://localhost:8020/target/"));
|
new Path("hdfs://localhost:8020/target/"));
|
||||||
options.setSyncFolder(true);
|
options.setSyncFolder(true);
|
||||||
options.setDeleteMissing(true);
|
options.setDeleteMissing(true);
|
||||||
options.setUseDiff(true, "s1", "s2");
|
options.setUseDiff("s1", "s2");
|
||||||
assertFalse("-delete should be ignored when -diff is specified",
|
assertFalse("-delete should be ignored when -diff is specified",
|
||||||
options.shouldDeleteMissing());
|
options.shouldDeleteMissing());
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
|
@ -305,7 +305,8 @@ public class TestDistCpOptions {
|
||||||
DistCpOptions option = new DistCpOptions(new Path("abc"), new Path("xyz"));
|
DistCpOptions option = new DistCpOptions(new Path("abc"), new Path("xyz"));
|
||||||
final String val = "DistCpOptions{atomicCommit=false, syncFolder=false, "
|
final String val = "DistCpOptions{atomicCommit=false, syncFolder=false, "
|
||||||
+ "deleteMissing=false, ignoreFailures=false, overwrite=false, "
|
+ "deleteMissing=false, ignoreFailures=false, overwrite=false, "
|
||||||
+ "append=false, useDiff=false, fromSnapshot=null, toSnapshot=null, "
|
+ "append=false, useDiff=false, useRdiff=false, "
|
||||||
|
+ "fromSnapshot=null, toSnapshot=null, "
|
||||||
+ "skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, "
|
+ "skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, "
|
||||||
+ "mapBandwidth=100, sslConfigurationFile='null', "
|
+ "mapBandwidth=100, sslConfigurationFile='null', "
|
||||||
+ "copyStrategy='uniformsize', preserveStatus=[], "
|
+ "copyStrategy='uniformsize', preserveStatus=[], "
|
||||||
|
@ -432,7 +433,7 @@ public class TestDistCpOptions {
|
||||||
new Path("hdfs://localhost:8020/source/first"),
|
new Path("hdfs://localhost:8020/source/first"),
|
||||||
new Path("hdfs://localhost:8020/target/"));
|
new Path("hdfs://localhost:8020/target/"));
|
||||||
options.setSyncFolder(true);
|
options.setSyncFolder(true);
|
||||||
options.setUseDiff(true, "s1", "s2");
|
options.setUseDiff("s1", "s2");
|
||||||
Assert.assertTrue(options.shouldUseDiff());
|
Assert.assertTrue(options.shouldUseDiff());
|
||||||
Assert.assertEquals("s1", options.getFromSnapshot());
|
Assert.assertEquals("s1", options.getFromSnapshot());
|
||||||
Assert.assertEquals("s2", options.getToSnapshot());
|
Assert.assertEquals("s2", options.getToSnapshot());
|
||||||
|
@ -442,7 +443,7 @@ public class TestDistCpOptions {
|
||||||
new Path("hdfs://localhost:8020/source/first"),
|
new Path("hdfs://localhost:8020/source/first"),
|
||||||
new Path("hdfs://localhost:8020/target/"));
|
new Path("hdfs://localhost:8020/target/"));
|
||||||
options.setSyncFolder(true);
|
options.setSyncFolder(true);
|
||||||
options.setUseDiff(true, "s1", ".");
|
options.setUseDiff("s1", ".");
|
||||||
Assert.assertTrue(options.shouldUseDiff());
|
Assert.assertTrue(options.shouldUseDiff());
|
||||||
Assert.assertEquals("s1", options.getFromSnapshot());
|
Assert.assertEquals("s1", options.getFromSnapshot());
|
||||||
Assert.assertEquals(".", options.getToSnapshot());
|
Assert.assertEquals(".", options.getToSnapshot());
|
||||||
|
@ -453,11 +454,11 @@ public class TestDistCpOptions {
|
||||||
final DistCpOptions options = new DistCpOptions(
|
final DistCpOptions options = new DistCpOptions(
|
||||||
new Path("hdfs://localhost:8020/source/first"),
|
new Path("hdfs://localhost:8020/source/first"),
|
||||||
new Path("hdfs://localhost:8020/target/"));
|
new Path("hdfs://localhost:8020/target/"));
|
||||||
options.setUseDiff(true, "s1", "s2");
|
options.setUseDiff("s1", "s2");
|
||||||
fail("-diff should fail if -update option is not specified");
|
fail("-diff should fail if -update option is not specified");
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
assertExceptionContains(
|
assertExceptionContains(
|
||||||
"Diff is valid only with update options", e);
|
"-diff/-rdiff is valid only with -update option", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -465,7 +466,7 @@ public class TestDistCpOptions {
|
||||||
new Path("hdfs://localhost:8020/source/first"),
|
new Path("hdfs://localhost:8020/source/first"),
|
||||||
new Path("hdfs://localhost:8020/target/"));
|
new Path("hdfs://localhost:8020/target/"));
|
||||||
options.setSyncFolder(true);
|
options.setSyncFolder(true);
|
||||||
options.setUseDiff(true, "s1", "s2");
|
options.setUseDiff("s1", "s2");
|
||||||
options.setDeleteMissing(true);
|
options.setDeleteMissing(true);
|
||||||
assertFalse("-delete should be ignored when -diff is specified",
|
assertFalse("-delete should be ignored when -diff is specified",
|
||||||
options.shouldDeleteMissing());
|
options.shouldDeleteMissing());
|
||||||
|
@ -477,12 +478,12 @@ public class TestDistCpOptions {
|
||||||
final DistCpOptions options = new DistCpOptions(
|
final DistCpOptions options = new DistCpOptions(
|
||||||
new Path("hdfs://localhost:8020/source/first"),
|
new Path("hdfs://localhost:8020/source/first"),
|
||||||
new Path("hdfs://localhost:8020/target/"));
|
new Path("hdfs://localhost:8020/target/"));
|
||||||
options.setUseDiff(true, "s1", "s2");
|
options.setUseDiff("s1", "s2");
|
||||||
options.setDeleteMissing(true);
|
options.setDeleteMissing(true);
|
||||||
fail("-diff should fail if -update option is not specified");
|
fail("-diff should fail if -update option is not specified");
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
assertExceptionContains(
|
assertExceptionContains(
|
||||||
"Diff is valid only with update options", e);
|
"-diff/-rdiff is valid only with -update option", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -490,7 +491,7 @@ public class TestDistCpOptions {
|
||||||
new Path("hdfs://localhost:8020/source/first"),
|
new Path("hdfs://localhost:8020/source/first"),
|
||||||
new Path("hdfs://localhost:8020/target/"));
|
new Path("hdfs://localhost:8020/target/"));
|
||||||
options.setDeleteMissing(true);
|
options.setDeleteMissing(true);
|
||||||
options.setUseDiff(true, "s1", "s2");
|
options.setUseDiff("s1", "s2");
|
||||||
fail("-delete should fail if -update option is not specified");
|
fail("-delete should fail if -update option is not specified");
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
assertExceptionContains("Delete missing is applicable only with update " +
|
assertExceptionContains("Delete missing is applicable only with update " +
|
||||||
|
|
|
@ -64,7 +64,7 @@ public class TestDistCpSync {
|
||||||
|
|
||||||
options = new DistCpOptions(Arrays.asList(source), target);
|
options = new DistCpOptions(Arrays.asList(source), target);
|
||||||
options.setSyncFolder(true);
|
options.setSyncFolder(true);
|
||||||
options.setUseDiff(true, "s1", "s2");
|
options.setUseDiff("s1", "s2");
|
||||||
options.appendToConf(conf);
|
options.appendToConf(conf);
|
||||||
|
|
||||||
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, target.toString());
|
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, target.toString());
|
||||||
|
@ -312,7 +312,7 @@ public class TestDistCpSync {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testSyncWithCurrent() throws Exception {
|
public void testSyncWithCurrent() throws Exception {
|
||||||
options.setUseDiff(true, "s1", ".");
|
options.setUseDiff("s1", ".");
|
||||||
initData(source);
|
initData(source);
|
||||||
initData(target);
|
initData(target);
|
||||||
enableAndCreateFirstSnapshot();
|
enableAndCreateFirstSnapshot();
|
||||||
|
|
|
@ -0,0 +1,868 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.tools;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FsShell;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.io.SequenceFile;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapreduce.Mapper;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.tools.mapred.CopyMapper;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.PrintStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.StringTokenizer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base class to test "-rdiff s2 s1".
|
||||||
|
* Shared by "-rdiff s2 s1 src tgt" and "-rdiff s2 s1 tgt tgt"
|
||||||
|
*/
|
||||||
|
public abstract class TestDistCpSyncReverseBase {
|
||||||
|
private MiniDFSCluster cluster;
|
||||||
|
private final Configuration conf = new HdfsConfiguration();
|
||||||
|
private DistributedFileSystem dfs;
|
||||||
|
private DistCpOptions options;
|
||||||
|
private Path source;
|
||||||
|
private boolean isSrcNotSameAsTgt = true;
|
||||||
|
private final Path target = new Path("/target");
|
||||||
|
private final long blockSize = 1024;
|
||||||
|
private final short dataNum = 1;
|
||||||
|
|
||||||
|
abstract void initSourcePath();
|
||||||
|
|
||||||
|
private static List<String> lsr(final String prefix,
|
||||||
|
final FsShell shell, Path rootDir) throws Exception {
|
||||||
|
return lsr(prefix, shell, rootDir.toString(), null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<String> lsrSource(final String prefix,
|
||||||
|
final FsShell shell, Path rootDir) throws Exception {
|
||||||
|
final Path spath = isSrcNotSameAsTgt? rootDir :
|
||||||
|
new Path(rootDir.toString(),
|
||||||
|
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1");
|
||||||
|
return lsr(prefix, shell, spath.toString(), null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<String> lsr(final String prefix,
|
||||||
|
final FsShell shell, String rootDir, String glob) throws Exception {
|
||||||
|
final String dir = glob == null ? rootDir : glob;
|
||||||
|
System.out.println(prefix + " lsr root=" + rootDir);
|
||||||
|
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||||
|
final PrintStream out = new PrintStream(bytes);
|
||||||
|
final PrintStream oldOut = System.out;
|
||||||
|
final PrintStream oldErr = System.err;
|
||||||
|
System.setOut(out);
|
||||||
|
System.setErr(out);
|
||||||
|
final String results;
|
||||||
|
try {
|
||||||
|
Assert.assertEquals(0, shell.run(new String[] {"-lsr", dir }));
|
||||||
|
results = bytes.toString();
|
||||||
|
} finally {
|
||||||
|
IOUtils.closeStream(out);
|
||||||
|
System.setOut(oldOut);
|
||||||
|
System.setErr(oldErr);
|
||||||
|
}
|
||||||
|
System.out.println("lsr results:\n" + results);
|
||||||
|
String dirname = rootDir;
|
||||||
|
if (rootDir.lastIndexOf(Path.SEPARATOR) != -1) {
|
||||||
|
dirname = rootDir.substring(rootDir.lastIndexOf(Path.SEPARATOR));
|
||||||
|
}
|
||||||
|
|
||||||
|
final List<String> paths = new ArrayList<String>();
|
||||||
|
for (StringTokenizer t = new StringTokenizer(results, "\n"); t
|
||||||
|
.hasMoreTokens();) {
|
||||||
|
final String s = t.nextToken();
|
||||||
|
final int i = s.indexOf(dirname);
|
||||||
|
if (i >= 0) {
|
||||||
|
paths.add(s.substring(i + dirname.length()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Collections.sort(paths);
|
||||||
|
System.out
|
||||||
|
.println("lsr paths = " + paths.toString().replace(", ", ",\n "));
|
||||||
|
return paths;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSource(final Path src) {
|
||||||
|
this.source = src;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSrcNotSameAsTgt(final boolean srcNotSameAsTgt) {
|
||||||
|
isSrcNotSameAsTgt = srcNotSameAsTgt;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
initSourcePath();
|
||||||
|
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dataNum).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
dfs = cluster.getFileSystem();
|
||||||
|
if (isSrcNotSameAsTgt) {
|
||||||
|
dfs.mkdirs(source);
|
||||||
|
}
|
||||||
|
dfs.mkdirs(target);
|
||||||
|
|
||||||
|
options = new DistCpOptions(Arrays.asList(source), target);
|
||||||
|
options.setSyncFolder(true);
|
||||||
|
options.setUseRdiff("s2", "s1");
|
||||||
|
options.appendToConf(conf);
|
||||||
|
|
||||||
|
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, target.toString());
|
||||||
|
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, target.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
IOUtils.cleanup(null, dfs);
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the sync returns false in the following scenarios:
|
||||||
|
* 1. the source/target dir are not snapshottable dir
|
||||||
|
* 2. the source/target does not have the given snapshots
|
||||||
|
* 3. changes have been made in target
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testFallback() throws Exception {
|
||||||
|
// the source/target dir are not snapshottable dir
|
||||||
|
Assert.assertFalse(sync());
|
||||||
|
// make sure the source path has been updated to the snapshot path
|
||||||
|
final Path spath = new Path(source,
|
||||||
|
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1");
|
||||||
|
Assert.assertEquals(spath, options.getSourcePaths().get(0));
|
||||||
|
|
||||||
|
// reset source path in options
|
||||||
|
options.setSourcePaths(Arrays.asList(source));
|
||||||
|
// the source/target does not have the given snapshots
|
||||||
|
dfs.allowSnapshot(source);
|
||||||
|
dfs.allowSnapshot(target);
|
||||||
|
Assert.assertFalse(sync());
|
||||||
|
Assert.assertEquals(spath, options.getSourcePaths().get(0));
|
||||||
|
|
||||||
|
// reset source path in options
|
||||||
|
options.setSourcePaths(Arrays.asList(source));
|
||||||
|
this.enableAndCreateFirstSnapshot();
|
||||||
|
dfs.createSnapshot(target, "s2");
|
||||||
|
Assert.assertTrue(sync());
|
||||||
|
|
||||||
|
// reset source paths in options
|
||||||
|
options.setSourcePaths(Arrays.asList(source));
|
||||||
|
// changes have been made in target
|
||||||
|
final Path subTarget = new Path(target, "sub");
|
||||||
|
dfs.mkdirs(subTarget);
|
||||||
|
Assert.assertFalse(sync());
|
||||||
|
// make sure the source path has been updated to the snapshot path
|
||||||
|
Assert.assertEquals(spath, options.getSourcePaths().get(0));
|
||||||
|
|
||||||
|
// reset source paths in options
|
||||||
|
options.setSourcePaths(Arrays.asList(source));
|
||||||
|
dfs.delete(subTarget, true);
|
||||||
|
Assert.assertTrue(sync());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void syncAndVerify() throws Exception {
|
||||||
|
|
||||||
|
final FsShell shell = new FsShell(conf);
|
||||||
|
lsrSource("Before sync source: ", shell, source);
|
||||||
|
lsr("Before sync target: ", shell, target);
|
||||||
|
|
||||||
|
Assert.assertTrue(sync());
|
||||||
|
|
||||||
|
lsrSource("After sync source: ", shell, source);
|
||||||
|
lsr("After sync target: ", shell, target);
|
||||||
|
|
||||||
|
verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean sync() throws Exception {
|
||||||
|
DistCpSync distCpSync = new DistCpSync(options, conf);
|
||||||
|
return distCpSync.sync();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void enableAndCreateFirstSnapshot() throws Exception {
|
||||||
|
if (isSrcNotSameAsTgt) {
|
||||||
|
dfs.allowSnapshot(source);
|
||||||
|
dfs.createSnapshot(source, "s1");
|
||||||
|
}
|
||||||
|
dfs.allowSnapshot(target);
|
||||||
|
dfs.createSnapshot(target, "s1");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createSecondSnapshotAtTarget() throws Exception {
|
||||||
|
dfs.createSnapshot(target, "s2");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createMiddleSnapshotAtTarget() throws Exception {
|
||||||
|
dfs.createSnapshot(target, "s1.5");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* create some files and directories under the given directory.
|
||||||
|
* the final subtree looks like this:
|
||||||
|
* dir/
|
||||||
|
* foo/ bar/
|
||||||
|
* d1/ f1 d2/ f2
|
||||||
|
* f3 f4
|
||||||
|
*/
|
||||||
|
private void initData(Path dir) throws Exception {
|
||||||
|
final Path foo = new Path(dir, "foo");
|
||||||
|
final Path bar = new Path(dir, "bar");
|
||||||
|
final Path d1 = new Path(foo, "d1");
|
||||||
|
final Path f1 = new Path(foo, "f1");
|
||||||
|
final Path d2 = new Path(bar, "d2");
|
||||||
|
final Path f2 = new Path(bar, "f2");
|
||||||
|
final Path f3 = new Path(d1, "f3");
|
||||||
|
final Path f4 = new Path(d2, "f4");
|
||||||
|
|
||||||
|
DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0);
|
||||||
|
DFSTestUtil.createFile(dfs, f2, blockSize, dataNum, 0);
|
||||||
|
DFSTestUtil.createFile(dfs, f3, blockSize, dataNum, 0);
|
||||||
|
DFSTestUtil.createFile(dfs, f4, blockSize, dataNum, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* make some changes under the given directory (created in the above way).
|
||||||
|
* 1. rename dir/foo/d1 to dir/bar/d1
|
||||||
|
* 2. delete dir/bar/d1/f3
|
||||||
|
* 3. rename dir/foo to /dir/bar/d1/foo
|
||||||
|
* 4. delete dir/bar/d1/foo/f1
|
||||||
|
* 5. create file dir/bar/d1/foo/f1 whose size is 2*BLOCK_SIZE
|
||||||
|
* 6. append one BLOCK to file dir/bar/f2
|
||||||
|
* 7. rename dir/bar to dir/foo
|
||||||
|
*
|
||||||
|
* Thus after all these ops the subtree looks like this:
|
||||||
|
* dir/
|
||||||
|
* foo/
|
||||||
|
* d1/ f2(A) d2/
|
||||||
|
* foo/ f4
|
||||||
|
* f1(new)
|
||||||
|
*/
|
||||||
|
private int changeData(Path dir) throws Exception {
|
||||||
|
final Path foo = new Path(dir, "foo");
|
||||||
|
final Path bar = new Path(dir, "bar");
|
||||||
|
final Path d1 = new Path(foo, "d1");
|
||||||
|
final Path f2 = new Path(bar, "f2");
|
||||||
|
|
||||||
|
final Path bar_d1 = new Path(bar, "d1");
|
||||||
|
int numDeletedModified = 0;
|
||||||
|
dfs.rename(d1, bar_d1);
|
||||||
|
numDeletedModified += 1; // modify ./foo
|
||||||
|
numDeletedModified += 1; // modify ./bar
|
||||||
|
final Path f3 = new Path(bar_d1, "f3");
|
||||||
|
dfs.delete(f3, true);
|
||||||
|
numDeletedModified += 1; // delete f3
|
||||||
|
final Path newfoo = new Path(bar_d1, "foo");
|
||||||
|
dfs.rename(foo, newfoo);
|
||||||
|
numDeletedModified += 1; // modify ./foo/d1
|
||||||
|
final Path f1 = new Path(newfoo, "f1");
|
||||||
|
dfs.delete(f1, true);
|
||||||
|
numDeletedModified += 1; // delete ./foo/f1
|
||||||
|
DFSTestUtil.createFile(dfs, f1, 2 * blockSize, dataNum, 0);
|
||||||
|
DFSTestUtil.appendFile(dfs, f2, (int) blockSize);
|
||||||
|
numDeletedModified += 1; // modify ./bar/f2
|
||||||
|
dfs.rename(bar, new Path(dir, "foo"));
|
||||||
|
return numDeletedModified;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the basic functionality.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSync() throws Exception {
|
||||||
|
if (isSrcNotSameAsTgt) {
|
||||||
|
initData(source);
|
||||||
|
}
|
||||||
|
initData(target);
|
||||||
|
enableAndCreateFirstSnapshot();
|
||||||
|
|
||||||
|
final FsShell shell = new FsShell(conf);
|
||||||
|
|
||||||
|
lsrSource("Before source: ", shell, source);
|
||||||
|
lsr("Before target: ", shell, target);
|
||||||
|
|
||||||
|
// make changes under target
|
||||||
|
int numDeletedModified = changeData(target);
|
||||||
|
|
||||||
|
createSecondSnapshotAtTarget();
|
||||||
|
|
||||||
|
SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1");
|
||||||
|
System.out.println(report);
|
||||||
|
|
||||||
|
DistCpSync distCpSync = new DistCpSync(options, conf);
|
||||||
|
|
||||||
|
lsr("Before sync target: ", shell, target);
|
||||||
|
|
||||||
|
// do the sync
|
||||||
|
Assert.assertTrue(distCpSync.sync());
|
||||||
|
|
||||||
|
lsr("After sync target: ", shell, target);
|
||||||
|
|
||||||
|
// make sure the source path has been updated to the snapshot path
|
||||||
|
final Path spath = new Path(source,
|
||||||
|
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1");
|
||||||
|
Assert.assertEquals(spath, options.getSourcePaths().get(0));
|
||||||
|
|
||||||
|
// build copy listing
|
||||||
|
final Path listingPath = new Path("/tmp/META/fileList.seq");
|
||||||
|
CopyListing listing = new SimpleCopyListing(conf, new Credentials(),
|
||||||
|
distCpSync);
|
||||||
|
listing.buildListing(listingPath, options);
|
||||||
|
|
||||||
|
Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
|
||||||
|
CopyMapper copyMapper = new CopyMapper();
|
||||||
|
StubContext stubContext = new StubContext(conf, null, 0);
|
||||||
|
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
|
||||||
|
stubContext.getContext();
|
||||||
|
// Enable append
|
||||||
|
context.getConfiguration().setBoolean(
|
||||||
|
DistCpOptionSwitch.APPEND.getConfigLabel(), true);
|
||||||
|
copyMapper.setup(context);
|
||||||
|
for (Map.Entry<Text, CopyListingFileStatus> entry :
|
||||||
|
copyListing.entrySet()) {
|
||||||
|
copyMapper.map(entry.getKey(), entry.getValue(), context);
|
||||||
|
}
|
||||||
|
|
||||||
|
lsrSource("After mapper source: ", shell, source);
|
||||||
|
lsr("After mapper target: ", shell, target);
|
||||||
|
|
||||||
|
// verify that we only list modified and created files/directories
|
||||||
|
Assert.assertEquals(numDeletedModified, copyListing.size());
|
||||||
|
|
||||||
|
// verify that we only copied new appended data of f2 and the new file f1
|
||||||
|
Assert.assertEquals(blockSize * 3, stubContext.getReporter()
|
||||||
|
.getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
|
||||||
|
|
||||||
|
// verify the source and target now has the same structure
|
||||||
|
verifyCopy(dfs.getFileStatus(spath), dfs.getFileStatus(target), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<Text, CopyListingFileStatus> getListing(Path listingPath)
|
||||||
|
throws Exception {
|
||||||
|
SequenceFile.Reader reader = null;
|
||||||
|
Map<Text, CopyListingFileStatus> values = new HashMap<>();
|
||||||
|
try {
|
||||||
|
reader = new SequenceFile.Reader(conf,
|
||||||
|
SequenceFile.Reader.file(listingPath));
|
||||||
|
Text key = new Text();
|
||||||
|
CopyListingFileStatus value = new CopyListingFileStatus();
|
||||||
|
while (reader.next(key, value)) {
|
||||||
|
values.put(key, value);
|
||||||
|
key = new Text();
|
||||||
|
value = new CopyListingFileStatus();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (reader != null) {
|
||||||
|
reader.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return values;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyCopy(FileStatus s, FileStatus t, boolean compareName)
|
||||||
|
throws Exception {
|
||||||
|
Assert.assertEquals(s.isDirectory(), t.isDirectory());
|
||||||
|
if (compareName) {
|
||||||
|
Assert.assertEquals(s.getPath().getName(), t.getPath().getName());
|
||||||
|
}
|
||||||
|
if (!s.isDirectory()) {
|
||||||
|
// verify the file content is the same
|
||||||
|
byte[] sbytes = DFSTestUtil.readFileBuffer(dfs, s.getPath());
|
||||||
|
byte[] tbytes = DFSTestUtil.readFileBuffer(dfs, t.getPath());
|
||||||
|
Assert.assertArrayEquals(sbytes, tbytes);
|
||||||
|
} else {
|
||||||
|
FileStatus[] slist = dfs.listStatus(s.getPath());
|
||||||
|
FileStatus[] tlist = dfs.listStatus(t.getPath());
|
||||||
|
Assert.assertEquals(slist.length, tlist.length);
|
||||||
|
for (int i = 0; i < slist.length; i++) {
|
||||||
|
verifyCopy(slist[i], tlist[i], true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the case that "current" is snapshotted as "s2".
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSyncWithCurrent() throws Exception {
|
||||||
|
options.setUseRdiff(".", "s1");
|
||||||
|
if (isSrcNotSameAsTgt) {
|
||||||
|
initData(source);
|
||||||
|
}
|
||||||
|
initData(target);
|
||||||
|
enableAndCreateFirstSnapshot();
|
||||||
|
|
||||||
|
// make changes under target
|
||||||
|
changeData(target);
|
||||||
|
|
||||||
|
// do the sync
|
||||||
|
Assert.assertTrue(sync());
|
||||||
|
final Path spath = new Path(source,
|
||||||
|
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1");
|
||||||
|
// make sure the source path is still unchanged
|
||||||
|
Assert.assertEquals(spath, options.getSourcePaths().get(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initData2(Path dir) throws Exception {
|
||||||
|
final Path test = new Path(dir, "test");
|
||||||
|
final Path foo = new Path(dir, "foo");
|
||||||
|
final Path bar = new Path(dir, "bar");
|
||||||
|
final Path f1 = new Path(test, "f1");
|
||||||
|
final Path f2 = new Path(foo, "f2");
|
||||||
|
final Path f3 = new Path(bar, "f3");
|
||||||
|
|
||||||
|
DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
|
||||||
|
DFSTestUtil.createFile(dfs, f2, blockSize, dataNum, 1L);
|
||||||
|
DFSTestUtil.createFile(dfs, f3, blockSize, dataNum, 2L);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void changeData2(Path dir) throws Exception {
|
||||||
|
final Path tmpFoo = new Path(dir, "tmpFoo");
|
||||||
|
final Path test = new Path(dir, "test");
|
||||||
|
final Path foo = new Path(dir, "foo");
|
||||||
|
final Path bar = new Path(dir, "bar");
|
||||||
|
|
||||||
|
dfs.rename(test, tmpFoo);
|
||||||
|
dfs.rename(foo, test);
|
||||||
|
dfs.rename(bar, foo);
|
||||||
|
dfs.rename(tmpFoo, bar);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSync2() throws Exception {
|
||||||
|
if (isSrcNotSameAsTgt) {
|
||||||
|
initData2(source);
|
||||||
|
}
|
||||||
|
initData2(target);
|
||||||
|
enableAndCreateFirstSnapshot();
|
||||||
|
|
||||||
|
// make changes under target
|
||||||
|
changeData2(target);
|
||||||
|
|
||||||
|
createSecondSnapshotAtTarget();
|
||||||
|
|
||||||
|
SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1");
|
||||||
|
System.out.println(report);
|
||||||
|
|
||||||
|
syncAndVerify();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initData3(Path dir) throws Exception {
|
||||||
|
final Path test = new Path(dir, "test");
|
||||||
|
final Path foo = new Path(dir, "foo");
|
||||||
|
final Path bar = new Path(dir, "bar");
|
||||||
|
final Path f1 = new Path(test, "file");
|
||||||
|
final Path f2 = new Path(foo, "file");
|
||||||
|
final Path f3 = new Path(bar, "file");
|
||||||
|
|
||||||
|
DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
|
||||||
|
DFSTestUtil.createFile(dfs, f2, blockSize * 2, dataNum, 1L);
|
||||||
|
DFSTestUtil.createFile(dfs, f3, blockSize * 3, dataNum, 2L);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void changeData3(Path dir) throws Exception {
|
||||||
|
final Path test = new Path(dir, "test");
|
||||||
|
final Path foo = new Path(dir, "foo");
|
||||||
|
final Path bar = new Path(dir, "bar");
|
||||||
|
final Path f1 = new Path(test, "file");
|
||||||
|
final Path f2 = new Path(foo, "file");
|
||||||
|
final Path f3 = new Path(bar, "file");
|
||||||
|
final Path newf1 = new Path(test, "newfile");
|
||||||
|
final Path newf2 = new Path(foo, "newfile");
|
||||||
|
final Path newf3 = new Path(bar, "newfile");
|
||||||
|
|
||||||
|
dfs.rename(f1, newf1);
|
||||||
|
dfs.rename(f2, newf2);
|
||||||
|
dfs.rename(f3, newf3);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test a case where there are multiple source files with the same name.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSync3() throws Exception {
|
||||||
|
if (isSrcNotSameAsTgt) {
|
||||||
|
initData3(source);
|
||||||
|
}
|
||||||
|
initData3(target);
|
||||||
|
enableAndCreateFirstSnapshot();
|
||||||
|
|
||||||
|
// make changes under target
|
||||||
|
changeData3(target);
|
||||||
|
|
||||||
|
createSecondSnapshotAtTarget();
|
||||||
|
|
||||||
|
SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1");
|
||||||
|
System.out.println(report);
|
||||||
|
|
||||||
|
syncAndVerify();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initData4(Path dir) throws Exception {
|
||||||
|
final Path d1 = new Path(dir, "d1");
|
||||||
|
final Path d2 = new Path(d1, "d2");
|
||||||
|
final Path f1 = new Path(d2, "f1");
|
||||||
|
|
||||||
|
DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int changeData4(Path dir) throws Exception {
|
||||||
|
final Path d1 = new Path(dir, "d1");
|
||||||
|
final Path d11 = new Path(dir, "d11");
|
||||||
|
final Path d2 = new Path(d1, "d2");
|
||||||
|
final Path d21 = new Path(d1, "d21");
|
||||||
|
final Path f1 = new Path(d2, "f1");
|
||||||
|
|
||||||
|
int numDeletedAndModified = 0;
|
||||||
|
dfs.delete(f1, false);
|
||||||
|
numDeletedAndModified += 1;
|
||||||
|
dfs.rename(d2, d21);
|
||||||
|
numDeletedAndModified += 1;
|
||||||
|
dfs.rename(d1, d11);
|
||||||
|
numDeletedAndModified += 1;
|
||||||
|
return numDeletedAndModified;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test a case where multiple level dirs are renamed.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSync4() throws Exception {
|
||||||
|
if (isSrcNotSameAsTgt) {
|
||||||
|
initData4(source);
|
||||||
|
}
|
||||||
|
initData4(target);
|
||||||
|
enableAndCreateFirstSnapshot();
|
||||||
|
|
||||||
|
final FsShell shell = new FsShell(conf);
|
||||||
|
lsr("Before change target: ", shell, target);
|
||||||
|
|
||||||
|
// make changes under target
|
||||||
|
int numDeletedAndModified = changeData4(target);
|
||||||
|
|
||||||
|
createSecondSnapshotAtTarget();
|
||||||
|
|
||||||
|
SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1");
|
||||||
|
System.out.println(report);
|
||||||
|
|
||||||
|
testAndVerify(numDeletedAndModified);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initData5(Path dir) throws Exception {
|
||||||
|
final Path d1 = new Path(dir, "d1");
|
||||||
|
final Path d2 = new Path(dir, "d2");
|
||||||
|
final Path f1 = new Path(d1, "f1");
|
||||||
|
final Path f2 = new Path(d2, "f2");
|
||||||
|
|
||||||
|
DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
|
||||||
|
DFSTestUtil.createFile(dfs, f2, blockSize, dataNum, 0L);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int changeData5(Path dir) throws Exception {
|
||||||
|
final Path d1 = new Path(dir, "d1");
|
||||||
|
final Path d2 = new Path(dir, "d2");
|
||||||
|
final Path f1 = new Path(d1, "f1");
|
||||||
|
final Path tmp = new Path(dir, "tmp");
|
||||||
|
|
||||||
|
int numDeletedAndModified = 0;
|
||||||
|
dfs.delete(f1, false);
|
||||||
|
numDeletedAndModified += 1;
|
||||||
|
dfs.rename(d1, tmp);
|
||||||
|
numDeletedAndModified += 1;
|
||||||
|
dfs.rename(d2, d1);
|
||||||
|
numDeletedAndModified += 1;
|
||||||
|
final Path f2 = new Path(d1, "f2");
|
||||||
|
dfs.delete(f2, false);
|
||||||
|
numDeletedAndModified += 1;
|
||||||
|
return numDeletedAndModified;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test a case with different delete and rename sequences.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSync5() throws Exception {
|
||||||
|
if (isSrcNotSameAsTgt) {
|
||||||
|
initData5(source);
|
||||||
|
}
|
||||||
|
initData5(target);
|
||||||
|
enableAndCreateFirstSnapshot();
|
||||||
|
|
||||||
|
// make changes under target
|
||||||
|
int numDeletedAndModified = changeData5(target);
|
||||||
|
|
||||||
|
createSecondSnapshotAtTarget();
|
||||||
|
|
||||||
|
SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1");
|
||||||
|
System.out.println(report);
|
||||||
|
|
||||||
|
testAndVerify(numDeletedAndModified);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testAndVerify(int numDeletedAndModified)
|
||||||
|
throws Exception{
|
||||||
|
SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1");
|
||||||
|
System.out.println(report);
|
||||||
|
|
||||||
|
final FsShell shell = new FsShell(conf);
|
||||||
|
|
||||||
|
lsrSource("Before sync source: ", shell, source);
|
||||||
|
lsr("Before sync target: ", shell, target);
|
||||||
|
|
||||||
|
DistCpSync distCpSync = new DistCpSync(options, conf);
|
||||||
|
// do the sync
|
||||||
|
distCpSync.sync();
|
||||||
|
|
||||||
|
lsr("After sync target: ", shell, target);
|
||||||
|
|
||||||
|
// make sure the source path has been updated to the snapshot path
|
||||||
|
final Path spath = new Path(source,
|
||||||
|
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1");
|
||||||
|
Assert.assertEquals(spath, options.getSourcePaths().get(0));
|
||||||
|
|
||||||
|
// build copy listing
|
||||||
|
final Path listingPath = new Path("/tmp/META/fileList.seq");
|
||||||
|
CopyListing listing = new SimpleCopyListing(conf, new Credentials(), distCpSync);
|
||||||
|
listing.buildListing(listingPath, options);
|
||||||
|
|
||||||
|
Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
|
||||||
|
CopyMapper copyMapper = new CopyMapper();
|
||||||
|
StubContext stubContext = new StubContext(conf, null, 0);
|
||||||
|
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
|
||||||
|
stubContext.getContext();
|
||||||
|
// Enable append
|
||||||
|
context.getConfiguration().setBoolean(
|
||||||
|
DistCpOptionSwitch.APPEND.getConfigLabel(), true);
|
||||||
|
copyMapper.setup(context);
|
||||||
|
for (Map.Entry<Text, CopyListingFileStatus> entry :
|
||||||
|
copyListing.entrySet()) {
|
||||||
|
copyMapper.map(entry.getKey(), entry.getValue(), context);
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify that we only list modified and created files/directories
|
||||||
|
Assert.assertEquals(numDeletedAndModified, copyListing.size());
|
||||||
|
|
||||||
|
lsr("After Copy target: ", shell, target);
|
||||||
|
|
||||||
|
// verify the source and target now has the same structure
|
||||||
|
verifyCopy(dfs.getFileStatus(spath), dfs.getFileStatus(target), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initData6(Path dir) throws Exception {
|
||||||
|
final Path foo = new Path(dir, "foo");
|
||||||
|
final Path bar = new Path(dir, "bar");
|
||||||
|
final Path foo_f1 = new Path(foo, "f1");
|
||||||
|
final Path bar_f1 = new Path(bar, "f1");
|
||||||
|
|
||||||
|
DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
|
||||||
|
DFSTestUtil.createFile(dfs, bar_f1, blockSize, dataNum, 0L);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int changeData6(Path dir) throws Exception {
|
||||||
|
final Path foo = new Path(dir, "foo");
|
||||||
|
final Path bar = new Path(dir, "bar");
|
||||||
|
final Path foo2 = new Path(dir, "foo2");
|
||||||
|
final Path foo_f1 = new Path(foo, "f1");
|
||||||
|
|
||||||
|
int numDeletedModified = 0;
|
||||||
|
dfs.rename(foo, foo2);
|
||||||
|
dfs.rename(bar, foo);
|
||||||
|
dfs.rename(foo2, bar);
|
||||||
|
DFSTestUtil.appendFile(dfs, foo_f1, (int) blockSize);
|
||||||
|
numDeletedModified += 1; // modify ./bar/f1
|
||||||
|
return numDeletedModified;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test a case where there is a cycle in renaming dirs.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSync6() throws Exception {
|
||||||
|
if (isSrcNotSameAsTgt) {
|
||||||
|
initData6(source);
|
||||||
|
}
|
||||||
|
initData6(target);
|
||||||
|
enableAndCreateFirstSnapshot();
|
||||||
|
int numDeletedModified = changeData6(target);
|
||||||
|
|
||||||
|
createSecondSnapshotAtTarget();
|
||||||
|
|
||||||
|
testAndVerify(numDeletedModified);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initData7(Path dir) throws Exception {
|
||||||
|
final Path foo = new Path(dir, "foo");
|
||||||
|
final Path bar = new Path(dir, "bar");
|
||||||
|
final Path foo_f1 = new Path(foo, "f1");
|
||||||
|
final Path bar_f1 = new Path(bar, "f1");
|
||||||
|
|
||||||
|
DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
|
||||||
|
DFSTestUtil.createFile(dfs, bar_f1, blockSize, dataNum, 0L);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int changeData7(Path dir) throws Exception {
|
||||||
|
final Path foo = new Path(dir, "foo");
|
||||||
|
final Path foo2 = new Path(dir, "foo2");
|
||||||
|
final Path foo_f1 = new Path(foo, "f1");
|
||||||
|
final Path foo2_f2 = new Path(foo2, "f2");
|
||||||
|
final Path foo_d1 = new Path(foo, "d1");
|
||||||
|
final Path foo_d1_f3 = new Path(foo_d1, "f3");
|
||||||
|
|
||||||
|
int numDeletedAndModified = 0;
|
||||||
|
dfs.rename(foo, foo2);
|
||||||
|
DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
|
||||||
|
DFSTestUtil.appendFile(dfs, foo_f1, (int) blockSize);
|
||||||
|
dfs.rename(foo_f1, foo2_f2);
|
||||||
|
/*
|
||||||
|
* Difference between snapshot s1 and current directory under directory
|
||||||
|
/target:
|
||||||
|
M .
|
||||||
|
+ ./foo
|
||||||
|
R ./foo -> ./foo2
|
||||||
|
M ./foo
|
||||||
|
+ ./foo/f2
|
||||||
|
*/
|
||||||
|
numDeletedAndModified += 1; // "M ./foo"
|
||||||
|
DFSTestUtil.createFile(dfs, foo_d1_f3, blockSize, dataNum, 0L);
|
||||||
|
return numDeletedAndModified;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test a case where rename a dir, then create a new dir with the same name
|
||||||
|
* and sub dir.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSync7() throws Exception {
|
||||||
|
if (isSrcNotSameAsTgt) {
|
||||||
|
initData7(source);
|
||||||
|
}
|
||||||
|
initData7(target);
|
||||||
|
enableAndCreateFirstSnapshot();
|
||||||
|
int numDeletedAndModified = changeData7(target);
|
||||||
|
|
||||||
|
createSecondSnapshotAtTarget();
|
||||||
|
|
||||||
|
testAndVerify(numDeletedAndModified);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initData8(Path dir) throws Exception {
|
||||||
|
final Path foo = new Path(dir, "foo");
|
||||||
|
final Path bar = new Path(dir, "bar");
|
||||||
|
final Path d1 = new Path(dir, "d1");
|
||||||
|
final Path foo_f1 = new Path(foo, "f1");
|
||||||
|
final Path bar_f1 = new Path(bar, "f1");
|
||||||
|
final Path d1_f1 = new Path(d1, "f1");
|
||||||
|
|
||||||
|
DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
|
||||||
|
DFSTestUtil.createFile(dfs, bar_f1, blockSize, dataNum, 0L);
|
||||||
|
DFSTestUtil.createFile(dfs, d1_f1, blockSize, dataNum, 0L);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int changeData8(Path dir, boolean createMiddleSnapshot)
|
||||||
|
throws Exception {
|
||||||
|
final Path foo = new Path(dir, "foo");
|
||||||
|
final Path createdDir = new Path(dir, "c");
|
||||||
|
final Path d1 = new Path(dir, "d1");
|
||||||
|
final Path d1_f1 = new Path(d1, "f1");
|
||||||
|
final Path createdDir_f1 = new Path(createdDir, "f1");
|
||||||
|
final Path foo_f3 = new Path(foo, "f3");
|
||||||
|
final Path new_foo = new Path(createdDir, "foo");
|
||||||
|
final Path foo_f4 = new Path(foo, "f4");
|
||||||
|
final Path foo_d1 = new Path(foo, "d1");
|
||||||
|
final Path bar = new Path(dir, "bar");
|
||||||
|
final Path bar1 = new Path(dir, "bar1");
|
||||||
|
|
||||||
|
int numDeletedAndModified = 0;
|
||||||
|
DFSTestUtil.createFile(dfs, foo_f3, blockSize, dataNum, 0L);
|
||||||
|
DFSTestUtil.createFile(dfs, createdDir_f1, blockSize, dataNum, 0L);
|
||||||
|
dfs.rename(createdDir_f1, foo_f4);
|
||||||
|
dfs.rename(d1_f1, createdDir_f1); // rename ./d1/f1 -> ./c/f1
|
||||||
|
numDeletedAndModified += 1; // modify ./c/foo/d1
|
||||||
|
|
||||||
|
if (createMiddleSnapshot) {
|
||||||
|
this.createMiddleSnapshotAtTarget();
|
||||||
|
}
|
||||||
|
|
||||||
|
dfs.rename(d1, foo_d1);
|
||||||
|
numDeletedAndModified += 1; // modify ./c/foo
|
||||||
|
dfs.rename(foo, new_foo);
|
||||||
|
dfs.rename(bar, bar1);
|
||||||
|
return numDeletedAndModified;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test a case where create a dir, then mv a existed dir into it.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSync8() throws Exception {
|
||||||
|
if (isSrcNotSameAsTgt) {
|
||||||
|
initData8(source);
|
||||||
|
}
|
||||||
|
initData8(target);
|
||||||
|
enableAndCreateFirstSnapshot();
|
||||||
|
int numDeletedModified = changeData8(target, false);
|
||||||
|
|
||||||
|
createSecondSnapshotAtTarget();
|
||||||
|
|
||||||
|
testAndVerify(numDeletedModified);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test a case where create a dir, then mv a existed dir into it.
|
||||||
|
* The difference between this one and testSync8 is, this one
|
||||||
|
* also creates a snapshot s1.5 in between s1 and s2.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSync9() throws Exception {
|
||||||
|
if (isSrcNotSameAsTgt) {
|
||||||
|
initData8(source);
|
||||||
|
}
|
||||||
|
initData8(target);
|
||||||
|
enableAndCreateFirstSnapshot();
|
||||||
|
int numDeletedModified = changeData8(target, true);
|
||||||
|
|
||||||
|
createSecondSnapshotAtTarget();
|
||||||
|
|
||||||
|
testAndVerify(numDeletedModified);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,36 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.tools;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the case "-rdiff s2 s1 src tgt".
|
||||||
|
*/
|
||||||
|
public class TestDistCpSyncReverseFromSource
|
||||||
|
extends TestDistCpSyncReverseBase {
|
||||||
|
/*
|
||||||
|
* Initialize the source path to /target.
|
||||||
|
* @see org.apache.hadoop.tools.TestDistCpSyncReverseBase#initSourcePath()
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
void initSourcePath() {
|
||||||
|
setSource(new Path("/source"));
|
||||||
|
setSrcNotSameAsTgt(true);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,36 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.tools;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the case "-rdiff s2 s1 tgt tgt".
|
||||||
|
*/
|
||||||
|
public class TestDistCpSyncReverseFromTarget
|
||||||
|
extends TestDistCpSyncReverseBase {
|
||||||
|
/*
|
||||||
|
* Initialize the source path to /target.
|
||||||
|
* @see org.apache.hadoop.tools.TestDistCpSyncReverseBase#initSourcePath()
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
void initSourcePath() {
|
||||||
|
setSource(new Path("/target"));
|
||||||
|
setSrcNotSameAsTgt(false);
|
||||||
|
}
|
||||||
|
}
|
|
@ -400,7 +400,8 @@ public class TestOptionsParser {
|
||||||
DistCpOptions option = new DistCpOptions(new Path("abc"), new Path("xyz"));
|
DistCpOptions option = new DistCpOptions(new Path("abc"), new Path("xyz"));
|
||||||
String val = "DistCpOptions{atomicCommit=false, syncFolder=false, "
|
String val = "DistCpOptions{atomicCommit=false, syncFolder=false, "
|
||||||
+ "deleteMissing=false, ignoreFailures=false, overwrite=false, "
|
+ "deleteMissing=false, ignoreFailures=false, overwrite=false, "
|
||||||
+ "append=false, useDiff=false, fromSnapshot=null, toSnapshot=null, "
|
+ "append=false, useDiff=false, useRdiff=false, "
|
||||||
|
+ "fromSnapshot=null, toSnapshot=null, "
|
||||||
+ "skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, "
|
+ "skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, "
|
||||||
+ "mapBandwidth=100, sslConfigurationFile='null', "
|
+ "mapBandwidth=100, sslConfigurationFile='null', "
|
||||||
+ "copyStrategy='uniformsize', preserveStatus=[], "
|
+ "copyStrategy='uniformsize', preserveStatus=[], "
|
||||||
|
@ -656,76 +657,106 @@ public class TestOptionsParser {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
// Test -diff or -rdiff
|
||||||
public void testDiffOption() {
|
private void testSnapshotDiffOption(boolean isDiff) {
|
||||||
|
final String optionStr = isDiff? "-diff" : "-rdiff";
|
||||||
|
final String optionLabel = isDiff?
|
||||||
|
DistCpOptionSwitch.DIFF.getConfigLabel() :
|
||||||
|
DistCpOptionSwitch.RDIFF.getConfigLabel();
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.DIFF.getConfigLabel(),
|
Assert.assertFalse(conf.getBoolean(optionLabel, false));
|
||||||
false));
|
|
||||||
|
|
||||||
DistCpOptions options = OptionsParser.parse(new String[] { "-update",
|
DistCpOptions options = OptionsParser.parse(new String[] { "-update",
|
||||||
"-diff", "s1", "s2",
|
optionStr, "s1", "s2",
|
||||||
"hdfs://localhost:8020/source/first",
|
"hdfs://localhost:8020/source/first",
|
||||||
"hdfs://localhost:8020/target/" });
|
"hdfs://localhost:8020/target/" });
|
||||||
options.appendToConf(conf);
|
options.appendToConf(conf);
|
||||||
Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DIFF.getConfigLabel(), false));
|
Assert.assertTrue(conf.getBoolean(optionLabel, false));
|
||||||
Assert.assertTrue(options.shouldUseDiff());
|
Assert.assertTrue(isDiff?
|
||||||
|
options.shouldUseDiff() : options.shouldUseRdiff());
|
||||||
Assert.assertEquals("s1", options.getFromSnapshot());
|
Assert.assertEquals("s1", options.getFromSnapshot());
|
||||||
Assert.assertEquals("s2", options.getToSnapshot());
|
Assert.assertEquals("s2", options.getToSnapshot());
|
||||||
|
|
||||||
options = OptionsParser.parse(new String[] {
|
options = OptionsParser.parse(new String[] {
|
||||||
"-diff", "s1", ".", "-update",
|
optionStr, "s1", ".", "-update",
|
||||||
"hdfs://localhost:8020/source/first",
|
"hdfs://localhost:8020/source/first",
|
||||||
"hdfs://localhost:8020/target/" });
|
"hdfs://localhost:8020/target/" });
|
||||||
options.appendToConf(conf);
|
options.appendToConf(conf);
|
||||||
Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DIFF.getConfigLabel(),
|
Assert.assertTrue(conf.getBoolean(optionLabel, false));
|
||||||
false));
|
Assert.assertTrue(isDiff?
|
||||||
Assert.assertTrue(options.shouldUseDiff());
|
options.shouldUseDiff() : options.shouldUseRdiff());
|
||||||
Assert.assertEquals("s1", options.getFromSnapshot());
|
Assert.assertEquals("s1", options.getFromSnapshot());
|
||||||
Assert.assertEquals(".", options.getToSnapshot());
|
Assert.assertEquals(".", options.getToSnapshot());
|
||||||
|
|
||||||
// -diff requires two option values
|
// -diff/-rdiff requires two option values
|
||||||
try {
|
try {
|
||||||
OptionsParser.parse(new String[] {"-diff", "s1", "-update",
|
OptionsParser.parse(new String[] {optionStr, "s1", "-update",
|
||||||
"hdfs://localhost:8020/source/first",
|
"hdfs://localhost:8020/source/first",
|
||||||
"hdfs://localhost:8020/target/" });
|
"hdfs://localhost:8020/target/" });
|
||||||
fail("-diff should fail with only one snapshot name");
|
fail(optionStr + " should fail with only one snapshot name");
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
GenericTestUtils.assertExceptionContains(
|
GenericTestUtils.assertExceptionContains(
|
||||||
"Must provide both the starting and ending snapshot names", e);
|
"Must provide both the starting and ending snapshot names", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// make sure -diff is only valid when -update is specified
|
// make sure -diff/-rdiff is only valid when -update is specified
|
||||||
try {
|
try {
|
||||||
OptionsParser.parse(new String[] { "-diff", "s1", "s2",
|
OptionsParser.parse(new String[] {optionStr, "s1", "s2",
|
||||||
"hdfs://localhost:8020/source/first",
|
"hdfs://localhost:8020/source/first",
|
||||||
"hdfs://localhost:8020/target/" });
|
"hdfs://localhost:8020/target/" });
|
||||||
fail("-diff should fail if -update option is not specified");
|
fail(optionStr + " should fail if -update option is not specified");
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
GenericTestUtils.assertExceptionContains(
|
GenericTestUtils.assertExceptionContains(
|
||||||
"Diff is valid only with update options", e);
|
"-diff/-rdiff is valid only with -update option", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
options = OptionsParser.parse(new String[] {
|
options = OptionsParser.parse(new String[] {
|
||||||
"-diff", "s1", "s2", "-update", "-delete",
|
optionStr, "s1", "s2", "-update", "-delete",
|
||||||
"hdfs://localhost:9820/source/first",
|
"hdfs://localhost:9820/source/first",
|
||||||
"hdfs://localhost:9820/target/" });
|
"hdfs://localhost:9820/target/" });
|
||||||
assertFalse("-delete should be ignored when -diff is specified",
|
assertFalse("-delete should be ignored when "
|
||||||
|
+ optionStr + " is specified",
|
||||||
options.shouldDeleteMissing());
|
options.shouldDeleteMissing());
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
fail("Got unexpected IllegalArgumentException: " + e.getMessage());
|
fail("Got unexpected IllegalArgumentException: " + e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
OptionsParser.parse(new String[] { "-diff", "s1", "s2",
|
OptionsParser.parse(new String[] {optionStr, "s1", "s2",
|
||||||
"-delete", "-overwrite",
|
"-delete", "-overwrite",
|
||||||
"hdfs://localhost:8020/source/first",
|
"hdfs://localhost:8020/source/first",
|
||||||
"hdfs://localhost:8020/target/" });
|
"hdfs://localhost:8020/target/" });
|
||||||
fail("-diff should fail if -update option is not specified");
|
fail(optionStr + " should fail if -update option is not specified");
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
GenericTestUtils.assertExceptionContains(
|
GenericTestUtils.assertExceptionContains(
|
||||||
"Diff is valid only with update options", e);
|
"-diff/-rdiff is valid only with -update option", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final String optionStrOther = isDiff? "-rdiff" : "-diff";
|
||||||
|
try {
|
||||||
|
OptionsParser.parse(new String[] {
|
||||||
|
optionStr, "s1", "s2",
|
||||||
|
optionStrOther, "s2", "s1",
|
||||||
|
"-update",
|
||||||
|
"hdfs://localhost:9820/source/first",
|
||||||
|
"hdfs://localhost:9820/target/" });
|
||||||
|
fail(optionStr + " should fail if " + optionStrOther
|
||||||
|
+ " is also specified");
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
GenericTestUtils.assertExceptionContains(
|
||||||
|
"-diff and -rdiff are mutually exclusive", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDiffOption() {
|
||||||
|
testSnapshotDiffOption(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRdiffOption() {
|
||||||
|
testSnapshotDiffOption(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue