MAPREDUCE-5014. Extending DistCp through a custom CopyListing is not possible. (Contributed by Srikanth Sundarrajan)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1584232 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2014-04-03 00:49:52 +00:00
parent 660a0a0470
commit 71a7f2b72b
7 changed files with 171 additions and 37 deletions

View File

@ -30,6 +30,9 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5759. Remove unnecessary conf load in Limits (Sandy Ryza)
MAPREDUCE-5014. Extend Distcp to accept a custom CopyListing.
(Srikanth Sundarrajan via amareshwari)
Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.security.Credentials;
import java.io.IOException;
import java.lang.reflect.Constructor;
/**
* The CopyListing abstraction is responsible for how the list of
@ -193,14 +194,34 @@ public abstract class CopyListing extends Configured {
* @param credentials Credentials object on which the FS delegation tokens are cached
* @param options The input Options, to help choose the appropriate CopyListing Implementation.
* @return An instance of the appropriate CopyListing implementation.
* @throws java.io.IOException - Exception if any
*/
public static CopyListing getCopyListing(Configuration configuration,
Credentials credentials,
DistCpOptions options) {
if (options.getSourceFileListing() == null) {
return new GlobbedCopyListing(configuration, credentials);
DistCpOptions options)
throws IOException {
String copyListingClassName = configuration.get(DistCpConstants.
CONF_LABEL_COPY_LISTING_CLASS, "");
Class<? extends CopyListing> copyListingClass;
try {
if (! copyListingClassName.isEmpty()) {
copyListingClass = configuration.getClass(DistCpConstants.
CONF_LABEL_COPY_LISTING_CLASS, GlobbedCopyListing.class,
CopyListing.class);
} else {
return new FileBasedCopyListing(configuration, credentials);
if (options.getSourceFileListing() == null) {
copyListingClass = GlobbedCopyListing.class;
} else {
copyListingClass = FileBasedCopyListing.class;
}
}
copyListingClassName = copyListingClass.getName();
Constructor<? extends CopyListing> constructor = copyListingClass.
getDeclaredConstructor(Configuration.class, Credentials.class);
return constructor.newInstance(configuration, credentials);
} catch (Exception e) {
throw new IOException("Unable to instantiate " + copyListingClassName, e);
}
}

View File

@ -320,7 +320,7 @@ public class DistCp extends Configured implements Tool {
* @return Returns the path where the copy listing is created
* @throws IOException - If any
*/
private Path createInputFileListing(Job job) throws IOException {
protected Path createInputFileListing(Job job) throws IOException {
Path fileListingPath = getFileListingPath();
CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(),
job.getCredentials(), inputOptions);
@ -335,7 +335,7 @@ public class DistCp extends Configured implements Tool {
* @return - Path where the copy listing file has to be saved
* @throws IOException - Exception if any
*/
private Path getFileListingPath() throws IOException {
protected Path getFileListingPath() throws IOException {
String fileListPathStr = metaFolder + "/fileList.seq";
Path path = new Path(fileListPathStr);
return new Path(path.toUri().normalize().toString());

View File

@ -82,6 +82,9 @@ public class DistCpConstants {
/* Meta folder where the job's intermediate data is kept */
public static final String CONF_LABEL_META_FOLDER = "distcp.meta.folder";
/* DistCp CopyListing class override param */
public static final String CONF_LABEL_COPY_LISTING_CLASS = "distcp.copy.listing.class";
/**
* Conf label for SSL Trust-store location.
*/

View File

@ -130,17 +130,20 @@ public class SimpleCopyListing extends CopyListing {
if (LOG.isDebugEnabled()) {
LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
}
writeToFileListing(fileListWriter, sourceStatus, sourcePathRoot, localFile);
writeToFileListing(fileListWriter, sourceStatus, sourcePathRoot,
localFile, options);
if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Traversing non-empty source dir: " + sourceStatus.getPath());
}
traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot, localFile);
traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot,
localFile, options);
}
}
} else {
writeToFileListing(fileListWriter, rootStatus, sourcePathRoot, localFile);
writeToFileListing(fileListWriter, rootStatus, sourcePathRoot,
localFile, options);
}
}
fileListWriter.close();
@ -174,6 +177,17 @@ public class SimpleCopyListing extends CopyListing {
}
}
/**
* Provide an option to skip copy of a path, Allows for exclusion
* of files such as {@link org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#SUCCEEDED_FILE_NAME}
* @param path - Path being considered for copy while building the file listing
* @param options - Input options passed during DistCp invocation
* @return - True if the path should be considered for copy, false otherwise
*/
protected boolean shouldCopy(Path path, DistCpOptions options) {
return true;
}
/** {@inheritDoc} */
@Override
protected long getBytesToCopy() {
@ -215,7 +229,9 @@ public class SimpleCopyListing extends CopyListing {
private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
FileStatus sourceStatus,
Path sourcePathRoot, boolean localFile)
Path sourcePathRoot,
boolean localFile,
DistCpOptions options)
throws IOException {
FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
Stack<FileStatus> pathStack = new Stack<FileStatus>();
@ -226,7 +242,8 @@ public class SimpleCopyListing extends CopyListing {
if (LOG.isDebugEnabled())
LOG.debug("Recording source-path: "
+ sourceStatus.getPath() + " for copy.");
writeToFileListing(fileListWriter, child, sourcePathRoot, localFile);
writeToFileListing(fileListWriter, child, sourcePathRoot,
localFile, options);
if (isDirectoryAndNotEmpty(sourceFS, child)) {
if (LOG.isDebugEnabled())
LOG.debug("Traversing non-empty source dir: "
@ -238,8 +255,10 @@ public class SimpleCopyListing extends CopyListing {
}
private void writeToFileListing(SequenceFile.Writer fileListWriter,
FileStatus fileStatus, Path sourcePathRoot,
boolean localFile) throws IOException {
FileStatus fileStatus,
Path sourcePathRoot,
boolean localFile,
DistCpOptions options) throws IOException {
if (fileStatus.getPath().equals(sourcePathRoot) && fileStatus.isDirectory())
return; // Skip the root-paths.
@ -253,6 +272,10 @@ public class SimpleCopyListing extends CopyListing {
status = getFileStatus(fileStatus);
}
if (!shouldCopy(fileStatus.getPath(), options)) {
return;
}
fileListWriter.append(new Text(DistCpUtils.getRelativePath(sourcePathRoot,
fileStatus.getPath())), status);
fileListWriter.sync();

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.tools.util.TestDistCpUtils;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.Credentials;
@ -82,7 +83,39 @@ public class TestCopyListing extends SimpleCopyListing {
return 0;
}
@Test
@Test(timeout=10000)
public void testSkipCopy() throws Exception {
SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS) {
@Override
protected boolean shouldCopy(Path path, DistCpOptions options) {
return !path.getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME);
}
};
FileSystem fs = FileSystem.get(getConf());
List<Path> srcPaths = new ArrayList<Path>();
srcPaths.add(new Path("/tmp/in4/1"));
srcPaths.add(new Path("/tmp/in4/2"));
Path target = new Path("/tmp/out4/1");
TestDistCpUtils.createFile(fs, "/tmp/in4/1/_SUCCESS");
TestDistCpUtils.createFile(fs, "/tmp/in4/1/file");
TestDistCpUtils.createFile(fs, "/tmp/in4/2");
fs.mkdirs(target);
DistCpOptions options = new DistCpOptions(srcPaths, target);
Path listingFile = new Path("/tmp/list4");
listing.buildListing(listingFile, options);
Assert.assertEquals(listing.getNumberOfPaths(), 2);
SequenceFile.Reader reader = new SequenceFile.Reader(getConf(),
SequenceFile.Reader.file(listingFile));
FileStatus fileStatus = new FileStatus();
Text relativePath = new Text();
Assert.assertTrue(reader.next(relativePath, fileStatus));
Assert.assertEquals(relativePath.toString(), "/1/file");
Assert.assertTrue(reader.next(relativePath, fileStatus));
Assert.assertEquals(relativePath.toString(), "/2");
Assert.assertFalse(reader.next(relativePath, fileStatus));
}
@Test(timeout=10000)
public void testMultipleSrcToFile() {
FileSystem fs = null;
try {
@ -127,7 +160,7 @@ public class TestCopyListing extends SimpleCopyListing {
}
}
@Test
@Test(timeout=10000)
public void testDuplicates() {
FileSystem fs = null;
try {
@ -153,7 +186,7 @@ public class TestCopyListing extends SimpleCopyListing {
}
}
@Test
@Test(timeout=10000)
public void testBuildListing() {
FileSystem fs = null;
try {
@ -209,7 +242,7 @@ public class TestCopyListing extends SimpleCopyListing {
}
}
@Test
@Test(timeout=10000)
public void testBuildListingForSingleFile() {
FileSystem fs = null;
String testRootString = "/singleFileListing";

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.tools.util.TestDistCpUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
@ -34,6 +35,7 @@ import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class TestIntegration {
@ -68,7 +70,7 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testSingleFileMissingTarget() {
caseSingleFileMissingTarget(false);
caseSingleFileMissingTarget(true);
@ -91,7 +93,7 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testSingleFileTargetFile() {
caseSingleFileTargetFile(false);
caseSingleFileTargetFile(true);
@ -114,7 +116,7 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testSingleFileTargetDir() {
caseSingleFileTargetDir(false);
caseSingleFileTargetDir(true);
@ -138,7 +140,7 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testSingleDirTargetMissing() {
caseSingleDirTargetMissing(false);
caseSingleDirTargetMissing(true);
@ -161,7 +163,7 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testSingleDirTargetPresent() {
try {
@ -180,7 +182,7 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testUpdateSingleDirTargetPresent() {
try {
@ -199,7 +201,7 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testMultiFileTargetPresent() {
caseMultiFileTargetPresent(false);
caseMultiFileTargetPresent(true);
@ -223,7 +225,56 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testCustomCopyListing() {
try {
addEntries(listFile, "multifile1/file3", "multifile1/file4", "multifile1/file5");
createFiles("multifile1/file3", "multifile1/file4", "multifile1/file5");
mkdirs(target.toString());
Configuration conf = getConf();
try {
conf.setClass(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS,
CustomCopyListing.class, CopyListing.class);
DistCpOptions options = new DistCpOptions(Arrays.
asList(new Path(root + "/" + "multifile1")), target);
options.setSyncFolder(true);
options.setDeleteMissing(false);
options.setOverwrite(false);
try {
new DistCp(conf, options).execute();
} catch (Exception e) {
LOG.error("Exception encountered ", e);
throw new IOException(e);
}
} finally {
conf.unset(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS);
}
checkResult(target, 2, "file4", "file5");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
private static class CustomCopyListing extends SimpleCopyListing {
public CustomCopyListing(Configuration configuration,
Credentials credentials) {
super(configuration, credentials);
}
@Override
protected boolean shouldCopy(Path path, DistCpOptions options) {
return !path.getName().equals("file3");
}
}
@Test(timeout=100000)
public void testMultiFileTargetMissing() {
caseMultiFileTargetMissing(false);
caseMultiFileTargetMissing(true);
@ -246,7 +297,7 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testMultiDirTargetPresent() {
try {
@ -265,7 +316,7 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testUpdateMultiDirTargetPresent() {
try {
@ -284,7 +335,7 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testMultiDirTargetMissing() {
try {
@ -304,7 +355,7 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testUpdateMultiDirTargetMissing() {
try {
@ -323,7 +374,7 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testDeleteMissingInDestination() {
try {
@ -343,7 +394,7 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testOverwrite() {
byte[] contents1 = "contents1".getBytes();
byte[] contents2 = "contents2".getBytes();
@ -375,7 +426,7 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testGlobTargetMissingSingleLevel() {
try {
@ -398,7 +449,7 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testUpdateGlobTargetMissingSingleLevel() {
try {
@ -420,7 +471,7 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testGlobTargetMissingMultiLevel() {
try {
@ -444,7 +495,7 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testUpdateGlobTargetMissingMultiLevel() {
try {
@ -468,7 +519,7 @@ public class TestIntegration {
}
}
@Test
@Test(timeout=100000)
public void testCleanup() {
try {
Path sourcePath = new Path("noscheme:///file");