HADOOP-14407. DistCp - Introduce a configurable copy buffer size. (Omkar Aradhya K S via Yongjun Zhang)

Yongjun Zhang 2017-05-18 15:35:22 -07:00
parent 40e6a85d25
commit b4adc8392c
8 changed files with 78 additions and 6 deletions

DistCpConstants.java

@@ -109,6 +109,10 @@ public class DistCpConstants {
   /* DistCp CopyListing class override param */
   public static final String CONF_LABEL_COPY_LISTING_CLASS = "distcp.copy.listing.class";
+  /* DistCp Copy Buffer Size */
+  public static final String CONF_LABEL_COPY_BUFFER_SIZE =
+      "distcp.copy.buffer.size";
   /**
    * Constants for DistCp return code to shell / consumer of ToolRunner's run
    */
@@ -141,4 +145,6 @@ public class DistCpConstants {
   public static final String HDFS_RESERVED_RAW_DIRECTORY_NAME = "/.reserved/raw";
   static final String HDFS_DISTCP_DIFF_DIRECTORY_NAME = ".distcp.diff.tmp";
+  public static final int COPY_BUFFER_SIZE_DEFAULT = 8 * 1024;
 }

DistCpContext.java

@@ -175,6 +175,10 @@ public final boolean splitLargeFile() {
     return options.getBlocksPerChunk() > 0;
   }
+  public int getCopyBufferSize() {
+    return options.getCopyBufferSize();
+  }
   public void setTargetPathExists(boolean targetPathExists) {
     this.targetPathExists = targetPathExists;
   }

DistCpOptionSwitch.java

@@ -179,6 +179,14 @@ public enum DistCpOptionSwitch {
       + "system implements getBlockLocations method and the target file "
       + "system implements concat method")),
+  /**
+   * Configurable copy buffer size.
+   */
+  COPY_BUFFER_SIZE(DistCpConstants.CONF_LABEL_COPY_BUFFER_SIZE,
+      new Option("copybuffersize", true, "Size of the copy buffer to use. "
+          + "By default <copybuffersize> is "
+          + DistCpConstants.COPY_BUFFER_SIZE_DEFAULT + "B.")),
   /**
    * Specify bandwidth per map in MB, accepts bandwidth as a fraction
    */

DistCpOptions.java

@@ -143,6 +143,8 @@ public final class DistCpOptions {
   // to copy in parallel. Default is 0 and file are not splitted.
   private final int blocksPerChunk;
+  private final int copyBufferSize;
   /**
    * File attributes for preserve.
    *
@@ -200,6 +202,8 @@ private DistCpOptions(Builder builder) {
     this.preserveStatus = builder.preserveStatus;
     this.blocksPerChunk = builder.blocksPerChunk;
+    this.copyBufferSize = builder.copyBufferSize;
   }
   public Path getSourceFileListing() {
@@ -302,7 +306,7 @@ public Set<FileAttribute> getPreserveAttributes() {
   }
   /**
-   * Checks if the input attribute should be preserved or not
+   * Checks if the input attribute should be preserved or not.
    *
    * @param attribute - Attribute to check
    * @return True if attribute should be preserved, false otherwise
@@ -315,6 +319,10 @@ public int getBlocksPerChunk() {
     return blocksPerChunk;
   }
+  public int getCopyBufferSize() {
+    return copyBufferSize;
+  }
   /**
    * Add options to configuration. These will be used in the Mapper/committer
    *
@@ -351,6 +359,8 @@ public void appendToConf(Configuration conf) {
     }
     DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BLOCKS_PER_CHUNK,
         String.valueOf(blocksPerChunk));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.COPY_BUFFER_SIZE,
+        String.valueOf(copyBufferSize));
   }
   /**
@@ -385,6 +395,7 @@ public String toString() {
         ", targetPath=" + targetPath +
         ", filtersFile='" + filtersFile + '\'' +
         ", blocksPerChunk=" + blocksPerChunk +
+        ", copyBufferSize=" + copyBufferSize +
         '}';
   }
@@ -429,6 +440,9 @@ public static class Builder {
     private int blocksPerChunk = 0;
+    private int copyBufferSize =
+        DistCpConstants.COPY_BUFFER_SIZE_DEFAULT;
     public Builder(List<Path> sourcePaths, Path targetPath) {
       Preconditions.checkArgument(sourcePaths != null && !sourcePaths.isEmpty(),
           "Source paths should not be null or empty!");
@@ -664,6 +678,13 @@ public Builder withBlocksPerChunk(int newBlocksPerChunk) {
       this.blocksPerChunk = newBlocksPerChunk;
       return this;
     }
+    public Builder withCopyBufferSize(int newCopyBufferSize) {
+      this.copyBufferSize =
+          newCopyBufferSize > 0 ? newCopyBufferSize
+              : DistCpConstants.COPY_BUFFER_SIZE_DEFAULT;
+      return this;
+    }
   }
 }
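
Editor's note: the Builder additions above also let code that drives DistCp programmatically pick a buffer size up front. A minimal sketch under that assumption (the paths and the 4 MB value are illustrative only, not from this commit):

    // Sketch: build DistCpOptions with a 4 MB copy buffer instead of the 8 KB default.
    DistCpOptions options = new DistCpOptions.Builder(
        Collections.singletonList(new Path("hdfs://nn1:8020/src")),  // illustrative source
        new Path("hdfs://nn2:8020/dst"))                             // illustrative target
        .withCopyBufferSize(4 * 1024 * 1024)
        .build();

Non-positive values fall back to COPY_BUFFER_SIZE_DEFAULT, as guarded in withCopyBufferSize() above.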

OptionsParser.java

@@ -213,6 +213,18 @@ public static DistCpOptions parse(String[] args)
       }
     }
+    if (command.hasOption(DistCpOptionSwitch.COPY_BUFFER_SIZE.getSwitch())) {
+      final String copyBufferSizeStr = getVal(command,
+          DistCpOptionSwitch.COPY_BUFFER_SIZE.getSwitch().trim());
+      try {
+        int copyBufferSize = Integer.parseInt(copyBufferSizeStr);
+        builder.withCopyBufferSize(copyBufferSize);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("copyBufferSize is invalid: "
+            + copyBufferSizeStr, e);
+      }
+    }
     return builder.build();
   }
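
Editor's note: with the parsing added above, an argument vector like the following sketch (values and paths are illustrative) would reach the builder, while a non-numeric size is rejected by the NumberFormatException branch:

    // Sketch of the parse path; OptionsParser.parse() is the method shown in the hunk above.
    DistCpOptions parsed = OptionsParser.parse(new String[] {
        "-copybuffersize", "4194304",
        "hdfs://nn1:8020/src", "hdfs://nn2:8020/dst"});
    // parsed.getCopyBufferSize() would be 4194304 here.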

RetriableFileCopyCommand.java

@@ -38,6 +38,7 @@
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptionSwitch;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
 import org.apache.hadoop.tools.util.DistCpUtils;
@@ -53,7 +54,6 @@
 public class RetriableFileCopyCommand extends RetriableCommand {
   private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
-  private static int BUFFER_SIZE = 8 * 1024;
   private boolean skipCrc = false;
   private FileAction action;
@@ -169,6 +169,9 @@ private long copyToFile(Path targetPath, FileSystem targetFS,
       throws IOException {
     FsPermission permission = FsPermission.getFileDefault().applyUMask(
         FsPermission.getUMask(targetFS.getConf()));
+    int copyBufferSize = context.getConfiguration().getInt(
+        DistCpOptionSwitch.COPY_BUFFER_SIZE.getConfigLabel(),
+        DistCpConstants.COPY_BUFFER_SIZE_DEFAULT);
     final OutputStream outStream;
     if (action == FileAction.OVERWRITE) {
       // If there is an erasure coding policy set on the target directory,
@@ -180,14 +183,14 @@ private long copyToFile(Path targetPath, FileSystem targetFS,
           targetFS, targetPath);
       FSDataOutputStream out = targetFS.create(targetPath, permission,
           EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
-          BUFFER_SIZE, repl, blockSize, context,
+          copyBufferSize, repl, blockSize, context,
           getChecksumOpt(fileAttributes, sourceChecksum));
       outStream = new BufferedOutputStream(out);
     } else {
       outStream = new BufferedOutputStream(targetFS.append(targetPath,
-          BUFFER_SIZE));
+          copyBufferSize));
     }
-    return copyBytes(source, sourceOffset, outStream, BUFFER_SIZE,
+    return copyBytes(source, sourceOffset, outStream, copyBufferSize,
         context);
   }
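
Editor's note: as the hunk shows, the configured value replaces the old hard-coded BUFFER_SIZE as the buffer size passed to targetFS.create()/append() and to copyBytes(). As a generic illustration only (not the actual copyBytes() body), the effect on the copy loop is equivalent to:

    // Generic buffered-copy loop; the buffer size bounds each read/write chunk.
    // inStream, outStream and copyBufferSize are assumed locals for illustration.
    byte[] buf = new byte[copyBufferSize];
    int bytesRead;
    while ((bytesRead = inStream.read(buf)) != -1) {
      outStream.write(buf, 0, bytesRead);
    }

A larger buffer trades memory per map task for fewer, larger filesystem calls.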

DistCp.md.vm

@@ -238,6 +238,7 @@ Flag | Description | Notes
 `-numListstatusThreads` | Number of threads to use for building file listing | At most 40 threads.
 `-skipcrccheck` | Whether to skip CRC checks between source and target paths. |
 `-blocksperchunk <blocksperchunk>` | Number of blocks per chunk. When specified, split files into chunks to copy in parallel | If set to a positive value, files with more blocks than this value will be split into chunks of `<blocksperchunk>` blocks to be transferred in parallel, and reassembled on the destination. By default, `<blocksperchunk>` is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements getBlockLocations method and the target file system implements concat method. |
+`-copybuffersize <copybuffersize>` | Size of the copy buffer to use. By default, `<copybuffersize>` is set to 8192B |
 
 Architecture of DistCp
 ----------------------
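
Editor's note: as a usage example of the documented flag (host names and the value are illustrative), `hadoop distcp -copybuffersize 4194304 hdfs://nn1:8020/src hdfs://nn2:8020/dst` would run the copy with a 4 MB buffer, while omitting the flag keeps the 8192-byte default shown in the table.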

TestDistCpOptions.java

@@ -287,7 +287,7 @@ public void testToString() {
         "mapBandwidth=0.0, copyStrategy='uniformsize', preserveStatus=[], " +
         "atomicWorkPath=null, logPath=null, sourceFileListing=abc, " +
         "sourcePaths=null, targetPath=xyz, filtersFile='null'," +
-        " blocksPerChunk=0}";
+        " blocksPerChunk=0, copyBufferSize=8192}";
     String optionString = option.toString();
     Assert.assertEquals(val, optionString);
     Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),
@@ -497,4 +497,21 @@ public void testSetOptionsForSplitLargeFile() {
     Assert.assertFalse(builder.build().shouldAppend());
   }
+  @Test
+  public void testSetCopyBufferSize() {
+    final DistCpOptions.Builder builder = new DistCpOptions.Builder(
+        Collections.singletonList(new Path("hdfs://localhost:8020/source")),
+        new Path("hdfs://localhost:8020/target/"));
+    Assert.assertEquals(DistCpConstants.COPY_BUFFER_SIZE_DEFAULT,
+        builder.build().getCopyBufferSize());
+    builder.withCopyBufferSize(4194304);
+    Assert.assertEquals(4194304,
+        builder.build().getCopyBufferSize());
+    builder.withCopyBufferSize(-1);
+    Assert.assertEquals(DistCpConstants.COPY_BUFFER_SIZE_DEFAULT,
+        builder.build().getCopyBufferSize());
+  }
 }