HADOOP-13169. Randomize file list in SimpleCopyListing. Contributed by Rajesh Balamohan.

(cherry picked from commit 98bdb51397)
This commit is contained in:
Chris Nauroth 2016-09-19 15:16:47 -07:00
parent 035f5f8f1d
commit e19f910245
3 changed files with 189 additions and 12 deletions

View File

@ -59,6 +59,10 @@ public class DistCpConstants {
public static final String CONF_LABEL_APPEND = "distcp.copy.append";
public static final String CONF_LABEL_DIFF = "distcp.copy.diff";
public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
public static final String CONF_LABEL_SIMPLE_LISTING_FILESTATUS_SIZE =
"distcp.simplelisting.file.status.size";
public static final String CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES =
"distcp.simplelisting.randomize.files";
public static final String CONF_LABEL_FILTERS_FILE =
"distcp.filters.file";
public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE =

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.tools;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@ -42,7 +43,10 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import static org.apache.hadoop.tools.DistCpConstants
.HDFS_RESERVED_RAW_DIRECTORY_NAME;
@ -56,13 +60,19 @@ import static org.apache.hadoop.tools.DistCpConstants
public class SimpleCopyListing extends CopyListing {
private static final Log LOG = LogFactory.getLog(SimpleCopyListing.class);
public static final int DEFAULT_FILE_STATUS_SIZE = 1000;
public static final boolean DEFAULT_RANDOMIZE_FILE_LISTING = true;
private long totalPaths = 0;
private long totalDirs = 0;
private long totalBytesToCopy = 0;
private int numListstatusThreads = 1;
private final int fileStatusLimit;
private final boolean randomizeFileListing;
private final int maxRetries = 3;
private CopyFilter copyFilter;
private DistCpSync distCpSync;
private final Random rnd = new Random();
/**
* Protected constructor, to initialize configuration.
@ -76,6 +86,17 @@ public class SimpleCopyListing extends CopyListing {
numListstatusThreads = getConf().getInt(
DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
DistCpConstants.DEFAULT_LISTSTATUS_THREADS);
fileStatusLimit = Math.max(1, getConf()
.getInt(DistCpConstants.CONF_LABEL_SIMPLE_LISTING_FILESTATUS_SIZE,
DEFAULT_FILE_STATUS_SIZE));
randomizeFileListing = getConf().getBoolean(
DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES,
DEFAULT_RANDOMIZE_FILE_LISTING);
if (LOG.isDebugEnabled()) {
LOG.debug("numListstatusThreads=" + numListstatusThreads
+ ", fileStatusLimit=" + fileStatusLimit
+ ", randomizeFileListing=" + randomizeFileListing);
}
copyFilter = CopyFilter.getCopyFilter(getConf());
copyFilter.initialize();
}
@ -83,9 +104,13 @@ public class SimpleCopyListing extends CopyListing {
@VisibleForTesting
protected SimpleCopyListing(Configuration configuration,
Credentials credentials,
int numListstatusThreads) {
int numListstatusThreads,
int fileStatusLimit,
boolean randomizeFileListing) {
super(configuration, credentials);
this.numListstatusThreads = numListstatusThreads;
this.fileStatusLimit = Math.max(1, fileStatusLimit);
this.randomizeFileListing = randomizeFileListing;
}
protected SimpleCopyListing(Configuration configuration,
@ -236,6 +261,7 @@ public class SimpleCopyListing extends CopyListing {
FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
try {
List<FileStatusInfo> fileStatuses = Lists.newArrayList();
for (DiffInfo diff : diffList) {
// add snapshot paths prefix
diff.target = new Path(options.getSourcePaths().get(0), diff.target);
@ -259,10 +285,13 @@ public class SimpleCopyListing extends CopyListing {
sourceDirs.add(sourceStatus);
traverseDirectory(fileListWriter, sourceFS, sourceDirs,
sourceRoot, options, excludeList);
sourceRoot, options, excludeList, fileStatuses);
}
}
}
if (randomizeFileListing) {
writeToFileListing(fileStatuses, fileListWriter);
}
fileListWriter.close();
fileListWriter = null;
} finally {
@ -296,6 +325,7 @@ public class SimpleCopyListing extends CopyListing {
}
try {
List<FileStatusInfo> statusList = Lists.newArrayList();
for (Path path: options.getSourcePaths()) {
FileSystem sourceFS = path.getFileSystem(getConf());
final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
@ -326,8 +356,14 @@ public class SimpleCopyListing extends CopyListing {
preserveAcls && sourceStatus.isDirectory(),
preserveXAttrs && sourceStatus.isDirectory(),
preserveRawXAttrs && sourceStatus.isDirectory());
writeToFileListing(fileListWriter, sourceCopyListingStatus,
sourcePathRoot);
if (randomizeFileListing) {
addToFileListing(statusList,
new FileStatusInfo(sourceCopyListingStatus, sourcePathRoot),
fileListWriter);
} else {
writeToFileListing(fileListWriter, sourceCopyListingStatus,
sourcePathRoot);
}
if (sourceStatus.isDirectory()) {
if (LOG.isDebugEnabled()) {
@ -337,9 +373,12 @@ public class SimpleCopyListing extends CopyListing {
}
}
traverseDirectory(fileListWriter, sourceFS, sourceDirs,
sourcePathRoot, options, null);
sourcePathRoot, options, null, statusList);
}
}
if (randomizeFileListing) {
writeToFileListing(statusList, fileListWriter);
}
fileListWriter.close();
printStats();
LOG.info("Build file listing completed.");
@ -349,6 +388,52 @@ public class SimpleCopyListing extends CopyListing {
}
}
private void addToFileListing(List<FileStatusInfo> fileStatusInfoList,
FileStatusInfo statusInfo, SequenceFile.Writer fileListWriter)
throws IOException {
fileStatusInfoList.add(statusInfo);
if (fileStatusInfoList.size() > fileStatusLimit) {
writeToFileListing(fileStatusInfoList, fileListWriter);
}
}
@VisibleForTesting
void setSeedForRandomListing(long seed) {
this.rnd.setSeed(seed);
}
private void writeToFileListing(List<FileStatusInfo> fileStatusInfoList,
SequenceFile.Writer fileListWriter) throws IOException {
/**
* In cloud storage systems, it is possible to get region hotspot.
* Shuffling paths can avoid such cases and also ensure that
* some mappers do not get lots of similar paths.
*/
Collections.shuffle(fileStatusInfoList, rnd);
for (FileStatusInfo fileStatusInfo : fileStatusInfoList) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + fileStatusInfo.fileStatus.getPath());
}
writeToFileListing(fileListWriter, fileStatusInfo.fileStatus,
fileStatusInfo.sourceRootPath);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Number of paths written to fileListing="
+ fileStatusInfoList.size());
}
fileStatusInfoList.clear();
}
private static class FileStatusInfo {
private CopyListingFileStatus fileStatus;
private Path sourceRootPath;
FileStatusInfo(CopyListingFileStatus fileStatus, Path sourceRootPath) {
this.fileStatus = fileStatus;
this.sourceRootPath = sourceRootPath;
}
}
private Path computeSourceRootPath(FileStatus sourceStatus,
DistCpOptions options) throws IOException {
@ -516,15 +601,18 @@ public class SimpleCopyListing extends CopyListing {
ArrayList<FileStatus> sourceDirs,
Path sourcePathRoot,
DistCpOptions options,
HashSet<String> excludeList)
HashSet<String> excludeList,
List<FileStatusInfo> fileStatuses)
throws IOException {
final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
assert numListstatusThreads > 0;
LOG.debug("Starting thread pool of " + numListstatusThreads +
" listStatus workers.");
if (LOG.isDebugEnabled()) {
LOG.debug("Starting thread pool of " + numListstatusThreads +
" listStatus workers.");
}
ProducerConsumer<FileStatus, FileStatus[]> workers =
new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
for (int i = 0; i < numListstatusThreads; i++) {
@ -551,8 +639,14 @@ public class SimpleCopyListing extends CopyListing {
preserveAcls && child.isDirectory(),
preserveXAttrs && child.isDirectory(),
preserveRawXattrs && child.isDirectory());
writeToFileListing(fileListWriter, childCopyListingStatus,
sourcePathRoot);
if (randomizeFileListing) {
addToFileListing(fileStatuses,
new FileStatusInfo(childCopyListingStatus, sourcePathRoot),
fileListWriter);
} else {
writeToFileListing(fileListWriter, childCopyListingStatus,
sourcePathRoot);
}
}
if (retry < maxRetries) {
if (child.isDirectory()) {

View File

@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.tools.util.TestDistCpUtils;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.Credentials;
@ -46,7 +45,9 @@ import java.io.OutputStream;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
@RunWith(value = Parameterized.class)
public class TestCopyListing extends SimpleCopyListing {
@ -77,7 +78,7 @@ public class TestCopyListing extends SimpleCopyListing {
}
public TestCopyListing(int numListstatusThreads) {
super(config, CREDENTIALS, numListstatusThreads);
super(config, CREDENTIALS, numListstatusThreads, 0, false);
}
protected TestCopyListing(Configuration configuration) {
@ -221,6 +222,84 @@ public class TestCopyListing extends SimpleCopyListing {
}
}
@Test(timeout=60000)
public void testWithRandomFileListing() throws IOException {
FileSystem fs = null;
try {
fs = FileSystem.get(getConf());
List<Path> srcPaths = new ArrayList<>();
List<Path> srcFiles = new ArrayList<>();
Path target = new Path("/tmp/out/1");
final int pathCount = 25;
for (int i = 0; i < pathCount; i++) {
Path p = new Path("/tmp", String.valueOf(i));
srcPaths.add(p);
fs.mkdirs(p);
Path fileName = new Path(p, i + ".txt");
srcFiles.add(fileName);
try (OutputStream out = fs.create(fileName)) {
out.write(i);
}
}
Path listingFile = new Path("/tmp/file");
DistCpOptions options = new DistCpOptions(srcPaths, target);
options.setSyncFolder(true);
// Check without randomizing files
getConf().setBoolean(
DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false);
SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
listing.buildListing(listingFile, options);
Assert.assertEquals(listing.getNumberOfPaths(), pathCount);
validateFinalListing(listingFile, srcFiles);
fs.delete(listingFile, true);
// Check with randomized file listing
getConf().setBoolean(
DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, true);
listing = new SimpleCopyListing(getConf(), CREDENTIALS);
// Set the seed for randomness, so that it can be verified later
long seed = System.nanoTime();
listing.setSeedForRandomListing(seed);
listing.buildListing(listingFile, options);
Assert.assertEquals(listing.getNumberOfPaths(), pathCount);
// validate randomness
Collections.shuffle(srcFiles, new Random(seed));
validateFinalListing(listingFile, srcFiles);
} finally {
TestDistCpUtils.delete(fs, "/tmp");
}
}
private void validateFinalListing(Path pathToListFile, List<Path> srcFiles)
throws IOException {
FileSystem fs = pathToListFile.getFileSystem(config);
try (SequenceFile.Reader reader = new SequenceFile.Reader(
config, SequenceFile.Reader.file(pathToListFile))) {
CopyListingFileStatus currentVal = new CopyListingFileStatus();
Text currentKey = new Text();
int idx = 0;
while (reader.next(currentKey)) {
reader.getCurrentValue(currentVal);
Assert.assertEquals("srcFiles.size=" + srcFiles.size()
+ ", idx=" + idx, fs.makeQualified(srcFiles.get(idx)),
currentVal.getPath());
if (LOG.isDebugEnabled()) {
LOG.debug("val=" + fs.makeQualified(srcFiles.get(idx)));
}
idx++;
}
}
}
@Test(timeout=10000)
public void testBuildListingForSingleFile() {
FileSystem fs = null;