HDFS-7535. Utilize Snapshot diff report for distcp. Contributed by Jing Zhao.

(cherry picked from commit ed70fa142c)
This commit is contained in:
Jing Zhao 2015-03-04 10:30:53 -08:00
parent ab397194c3
commit b2ccf54c14
12 changed files with 790 additions and 14 deletions

View File

@ -399,6 +399,8 @@ Release 2.7.0 - UNRELEASED
HDFS-7789. DFSck should resolve the path to support cross-FS symlinks. HDFS-7789. DFSck should resolve the path to support cross-FS symlinks.
(gera) (gera)
HDFS-7535. Utilize Snapshot diff report for distcp. (jing9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode. HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

View File

@ -224,7 +224,9 @@ public abstract class CopyListing extends Configured {
Credentials credentials, Credentials credentials,
DistCpOptions options) DistCpOptions options)
throws IOException { throws IOException {
if (options.shouldUseDiff()) {
return new GlobbedCopyListing(configuration, credentials);
}
String copyListingClassName = configuration.get(DistCpConstants. String copyListingClassName = configuration.get(DistCpConstants.
CONF_LABEL_COPY_LISTING_CLASS, ""); CONF_LABEL_COPY_LISTING_CLASS, "");
Class<? extends CopyListing> copyListingClass; Class<? extends CopyListing> copyListingClass;

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
/**
 * Information presenting a rename/delete op derived from a snapshot diff entry.
 * This includes the source file/dir of the rename/delete op, and the target
 * file/dir of a rename op.
 */
class DiffInfo {
  /**
   * Orders entries by source path, in the reverse of {@link Path}'s natural
   * ordering.
   */
  static final Comparator<DiffInfo> sourceComparator = new Comparator<DiffInfo>() {
    @Override
    public int compare(DiffInfo d1, DiffInfo d2) {
      return d2.source.compareTo(d1.source);
    }
  };

  /**
   * Orders entries by target path in {@link Path}'s natural ordering. Entries
   * without a target (deletions) sort before entries that have one, and two
   * entries that both lack a target compare as equal so that the ordering is
   * antisymmetric, as the {@link Comparator} contract requires.
   */
  static final Comparator<DiffInfo> targetComparator = new Comparator<DiffInfo>() {
    @Override
    public int compare(DiffInfo d1, DiffInfo d2) {
      if (d1.target == null) {
        // Both null must yield 0; returning -1 in both directions would make
        // the comparator inconsistent and can make sorting fail.
        return d2.target == null ? 0 : -1;
      }
      return d2.target == null ? 1 : d1.target.compareTo(d2.target);
    }
  };

  /** The source file/dir of the rename or deletion op */
  final Path source;

  /**
   * The intermediate file/dir for the op. For a rename or a delete op,
   * we first rename the source to this tmp file/dir.
   */
  private Path tmp;

  /** The target file/dir of the rename op. Null means the op is deletion. */
  final Path target;

  /**
   * @param source the file/dir the op applies to; must not be null
   * @param target the destination of a rename, or null for a deletion
   */
  DiffInfo(Path source, Path target) {
    assert source != null;
    this.source = source;
    this.target = target;
  }

  /** Records the intermediate location the source is moved to. */
  void setTmp(Path tmp) {
    this.tmp = tmp;
  }

  /** @return the intermediate location, or null if none has been set */
  Path getTmp() {
    return tmp;
  }

  /**
   * Converts the DELETE and RENAME entries of a snapshot diff report into
   * {@link DiffInfo} objects. Entries of any other type are skipped.
   *
   * @param report the snapshot diff report to read entries from
   * @param targetDir the directory against which the entries' source and
   *                  target paths are resolved
   * @return one DiffInfo per DELETE/RENAME entry, in report order
   */
  static DiffInfo[] getDiffs(SnapshotDiffReport report, Path targetDir) {
    List<DiffInfo> diffs = new ArrayList<>();
    for (SnapshotDiffReport.DiffReportEntry entry : report.getDiffList()) {
      if (entry.getType() == SnapshotDiffReport.DiffType.DELETE) {
        final Path source = new Path(targetDir,
            DFSUtil.bytes2String(entry.getSourcePath()));
        // a null target marks the op as a deletion
        diffs.add(new DiffInfo(source, null));
      } else if (entry.getType() == SnapshotDiffReport.DiffType.RENAME) {
        final Path source = new Path(targetDir,
            DFSUtil.bytes2String(entry.getSourcePath()));
        final Path target = new Path(targetDir,
            DFSUtil.bytes2String(entry.getTargetPath()));
        diffs.add(new DiffInfo(source, target));
      }
    }
    return diffs.toArray(new DiffInfo[diffs.size()]);
  }
}

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.tools; package org.apache.hadoop.tools;
import java.io.IOException;
import java.util.Random;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -27,10 +30,10 @@ import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobSubmissionFiles; import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.tools.CopyListing.*; import org.apache.hadoop.tools.CopyListing.*;
import org.apache.hadoop.tools.mapred.CopyMapper; import org.apache.hadoop.tools.mapred.CopyMapper;
import org.apache.hadoop.tools.mapred.CopyOutputFormat; import org.apache.hadoop.tools.mapred.CopyOutputFormat;
@ -39,9 +42,6 @@ import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.Random;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
/** /**
@ -62,7 +62,7 @@ public class DistCp extends Configured implements Tool {
*/ */
static final int SHUTDOWN_HOOK_PRIORITY = 30; static final int SHUTDOWN_HOOK_PRIORITY = 30;
private static final Log LOG = LogFactory.getLog(DistCp.class); static final Log LOG = LogFactory.getLog(DistCp.class);
private DistCpOptions inputOptions; private DistCpOptions inputOptions;
private Path metaFolder; private Path metaFolder;
@ -171,9 +171,13 @@ public class DistCp extends Configured implements Tool {
//Don't cleanup while we are setting up. //Don't cleanup while we are setting up.
metaFolder = createMetaFolderPath(); metaFolder = createMetaFolderPath();
jobFS = metaFolder.getFileSystem(getConf()); jobFS = metaFolder.getFileSystem(getConf());
job = createJob(); job = createJob();
} }
if (inputOptions.shouldUseDiff()) {
if (!DistCpSync.sync(inputOptions, getConf())) {
inputOptions.disableUsingDiff();
}
}
createInputFileListing(job); createInputFileListing(job);
job.submit(); job.submit();

View File

@ -53,6 +53,7 @@ public class DistCpConstants {
public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc"; public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc";
public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite"; public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
public static final String CONF_LABEL_APPEND = "distcp.copy.append"; public static final String CONF_LABEL_APPEND = "distcp.copy.append";
public static final String CONF_LABEL_DIFF = "distcp.copy.diff";
public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb"; public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE = public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE =
@ -134,4 +135,6 @@ public class DistCpConstants {
* Value of reserved raw HDFS directory when copying raw.* xattrs. * Value of reserved raw HDFS directory when copying raw.* xattrs.
*/ */
static final String HDFS_RESERVED_RAW_DIRECTORY_NAME = "/.reserved/raw"; static final String HDFS_RESERVED_RAW_DIRECTORY_NAME = "/.reserved/raw";
static final String HDFS_DISTCP_DIFF_DIRECTORY_NAME = ".distcp.diff.tmp";
} }

View File

@ -41,8 +41,6 @@ public enum DistCpOptionSwitch {
* target file. Note that when preserving checksum type, block size is also * target file. Note that when preserving checksum type, block size is also
* preserved. * preserved.
* *
* @see PRESERVE_STATUS_DEFAULT
*
* If any of the optional switches are present among rbugpcaxt, then * If any of the optional switches are present among rbugpcaxt, then
* only the corresponding file attribute is preserved. * only the corresponding file attribute is preserved.
*/ */
@ -149,6 +147,11 @@ public enum DistCpOptionSwitch {
new Option("append", false, new Option("append", false,
"Reuse existing data in target files and append new data to them if possible")), "Reuse existing data in target files and append new data to them if possible")),
DIFF(DistCpConstants.CONF_LABEL_DIFF,
new Option("diff", false,
"Use snapshot diff report to identify the difference between source and target"),
2),
/** /**
* Should DisctpExecution be blocking * Should DisctpExecution be blocking
*/ */
@ -178,6 +181,11 @@ public enum DistCpOptionSwitch {
this.option = option; this.option = option;
} }
/**
 * Creates a switch whose command-line option takes a fixed number of
 * argument values.
 *
 * @param confLabel passed through to the two-argument constructor
 * @param option passed through to the two-argument constructor
 * @param argNum the number of argument values set on the option
 */
DistCpOptionSwitch(String confLabel, Option option, int argNum) {
this(confLabel, option);
// configure the argument count on the option held by this switch
this.option.setArgs(argNum);
}
/** /**
* Get Configuration label for the option * Get Configuration label for the option
* @return configuration label name * @return configuration label name

View File

@ -42,6 +42,7 @@ public class DistCpOptions {
private boolean append = false; private boolean append = false;
private boolean skipCRC = false; private boolean skipCRC = false;
private boolean blocking = true; private boolean blocking = true;
private boolean useDiff = false;
private int maxMaps = DistCpConstants.DEFAULT_MAPS; private int maxMaps = DistCpConstants.DEFAULT_MAPS;
private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB; private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
@ -61,6 +62,9 @@ public class DistCpOptions {
private Path sourceFileListing; private Path sourceFileListing;
private List<Path> sourcePaths; private List<Path> sourcePaths;
private String fromSnapshot;
private String toSnapshot;
private Path targetPath; private Path targetPath;
// targetPathExist is a derived field, it's initialized in the // targetPathExist is a derived field, it's initialized in the
@ -264,6 +268,29 @@ public class DistCpOptions {
this.append = append; this.append = append;
} }
/**
 * Reports whether the useDiff flag is currently set.
 *
 * @return the current value of the useDiff flag
 */
public boolean shouldUseDiff() {
  return useDiff;
}
/**
 * @return the stored "from" snapshot name (whatever was last passed to
 *         setUseDiff)
 */
public String getFromSnapshot() {
  return fromSnapshot;
}
/**
 * @return the stored "to" snapshot name (whatever was last passed to
 *         setUseDiff)
 */
public String getToSnapshot() {
  return toSnapshot;
}
/**
 * Sets the useDiff flag together with the two snapshot names.
 * The new flag value is passed to validate(...) before any field is
 * assigned.
 *
 * @param useDiff the new value of the useDiff flag
 * @param fromSnapshot the "from" snapshot name to store
 * @param toSnapshot the "to" snapshot name to store
 */
public void setUseDiff(boolean useDiff, String fromSnapshot, String toSnapshot) {
  // validate before touching any state
  validate(DistCpOptionSwitch.DIFF, useDiff);
  this.fromSnapshot = fromSnapshot;
  this.toSnapshot = toSnapshot;
  this.useDiff = useDiff;
}
/**
 * Clears the useDiff flag. The stored snapshot names are left untouched and
 * no validation is performed.
 */
public void disableUsingDiff() {
  useDiff = false;
}
/** /**
* Should CRC/checksum check be skipped while checking files are identical * Should CRC/checksum check be skipped while checking files are identical
* *
@ -508,6 +535,7 @@ public class DistCpOptions {
boolean skipCRC = (option == DistCpOptionSwitch.SKIP_CRC ? boolean skipCRC = (option == DistCpOptionSwitch.SKIP_CRC ?
value : this.skipCRC); value : this.skipCRC);
boolean append = (option == DistCpOptionSwitch.APPEND ? value : this.append); boolean append = (option == DistCpOptionSwitch.APPEND ? value : this.append);
boolean useDiff = (option == DistCpOptionSwitch.DIFF ? value : this.useDiff);
if (syncFolder && atomicCommit) { if (syncFolder && atomicCommit) {
throw new IllegalArgumentException("Atomic commit can't be used with " + throw new IllegalArgumentException("Atomic commit can't be used with " +
@ -536,6 +564,10 @@ public class DistCpOptions {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Append is disallowed when skipping CRC"); "Append is disallowed when skipping CRC");
} }
if ((!syncFolder || !deleteMissing) && useDiff) {
throw new IllegalArgumentException(
"Diff is valid only with update and delete options");
}
} }
/** /**
@ -556,6 +588,8 @@ public class DistCpOptions {
String.valueOf(overwrite)); String.valueOf(overwrite));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.APPEND, DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.APPEND,
String.valueOf(append)); String.valueOf(append));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIFF,
String.valueOf(useDiff));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC, DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
String.valueOf(skipCRC)); String.valueOf(skipCRC));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH, DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH,

View File

@ -0,0 +1,192 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
/**
 * This class provides the basic functionality to sync two FileSystems based on
 * the snapshot diff report. More specifically, we have the following settings:
 * 1. Both the source and target FileSystem must be DistributedFileSystem
 * 2. Two snapshots (e.g., s1 and s2) have been created on the source FS.
 * The diff between these two snapshots will be copied to the target FS.
 * 3. The target has the same snapshot s1. No changes have been made on the
 * target since s1. All the files/directories in the target are the same with
 * source.s1
 */
class DistCpSync {

  /**
   * Applies the rename and delete operations recorded in the source's
   * snapshot diff to the target directory.
   *
   * @param inputOptions supplies the source paths, the target path and the
   *                     two snapshot names
   * @param conf configuration used to resolve the file systems
   * @return true if the rename/delete operations were applied; false if the
   *         preconditions are not met or any step failed (the failure is
   *         logged as a warning)
   * @throws IOException if a file system cannot be resolved
   */
  static boolean sync(DistCpOptions inputOptions, Configuration conf)
      throws IOException {
    List<Path> sourcePaths = inputOptions.getSourcePaths();
    if (sourcePaths.size() != 1) {
      // we only support one source dir which must be a snapshottable directory
      DistCp.LOG.warn(sourcePaths.size() + " source paths are provided");
      return false;
    }
    final Path sourceDir = sourcePaths.get(0);
    final Path targetDir = inputOptions.getTargetPath();

    final FileSystem sfs = sourceDir.getFileSystem(conf);
    final FileSystem tfs = targetDir.getFileSystem(conf);
    // currently we require both the source and the target file system are
    // DistributedFileSystem.
    if (!(sfs instanceof DistributedFileSystem) ||
        !(tfs instanceof DistributedFileSystem)) {
      DistCp.LOG.warn("To use diff-based distcp, the FileSystems needs to" +
          " be DistributedFileSystem");
      return false;
    }
    final DistributedFileSystem sourceFs = (DistributedFileSystem) sfs;
    final DistributedFileSystem targetFs = (DistributedFileSystem) tfs;

    // make sure targetFS has no change between from and the current states
    if (!checkNoChange(inputOptions, targetFs, targetDir)) {
      return false;
    }

    Path tmpDir = null;
    try {
      tmpDir = createTargetTmpDir(targetFs, targetDir);
      DiffInfo[] diffs = getDiffs(inputOptions, sourceFs, sourceDir, targetDir);
      if (diffs == null) {
        return false;
      }
      // do the real sync work: deletion and rename
      syncDiff(diffs, targetFs, tmpDir);
      return true;
    } catch (Exception e) {
      DistCp.LOG.warn("Failed to use snapshot diff for distcp", e);
      return false;
    } finally {
      deleteTargetTmpDir(targetFs, tmpDir);
      // TODO: since we have tmp directory, we can support "undo" with failures
    }
  }

  /**
   * Creates a randomly named temporary directory under the target directory.
   *
   * @return the path of the created directory
   * @throws IOException if mkdirs reports failure
   */
  private static Path createTargetTmpDir(DistributedFileSystem targetFs,
      Path targetDir) throws IOException {
    final Path tmp = new Path(targetDir,
        DistCpConstants.HDFS_DISTCP_DIFF_DIRECTORY_NAME + DistCp.rand.nextInt());
    if (!targetFs.mkdirs(tmp)) {
      throw new IOException("The tmp directory " + tmp + " already exists");
    }
    return tmp;
  }

  /**
   * Recursively deletes the temporary directory, if one was created. A
   * failure is logged and otherwise ignored.
   */
  private static void deleteTargetTmpDir(DistributedFileSystem targetFs,
      Path tmpDir) {
    try {
      if (tmpDir != null) {
        targetFs.delete(tmpDir, true);
      }
    } catch (IOException e) {
      DistCp.LOG.error("Unable to cleanup tmp dir: " + tmpDir, e);
    }
  }

  /**
   * Compute the snapshot diff on the given file system. Return true if the diff
   * is empty, i.e., no changes have happened in the FS. Returns false if the
   * diff is non-empty or cannot be computed.
   */
  private static boolean checkNoChange(DistCpOptions inputOptions,
      DistributedFileSystem fs, Path path) {
    try {
      SnapshotDiffReport targetDiff =
          fs.getSnapshotDiffReport(path, inputOptions.getFromSnapshot(), "");
      if (!targetDiff.getDiffList().isEmpty()) {
        DistCp.LOG.warn("The target has been modified since snapshot "
            + inputOptions.getFromSnapshot());
        return false;
      } else {
        return true;
      }
    } catch (IOException e) {
      DistCp.LOG.warn("Failed to compute snapshot diff on " + path, e);
    }
    return false;
  }

  /**
   * Computes the snapshot diff between the "from" and "to" snapshots on the
   * source directory and converts it into rename/delete operations whose
   * paths are resolved against the target directory.
   *
   * @return the operations, or null if the diff report cannot be computed
   */
  @VisibleForTesting
  static DiffInfo[] getDiffs(DistCpOptions inputOptions,
      DistributedFileSystem fs, Path sourceDir, Path targetDir) {
    try {
      SnapshotDiffReport sourceDiff = fs.getSnapshotDiffReport(sourceDir,
          inputOptions.getFromSnapshot(), inputOptions.getToSnapshot());
      return DiffInfo.getDiffs(sourceDiff, targetDir);
    } catch (IOException e) {
      DistCp.LOG.warn("Failed to compute snapshot diff on " + sourceDir, e);
    }
    return null;
  }

  /**
   * Applies the operations in two phases: first every source is moved into
   * the tmp dir, then every rename is completed from the tmp dir to its
   * final target.
   */
  private static void syncDiff(DiffInfo[] diffs,
      DistributedFileSystem targetFs, Path tmpDir) throws IOException {
    moveToTmpDir(diffs, targetFs, tmpDir);
    moveToTarget(diffs, targetFs);
  }

  /**
   * Move all the source files that should be renamed or deleted to the tmp
   * directory.
   *
   * @throws IOException if a rename fails or reports failure
   */
  private static void moveToTmpDir(DiffInfo[] diffs,
      DistributedFileSystem targetFs, Path tmpDir) throws IOException {
    // sort the diffs based on their source paths to make sure the files and
    // subdirs are moved before moving their parents/ancestors.
    Arrays.sort(diffs, DiffInfo.sourceComparator);
    Random random = new Random();
    for (DiffInfo diff : diffs) {
      Path tmpTarget = new Path(tmpDir, diff.source.getName());
      // different sources may share the same last path component; pick a
      // name that is not taken yet inside the tmp dir
      while (targetFs.exists(tmpTarget)) {
        tmpTarget = new Path(tmpDir, diff.source.getName() + random.nextInt());
      }
      diff.setTmp(tmpTarget);
      // rename() signals some failures through its return value only; do not
      // let such a failure pass as a successful sync
      if (!targetFs.rename(diff.source, tmpTarget)) {
        throw new IOException("Failed to rename " + diff.source + " to "
            + tmpTarget);
      }
    }
  }

  /**
   * Finish the rename operations: move all the intermediate files/directories
   * from the tmp dir to the final targets.
   *
   * @throws IOException if a parent directory cannot be created, or a rename
   *                     fails or reports failure
   */
  private static void moveToTarget(DiffInfo[] diffs,
      DistributedFileSystem targetFs) throws IOException {
    // sort the diffs based on their target paths to make sure the parent
    // directories are created first.
    Arrays.sort(diffs, DiffInfo.targetComparator);
    for (DiffInfo diff : diffs) {
      // a null target means deletion: the entry simply stays in the tmp dir
      if (diff.target != null) {
        final Path parent = diff.target.getParent();
        if (!targetFs.exists(parent) && !targetFs.mkdirs(parent)) {
          throw new IOException("Failed to create directory " + parent);
        }
        if (!targetFs.rename(diff.getTmp(), diff.target)) {
          throw new IOException("Failed to rename " + diff.getTmp() + " to "
              + diff.target);
        }
      }
    }
  }
}

View File

@ -18,13 +18,22 @@
package org.apache.hadoop.tools; package org.apache.hadoop.tools;
import org.apache.commons.cli.*; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import java.util.*; import com.google.common.base.Preconditions;
/** /**
* The OptionsParser parses out the command-line options passed to DistCp, * The OptionsParser parses out the command-line options passed to DistCp,
@ -207,6 +216,13 @@ public class OptionsParser {
} }
} }
if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) {
String[] snapshots = getVals(command, DistCpOptionSwitch.DIFF.getSwitch());
Preconditions.checkArgument(snapshots != null && snapshots.length == 2,
"Must provide both the starting and ending snapshot names");
option.setUseDiff(true, snapshots[0], snapshots[1]);
}
if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) { if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) {
String fileLimitString = getVal(command, String fileLimitString = getVal(command,
DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim()); DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim());
@ -247,6 +263,10 @@ public class OptionsParser {
} }
} }
/**
 * Fetches all values given on the command line for one option.
 *
 * @param command the parsed command line
 * @param option the option name to look up
 * @return whatever {@code command.getOptionValues(option)} returns
 */
private static String[] getVals(CommandLine command, String option) {
  final String[] values = command.getOptionValues(option);
  return values;
}
public static void usage() { public static void usage() {
HelpFormatter formatter = new HelpFormatter(); HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("distcp OPTIONS [source_path...] <target_path>\n\nOPTIONS", cliOptions); formatter.printHelp("distcp OPTIONS [source_path...] <target_path>\n\nOPTIONS", cliOptions);

View File

@ -90,7 +90,8 @@ public class CopyCommitter extends FileOutputCommitter {
} }
try { try {
if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)) { if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)
&& !(conf.getBoolean(DistCpConstants.CONF_LABEL_DIFF, false))) {
deleteMissing(conf); deleteMissing(conf);
} else if (conf.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) { } else if (conf.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) {
commitData(conf); commitData(conf);

View File

@ -0,0 +1,349 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.tools.mapred.CopyMapper;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
public class TestDistCpSync {
private MiniDFSCluster cluster;
private final Configuration conf = new HdfsConfiguration();
private DistributedFileSystem dfs;
private DistCpOptions options;
private final Path source = new Path("/source");
private final Path target = new Path("/target");
private final long BLOCK_SIZE = 1024;
private final short DATA_NUM = 1;
@Before
public void setUp() throws Exception {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATA_NUM).build();
cluster.waitActive();
dfs = cluster.getFileSystem();
dfs.mkdirs(source);
dfs.mkdirs(target);
options = new DistCpOptions(Arrays.asList(source), target);
options.setSyncFolder(true);
options.setDeleteMissing(true);
options.setUseDiff(true, "s1", "s2");
options.appendToConf(conf);
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, target.toString());
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, target.toString());
}
@After
public void tearDown() throws Exception {
IOUtils.cleanup(null, dfs);
if (cluster != null) {
cluster.shutdown();
}
}
/**
* Test the sync returns false in the following scenarios:
* 1. the source/target dir are not snapshottable dir
* 2. the source/target does not have the given snapshots
* 3. changes have been made in target
*/
@Test
public void testFallback() throws Exception {
// the source/target dir are not snapshottable dir
Assert.assertFalse(DistCpSync.sync(options, conf));
// the source/target does not have the given snapshots
dfs.allowSnapshot(source);
dfs.allowSnapshot(target);
Assert.assertFalse(DistCpSync.sync(options, conf));
dfs.createSnapshot(source, "s1");
dfs.createSnapshot(source, "s2");
dfs.createSnapshot(target, "s1");
Assert.assertTrue(DistCpSync.sync(options, conf));
// changes have been made in target
final Path subTarget = new Path(target, "sub");
dfs.mkdirs(subTarget);
Assert.assertFalse(DistCpSync.sync(options, conf));
dfs.delete(subTarget, true);
Assert.assertTrue(DistCpSync.sync(options, conf));
}
/**
* create some files and directories under the given directory.
* the final subtree looks like this:
* dir/
* foo/ bar/
* d1/ f1 d2/ f2
* f3 f4
*/
private void initData(Path dir) throws Exception {
final Path foo = new Path(dir, "foo");
final Path bar = new Path(dir, "bar");
final Path d1 = new Path(foo, "d1");
final Path f1 = new Path(foo, "f1");
final Path d2 = new Path(bar, "d2");
final Path f2 = new Path(bar, "f2");
final Path f3 = new Path(d1, "f3");
final Path f4 = new Path(d2, "f4");
DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0);
DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE, DATA_NUM, 0);
DFSTestUtil.createFile(dfs, f3, BLOCK_SIZE, DATA_NUM, 0);
DFSTestUtil.createFile(dfs, f4, BLOCK_SIZE, DATA_NUM, 0);
}
/**
* make some changes under the given directory (created in the above way).
* 1. rename dir/foo/d1 to dir/bar/d1
* 2. delete dir/bar/d1/f3
* 3. rename dir/foo to /dir/bar/d1/foo
* 4. delete dir/bar/d1/foo/f1
* 5. create file dir/bar/d1/foo/f1 whose size is 2*BLOCK_SIZE
* 6. append one BLOCK to file dir/bar/f2
* 7. rename dir/bar to dir/foo
*
* Thus after all these ops the subtree looks like this:
* dir/
* foo/
* d1/ f2(A) d2/
* foo/ f4
* f1(new)
*/
private void changeData(Path dir) throws Exception {
final Path foo = new Path(dir, "foo");
final Path bar = new Path(dir, "bar");
final Path d1 = new Path(foo, "d1");
final Path f2 = new Path(bar, "f2");
final Path bar_d1 = new Path(bar, "d1");
dfs.rename(d1, bar_d1);
final Path f3 = new Path(bar_d1, "f3");
dfs.delete(f3, true);
final Path newfoo = new Path(bar_d1, "foo");
dfs.rename(foo, newfoo);
final Path f1 = new Path(newfoo, "f1");
dfs.delete(f1, true);
DFSTestUtil.createFile(dfs, f1, 2 * BLOCK_SIZE, DATA_NUM, 0);
DFSTestUtil.appendFile(dfs, f2, (int) BLOCK_SIZE);
dfs.rename(bar, new Path(dir, "foo"));
}
/**
* Test the basic functionality.
*/
@Test
public void testSync() throws Exception {
initData(source);
initData(target);
dfs.allowSnapshot(source);
dfs.allowSnapshot(target);
dfs.createSnapshot(source, "s1");
dfs.createSnapshot(target, "s1");
// make changes under source
changeData(source);
dfs.createSnapshot(source, "s2");
// do the sync
Assert.assertTrue(DistCpSync.sync(options, conf));
// build copy listing
final Path listingPath = new Path("/tmp/META/fileList.seq");
CopyListing listing = new GlobbedCopyListing(conf, new Credentials());
listing.buildListing(listingPath, options);
Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(conf, null, 0);
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
stubContext.getContext();
// Enable append
context.getConfiguration().setBoolean(
DistCpOptionSwitch.APPEND.getConfigLabel(), true);
copyMapper.setup(context);
for (Map.Entry<Text, CopyListingFileStatus> entry : copyListing.entrySet()) {
copyMapper.map(entry.getKey(), entry.getValue(), context);
}
// verify that we only copied new appended data of f2 and the new file f1
Assert.assertEquals(BLOCK_SIZE * 3, stubContext.getReporter()
.getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
// verify the source and target now has the same structure
verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false);
}
private Map<Text, CopyListingFileStatus> getListing(Path listingPath)
throws Exception {
SequenceFile.Reader reader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(listingPath));
Text key = new Text();
CopyListingFileStatus value = new CopyListingFileStatus();
Map<Text, CopyListingFileStatus> values = new HashMap<>();
while (reader.next(key, value)) {
values.put(key, value);
key = new Text();
value = new CopyListingFileStatus();
}
return values;
}
private void verifyCopy(FileStatus s, FileStatus t, boolean compareName)
throws Exception {
Assert.assertEquals(s.isDirectory(), t.isDirectory());
if (compareName) {
Assert.assertEquals(s.getPath().getName(), t.getPath().getName());
}
if (!s.isDirectory()) {
// verify the file content is the same
byte[] sbytes = DFSTestUtil.readFileBuffer(dfs, s.getPath());
byte[] tbytes = DFSTestUtil.readFileBuffer(dfs, t.getPath());
Assert.assertArrayEquals(sbytes, tbytes);
} else {
FileStatus[] slist = dfs.listStatus(s.getPath());
FileStatus[] tlist = dfs.listStatus(t.getPath());
Assert.assertEquals(slist.length, tlist.length);
for (int i = 0; i < slist.length; i++) {
verifyCopy(slist[i], tlist[i], true);
}
}
}
private void initData2(Path dir) throws Exception {
final Path test = new Path(dir, "test");
final Path foo = new Path(dir, "foo");
final Path bar = new Path(dir, "bar");
final Path f1 = new Path(test, "f1");
final Path f2 = new Path(foo, "f2");
final Path f3 = new Path(bar, "f3");
DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0L);
DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE, DATA_NUM, 1L);
DFSTestUtil.createFile(dfs, f3, BLOCK_SIZE, DATA_NUM, 2L);
}
private void changeData2(Path dir) throws Exception {
final Path tmpFoo = new Path(dir, "tmpFoo");
final Path test = new Path(dir, "test");
final Path foo = new Path(dir, "foo");
final Path bar = new Path(dir, "bar");
dfs.rename(test, tmpFoo);
dfs.rename(foo, test);
dfs.rename(bar, foo);
dfs.rename(tmpFoo, bar);
}
@Test
public void testSync2() throws Exception {
initData2(source);
initData2(target);
dfs.allowSnapshot(source);
dfs.allowSnapshot(target);
dfs.createSnapshot(source, "s1");
dfs.createSnapshot(target, "s1");
// make changes under source
changeData2(source);
dfs.createSnapshot(source, "s2");
SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
System.out.println(report);
// do the sync
Assert.assertTrue(DistCpSync.sync(options, conf));
verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false);
}
private void initData3(Path dir) throws Exception {
final Path test = new Path(dir, "test");
final Path foo = new Path(dir, "foo");
final Path bar = new Path(dir, "bar");
final Path f1 = new Path(test, "file");
final Path f2 = new Path(foo, "file");
final Path f3 = new Path(bar, "file");
DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0L);
DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE * 2, DATA_NUM, 1L);
DFSTestUtil.createFile(dfs, f3, BLOCK_SIZE * 3, DATA_NUM, 2L);
}
/**
 * Renames "file" to "newfile" inside each of the test, foo and bar
 * directories under {@code dir}.
 *
 * Each rename result is asserted so that a failed rename aborts the test
 * immediately instead of letting it continue on unchanged data.
 *
 * @param dir the directory populated by {@link #initData3(Path)}
 */
private void changeData3(Path dir) throws Exception {
  for (String parentName : new String[] {"test", "foo", "bar"}) {
    final Path parent = new Path(dir, parentName);
    final Path oldFile = new Path(parent, "file");
    final Path newFile = new Path(parent, "newfile");
    Assert.assertTrue("rename " + oldFile + " -> " + newFile + " failed",
        dfs.rename(oldFile, newFile));
  }
}
/**
 * Sync test for the case where several source files share the same name:
 * each of test, foo and bar holds a file called "file", which
 * {@link #changeData3(Path)} renames to "newfile" between snapshots "s1" and
 * "s2". DistCpSync.sync is expected to return true, after which the source
 * and target roots are compared with verifyCopy.
 */
@Test
public void testSync3() throws Exception {
  // same initial content on both sides, captured as snapshot s1
  initData3(source);
  initData3(target);
  dfs.allowSnapshot(source);
  dfs.allowSnapshot(target);
  dfs.createSnapshot(source, "s1");
  dfs.createSnapshot(target, "s1");

  // change the source only and capture the result as snapshot s2
  changeData3(source);
  dfs.createSnapshot(source, "s2");
  // print the s1 -> s2 diff report
  System.out.println(dfs.getSnapshotDiffReport(source, "s1", "s2"));

  // run the sync and compare the two roots
  final boolean synced = DistCpSync.sync(options, conf);
  Assert.assertTrue(synced);
  final FileStatus sourceRoot = dfs.getFileStatus(source);
  final FileStatus targetRoot = dfs.getFileStatus(target);
  verifyCopy(sourceRoot, targetRoot, false);
}
}

View File

@ -584,7 +584,7 @@ public class TestOptionsParser {
// make sure -append is only valid when -update is specified // make sure -append is only valid when -update is specified
try { try {
options = OptionsParser.parse(new String[] { "-append", OptionsParser.parse(new String[] { "-append",
"hdfs://localhost:8020/source/first", "hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/" }); "hdfs://localhost:8020/target/" });
fail("Append should fail if update option is not specified"); fail("Append should fail if update option is not specified");
@ -595,7 +595,7 @@ public class TestOptionsParser {
// make sure -append is invalid when skipCrc is specified // make sure -append is invalid when skipCrc is specified
try { try {
options = OptionsParser.parse(new String[] { OptionsParser.parse(new String[] {
"-append", "-update", "-skipcrccheck", "-append", "-update", "-skipcrccheck",
"hdfs://localhost:8020/source/first", "hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/" }); "hdfs://localhost:8020/target/" });
@ -605,4 +605,75 @@ public class TestOptionsParser {
"Append is disallowed when skipping CRC", e); "Append is disallowed when skipping CRC", e);
} }
} }
/**
 * Checks parsing of the -diff option: it takes two snapshot names, is
 * propagated into the configuration by appendToConf, and is rejected unless
 * both -update and -delete are given.
 */
@Test
public void testDiffOption() {
  // the diff switch is off in a fresh configuration
  Configuration conf = new Configuration();
  Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.DIFF.getConfigLabel(),
      false));

  DistCpOptions options = OptionsParser.parse(new String[] { "-update",
      "-delete", "-diff", "s1", "s2",
      "hdfs://localhost:8020/source/first",
      "hdfs://localhost:8020/target/" });
  options.appendToConf(conf);
  Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DIFF.getConfigLabel(),
      false));
  Assert.assertTrue(options.shouldUseDiff());
  Assert.assertEquals("s1", options.getFromSnapshot());
  Assert.assertEquals("s2", options.getToSnapshot());

  // "." as the second snapshot name, with the options in a different order
  options = OptionsParser.parse(new String[] {
      "-delete", "-diff", "s1", ".", "-update",
      "hdfs://localhost:8020/source/first",
      "hdfs://localhost:8020/target/" });
  // Start from a fresh configuration: the previous one already has the diff
  // switch set, which would make the assertion below pass unconditionally.
  conf = new Configuration();
  options.appendToConf(conf);
  Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DIFF.getConfigLabel(),
      false));
  Assert.assertTrue(options.shouldUseDiff());
  Assert.assertEquals("s1", options.getFromSnapshot());
  Assert.assertEquals(".", options.getToSnapshot());

  // -diff requires two option values
  try {
    OptionsParser.parse(new String[] {"-diff", "s1", "-delete", "-update",
        "hdfs://localhost:8020/source/first",
        "hdfs://localhost:8020/target/" });
    fail("-diff should fail with only one snapshot name");
  } catch (IllegalArgumentException e) {
    GenericTestUtils.assertExceptionContains(
        "Must provide both the starting and ending snapshot names", e);
  }

  // make sure -diff is only valid when both -update and -delete are specified
  // neither -update nor -delete
  try {
    OptionsParser.parse(new String[] { "-diff", "s1", "s2",
        "hdfs://localhost:8020/source/first",
        "hdfs://localhost:8020/target/" });
    fail("-diff should fail if neither -update nor -delete is specified");
  } catch (IllegalArgumentException e) {
    GenericTestUtils.assertExceptionContains(
        "Diff is valid only with update and delete options", e);
  }

  // -update without -delete
  try {
    OptionsParser.parse(new String[] { "-diff", "s1", "s2", "-update",
        "hdfs://localhost:8020/source/first",
        "hdfs://localhost:8020/target/" });
    fail("-diff should fail if -delete option is not specified");
  } catch (IllegalArgumentException e) {
    GenericTestUtils.assertExceptionContains(
        "Diff is valid only with update and delete options", e);
  }

  // -delete with -overwrite instead of -update
  try {
    OptionsParser.parse(new String[] { "-diff", "s1", "s2",
        "-delete", "-overwrite",
        "hdfs://localhost:8020/source/first",
        "hdfs://localhost:8020/target/" });
    fail("-diff should fail if -update option is not specified");
  } catch (IllegalArgumentException e) {
    GenericTestUtils.assertExceptionContains(
        "Diff is valid only with update and delete options", e);
  }
}
} }