diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5f5adf99a3e..b894b3455fa 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -30,6 +30,9 @@ Release 2.5.0 - UNRELEASED MAPREDUCE-5759. Remove unnecessary conf load in Limits (Sandy Ryza) + MAPREDUCE-5014. Extend Distcp to accept a custom CopyListing. + (Srikanth Sundarrajan via amareshwari) + Release 2.4.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java index 8965e0a54e8..52d59936816 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java @@ -30,6 +30,7 @@ import org.apache.hadoop.tools.util.DistCpUtils; import org.apache.hadoop.security.Credentials; import java.io.IOException; +import java.lang.reflect.Constructor; /** * The CopyListing abstraction is responsible for how the list of @@ -193,14 +194,34 @@ public abstract class CopyListing extends Configured { * @param credentials Credentials object on which the FS delegation tokens are cached * @param options The input Options, to help choose the appropriate CopyListing Implementation. * @return An instance of the appropriate CopyListing implementation. + * @throws java.io.IOException - Exception if any */ public static CopyListing getCopyListing(Configuration configuration, Credentials credentials, - DistCpOptions options) { - if (options.getSourceFileListing() == null) { - return new GlobbedCopyListing(configuration, credentials); - } else { - return new FileBasedCopyListing(configuration, credentials); + DistCpOptions options) + throws IOException { + + String copyListingClassName = configuration.get(DistCpConstants. + CONF_LABEL_COPY_LISTING_CLASS, ""); + Class copyListingClass; + try { + if (! copyListingClassName.isEmpty()) { + copyListingClass = configuration.getClass(DistCpConstants. + CONF_LABEL_COPY_LISTING_CLASS, GlobbedCopyListing.class, + CopyListing.class); + } else { + if (options.getSourceFileListing() == null) { + copyListingClass = GlobbedCopyListing.class; + } else { + copyListingClass = FileBasedCopyListing.class; + } + } + copyListingClassName = copyListingClass.getName(); + Constructor constructor = copyListingClass. + getDeclaredConstructor(Configuration.class, Credentials.class); + return constructor.newInstance(configuration, credentials); + } catch (Exception e) { + throw new IOException("Unable to instantiate " + copyListingClassName, e); } } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java index bf7086a4d44..06d71047dd0 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java @@ -320,7 +320,7 @@ public class DistCp extends Configured implements Tool { * @return Returns the path where the copy listing is created * @throws IOException - If any */ - private Path createInputFileListing(Job job) throws IOException { + protected Path createInputFileListing(Job job) throws IOException { Path fileListingPath = getFileListingPath(); CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(), job.getCredentials(), inputOptions); @@ -335,7 +335,7 @@ public class DistCp extends Configured implements Tool { * @return - Path where the copy listing file has to be saved * @throws IOException - Exception if any */ - private Path getFileListingPath() throws IOException { + protected Path getFileListingPath() throws IOException { String fileListPathStr = metaFolder + "/fileList.seq"; Path path = new Path(fileListPathStr); return new Path(path.toUri().normalize().toString()); diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 705b7f925c4..09ff65495ac 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -82,6 +82,9 @@ public class DistCpConstants { /* Meta folder where the job's intermediate data is kept */ public static final String CONF_LABEL_META_FOLDER = "distcp.meta.folder"; + /* DistCp CopyListing class override param */ + public static final String CONF_LABEL_COPY_LISTING_CLASS = "distcp.copy.listing.class"; + /** * Conf label for SSL Trust-store location. */ diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java index abd184517c9..c494995fb40 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java @@ -130,17 +130,20 @@ public class SimpleCopyListing extends CopyListing { if (LOG.isDebugEnabled()) { LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy."); } - writeToFileListing(fileListWriter, sourceStatus, sourcePathRoot, localFile); + writeToFileListing(fileListWriter, sourceStatus, sourcePathRoot, + localFile, options); if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) { if (LOG.isDebugEnabled()) { LOG.debug("Traversing non-empty source dir: " + sourceStatus.getPath()); } - traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot, localFile); + traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot, + localFile, options); } } } else { - writeToFileListing(fileListWriter, rootStatus, sourcePathRoot, localFile); + writeToFileListing(fileListWriter, rootStatus, sourcePathRoot, + localFile, options); } } fileListWriter.close(); @@ -174,6 +177,17 @@ public class SimpleCopyListing extends CopyListing { } } + /** + * Provide an option to skip copy of a path, Allows for exclusion + * of files such as {@link org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#SUCCEEDED_FILE_NAME} + * @param path - Path being considered for copy while building the file listing + * @param options - Input options passed during DistCp invocation + * @return - True if the path should be considered for copy, false otherwise + */ + protected boolean shouldCopy(Path path, DistCpOptions options) { + return true; + } + /** {@inheritDoc} */ @Override protected long getBytesToCopy() { @@ -215,7 +229,9 @@ public class SimpleCopyListing extends CopyListing { private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter, FileStatus sourceStatus, - Path sourcePathRoot, boolean localFile) + Path sourcePathRoot, + boolean localFile, + DistCpOptions options) throws IOException { FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf()); Stack pathStack = new Stack(); @@ -226,7 +242,8 @@ public class SimpleCopyListing extends CopyListing { if (LOG.isDebugEnabled()) LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy."); - writeToFileListing(fileListWriter, child, sourcePathRoot, localFile); + writeToFileListing(fileListWriter, child, sourcePathRoot, + localFile, options); if (isDirectoryAndNotEmpty(sourceFS, child)) { if (LOG.isDebugEnabled()) LOG.debug("Traversing non-empty source dir: " @@ -238,8 +255,10 @@ public class SimpleCopyListing extends CopyListing { } private void writeToFileListing(SequenceFile.Writer fileListWriter, - FileStatus fileStatus, Path sourcePathRoot, - boolean localFile) throws IOException { + FileStatus fileStatus, + Path sourcePathRoot, + boolean localFile, + DistCpOptions options) throws IOException { if (fileStatus.getPath().equals(sourcePathRoot) && fileStatus.isDirectory()) return; // Skip the root-paths. @@ -253,6 +272,10 @@ public class SimpleCopyListing extends CopyListing { status = getFileStatus(fileStatus); } + if (!shouldCopy(fileStatus.getPath(), options)) { + return; + } + fileListWriter.append(new Text(DistCpUtils.getRelativePath(sourcePathRoot, fileStatus.getPath())), status); fileListWriter.sync(); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java index a932771baf7..11cf7821e39 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.tools.util.TestDistCpUtils; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.security.Credentials; @@ -82,7 +83,39 @@ public class TestCopyListing extends SimpleCopyListing { return 0; } - @Test + @Test(timeout=10000) + public void testSkipCopy() throws Exception { + SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS) { + @Override + protected boolean shouldCopy(Path path, DistCpOptions options) { + return !path.getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME); + } + }; + FileSystem fs = FileSystem.get(getConf()); + List srcPaths = new ArrayList(); + srcPaths.add(new Path("/tmp/in4/1")); + srcPaths.add(new Path("/tmp/in4/2")); + Path target = new Path("/tmp/out4/1"); + TestDistCpUtils.createFile(fs, "/tmp/in4/1/_SUCCESS"); + TestDistCpUtils.createFile(fs, "/tmp/in4/1/file"); + TestDistCpUtils.createFile(fs, "/tmp/in4/2"); + fs.mkdirs(target); + DistCpOptions options = new DistCpOptions(srcPaths, target); + Path listingFile = new Path("/tmp/list4"); + listing.buildListing(listingFile, options); + Assert.assertEquals(listing.getNumberOfPaths(), 2); + SequenceFile.Reader reader = new SequenceFile.Reader(getConf(), + SequenceFile.Reader.file(listingFile)); + FileStatus fileStatus = new FileStatus(); + Text relativePath = new Text(); + Assert.assertTrue(reader.next(relativePath, fileStatus)); + Assert.assertEquals(relativePath.toString(), "/1/file"); + Assert.assertTrue(reader.next(relativePath, fileStatus)); + Assert.assertEquals(relativePath.toString(), "/2"); + Assert.assertFalse(reader.next(relativePath, fileStatus)); + } + + @Test(timeout=10000) public void testMultipleSrcToFile() { FileSystem fs = null; try { @@ -127,7 +160,7 @@ public class TestCopyListing extends SimpleCopyListing { } } - @Test + @Test(timeout=10000) public void testDuplicates() { FileSystem fs = null; try { @@ -153,7 +186,7 @@ public class TestCopyListing extends SimpleCopyListing { } } - @Test + @Test(timeout=10000) public void testBuildListing() { FileSystem fs = null; try { @@ -209,7 +242,7 @@ public class TestCopyListing extends SimpleCopyListing { } } - @Test + @Test(timeout=10000) public void testBuildListingForSingleFile() { FileSystem fs = null; String testRootString = "/singleFileListing"; diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java index 6d9947703d8..c806b102339 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.JobSubmissionFiles; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.tools.util.TestDistCpUtils; import org.junit.Assert; import org.junit.BeforeClass; @@ -34,6 +35,7 @@ import org.junit.Test; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; public class TestIntegration { @@ -68,7 +70,7 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) public void testSingleFileMissingTarget() { caseSingleFileMissingTarget(false); caseSingleFileMissingTarget(true); @@ -91,7 +93,7 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) public void testSingleFileTargetFile() { caseSingleFileTargetFile(false); caseSingleFileTargetFile(true); @@ -114,7 +116,7 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) public void testSingleFileTargetDir() { caseSingleFileTargetDir(false); caseSingleFileTargetDir(true); @@ -138,7 +140,7 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) public void testSingleDirTargetMissing() { caseSingleDirTargetMissing(false); caseSingleDirTargetMissing(true); @@ -161,7 +163,7 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) public void testSingleDirTargetPresent() { try { @@ -180,7 +182,7 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) public void testUpdateSingleDirTargetPresent() { try { @@ -199,7 +201,7 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) public void testMultiFileTargetPresent() { caseMultiFileTargetPresent(false); caseMultiFileTargetPresent(true); @@ -223,7 +225,56 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) + public void testCustomCopyListing() { + + try { + addEntries(listFile, "multifile1/file3", "multifile1/file4", "multifile1/file5"); + createFiles("multifile1/file3", "multifile1/file4", "multifile1/file5"); + mkdirs(target.toString()); + + Configuration conf = getConf(); + try { + conf.setClass(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS, + CustomCopyListing.class, CopyListing.class); + DistCpOptions options = new DistCpOptions(Arrays. + asList(new Path(root + "/" + "multifile1")), target); + options.setSyncFolder(true); + options.setDeleteMissing(false); + options.setOverwrite(false); + try { + new DistCp(conf, options).execute(); + } catch (Exception e) { + LOG.error("Exception encountered ", e); + throw new IOException(e); + } + } finally { + conf.unset(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS); + } + + checkResult(target, 2, "file4", "file5"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + private static class CustomCopyListing extends SimpleCopyListing { + + public CustomCopyListing(Configuration configuration, + Credentials credentials) { + super(configuration, credentials); + } + + @Override + protected boolean shouldCopy(Path path, DistCpOptions options) { + return !path.getName().equals("file3"); + } + } + + @Test(timeout=100000) public void testMultiFileTargetMissing() { caseMultiFileTargetMissing(false); caseMultiFileTargetMissing(true); @@ -246,7 +297,7 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) public void testMultiDirTargetPresent() { try { @@ -265,7 +316,7 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) public void testUpdateMultiDirTargetPresent() { try { @@ -284,7 +335,7 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) public void testMultiDirTargetMissing() { try { @@ -304,7 +355,7 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) public void testUpdateMultiDirTargetMissing() { try { @@ -323,7 +374,7 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) public void testDeleteMissingInDestination() { try { @@ -343,7 +394,7 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) public void testOverwrite() { byte[] contents1 = "contents1".getBytes(); byte[] contents2 = "contents2".getBytes(); @@ -375,7 +426,7 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) public void testGlobTargetMissingSingleLevel() { try { @@ -398,7 +449,7 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) public void testUpdateGlobTargetMissingSingleLevel() { try { @@ -420,7 +471,7 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) public void testGlobTargetMissingMultiLevel() { try { @@ -444,7 +495,7 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) public void testUpdateGlobTargetMissingMultiLevel() { try { @@ -468,7 +519,7 @@ public class TestIntegration { } } - @Test + @Test(timeout=100000) public void testCleanup() { try { Path sourcePath = new Path("noscheme:///file");