Revert "HADOOP-14407. DistCp - Introduce a configurable copy buffer size. (Omkar Aradhya K S via Yongjun Zhang)"
This reverts commit fe185e2c3a
.
This commit is contained in:
parent
bd0138ea0a
commit
39474ed50d
|
@ -113,10 +113,6 @@ public class DistCpConstants {
|
||||||
/* DistCp CopyListing class override param */
|
/* DistCp CopyListing class override param */
|
||||||
public static final String CONF_LABEL_COPY_LISTING_CLASS = "distcp.copy.listing.class";
|
public static final String CONF_LABEL_COPY_LISTING_CLASS = "distcp.copy.listing.class";
|
||||||
|
|
||||||
/* DistCp Copy Buffer Size */
|
|
||||||
public static final String CONF_LABEL_COPY_BUFFER_SIZE =
|
|
||||||
"distcp.copy.buffer.size";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Conf label for SSL Trust-store location.
|
* Conf label for SSL Trust-store location.
|
||||||
*/
|
*/
|
||||||
|
@ -161,6 +157,4 @@ public class DistCpConstants {
|
||||||
public static final String HDFS_RESERVED_RAW_DIRECTORY_NAME = "/.reserved/raw";
|
public static final String HDFS_RESERVED_RAW_DIRECTORY_NAME = "/.reserved/raw";
|
||||||
|
|
||||||
static final String HDFS_DISTCP_DIFF_DIRECTORY_NAME = ".distcp.diff.tmp";
|
static final String HDFS_DISTCP_DIFF_DIRECTORY_NAME = ".distcp.diff.tmp";
|
||||||
|
|
||||||
public static final int COPY_BUFFER_SIZE_DEFAULT = 8 * 1024;
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -188,14 +188,6 @@ public enum DistCpOptionSwitch {
|
||||||
+ "system implements getBlockLocations method and the target file "
|
+ "system implements getBlockLocations method and the target file "
|
||||||
+ "system implements concat method")),
|
+ "system implements concat method")),
|
||||||
|
|
||||||
/**
|
|
||||||
* Configurable copy buffer size.
|
|
||||||
*/
|
|
||||||
COPY_BUFFER_SIZE(DistCpConstants.CONF_LABEL_COPY_BUFFER_SIZE,
|
|
||||||
new Option("copybuffersize", true, "Size of the copy buffer to use. "
|
|
||||||
+ "By default <copybuffersize> is "
|
|
||||||
+ DistCpConstants.COPY_BUFFER_SIZE_DEFAULT + "B.")),
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Specify bandwidth per map in MB
|
* Specify bandwidth per map in MB
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -104,11 +104,6 @@ public class DistCpOptions {
|
||||||
// to copy in parallel. Default is 0 and file are not splitted.
|
// to copy in parallel. Default is 0 and file are not splitted.
|
||||||
private int blocksPerChunk = 0;
|
private int blocksPerChunk = 0;
|
||||||
|
|
||||||
/**
|
|
||||||
* The copyBufferSize to use in RetriableFileCopyCommand
|
|
||||||
*/
|
|
||||||
private int copyBufferSize = DistCpConstants.COPY_BUFFER_SIZE_DEFAULT;
|
|
||||||
|
|
||||||
public static enum FileAttribute{
|
public static enum FileAttribute{
|
||||||
REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL, XATTR, TIMES;
|
REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL, XATTR, TIMES;
|
||||||
|
|
||||||
|
@ -179,7 +174,6 @@ public class DistCpOptions {
|
||||||
this.targetPathExists = that.getTargetPathExists();
|
this.targetPathExists = that.getTargetPathExists();
|
||||||
this.filtersFile = that.getFiltersFile();
|
this.filtersFile = that.getFiltersFile();
|
||||||
this.blocksPerChunk = that.blocksPerChunk;
|
this.blocksPerChunk = that.blocksPerChunk;
|
||||||
this.copyBufferSize = that.copyBufferSize;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -470,7 +464,7 @@ public class DistCpOptions {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks if the input attribute should be preserved or not.
|
* Checks if the input attribute should be preserved or not
|
||||||
*
|
*
|
||||||
* @param attribute - Attribute to check
|
* @param attribute - Attribute to check
|
||||||
* @return True if attribute should be preserved, false otherwise
|
* @return True if attribute should be preserved, false otherwise
|
||||||
|
@ -646,16 +640,6 @@ public class DistCpOptions {
|
||||||
return blocksPerChunk > 0;
|
return blocksPerChunk > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public final void setCopyBufferSize(int newCopyBufferSize) {
|
|
||||||
this.copyBufferSize =
|
|
||||||
newCopyBufferSize > 0 ? newCopyBufferSize
|
|
||||||
: DistCpConstants.COPY_BUFFER_SIZE_DEFAULT;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getCopyBufferSize() {
|
|
||||||
return this.copyBufferSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void validate(DistCpOptionSwitch option, boolean value) {
|
public void validate(DistCpOptionSwitch option, boolean value) {
|
||||||
|
|
||||||
boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ?
|
boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ?
|
||||||
|
@ -752,8 +736,6 @@ public class DistCpOptions {
|
||||||
}
|
}
|
||||||
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BLOCKS_PER_CHUNK,
|
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BLOCKS_PER_CHUNK,
|
||||||
String.valueOf(blocksPerChunk));
|
String.valueOf(blocksPerChunk));
|
||||||
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.COPY_BUFFER_SIZE,
|
|
||||||
String.valueOf(copyBufferSize));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -791,7 +773,6 @@ public class DistCpOptions {
|
||||||
", targetPathExists=" + targetPathExists +
|
", targetPathExists=" + targetPathExists +
|
||||||
", filtersFile='" + filtersFile + '\'' +
|
", filtersFile='" + filtersFile + '\'' +
|
||||||
", blocksPerChunk=" + blocksPerChunk +
|
", blocksPerChunk=" + blocksPerChunk +
|
||||||
", copyBufferSize=" + copyBufferSize +
|
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -186,8 +186,6 @@ public class OptionsParser {
|
||||||
|
|
||||||
parseBlocksPerChunk(command, option);
|
parseBlocksPerChunk(command, option);
|
||||||
|
|
||||||
parseCopyBufferSize(command, option);
|
|
||||||
|
|
||||||
return option;
|
return option;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -223,29 +221,8 @@ public class OptionsParser {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A helper method to parse copyBufferSize.
|
* parseSizeLimit is a helper method for parsing the deprecated
|
||||||
*
|
* argument SIZE_LIMIT.
|
||||||
* @param command command line arguments
|
|
||||||
*/
|
|
||||||
private static void parseCopyBufferSize(CommandLine command,
|
|
||||||
DistCpOptions option) {
|
|
||||||
if (command.hasOption(DistCpOptionSwitch.COPY_BUFFER_SIZE.getSwitch())) {
|
|
||||||
String copyBufferSizeStr =
|
|
||||||
getVal(command, DistCpOptionSwitch.COPY_BUFFER_SIZE.getSwitch()
|
|
||||||
.trim());
|
|
||||||
try {
|
|
||||||
int copyBufferSize = Integer.parseInt(copyBufferSizeStr);
|
|
||||||
option.setCopyBufferSize(copyBufferSize);
|
|
||||||
} catch (NumberFormatException e) {
|
|
||||||
throw new IllegalArgumentException("copyBufferSize is invalid: "
|
|
||||||
+ copyBufferSizeStr, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* parseSizeLimit is a helper method for parsing the deprecated argument
|
|
||||||
* SIZE_LIMIT.
|
|
||||||
*
|
*
|
||||||
* @param command command line arguments
|
* @param command command line arguments
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -38,7 +38,6 @@ import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.mapreduce.Mapper;
|
import org.apache.hadoop.mapreduce.Mapper;
|
||||||
import org.apache.hadoop.tools.CopyListingFileStatus;
|
import org.apache.hadoop.tools.CopyListingFileStatus;
|
||||||
import org.apache.hadoop.tools.DistCpConstants;
|
import org.apache.hadoop.tools.DistCpConstants;
|
||||||
import org.apache.hadoop.tools.DistCpOptionSwitch;
|
|
||||||
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
|
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
|
||||||
import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
|
import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
|
||||||
import org.apache.hadoop.tools.util.DistCpUtils;
|
import org.apache.hadoop.tools.util.DistCpUtils;
|
||||||
|
@ -54,6 +53,7 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
public class RetriableFileCopyCommand extends RetriableCommand {
|
public class RetriableFileCopyCommand extends RetriableCommand {
|
||||||
|
|
||||||
private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
|
private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
|
||||||
|
private static int BUFFER_SIZE = 8 * 1024;
|
||||||
private boolean skipCrc = false;
|
private boolean skipCrc = false;
|
||||||
private FileAction action;
|
private FileAction action;
|
||||||
|
|
||||||
|
@ -169,9 +169,6 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
FsPermission permission = FsPermission.getFileDefault().applyUMask(
|
FsPermission permission = FsPermission.getFileDefault().applyUMask(
|
||||||
FsPermission.getUMask(targetFS.getConf()));
|
FsPermission.getUMask(targetFS.getConf()));
|
||||||
int copyBufferSize = context.getConfiguration().getInt(
|
|
||||||
DistCpOptionSwitch.COPY_BUFFER_SIZE.getConfigLabel(),
|
|
||||||
DistCpConstants.COPY_BUFFER_SIZE_DEFAULT);
|
|
||||||
final OutputStream outStream;
|
final OutputStream outStream;
|
||||||
if (action == FileAction.OVERWRITE) {
|
if (action == FileAction.OVERWRITE) {
|
||||||
final short repl = getReplicationFactor(fileAttributes, source,
|
final short repl = getReplicationFactor(fileAttributes, source,
|
||||||
|
@ -180,14 +177,14 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
||||||
targetFS, targetPath);
|
targetFS, targetPath);
|
||||||
FSDataOutputStream out = targetFS.create(targetPath, permission,
|
FSDataOutputStream out = targetFS.create(targetPath, permission,
|
||||||
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
|
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
|
||||||
copyBufferSize, repl, blockSize, context,
|
BUFFER_SIZE, repl, blockSize, context,
|
||||||
getChecksumOpt(fileAttributes, sourceChecksum));
|
getChecksumOpt(fileAttributes, sourceChecksum));
|
||||||
outStream = new BufferedOutputStream(out);
|
outStream = new BufferedOutputStream(out);
|
||||||
} else {
|
} else {
|
||||||
outStream = new BufferedOutputStream(targetFS.append(targetPath,
|
outStream = new BufferedOutputStream(targetFS.append(targetPath,
|
||||||
copyBufferSize));
|
BUFFER_SIZE));
|
||||||
}
|
}
|
||||||
return copyBytes(source, sourceOffset, outStream, copyBufferSize,
|
return copyBytes(source, sourceOffset, outStream, BUFFER_SIZE,
|
||||||
context);
|
context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -240,7 +240,6 @@ Flag | Description | Notes
|
||||||
`-numListstatusThreads` | Number of threads to use for building file listing | At most 40 threads.
|
`-numListstatusThreads` | Number of threads to use for building file listing | At most 40 threads.
|
||||||
`-skipcrccheck` | Whether to skip CRC checks between source and target paths. |
|
`-skipcrccheck` | Whether to skip CRC checks between source and target paths. |
|
||||||
`-blocksperchunk <blocksperchunk>` | Number of blocks per chunk. When specified, split files into chunks to copy in parallel | If set to a positive value, files with more blocks than this value will be split into chunks of `<blocksperchunk>` blocks to be transferred in parallel, and reassembled on the destination. By default, `<blocksperchunk>` is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements getBlockLocations method and the target file system implements concat method. |
|
`-blocksperchunk <blocksperchunk>` | Number of blocks per chunk. When specified, split files into chunks to copy in parallel | If set to a positive value, files with more blocks than this value will be split into chunks of `<blocksperchunk>` blocks to be transferred in parallel, and reassembled on the destination. By default, `<blocksperchunk>` is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements getBlockLocations method and the target file system implements concat method. |
|
||||||
`-copybuffersize <copybuffersize>` | Size of the copy buffer to use. By default, `<copybuffersize>` is set to 8192B |
|
|
||||||
|
|
||||||
Architecture of DistCp
|
Architecture of DistCp
|
||||||
----------------------
|
----------------------
|
||||||
|
|
|
@ -407,8 +407,7 @@ public class TestOptionsParser {
|
||||||
+ "copyStrategy='uniformsize', preserveStatus=[], "
|
+ "copyStrategy='uniformsize', preserveStatus=[], "
|
||||||
+ "preserveRawXattrs=false, atomicWorkPath=null, logPath=null, "
|
+ "preserveRawXattrs=false, atomicWorkPath=null, logPath=null, "
|
||||||
+ "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, "
|
+ "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, "
|
||||||
+ "targetPathExists=true, filtersFile='null', blocksPerChunk=0, "
|
+ "targetPathExists=true, filtersFile='null', blocksPerChunk=0}";
|
||||||
+ "copyBufferSize=8192}";
|
|
||||||
String optionString = option.toString();
|
String optionString = option.toString();
|
||||||
Assert.assertEquals(val, optionString);
|
Assert.assertEquals(val, optionString);
|
||||||
Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),
|
Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),
|
||||||
|
@ -774,43 +773,4 @@ public class TestOptionsParser {
|
||||||
"hdfs://localhost:8020/target/"});
|
"hdfs://localhost:8020/target/"});
|
||||||
Assert.assertEquals(options.getFiltersFile(), "/tmp/filters.txt");
|
Assert.assertEquals(options.getFiltersFile(), "/tmp/filters.txt");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testParseCopyBufferSize() {
|
|
||||||
DistCpOptions options =
|
|
||||||
OptionsParser.parse(new String[] {
|
|
||||||
"hdfs://localhost:8020/source/first",
|
|
||||||
"hdfs://localhost:8020/target/" });
|
|
||||||
Assert.assertEquals(options.getCopyBufferSize(),
|
|
||||||
DistCpConstants.COPY_BUFFER_SIZE_DEFAULT);
|
|
||||||
|
|
||||||
options =
|
|
||||||
OptionsParser.parse(new String[] { "-copybuffersize", "0",
|
|
||||||
"hdfs://localhost:8020/source/first",
|
|
||||||
"hdfs://localhost:8020/target/" });
|
|
||||||
Assert.assertEquals(options.getCopyBufferSize(),
|
|
||||||
DistCpConstants.COPY_BUFFER_SIZE_DEFAULT);
|
|
||||||
|
|
||||||
options =
|
|
||||||
OptionsParser.parse(new String[] { "-copybuffersize", "-1",
|
|
||||||
"hdfs://localhost:8020/source/first",
|
|
||||||
"hdfs://localhost:8020/target/" });
|
|
||||||
Assert.assertEquals(options.getCopyBufferSize(),
|
|
||||||
DistCpConstants.COPY_BUFFER_SIZE_DEFAULT);
|
|
||||||
|
|
||||||
options =
|
|
||||||
OptionsParser.parse(new String[] { "-copybuffersize", "4194304",
|
|
||||||
"hdfs://localhost:8020/source/first",
|
|
||||||
"hdfs://localhost:8020/target/" });
|
|
||||||
Assert.assertEquals(options.getCopyBufferSize(), 4194304);
|
|
||||||
|
|
||||||
try {
|
|
||||||
OptionsParser
|
|
||||||
.parse(new String[] { "-copybuffersize", "hello",
|
|
||||||
"hdfs://localhost:8020/source/first",
|
|
||||||
"hdfs://localhost:8020/target/" });
|
|
||||||
Assert.fail("Non numberic copybuffersize parsed successfully!");
|
|
||||||
} catch (IllegalArgumentException ignore) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue