MAPREDUCE-5898. distcp to support preserving HDFS extended attributes(XAttrs). Contributed by Yi Liu.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1600900 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6a4f6d6b3e
commit
f81c7b0252
|
@ -142,6 +142,9 @@ Trunk (Unreleased)
|
|||
MAPREDUCE-5867. Fix NPE in KillAMPreemptionPolicy related to
|
||||
ProportionalCapacityPreemptionPolicy (Sunil G via devaraj)
|
||||
|
||||
MAPREDUCE-5898. distcp to support preserving HDFS extended attributes(XAttrs)
|
||||
(Yi Liu via umamahesh)
|
||||
|
||||
Release 2.5.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -129,6 +129,7 @@ public abstract class CopyListing extends Configured {
|
|||
/**
|
||||
* Validate the final resulting path listing. Checks if there are duplicate
|
||||
* entries. If preserving ACLs, checks that file system can support ACLs.
|
||||
* If preserving XAttrs, checks that file system can support XAttrs.
|
||||
*
|
||||
* @param pathToListFile - path listing build by doBuildListing
|
||||
* @param options - Input options to distcp
|
||||
|
@ -151,6 +152,7 @@ public abstract class CopyListing extends Configured {
|
|||
|
||||
Text currentKey = new Text();
|
||||
Set<URI> aclSupportCheckFsSet = Sets.newHashSet();
|
||||
Set<URI> xAttrSupportCheckFsSet = Sets.newHashSet();
|
||||
while (reader.next(currentKey)) {
|
||||
if (currentKey.equals(lastKey)) {
|
||||
CopyListingFileStatus currentFileStatus = new CopyListingFileStatus();
|
||||
|
@ -167,6 +169,14 @@ public abstract class CopyListing extends Configured {
|
|||
aclSupportCheckFsSet.add(lastFsUri);
|
||||
}
|
||||
}
|
||||
if (options.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) {
|
||||
FileSystem lastFs = lastFileStatus.getPath().getFileSystem(config);
|
||||
URI lastFsUri = lastFs.getUri();
|
||||
if (!xAttrSupportCheckFsSet.contains(lastFsUri)) {
|
||||
DistCpUtils.checkFileSystemXAttrSupport(lastFs);
|
||||
xAttrSupportCheckFsSet.add(lastFsUri);
|
||||
}
|
||||
}
|
||||
lastKey.set(currentKey);
|
||||
}
|
||||
} finally {
|
||||
|
@ -256,4 +266,10 @@ public abstract class CopyListing extends Configured {
|
|||
super(message);
|
||||
}
|
||||
}
|
||||
|
||||
public static class XAttrsNotSupportedException extends RuntimeException {
|
||||
public XAttrsNotSupportedException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,10 @@ import java.io.DataInput;
|
|||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
|
@ -34,6 +37,7 @@ import org.apache.hadoop.io.WritableUtils;
|
|||
|
||||
import com.google.common.base.Objects;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
/**
|
||||
* CopyListingFileStatus is a specialized subclass of {@link FileStatus} for
|
||||
|
@ -45,6 +49,7 @@ import com.google.common.collect.Lists;
|
|||
public final class CopyListingFileStatus extends FileStatus {
|
||||
|
||||
private static final byte NO_ACL_ENTRIES = -1;
|
||||
private static final int NO_XATTRS = -1;
|
||||
|
||||
// Retain static arrays of enum values to prevent repeated allocation of new
|
||||
// arrays during deserialization.
|
||||
|
@ -53,6 +58,7 @@ public final class CopyListingFileStatus extends FileStatus {
|
|||
private static final FsAction[] FS_ACTIONS = FsAction.values();
|
||||
|
||||
private List<AclEntry> aclEntries;
|
||||
private Map<String, byte[]> xAttrs;
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
|
@ -88,6 +94,24 @@ public final class CopyListingFileStatus extends FileStatus {
|
|||
public void setAclEntries(List<AclEntry> aclEntries) {
|
||||
this.aclEntries = aclEntries;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns all xAttrs.
|
||||
*
|
||||
* @return Map<String, byte[]> containing all xAttrs
|
||||
*/
|
||||
public Map<String, byte[]> getXAttrs() {
|
||||
return xAttrs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets optional xAttrs.
|
||||
*
|
||||
* @param xAttrs Map<String, byte[]> containing all xAttrs
|
||||
*/
|
||||
public void setXAttrs(Map<String, byte[]> xAttrs) {
|
||||
this.xAttrs = xAttrs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
|
@ -104,6 +128,26 @@ public final class CopyListingFileStatus extends FileStatus {
|
|||
} else {
|
||||
out.writeByte(NO_ACL_ENTRIES);
|
||||
}
|
||||
|
||||
if (xAttrs != null) {
|
||||
out.writeInt(xAttrs.size());
|
||||
Iterator<Entry<String, byte[]>> iter = xAttrs.entrySet().iterator();
|
||||
while (iter.hasNext()) {
|
||||
Entry<String, byte[]> entry = iter.next();
|
||||
WritableUtils.writeString(out, entry.getKey());
|
||||
final byte[] value = entry.getValue();
|
||||
if (value != null) {
|
||||
out.writeInt(value.length);
|
||||
if (value.length > 0) {
|
||||
out.write(value);
|
||||
}
|
||||
} else {
|
||||
out.writeInt(-1);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
out.writeInt(NO_XATTRS);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -123,6 +167,25 @@ public final class CopyListingFileStatus extends FileStatus {
|
|||
} else {
|
||||
aclEntries = null;
|
||||
}
|
||||
|
||||
int xAttrsSize = in.readInt();
|
||||
if (xAttrsSize != NO_XATTRS) {
|
||||
xAttrs = Maps.newHashMap();
|
||||
for (int i = 0; i < xAttrsSize; ++i) {
|
||||
final String name = WritableUtils.readString(in);
|
||||
final int valueLen = in.readInt();
|
||||
byte[] value = null;
|
||||
if (valueLen > -1) {
|
||||
value = new byte[valueLen];
|
||||
if (valueLen > 0) {
|
||||
in.readFully(value);
|
||||
}
|
||||
}
|
||||
xAttrs.put(name, value);
|
||||
}
|
||||
} else {
|
||||
xAttrs = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -134,12 +197,13 @@ public final class CopyListingFileStatus extends FileStatus {
|
|||
return false;
|
||||
}
|
||||
CopyListingFileStatus other = (CopyListingFileStatus)o;
|
||||
return Objects.equal(aclEntries, other.aclEntries);
|
||||
return Objects.equal(aclEntries, other.aclEntries) &&
|
||||
Objects.equal(xAttrs, other.xAttrs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(super.hashCode(), aclEntries);
|
||||
return Objects.hashCode(super.hashCode(), aclEntries, xAttrs);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -147,6 +211,7 @@ public final class CopyListingFileStatus extends FileStatus {
|
|||
StringBuilder sb = new StringBuilder(super.toString());
|
||||
sb.append('{');
|
||||
sb.append("aclEntries = " + aclEntries);
|
||||
sb.append(", xAttrs = " + xAttrs);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
|
|
|
@ -128,6 +128,9 @@ public class DistCp extends Configured implements Tool {
|
|||
} catch (AclsNotSupportedException e) {
|
||||
LOG.error("ACLs not supported on at least one file system: ", e);
|
||||
return DistCpConstants.ACLS_NOT_SUPPORTED;
|
||||
} catch (XAttrsNotSupportedException e) {
|
||||
LOG.error("XAttrs not supported on at least one file system: ", e);
|
||||
return DistCpConstants.XATTRS_NOT_SUPPORTED;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception encountered ", e);
|
||||
return DistCpConstants.UNKNOWN_ERROR;
|
||||
|
@ -304,6 +307,9 @@ public class DistCp extends Configured implements Tool {
|
|||
if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.ACL)) {
|
||||
DistCpUtils.checkFileSystemAclSupport(targetFS);
|
||||
}
|
||||
if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) {
|
||||
DistCpUtils.checkFileSystemXAttrSupport(targetFS);
|
||||
}
|
||||
if (inputOptions.shouldAtomicCommit()) {
|
||||
Path workDir = inputOptions.getAtomicWorkPath();
|
||||
if (workDir == null) {
|
||||
|
|
|
@ -117,6 +117,7 @@ public class DistCpConstants {
|
|||
public static final int INVALID_ARGUMENT = -1;
|
||||
public static final int DUPLICATE_INPUT = -2;
|
||||
public static final int ACLS_NOT_SUPPORTED = -3;
|
||||
public static final int XATTRS_NOT_SUPPORTED = -4;
|
||||
public static final int UNKNOWN_ERROR = -999;
|
||||
|
||||
/**
|
||||
|
|
|
@ -45,10 +45,10 @@ public enum DistCpOptionSwitch {
|
|||
*
|
||||
*/
|
||||
PRESERVE_STATUS(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
|
||||
new Option("p", true, "preserve status (rbugpca)(replication, " +
|
||||
"block-size, user, group, permission, checksum-type, ACL). If " +
|
||||
"-p is specified with no <arg>, then preserves replication, block " +
|
||||
"size, user, group, permission and checksum type.")),
|
||||
new Option("p", true, "preserve status (rbugpcax)(replication, " +
|
||||
"block-size, user, group, permission, checksum-type, ACL, XATTR). " +
|
||||
"If -p is specified with no <arg>, then preserves replication, " +
|
||||
"block size, user, group, permission and checksum type.")),
|
||||
|
||||
/**
|
||||
* Update target location by copying only files that are missing
|
||||
|
|
|
@ -66,7 +66,7 @@ public class DistCpOptions {
|
|||
private boolean targetPathExists = true;
|
||||
|
||||
public static enum FileAttribute{
|
||||
REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL;
|
||||
REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL, XATTR;
|
||||
|
||||
public static FileAttribute getAttribute(char symbol) {
|
||||
for (FileAttribute attribute : values()) {
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
@ -36,7 +35,6 @@ import org.apache.hadoop.security.Credentials;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.List;
|
||||
import java.util.Stack;
|
||||
|
||||
/**
|
||||
|
@ -123,7 +121,7 @@ public class SimpleCopyListing extends CopyListing {
|
|||
* the the source root is a directory, then the source root entry is not
|
||||
* written to the sequence file, because only the contents of the source
|
||||
* directory need to be copied in this case.
|
||||
* See {@link org.apache.hadoop.tools.util.DistCpUtils.getRelativePath} for
|
||||
* See {@link org.apache.hadoop.tools.util.DistCpUtils#getRelativePath} for
|
||||
* how relative path is computed.
|
||||
* See computeSourceRootPath method for how the root path of the source is
|
||||
* computed.
|
||||
|
@ -147,7 +145,8 @@ public class SimpleCopyListing extends CopyListing {
|
|||
if (!explore || rootStatus.isDirectory()) {
|
||||
CopyListingFileStatus rootCopyListingStatus =
|
||||
DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus,
|
||||
options.shouldPreserve(FileAttribute.ACL));
|
||||
options.shouldPreserve(FileAttribute.ACL),
|
||||
options.shouldPreserve(FileAttribute.XATTR));
|
||||
writeToFileListingRoot(fileListWriter, rootCopyListingStatus,
|
||||
sourcePathRoot, options);
|
||||
}
|
||||
|
@ -159,7 +158,8 @@ public class SimpleCopyListing extends CopyListing {
|
|||
CopyListingFileStatus sourceCopyListingStatus =
|
||||
DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus,
|
||||
options.shouldPreserve(FileAttribute.ACL) &&
|
||||
sourceStatus.isDirectory());
|
||||
sourceStatus.isDirectory(), options.shouldPreserve(
|
||||
FileAttribute.XATTR) && sourceStatus.isDirectory());
|
||||
writeToFileListing(fileListWriter, sourceCopyListingStatus,
|
||||
sourcePathRoot, options);
|
||||
|
||||
|
@ -271,7 +271,8 @@ public class SimpleCopyListing extends CopyListing {
|
|||
+ sourceStatus.getPath() + " for copy.");
|
||||
CopyListingFileStatus childCopyListingStatus =
|
||||
DistCpUtils.toCopyListingFileStatus(sourceFS, child,
|
||||
options.shouldPreserve(FileAttribute.ACL) && child.isDirectory());
|
||||
options.shouldPreserve(FileAttribute.ACL) && child.isDirectory(),
|
||||
options.shouldPreserve(FileAttribute.XATTR) && child.isDirectory());
|
||||
writeToFileListing(fileListWriter, childCopyListingStatus,
|
||||
sourcePathRoot, options);
|
||||
if (isDirectoryAndNotEmpty(sourceFS, child)) {
|
||||
|
|
|
@ -213,7 +213,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|||
sourceFS = sourcePath.getFileSystem(conf);
|
||||
sourceCurrStatus = DistCpUtils.toCopyListingFileStatus(sourceFS,
|
||||
sourceFS.getFileStatus(sourcePath),
|
||||
fileAttributes.contains(FileAttribute.ACL));
|
||||
fileAttributes.contains(FileAttribute.ACL),
|
||||
fileAttributes.contains(FileAttribute.XATTR));
|
||||
} catch (FileNotFoundException e) {
|
||||
throw new IOException(new RetriableFileCopyCommand.CopyReadException(e));
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.fs.permission.AclUtil;
|
|||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.tools.CopyListing.XAttrsNotSupportedException;
|
||||
import org.apache.hadoop.tools.CopyListingFileStatus;
|
||||
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
|
||||
import org.apache.hadoop.tools.mapred.UniformSizeInputFormat;
|
||||
|
@ -39,8 +40,11 @@ import org.apache.hadoop.mapreduce.InputFormat;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.text.DecimalFormat;
|
||||
import java.net.URI;
|
||||
import java.net.InetAddress;
|
||||
|
@ -210,6 +214,18 @@ public class DistCpUtils {
|
|||
!srcFileStatus.getPermission().equals(targetFileStatus.getPermission())) {
|
||||
targetFS.setPermission(path, srcFileStatus.getPermission());
|
||||
}
|
||||
|
||||
if (attributes.contains(FileAttribute.XATTR)) {
|
||||
Map<String, byte[]> srcXAttrs = srcFileStatus.getXAttrs();
|
||||
Map<String, byte[]> targetXAttrs = getXAttrs(targetFS, path);
|
||||
if (!srcXAttrs.equals(targetXAttrs)) {
|
||||
Iterator<Entry<String, byte[]>> iter = srcXAttrs.entrySet().iterator();
|
||||
while (iter.hasNext()) {
|
||||
Entry<String, byte[]> entry = iter.next();
|
||||
targetFS.setXAttr(path, entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (attributes.contains(FileAttribute.REPLICATION) && ! targetFileStatus.isDirectory() &&
|
||||
srcFileStatus.getReplication() != targetFileStatus.getReplication()) {
|
||||
|
@ -247,19 +263,34 @@ public class DistCpUtils {
|
|||
.getEntries();
|
||||
return AclUtil.getAclFromPermAndEntries(fileStatus.getPermission(), entries);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a file's all xAttrs.
|
||||
*
|
||||
* @param fileSystem FileSystem containing the file
|
||||
* @param path file path
|
||||
* @return Map<String, byte[]> containing all xAttrs
|
||||
* @throws IOException if there is an I/O error
|
||||
*/
|
||||
public static Map<String, byte[]> getXAttrs(FileSystem fileSystem,
|
||||
Path path) throws IOException {
|
||||
return fileSystem.getXAttrs(path);
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a FileStatus to a CopyListingFileStatus. If preserving ACLs,
|
||||
* populates the CopyListingFileStatus with the ACLs.
|
||||
* populates the CopyListingFileStatus with the ACLs. If preserving XAttrs,
|
||||
* populates the CopyListingFileStatus with the XAttrs.
|
||||
*
|
||||
* @param fileSystem FileSystem containing the file
|
||||
* @param fileStatus FileStatus of file
|
||||
* @param preserveAcls boolean true if preserving ACLs
|
||||
* @param preserveXAttrs boolean true if preserving XAttrs
|
||||
* @throws IOException if there is an I/O error
|
||||
*/
|
||||
public static CopyListingFileStatus toCopyListingFileStatus(
|
||||
FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls)
|
||||
throws IOException {
|
||||
FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls,
|
||||
boolean preserveXAttrs) throws IOException {
|
||||
CopyListingFileStatus copyListingFileStatus =
|
||||
new CopyListingFileStatus(fileStatus);
|
||||
if (preserveAcls) {
|
||||
|
@ -270,6 +301,10 @@ public class DistCpUtils {
|
|||
copyListingFileStatus.setAclEntries(aclEntries);
|
||||
}
|
||||
}
|
||||
if (preserveXAttrs) {
|
||||
Map<String, byte[]> xAttrs = fileSystem.getXAttrs(fileStatus.getPath());
|
||||
copyListingFileStatus.setXAttrs(xAttrs);
|
||||
}
|
||||
return copyListingFileStatus;
|
||||
}
|
||||
|
||||
|
@ -314,6 +349,25 @@ public class DistCpUtils {
|
|||
+ fs.getUri());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines if a file system supports XAttrs by running a getXAttrs request
|
||||
* on the file system root. This method is used before distcp job submission
|
||||
* to fail fast if the user requested preserving XAttrs, but the file system
|
||||
* cannot support XAttrs.
|
||||
*
|
||||
* @param fs FileSystem to check
|
||||
* @throws XAttrsNotSupportedException if fs does not support XAttrs
|
||||
*/
|
||||
public static void checkFileSystemXAttrSupport(FileSystem fs)
|
||||
throws XAttrsNotSupportedException {
|
||||
try {
|
||||
fs.getXAttrs(new Path(Path.SEPARATOR));
|
||||
} catch (Exception e) {
|
||||
throw new XAttrsNotSupportedException("XAttrs not supported for file system: "
|
||||
+ fs.getUri());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* String utility to convert a number-of-bytes to human readable format.
|
||||
|
|
|
@ -0,0 +1,322 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
/**
|
||||
* Tests distcp in combination with HDFS XAttrs.
|
||||
*/
|
||||
public class TestDistCpWithXAttrs {
|
||||
|
||||
private static MiniDFSCluster cluster;
|
||||
private static Configuration conf;
|
||||
private static FileSystem fs;
|
||||
|
||||
//XAttrs
|
||||
private static final String name1 = "user.a1";
|
||||
private static final byte[] value1 = {0x31, 0x32, 0x33};
|
||||
private static final String name2 = "trusted.a2";
|
||||
private static final byte[] value2 = {0x37, 0x38, 0x39};
|
||||
private static final String name3 = "user.a3";
|
||||
private static final byte[] value3 = null;
|
||||
private static final String name4 = "user.a4";
|
||||
private static final byte[] value4 = null;
|
||||
|
||||
private static final Path dir1 = new Path("/src/dir1");
|
||||
private static final Path subDir1 = new Path(dir1, "subdir1");
|
||||
private static final Path file1 = new Path("/src/file1");
|
||||
private static final Path dir2 = new Path("/src/dir2");
|
||||
private static final Path file2 = new Path(dir2, "file2");
|
||||
private static final Path file3 = new Path(dir2, "file3");
|
||||
private static final Path file4 = new Path(dir2, "file4");
|
||||
private static final Path dstDir1 = new Path("/dstPreserveXAttrs/dir1");
|
||||
private static final Path dstSubDir1 = new Path(dstDir1, "subdir1");
|
||||
private static final Path dstFile1 = new Path("/dstPreserveXAttrs/file1");
|
||||
private static final Path dstDir2 = new Path("/dstPreserveXAttrs/dir2");
|
||||
private static final Path dstFile2 = new Path(dstDir2, "file2");
|
||||
private static final Path dstFile3 = new Path(dstDir2, "file3");
|
||||
private static final Path dstFile4 = new Path(dstDir2, "file4");
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
initCluster(true, true);
|
||||
fs.mkdirs(subDir1);
|
||||
fs.create(file1).close();
|
||||
fs.mkdirs(dir2);
|
||||
fs.create(file2).close();
|
||||
fs.create(file3).close();
|
||||
fs.create(file4).close();
|
||||
|
||||
// dir1
|
||||
fs.setXAttr(dir1, name1, value1);
|
||||
fs.setXAttr(dir1, name2, value2);
|
||||
|
||||
// subDir1
|
||||
fs.setXAttr(subDir1, name1, value1);
|
||||
fs.setXAttr(subDir1, name3, value3);
|
||||
|
||||
// file1
|
||||
fs.setXAttr(file1, name1, value1);
|
||||
fs.setXAttr(file1, name2, value2);
|
||||
fs.setXAttr(file1, name3, value3);
|
||||
|
||||
// dir2
|
||||
fs.setXAttr(dir2, name2, value2);
|
||||
|
||||
// file2
|
||||
fs.setXAttr(file2, name1, value1);
|
||||
fs.setXAttr(file2, name4, value4);
|
||||
|
||||
// file3
|
||||
fs.setXAttr(file3, name3, value3);
|
||||
fs.setXAttr(file3, name4, value4);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdown() {
|
||||
IOUtils.cleanup(null, fs);
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreserveXAttrs() throws Exception {
|
||||
assertRunDistCp(DistCpConstants.SUCCESS, "/dstPreserveXAttrs");
|
||||
|
||||
// dstDir1
|
||||
Map<String, byte[]> xAttrs = Maps.newHashMap();
|
||||
xAttrs.put(name1, value1);
|
||||
xAttrs.put(name2, value2);
|
||||
assertXAttrs(dstDir1, xAttrs);
|
||||
|
||||
// dstSubDir1
|
||||
xAttrs.clear();
|
||||
xAttrs.put(name1, value1);
|
||||
xAttrs.put(name3, new byte[0]);
|
||||
assertXAttrs(dstSubDir1, xAttrs);
|
||||
|
||||
// dstFile1
|
||||
xAttrs.clear();
|
||||
xAttrs.put(name1, value1);
|
||||
xAttrs.put(name2, value2);
|
||||
xAttrs.put(name3, new byte[0]);
|
||||
assertXAttrs(dstFile1, xAttrs);
|
||||
|
||||
// dstDir2
|
||||
xAttrs.clear();
|
||||
xAttrs.put(name2, value2);
|
||||
assertXAttrs(dstDir2, xAttrs);
|
||||
|
||||
// dstFile2
|
||||
xAttrs.clear();
|
||||
xAttrs.put(name1, value1);
|
||||
xAttrs.put(name4, new byte[0]);
|
||||
assertXAttrs(dstFile2, xAttrs);
|
||||
|
||||
// dstFile3
|
||||
xAttrs.clear();
|
||||
xAttrs.put(name3, new byte[0]);
|
||||
xAttrs.put(name4, new byte[0]);
|
||||
assertXAttrs(dstFile3, xAttrs);
|
||||
|
||||
// dstFile4
|
||||
xAttrs.clear();
|
||||
assertXAttrs(dstFile4, xAttrs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testXAttrsNotEnabled() throws Exception {
|
||||
try {
|
||||
restart(false);
|
||||
assertRunDistCp(DistCpConstants.XATTRS_NOT_SUPPORTED,
|
||||
"/dstXAttrsNotEnabled");
|
||||
} finally {
|
||||
restart(true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testXAttrsNotImplemented() throws Exception {
|
||||
assertRunDistCp(DistCpConstants.XATTRS_NOT_SUPPORTED,
|
||||
"stubfs://dstXAttrsNotImplemented");
|
||||
}
|
||||
|
||||
/**
|
||||
* Stub FileSystem implementation used for testing the case of attempting
|
||||
* distcp with XAttrs preserved on a file system that does not support XAttrs.
|
||||
* The base class implementation throws UnsupportedOperationException for
|
||||
* the XAttr methods, so we don't need to override them.
|
||||
*/
|
||||
public static class StubFileSystem extends FileSystem {
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream append(Path f, int bufferSize,
|
||||
Progressable progress) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream create(Path f, FsPermission permission,
|
||||
boolean overwrite, int bufferSize, short replication, long blockSize,
|
||||
Progressable progress) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean delete(Path f, boolean recursive) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus getFileStatus(Path f) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getUri() {
|
||||
return URI.create("stubfs:///");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getWorkingDirectory() {
|
||||
return new Path(Path.SEPARATOR);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus[] listStatus(Path f) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean rename(Path src, Path dst) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWorkingDirectory(Path dir) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts the XAttrs returned by getXAttrs for a specific path.
|
||||
*
|
||||
* @param path String path to check
|
||||
* @param xAttrs XAttr[] expected xAttrs
|
||||
* @throws Exception if there is any error
|
||||
*/
|
||||
private static void assertXAttrs(Path path, Map<String, byte[]> expectedXAttrs)
|
||||
throws Exception {
|
||||
Map<String, byte[]> xAttrs = fs.getXAttrs(path);
|
||||
assertEquals(expectedXAttrs.size(), xAttrs.size());
|
||||
Iterator<Entry<String, byte[]>> i = expectedXAttrs.entrySet().iterator();
|
||||
while (i.hasNext()) {
|
||||
Entry<String, byte[]> e = i.next();
|
||||
String name = e.getKey();
|
||||
byte[] value = e.getValue();
|
||||
if (value == null) {
|
||||
assertTrue(xAttrs.containsKey(name) && xAttrs.get(name) == null);
|
||||
} else {
|
||||
assertArrayEquals(value, xAttrs.get(name));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs distcp from /src to specified destination, preserving XAttrs. Asserts
|
||||
* expected exit code.
|
||||
*
|
||||
* @param int exitCode expected exit code
|
||||
* @param dst String distcp destination
|
||||
* @throws Exception if there is any error
|
||||
*/
|
||||
private static void assertRunDistCp(int exitCode, String dst)
|
||||
throws Exception {
|
||||
DistCp distCp = new DistCp(conf, null);
|
||||
assertEquals(exitCode,
|
||||
ToolRunner.run(conf, distCp, new String[] { "-px", "/src", dst }));
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the cluster, wait for it to become active, and get FileSystem.
|
||||
*
|
||||
* @param format if true, format the NameNode and DataNodes before starting up
|
||||
* @param xAttrsEnabled if true, XAttr support is enabled
|
||||
* @throws Exception if any step fails
|
||||
*/
|
||||
private static void initCluster(boolean format, boolean xAttrsEnabled)
|
||||
throws Exception {
|
||||
conf = new Configuration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, xAttrsEnabled);
|
||||
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "stubfs:///");
|
||||
conf.setClass("fs.stubfs.impl", StubFileSystem.class, FileSystem.class);
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(format)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
fs = cluster.getFileSystem();
|
||||
}
|
||||
|
||||
/**
|
||||
* Restarts the cluster with XAttrs enabled or disabled.
|
||||
*
|
||||
* @param xAttrsEnabled if true, XAttr support is enabled
|
||||
* @throws Exception if any step fails
|
||||
*/
|
||||
private static void restart(boolean xAttrsEnabled) throws Exception {
|
||||
shutdown();
|
||||
initCluster(false, xAttrsEnabled);
|
||||
}
|
||||
}
|
|
@ -414,6 +414,7 @@ public class TestOptionsParser {
|
|||
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR));
|
||||
|
||||
options = OptionsParser.parse(new String[] {
|
||||
"-p",
|
||||
|
@ -426,6 +427,7 @@ public class TestOptionsParser {
|
|||
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR));
|
||||
|
||||
options = OptionsParser.parse(new String[] {
|
||||
"-pbr",
|
||||
|
@ -439,6 +441,7 @@ public class TestOptionsParser {
|
|||
Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR));
|
||||
|
||||
options = OptionsParser.parse(new String[] {
|
||||
"-pbrgup",
|
||||
|
@ -452,9 +455,10 @@ public class TestOptionsParser {
|
|||
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR));
|
||||
|
||||
options = OptionsParser.parse(new String[] {
|
||||
"-pbrgupca",
|
||||
"-pbrgupcax",
|
||||
"-f",
|
||||
"hdfs://localhost:8020/source/first",
|
||||
"hdfs://localhost:8020/target/"});
|
||||
|
@ -465,6 +469,7 @@ public class TestOptionsParser {
|
|||
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.ACL));
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.XATTR));
|
||||
|
||||
options = OptionsParser.parse(new String[] {
|
||||
"-pc",
|
||||
|
@ -478,6 +483,7 @@ public class TestOptionsParser {
|
|||
Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR));
|
||||
|
||||
options = OptionsParser.parse(new String[] {
|
||||
"-p",
|
||||
|
|
|
@ -450,6 +450,7 @@ public class TestCopyMapper {
|
|||
EnumSet<DistCpOptions.FileAttribute> preserveStatus =
|
||||
EnumSet.allOf(DistCpOptions.FileAttribute.class);
|
||||
preserveStatus.remove(DistCpOptions.FileAttribute.ACL);
|
||||
preserveStatus.remove(DistCpOptions.FileAttribute.XATTR);
|
||||
|
||||
context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
|
||||
DistCpUtils.packAttributes(preserveStatus));
|
||||
|
@ -588,6 +589,7 @@ public class TestCopyMapper {
|
|||
EnumSet<DistCpOptions.FileAttribute> preserveStatus =
|
||||
EnumSet.allOf(DistCpOptions.FileAttribute.class);
|
||||
preserveStatus.remove(DistCpOptions.FileAttribute.ACL);
|
||||
preserveStatus.remove(DistCpOptions.FileAttribute.XATTR);
|
||||
|
||||
context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
|
||||
DistCpUtils.packAttributes(preserveStatus));
|
||||
|
@ -663,6 +665,7 @@ public class TestCopyMapper {
|
|||
EnumSet<DistCpOptions.FileAttribute> preserveStatus =
|
||||
EnumSet.allOf(DistCpOptions.FileAttribute.class);
|
||||
preserveStatus.remove(DistCpOptions.FileAttribute.ACL);
|
||||
preserveStatus.remove(DistCpOptions.FileAttribute.XATTR);
|
||||
|
||||
final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
|
||||
= stubContext.getContext();
|
||||
|
|
Loading…
Reference in New Issue