HADOOP-14267. Make DistCpOptions immutable. Contributed by Mingliang Liu

This commit is contained in:
Mingliang Liu 2016-06-23 00:21:49 -07:00
parent 73835c73e2
commit 26172a94d6
25 changed files with 1578 additions and 1073 deletions

View File

@ -77,41 +77,41 @@ public abstract class CopyListing extends Configured {
* TARGET IS DIR : Key-"/file1", Value-FileStatus(/tmp/file1)
*
* @param pathToListFile - Output file where the listing would be stored
* @param options - Input options to distcp
* @param distCpContext - distcp context associated with input options
* @throws IOException - Exception if any
*/
public final void buildListing(Path pathToListFile,
DistCpOptions options) throws IOException {
validatePaths(options);
doBuildListing(pathToListFile, options);
DistCpContext distCpContext) throws IOException {
validatePaths(distCpContext);
doBuildListing(pathToListFile, distCpContext);
Configuration config = getConf();
config.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, pathToListFile.toString());
config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, getBytesToCopy());
config.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, getNumberOfPaths());
validateFinalListing(pathToListFile, options);
validateFinalListing(pathToListFile, distCpContext);
LOG.info("Number of paths in the copy list: " + this.getNumberOfPaths());
}
/**
* Validate input and output paths
*
* @param options - Input options
* @param distCpContext - Distcp context
* @throws InvalidInputException If inputs are invalid
* @throws IOException any Exception with FS
*/
protected abstract void validatePaths(DistCpOptions options)
protected abstract void validatePaths(DistCpContext distCpContext)
throws IOException, InvalidInputException;
/**
* The interface to be implemented by sub-classes, to create the source/target file listing.
* @param pathToListFile Path on HDFS where the listing file is written.
* @param options Input Options for DistCp (indicating source/target paths.)
* @param distCpContext - Distcp context
* @throws IOException Thrown on failure to create the listing file.
*/
protected abstract void doBuildListing(Path pathToListFile,
DistCpOptions options) throws IOException;
DistCpContext distCpContext) throws IOException;
/**
* Return the total bytes that distCp should copy for the source paths
@ -135,17 +135,17 @@ public abstract class CopyListing extends Configured {
* If preserving XAttrs, checks that file system can support XAttrs.
*
* @param pathToListFile - path listing built by doBuildListing
* @param options - Input options to distcp
* @param context - Distcp context with associated input options
* @throws IOException - Any issues while checking for duplicates
* @throws DuplicateFileException - if there are duplicates
*/
private void validateFinalListing(Path pathToListFile, DistCpOptions options)
private void validateFinalListing(Path pathToListFile, DistCpContext context)
throws DuplicateFileException, IOException {
Configuration config = getConf();
FileSystem fs = pathToListFile.getFileSystem(config);
final boolean splitLargeFile = options.splitLargeFile();
final boolean splitLargeFile = context.splitLargeFile();
// When splitLargeFile is enabled, we don't randomize the copylist
// earlier, so we don't do the sorting here. For a file that has
@ -188,7 +188,7 @@ public abstract class CopyListing extends Configured {
}
}
reader.getCurrentValue(lastFileStatus);
if (options.shouldPreserve(DistCpOptions.FileAttribute.ACL)) {
if (context.shouldPreserve(DistCpOptions.FileAttribute.ACL)) {
FileSystem lastFs = lastFileStatus.getPath().getFileSystem(config);
URI lastFsUri = lastFs.getUri();
if (!aclSupportCheckFsSet.contains(lastFsUri)) {
@ -196,7 +196,7 @@ public abstract class CopyListing extends Configured {
aclSupportCheckFsSet.add(lastFsUri);
}
}
if (options.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) {
if (context.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) {
FileSystem lastFs = lastFileStatus.getPath().getFileSystem(config);
URI lastFsUri = lastFs.getUri();
if (!xAttrSupportCheckFsSet.contains(lastFsUri)) {
@ -210,7 +210,7 @@ public abstract class CopyListing extends Configured {
lastChunkOffset = lastFileStatus.getChunkOffset();
lastChunkLength = lastFileStatus.getChunkLength();
}
if (options.shouldUseDiff() && LOG.isDebugEnabled()) {
if (context.shouldUseDiff() && LOG.isDebugEnabled()) {
LOG.debug("Copy list entry " + idx + ": " +
lastFileStatus.getPath().toUri().getPath());
idx++;
@ -253,14 +253,12 @@ public abstract class CopyListing extends Configured {
* Public Factory method with which the appropriate CopyListing implementation may be retrieved.
* @param configuration The input configuration.
* @param credentials Credentials object on which the FS delegation tokens are cached
* @param options The input Options, to help choose the appropriate CopyListing Implementation.
* @param context Distcp context with associated input options
* @return An instance of the appropriate CopyListing implementation.
* @throws java.io.IOException - Exception if any
*/
public static CopyListing getCopyListing(Configuration configuration,
Credentials credentials,
DistCpOptions options)
throws IOException {
Credentials credentials, DistCpContext context) throws IOException {
String copyListingClassName = configuration.get(DistCpConstants.
CONF_LABEL_COPY_LISTING_CLASS, "");
Class<? extends CopyListing> copyListingClass;
@ -270,7 +268,7 @@ public abstract class CopyListing extends Configured {
CONF_LABEL_COPY_LISTING_CLASS, GlobbedCopyListing.class,
CopyListing.class);
} else {
if (options.getSourceFileListing() == null) {
if (context.getSourceFileListing() == null) {
copyListingClass = GlobbedCopyListing.class;
} else {
copyListingClass = FileBasedCopyListing.class;

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.tools;
import java.io.IOException;
import java.util.Random;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -35,7 +36,6 @@ import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.CopyListing.*;
import org.apache.hadoop.tools.mapred.CopyMapper;
import org.apache.hadoop.tools.mapred.CopyOutputFormat;
@ -66,7 +66,9 @@ public class DistCp extends Configured implements Tool {
static final Log LOG = LogFactory.getLog(DistCp.class);
private DistCpOptions inputOptions;
@VisibleForTesting
DistCpContext context;
private Path metaFolder;
private static final String PREFIX = "_distcp";
@ -79,15 +81,14 @@ public class DistCp extends Configured implements Tool {
private FileSystem jobFS;
private void prepareFileListing(Job job) throws Exception {
if (inputOptions.shouldUseSnapshotDiff()) {
if (context.shouldUseSnapshotDiff()) {
// When "-diff" or "-rdiff" is passed, do sync() first, then
// create copyListing based on snapshot diff.
DistCpSync distCpSync = new DistCpSync(inputOptions, getConf());
DistCpSync distCpSync = new DistCpSync(context, getConf());
if (distCpSync.sync()) {
createInputFileListingWithDiff(job, distCpSync);
} else {
throw new Exception("DistCp sync failed, input options: "
+ inputOptions);
throw new Exception("DistCp sync failed, input options: " + context);
}
} else {
// When no "-diff" or "-rdiff" is passed, create copyListing
@ -99,16 +100,19 @@ public class DistCp extends Configured implements Tool {
/**
* Public Constructor. Creates DistCp object with specified input-parameters.
* (E.g. source-paths, target-location, etc.)
* @param inputOptions Options (indicating source-paths, target-location.)
* @param configuration The Hadoop configuration against which the Copy-mapper must run.
* @param configuration configuration against which the Copy-mapper must run
* @param inputOptions Immutable options
* @throws Exception
*/
public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception {
public DistCp(Configuration configuration, DistCpOptions inputOptions)
throws Exception {
Configuration config = new Configuration(configuration);
config.addResource(DISTCP_DEFAULT_XML);
config.addResource(DISTCP_SITE_XML);
setConf(config);
this.inputOptions = inputOptions;
if (inputOptions != null) {
this.context = new DistCpContext(inputOptions);
}
this.metaFolder = createMetaFolderPath();
}
@ -134,10 +138,10 @@ public class DistCp extends Configured implements Tool {
}
try {
inputOptions = (OptionsParser.parse(argv));
setOptionsForSplitLargeFile();
context = new DistCpContext(OptionsParser.parse(argv));
checkSplitLargeFile();
setTargetPathExists();
LOG.info("Input Options: " + inputOptions);
LOG.info("Input Options: " + context);
} catch (Throwable e) {
LOG.error("Invalid arguments: ", e);
System.err.println("Invalid arguments: " + e.getMessage());
@ -173,9 +177,11 @@ public class DistCp extends Configured implements Tool {
* @throws Exception
*/
public Job execute() throws Exception {
Preconditions.checkState(context != null,
"The DistCpContext should have been created before running DistCp!");
Job job = createAndSubmitJob();
if (inputOptions.shouldBlock()) {
if (context.shouldBlock()) {
waitForJobCompletion(job);
}
return job;
@ -186,7 +192,7 @@ public class DistCp extends Configured implements Tool {
* @return The mapreduce job object that has been submitted
*/
public Job createAndSubmitJob() throws Exception {
assert inputOptions != null;
assert context != null;
assert getConf() != null;
Job job = null;
try {
@ -230,53 +236,36 @@ public class DistCp extends Configured implements Tool {
* for the benefit of CopyCommitter
*/
private void setTargetPathExists() throws IOException {
Path target = inputOptions.getTargetPath();
Path target = context.getTargetPath();
FileSystem targetFS = target.getFileSystem(getConf());
boolean targetExists = targetFS.exists(target);
inputOptions.setTargetPathExists(targetExists);
context.setTargetPathExists(targetExists);
getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS,
targetExists);
}
/**
* Check if concat is supported by fs.
* Throws UnsupportedOperationException if not.
* Check splitting large files is supported and populate configs.
*/
private void checkConcatSupport(FileSystem fs) {
private void checkSplitLargeFile() throws IOException {
if (!context.splitLargeFile()) {
return;
}
final Path target = context.getTargetPath();
final FileSystem targetFS = target.getFileSystem(getConf());
try {
Path[] src = null;
Path tgt = null;
fs.concat(tgt, src);
targetFS.concat(tgt, src);
} catch (UnsupportedOperationException use) {
throw new UnsupportedOperationException(
DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() +
" is not supported since the target file system doesn't" +
" support concat.", use);
" is not supported since the target file system doesn't" +
" support concat.", use);
} catch (Exception e) {
// Ignore other exception
}
}
/**
* Set up needed options for splitting large files.
*/
private void setOptionsForSplitLargeFile() throws IOException {
if (!inputOptions.splitLargeFile()) {
return;
}
Path target = inputOptions.getTargetPath();
FileSystem targetFS = target.getFileSystem(getConf());
checkConcatSupport(targetFS);
LOG.info("Enabling preserving blocksize since "
+ DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " is passed.");
inputOptions.preserve(FileAttribute.BLOCKSIZE);
LOG.info("Set " +
DistCpOptionSwitch.APPEND.getSwitch()
+ " to false since " + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch()
+ " is passed.");
inputOptions.setAppend(false);
LOG.info("Set " +
DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES
@ -286,7 +275,6 @@ public class DistCp extends Configured implements Tool {
DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false);
}
/**
* Create Job object for submitting it, with all the configuration
*
@ -300,7 +288,7 @@ public class DistCp extends Configured implements Tool {
jobName += ": " + userChosenName;
Job job = Job.getInstance(getConf());
job.setJobName(jobName);
job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), context));
job.setJarByClass(CopyMapper.class);
configureOutputFormat(job);
@ -311,9 +299,9 @@ public class DistCp extends Configured implements Tool {
job.setOutputFormatClass(CopyOutputFormat.class);
job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
job.getConfiguration().set(JobContext.NUM_MAPS,
String.valueOf(inputOptions.getMaxMaps()));
String.valueOf(context.getMaxMaps()));
inputOptions.appendToConf(job.getConfiguration());
context.appendToConf(job.getConfiguration());
return job;
}
@ -325,18 +313,20 @@ public class DistCp extends Configured implements Tool {
*/
private void configureOutputFormat(Job job) throws IOException {
final Configuration configuration = job.getConfiguration();
Path targetPath = inputOptions.getTargetPath();
Path targetPath = context.getTargetPath();
FileSystem targetFS = targetPath.getFileSystem(configuration);
targetPath = targetPath.makeQualified(targetFS.getUri(),
targetFS.getWorkingDirectory());
if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.ACL)) {
if (context.shouldPreserve(
DistCpOptions.FileAttribute.ACL)) {
DistCpUtils.checkFileSystemAclSupport(targetFS);
}
if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) {
if (context.shouldPreserve(
DistCpOptions.FileAttribute.XATTR)) {
DistCpUtils.checkFileSystemXAttrSupport(targetFS);
}
if (inputOptions.shouldAtomicCommit()) {
Path workDir = inputOptions.getAtomicWorkPath();
if (context.shouldAtomicCommit()) {
Path workDir = context.getAtomicWorkPath();
if (workDir == null) {
workDir = targetPath.getParent();
}
@ -353,7 +343,7 @@ public class DistCp extends Configured implements Tool {
}
CopyOutputFormat.setCommitDirectory(job, targetPath);
Path logPath = inputOptions.getLogPath();
Path logPath = context.getLogPath();
if (logPath == null) {
logPath = new Path(metaFolder, "_logs");
} else {
@ -374,8 +364,8 @@ public class DistCp extends Configured implements Tool {
protected Path createInputFileListing(Job job) throws IOException {
Path fileListingPath = getFileListingPath();
CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(),
job.getCredentials(), inputOptions);
copyListing.buildListing(fileListingPath, inputOptions);
job.getCredentials(), context);
copyListing.buildListing(fileListingPath, context);
return fileListingPath;
}
@ -391,7 +381,7 @@ public class DistCp extends Configured implements Tool {
Path fileListingPath = getFileListingPath();
CopyListing copyListing = new SimpleCopyListing(job.getConfiguration(),
job.getCredentials(), distCpSync);
copyListing.buildListing(fileListingPath, inputOptions);
copyListing.buildListing(fileListingPath, context);
return fileListingPath;
}

View File

@ -0,0 +1,198 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import java.util.List;
import java.util.Set;
/**
 * This is the context of the distcp at runtime.
 *
 * <p>It has the immutable {@link DistCpOptions} and mutable runtime status.
 * The mutable state kept here is the list of source paths, the
 * target-path-exists flag and the preserve-raw-xattrs flag; every other
 * accessor simply delegates to the wrapped options.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DistCpContext {
  /** The immutable options this context wraps. */
  private final DistCpOptions options;

  /** The source paths can be set at runtime via snapshots. */
  private List<Path> sourcePaths;

  /** This is a derived field, it's initialized in the beginning of distcp. */
  private boolean targetPathExists = true;

  /** Indicate that raw.* xattrs should be preserved if true. */
  private boolean preserveRawXattrs = false;

  /**
   * Creates a context around the given options. The source paths are
   * initially taken from {@code options.getSourcePaths()}.
   *
   * @param options the options to wrap; must not be null, since it is
   *                dereferenced immediately
   */
  public DistCpContext(DistCpOptions options) {
    this.options = options;
    this.sourcePaths = options.getSourcePaths();
  }

  /**
   * Replaces the source paths held by this context. The wrapped options are
   * not touched.
   *
   * @param sourcePaths the new source paths
   */
  public void setSourcePaths(List<Path> sourcePaths) {
    this.sourcePaths = sourcePaths;
  }

  /**
   * @return the sourcePaths. Please note this method does not directly delegate
   * to the {@link #options}.
   */
  public List<Path> getSourcePaths() {
    return sourcePaths;
  }

  // ---------------------------------------------------------------------
  // Accessors below, unless noted otherwise, delegate to the wrapped
  // immutable options.
  // ---------------------------------------------------------------------

  /** @return {@link DistCpOptions#getSourceFileListing()} of the options. */
  public Path getSourceFileListing() {
    return options.getSourceFileListing();
  }

  /** @return {@link DistCpOptions#getTargetPath()} of the options. */
  public Path getTargetPath() {
    return options.getTargetPath();
  }

  /** @return {@link DistCpOptions#shouldAtomicCommit()} of the options. */
  public boolean shouldAtomicCommit() {
    return options.shouldAtomicCommit();
  }

  /** @return {@link DistCpOptions#shouldSyncFolder()} of the options. */
  public boolean shouldSyncFolder() {
    return options.shouldSyncFolder();
  }

  /** @return {@link DistCpOptions#shouldDeleteMissing()} of the options. */
  public boolean shouldDeleteMissing() {
    return options.shouldDeleteMissing();
  }

  /** @return {@link DistCpOptions#shouldIgnoreFailures()} of the options. */
  public boolean shouldIgnoreFailures() {
    return options.shouldIgnoreFailures();
  }

  /** @return {@link DistCpOptions#shouldOverwrite()} of the options. */
  public boolean shouldOverwrite() {
    return options.shouldOverwrite();
  }

  /** @return {@link DistCpOptions#shouldAppend()} of the options. */
  public boolean shouldAppend() {
    return options.shouldAppend();
  }

  /** @return {@link DistCpOptions#shouldSkipCRC()} of the options. */
  public boolean shouldSkipCRC() {
    return options.shouldSkipCRC();
  }

  /** @return {@link DistCpOptions#shouldBlock()} of the options. */
  public boolean shouldBlock() {
    return options.shouldBlock();
  }

  /** @return {@link DistCpOptions#shouldUseDiff()} of the options. */
  public boolean shouldUseDiff() {
    return options.shouldUseDiff();
  }

  /** @return {@link DistCpOptions#shouldUseRdiff()} of the options. */
  public boolean shouldUseRdiff() {
    return options.shouldUseRdiff();
  }

  /** @return {@link DistCpOptions#shouldUseSnapshotDiff()} of the options. */
  public boolean shouldUseSnapshotDiff() {
    return options.shouldUseSnapshotDiff();
  }

  /** @return {@link DistCpOptions#getFromSnapshot()} of the options. */
  public String getFromSnapshot() {
    return options.getFromSnapshot();
  }

  /** @return {@link DistCpOptions#getToSnapshot()} of the options. */
  public String getToSnapshot() {
    return options.getToSnapshot();
  }

  /** @return {@link DistCpOptions#getFiltersFile()} of the options. */
  public final String getFiltersFile() {
    return options.getFiltersFile();
  }

  /** @return {@link DistCpOptions#getNumListstatusThreads()} of the options. */
  public int getNumListstatusThreads() {
    return options.getNumListstatusThreads();
  }

  /** @return {@link DistCpOptions#getMaxMaps()} of the options. */
  public int getMaxMaps() {
    return options.getMaxMaps();
  }

  /** @return {@link DistCpOptions#getMapBandwidth()} of the options. */
  public float getMapBandwidth() {
    return options.getMapBandwidth();
  }

  /** @return {@link DistCpOptions#getPreserveAttributes()} of the options. */
  public Set<FileAttribute> getPreserveAttributes() {
    return options.getPreserveAttributes();
  }

  /**
   * @param attribute the file attribute to check
   * @return {@link DistCpOptions#shouldPreserve(FileAttribute)} of the options
   */
  public boolean shouldPreserve(FileAttribute attribute) {
    return options.shouldPreserve(attribute);
  }

  /**
   * @return the runtime preserve-raw-xattrs flag. This is state of the
   * context itself (default {@code false}), not of the options.
   */
  public boolean shouldPreserveRawXattrs() {
    return preserveRawXattrs;
  }

  /**
   * Sets the runtime preserve-raw-xattrs flag held by this context.
   *
   * @param preserveRawXattrs the new flag value
   */
  public void setPreserveRawXattrs(boolean preserveRawXattrs) {
    this.preserveRawXattrs = preserveRawXattrs;
  }

  /** @return {@link DistCpOptions#getAtomicWorkPath()} of the options. */
  public Path getAtomicWorkPath() {
    return options.getAtomicWorkPath();
  }

  /** @return {@link DistCpOptions#getLogPath()} of the options. */
  public Path getLogPath() {
    return options.getLogPath();
  }

  /** @return {@link DistCpOptions#getCopyStrategy()} of the options. */
  public String getCopyStrategy() {
    return options.getCopyStrategy();
  }

  /** @return {@link DistCpOptions#getBlocksPerChunk()} of the options. */
  public int getBlocksPerChunk() {
    return options.getBlocksPerChunk();
  }

  /**
   * @return true if the options' blocks-per-chunk value is greater than zero
   */
  public final boolean splitLargeFile() {
    return options.getBlocksPerChunk() > 0;
  }

  /**
   * Records whether the target path exists. This is state of the context
   * itself (default {@code true}), not of the options.
   *
   * @param targetPathExists the new flag value
   */
  public void setTargetPathExists(boolean targetPathExists) {
    this.targetPathExists = targetPathExists;
  }

  /** @return the target-path-exists flag held by this context. */
  public boolean isTargetPathExists() {
    return targetPathExists;
  }

  /**
   * Delegates to {@link DistCpOptions#appendToConf(Configuration)}. Only the
   * options are passed on; the mutable fields of this context are not
   * written by this method.
   *
   * @param conf the configuration handed to the options
   */
  public void appendToConf(Configuration conf) {
    options.appendToConf(conf);
  }

  /**
   * @return the options' string form followed by the mutable runtime state,
   * each rendered as {@code name=value}
   */
  @Override
  public String toString() {
    // Every field uses the "name=value" form; the '=' was previously missing
    // for preserveRawXattrs, yielding e.g. "preserveRawXattrsfalse".
    return options.toString() +
        ", sourcePaths=" + sourcePaths +
        ", targetPathExists=" + targetPathExists +
        ", preserveRawXattrs=" + preserveRawXattrs;
  }
}

View File

@ -81,7 +81,7 @@ public enum DistCpOptionSwitch {
NUM_LISTSTATUS_THREADS(DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
new Option("numListstatusThreads", true, "Number of threads to " +
"use for building file listing (max " +
DistCpOptions.maxNumListstatusThreads + ").")),
DistCpOptions.MAX_NUM_LISTSTATUS_THREADS + ").")),
/**
* Max number of maps to use during copy. DistCp will split work
* as equally as possible among these maps

View File

@ -48,7 +48,7 @@ import java.util.HashSet;
* source.s1
*/
class DistCpSync {
private DistCpOptions inputOptions;
private DistCpContext context;
private Configuration conf;
// diffMap maps snapshot diff op type to a list of diff ops.
// It's initially created based on the snapshot diff. Then the individual
@ -58,13 +58,13 @@ class DistCpSync {
private EnumMap<SnapshotDiffReport.DiffType, List<DiffInfo>> diffMap;
private DiffInfo[] renameDiffs;
DistCpSync(DistCpOptions options, Configuration conf) {
this.inputOptions = options;
DistCpSync(DistCpContext context, Configuration conf) {
this.context = context;
this.conf = conf;
}
private boolean isRdiff() {
return inputOptions.shouldUseRdiff();
return context.shouldUseRdiff();
}
/**
@ -77,14 +77,14 @@ class DistCpSync {
* default distcp if the third condition isn't met.
*/
private boolean preSyncCheck() throws IOException {
List<Path> sourcePaths = inputOptions.getSourcePaths();
List<Path> sourcePaths = context.getSourcePaths();
if (sourcePaths.size() != 1) {
// we only support one source dir which must be a snapshottable directory
throw new IllegalArgumentException(sourcePaths.size()
+ " source paths are provided");
}
final Path sourceDir = sourcePaths.get(0);
final Path targetDir = inputOptions.getTargetPath();
final Path targetDir = context.getTargetPath();
final FileSystem srcFs = sourceDir.getFileSystem(conf);
final FileSystem tgtFs = targetDir.getFileSystem(conf);
@ -104,13 +104,15 @@ class DistCpSync {
// make sure targetFS has no change between from and the current states
if (!checkNoChange(targetFs, targetDir)) {
// set the source path using the snapshot path
inputOptions.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir,
inputOptions.getToSnapshot())));
context.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir,
context.getToSnapshot())));
return false;
}
final String from = getSnapshotName(inputOptions.getFromSnapshot());
final String to = getSnapshotName(inputOptions.getToSnapshot());
final String from = getSnapshotName(
context.getFromSnapshot());
final String to = getSnapshotName(
context.getToSnapshot());
try {
final FileStatus fromSnapshotStat =
@ -152,9 +154,9 @@ class DistCpSync {
return false;
}
List<Path> sourcePaths = inputOptions.getSourcePaths();
List<Path> sourcePaths = context.getSourcePaths();
final Path sourceDir = sourcePaths.get(0);
final Path targetDir = inputOptions.getTargetPath();
final Path targetDir = context.getTargetPath();
final FileSystem tfs = targetDir.getFileSystem(conf);
final DistributedFileSystem targetFs = (DistributedFileSystem) tfs;
@ -175,8 +177,8 @@ class DistCpSync {
deleteTargetTmpDir(targetFs, tmpDir);
// TODO: since we have tmp directory, we can support "undo" with failures
// set the source path using the snapshot path
inputOptions.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir,
inputOptions.getToSnapshot())));
context.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir,
context.getToSnapshot())));
}
}
@ -187,13 +189,13 @@ class DistCpSync {
*/
private boolean getAllDiffs() throws IOException {
Path ssDir = isRdiff()?
inputOptions.getTargetPath() : inputOptions.getSourcePaths().get(0);
context.getTargetPath() : context.getSourcePaths().get(0);
try {
DistributedFileSystem fs =
(DistributedFileSystem) ssDir.getFileSystem(conf);
final String from = getSnapshotName(inputOptions.getFromSnapshot());
final String to = getSnapshotName(inputOptions.getToSnapshot());
final String from = getSnapshotName(context.getFromSnapshot());
final String to = getSnapshotName(context.getToSnapshot());
SnapshotDiffReport report = fs.getSnapshotDiffReport(ssDir,
from, to);
this.diffMap = new EnumMap<>(SnapshotDiffReport.DiffType.class);
@ -273,19 +275,19 @@ class DistCpSync {
*/
private boolean checkNoChange(DistributedFileSystem fs, Path path) {
try {
final String from = getSnapshotName(inputOptions.getFromSnapshot());
final String from = getSnapshotName(context.getFromSnapshot());
SnapshotDiffReport targetDiff =
fs.getSnapshotDiffReport(path, from, "");
if (!targetDiff.getDiffList().isEmpty()) {
DistCp.LOG.warn("The target has been modified since snapshot "
+ inputOptions.getFromSnapshot());
+ context.getFromSnapshot());
return false;
} else {
return true;
}
} catch (IOException e) {
DistCp.LOG.warn("Failed to compute snapshot diff on " + path
+ " at snapshot " + inputOptions.getFromSnapshot(), e);
+ " at snapshot " + context.getFromSnapshot(), e);
}
return false;
}

View File

@ -52,7 +52,7 @@ public class FileBasedCopyListing extends CopyListing {
/** {@inheritDoc} */
@Override
protected void validatePaths(DistCpOptions options)
protected void validatePaths(DistCpContext context)
throws IOException, InvalidInputException {
}
@ -60,14 +60,14 @@ public class FileBasedCopyListing extends CopyListing {
* Implementation of CopyListing::buildListing().
* Iterates over all source paths mentioned in the input-file.
* @param pathToListFile Path on HDFS where the listing file is written.
* @param options Input Options for DistCp (indicating source/target paths.)
* @param context Distcp context with associated input options.
* @throws IOException
*/
@Override
public void doBuildListing(Path pathToListFile, DistCpOptions options) throws IOException {
DistCpOptions newOption = new DistCpOptions(options);
newOption.setSourcePaths(fetchFileList(options.getSourceFileListing()));
globbedListing.buildListing(pathToListFile, newOption);
public void doBuildListing(Path pathToListFile, DistCpContext context)
throws IOException {
context.setSourcePaths(fetchFileList(context.getSourceFileListing()));
globbedListing.buildListing(pathToListFile, context);
}
private List<Path> fetchFileList(Path sourceListing) throws IOException {

View File

@ -51,7 +51,7 @@ public class GlobbedCopyListing extends CopyListing {
/** {@inheritDoc} */
@Override
protected void validatePaths(DistCpOptions options)
protected void validatePaths(DistCpContext context)
throws IOException, InvalidInputException {
}
@ -60,19 +60,19 @@ public class GlobbedCopyListing extends CopyListing {
* Creates the copy listing by "globbing" all source-paths.
* @param pathToListingFile The location at which the copy-listing file
* is to be created.
* @param options Input Options for DistCp (indicating source/target paths.)
* @param context The distcp context with associated input options.
* @throws IOException
*/
@Override
public void doBuildListing(Path pathToListingFile,
DistCpOptions options) throws IOException {
public void doBuildListing(Path pathToListingFile, DistCpContext context)
throws IOException {
List<Path> globbedPaths = new ArrayList<Path>();
if (options.getSourcePaths().isEmpty()) {
if (context.getSourcePaths().isEmpty()) {
throw new InvalidInputException("Nothing to process. Source paths::EMPTY");
}
for (Path p : options.getSourcePaths()) {
for (Path p : context.getSourcePaths()) {
FileSystem fs = p.getFileSystem(getConf());
FileStatus[] inputs = fs.globStatus(p);
@ -85,9 +85,8 @@ public class GlobbedCopyListing extends CopyListing {
}
}
DistCpOptions optionsGlobbed = new DistCpOptions(options);
optionsGlobbed.setSourcePaths(globbedPaths);
simpleListing.buildListing(pathToListingFile, optionsGlobbed);
context.setSourcePaths(globbedPaths);
simpleListing.buildListing(pathToListingFile, context);
}
/** {@inheritDoc} */

View File

@ -32,7 +32,6 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import com.google.common.base.Preconditions;
@ -95,253 +94,126 @@ public class OptionsParser {
Arrays.toString(args), e);
}
DistCpOptions option = parseSourceAndTargetPaths(command);
option.setIgnoreFailures(
command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch()));
option.setAtomicCommit(
command.hasOption(DistCpOptionSwitch.ATOMIC_COMMIT.getSwitch()));
option.setSyncFolder(
command.hasOption(DistCpOptionSwitch.SYNC_FOLDERS.getSwitch()));
option.setOverwrite(
command.hasOption(DistCpOptionSwitch.OVERWRITE.getSwitch()));
option.setAppend(
command.hasOption(DistCpOptionSwitch.APPEND.getSwitch()));
option.setDeleteMissing(
command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch()));
option.setSkipCRC(
command.hasOption(DistCpOptionSwitch.SKIP_CRC.getSwitch()));
if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch()) &&
option.shouldAtomicCommit()) {
String workPath = getVal(command, DistCpOptionSwitch.WORK_PATH.getSwitch());
if (workPath != null && !workPath.isEmpty()) {
option.setAtomicWorkPath(new Path(workPath));
}
} else if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch())) {
throw new IllegalArgumentException("-tmp work-path can only be specified along with -atomic");
}
if (command.hasOption(DistCpOptionSwitch.LOG_PATH.getSwitch())) {
option.setLogPath(new Path(getVal(command, DistCpOptionSwitch.LOG_PATH.getSwitch())));
}
if (command.hasOption(DistCpOptionSwitch.BLOCKING.getSwitch())) {
option.setBlocking(false);
}
parseBandwidth(command, option);
parseNumListStatusThreads(command, option);
parseMaxMaps(command, option);
if (command.hasOption(DistCpOptionSwitch.COPY_STRATEGY.getSwitch())) {
option.setCopyStrategy(
getVal(command, DistCpOptionSwitch.COPY_STRATEGY.getSwitch()));
}
parsePreserveStatus(command, option);
DistCpOptions.Builder builder = parseSourceAndTargetPaths(command);
builder
.withAtomicCommit(
command.hasOption(DistCpOptionSwitch.ATOMIC_COMMIT.getSwitch()))
.withSyncFolder(
command.hasOption(DistCpOptionSwitch.SYNC_FOLDERS.getSwitch()))
.withDeleteMissing(
command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch()))
.withIgnoreFailures(
command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch()))
.withOverwrite(
command.hasOption(DistCpOptionSwitch.OVERWRITE.getSwitch()))
.withAppend(
command.hasOption(DistCpOptionSwitch.APPEND.getSwitch()))
.withCRC(
command.hasOption(DistCpOptionSwitch.SKIP_CRC.getSwitch()))
.withBlocking(
!command.hasOption(DistCpOptionSwitch.BLOCKING.getSwitch()));
if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) {
String[] snapshots = getVals(command,
DistCpOptionSwitch.DIFF.getSwitch());
checkSnapshotsArgs(snapshots);
option.setUseDiff(snapshots[0], snapshots[1]);
builder.withUseDiff(snapshots[0], snapshots[1]);
}
if (command.hasOption(DistCpOptionSwitch.RDIFF.getSwitch())) {
String[] snapshots = getVals(command,
DistCpOptionSwitch.RDIFF.getSwitch());
checkSnapshotsArgs(snapshots);
option.setUseRdiff(snapshots[0], snapshots[1]);
builder.withUseRdiff(snapshots[0], snapshots[1]);
}
parseFileLimit(command);
parseSizeLimit(command);
if (command.hasOption(DistCpOptionSwitch.FILTERS.getSwitch())) {
option.setFiltersFile(getVal(command,
DistCpOptionSwitch.FILTERS.getSwitch()));
builder.withFiltersFile(
getVal(command, DistCpOptionSwitch.FILTERS.getSwitch()));
}
parseBlocksPerChunk(command, option);
if (command.hasOption(DistCpOptionSwitch.LOG_PATH.getSwitch())) {
builder.withLogPath(
new Path(getVal(command, DistCpOptionSwitch.LOG_PATH.getSwitch())));
}
option.validate();
return option;
}
/**
* A helper method to parse chunk size in number of blocks.
* Used when breaking large file into chunks to copy in parallel.
*
* @param command command line arguments
*/
private static void parseBlocksPerChunk(CommandLine command,
DistCpOptions option) {
boolean hasOption =
command.hasOption(DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch());
LOG.info("parseChunkSize: " +
DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " " + hasOption);
if (hasOption) {
String chunkSizeString = getVal(command,
DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch().trim());
try {
int csize = Integer.parseInt(chunkSizeString);
if (csize < 0) {
csize = 0;
}
LOG.info("Set distcp blocksPerChunk to " + csize);
option.setBlocksPerChunk(csize);
}
catch (NumberFormatException e) {
throw new IllegalArgumentException("blocksPerChunk is invalid: "
+ chunkSizeString, e);
if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch())) {
final String workPath = getVal(command,
DistCpOptionSwitch.WORK_PATH.getSwitch());
if (workPath != null && !workPath.isEmpty()) {
builder.withAtomicWorkPath(new Path(workPath));
}
}
}
/**
* parseSizeLimit is a helper method for parsing the deprecated
* argument SIZE_LIMIT.
*
* @param command command line arguments
*/
private static void parseSizeLimit(CommandLine command) {
if (command.hasOption(DistCpOptionSwitch.SIZE_LIMIT.getSwitch())) {
String sizeLimitString = getVal(command,
DistCpOptionSwitch.SIZE_LIMIT.getSwitch().trim());
if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
try {
Long.parseLong(sizeLimitString);
}
catch (NumberFormatException e) {
throw new IllegalArgumentException("Size-limit is invalid: "
+ sizeLimitString, e);
}
LOG.warn(DistCpOptionSwitch.SIZE_LIMIT.getSwitch() + " is a deprecated" +
" option. Ignoring.");
}
}
/**
* parseFileLimit is a helper method for parsing the deprecated
* argument FILE_LIMIT.
*
* @param command command line arguments
*/
private static void parseFileLimit(CommandLine command) {
if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) {
String fileLimitString = getVal(command,
DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim());
try {
Integer.parseInt(fileLimitString);
final Float mapBandwidth = Float.parseFloat(
getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()));
builder.withMapBandwidth(mapBandwidth);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("File-limit is invalid: "
+ fileLimitString, e);
}
LOG.warn(DistCpOptionSwitch.FILE_LIMIT.getSwitch() + " is a deprecated" +
" option. Ignoring.");
}
}
/**
* parsePreserveStatus is a helper method for parsing PRESERVE_STATUS.
*
* @param command command line arguments
* @param option parsed distcp options
*/
private static void parsePreserveStatus(CommandLine command,
DistCpOptions option) {
if (command.hasOption(DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
String attributes =
getVal(command, DistCpOptionSwitch.PRESERVE_STATUS.getSwitch());
if (attributes == null || attributes.isEmpty()) {
for (FileAttribute attribute : FileAttribute.values()) {
option.preserve(attribute);
}
} else {
for (int index = 0; index < attributes.length(); index++) {
option.preserve(FileAttribute.
getAttribute(attributes.charAt(index)));
}
throw new IllegalArgumentException("Bandwidth specified is invalid: " +
getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()), e);
}
}
}
/**
* parseMaxMaps is a helper method for parsing MAX_MAPS.
*
* @param command command line arguments
* @param option parsed distcp options
*/
private static void parseMaxMaps(CommandLine command,
DistCpOptions option) {
if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) {
try {
Integer maps = Integer.parseInt(
getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()).trim());
option.setMaxMaps(maps);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Number of maps is invalid: " +
getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()), e);
}
}
}
/**
* parseNumListStatusThreads is a helper method for parsing
* NUM_LISTSTATUS_THREADS.
*
* @param command command line arguments
* @param option parsed distcp options
*/
private static void parseNumListStatusThreads(CommandLine command,
DistCpOptions option) {
if (command.hasOption(
DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch())) {
try {
Integer numThreads = Integer.parseInt(getVal(command,
DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()).trim());
option.setNumListstatusThreads(numThreads);
final Integer numThreads = Integer.parseInt(getVal(command,
DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()));
builder.withNumListstatusThreads(numThreads);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"Number of liststatus threads is invalid: " + getVal(command,
DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()), e);
}
}
}
/**
* parseBandwidth is a helper method for parsing BANDWIDTH.
*
* @param command command line arguments
* @param option parsed distcp options
*/
private static void parseBandwidth(CommandLine command,
DistCpOptions option) {
if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) {
try {
Float mapBandwidth = Float.parseFloat(
getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim());
if (mapBandwidth <= 0) {
throw new IllegalArgumentException("Bandwidth specified is not " +
"positive: " + mapBandwidth);
}
option.setMapBandwidth(mapBandwidth);
final Integer maps = Integer.parseInt(
getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()));
builder.maxMaps(maps);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Bandwidth specified is invalid: " +
getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()), e);
throw new IllegalArgumentException("Number of maps is invalid: " +
getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()), e);
}
}
if (command.hasOption(DistCpOptionSwitch.COPY_STRATEGY.getSwitch())) {
builder.withCopyStrategy(
getVal(command, DistCpOptionSwitch.COPY_STRATEGY.getSwitch()));
}
if (command.hasOption(DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
builder.preserve(
getVal(command, DistCpOptionSwitch.PRESERVE_STATUS.getSwitch()));
}
if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) {
LOG.warn(DistCpOptionSwitch.FILE_LIMIT.getSwitch() + " is a deprecated" +
" option. Ignoring.");
}
if (command.hasOption(DistCpOptionSwitch.SIZE_LIMIT.getSwitch())) {
LOG.warn(DistCpOptionSwitch.SIZE_LIMIT.getSwitch() + " is a deprecated" +
" option. Ignoring.");
}
if (command.hasOption(DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch())) {
final String chunkSizeStr = getVal(command,
DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch().trim());
try {
int csize = Integer.parseInt(chunkSizeStr);
csize = csize > 0 ? csize : 0;
LOG.info("Set distcp blocksPerChunk to " + csize);
builder.withBlocksPerChunk(csize);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("blocksPerChunk is invalid: "
+ chunkSizeStr, e);
}
}
return builder.build();
}
/**
@ -351,9 +223,8 @@ public class OptionsParser {
* @param command command line arguments
* @return DistCpOptions
*/
private static DistCpOptions parseSourceAndTargetPaths(
private static DistCpOptions.Builder parseSourceAndTargetPaths(
CommandLine command) {
DistCpOptions option;
Path targetPath;
List<Path> sourcePaths = new ArrayList<Path>();
@ -378,20 +249,22 @@ public class OptionsParser {
throw new IllegalArgumentException("Both source file listing and " +
"source paths present");
}
option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch.
SOURCE_FILE_LISTING.getSwitch())), targetPath);
return new DistCpOptions.Builder(new Path(getVal(command,
DistCpOptionSwitch.SOURCE_FILE_LISTING.getSwitch())), targetPath);
} else {
if (sourcePaths.isEmpty()) {
throw new IllegalArgumentException("Neither source file listing nor " +
"source paths present");
}
option = new DistCpOptions(sourcePaths, targetPath);
return new DistCpOptions.Builder(sourcePaths, targetPath);
}
return option;
}
private static String getVal(CommandLine command, String swtch) {
String optionValue = command.getOptionValue(swtch);
if (swtch == null) {
return null;
}
String optionValue = command.getOptionValue(swtch.trim());
if (optionValue == null) {
return null;
} else {

View File

@ -123,10 +123,10 @@ public class SimpleCopyListing extends CopyListing {
}
@Override
protected void validatePaths(DistCpOptions options)
protected void validatePaths(DistCpContext context)
throws IOException, InvalidInputException {
Path targetPath = options.getTargetPath();
Path targetPath = context.getTargetPath();
FileSystem targetFS = targetPath.getFileSystem(getConf());
boolean targetExists = false;
boolean targetIsFile = false;
@ -142,12 +142,12 @@ public class SimpleCopyListing extends CopyListing {
//If target is a file, then source has to be single file
if (targetIsFile) {
if (options.getSourcePaths().size() > 1) {
if (context.getSourcePaths().size() > 1) {
throw new InvalidInputException("Multiple source being copied to a file: " +
targetPath);
}
Path srcPath = options.getSourcePaths().get(0);
Path srcPath = context.getSourcePaths().get(0);
FileSystem sourceFS = srcPath.getFileSystem(getConf());
if (!sourceFS.isFile(srcPath)) {
throw new InvalidInputException("Cannot copy " + srcPath +
@ -155,12 +155,12 @@ public class SimpleCopyListing extends CopyListing {
}
}
if (options.shouldAtomicCommit() && targetExists) {
if (context.shouldAtomicCommit() && targetExists) {
throw new InvalidInputException("Target path for atomic-commit already exists: " +
targetPath + ". Cannot atomic-commit to pre-existing target-path.");
}
for (Path path: options.getSourcePaths()) {
for (Path path: context.getSourcePaths()) {
FileSystem fs = path.getFileSystem(getConf());
if (!fs.exists(path)) {
throw new InvalidInputException(path + " doesn't exist");
@ -184,7 +184,7 @@ public class SimpleCopyListing extends CopyListing {
}
if (targetIsReservedRaw) {
options.preserveRawXattrs();
context.setPreserveRawXattrs(true);
getConf().setBoolean(DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, true);
}
@ -194,18 +194,19 @@ public class SimpleCopyListing extends CopyListing {
*/
Credentials credentials = getCredentials();
if (credentials != null) {
Path[] inputPaths = options.getSourcePaths().toArray(new Path[1]);
Path[] inputPaths = context.getSourcePaths()
.toArray(new Path[1]);
TokenCache.obtainTokensForNamenodes(credentials, inputPaths, getConf());
}
}
@Override
protected void doBuildListing(Path pathToListingFile,
DistCpOptions options) throws IOException {
if(options.shouldUseSnapshotDiff()) {
doBuildListingWithSnapshotDiff(getWriter(pathToListingFile), options);
}else {
doBuildListing(getWriter(pathToListingFile), options);
DistCpContext context) throws IOException {
if (context.shouldUseSnapshotDiff()) {
doBuildListingWithSnapshotDiff(getWriter(pathToListingFile), context);
} else {
doBuildListing(getWriter(pathToListingFile), context);
}
}
@ -232,22 +233,22 @@ public class SimpleCopyListing extends CopyListing {
* @throws IOException
*/
private void addToFileListing(SequenceFile.Writer fileListWriter,
Path sourceRoot, Path path, DistCpOptions options) throws IOException {
Path sourceRoot, Path path, DistCpContext context) throws IOException {
sourceRoot = getPathWithSchemeAndAuthority(sourceRoot);
path = getPathWithSchemeAndAuthority(path);
path = makeQualified(path);
FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
FileStatus fileStatus = sourceFS.getFileStatus(path);
final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
final boolean preserveRawXAttrs = options.shouldPreserveRawXattrs();
final boolean preserveAcls = context.shouldPreserve(FileAttribute.ACL);
final boolean preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
final boolean preserveRawXAttrs = context.shouldPreserveRawXattrs();
LinkedList<CopyListingFileStatus> fileCopyListingStatus =
DistCpUtils.toCopyListingFileStatus(sourceFS, fileStatus,
preserveAcls, preserveXAttrs, preserveRawXAttrs,
options.getBlocksPerChunk());
context.getBlocksPerChunk());
writeToFileListingRoot(fileListWriter, fileCopyListingStatus,
sourceRoot, options);
sourceRoot, context);
}
/**
@ -258,14 +259,16 @@ public class SimpleCopyListing extends CopyListing {
* {@link org.apache.hadoop.tools.DistCpSync#sync}. An item can be
* created/modified and renamed, in which case, the target path is put
* into the list.
* @param fileListWriter the list for holding processed results
* @param context The DistCp context with associated input options
* @throws IOException
*/
@VisibleForTesting
protected void doBuildListingWithSnapshotDiff(
SequenceFile.Writer fileListWriter, DistCpOptions options)
SequenceFile.Writer fileListWriter, DistCpContext context)
throws IOException {
ArrayList<DiffInfo> diffList = distCpSync.prepareDiffListForCopyListing();
Path sourceRoot = options.getSourcePaths().get(0);
Path sourceRoot = context.getSourcePaths().get(0);
FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
try {
@ -273,13 +276,13 @@ public class SimpleCopyListing extends CopyListing {
for (DiffInfo diff : diffList) {
// add snapshot paths prefix
diff.setTarget(
new Path(options.getSourcePaths().get(0), diff.getTarget()));
new Path(context.getSourcePaths().get(0), diff.getTarget()));
if (diff.getType() == SnapshotDiffReport.DiffType.MODIFY) {
addToFileListing(fileListWriter,
sourceRoot, diff.getTarget(), options);
sourceRoot, diff.getTarget(), context);
} else if (diff.getType() == SnapshotDiffReport.DiffType.CREATE) {
addToFileListing(fileListWriter,
sourceRoot, diff.getTarget(), options);
sourceRoot, diff.getTarget(), context);
FileStatus sourceStatus = sourceFS.getFileStatus(diff.getTarget());
if (sourceStatus.isDirectory()) {
@ -290,13 +293,13 @@ public class SimpleCopyListing extends CopyListing {
HashSet<String> excludeList =
distCpSync.getTraverseExcludeList(diff.getSource(),
options.getSourcePaths().get(0));
context.getSourcePaths().get(0));
ArrayList<FileStatus> sourceDirs = new ArrayList<>();
sourceDirs.add(sourceStatus);
traverseDirectory(fileListWriter, sourceFS, sourceDirs,
sourceRoot, options, excludeList, fileStatuses);
sourceRoot, context, excludeList, fileStatuses);
}
}
}
@ -325,27 +328,30 @@ public class SimpleCopyListing extends CopyListing {
* See computeSourceRootPath method for how the root path of the source is
* computed.
* @param fileListWriter
* @param options
* @param context The distcp context with associated input options
* @throws IOException
*/
@VisibleForTesting
protected void doBuildListing(SequenceFile.Writer fileListWriter,
DistCpOptions options) throws IOException {
if (options.getNumListstatusThreads() > 0) {
numListstatusThreads = options.getNumListstatusThreads();
DistCpContext context) throws IOException {
if (context.getNumListstatusThreads() > 0) {
numListstatusThreads = context.getNumListstatusThreads();
}
try {
List<FileStatusInfo> statusList = Lists.newArrayList();
for (Path path: options.getSourcePaths()) {
for (Path path: context.getSourcePaths()) {
FileSystem sourceFS = path.getFileSystem(getConf());
final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
final boolean preserveRawXAttrs = options.shouldPreserveRawXattrs();
final boolean preserveAcls =
context.shouldPreserve(FileAttribute.ACL);
final boolean preserveXAttrs =
context.shouldPreserve(FileAttribute.XATTR);
final boolean preserveRawXAttrs =
context.shouldPreserveRawXattrs();
path = makeQualified(path);
FileStatus rootStatus = sourceFS.getFileStatus(path);
Path sourcePathRoot = computeSourceRootPath(rootStatus, options);
Path sourcePathRoot = computeSourceRootPath(rootStatus, context);
FileStatus[] sourceFiles = sourceFS.listStatus(path);
boolean explore = (sourceFiles != null && sourceFiles.length > 0);
@ -353,9 +359,9 @@ public class SimpleCopyListing extends CopyListing {
LinkedList<CopyListingFileStatus> rootCopyListingStatus =
DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus,
preserveAcls, preserveXAttrs, preserveRawXAttrs,
options.getBlocksPerChunk());
context.getBlocksPerChunk());
writeToFileListingRoot(fileListWriter, rootCopyListingStatus,
sourcePathRoot, options);
sourcePathRoot, context);
}
if (explore) {
ArrayList<FileStatus> sourceDirs = new ArrayList<FileStatus>();
@ -368,7 +374,7 @@ public class SimpleCopyListing extends CopyListing {
preserveAcls && sourceStatus.isDirectory(),
preserveXAttrs && sourceStatus.isDirectory(),
preserveRawXAttrs && sourceStatus.isDirectory(),
options.getBlocksPerChunk());
context.getBlocksPerChunk());
for (CopyListingFileStatus fs : sourceCopyListingStatus) {
if (randomizeFileListing) {
addToFileListing(statusList,
@ -385,7 +391,7 @@ public class SimpleCopyListing extends CopyListing {
}
}
traverseDirectory(fileListWriter, sourceFS, sourceDirs,
sourcePathRoot, options, null, statusList);
sourcePathRoot, context, null, statusList);
}
}
if (randomizeFileListing) {
@ -447,13 +453,13 @@ public class SimpleCopyListing extends CopyListing {
}
private Path computeSourceRootPath(FileStatus sourceStatus,
DistCpOptions options) throws IOException {
DistCpContext context) throws IOException {
Path target = options.getTargetPath();
Path target = context.getTargetPath();
FileSystem targetFS = target.getFileSystem(getConf());
final boolean targetPathExists = options.getTargetPathExists();
final boolean targetPathExists = context.isTargetPathExists();
boolean solitaryFile = options.getSourcePaths().size() == 1
boolean solitaryFile = context.getSourcePaths().size() == 1
&& !sourceStatus.isDirectory();
if (solitaryFile) {
@ -463,8 +469,11 @@ public class SimpleCopyListing extends CopyListing {
return sourceStatus.getPath().getParent();
}
} else {
boolean specialHandling = (options.getSourcePaths().size() == 1 && !targetPathExists) ||
options.shouldSyncFolder() || options.shouldOverwrite();
boolean specialHandling =
(context.getSourcePaths().size() == 1 &&
!targetPathExists) ||
context.shouldSyncFolder() ||
context.shouldOverwrite();
if ((specialHandling && sourceStatus.isDirectory()) ||
sourceStatus.getPath().isRoot()) {
@ -610,13 +619,13 @@ public class SimpleCopyListing extends CopyListing {
FileSystem sourceFS,
ArrayList<FileStatus> sourceDirs,
Path sourcePathRoot,
DistCpOptions options,
DistCpContext context,
HashSet<String> excludeList,
List<FileStatusInfo> fileStatuses)
throws IOException {
final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
final boolean preserveAcls = context.shouldPreserve(FileAttribute.ACL);
final boolean preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
final boolean preserveRawXattrs = context.shouldPreserveRawXattrs();
assert numListstatusThreads > 0;
if (LOG.isDebugEnabled()) {
@ -649,7 +658,7 @@ public class SimpleCopyListing extends CopyListing {
preserveAcls && child.isDirectory(),
preserveXAttrs && child.isDirectory(),
preserveRawXattrs && child.isDirectory(),
options.getBlocksPerChunk());
context.getBlocksPerChunk());
for (CopyListingFileStatus fs : childCopyListingStatus) {
if (randomizeFileListing) {
@ -681,9 +690,9 @@ public class SimpleCopyListing extends CopyListing {
private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
LinkedList<CopyListingFileStatus> fileStatus, Path sourcePathRoot,
DistCpOptions options) throws IOException {
boolean syncOrOverwrite = options.shouldSyncFolder() ||
options.shouldOverwrite();
DistCpContext context) throws IOException {
boolean syncOrOverwrite = context.shouldSyncFolder() ||
context.shouldOverwrite();
for (CopyListingFileStatus fs : fileStatus) {
if (fs.getPath().equals(sourcePathRoot) &&
fs.isDirectory() && syncOrOverwrite) {

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.tools.CopyListing;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptionSwitch;
import org.apache.hadoop.tools.DistCpContext;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.GlobbedCopyListing;
@ -354,16 +355,18 @@ public class CopyCommitter extends FileOutputCommitter {
Path resultNonePath = Path.getPathWithoutSchemeAndAuthority(targetFinalPath)
.toString().startsWith(DistCpConstants.HDFS_RESERVED_RAW_DIRECTORY_NAME)
? DistCpConstants.RAW_NONE_PATH : DistCpConstants.NONE_PATH;
DistCpOptions options = new DistCpOptions(targets, resultNonePath);
//
// Set up options to be the same from the CopyListing.buildListing's perspective,
// so to collect similar listings as when doing the copy
//
options.setOverwrite(overwrite);
options.setSyncFolder(syncFolder);
options.setTargetPathExists(targetPathExists);
target.buildListing(targetListing, options);
DistCpOptions options = new DistCpOptions.Builder(targets, resultNonePath)
.withOverwrite(overwrite)
.withSyncFolder(syncFolder)
.build();
DistCpContext distCpContext = new DistCpContext(options);
distCpContext.setTargetPathExists(targetPathExists);
target.buildListing(targetListing, distCpContext);
Path sortedTargetListing = DistCpUtils.sortListing(clusterFS, conf, targetListing);
long totalLen = clusterFS.getFileStatus(sortedTargetListing).getLen();

View File

@ -39,7 +39,7 @@ import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.tools.CopyListing.AclsNotSupportedException;
import org.apache.hadoop.tools.CopyListing.XAttrsNotSupportedException;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.DistCpContext;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.mapred.UniformSizeInputFormat;
import org.apache.hadoop.util.StringUtils;
@ -116,13 +116,13 @@ public class DistCpUtils {
* a particular strategy from distcp-default.xml
*
* @param conf - Configuration object
* @param options - Handle to input options
* @param context - Distcp context with associated input options
* @return Class implementing the strategy specified in options.
*/
public static Class<? extends InputFormat> getStrategy(Configuration conf,
DistCpOptions options) {
DistCpContext context) {
String confLabel = "distcp."
+ StringUtils.toLowerCase(options.getCopyStrategy())
+ StringUtils.toLowerCase(context.getCopyStrategy())
+ ".strategy" + ".impl";
return conf.getClass(confLabel, UniformSizeInputFormat.class, InputFormat.class);
}

View File

@ -103,20 +103,19 @@ public class TestCopyListing extends SimpleCopyListing {
List<Path> srcPaths = new ArrayList<Path>();
srcPaths.add(new Path("/tmp/in/1"));
srcPaths.add(new Path("/tmp/in/2"));
Path target = new Path("/tmp/out/1");
final Path target = new Path("/tmp/out/1");
TestDistCpUtils.createFile(fs, "/tmp/in/1");
TestDistCpUtils.createFile(fs, "/tmp/in/2");
fs.mkdirs(target);
DistCpOptions options = new DistCpOptions(srcPaths, target);
validatePaths(options);
final DistCpOptions options = new DistCpOptions.Builder(srcPaths, target)
.build();
validatePaths(new DistCpContext(options));
TestDistCpUtils.delete(fs, "/tmp");
//No errors
target = new Path("/tmp/out/1");
fs.create(target).close();
options = new DistCpOptions(srcPaths, target);
try {
validatePaths(options);
validatePaths(new DistCpContext(options));
Assert.fail("Invalid inputs accepted");
} catch (InvalidInputException ignore) { }
TestDistCpUtils.delete(fs, "/tmp");
@ -124,11 +123,9 @@ public class TestCopyListing extends SimpleCopyListing {
srcPaths.clear();
srcPaths.add(new Path("/tmp/in/1"));
fs.mkdirs(new Path("/tmp/in/1"));
target = new Path("/tmp/out/1");
fs.create(target).close();
options = new DistCpOptions(srcPaths, target);
try {
validatePaths(options);
validatePaths(new DistCpContext(options));
Assert.fail("Invalid inputs accepted");
} catch (InvalidInputException ignore) { }
TestDistCpUtils.delete(fs, "/tmp");
@ -151,10 +148,13 @@ public class TestCopyListing extends SimpleCopyListing {
TestDistCpUtils.createFile(fs, "/tmp/in/src2/1.txt");
Path target = new Path("/tmp/out");
Path listingFile = new Path("/tmp/list");
DistCpOptions options = new DistCpOptions(srcPaths, target);
CopyListing listing = CopyListing.getCopyListing(getConf(), CREDENTIALS, options);
final DistCpOptions options = new DistCpOptions.Builder(srcPaths, target)
.build();
final DistCpContext context = new DistCpContext(options);
CopyListing listing = CopyListing.getCopyListing(getConf(), CREDENTIALS,
context);
try {
listing.buildListing(listingFile, options);
listing.buildListing(listingFile, context);
Assert.fail("Duplicates not detected");
} catch (DuplicateFileException ignore) {
}
@ -196,11 +196,12 @@ public class TestCopyListing extends SimpleCopyListing {
Path listingFile = new Path("/tmp/file");
DistCpOptions options = new DistCpOptions(srcPaths, target);
options.setSyncFolder(true);
final DistCpOptions options = new DistCpOptions.Builder(srcPaths, target)
.withSyncFolder(true)
.build();
CopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
try {
listing.buildListing(listingFile, options);
listing.buildListing(listingFile, new DistCpContext(options));
Assert.fail("Duplicates not detected");
} catch (DuplicateFileException ignore) {
}
@ -209,7 +210,7 @@ public class TestCopyListing extends SimpleCopyListing {
TestDistCpUtils.delete(fs, "/tmp");
try {
listing.buildListing(listingFile, options);
listing.buildListing(listingFile, new DistCpContext(options));
Assert.fail("Invalid input not detected");
} catch (InvalidInputException ignore) {
}
@ -244,14 +245,14 @@ public class TestCopyListing extends SimpleCopyListing {
}
Path listingFile = new Path("/tmp/file");
DistCpOptions options = new DistCpOptions(srcPaths, target);
options.setSyncFolder(true);
final DistCpOptions options = new DistCpOptions.Builder(srcPaths, target)
.withSyncFolder(true).build();
// Check without randomizing files
getConf().setBoolean(
DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false);
SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
listing.buildListing(listingFile, options);
listing.buildListing(listingFile, new DistCpContext(options));
Assert.assertEquals(listing.getNumberOfPaths(), pathCount);
validateFinalListing(listingFile, srcFiles);
@ -265,7 +266,7 @@ public class TestCopyListing extends SimpleCopyListing {
// Set the seed for randomness, so that it can be verified later
long seed = System.nanoTime();
listing.setSeedForRandomListing(seed);
listing.buildListing(listingFile, options);
listing.buildListing(listingFile, new DistCpContext(options));
Assert.assertEquals(listing.getNumberOfPaths(), pathCount);
// validate randomness
@ -322,11 +323,12 @@ public class TestCopyListing extends SimpleCopyListing {
List<Path> srcPaths = new ArrayList<Path>();
srcPaths.add(sourceFile);
DistCpOptions options = new DistCpOptions(srcPaths, targetFile);
DistCpOptions options = new DistCpOptions.Builder(srcPaths, targetFile)
.build();
CopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
final Path listFile = new Path(testRoot, "/tmp/fileList.seq");
listing.buildListing(listFile, options);
listing.buildListing(listFile, new DistCpContext(options));
reader = new SequenceFile.Reader(getConf(), SequenceFile.Reader.file(listFile));
@ -359,10 +361,11 @@ public class TestCopyListing extends SimpleCopyListing {
doThrow(expectedEx).when(writer).close();
SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
DistCpOptions options = new DistCpOptions(srcs, new Path(outFile.toURI()));
final DistCpOptions options = new DistCpOptions.Builder(srcs,
new Path(outFile.toURI())).build();
Exception actualEx = null;
try {
listing.doBuildListing(writer, options);
listing.doBuildListing(writer, new DistCpContext(options));
} catch (Exception e) {
actualEx = e;
}

View File

@ -0,0 +1,500 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import java.util.Collections;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.apache.hadoop.tools.DistCpOptions.MAX_NUM_LISTSTATUS_THREADS;
import static org.junit.Assert.fail;
/**
* This is to test constructing {@link DistCpOptions} manually through its
* {@link DistCpOptions.Builder}.
*
* The test cases in this class are very similar to the parser test, see
* {@link TestOptionsParser}.
*/
public class TestDistCpOptions {
private static final float DELTA = 0.001f;
/**
 * Checks that options built without further configuration do not ignore
 * failures, and that {@code withIgnoreFailures(true)} switches the flag on.
 */
@Test
public void testSetIgnoreFailure() {
  final Path source = new Path("hdfs://localhost:8020/source");
  final Path target = new Path("hdfs://localhost:8020/target/");
  final DistCpOptions.Builder optionsBuilder =
      new DistCpOptions.Builder(Collections.singletonList(source), target);

  // Nothing configured yet: the flag is expected to be off.
  final DistCpOptions untouched = optionsBuilder.build();
  Assert.assertFalse(untouched.shouldIgnoreFailures());

  optionsBuilder.withIgnoreFailures(true);
  final DistCpOptions configured = optionsBuilder.build();
  Assert.assertTrue(configured.shouldIgnoreFailures());
}
/**
 * Checks that overwrite is off until {@code withOverwrite(true)} is called,
 * and that additionally requesting sync-folder on the same builder is
 * rejected with an {@link IllegalArgumentException}.
 */
@Test
public void testSetOverwrite() {
  final Path source = new Path("hdfs://localhost:8020/source");
  final Path target = new Path("hdfs://localhost:8020/target/");
  final DistCpOptions.Builder optionsBuilder =
      new DistCpOptions.Builder(Collections.singletonList(source), target);

  final DistCpOptions untouched = optionsBuilder.build();
  Assert.assertFalse(untouched.shouldOverwrite());

  optionsBuilder.withOverwrite(true);
  final DistCpOptions overwriting = optionsBuilder.build();
  Assert.assertTrue(overwriting.shouldOverwrite());

  // The builder still carries overwrite=true here, so asking for
  // sync-folder as well must not produce a valid options object.
  try {
    optionsBuilder.withSyncFolder(true).build();
    Assert.fail("Update and overwrite aren't allowed together");
  } catch (IllegalArgumentException expected) {
    // expected: the combination is invalid
  }
}
/**
 * Checks that the log path is {@code null} when not configured and that the
 * path handed to {@code withLogPath} is the one reported by the built
 * options.
 */
@Test
public void testLogPath() {
  final Path source = new Path("hdfs://localhost:8020/source");
  final Path target = new Path("hdfs://localhost:8020/target/");
  final DistCpOptions.Builder optionsBuilder =
      new DistCpOptions.Builder(Collections.singletonList(source), target);

  final DistCpOptions untouched = optionsBuilder.build();
  Assert.assertNull(untouched.getLogPath());

  final Path expectedLogPath = new Path("hdfs://localhost:8020/logs");
  optionsBuilder.withLogPath(expectedLogPath);
  final DistCpOptions configured = optionsBuilder.build();
  Assert.assertEquals(expectedLogPath, configured.getLogPath());
}
/**
 * Checks that built options block by default and that
 * {@code withBlocking(false)} turns blocking off.
 */
@Test
public void testSetBlokcing() {
  final Path source = new Path("hdfs://localhost:8020/source");
  final Path target = new Path("hdfs://localhost:8020/target/");
  final DistCpOptions.Builder optionsBuilder =
      new DistCpOptions.Builder(Collections.singletonList(source), target);

  // Blocking is expected to be on when nothing was configured.
  final DistCpOptions untouched = optionsBuilder.build();
  Assert.assertTrue(untouched.shouldBlock());

  optionsBuilder.withBlocking(false);
  final DistCpOptions nonBlocking = optionsBuilder.build();
  Assert.assertFalse(nonBlocking.shouldBlock());
}
/**
 * Checks that the map bandwidth reads as 0 when it was never set and as 11
 * after {@code withMapBandwidth(11)}; both comparisons use the
 * {@code DELTA} tolerance.
 */
@Test
public void testSetBandwidth() {
  final Path source = new Path("hdfs://localhost:8020/source");
  final Path target = new Path("hdfs://localhost:8020/target/");
  final DistCpOptions.Builder optionsBuilder =
      new DistCpOptions.Builder(Collections.singletonList(source), target);

  final DistCpOptions untouched = optionsBuilder.build();
  Assert.assertEquals(0, untouched.getMapBandwidth(), DELTA);

  optionsBuilder.withMapBandwidth(11);
  final DistCpOptions throttled = optionsBuilder.build();
  Assert.assertEquals(11, throttled.getMapBandwidth(), DELTA);
}
/**
 * Expects an {@link IllegalArgumentException} when options are built with a
 * negative map bandwidth (-11).
 */
@Test(expected = IllegalArgumentException.class)
public void testSetNonPositiveBandwidth() {
  final Path source = new Path("hdfs://localhost:8020/source");
  final Path target = new Path("hdfs://localhost:8020/target/");
  final DistCpOptions.Builder optionsBuilder =
      new DistCpOptions.Builder(Collections.singletonList(source), target);

  // The test passes only if this statement throws.
  optionsBuilder.withMapBandwidth(-11).build();
}
/**
 * Expects an {@link IllegalArgumentException} when options are built with an
 * explicitly requested map bandwidth of zero.
 */
@Test(expected = IllegalArgumentException.class)
public void testSetZeroBandwidth() {
  final Path source = new Path("hdfs://localhost:8020/source");
  final Path target = new Path("hdfs://localhost:8020/target/");
  final DistCpOptions.Builder optionsBuilder =
      new DistCpOptions.Builder(Collections.singletonList(source), target);

  // The test passes only if this statement throws.
  optionsBuilder.withMapBandwidth(0).build();
}
/**
 * Checks that CRC skipping is off when not configured, and that enabling it
 * together with sync-folder yields options reporting both flags as set.
 */
@Test
public void testSetSkipCRC() {
  final Path source = new Path("hdfs://localhost:8020/source");
  final Path target = new Path("hdfs://localhost:8020/target/");
  final DistCpOptions.Builder optionsBuilder =
      new DistCpOptions.Builder(Collections.singletonList(source), target);

  final DistCpOptions untouched = optionsBuilder.build();
  Assert.assertFalse(untouched.shouldSkipCRC());

  // Sync-folder is requested in the same chain as skip-CRC.
  final DistCpOptions syncSkippingCrc = optionsBuilder
      .withSyncFolder(true)
      .withCRC(true)
      .build();
  Assert.assertTrue(syncSkippingCrc.shouldSyncFolder());
  Assert.assertTrue(syncSkippingCrc.shouldSkipCRC());
}
/**
 * Checks that atomic commit is off by default, can be switched on, and that
 * building options with both atomic commit and sync-folder enabled throws an
 * {@link IllegalArgumentException}.
 */
@Test
public void testSetAtomicCommit() {
  final Path source = new Path("hdfs://localhost:8020/source");
  final Path target = new Path("hdfs://localhost:8020/target/");
  final DistCpOptions.Builder optionsBuilder =
      new DistCpOptions.Builder(Collections.singletonList(source), target);

  Assert.assertFalse(optionsBuilder.build().shouldAtomicCommit());

  optionsBuilder.withAtomicCommit(true);
  Assert.assertTrue(optionsBuilder.build().shouldAtomicCommit());

  // Atomic commit is still enabled on the builder; adding sync-folder on top
  // of it must make build() fail.
  boolean rejected = false;
  try {
    optionsBuilder.withSyncFolder(true).build();
  } catch (IllegalArgumentException expected) {
    rejected = true;
  }
  if (!rejected) {
    Assert.fail("Atomic and sync folders were mutually exclusive");
  }
}
/**
 * Checks the atomic work path: it is {@code null} by default, stays
 * {@code null} when only atomic commit is enabled, and is returned unchanged
 * once set through {@code withAtomicWorkPath}.
 */
@Test
public void testSetWorkPath() {
  final Path source = new Path("hdfs://localhost:8020/source");
  final Path target = new Path("hdfs://localhost:8020/target/");
  final DistCpOptions.Builder optionsBuilder =
      new DistCpOptions.Builder(Collections.singletonList(source), target);

  final DistCpOptions defaults = optionsBuilder.build();
  Assert.assertNull(defaults.getAtomicWorkPath());

  // Enabling atomic commit alone does not supply a work path.
  optionsBuilder.withAtomicCommit(true);
  final DistCpOptions atomicOnly = optionsBuilder.build();
  Assert.assertNull(atomicOnly.getAtomicWorkPath());

  final Path expectedWorkPath = new Path("hdfs://localhost:8020/work");
  optionsBuilder.withAtomicWorkPath(expectedWorkPath);
  final DistCpOptions withWorkPath = optionsBuilder.build();
  Assert.assertEquals(expectedWorkPath, withWorkPath.getAtomicWorkPath());
}
/**
 * Checks that sync-folder is off for unconfigured options and on after
 * {@code withSyncFolder(true)}.
 */
@Test
public void testSetSyncFolders() {
  final Path source = new Path("hdfs://localhost:8020/source");
  final Path target = new Path("hdfs://localhost:8020/target/");
  final DistCpOptions.Builder optionsBuilder =
      new DistCpOptions.Builder(Collections.singletonList(source), target);

  final DistCpOptions defaults = optionsBuilder.build();
  Assert.assertFalse(defaults.shouldSyncFolder());

  optionsBuilder.withSyncFolder(true);
  final DistCpOptions syncing = optionsBuilder.build();
  Assert.assertTrue(syncing.shouldSyncFolder());
}
/**
 * Exercises the delete-missing flag and the combinations it is accepted in:
 * <ul>
 *   <li>off by default;</li>
 *   <li>accepted together with sync-folder;</li>
 *   <li>accepted together with overwrite;</li>
 *   <li>rejected on its own;</li>
 *   <li>rejected together with a snapshot diff.</li>
 * </ul>
 */
@Test
public void testSetDeleteMissing() {
  final Path source = new Path("hdfs://localhost:8020/source");
  final Path target = new Path("hdfs://localhost:8020/target/");

  final DistCpOptions.Builder syncBuilder =
      new DistCpOptions.Builder(Collections.singletonList(source), target);
  Assert.assertFalse(syncBuilder.build().shouldDeleteMissing());

  // Delete-missing combined with sync-folder.
  final DistCpOptions syncAndDelete = syncBuilder
      .withSyncFolder(true)
      .withDeleteMissing(true)
      .build();
  Assert.assertTrue(syncAndDelete.shouldSyncFolder());
  Assert.assertTrue(syncAndDelete.shouldDeleteMissing());

  // Delete-missing combined with overwrite, starting from a fresh builder.
  final DistCpOptions overwriteAndDelete =
      new DistCpOptions.Builder(Collections.singletonList(source), target)
          .withOverwrite(true)
          .withDeleteMissing(true)
          .build();
  Assert.assertTrue(overwriteAndDelete.shouldOverwrite());
  Assert.assertTrue(overwriteAndDelete.shouldDeleteMissing());

  // Delete-missing alone must be refused.
  try {
    new DistCpOptions.Builder(Collections.singletonList(source), target)
        .withDeleteMissing(true)
        .build();
    fail("Delete missing should fail without update or overwrite options");
  } catch (IllegalArgumentException e) {
    assertExceptionContains("Delete missing is applicable only with update " +
        "or overwrite options", e);
  }

  // Delete-missing together with a snapshot diff must be refused; this case
  // uses the single-path (source listing) constructor.
  try {
    new DistCpOptions.Builder(
        new Path("hdfs://localhost:8020/source/first"), target)
        .withSyncFolder(true)
        .withDeleteMissing(true)
        .withUseDiff("s1", "s2")
        .build();
    fail("Should have failed as -delete and -diff are mutually exclusive.");
  } catch (IllegalArgumentException e) {
    assertExceptionContains(
        "-delete and -diff/-rdiff are mutually exclusive.", e);
  }
}
/**
 * Checks the maximum number of maps: {@link DistCpConstants#DEFAULT_MAPS}
 * when unconfigured, 1 after {@code maxMaps(1)}, and still 1 after
 * {@code maxMaps(0)}.
 */
@Test
public void testSetMaps() {
  final Path source = new Path("hdfs://localhost:8020/source");
  final Path target = new Path("hdfs://localhost:8020/target/");
  final DistCpOptions.Builder optionsBuilder =
      new DistCpOptions.Builder(Collections.singletonList(source), target);

  final DistCpOptions defaults = optionsBuilder.build();
  Assert.assertEquals(DistCpConstants.DEFAULT_MAPS, defaults.getMaxMaps());

  optionsBuilder.maxMaps(1);
  final DistCpOptions singleMap = optionsBuilder.build();
  Assert.assertEquals(1, singleMap.getMaxMaps());

  // A request for zero maps is expected to end up as one map.
  optionsBuilder.maxMaps(0);
  final DistCpOptions zeroRequested = optionsBuilder.build();
  Assert.assertEquals(1, zeroRequested.getMaxMaps());
}
/**
 * Checks the number of list-status threads: 0 when unset, the given value
 * when set to 12 or 0, and {@code MAX_NUM_LISTSTATUS_THREADS} when a larger
 * value is requested.
 */
@Test
public void testSetNumListtatusThreads() {
  final Path sourceListing = new Path("hdfs://localhost:8020/source/first");
  final Path target = new Path("hdfs://localhost:8020/target/");
  final DistCpOptions.Builder optionsBuilder =
      new DistCpOptions.Builder(sourceListing, target);

  // When the command line argument is not set, getNumListstatusThreads is
  // expected to be zero (so that we know when to override conf properties).
  final DistCpOptions unset = optionsBuilder.build();
  Assert.assertEquals(0, unset.getNumListstatusThreads());

  optionsBuilder.withNumListstatusThreads(12);
  final DistCpOptions twelve = optionsBuilder.build();
  Assert.assertEquals(12, twelve.getNumListstatusThreads());

  optionsBuilder.withNumListstatusThreads(0);
  final DistCpOptions zero = optionsBuilder.build();
  Assert.assertEquals(0, zero.getNumListstatusThreads());

  // Ignore large number of threads: the value is capped at the maximum.
  optionsBuilder.withNumListstatusThreads(MAX_NUM_LISTSTATUS_THREADS * 2);
  final DistCpOptions capped = optionsBuilder.build();
  Assert.assertEquals(MAX_NUM_LISTSTATUS_THREADS,
      capped.getNumListstatusThreads());
}
/**
 * Checks that a path handed to the single-path builder constructor is
 * reported back by {@code getSourceFileListing()}.
 */
@Test
public void testSourceListing() {
  final Path target = new Path("hdfs://localhost:8020/target/");
  final DistCpOptions.Builder optionsBuilder = new DistCpOptions.Builder(
      new Path("hdfs://localhost:8020/source/first"), target);

  final DistCpOptions built = optionsBuilder.build();

  // Compare against an equal but separately constructed Path.
  final Path expectedListing = new Path("hdfs://localhost:8020/source/first");
  Assert.assertEquals(expectedListing, built.getSourceFileListing());
}
/**
 * Constructing a builder with a {@code null} target path must throw an
 * {@link IllegalArgumentException}; no call to {@code build()} is needed.
 */
@Test(expected = IllegalArgumentException.class)
public void testMissingTarget() {
  final Path sourceListing = new Path("hdfs://localhost:8020/source/first");
  new DistCpOptions.Builder(sourceListing, null);
}
/**
 * Pins the exact {@code toString()} output of options built from a source
 * listing {@code abc} and target {@code xyz} with nothing else configured.
 * Also checks that the {@code toString()} and {@code name()} results of
 * {@link DistCpOptionSwitch#ATOMIC_COMMIT} are not the same object.
 */
@Test
public void testToString() {
  final Path sourceListing = new Path("abc");
  final Path target = new Path("xyz");
  final DistCpOptions built =
      new DistCpOptions.Builder(sourceListing, target).build();

  final String expected = "DistCpOptions{"
      + "atomicCommit=false, syncFolder=false, deleteMissing=false, "
      + "ignoreFailures=false, overwrite=false, append=false, "
      + "useDiff=false, useRdiff=false, fromSnapshot=null, toSnapshot=null, "
      + "skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, "
      + "mapBandwidth=0.0, copyStrategy='uniformsize', preserveStatus=[], "
      + "atomicWorkPath=null, logPath=null, sourceFileListing=abc, "
      + "sourcePaths=null, targetPath=xyz, filtersFile='null', "
      + "blocksPerChunk=0}";
  Assert.assertEquals(expected, built.toString());

  // NOTE(review): assertNotSame compares references, not string contents.
  final String switchName = DistCpOptionSwitch.ATOMIC_COMMIT.name();
  final String switchText = DistCpOptionSwitch.ATOMIC_COMMIT.toString();
  Assert.assertNotSame(switchText, switchName);
}
/**
 * Checks the copy strategy: {@link DistCpConstants#UNIFORMSIZE} when
 * unconfigured and {@code "dynamic"} after {@code withCopyStrategy("dynamic")}.
 */
@Test
public void testCopyStrategy() {
  final Path sourceListing = new Path("hdfs://localhost:8020/source/first");
  final Path target = new Path("hdfs://localhost:8020/target/");
  final DistCpOptions.Builder optionsBuilder =
      new DistCpOptions.Builder(sourceListing, target);

  final DistCpOptions defaults = optionsBuilder.build();
  Assert.assertEquals(DistCpConstants.UNIFORMSIZE, defaults.getCopyStrategy());

  optionsBuilder.withCopyStrategy("dynamic");
  final DistCpOptions dynamic = optionsBuilder.build();
  Assert.assertEquals("dynamic", dynamic.getCopyStrategy());
}
/**
 * Checks that the target path handed to the builder is reported back by
 * {@code getTargetPath()}.
 */
@Test
public void testTargetPath() {
  final Path sourceListing = new Path("hdfs://localhost:8020/source/first");
  final DistCpOptions.Builder optionsBuilder = new DistCpOptions.Builder(
      sourceListing, new Path("hdfs://localhost:8020/target/"));
  final DistCpOptions built = optionsBuilder.build();

  // Compare against an equal but separately constructed Path.
  final Path expectedTarget = new Path("hdfs://localhost:8020/target/");
  Assert.assertEquals(expectedTarget, built.getTargetPath());
}
/**
 * Exercises {@code preserve(...)} / {@code shouldPreserve(...)} in three
 * configurations: nothing preserved, only ACL preserved, and the six basic
 * attributes preserved (in which case XATTR is still not preserved).
 */
@Test
public void testPreserve() {
  final Path sourceListing = new Path("hdfs://localhost:8020/source/first");
  final Path target = new Path("hdfs://localhost:8020/target/");
  // The attributes checked in every scenario, in the order they are asserted.
  final FileAttribute[] basicAttributes = {
      FileAttribute.BLOCKSIZE,
      FileAttribute.REPLICATION,
      FileAttribute.PERMISSION,
      FileAttribute.USER,
      FileAttribute.GROUP,
      FileAttribute.CHECKSUMTYPE,
  };

  // Scenario 1: nothing requested, so nothing is preserved.
  DistCpOptions built =
      new DistCpOptions.Builder(sourceListing, target).build();
  for (FileAttribute attribute : basicAttributes) {
    Assert.assertFalse(built.shouldPreserve(attribute));
  }

  // Scenario 2: only ACL requested.
  built = new DistCpOptions.Builder(sourceListing, target)
      .preserve(FileAttribute.ACL)
      .build();
  for (FileAttribute attribute : basicAttributes) {
    Assert.assertFalse(built.shouldPreserve(attribute));
  }
  Assert.assertTrue(built.shouldPreserve(FileAttribute.ACL));

  // Scenario 3: all basic attributes requested, XATTR left out.
  built = new DistCpOptions.Builder(sourceListing, target)
      .preserve(FileAttribute.BLOCKSIZE)
      .preserve(FileAttribute.REPLICATION)
      .preserve(FileAttribute.PERMISSION)
      .preserve(FileAttribute.USER)
      .preserve(FileAttribute.GROUP)
      .preserve(FileAttribute.CHECKSUMTYPE)
      .build();
  for (FileAttribute attribute : basicAttributes) {
    Assert.assertTrue(built.shouldPreserve(attribute));
  }
  Assert.assertFalse(built.shouldPreserve(FileAttribute.XATTR));
}
/**
 * Exercises the append flag: accepted together with sync-folder, rejected
 * without it, and rejected when CRC skipping is also requested.
 */
@Test
public void testAppendOption() {
  final Path source = new Path("hdfs://localhost:8020/source");
  final Path target = new Path("hdfs://localhost:8020/target/");

  final DistCpOptions.Builder appendWithSync =
      new DistCpOptions.Builder(Collections.singletonList(source), target)
          .withSyncFolder(true)
          .withAppend(true);
  final DistCpOptions built = appendWithSync.build();
  Assert.assertTrue(built.shouldAppend());

  try {
    // make sure -append is only valid when -update is specified
    new DistCpOptions.Builder(Collections.singletonList(source), target)
        .withAppend(true)
        .build();
    fail("Append should fail if update option is not specified");
  } catch (IllegalArgumentException e) {
    assertExceptionContains(
        "Append is valid only with update options", e);
  }

  try {
    // make sure -append is invalid when skipCrc is specified
    new DistCpOptions.Builder(Collections.singletonList(source), target)
        .withSyncFolder(true)
        .withAppend(true)
        .withCRC(true)
        .build();
    fail("Append should fail if skipCrc option is specified");
  } catch (IllegalArgumentException e) {
    assertExceptionContains(
        "Append is disallowed when skipping CRC", e);
  }
}
/**
 * Exercises the snapshot-diff option ({@code withUseDiff}).
 *
 * <p>Positive cases: with sync-folder enabled, the diff flag and both
 * snapshot names (including {@code "."} as the "to" snapshot) are reported
 * back by the built options.
 *
 * <p>Negative cases: building must throw an {@link IllegalArgumentException}
 * when the diff is requested without sync-folder, and whenever the diff is
 * combined with delete-missing, whichever order the two are set in and with
 * or without sync-folder.
 */
@Test
public void testDiffOption() {
  DistCpOptions options = new DistCpOptions.Builder(
      new Path("hdfs://localhost:8020/source/first"),
      new Path("hdfs://localhost:8020/target/"))
      .withSyncFolder(true)
      .withUseDiff("s1", "s2")
      .build();
  Assert.assertTrue(options.shouldUseDiff());
  Assert.assertEquals("s1", options.getFromSnapshot());
  Assert.assertEquals("s2", options.getToSnapshot());

  // "." is accepted as the "to" snapshot name.
  options = new DistCpOptions.Builder(
      new Path("hdfs://localhost:8020/source/first"),
      new Path("hdfs://localhost:8020/target/"))
      .withSyncFolder(true)
      .withUseDiff("s1", ".")
      .build();
  Assert.assertTrue(options.shouldUseDiff());
  Assert.assertEquals("s1", options.getFromSnapshot());
  Assert.assertEquals(".", options.getToSnapshot());

  // make sure -diff is only valid when -update is specified
  try {
    new DistCpOptions.Builder(new Path("hdfs://localhost:8020/source/first"),
        new Path("hdfs://localhost:8020/target/"))
        .withUseDiff("s1", "s2")
        .build();
    fail("-diff should fail if -update option is not specified");
  } catch (IllegalArgumentException e) {
    assertExceptionContains(
        "-diff/-rdiff is valid only with -update option", e);
  }

  // -update, -diff and -delete together: -delete conflicts with -diff
  try {
    new DistCpOptions.Builder(
        new Path("hdfs://localhost:8020/source/first"),
        new Path("hdfs://localhost:8020/target/"))
        .withSyncFolder(true)
        .withUseDiff("s1", "s2")
        .withDeleteMissing(true)
        .build();
    fail("Should fail as -delete and -diff/-rdiff are mutually exclusive.");
  } catch (IllegalArgumentException e) {
    assertExceptionContains(
        "-delete and -diff/-rdiff are mutually exclusive.", e);
  }

  // -diff then -delete, without -update: the expected error is the
  // -delete/-diff conflict, so the failure message names that conflict.
  try {
    new DistCpOptions.Builder(new Path("hdfs://localhost:8020/source/first"),
        new Path("hdfs://localhost:8020/target/"))
        .withUseDiff("s1", "s2")
        .withDeleteMissing(true)
        .build();
    fail("Should fail as -delete and -diff/-rdiff are mutually exclusive.");
  } catch (IllegalArgumentException e) {
    assertExceptionContains(
        "-delete and -diff/-rdiff are mutually exclusive.", e);
  }

  // -delete then -diff, without -update: same conflict, opposite order
  try {
    new DistCpOptions.Builder(new Path("hdfs://localhost:8020/source/first"),
        new Path("hdfs://localhost:8020/target/"))
        .withDeleteMissing(true)
        .withUseDiff("s1", "s2")
        .build();
    fail("Should have failed as -delete and -diff are mutually exclusive");
  } catch (IllegalArgumentException e) {
    assertExceptionContains(
        "-delete and -diff/-rdiff are mutually exclusive", e);
  }
}
/**
 * Checks the filters file: {@code null} when unconfigured and the configured
 * value after {@code withFiltersFile}.
 */
@Test
public void testExclusionsOption() {
  final Path sourceListing = new Path("hdfs://localhost:8020/source/first");
  final Path target = new Path("hdfs://localhost:8020/target/");
  final DistCpOptions.Builder optionsBuilder =
      new DistCpOptions.Builder(sourceListing, target);

  final DistCpOptions defaults = optionsBuilder.build();
  Assert.assertNull(defaults.getFiltersFile());

  optionsBuilder.withFiltersFile("/tmp/filters.txt");
  final DistCpOptions filtered = optionsBuilder.build();
  Assert.assertEquals("/tmp/filters.txt", filtered.getFiltersFile());
}
/**
 * Checks the effect of {@code withBlocksPerChunk} on options that have
 * append and sync-folder enabled. Before it is set, block size is not
 * preserved and append is on. After it is set to 5440, block size is
 * preserved and append is reported as off.
 */
@Test
public void testSetOptionsForSplitLargeFile() {
  final Path sourceListing = new Path("hdfs://localhost:8020/source/");
  final Path target = new Path("hdfs://localhost:8020/target/");
  final DistCpOptions.Builder optionsBuilder =
      new DistCpOptions.Builder(sourceListing, target)
          .withAppend(true)
          .withSyncFolder(true);

  final DistCpOptions beforeChunking = optionsBuilder.build();
  Assert.assertFalse(beforeChunking.shouldPreserve(FileAttribute.BLOCKSIZE));
  Assert.assertTrue(beforeChunking.shouldAppend());

  // Requesting chunked copies flips both settings in the built options.
  optionsBuilder.withBlocksPerChunk(5440);
  final DistCpOptions afterChunking = optionsBuilder.build();
  Assert.assertTrue(afterChunking.shouldPreserve(FileAttribute.BLOCKSIZE));
  Assert.assertFalse(afterChunking.shouldAppend());
}
}

View File

@ -39,7 +39,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -47,7 +47,7 @@ public class TestDistCpSync {
private MiniDFSCluster cluster;
private final Configuration conf = new HdfsConfiguration();
private DistributedFileSystem dfs;
private DistCpOptions options;
private DistCpContext context;
private final Path source = new Path("/source");
private final Path target = new Path("/target");
private final long BLOCK_SIZE = 1024;
@ -62,10 +62,13 @@ public class TestDistCpSync {
dfs.mkdirs(source);
dfs.mkdirs(target);
options = new DistCpOptions(Arrays.asList(source), target);
options.setSyncFolder(true);
options.setUseDiff("s1", "s2");
final DistCpOptions options = new DistCpOptions.Builder(
Collections.singletonList(source), target)
.withSyncFolder(true)
.withUseDiff("s1", "s2")
.build();
options.appendToConf(conf);
context = new DistCpContext(options);
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, target.toString());
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, target.toString());
@ -92,34 +95,34 @@ public class TestDistCpSync {
// make sure the source path has been updated to the snapshot path
final Path spath = new Path(source,
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2");
Assert.assertEquals(spath, options.getSourcePaths().get(0));
Assert.assertEquals(spath, context.getSourcePaths().get(0));
// reset source path in options
options.setSourcePaths(Arrays.asList(source));
context.setSourcePaths(Collections.singletonList(source));
// the source/target does not have the given snapshots
dfs.allowSnapshot(source);
dfs.allowSnapshot(target);
Assert.assertFalse(sync());
Assert.assertEquals(spath, options.getSourcePaths().get(0));
Assert.assertEquals(spath, context.getSourcePaths().get(0));
// reset source path in options
options.setSourcePaths(Arrays.asList(source));
context.setSourcePaths(Collections.singletonList(source));
dfs.createSnapshot(source, "s1");
dfs.createSnapshot(source, "s2");
dfs.createSnapshot(target, "s1");
Assert.assertTrue(sync());
// reset source paths in options
options.setSourcePaths(Arrays.asList(source));
context.setSourcePaths(Collections.singletonList(source));
// changes have been made in target
final Path subTarget = new Path(target, "sub");
dfs.mkdirs(subTarget);
Assert.assertFalse(sync());
// make sure the source path has been updated to the snapshot path
Assert.assertEquals(spath, options.getSourcePaths().get(0));
Assert.assertEquals(spath, context.getSourcePaths().get(0));
// reset source paths in options
options.setSourcePaths(Arrays.asList(source));
context.setSourcePaths(Collections.singletonList(source));
dfs.delete(subTarget, true);
Assert.assertTrue(sync());
}
@ -137,7 +140,7 @@ public class TestDistCpSync {
}
private boolean sync() throws Exception {
DistCpSync distCpSync = new DistCpSync(options, conf);
DistCpSync distCpSync = new DistCpSync(context, conf);
return distCpSync.sync();
}
@ -231,7 +234,7 @@ public class TestDistCpSync {
SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
System.out.println(report);
DistCpSync distCpSync = new DistCpSync(options, conf);
DistCpSync distCpSync = new DistCpSync(context, conf);
// do the sync
Assert.assertTrue(distCpSync.sync());
@ -239,24 +242,24 @@ public class TestDistCpSync {
// make sure the source path has been updated to the snapshot path
final Path spath = new Path(source,
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2");
Assert.assertEquals(spath, options.getSourcePaths().get(0));
Assert.assertEquals(spath, context.getSourcePaths().get(0));
// build copy listing
final Path listingPath = new Path("/tmp/META/fileList.seq");
CopyListing listing = new SimpleCopyListing(conf, new Credentials(), distCpSync);
listing.buildListing(listingPath, options);
listing.buildListing(listingPath, context);
Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(conf, null, 0);
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
Mapper<Text, CopyListingFileStatus, Text, Text>.Context mapContext =
stubContext.getContext();
// Enable append
context.getConfiguration().setBoolean(
mapContext.getConfiguration().setBoolean(
DistCpOptionSwitch.APPEND.getConfigLabel(), true);
copyMapper.setup(context);
copyMapper.setup(mapContext);
for (Map.Entry<Text, CopyListingFileStatus> entry : copyListing.entrySet()) {
copyMapper.map(entry.getKey(), entry.getValue(), context);
copyMapper.map(entry.getKey(), entry.getValue(), mapContext);
}
// verify that we only list modified and created files/directories
@ -312,7 +315,12 @@ public class TestDistCpSync {
*/
@Test
public void testSyncWithCurrent() throws Exception {
options.setUseDiff("s1", ".");
final DistCpOptions options = new DistCpOptions.Builder(
Collections.singletonList(source), target)
.withSyncFolder(true)
.withUseDiff("s1", ".")
.build();
context = new DistCpContext(options);
initData(source);
initData(target);
enableAndCreateFirstSnapshot();
@ -323,7 +331,7 @@ public class TestDistCpSync {
// do the sync
sync();
// make sure the source path is still unchanged
Assert.assertEquals(source, options.getSourcePaths().get(0));
Assert.assertEquals(source, context.getSourcePaths().get(0));
}
private void initData2(Path dir) throws Exception {
@ -501,32 +509,32 @@ public class TestDistCpSync {
SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
System.out.println(report);
DistCpSync distCpSync = new DistCpSync(options, conf);
DistCpSync distCpSync = new DistCpSync(context, conf);
// do the sync
Assert.assertTrue(distCpSync.sync());
// make sure the source path has been updated to the snapshot path
final Path spath = new Path(source,
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2");
Assert.assertEquals(spath, options.getSourcePaths().get(0));
Assert.assertEquals(spath, context.getSourcePaths().get(0));
// build copy listing
final Path listingPath = new Path("/tmp/META/fileList.seq");
CopyListing listing = new SimpleCopyListing(conf, new Credentials(), distCpSync);
listing.buildListing(listingPath, options);
listing.buildListing(listingPath, context);
Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(conf, null, 0);
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
Mapper<Text, CopyListingFileStatus, Text, Text>.Context mapContext =
stubContext.getContext();
// Enable append
context.getConfiguration().setBoolean(
mapContext.getConfiguration().setBoolean(
DistCpOptionSwitch.APPEND.getConfigLabel(), true);
copyMapper.setup(context);
copyMapper.setup(mapContext);
for (Map.Entry<Text, CopyListingFileStatus> entry :
copyListing.entrySet()) {
copyMapper.map(entry.getKey(), entry.getValue(), context);
copyMapper.map(entry.getKey(), entry.getValue(), mapContext);
}
// verify that we only list modified and created files/directories
@ -729,7 +737,7 @@ public class TestDistCpSync {
boolean threwException = false;
try {
DistCpSync distCpSync = new DistCpSync(options, conf);
DistCpSync distCpSync = new DistCpSync(context, conf);
// do the sync
distCpSync.sync();
} catch (HadoopIllegalArgumentException e) {

View File

@ -56,7 +56,8 @@ public abstract class TestDistCpSyncReverseBase {
private MiniDFSCluster cluster;
private final Configuration conf = new HdfsConfiguration();
private DistributedFileSystem dfs;
private DistCpOptions options;
private DistCpOptions.Builder optionsBuilder;
private DistCpContext distCpContext;
private Path source;
private boolean isSrcNotSameAsTgt = true;
private final Path target = new Path("/target");
@ -139,10 +140,12 @@ public abstract class TestDistCpSyncReverseBase {
}
dfs.mkdirs(target);
options = new DistCpOptions(Arrays.asList(source), target);
options.setSyncFolder(true);
options.setUseRdiff("s2", "s1");
optionsBuilder = new DistCpOptions.Builder(Arrays.asList(source), target)
.withSyncFolder(true)
.withUseRdiff("s2", "s1");
final DistCpOptions options = optionsBuilder.build();
options.appendToConf(conf);
distCpContext = new DistCpContext(options);
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, target.toString());
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, target.toString());
@ -169,33 +172,33 @@ public abstract class TestDistCpSyncReverseBase {
// make sure the source path has been updated to the snapshot path
final Path spath = new Path(source,
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1");
Assert.assertEquals(spath, options.getSourcePaths().get(0));
Assert.assertEquals(spath, distCpContext.getSourcePaths().get(0));
// reset source path in options
options.setSourcePaths(Arrays.asList(source));
optionsBuilder.withSourcePaths(Arrays.asList(source));
// the source/target does not have the given snapshots
dfs.allowSnapshot(source);
dfs.allowSnapshot(target);
Assert.assertFalse(sync());
Assert.assertEquals(spath, options.getSourcePaths().get(0));
Assert.assertEquals(spath, distCpContext.getSourcePaths().get(0));
// reset source path in options
options.setSourcePaths(Arrays.asList(source));
optionsBuilder.withSourcePaths(Arrays.asList(source));
this.enableAndCreateFirstSnapshot();
dfs.createSnapshot(target, "s2");
Assert.assertTrue(sync());
// reset source paths in options
options.setSourcePaths(Arrays.asList(source));
optionsBuilder.withSourcePaths(Arrays.asList(source));
// changes have been made in target
final Path subTarget = new Path(target, "sub");
dfs.mkdirs(subTarget);
Assert.assertFalse(sync());
// make sure the source path has been updated to the snapshot path
Assert.assertEquals(spath, options.getSourcePaths().get(0));
Assert.assertEquals(spath, distCpContext.getSourcePaths().get(0));
// reset source paths in options
options.setSourcePaths(Arrays.asList(source));
optionsBuilder.withSourcePaths(Arrays.asList(source));
dfs.delete(subTarget, true);
Assert.assertTrue(sync());
}
@ -215,7 +218,8 @@ public abstract class TestDistCpSyncReverseBase {
}
private boolean sync() throws Exception {
DistCpSync distCpSync = new DistCpSync(options, conf);
distCpContext = new DistCpContext(optionsBuilder.build());
final DistCpSync distCpSync = new DistCpSync(distCpContext, conf);
return distCpSync.sync();
}
@ -328,7 +332,7 @@ public abstract class TestDistCpSyncReverseBase {
SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1");
System.out.println(report);
DistCpSync distCpSync = new DistCpSync(options, conf);
final DistCpSync distCpSync = new DistCpSync(distCpContext, conf);
lsr("Before sync target: ", shell, target);
@ -340,13 +344,13 @@ public abstract class TestDistCpSyncReverseBase {
// make sure the source path has been updated to the snapshot path
final Path spath = new Path(source,
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1");
Assert.assertEquals(spath, options.getSourcePaths().get(0));
Assert.assertEquals(spath, distCpContext.getSourcePaths().get(0));
// build copy listing
final Path listingPath = new Path("/tmp/META/fileList.seq");
CopyListing listing = new SimpleCopyListing(conf, new Credentials(),
distCpSync);
listing.buildListing(listingPath, options);
listing.buildListing(listingPath, distCpContext);
Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
CopyMapper copyMapper = new CopyMapper();
@ -425,7 +429,7 @@ public abstract class TestDistCpSyncReverseBase {
*/
@Test
public void testSyncWithCurrent() throws Exception {
options.setUseRdiff(".", "s1");
optionsBuilder.withUseRdiff(".", "s1");
if (isSrcNotSameAsTgt) {
initData(source);
}
@ -440,7 +444,7 @@ public abstract class TestDistCpSyncReverseBase {
final Path spath = new Path(source,
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1");
// make sure the source path is still unchanged
Assert.assertEquals(spath, options.getSourcePaths().get(0));
Assert.assertEquals(spath, distCpContext.getSourcePaths().get(0));
}
private void initData2(Path dir) throws Exception {
@ -649,7 +653,7 @@ public abstract class TestDistCpSyncReverseBase {
lsrSource("Before sync source: ", shell, source);
lsr("Before sync target: ", shell, target);
DistCpSync distCpSync = new DistCpSync(options, conf);
DistCpSync distCpSync = new DistCpSync(distCpContext, conf);
// do the sync
distCpSync.sync();
@ -658,12 +662,12 @@ public abstract class TestDistCpSyncReverseBase {
// make sure the source path has been updated to the snapshot path
final Path spath = new Path(source,
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1");
Assert.assertEquals(spath, options.getSourcePaths().get(0));
Assert.assertEquals(spath, distCpContext.getSourcePaths().get(0));
// build copy listing
final Path listingPath = new Path("/tmp/META/fileList.seq");
CopyListing listing = new SimpleCopyListing(conf, new Credentials(), distCpSync);
listing.buildListing(listingPath, options);
listing.buildListing(listingPath, distCpContext);
Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
CopyMapper copyMapper = new CopyMapper();

View File

@ -413,11 +413,13 @@ public class TestDistCpViewFs {
private void runTest(Path listFile, Path target, boolean targetExists,
boolean sync) throws IOException {
DistCpOptions options = new DistCpOptions(listFile, target);
options.setSyncFolder(sync);
options.setTargetPathExists(targetExists);
final DistCpOptions options = new DistCpOptions.Builder(listFile, target)
.withSyncFolder(sync)
.build();
try {
new DistCp(getConf(), options).execute();
final DistCp distcp = new DistCp(getConf(), options);
distcp.context.setTargetPathExists(targetExists);
distcp.execute();
} catch (Exception e) {
LOG.error("Exception encountered ", e);
throw new IOException(e);

View File

@ -514,10 +514,11 @@ public class TestFileBasedCopyListing {
private void runTest(Path listFile, Path target, boolean targetExists,
boolean sync) throws IOException {
CopyListing listing = new FileBasedCopyListing(config, CREDENTIALS);
DistCpOptions options = new DistCpOptions(listFile, target);
options.setSyncFolder(sync);
options.setTargetPathExists(targetExists);
listing.buildListing(listFile, options);
final DistCpOptions options = new DistCpOptions.Builder(listFile, target)
.withSyncFolder(sync).build();
final DistCpContext context = new DistCpContext(options);
context.setTargetPathExists(targetExists);
listing.buildListing(listFile, context);
}
private void checkResult(Path listFile, int count) throws IOException {

View File

@ -34,7 +34,7 @@ import org.junit.Test;
import java.io.DataOutputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -109,9 +109,12 @@ public class TestGlobbedCopyListing {
Path source = new Path(fileSystemPath.toString() + "/tmp/source");
Path target = new Path(fileSystemPath.toString() + "/tmp/target");
Path listingPath = new Path(fileSystemPath.toString() + "/tmp/META/fileList.seq");
DistCpOptions options = new DistCpOptions(Arrays.asList(source), target);
options.setTargetPathExists(false);
new GlobbedCopyListing(new Configuration(), CREDENTIALS).buildListing(listingPath, options);
DistCpOptions options = new DistCpOptions.Builder(
Collections.singletonList(source), target).build();
DistCpContext context = new DistCpContext(options);
context.setTargetPathExists(false);
new GlobbedCopyListing(new Configuration(), CREDENTIALS)
.buildListing(listingPath, context);
verifyContents(listingPath);
}

View File

@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.tools.util.TestDistCpUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
@ -493,7 +492,8 @@ public class TestIntegration {
List<Path> sources = new ArrayList<Path>();
sources.add(sourcePath);
DistCpOptions options = new DistCpOptions(sources, target);
DistCpOptions options = new DistCpOptions.Builder(sources, target)
.build();
Configuration conf = getConf();
Path stagingDir = JobSubmissionFiles.getStagingDir(
@ -559,14 +559,16 @@ public class TestIntegration {
private void runTest(Path listFile, Path target, boolean targetExists,
boolean sync, boolean delete,
boolean overwrite) throws IOException {
DistCpOptions options = new DistCpOptions(listFile, target);
options.setSyncFolder(sync);
options.setDeleteMissing(delete);
options.setOverwrite(overwrite);
options.setTargetPathExists(targetExists);
options.setNumListstatusThreads(numListstatusThreads);
final DistCpOptions options = new DistCpOptions.Builder(listFile, target)
.withSyncFolder(sync)
.withDeleteMissing(delete)
.withOverwrite(overwrite)
.withNumListstatusThreads(numListstatusThreads)
.build();
try {
new DistCp(getConf(), options).execute();
final DistCp distCp = new DistCp(getConf(), options);
distCp.context.setTargetPathExists(targetExists);
distCp.execute();
} catch (Exception e) {
LOG.error("Exception encountered ", e);
throw new IOException(e);

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.tools;
import static org.junit.Assert.assertFalse;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.junit.Assert.fail;
import org.junit.Assert;
@ -28,7 +28,6 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.DistCpOptions.*;
import org.apache.hadoop.conf.Configuration;
import java.util.Iterator;
import java.util.NoSuchElementException;
public class TestOptionsParser {
@ -329,7 +328,7 @@ public class TestOptionsParser {
"100",
"hdfs://localhost:9820/source/first",
"hdfs://localhost:9820/target/"});
Assert.assertEquals(DistCpOptions.maxNumListstatusThreads,
Assert.assertEquals(DistCpOptions.MAX_NUM_LISTSTATUS_THREADS,
options.getNumListstatusThreads());
}
@ -382,25 +381,6 @@ public class TestOptionsParser {
} catch (IllegalArgumentException ignore) {}
}
@Test
public void testToString() {
DistCpOptions option = new DistCpOptions(new Path("abc"), new Path("xyz"));
String val = "DistCpOptions{atomicCommit=false, syncFolder=false, "
+ "deleteMissing=false, ignoreFailures=false, overwrite=false, "
+ "append=false, useDiff=false, useRdiff=false, "
+ "fromSnapshot=null, toSnapshot=null, "
+ "skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, "
+ "mapBandwidth=0.0, "
+ "copyStrategy='uniformsize', preserveStatus=[], "
+ "preserveRawXattrs=false, atomicWorkPath=null, logPath=null, "
+ "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, "
+ "targetPathExists=true, filtersFile='null', blocksPerChunk=0}";
String optionString = option.toString();
Assert.assertEquals(val, optionString);
Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),
DistCpOptionSwitch.ATOMIC_COMMIT.name());
}
@Test
public void testCopyStrategy() {
DistCpOptions options = OptionsParser.parse(new String[] {
@ -529,13 +509,8 @@ public class TestOptionsParser {
"-f",
"hdfs://localhost:9820/source/first",
"hdfs://localhost:9820/target/"});
int i = 0;
Iterator<FileAttribute> attribIterator = options.preserveAttributes();
while (attribIterator.hasNext()) {
attribIterator.next();
i++;
}
Assert.assertEquals(i, DistCpOptionSwitch.PRESERVE_STATUS_DEFAULT.length() - 2);
Assert.assertEquals(DistCpOptionSwitch.PRESERVE_STATUS_DEFAULT.length() - 2,
options.getPreserveAttributes().size());
try {
OptionsParser.parse(new String[] {
@ -545,19 +520,18 @@ public class TestOptionsParser {
"hdfs://localhost:9820/target"});
Assert.fail("Invalid preserve attribute");
}
catch (IllegalArgumentException ignore) {}
catch (NoSuchElementException ignore) {}
options = OptionsParser.parse(new String[] {
"-f",
"hdfs://localhost:9820/source/first",
"hdfs://localhost:9820/target/"});
Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
options.preserve(FileAttribute.PERMISSION);
Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
Builder builder = new DistCpOptions.Builder(
new Path("hdfs://localhost:9820/source/first"),
new Path("hdfs://localhost:9820/target/"));
Assert.assertFalse(
builder.build().shouldPreserve(FileAttribute.PERMISSION));
builder.preserve(FileAttribute.PERMISSION);
Assert.assertTrue(builder.build().shouldPreserve(FileAttribute.PERMISSION));
options.preserve(FileAttribute.PERMISSION);
Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
builder.preserve(FileAttribute.PERMISSION);
Assert.assertTrue(builder.build().shouldPreserve(FileAttribute.PERMISSION));
}
@Test
@ -756,28 +730,25 @@ public class TestOptionsParser {
}
try {
options = OptionsParser.parse(new String[] {
optionStr, "s1", "s2", "-update", "-delete",
OptionsParser.parse(new String[] {
"-diff", "s1", "s2", "-update", "-delete",
"hdfs://localhost:9820/source/first",
"hdfs://localhost:9820/target/" });
assertFalse("-delete should be ignored when "
+ optionStr + " is specified",
options.shouldDeleteMissing());
fail("Should fail as -delete and -diff/-rdiff are mutually exclusive");
} catch (IllegalArgumentException e) {
fail("Got unexpected IllegalArgumentException: " + e.getMessage());
assertExceptionContains(
"-delete and -diff/-rdiff are mutually exclusive", e);
}
try {
options = OptionsParser.parse(new String[] {
optionStr, "s1", "s2", "-delete",
OptionsParser.parse(new String[] {
"-diff", "s1", "s2", "-delete",
"hdfs://localhost:9820/source/first",
"hdfs://localhost:9820/target/" });
fail(optionStr + " should fail if -update option is not specified");
fail("Should fail as -delete and -diff/-rdiff are mutually exclusive");
} catch (IllegalArgumentException e) {
assertFalse("-delete should be ignored when -diff is specified",
options.shouldDeleteMissing());
GenericTestUtils.assertExceptionContains(
"-diff/-rdiff is valid only with -update option", e);
assertExceptionContains(
"-delete and -diff/-rdiff are mutually exclusive", e);
}
try {
@ -785,10 +756,10 @@ public class TestOptionsParser {
"-delete", "-overwrite",
"hdfs://localhost:9820/source/first",
"hdfs://localhost:9820/target/" });
fail(optionStr + " should fail if -update option is not specified");
fail("Should fail as -delete and -diff are mutually exclusive");
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains(
"-diff/-rdiff is valid only with -update option", e);
assertExceptionContains(
"-delete and -diff/-rdiff are mutually exclusive", e);
}
final String optionStrOther = isDiff? "-rdiff" : "-diff";

View File

@ -19,9 +19,8 @@
package org.apache.hadoop.tools.contract;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.junit.Assert.*;
import java.util.Arrays;
import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -184,7 +183,8 @@ public abstract class AbstractContractDistCpTest
* @throws Exception if there is a failure
*/
private void runDistCp(Path src, Path dst) throws Exception {
DistCpOptions options = new DistCpOptions(Arrays.asList(src), dst);
DistCpOptions options = new DistCpOptions.Builder(
Collections.singletonList(src), dst).build();
Job job = new DistCp(conf, options).execute();
assertNotNull("Unexpected null job returned from DistCp execution.", job);
assertTrue("DistCp job did not complete.", job.isComplete());

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.tools.CopyListing;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpContext;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.GlobbedCopyListing;
@ -146,15 +147,16 @@ public class TestCopyCommitter {
sourceBase = TestDistCpUtils.createTestSetup(fs, sourcePerm);
targetBase = TestDistCpUtils.createTestSetup(fs, initialPerm);
DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
new Path("/out"));
options.preserve(FileAttribute.PERMISSION);
final DistCpOptions options = new DistCpOptions.Builder(
Collections.singletonList(new Path(sourceBase)), new Path("/out"))
.preserve(FileAttribute.PERMISSION).build();
options.appendToConf(conf);
options.setTargetPathExists(false);
final DistCpContext context = new DistCpContext(options);
context.setTargetPathExists(false);
CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
listing.buildListing(listingFile, options);
listing.buildListing(listingFile, context);
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
@ -197,15 +199,15 @@ public class TestCopyCommitter {
String targetBaseAdd = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
fs.rename(new Path(targetBaseAdd), new Path(targetBase));
DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
new Path("/out"));
options.setSyncFolder(true);
options.setDeleteMissing(true);
final DistCpOptions options = new DistCpOptions.Builder(
Collections.singletonList(new Path(sourceBase)), new Path("/out"))
.withSyncFolder(true).withDeleteMissing(true).build();
options.appendToConf(conf);
final DistCpContext context = new DistCpContext(options);
CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
listing.buildListing(listingFile, options);
listing.buildListing(listingFile, context);
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
@ -266,15 +268,15 @@ public class TestCopyCommitter {
TestDistCpUtils.createFile(fs, targetBase + "/9");
TestDistCpUtils.createFile(fs, targetBase + "/A");
DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
new Path("/out"));
options.setSyncFolder(true);
options.setDeleteMissing(true);
final DistCpOptions options = new DistCpOptions.Builder(
Collections.singletonList(new Path(sourceBase)), new Path("/out"))
.withSyncFolder(true).withDeleteMissing(true).build();
options.appendToConf(conf);
final DistCpContext context = new DistCpContext(options);
CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
listing.buildListing(listingFile, options);
listing.buildListing(listingFile, context);
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.tools.CopyListing;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpContext;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.StubContext;
import org.apache.hadoop.security.Credentials;
@ -74,9 +75,9 @@ public class TestUniformSizeInputFormat {
List<Path> sourceList = new ArrayList<Path>();
sourceList.add(sourcePath);
final DistCpOptions distCpOptions = new DistCpOptions(sourceList, targetPath);
distCpOptions.setMaxMaps(nMaps);
return distCpOptions;
return new DistCpOptions.Builder(sourceList, targetPath)
.maxMaps(nMaps)
.build();
}
private static int createFile(String path, int fileSize) throws Exception {
@ -100,14 +101,14 @@ public class TestUniformSizeInputFormat {
}
public void testGetSplits(int nMaps) throws Exception {
DistCpOptions options = getOptions(nMaps);
DistCpContext context = new DistCpContext(getOptions(nMaps));
Configuration configuration = new Configuration();
configuration.set("mapred.map.tasks",
String.valueOf(options.getMaxMaps()));
String.valueOf(context.getMaxMaps()));
Path listFile = new Path(cluster.getFileSystem().getUri().toString()
+ "/tmp/testGetSplits_1/fileList.seq");
CopyListing.getCopyListing(configuration, CREDENTIALS, options).
buildListing(listFile, options);
CopyListing.getCopyListing(configuration, CREDENTIALS, context)
.buildListing(listFile, context);
JobContext jobContext = new JobContextImpl(configuration, new JobID());
UniformSizeInputFormat uniformSizeInputFormat = new UniformSizeInputFormat();

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.tools.mapred.lib;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpContext;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -84,9 +85,9 @@ public class TestDynamicInputFormat {
List<Path> sourceList = new ArrayList<Path>();
sourceList.add(sourcePath);
DistCpOptions options = new DistCpOptions(sourceList, targetPath);
options.setMaxMaps(NUM_SPLITS);
return options;
return new DistCpOptions.Builder(sourceList, targetPath)
.maxMaps(NUM_SPLITS)
.build();
}
private static void createFile(String path) throws Exception {
@ -110,13 +111,13 @@ public class TestDynamicInputFormat {
@Test
public void testGetSplits() throws Exception {
DistCpOptions options = getOptions();
final DistCpContext context = new DistCpContext(getOptions());
Configuration configuration = new Configuration();
configuration.set("mapred.map.tasks",
String.valueOf(options.getMaxMaps()));
CopyListing.getCopyListing(configuration, CREDENTIALS, options).buildListing(
new Path(cluster.getFileSystem().getUri().toString()
+"/tmp/testDynInputFormat/fileList.seq"), options);
String.valueOf(context.getMaxMaps()));
CopyListing.getCopyListing(configuration, CREDENTIALS, context)
.buildListing(new Path(cluster.getFileSystem().getUri().toString()
+"/tmp/testDynInputFormat/fileList.seq"), context);
JobContext jobContext = new JobContextImpl(configuration, new JobID());
DynamicInputFormat<Text, CopyListingFileStatus> inputFormat =