HADOOP-14254. Add a Distcp option to preserve Erasure Coding attributes. Contributed by Ayush Saxena.

This commit is contained in:
Ayush Saxena 2020-05-14 00:19:14 +05:30
parent 3cacf1ce56
commit c757cb61eb
9 changed files with 144 additions and 25 deletions

View File

@ -75,6 +75,7 @@ public final class CopyListingFileStatus implements Writable {
private FsPermission permission;
private String owner;
private String group;
private String ecPolicy;
// Retain static arrays of enum values to prevent repeated allocation of new
// arrays during deserialization.

View File

@ -46,14 +46,18 @@ public enum DistCpOptionSwitch {
* only the corresponding file attribute is preserved.
*/
PRESERVE_STATUS(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
new Option("p", true, "preserve status (rbugpcaxt)(replication, " +
new Option("p", true, "preserve status (rbugpcaxte)(replication, " +
"block-size, user, group, permission, checksum-type, ACL, XATTR, " +
"timestamps). If -p is specified with no <arg>, then preserves " +
"timestamps, erasure coding policy). If -p is specified with no "
+ "<arg>, then "
+ "preserves " +
"replication, block size, user, group, permission, checksum type " +
"and timestamps. " +
"raw.* xattrs are preserved when both the source and destination " +
"paths are in the /.reserved/raw hierarchy (HDFS only). raw.* xattr" +
"preservation is independent of the -p flag. " +
"Erasure coding policy is only preserved when both source and "
+ "destination are of HDFS"+
"Refer to the DistCp documentation for more details.")),
/**

View File

@ -172,7 +172,8 @@ public enum FileAttribute {
CHECKSUMTYPE, // C
ACL, // A
XATTR, // X
TIMES; // T
TIMES, // T
ERASURECODINGPOLICY; // E
public static FileAttribute getAttribute(char symbol) {
for (FileAttribute attribute : values()) {

View File

@ -158,12 +158,14 @@ public void map(Text relPath, CopyListingFileStatus sourceFileStatus,
try {
CopyListingFileStatus sourceCurrStatus;
FileSystem sourceFS;
FileStatus sourceStatus;
try {
sourceFS = sourcePath.getFileSystem(conf);
sourceStatus = sourceFS.getFileStatus(sourcePath);
final boolean preserveXAttrs =
fileAttributes.contains(FileAttribute.XATTR);
sourceCurrStatus = DistCpUtils.toCopyListingFileStatusHelper(sourceFS,
sourceFS.getFileStatus(sourcePath),
sourceStatus,
fileAttributes.contains(FileAttribute.ACL),
preserveXAttrs, preserveRawXattrs,
sourceFileStatus.getChunkOffset(),
@ -188,7 +190,7 @@ public void map(Text relPath, CopyListingFileStatus sourceFileStatus,
}
if (sourceCurrStatus.isDirectory()) {
createTargetDirsWithRetry(description, target, context);
createTargetDirsWithRetry(description, target, context, sourceStatus);
return;
}
@ -217,7 +219,7 @@ public void map(Text relPath, CopyListingFileStatus sourceFileStatus,
LOG.debug("copying " + sourceCurrStatus + " " + tmpTarget);
}
copyFileWithRetry(description, sourceCurrStatus, tmpTarget,
targetStatus, context, action, fileAttributes);
targetStatus, context, action, fileAttributes, sourceStatus);
}
DistCpUtils.preserve(target.getFileSystem(conf), tmpTarget,
sourceCurrStatus, fileAttributes, preserveRawXattrs);
@ -240,23 +242,24 @@ private String getFileType(FileStatus fileStatus) {
return fileStatus.isDirectory() ? "dir" : "file";
}
private static EnumSet<DistCpOptions.FileAttribute>
static EnumSet<DistCpOptions.FileAttribute>
getFileAttributeSettings(Mapper.Context context) {
String attributeString = context.getConfiguration().get(
DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel());
return DistCpUtils.unpackAttributes(attributeString);
}
@SuppressWarnings("checkstyle:parameternumber")
private void copyFileWithRetry(String description,
CopyListingFileStatus sourceFileStatus, Path target,
FileStatus targrtFileStatus, Context context, FileAction action,
EnumSet<DistCpOptions.FileAttribute> fileAttributes)
EnumSet<FileAttribute> fileAttributes, FileStatus sourceStatus)
throws IOException, InterruptedException {
long bytesCopied;
try {
bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description,
action, directWrite).execute(sourceFileStatus, target, context,
fileAttributes);
fileAttributes, sourceStatus);
} catch (Exception e) {
context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
throw new IOException("File copy failed: " + sourceFileStatus.getPath() +
@ -276,10 +279,11 @@ private void copyFileWithRetry(String description,
}
}
private void createTargetDirsWithRetry(String description,
Path target, Context context) throws IOException {
private void createTargetDirsWithRetry(String description, Path target,
Context context, FileStatus sourceStatus) throws IOException {
try {
new RetriableDirectoryCreateCommand(description).execute(target, context);
new RetriableDirectoryCreateCommand(description).execute(target,
context, sourceStatus);
} catch (Exception e) {
throw new IOException("mkdir failed for " + target, e);
}

View File

@ -18,11 +18,18 @@
package org.apache.hadoop.tools.mapred;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.util.RetriableCommand;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.Mapper;
import static org.apache.hadoop.tools.mapred.CopyMapper.getFileAttributeSettings;
/**
* This class extends Retriable command to implement the creation of directories
* with retries on failure.
@ -46,11 +53,25 @@ public RetriableDirectoryCreateCommand(String description) {
*/
@Override
protected Object doExecute(Object... arguments) throws Exception {
assert arguments.length == 2 : "Unexpected argument list.";
assert arguments.length == 3 : "Unexpected argument list.";
Path target = (Path)arguments[0];
Mapper.Context context = (Mapper.Context)arguments[1];
FileStatus sourceStatus = (FileStatus)arguments[2];
FileSystem targetFS = target.getFileSystem(context.getConfiguration());
return targetFS.mkdirs(target);
if(!targetFS.mkdirs(target)) {
return false;
}
boolean preserveEC = getFileAttributeSettings(context)
.contains(DistCpOptions.FileAttribute.ERASURECODINGPOLICY);
if (preserveEC && sourceStatus.isErasureCoded()
&& targetFS instanceof DistributedFileSystem) {
ErasureCodingPolicy ecPolicy =
((HdfsFileStatus) sourceStatus).getErasureCodingPolicy();
DistributedFileSystem dfs = (DistributedFileSystem) targetFS;
dfs.setErasureCodingPolicy(target, ecPolicy.getName());
}
return true;
}
}

View File

@ -23,6 +23,11 @@
import java.io.OutputStream;
import java.util.EnumSet;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.tools.DistCpOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@ -47,6 +52,8 @@
import com.google.common.annotations.VisibleForTesting;
import static org.apache.hadoop.tools.mapred.CopyMapper.getFileAttributeSettings;
/**
* This class extends RetriableCommand to implement the copy of files,
* with retries on failure.
@ -105,18 +112,20 @@ public RetriableFileCopyCommand(boolean skipCrc, String description,
@SuppressWarnings("unchecked")
@Override
protected Object doExecute(Object... arguments) throws Exception {
assert arguments.length == 4 : "Unexpected argument list.";
assert arguments.length == 5 : "Unexpected argument list.";
CopyListingFileStatus source = (CopyListingFileStatus)arguments[0];
assert !source.isDirectory() : "Unexpected file-status. Expected file.";
Path target = (Path)arguments[1];
Mapper.Context context = (Mapper.Context)arguments[2];
EnumSet<FileAttribute> fileAttributes
= (EnumSet<FileAttribute>)arguments[3];
return doCopy(source, target, context, fileAttributes);
FileStatus sourceStatus = (FileStatus)arguments[4];
return doCopy(source, target, context, fileAttributes, sourceStatus);
}
private long doCopy(CopyListingFileStatus source, Path target,
Mapper.Context context, EnumSet<FileAttribute> fileAttributes)
Mapper.Context context, EnumSet<FileAttribute> fileAttributes,
FileStatus sourceStatus)
throws IOException {
LOG.info("Copying {} to {}", source.getPath(), target);
@ -140,7 +149,7 @@ private long doCopy(CopyListingFileStatus source, Path target,
long offset = (action == FileAction.APPEND) ?
targetFS.getFileStatus(target).getLen() : source.getChunkOffset();
long bytesRead = copyToFile(targetPath, targetFS, source,
offset, context, fileAttributes, sourceChecksum);
offset, context, fileAttributes, sourceChecksum, sourceStatus);
if (!source.isSplit()) {
DistCpUtils.compareFileLengthsAndChecksums(source.getLen(), sourceFS,
@ -179,15 +188,26 @@ private ChecksumOpt getChecksumOpt(EnumSet<FileAttribute> fileAttributes,
return null;
}
@SuppressWarnings("checkstyle:parameternumber")
private long copyToFile(Path targetPath, FileSystem targetFS,
CopyListingFileStatus source, long sourceOffset, Mapper.Context context,
EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum)
EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum,
FileStatus sourceStatus)
throws IOException {
FsPermission permission = FsPermission.getFileDefault().applyUMask(
FsPermission.getUMask(targetFS.getConf()));
int copyBufferSize = context.getConfiguration().getInt(
DistCpOptionSwitch.COPY_BUFFER_SIZE.getConfigLabel(),
DistCpConstants.COPY_BUFFER_SIZE_DEFAULT);
boolean preserveEC = getFileAttributeSettings(context)
.contains(DistCpOptions.FileAttribute.ERASURECODINGPOLICY);
ErasureCodingPolicy ecPolicy = null;
if (preserveEC && sourceStatus.isErasureCoded()
&& sourceStatus instanceof HdfsFileStatus
&& targetFS instanceof DistributedFileSystem) {
ecPolicy = ((HdfsFileStatus) sourceStatus).getErasureCodingPolicy();
}
final OutputStream outStream;
if (action == FileAction.OVERWRITE) {
// If there is an erasure coding policy set on the target directory,
@ -197,10 +217,24 @@ private long copyToFile(Path targetPath, FileSystem targetFS,
targetFS, targetPath);
final long blockSize = getBlockSize(fileAttributes, source,
targetFS, targetPath);
FSDataOutputStream out = targetFS.create(targetPath, permission,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
copyBufferSize, repl, blockSize, context,
getChecksumOpt(fileAttributes, sourceChecksum));
FSDataOutputStream out;
ChecksumOpt checksumOpt = getChecksumOpt(fileAttributes, sourceChecksum);
if (!preserveEC || ecPolicy == null) {
out = targetFS.create(targetPath, permission,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), copyBufferSize,
repl, blockSize, context, checksumOpt);
} else {
DistributedFileSystem dfs = (DistributedFileSystem) targetFS;
DistributedFileSystem.HdfsDataOutputStreamBuilder builder =
dfs.createFile(targetPath).permission(permission).create()
.overwrite(true).bufferSize(copyBufferSize).replication(repl)
.blockSize(blockSize).progress(context).recursive()
.ecPolicyName(ecPolicy.getName());
if (checksumOpt != null) {
builder.checksumOpt(checksumOpt);
}
out = builder.build();
}
outStream = new BufferedOutputStream(out);
} else {
outStream = new BufferedOutputStream(targetFS.append(targetPath,

View File

@ -337,7 +337,7 @@ Command Line Options
| Flag | Description | Notes |
| ----------------- | ------------------------------------ | -------- |
| `-p[rbugpcaxt]` | Preserve r: replication number b: block size u: user g: group p: permission c: checksum-type a: ACL x: XAttr t: timestamp | When `-update` is specified, status updates will **not** be synchronized unless the file sizes also differ (i.e. unless the file is re-created). If -pa is specified, DistCp preserves the permissions also because ACLs are a super-set of permissions. The option -pr is only valid if both source and target directory are not erasure coded. **Note:** If -p option's are not specified, then by default block size is preserved. |
| `-p[rbugpcaxte]` | Preserve r: replication number b: block size u: user g: group p: permission c: checksum-type a: ACL x: XAttr t: timestamp e: erasure coding policy | When `-update` is specified, status updates will **not** be synchronized unless the file sizes also differ (i.e. unless the file is re-created). If -pa is specified, DistCp preserves the permissions also because ACLs are a super-set of permissions. The option -pr is only valid if both source and target directory are not erasure coded. **Note:** If -p option's are not specified, then by default block size is preserved. |
| `-i` | Ignore failures | As explained in the Appendix, this option will keep more accurate statistics about the copy than the default case. It also preserves logs from failed copies, which can be valuable for debugging. Finally, a failing map will not cause the job to fail before all splits are attempted. |
| `-log <logdir>` | Write logs to \<logdir\> | DistCp keeps logs of each file it attempts to copy as map output. If a map fails, the log output will not be retained if it is re-executed. |
| `-v` | Log additional info (path, size) in the SKIP/COPY log | This option can only be used with -log option. |

View File

@ -21,19 +21,27 @@
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.tools.ECAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.tools.util.DistCpTestUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.Maps;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Tests distcp in combination with HDFS raw.* XAttrs.
*/
@ -164,4 +172,49 @@ private void doTestPreserveRawXAttrs(String src, String dest,
}
}
}
@Test
public void testPreserveEC() throws Exception {
final String src = "/src";
final String dest = "/dest";
final Path destDir1 = new Path("/dest/dir1");
final Path destSubDir1 = new Path(destDir1, "subdir1");
String[] args = {"-setPolicy", "-path", dir1.toString(),
"-policy", "XOR-2-1-1024k"};
fs.delete(new Path("/dest"), true);
fs.mkdirs(subDir1);
fs.create(file1).close();
DistributedFileSystem dfs = (DistributedFileSystem) fs;
dfs.enableErasureCodingPolicy("XOR-2-1-1024k");
int res = ToolRunner.run(conf, new ECAdmin(conf), args);
assertEquals("Unable to set EC policy on " + subDir1.toString(), res, 0);
// preserve all attributes
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, src, dest,
"-pe", conf);
FileStatus srcStatus = fs.getFileStatus(new Path(src));
FileStatus srcDir1Status = fs.getFileStatus(dir1);
FileStatus srcSubDir1Status = fs.getFileStatus(subDir1);
FileStatus destStatus = fs.getFileStatus(new Path(dest));
FileStatus destDir1Status = fs.getFileStatus(destDir1);
FileStatus destSubDir1Status = fs.getFileStatus(destSubDir1);
assertFalse("/src is erasure coded!",
srcStatus.isErasureCoded());
assertFalse("/dest is erasure coded!",
destStatus.isErasureCoded());
assertTrue("/src/dir1 is not erasure coded!",
srcDir1Status.isErasureCoded());
assertTrue("/dest/dir1 is not erasure coded!",
destDir1Status.isErasureCoded());
assertTrue("/src/dir1/subdir1 is not erasure coded!",
srcSubDir1Status.isErasureCoded());
assertTrue("/dest/dir1/subdir1 is not erasure coded!",
destSubDir1Status.isErasureCoded());
}
}

View File

@ -139,12 +139,13 @@ public void testPackAttributes() {
@Test
public void testUnpackAttributes() {
EnumSet<FileAttribute> attributes = EnumSet.allOf(FileAttribute.class);
Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RCBUGPAXT"));
Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("RCBUGPAXTE"));
attributes.remove(FileAttribute.REPLICATION);
attributes.remove(FileAttribute.CHECKSUMTYPE);
attributes.remove(FileAttribute.ACL);
attributes.remove(FileAttribute.XATTR);
attributes.remove(FileAttribute.ERASURECODINGPOLICY);
Assert.assertEquals(attributes, DistCpUtils.unpackAttributes("BUGPT"));
attributes.remove(FileAttribute.TIMES);