From 911ae15f69d460871436173e3431f3ddda83ed09 Mon Sep 17 00:00:00 2001
From: Chris Nauroth
Date: Mon, 19 Sep 2016 15:16:47 -0700
Subject: [PATCH] HADOOP-13169. Randomize file list in SimpleCopyListing.
 Contributed by Rajesh Balamohan.

(cherry picked from commit 98bdb5139769eb55893971b43b9c23da9513a784)
(cherry picked from commit e19f910245cd42c6caa88fea5930f446ab618c94)
---
 .../apache/hadoop/tools/DistCpConstants.java  |   4 +
 .../hadoop/tools/SimpleCopyListing.java       | 114 ++++++++++++++++--
 .../apache/hadoop/tools/TestCopyListing.java  |  83 ++++++++++++-
 3 files changed, 189 insertions(+), 12 deletions(-)

diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index 21dca628f78..525f4cec400 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -59,6 +59,10 @@ public class DistCpConstants {
   public static final String CONF_LABEL_APPEND = "distcp.copy.append";
   public static final String CONF_LABEL_DIFF = "distcp.copy.diff";
   public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
+  public static final String CONF_LABEL_SIMPLE_LISTING_FILESTATUS_SIZE =
+      "distcp.simplelisting.file.status.size";
+  public static final String CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES =
+      "distcp.simplelisting.randomize.files";
   public static final String CONF_LABEL_FILTERS_FILE =
       "distcp.filters.file";
   public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE =
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
index 3f52203d32f..bc30aa16609 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.tools;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -42,7 +43,10 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
 
 import static org.apache.hadoop.tools.DistCpConstants
     .HDFS_RESERVED_RAW_DIRECTORY_NAME;
@@ -56,13 +60,19 @@ import static org.apache.hadoop.tools.DistCpConstants
 public class SimpleCopyListing extends CopyListing {
   private static final Log LOG = LogFactory.getLog(SimpleCopyListing.class);
 
+  public static final int DEFAULT_FILE_STATUS_SIZE = 1000;
+  public static final boolean DEFAULT_RANDOMIZE_FILE_LISTING = true;
+
   private long totalPaths = 0;
   private long totalDirs = 0;
   private long totalBytesToCopy = 0;
   private int numListstatusThreads = 1;
+  private final int fileStatusLimit;
+  private final boolean randomizeFileListing;
   private final int maxRetries = 3;
   private CopyFilter copyFilter;
   private DistCpSync distCpSync;
+  private final Random rnd = new Random();
 
   /**
    * Protected constructor, to initialize configuration.
@@ -76,6 +86,17 @@ public class SimpleCopyListing extends CopyListing {
     numListstatusThreads = getConf().getInt(
         DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
         DistCpConstants.DEFAULT_LISTSTATUS_THREADS);
+    fileStatusLimit = Math.max(1, getConf()
+        .getInt(DistCpConstants.CONF_LABEL_SIMPLE_LISTING_FILESTATUS_SIZE,
+        DEFAULT_FILE_STATUS_SIZE));
+    randomizeFileListing = getConf().getBoolean(
+        DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES,
+        DEFAULT_RANDOMIZE_FILE_LISTING);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("numListstatusThreads=" + numListstatusThreads +
+          ", fileStatusLimit=" + fileStatusLimit +
+          ", randomizeFileListing=" + randomizeFileListing);
+    }
     copyFilter = CopyFilter.getCopyFilter(getConf());
     copyFilter.initialize();
   }
@@ -83,9 +104,13 @@ public class SimpleCopyListing extends CopyListing {
   @VisibleForTesting
   protected SimpleCopyListing(Configuration configuration,
                               Credentials credentials,
-                              int numListstatusThreads) {
+                              int numListstatusThreads,
+                              int fileStatusLimit,
+                              boolean randomizeFileListing) {
     super(configuration, credentials);
     this.numListstatusThreads = numListstatusThreads;
+    this.fileStatusLimit = Math.max(1, fileStatusLimit);
+    this.randomizeFileListing = randomizeFileListing;
   }
 
   protected SimpleCopyListing(Configuration configuration,
@@ -236,6 +261,7 @@ public class SimpleCopyListing extends CopyListing {
 
     FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
     try {
+      List<FileStatusInfo> fileStatuses = Lists.newArrayList();
       for (DiffInfo diff : diffList) {
         // add snapshot paths prefix
         diff.target = new Path(options.getSourcePaths().get(0), diff.target);
@@ -259,10 +285,13 @@ public class SimpleCopyListing extends CopyListing {
             sourceDirs.add(sourceStatus);
 
             traverseDirectory(fileListWriter, sourceFS, sourceDirs,
-                sourceRoot, options, excludeList);
+                sourceRoot, options, excludeList, fileStatuses);
           }
         }
       }
+      if (randomizeFileListing) {
+        writeToFileListing(fileStatuses, fileListWriter);
+      }
       fileListWriter.close();
       fileListWriter = null;
     } finally {
@@ -296,6 +325,7 @@ public class SimpleCopyListing extends CopyListing {
     }
 
     try {
+      List<FileStatusInfo> statusList = Lists.newArrayList();
       for (Path path: options.getSourcePaths()) {
         FileSystem sourceFS = path.getFileSystem(getConf());
         final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
@@ -326,8 +356,14 @@ public class SimpleCopyListing extends CopyListing {
                     preserveAcls && sourceStatus.isDirectory(),
                     preserveXAttrs && sourceStatus.isDirectory(),
                     preserveRawXAttrs && sourceStatus.isDirectory());
-            writeToFileListing(fileListWriter, sourceCopyListingStatus,
-                sourcePathRoot);
+            if (randomizeFileListing) {
+              addToFileListing(statusList,
+                  new FileStatusInfo(sourceCopyListingStatus, sourcePathRoot),
+                  fileListWriter);
+            } else {
+              writeToFileListing(fileListWriter, sourceCopyListingStatus,
+                  sourcePathRoot);
+            }
 
             if (sourceStatus.isDirectory()) {
               if (LOG.isDebugEnabled()) {
@@ -337,9 +373,12 @@ public class SimpleCopyListing extends CopyListing {
             }
           }
           traverseDirectory(fileListWriter, sourceFS, sourceDirs,
-              sourcePathRoot, options, null);
+              sourcePathRoot, options, null, statusList);
         }
       }
+      if (randomizeFileListing) {
+        writeToFileListing(statusList, fileListWriter);
+      }
       fileListWriter.close();
       printStats();
       LOG.info("Build file listing completed.");
@@ -349,6 +388,52 @@ public class SimpleCopyListing extends CopyListing {
     }
   }
 
+  private void addToFileListing(List<FileStatusInfo> fileStatusInfoList,
+      FileStatusInfo statusInfo, SequenceFile.Writer fileListWriter)
+      throws IOException {
+    fileStatusInfoList.add(statusInfo);
+    if (fileStatusInfoList.size() > fileStatusLimit) {
+      writeToFileListing(fileStatusInfoList, fileListWriter);
+    }
+  }
+
+  @VisibleForTesting
+  void setSeedForRandomListing(long seed) {
+    this.rnd.setSeed(seed);
+  }
+
+  private void writeToFileListing(List<FileStatusInfo> fileStatusInfoList,
+      SequenceFile.Writer fileListWriter) throws IOException {
+    /**
+     * In cloud storage systems, it is possible to get region hotspot.
+     * Shuffling paths can avoid such cases and also ensure that
+     * some mappers do not get lots of similar paths.
+     */
+    Collections.shuffle(fileStatusInfoList, rnd);
+    for (FileStatusInfo fileStatusInfo : fileStatusInfoList) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding " + fileStatusInfo.fileStatus.getPath());
+      }
+      writeToFileListing(fileListWriter, fileStatusInfo.fileStatus,
+          fileStatusInfo.sourceRootPath);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Number of paths written to fileListing=" +
+          fileStatusInfoList.size());
+    }
+    fileStatusInfoList.clear();
+  }
+
+  private static class FileStatusInfo {
+    private CopyListingFileStatus fileStatus;
+    private Path sourceRootPath;
+
+    FileStatusInfo(CopyListingFileStatus fileStatus, Path sourceRootPath) {
+      this.fileStatus = fileStatus;
+      this.sourceRootPath = sourceRootPath;
+    }
+  }
+
   private Path computeSourceRootPath(FileStatus sourceStatus,
                                      DistCpOptions options)
       throws IOException {
@@ -516,15 +601,18 @@ public class SimpleCopyListing extends CopyListing {
                                  ArrayList<FileStatus> sourceDirs,
                                  Path sourcePathRoot,
                                  DistCpOptions options,
-                                 HashSet<String> excludeList)
+                                 HashSet<String> excludeList,
+                                 List<FileStatusInfo> fileStatuses)
       throws IOException {
     final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
     final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
    final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
 
     assert numListstatusThreads > 0;
-    LOG.debug("Starting thread pool of " + numListstatusThreads +
-        " listStatus workers.");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Starting thread pool of " + numListstatusThreads +
+          " listStatus workers.");
+    }
     ProducerConsumer<FileStatus, FileStatus[]> workers =
         new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
     for (int i = 0; i < numListstatusThreads; i++) {
@@ -551,8 +639,14 @@ public class SimpleCopyListing extends CopyListing {
                   preserveAcls && child.isDirectory(),
                   preserveXAttrs && child.isDirectory(),
                   preserveRawXattrs && child.isDirectory());
-              writeToFileListing(fileListWriter, childCopyListingStatus,
-                  sourcePathRoot);
+              if (randomizeFileListing) {
+                addToFileListing(fileStatuses,
+                    new FileStatusInfo(childCopyListingStatus, sourcePathRoot),
+                    fileListWriter);
+              } else {
+                writeToFileListing(fileListWriter, childCopyListingStatus,
+                    sourcePathRoot);
+              }
             }
             if (retry < maxRetries) {
               if (child.isDirectory()) {
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
index 896763d26e5..ea63e235138 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
@@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.tools.util.TestDistCpUtils;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.security.Credentials;
@@ -46,7 +45,9 @@ import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 
 @RunWith(value = Parameterized.class)
 public class TestCopyListing extends SimpleCopyListing {
@@ -77,7 +78,7 @@ public class TestCopyListing extends SimpleCopyListing {
   }
 
   public TestCopyListing(int numListstatusThreads) {
-    super(config, CREDENTIALS, numListstatusThreads);
+    super(config, CREDENTIALS, numListstatusThreads, 0, false);
   }
 
   protected TestCopyListing(Configuration configuration) {
@@ -221,6 +222,84 @@ public class TestCopyListing extends SimpleCopyListing {
     }
   }
 
+  @Test(timeout=60000)
+  public void testWithRandomFileListing() throws IOException {
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.get(getConf());
+      List<Path> srcPaths = new ArrayList<>();
+      List<Path> srcFiles = new ArrayList<>();
+      Path target = new Path("/tmp/out/1");
+      final int pathCount = 25;
+      for (int i = 0; i < pathCount; i++) {
+        Path p = new Path("/tmp", String.valueOf(i));
+        srcPaths.add(p);
+        fs.mkdirs(p);
+
+        Path fileName = new Path(p, i + ".txt");
+        srcFiles.add(fileName);
+        try (OutputStream out = fs.create(fileName)) {
+          out.write(i);
+        }
+      }
+
+      Path listingFile = new Path("/tmp/file");
+      DistCpOptions options = new DistCpOptions(srcPaths, target);
+      options.setSyncFolder(true);
+
+      // Check without randomizing files
+      getConf().setBoolean(
+          DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false);
+      SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
+      listing.buildListing(listingFile, options);
+
+      Assert.assertEquals(listing.getNumberOfPaths(), pathCount);
+      validateFinalListing(listingFile, srcFiles);
+      fs.delete(listingFile, true);
+
+      // Check with randomized file listing
+      getConf().setBoolean(
+          DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, true);
+      listing = new SimpleCopyListing(getConf(), CREDENTIALS);
+
+      // Set the seed for randomness, so that it can be verified later
+      long seed = System.nanoTime();
+      listing.setSeedForRandomListing(seed);
+      listing.buildListing(listingFile, options);
+      Assert.assertEquals(listing.getNumberOfPaths(), pathCount);
+
+      // validate randomness
+      Collections.shuffle(srcFiles, new Random(seed));
+      validateFinalListing(listingFile, srcFiles);
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  private void validateFinalListing(Path pathToListFile, List<Path> srcFiles)
+      throws IOException {
+    FileSystem fs = pathToListFile.getFileSystem(config);
+
+    try (SequenceFile.Reader reader = new SequenceFile.Reader(
+        config, SequenceFile.Reader.file(pathToListFile))) {
+      CopyListingFileStatus currentVal = new CopyListingFileStatus();
+
+      Text currentKey = new Text();
+      int idx = 0;
+      while (reader.next(currentKey)) {
+        reader.getCurrentValue(currentVal);
+        Assert.assertEquals("srcFiles.size=" + srcFiles.size() +
+            ", idx=" + idx, fs.makeQualified(srcFiles.get(idx)),
+            currentVal.getPath());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("val=" + fs.makeQualified(srcFiles.get(idx)));
+        }
+        idx++;
+      }
+    }
+  }
+
+
   @Test(timeout=10000)
   public void testBuildListingForSingleFile() {
     FileSystem fs = null;
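
A minimal usage sketch, not taken from the patch: it shows how a job Configuration could set the two properties introduced above (the wrapper class name and the concrete values are illustrative; SimpleCopyListing reads both settings in its constructor, as shown in the hunk at @@ -76,6 +86,17 @@).

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.tools.DistCpConstants;

    public class ListingConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Keep the generated copy listing shuffled; the patch's default is
        // true (SimpleCopyListing.DEFAULT_RANDOMIZE_FILE_LISTING).
        conf.setBoolean(
            DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, true);

        // Number of buffered file statuses before each shuffled flush; the
        // patch's default is 1000 and the value is floored at 1.
        conf.setInt(
            DistCpConstants.CONF_LABEL_SIMPLE_LISTING_FILESTATUS_SIZE, 1000);

        // A DistCp job built with this Configuration picks up both values
        // when SimpleCopyListing is constructed.
      }
    }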