HADOOP-10459. distcp V2 doesn't preserve root dir's attributes when -p is specified. Contributed by Yongjun Zhang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1584227 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2014-04-03 00:32:25 +00:00
parent 5b3481a750
commit 640a097533
16 changed files with 400 additions and 78 deletions

View File

@ -338,6 +338,9 @@ Release 2.5.0 - UNRELEASED
HADOOP-10414. Incorrect property name for RefreshUserMappingProtocol in
hadoop-policy.xml. (Joey Echeverria via atm)
HADOOP-10459. distcp V2 doesn't preserve root dir's attributes when -p is
specified. (Yongjun Zhang via atm)
Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -40,6 +40,8 @@ import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.Random;
import com.google.common.annotations.VisibleForTesting;
/**
* DistCp is the main driver-class for DistCpV2.
* For command-line use, DistCp::main() orchestrates the parsing of command-line
@ -87,7 +89,8 @@ public class DistCp extends Configured implements Tool {
/**
* To be used with the ToolRunner. Not for public consumption.
*/
private DistCp() {}
@VisibleForTesting
public DistCp() {}
/**
* Implementation of Tool::run(). Orchestrates the copy of source file(s)
@ -105,7 +108,7 @@ public class DistCp extends Configured implements Tool {
try {
inputOptions = (OptionsParser.parse(argv));
setTargetPathExists();
LOG.info("Input Options: " + inputOptions);
} catch (Throwable e) {
LOG.error("Invalid arguments: ", e);
@ -169,6 +172,18 @@ public class DistCp extends Configured implements Tool {
return job;
}
/**
* Set targetPathExists in both inputOptions and job config,
* for the benefit of CopyCommitter
*/
private void setTargetPathExists() throws IOException {
Path target = inputOptions.getTargetPath();
FileSystem targetFS = target.getFileSystem(getConf());
boolean targetExists = targetFS.exists(target);
inputOptions.setTargetPathExists(targetExists);
getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS,
targetExists);
}
/**
* Create Job object for submitting it, with all the configuration
*

View File

@ -74,6 +74,9 @@ public class DistCpConstants {
*/
public static final String CONF_LABEL_TARGET_FINAL_PATH = "distcp.target.final.path";
/* Boolean to indicate whether the target of distcp exists. */
public static final String CONF_LABEL_TARGET_PATH_EXISTS = "distcp.target.path.exists";
/**
* DistCp job id for consumers of the Disctp
*/

View File

@ -60,6 +60,10 @@ public class DistCpOptions {
private Path targetPath;
// targetPathExist is a derived field, it's initialized in the
// beginning of distcp.
private boolean targetPathExists = true;
public static enum FileAttribute{
REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE;
@ -123,6 +127,7 @@ public class DistCpOptions {
this.sourceFileListing = that.getSourceFileListing();
this.sourcePaths = that.getSourcePaths();
this.targetPath = that.getTargetPath();
this.targetPathExists = that.getTargetPathExists();
}
}
@ -439,6 +444,22 @@ public class DistCpOptions {
return targetPath;
}
/**
* Getter for the targetPathExists.
* @return The target-path.
*/
public boolean getTargetPathExists() {
return targetPathExists;
}
/**
* Set targetPathExists.
* @param targetPathExists Whether the target path of distcp exists.
*/
public boolean setTargetPathExists(boolean targetPathExists) {
return this.targetPathExists = targetPathExists;
}
public void validate(DistCpOptionSwitch option, boolean value) {
boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ?
@ -515,6 +536,7 @@ public class DistCpOptions {
", sourceFileListing=" + sourceFileListing +
", sourcePaths=" + sourcePaths +
", targetPath=" + targetPath +
", targetPathExists=" + targetPathExists +
'}';
}

View File

@ -111,7 +111,24 @@ public class SimpleCopyListing extends CopyListing {
public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException {
doBuildListing(getWriter(pathToListingFile), options);
}
/**
* Collect the list of
* <sourceRelativePath, sourceFileStatus>
* to be copied and write to the sequence file. In essence, any file or
* directory that need to be copied or sync-ed is written as an entry to the
* sequence file, with the possible exception of the source root:
* when either -update (sync) or -overwrite switch is specified, and if
* the the source root is a directory, then the source root entry is not
* written to the sequence file, because only the contents of the source
* directory need to be copied in this case.
* See {@link org.apache.hadoop.tools.util.DistCpUtils.getRelativePath} for
* how relative path is computed.
* See computeSourceRootPath method for how the root path of the source is
* computed.
* @param fileListWriter
* @param options
* @throws IOException
*/
@VisibleForTesting
public void doBuildListing(SequenceFile.Writer fileListWriter,
DistCpOptions options) throws IOException {
@ -125,7 +142,12 @@ public class SimpleCopyListing extends CopyListing {
boolean localFile = (rootStatus.getClass() != FileStatus.class);
FileStatus[] sourceFiles = sourceFS.listStatus(path);
if (sourceFiles != null && sourceFiles.length > 0) {
boolean explore = (sourceFiles != null && sourceFiles.length > 0);
if (!explore || rootStatus.isDirectory()) {
writeToFileListingRoot(fileListWriter, rootStatus, sourcePathRoot,
localFile, options);
}
if (explore) {
for (FileStatus sourceStatus: sourceFiles) {
if (LOG.isDebugEnabled()) {
LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
@ -141,9 +163,6 @@ public class SimpleCopyListing extends CopyListing {
localFile, options);
}
}
} else {
writeToFileListing(fileListWriter, rootStatus, sourcePathRoot,
localFile, options);
}
}
fileListWriter.close();
@ -158,18 +177,19 @@ public class SimpleCopyListing extends CopyListing {
Path target = options.getTargetPath();
FileSystem targetFS = target.getFileSystem(getConf());
final boolean targetPathExists = options.getTargetPathExists();
boolean solitaryFile = options.getSourcePaths().size() == 1
&& !sourceStatus.isDirectory();
if (solitaryFile) {
if (targetFS.isFile(target) || !targetFS.exists(target)) {
if (targetFS.isFile(target) || !targetPathExists) {
return sourceStatus.getPath();
} else {
return sourceStatus.getPath().getParent();
}
} else {
boolean specialHandling = (options.getSourcePaths().size() == 1 && !targetFS.exists(target)) ||
boolean specialHandling = (options.getSourcePaths().size() == 1 && !targetPathExists) ||
options.shouldSyncFolder() || options.shouldOverwrite();
return specialHandling && sourceStatus.isDirectory() ? sourceStatus.getPath() :
@ -253,15 +273,30 @@ public class SimpleCopyListing extends CopyListing {
}
}
}
private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
FileStatus fileStatus, Path sourcePathRoot,
boolean localFile,
DistCpOptions options) throws IOException {
boolean syncOrOverwrite = options.shouldSyncFolder() ||
options.shouldOverwrite();
if (fileStatus.getPath().equals(sourcePathRoot) &&
fileStatus.isDirectory() && syncOrOverwrite) {
// Skip the root-paths when syncOrOverwrite
if (LOG.isDebugEnabled()) {
LOG.debug("Skip " + fileStatus.getPath());
}
return;
}
writeToFileListing(fileListWriter, fileStatus, sourcePathRoot, localFile,
options);
}
private void writeToFileListing(SequenceFile.Writer fileListWriter,
FileStatus fileStatus,
Path sourcePathRoot,
boolean localFile,
DistCpOptions options) throws IOException {
if (fileStatus.getPath().equals(sourcePathRoot) && fileStatus.isDirectory())
return; // Skip the root-paths.
if (LOG.isDebugEnabled()) {
LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot,
fileStatus.getPath()) + ", FULL PATH: " + fileStatus.getPath());

View File

@ -54,7 +54,10 @@ public class CopyCommitter extends FileOutputCommitter {
private static final Log LOG = LogFactory.getLog(CopyCommitter.class);
private final TaskAttemptContext taskAttemptContext;
private boolean syncFolder = false;
private boolean overwrite = false;
private boolean targetPathExists = true;
/**
* Create a output committer
*
@ -71,6 +74,10 @@ public class CopyCommitter extends FileOutputCommitter {
@Override
public void commitJob(JobContext jobContext) throws IOException {
Configuration conf = jobContext.getConfiguration();
syncFolder = conf.getBoolean(DistCpConstants.CONF_LABEL_SYNC_FOLDERS, false);
overwrite = conf.getBoolean(DistCpConstants.CONF_LABEL_OVERWRITE, false);
targetPathExists = conf.getBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, true);
super.commitJob(jobContext);
cleanupTempFiles(jobContext);
@ -155,6 +162,8 @@ public class CopyCommitter extends FileOutputCommitter {
// user/group permissions, etc.) based on the corresponding source directories.
private void preserveFileAttributesForDirectories(Configuration conf) throws IOException {
String attrSymbols = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
final boolean syncOrOverwrite = syncFolder || overwrite;
LOG.info("About to preserve attributes: " + attrSymbols);
EnumSet<FileAttribute> attributes = DistCpUtils.unpackAttributes(attrSymbols);
@ -179,12 +188,10 @@ public class CopyCommitter extends FileOutputCommitter {
if (! srcFileStatus.isDirectory()) continue;
Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath);
// Skip the root folder.
// Status can't be preserved on root-folder. (E.g. multiple paths may
// be copied to a single target folder. Which source-attributes to use
// on the target is undefined.)
if (targetRoot.equals(targetFile)) continue;
//
// Skip the root folder when syncOrOverwrite is true.
//
if (targetRoot.equals(targetFile) && syncOrOverwrite) continue;
FileSystem targetFS = targetFile.getFileSystem(conf);
DistCpUtils.preserve(targetFS, targetFile, srcFileStatus, attributes);
@ -218,7 +225,14 @@ public class CopyCommitter extends FileOutputCommitter {
Path targetFinalPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
targets.add(targetFinalPath);
DistCpOptions options = new DistCpOptions(targets, new Path("/NONE"));
//
// Set up options to be the same from the CopyListing.buildListing's perspective,
// so to collect similar listings as when doing the copy
//
options.setOverwrite(overwrite);
options.setSyncFolder(syncFolder);
options.setTargetPathExists(targetPathExists);
target.buildListing(targetListing, options);
Path sortedTargetListing = DistCpUtils.sortListing(clusterFS, conf, targetListing);
long totalLen = clusterFS.getFileStatus(sortedTargetListing).getLen();

View File

@ -103,12 +103,14 @@ public class TestCopyListing extends SimpleCopyListing {
DistCpOptions options = new DistCpOptions(srcPaths, target);
Path listingFile = new Path("/tmp/list4");
listing.buildListing(listingFile, options);
Assert.assertEquals(listing.getNumberOfPaths(), 2);
Assert.assertEquals(listing.getNumberOfPaths(), 3);
SequenceFile.Reader reader = new SequenceFile.Reader(getConf(),
SequenceFile.Reader.file(listingFile));
FileStatus fileStatus = new FileStatus();
Text relativePath = new Text();
Assert.assertTrue(reader.next(relativePath, fileStatus));
Assert.assertEquals(relativePath.toString(), "/1");
Assert.assertTrue(reader.next(relativePath, fileStatus));
Assert.assertEquals(relativePath.toString(), "/1/file");
Assert.assertTrue(reader.next(relativePath, fileStatus));
Assert.assertEquals(relativePath.toString(), "/2");
@ -270,7 +272,8 @@ public class TestCopyListing extends SimpleCopyListing {
final Path listFile = new Path(testRoot, "/tmp/fileList.seq");
listing.buildListing(listFile, options);
reader = new SequenceFile.Reader(fs, listFile, getConf());
reader = new SequenceFile.Reader(getConf(), SequenceFile.Reader.file(listFile));
FileStatus fileStatus = new FileStatus();
Text relativePath = new Text();
Assert.assertTrue(reader.next(relativePath, fileStatus));

View File

@ -0,0 +1,204 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.util.ToolRunner;
/**
* A JUnit test for copying files recursively.
*/
public class TestDistCpSystem extends TestCase {
private static final String SRCDAT = "srcdat";
private static final String DSTDAT = "dstdat";
private class FileEntry {
String path;
boolean isDir;
public FileEntry(String path, boolean isDir) {
this.path = path;
this.isDir = isDir;
}
String getPath() { return path; }
boolean isDirectory() { return isDir; }
}
private void createFiles(FileSystem fs, String topdir,
FileEntry[] entries) throws IOException {
for (FileEntry entry : entries) {
Path newpath = new Path(topdir + "/" + entry.getPath());
if (entry.isDirectory()) {
fs.mkdirs(newpath);
} else {
OutputStream out = fs.create(newpath);
try {
out.write((topdir + "/" + entry).getBytes());
out.write("\n".getBytes());
} finally {
out.close();
}
}
}
}
private static FileStatus[] getFileStatus(FileSystem fs,
String topdir, FileEntry[] files) throws IOException {
Path root = new Path(topdir);
List<FileStatus> statuses = new ArrayList<FileStatus>();
for (int idx = 0; idx < files.length; ++idx) {
Path newpath = new Path(root, files[idx].getPath());
statuses.add(fs.getFileStatus(newpath));
}
return statuses.toArray(new FileStatus[statuses.size()]);
}
/** delete directory and everything underneath it.*/
private static void deldir(FileSystem fs, String topdir) throws IOException {
fs.delete(new Path(topdir), true);
}
private void testPreserveUserHelper(
FileEntry[] srcEntries,
FileEntry[] dstEntries,
boolean createSrcDir,
boolean createTgtDir,
boolean update) throws Exception {
Configuration conf = null;
MiniDFSCluster cluster = null;
try {
final String testRoot = "/testdir";
final String testSrcRel = SRCDAT;
final String testSrc = testRoot + "/" + testSrcRel;
final String testDstRel = DSTDAT;
final String testDst = testRoot + "/" + testDstRel;
conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
String nnUri = FileSystem.getDefaultUri(conf).toString();
FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
fs.mkdirs(new Path(testRoot));
if (createSrcDir) {
fs.mkdirs(new Path(testSrc));
}
if (createTgtDir) {
fs.mkdirs(new Path(testDst));
}
createFiles(fs, testRoot, srcEntries);
FileStatus[] srcstats = getFileStatus(fs, testRoot, srcEntries);
for(int i = 0; i < srcEntries.length; i++) {
fs.setOwner(srcstats[i].getPath(), "u" + i, null);
}
String[] args = update? new String[]{"-pu", "-update", nnUri+testSrc,
nnUri+testDst} : new String[]{"-pu", nnUri+testSrc, nnUri+testDst};
ToolRunner.run(conf, new DistCp(), args);
String realTgtPath = testDst;
if (!createTgtDir) {
realTgtPath = testRoot;
}
FileStatus[] dststat = getFileStatus(fs, realTgtPath, dstEntries);
for(int i = 0; i < dststat.length; i++) {
assertEquals("i=" + i, "u" + i, dststat[i].getOwner());
}
deldir(fs, testRoot);
} finally {
if (cluster != null) { cluster.shutdown(); }
}
}
public void testPreserveUseNonEmptyDir() throws Exception {
FileEntry[] srcfiles = {
new FileEntry(SRCDAT, true),
new FileEntry(SRCDAT + "/a", false),
new FileEntry(SRCDAT + "/b", true),
new FileEntry(SRCDAT + "/b/c", false)
};
FileEntry[] dstfiles = {
new FileEntry(DSTDAT, true),
new FileEntry(DSTDAT + "/a", false),
new FileEntry(DSTDAT + "/b", true),
new FileEntry(DSTDAT + "/b/c", false)
};
testPreserveUserHelper(srcfiles, srcfiles, false, true, false);
testPreserveUserHelper(srcfiles, dstfiles, false, false, false);
}
public void testPreserveUserEmptyDir() throws Exception {
FileEntry[] srcfiles = {
new FileEntry(SRCDAT, true)
};
FileEntry[] dstfiles = {
new FileEntry(DSTDAT, true)
};
testPreserveUserHelper(srcfiles, srcfiles, false, true, false);
testPreserveUserHelper(srcfiles, dstfiles, false, false, false);
}
public void testPreserveUserSingleFile() throws Exception {
FileEntry[] srcfiles = {
new FileEntry(SRCDAT, false)
};
FileEntry[] dstfiles = {
new FileEntry(DSTDAT, false)
};
testPreserveUserHelper(srcfiles, srcfiles, false, true, false);
testPreserveUserHelper(srcfiles, dstfiles, false, false, false);
}
public void testPreserveUserNonEmptyDirWithUpdate() throws Exception {
FileEntry[] srcfiles = {
new FileEntry(SRCDAT + "/a", false),
new FileEntry(SRCDAT + "/b", true),
new FileEntry(SRCDAT + "/b/c", false)
};
FileEntry[] dstfiles = {
new FileEntry("a", false),
new FileEntry("b", true),
new FileEntry("b/c", false)
};
testPreserveUserHelper(srcfiles, dstfiles, true, true, true);
}
}

View File

@ -85,7 +85,7 @@ public class TestDistCpViewFs {
addEntries(listFile, "singlefile1/file1");
createFiles("singlefile1/file1");
runTest(listFile, target, sync);
runTest(listFile, target, false, sync);
checkResult(target, 1);
} catch (IOException e) {
@ -108,7 +108,7 @@ public class TestDistCpViewFs {
addEntries(listFile, "singlefile1/file1");
createFiles("singlefile1/file1", target.toString());
runTest(listFile, target, sync);
runTest(listFile, target, false, sync);
checkResult(target, 1);
} catch (IOException e) {
@ -132,7 +132,7 @@ public class TestDistCpViewFs {
createFiles("singlefile2/file2");
mkdirs(target.toString());
runTest(listFile, target, sync);
runTest(listFile, target, true, sync);
checkResult(target, 1, "file2");
} catch (IOException e) {
@ -155,7 +155,7 @@ public class TestDistCpViewFs {
addEntries(listFile, "singledir");
mkdirs(root + "/singledir/dir1");
runTest(listFile, target, sync);
runTest(listFile, target, false, sync);
checkResult(target, 1, "dir1");
} catch (IOException e) {
@ -174,7 +174,7 @@ public class TestDistCpViewFs {
mkdirs(root + "/singledir/dir1");
mkdirs(target.toString());
runTest(listFile, target, false);
runTest(listFile, target, true, false);
checkResult(target, 1, "singledir/dir1");
} catch (IOException e) {
@ -193,7 +193,7 @@ public class TestDistCpViewFs {
mkdirs(root + "/Usingledir/Udir1");
mkdirs(target.toString());
runTest(listFile, target, true);
runTest(listFile, target, true, true);
checkResult(target, 1, "Udir1");
} catch (IOException e) {
@ -217,7 +217,7 @@ public class TestDistCpViewFs {
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
mkdirs(target.toString());
runTest(listFile, target, sync);
runTest(listFile, target, true, sync);
checkResult(target, 3, "file3", "file4", "file5");
} catch (IOException e) {
@ -240,7 +240,7 @@ public class TestDistCpViewFs {
addEntries(listFile, "multifile/file3", "multifile/file4", "multifile/file5");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
runTest(listFile, target, sync);
runTest(listFile, target, false, sync);
checkResult(target, 3, "file3", "file4", "file5");
} catch (IOException e) {
@ -259,7 +259,7 @@ public class TestDistCpViewFs {
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
mkdirs(target.toString(), root + "/singledir/dir1");
runTest(listFile, target, false);
runTest(listFile, target, true, false);
checkResult(target, 2, "multifile/file3", "multifile/file4", "multifile/file5", "singledir/dir1");
} catch (IOException e) {
@ -278,7 +278,7 @@ public class TestDistCpViewFs {
createFiles("Umultifile/Ufile3", "Umultifile/Ufile4", "Umultifile/Ufile5");
mkdirs(target.toString(), root + "/Usingledir/Udir1");
runTest(listFile, target, true);
runTest(listFile, target, true, true);
checkResult(target, 4, "Ufile3", "Ufile4", "Ufile5", "Udir1");
} catch (IOException e) {
@ -297,7 +297,7 @@ public class TestDistCpViewFs {
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
mkdirs(root + "/singledir/dir1");
runTest(listFile, target, false);
runTest(listFile, target, false, false);
checkResult(target, 2, "multifile/file3", "multifile/file4",
"multifile/file5", "singledir/dir1");
@ -317,7 +317,7 @@ public class TestDistCpViewFs {
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
mkdirs(root + "/singledir/dir1");
runTest(listFile, target, true);
runTest(listFile, target, false, true);
checkResult(target, 4, "file3", "file4", "file5", "dir1");
} catch (IOException e) {
@ -338,7 +338,7 @@ public class TestDistCpViewFs {
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
createFiles("singledir/dir2/file6");
runTest(listFile, target, false);
runTest(listFile, target, false, false);
checkResult(target, 2, "multifile/file3", "multifile/file4", "multifile/file5",
"singledir/dir2/file6");
@ -361,7 +361,7 @@ public class TestDistCpViewFs {
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
createFiles("singledir/dir2/file6");
runTest(listFile, target, true);
runTest(listFile, target, false, true);
checkResult(target, 4, "file3", "file4", "file5", "dir2/file6");
} catch (IOException e) {
@ -384,7 +384,7 @@ public class TestDistCpViewFs {
createFiles("singledir1/dir3/file7", "singledir1/dir3/file8",
"singledir1/dir3/file9");
runTest(listFile, target, false);
runTest(listFile, target, false, false);
checkResult(target, 4, "file3", "file4", "file5",
"dir3/file7", "dir3/file8", "dir3/file9");
@ -408,7 +408,7 @@ public class TestDistCpViewFs {
createFiles("singledir1/dir3/file7", "singledir1/dir3/file8",
"singledir1/dir3/file9");
runTest(listFile, target, true);
runTest(listFile, target, false, true);
checkResult(target, 6, "file3", "file4", "file5",
"file7", "file8", "file9");
@ -460,9 +460,11 @@ public class TestDistCpViewFs {
}
}
private void runTest(Path listFile, Path target, boolean sync) throws IOException {
private void runTest(Path listFile, Path target, boolean targetExists,
boolean sync) throws IOException {
DistCpOptions options = new DistCpOptions(listFile, target);
options.setSyncFolder(sync);
options.setTargetPathExists(targetExists);
try {
new DistCp(getConf(), options).execute();
} catch (Exception e) {

View File

@ -112,7 +112,7 @@ public class TestFileBasedCopyListing {
addEntries(listFile, "/tmp/singlefile1/file1");
createFiles("/tmp/singlefile1/file1");
runTest(listFile, target, sync);
runTest(listFile, target, false, sync);
checkResult(listFile, 0);
} catch (IOException e) {
@ -138,7 +138,7 @@ public class TestFileBasedCopyListing {
addEntries(listFile, "/tmp/singlefile1/file1");
createFiles("/tmp/singlefile1/file1", target.toString());
runTest(listFile, target, sync);
runTest(listFile, target, false, sync);
checkResult(listFile, 0);
} catch (IOException e) {
@ -165,7 +165,7 @@ public class TestFileBasedCopyListing {
createFiles("/tmp/singlefile2/file2");
mkdirs(target.toString());
runTest(listFile, target, sync);
runTest(listFile, target, true, sync);
checkResult(listFile, 1);
} catch (IOException e) {
@ -191,7 +191,7 @@ public class TestFileBasedCopyListing {
addEntries(listFile, "/tmp/singledir");
mkdirs("/tmp/singledir/dir1");
runTest(listFile, target, sync);
runTest(listFile, target, false, sync);
checkResult(listFile, 1);
} catch (IOException e) {
@ -213,7 +213,7 @@ public class TestFileBasedCopyListing {
mkdirs("/tmp/singledir/dir1");
mkdirs(target.toString());
runTest(listFile, target);
runTest(listFile, target, true);
checkResult(listFile, 1);
} catch (IOException e) {
@ -235,7 +235,7 @@ public class TestFileBasedCopyListing {
mkdirs("/tmp/Usingledir/Udir1");
mkdirs(target.toString());
runTest(listFile, target, true);
runTest(listFile, target, true, true);
checkResult(listFile, 1);
} catch (IOException e) {
@ -262,7 +262,7 @@ public class TestFileBasedCopyListing {
createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
mkdirs(target.toString());
runTest(listFile, target, sync);
runTest(listFile, target, true, sync);
checkResult(listFile, 3);
} catch (IOException e) {
@ -288,7 +288,7 @@ public class TestFileBasedCopyListing {
addEntries(listFile, "/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
runTest(listFile, target, sync);
runTest(listFile, target, false, sync);
checkResult(listFile, 3);
} catch (IOException e) {
@ -310,7 +310,7 @@ public class TestFileBasedCopyListing {
createFiles("/tmp/multifile/file3", "/tmp/multifile/file4", "/tmp/multifile/file5");
mkdirs(target.toString(), "/tmp/singledir/dir1");
runTest(listFile, target);
runTest(listFile, target, true);
checkResult(listFile, 4);
} catch (IOException e) {
@ -440,7 +440,7 @@ public class TestFileBasedCopyListing {
"/tmp/singledir1/dir3/file9");
mkdirs(target.toString());
runTest(listFile, target);
runTest(listFile, target, true);
checkResult(listFile, 6);
} catch (IOException e) {
@ -507,14 +507,17 @@ public class TestFileBasedCopyListing {
}
}
private void runTest(Path listFile, Path target) throws IOException {
runTest(listFile, target, true);
private void runTest(Path listFile, Path target,
boolean targetExists) throws IOException {
runTest(listFile, target, targetExists, true);
}
private void runTest(Path listFile, Path target, boolean sync) throws IOException {
private void runTest(Path listFile, Path target, boolean targetExists,
boolean sync) throws IOException {
CopyListing listing = new FileBasedCopyListing(config, CREDENTIALS);
DistCpOptions options = new DistCpOptions(listFile, target);
options.setSyncFolder(sync);
options.setTargetPathExists(targetExists);
listing.buildListing(listFile, options);
}
@ -530,6 +533,11 @@ public class TestFileBasedCopyListing {
Text relPath = new Text();
FileStatus fileStatus = new FileStatus();
while (reader.next(relPath, fileStatus)) {
if (fileStatus.isDirectory() && relPath.toString().equals("")) {
// ignore root with empty relPath, which is an entry to be
// used for preserving root attributes etc.
continue;
}
Assert.assertEquals(fileStatus.getPath().toUri().getPath(), map.get(relPath.toString()));
recCount++;
}

View File

@ -111,7 +111,7 @@ public class TestGlobbedCopyListing {
Path target = new Path(fileSystemPath.toString() + "/tmp/target");
Path listingPath = new Path(fileSystemPath.toString() + "/tmp/META/fileList.seq");
DistCpOptions options = new DistCpOptions(Arrays.asList(source), target);
options.setTargetPathExists(false);
new GlobbedCopyListing(new Configuration(), CREDENTIALS).buildListing(listingPath, options);
verifyContents(listingPath);
@ -124,6 +124,11 @@ public class TestGlobbedCopyListing {
FileStatus value = new FileStatus();
Map<String, String> actualValues = new HashMap<String, String>();
while (reader.next(key, value)) {
if (value.isDirectory() && key.toString().equals("")) {
// ignore root with empty relPath, which is an entry to be
// used for preserving root attributes etc.
continue;
}
actualValues.put(value.getPath().toString(), key.toString());
}

View File

@ -82,7 +82,7 @@ public class TestIntegration {
addEntries(listFile, "singlefile1/file1");
createFiles("singlefile1/file1");
runTest(listFile, target, sync);
runTest(listFile, target, false, sync);
checkResult(target, 1);
} catch (IOException e) {
@ -105,7 +105,7 @@ public class TestIntegration {
addEntries(listFile, "singlefile1/file1");
createFiles("singlefile1/file1", "target");
runTest(listFile, target, sync);
runTest(listFile, target, false, sync);
checkResult(target, 1);
} catch (IOException e) {
@ -129,7 +129,7 @@ public class TestIntegration {
createFiles("singlefile2/file2");
mkdirs(target.toString());
runTest(listFile, target, sync);
runTest(listFile, target, true, sync);
checkResult(target, 1, "file2");
} catch (IOException e) {
@ -152,7 +152,7 @@ public class TestIntegration {
addEntries(listFile, "singledir");
mkdirs(root + "/singledir/dir1");
runTest(listFile, target, sync);
runTest(listFile, target, false, sync);
checkResult(target, 1, "dir1");
} catch (IOException e) {
@ -171,7 +171,7 @@ public class TestIntegration {
mkdirs(root + "/singledir/dir1");
mkdirs(target.toString());
runTest(listFile, target, false);
runTest(listFile, target, true, false);
checkResult(target, 1, "singledir/dir1");
} catch (IOException e) {
@ -190,7 +190,7 @@ public class TestIntegration {
mkdirs(root + "/Usingledir/Udir1");
mkdirs(target.toString());
runTest(listFile, target, true);
runTest(listFile, target, true, true);
checkResult(target, 1, "Udir1");
} catch (IOException e) {
@ -214,7 +214,7 @@ public class TestIntegration {
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
mkdirs(target.toString());
runTest(listFile, target, sync);
runTest(listFile, target, true, sync);
checkResult(target, 3, "file3", "file4", "file5");
} catch (IOException e) {
@ -286,7 +286,7 @@ public class TestIntegration {
addEntries(listFile, "multifile/file3", "multifile/file4", "multifile/file5");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
runTest(listFile, target, sync);
runTest(listFile, target, false, sync);
checkResult(target, 3, "file3", "file4", "file5");
} catch (IOException e) {
@ -305,7 +305,7 @@ public class TestIntegration {
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
mkdirs(target.toString(), root + "/singledir/dir1");
runTest(listFile, target, false);
runTest(listFile, target, true, false);
checkResult(target, 2, "multifile/file3", "multifile/file4", "multifile/file5", "singledir/dir1");
} catch (IOException e) {
@ -324,7 +324,7 @@ public class TestIntegration {
createFiles("Umultifile/Ufile3", "Umultifile/Ufile4", "Umultifile/Ufile5");
mkdirs(target.toString(), root + "/Usingledir/Udir1");
runTest(listFile, target, true);
runTest(listFile, target, true, true);
checkResult(target, 4, "Ufile3", "Ufile4", "Ufile5", "Udir1");
} catch (IOException e) {
@ -343,7 +343,7 @@ public class TestIntegration {
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
mkdirs(root + "/singledir/dir1");
runTest(listFile, target, false);
runTest(listFile, target, false, false);
checkResult(target, 2, "multifile/file3", "multifile/file4",
"multifile/file5", "singledir/dir1");
@ -363,7 +363,7 @@ public class TestIntegration {
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
mkdirs(root + "/singledir/dir1");
runTest(listFile, target, true);
runTest(listFile, target, false, true);
checkResult(target, 4, "file3", "file4", "file5", "dir1");
} catch (IOException e) {
@ -382,7 +382,7 @@ public class TestIntegration {
createFiles("srcdir/file1", "dstdir/file1", "dstdir/file2");
Path target = new Path(root + "/dstdir");
runTest(listFile, target, true, true, false);
runTest(listFile, target, false, true, true, false);
checkResult(target, 1, "file1");
} catch (IOException e) {
@ -406,7 +406,7 @@ public class TestIntegration {
createWithContents("dstdir/file1", contents2);
Path target = new Path(root + "/dstdir");
runTest(listFile, target, false, false, true);
runTest(listFile, target, false, false, false, true);
checkResult(target, 1, "file1");
@ -436,7 +436,7 @@ public class TestIntegration {
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
createFiles("singledir/dir2/file6");
runTest(listFile, target, false);
runTest(listFile, target, false, false);
checkResult(target, 2, "multifile/file3", "multifile/file4", "multifile/file5",
"singledir/dir2/file6");
@ -459,7 +459,7 @@ public class TestIntegration {
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
createFiles("singledir/dir2/file6");
runTest(listFile, target, true);
runTest(listFile, target, false, true);
checkResult(target, 4, "file3", "file4", "file5", "dir2/file6");
} catch (IOException e) {
@ -482,7 +482,7 @@ public class TestIntegration {
createFiles("singledir1/dir3/file7", "singledir1/dir3/file8",
"singledir1/dir3/file9");
runTest(listFile, target, false);
runTest(listFile, target, false, false);
checkResult(target, 4, "file3", "file4", "file5",
"dir3/file7", "dir3/file8", "dir3/file9");
@ -506,7 +506,7 @@ public class TestIntegration {
createFiles("singledir1/dir3/file7", "singledir1/dir3/file8",
"singledir1/dir3/file9");
runTest(listFile, target, true);
runTest(listFile, target, false, true);
checkResult(target, 6, "file3", "file4", "file5",
"file7", "file8", "file9");
@ -584,16 +584,19 @@ public class TestIntegration {
}
}
private void runTest(Path listFile, Path target, boolean sync) throws IOException {
runTest(listFile, target, sync, false, false);
private void runTest(Path listFile, Path target, boolean targetExists,
boolean sync) throws IOException {
runTest(listFile, target, targetExists, sync, false, false);
}
private void runTest(Path listFile, Path target, boolean sync, boolean delete,
private void runTest(Path listFile, Path target, boolean targetExists,
boolean sync, boolean delete,
boolean overwrite) throws IOException {
DistCpOptions options = new DistCpOptions(listFile, target);
options.setSyncFolder(sync);
options.setDeleteMissing(delete);
options.setOverwrite(overwrite);
options.setTargetPathExists(targetExists);
try {
new DistCp(getConf(), options).execute();
} catch (Exception e) {

View File

@ -354,7 +354,7 @@ public class TestOptionsParser {
DistCpOptions option = new DistCpOptions(new Path("abc"), new Path("xyz"));
String val = "DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, " +
"ignoreFailures=false, maxMaps=20, sslConfigurationFile='null', copyStrategy='uniformsize', " +
"sourceFileListing=abc, sourcePaths=null, targetPath=xyz}";
"sourceFileListing=abc, sourcePaths=null, targetPath=xyz, targetPathExists=true}";
Assert.assertEquals(val, option.toString());
Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),
DistCpOptionSwitch.ATOMIC_COMMIT.name());

View File

@ -146,7 +146,8 @@ public class TestCopyCommitter {
new Path("/out"));
options.preserve(FileAttribute.PERMISSION);
options.appendToConf(conf);
options.setTargetPathExists(false);
CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
listing.buildListing(listingFile, options);

View File

@ -133,7 +133,9 @@ public class TestUniformSizeInputFormat {
Path sourcePath = recordReader.getCurrentValue().getPath();
FileSystem fs = sourcePath.getFileSystem(configuration);
FileStatus fileStatus [] = fs.listStatus(sourcePath);
Assert.assertEquals(fileStatus.length, 1);
if (fileStatus.length > 1) {
continue;
}
currentSplitSize += fileStatus[0].getLen();
}
Assert.assertTrue(

View File

@ -59,7 +59,9 @@ public class TestDynamicInputFormat {
for (int i=0; i<N_FILES; ++i)
createFile("/tmp/source/" + String.valueOf(i));
FileSystem fileSystem = cluster.getFileSystem();
expectedFilePaths.add(fileSystem.listStatus(
new Path("/tmp/source/0"))[0].getPath().getParent().toString());
}
private static Configuration getConfigurationForCluster() {