MAPREDUCE-6007. Add support to distcp to preserve raw.* namespace extended attributes. (clamb)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/fs-encryption@1616657 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
311d2f0773
commit
041b8326a1
|
@ -11,5 +11,8 @@ fs-encryption (Unreleased)
|
|||
|
||||
IMPROVEMENTS
|
||||
|
||||
MAPREDUCE-6007. Add support to distcp to preserve raw.* namespace
|
||||
extended attributes. (clamb)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
|
|
|
@ -191,6 +191,26 @@ $H3 Update and Overwrite
|
|||
|
||||
If `-update` is used, `1` is overwritten as well.
|
||||
|
||||
$H3 raw Namespace Extended Attribute Preservation
|
||||
|
||||
This section only applies to HDFS.
|
||||
|
||||
If the target and all of the source pathnames are in the /.reserved/raw
|
||||
hierarchy, then 'raw' namespace extended attributes will be preserved.
|
||||
'raw' xattrs are used by the system for internal functions such as encryption
|
||||
meta data. They are only visible to users when accessed through the
|
||||
/.reserved/raw hierarchy.
|
||||
|
||||
raw xattrs are preserved based solely on whether /.reserved/raw prefixes are
|
||||
supplied. The -p (preserve, see below) flag does not impact preservation of
|
||||
raw xattrs.
|
||||
|
||||
To prevent raw xattrs from being preserved, simply do not use the
|
||||
/.reserved/raw prefix on any of the source and target paths.
|
||||
|
||||
If the /.reserved/raw prefix is specified on only a subset of the source and
|
||||
target paths, an error will be displayed and a non-0 exit code returned.
|
||||
|
||||
Command Line Options
|
||||
--------------------
|
||||
|
||||
|
|
|
@ -42,6 +42,8 @@ public class DistCpConstants {
|
|||
public static final String CONF_LABEL_LOG_PATH = "distcp.log.path";
|
||||
public static final String CONF_LABEL_IGNORE_FAILURES = "distcp.ignore.failures";
|
||||
public static final String CONF_LABEL_PRESERVE_STATUS = "distcp.preserve.status";
|
||||
public static final String CONF_LABEL_PRESERVE_RAWXATTRS =
|
||||
"distcp.preserve.rawxattrs";
|
||||
public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders";
|
||||
public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
|
||||
public static final String CONF_LABEL_SSL_CONF = "distcp.keystore.resource";
|
||||
|
@ -128,4 +130,8 @@ public class DistCpConstants {
|
|||
public static final int MIN_RECORDS_PER_CHUNK_DEFAULT = 5;
|
||||
public static final int SPLIT_RATIO_DEFAULT = 2;
|
||||
|
||||
/**
|
||||
* Value of reserved raw HDFS directory when copying raw.* xattrs.
|
||||
*/
|
||||
static final String HDFS_RESERVED_RAW_DIRECTORY_NAME = "/.reserved/raw";
|
||||
}
|
||||
|
|
|
@ -48,7 +48,11 @@ public enum DistCpOptionSwitch {
|
|||
new Option("p", true, "preserve status (rbugpcax)(replication, " +
|
||||
"block-size, user, group, permission, checksum-type, ACL, XATTR). " +
|
||||
"If -p is specified with no <arg>, then preserves replication, " +
|
||||
"block size, user, group, permission and checksum type.")),
|
||||
"block size, user, group, permission and checksum type." +
|
||||
"raw.* xattrs are preserved when both the source and destination " +
|
||||
"paths are in the /.reserved/raw hierarchy (HDFS only). raw.* xattr" +
|
||||
"preservation is independent of the -p flag." +
|
||||
"Refer to the DistCp documentation for more details.")),
|
||||
|
||||
/**
|
||||
* Update target location by copying only files that are missing
|
||||
|
|
|
@ -52,6 +52,8 @@ public class DistCpOptions {
|
|||
|
||||
private EnumSet<FileAttribute> preserveStatus = EnumSet.noneOf(FileAttribute.class);
|
||||
|
||||
private boolean preserveRawXattrs;
|
||||
|
||||
private Path atomicWorkPath;
|
||||
|
||||
private Path logPath;
|
||||
|
@ -123,6 +125,7 @@ public class DistCpOptions {
|
|||
this.sslConfigurationFile = that.getSslConfigurationFile();
|
||||
this.copyStrategy = that.copyStrategy;
|
||||
this.preserveStatus = that.preserveStatus;
|
||||
this.preserveRawXattrs = that.preserveRawXattrs;
|
||||
this.atomicWorkPath = that.getAtomicWorkPath();
|
||||
this.logPath = that.getLogPath();
|
||||
this.sourceFileListing = that.getSourceFileListing();
|
||||
|
@ -345,7 +348,7 @@ public class DistCpOptions {
|
|||
}
|
||||
|
||||
/**
|
||||
* Checks if the input attibute should be preserved or not
|
||||
* Checks if the input attribute should be preserved or not
|
||||
*
|
||||
* @param attribute - Attribute to check
|
||||
* @return True if attribute should be preserved, false otherwise
|
||||
|
@ -369,6 +372,21 @@ public class DistCpOptions {
|
|||
preserveStatus.add(fileAttribute);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if raw.* xattrs should be preserved.
|
||||
* @return true if raw.* xattrs should be preserved.
|
||||
*/
|
||||
public boolean shouldPreserveRawXattrs() {
|
||||
return preserveRawXattrs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicate that raw.* xattrs should be preserved
|
||||
*/
|
||||
public void preserveRawXattrs() {
|
||||
preserveRawXattrs = true;
|
||||
}
|
||||
|
||||
/** Get work path for atomic commit. If null, the work
|
||||
* path would be parentOf(targetPath) + "/._WIP_" + nameOf(targetPath)
|
||||
*
|
||||
|
@ -565,6 +583,7 @@ public class DistCpOptions {
|
|||
", sourcePaths=" + sourcePaths +
|
||||
", targetPath=" + targetPath +
|
||||
", targetPathExists=" + targetPathExists +
|
||||
", preserveRawXattrs=" + preserveRawXattrs +
|
||||
'}';
|
||||
}
|
||||
|
||||
|
|
|
@ -37,6 +37,9 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import java.io.*;
|
||||
import java.util.Stack;
|
||||
|
||||
import static org.apache.hadoop.tools.DistCpConstants
|
||||
.HDFS_RESERVED_RAW_DIRECTORY_NAME;
|
||||
|
||||
/**
|
||||
* The SimpleCopyListing is responsible for making the exhaustive list of
|
||||
* all files/directories under its specified list of input-paths.
|
||||
|
@ -67,6 +70,10 @@ public class SimpleCopyListing extends CopyListing {
|
|||
Path targetPath = options.getTargetPath();
|
||||
FileSystem targetFS = targetPath.getFileSystem(getConf());
|
||||
boolean targetIsFile = targetFS.isFile(targetPath);
|
||||
targetPath = targetFS.makeQualified(targetPath);
|
||||
final boolean targetIsReservedRaw =
|
||||
Path.getPathWithoutSchemeAndAuthority(targetPath).toString().
|
||||
startsWith(HDFS_RESERVED_RAW_DIRECTORY_NAME);
|
||||
|
||||
//If target is a file, then source has to be single file
|
||||
if (targetIsFile) {
|
||||
|
@ -93,6 +100,27 @@ public class SimpleCopyListing extends CopyListing {
|
|||
if (!fs.exists(path)) {
|
||||
throw new InvalidInputException(path + " doesn't exist");
|
||||
}
|
||||
if (Path.getPathWithoutSchemeAndAuthority(path).toString().
|
||||
startsWith(HDFS_RESERVED_RAW_DIRECTORY_NAME)) {
|
||||
if (!targetIsReservedRaw) {
|
||||
final String msg = "The source path '" + path + "' starts with " +
|
||||
HDFS_RESERVED_RAW_DIRECTORY_NAME + " but the target path '" +
|
||||
targetPath + "' does not. Either all or none of the paths must " +
|
||||
"have this prefix.";
|
||||
throw new InvalidInputException(msg);
|
||||
}
|
||||
} else if (targetIsReservedRaw) {
|
||||
final String msg = "The target path '" + targetPath + "' starts with " +
|
||||
HDFS_RESERVED_RAW_DIRECTORY_NAME + " but the source path '" +
|
||||
path + "' does not. Either all or none of the paths must " +
|
||||
"have this prefix.";
|
||||
throw new InvalidInputException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
if (targetIsReservedRaw) {
|
||||
options.preserveRawXattrs();
|
||||
getConf().setBoolean(DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, true);
|
||||
}
|
||||
|
||||
/* This is requires to allow map tasks to access each of the source
|
||||
|
@ -135,6 +163,9 @@ public class SimpleCopyListing extends CopyListing {
|
|||
try {
|
||||
for (Path path: options.getSourcePaths()) {
|
||||
FileSystem sourceFS = path.getFileSystem(getConf());
|
||||
final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
|
||||
final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
|
||||
final boolean preserveRawXAttrs = options.shouldPreserveRawXattrs();
|
||||
path = makeQualified(path);
|
||||
|
||||
FileStatus rootStatus = sourceFS.getFileStatus(path);
|
||||
|
@ -145,8 +176,7 @@ public class SimpleCopyListing extends CopyListing {
|
|||
if (!explore || rootStatus.isDirectory()) {
|
||||
CopyListingFileStatus rootCopyListingStatus =
|
||||
DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus,
|
||||
options.shouldPreserve(FileAttribute.ACL),
|
||||
options.shouldPreserve(FileAttribute.XATTR));
|
||||
preserveAcls, preserveXAttrs, preserveRawXAttrs);
|
||||
writeToFileListingRoot(fileListWriter, rootCopyListingStatus,
|
||||
sourcePathRoot, options);
|
||||
}
|
||||
|
@ -157,9 +187,9 @@ public class SimpleCopyListing extends CopyListing {
|
|||
}
|
||||
CopyListingFileStatus sourceCopyListingStatus =
|
||||
DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus,
|
||||
options.shouldPreserve(FileAttribute.ACL) &&
|
||||
sourceStatus.isDirectory(), options.shouldPreserve(
|
||||
FileAttribute.XATTR) && sourceStatus.isDirectory());
|
||||
preserveAcls && sourceStatus.isDirectory(),
|
||||
preserveXAttrs && sourceStatus.isDirectory(),
|
||||
preserveRawXAttrs && sourceStatus.isDirectory());
|
||||
writeToFileListing(fileListWriter, sourceCopyListingStatus,
|
||||
sourcePathRoot, options);
|
||||
|
||||
|
@ -261,6 +291,9 @@ public class SimpleCopyListing extends CopyListing {
|
|||
DistCpOptions options)
|
||||
throws IOException {
|
||||
FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
|
||||
final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
|
||||
final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
|
||||
final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
|
||||
Stack<FileStatus> pathStack = new Stack<FileStatus>();
|
||||
pathStack.push(sourceStatus);
|
||||
|
||||
|
@ -271,8 +304,9 @@ public class SimpleCopyListing extends CopyListing {
|
|||
+ sourceStatus.getPath() + " for copy.");
|
||||
CopyListingFileStatus childCopyListingStatus =
|
||||
DistCpUtils.toCopyListingFileStatus(sourceFS, child,
|
||||
options.shouldPreserve(FileAttribute.ACL) && child.isDirectory(),
|
||||
options.shouldPreserve(FileAttribute.XATTR) && child.isDirectory());
|
||||
preserveAcls && child.isDirectory(),
|
||||
preserveXAttrs && child.isDirectory(),
|
||||
preserveRawXattrs && child.isDirectory());
|
||||
writeToFileListing(fileListWriter, childCopyListingStatus,
|
||||
sourcePathRoot, options);
|
||||
if (isDirectoryAndNotEmpty(sourceFS, child)) {
|
||||
|
|
|
@ -83,7 +83,9 @@ public class CopyCommitter extends FileOutputCommitter {
|
|||
cleanupTempFiles(jobContext);
|
||||
|
||||
String attributes = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
|
||||
if (attributes != null && !attributes.isEmpty()) {
|
||||
final boolean preserveRawXattrs =
|
||||
conf.getBoolean(DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false);
|
||||
if ((attributes != null && !attributes.isEmpty()) || preserveRawXattrs) {
|
||||
preserveFileAttributesForDirectories(conf);
|
||||
}
|
||||
|
||||
|
@ -167,6 +169,8 @@ public class CopyCommitter extends FileOutputCommitter {
|
|||
LOG.info("About to preserve attributes: " + attrSymbols);
|
||||
|
||||
EnumSet<FileAttribute> attributes = DistCpUtils.unpackAttributes(attrSymbols);
|
||||
final boolean preserveRawXattrs =
|
||||
conf.getBoolean(DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false);
|
||||
|
||||
Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
|
||||
FileSystem clusterFS = sourceListing.getFileSystem(conf);
|
||||
|
@ -194,7 +198,8 @@ public class CopyCommitter extends FileOutputCommitter {
|
|||
if (targetRoot.equals(targetFile) && syncOrOverwrite) continue;
|
||||
|
||||
FileSystem targetFS = targetFile.getFileSystem(conf);
|
||||
DistCpUtils.preserve(targetFS, targetFile, srcFileStatus, attributes);
|
||||
DistCpUtils.preserve(targetFS, targetFile, srcFileStatus, attributes,
|
||||
preserveRawXattrs);
|
||||
|
||||
taskAttemptContext.progress();
|
||||
taskAttemptContext.setStatus("Preserving status on directory entries. [" +
|
||||
|
|
|
@ -200,6 +200,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|||
|
||||
EnumSet<DistCpOptions.FileAttribute> fileAttributes
|
||||
= getFileAttributeSettings(context);
|
||||
final boolean preserveRawXattrs = context.getConfiguration().getBoolean(
|
||||
DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false);
|
||||
|
||||
final String description = "Copying " + sourcePath + " to " + target;
|
||||
context.setStatus(description);
|
||||
|
@ -211,10 +213,12 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|||
FileSystem sourceFS;
|
||||
try {
|
||||
sourceFS = sourcePath.getFileSystem(conf);
|
||||
final boolean preserveXAttrs =
|
||||
fileAttributes.contains(FileAttribute.XATTR);
|
||||
sourceCurrStatus = DistCpUtils.toCopyListingFileStatus(sourceFS,
|
||||
sourceFS.getFileStatus(sourcePath),
|
||||
fileAttributes.contains(FileAttribute.ACL),
|
||||
fileAttributes.contains(FileAttribute.XATTR));
|
||||
preserveXAttrs, preserveRawXattrs);
|
||||
} catch (FileNotFoundException e) {
|
||||
throw new IOException(new RetriableFileCopyCommand.CopyReadException(e));
|
||||
}
|
||||
|
@ -249,8 +253,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|||
action, fileAttributes);
|
||||
}
|
||||
|
||||
DistCpUtils.preserve(target.getFileSystem(conf), target,
|
||||
sourceCurrStatus, fileAttributes);
|
||||
DistCpUtils.preserve(target.getFileSystem(conf), target, sourceCurrStatus,
|
||||
fileAttributes, preserveRawXattrs);
|
||||
} catch (IOException exception) {
|
||||
handleFailures(exception, sourceFileStatus, target, context);
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.tools.util;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -25,6 +26,7 @@ import org.apache.hadoop.fs.FileStatus;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.FileChecksum;
|
||||
import org.apache.hadoop.fs.XAttr;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclUtil;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
|
@ -151,7 +153,7 @@ public class DistCpUtils {
|
|||
* @return - String containing first letters of each attribute to preserve
|
||||
*/
|
||||
public static String packAttributes(EnumSet<FileAttribute> attributes) {
|
||||
StringBuffer buffer = new StringBuffer(5);
|
||||
StringBuffer buffer = new StringBuffer(FileAttribute.values().length);
|
||||
int len = 0;
|
||||
for (FileAttribute attribute : attributes) {
|
||||
buffer.append(attribute.name().charAt(0));
|
||||
|
@ -186,13 +188,15 @@ public class DistCpUtils {
|
|||
* @param targetFS - File system
|
||||
* @param path - Path that needs to preserve original file status
|
||||
* @param srcFileStatus - Original file status
|
||||
* @param attributes - Attribute set that need to be preserved
|
||||
* @param attributes - Attribute set that needs to be preserved
|
||||
* @param preserveRawXattrs if true, raw.* xattrs should be preserved
|
||||
* @throws IOException - Exception if any (particularly relating to group/owner
|
||||
* change or any transient error)
|
||||
*/
|
||||
public static void preserve(FileSystem targetFS, Path path,
|
||||
CopyListingFileStatus srcFileStatus,
|
||||
EnumSet<FileAttribute> attributes) throws IOException {
|
||||
EnumSet<FileAttribute> attributes,
|
||||
boolean preserveRawXattrs) throws IOException {
|
||||
|
||||
FileStatus targetFileStatus = targetFS.getFileStatus(path);
|
||||
String group = targetFileStatus.getGroup();
|
||||
|
@ -214,15 +218,20 @@ public class DistCpUtils {
|
|||
!srcFileStatus.getPermission().equals(targetFileStatus.getPermission())) {
|
||||
targetFS.setPermission(path, srcFileStatus.getPermission());
|
||||
}
|
||||
|
||||
if (attributes.contains(FileAttribute.XATTR)) {
|
||||
|
||||
final boolean preserveXAttrs = attributes.contains(FileAttribute.XATTR);
|
||||
if (preserveXAttrs || preserveRawXattrs) {
|
||||
final String rawNS = XAttr.NameSpace.RAW.name().toLowerCase();
|
||||
Map<String, byte[]> srcXAttrs = srcFileStatus.getXAttrs();
|
||||
Map<String, byte[]> targetXAttrs = getXAttrs(targetFS, path);
|
||||
if (!srcXAttrs.equals(targetXAttrs)) {
|
||||
if (srcXAttrs != null && !srcXAttrs.equals(targetXAttrs)) {
|
||||
Iterator<Entry<String, byte[]>> iter = srcXAttrs.entrySet().iterator();
|
||||
while (iter.hasNext()) {
|
||||
Entry<String, byte[]> entry = iter.next();
|
||||
targetFS.setXAttr(path, entry.getKey(), entry.getValue());
|
||||
final String xattrName = entry.getKey();
|
||||
if (xattrName.startsWith(rawNS) || preserveXAttrs) {
|
||||
targetFS.setXAttr(path, entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -286,11 +295,12 @@ public class DistCpUtils {
|
|||
* @param fileStatus FileStatus of file
|
||||
* @param preserveAcls boolean true if preserving ACLs
|
||||
* @param preserveXAttrs boolean true if preserving XAttrs
|
||||
* @param preserveRawXAttrs boolean true if preserving raw.* XAttrs
|
||||
* @throws IOException if there is an I/O error
|
||||
*/
|
||||
public static CopyListingFileStatus toCopyListingFileStatus(
|
||||
FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls,
|
||||
boolean preserveXAttrs) throws IOException {
|
||||
boolean preserveXAttrs, boolean preserveRawXAttrs) throws IOException {
|
||||
CopyListingFileStatus copyListingFileStatus =
|
||||
new CopyListingFileStatus(fileStatus);
|
||||
if (preserveAcls) {
|
||||
|
@ -301,9 +311,25 @@ public class DistCpUtils {
|
|||
copyListingFileStatus.setAclEntries(aclEntries);
|
||||
}
|
||||
}
|
||||
if (preserveXAttrs) {
|
||||
Map<String, byte[]> xAttrs = fileSystem.getXAttrs(fileStatus.getPath());
|
||||
copyListingFileStatus.setXAttrs(xAttrs);
|
||||
if (preserveXAttrs || preserveRawXAttrs) {
|
||||
Map<String, byte[]> srcXAttrs = fileSystem.getXAttrs(fileStatus.getPath());
|
||||
if (preserveXAttrs && preserveRawXAttrs) {
|
||||
copyListingFileStatus.setXAttrs(srcXAttrs);
|
||||
} else {
|
||||
Map<String, byte[]> trgXAttrs = Maps.newHashMap();
|
||||
final String rawNS = XAttr.NameSpace.RAW.name().toLowerCase();
|
||||
for (Map.Entry<String, byte[]> ent : srcXAttrs.entrySet()) {
|
||||
final String xattrName = ent.getKey();
|
||||
if (xattrName.startsWith(rawNS)) {
|
||||
if (preserveRawXAttrs) {
|
||||
trgXAttrs.put(xattrName, ent.getValue());
|
||||
}
|
||||
} else if (preserveXAttrs) {
|
||||
trgXAttrs.put(xattrName, ent.getValue());
|
||||
}
|
||||
}
|
||||
copyListingFileStatus.setXAttrs(trgXAttrs);
|
||||
}
|
||||
}
|
||||
return copyListingFileStatus;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,170 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.tools.util.DistCpTestUtils;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
/**
|
||||
* Tests distcp in combination with HDFS raw.* XAttrs.
|
||||
*/
|
||||
public class TestDistCpWithRawXAttrs {
|
||||
|
||||
private static MiniDFSCluster cluster;
|
||||
private static Configuration conf;
|
||||
private static FileSystem fs;
|
||||
|
||||
private static final String rawName1 = "raw.a1";
|
||||
private static final byte[] rawValue1 = {0x37, 0x38, 0x39};
|
||||
private static final String userName1 = "user.a1";
|
||||
private static final byte[] userValue1 = {0x38, 0x38, 0x38};
|
||||
|
||||
private static final Path dir1 = new Path("/src/dir1");
|
||||
private static final Path subDir1 = new Path(dir1, "subdir1");
|
||||
private static final Path file1 = new Path("/src/file1");
|
||||
private static final String rawRootName = "/.reserved/raw";
|
||||
private static final String rootedDestName = "/dest";
|
||||
private static final String rootedSrcName = "/src";
|
||||
private static final String rawDestName = "/.reserved/raw/dest";
|
||||
private static final String rawSrcName = "/.reserved/raw/src";
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
conf = new Configuration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
fs = cluster.getFileSystem();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdown() {
|
||||
IOUtils.cleanup(null, fs);
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/* Test that XAttrs and raw.* XAttrs are preserved when appropriate. */
|
||||
@Test
|
||||
public void testPreserveRawXAttrs1() throws Exception {
|
||||
final String relSrc = "/./.reserved/../.reserved/raw/../raw/src/../src";
|
||||
final String relDst = "/./.reserved/../.reserved/raw/../raw/dest/../dest";
|
||||
doTestPreserveRawXAttrs(relSrc, relDst, "-px", true, true,
|
||||
DistCpConstants.SUCCESS);
|
||||
doTestPreserveRawXAttrs(rootedSrcName, rootedDestName, "-px",
|
||||
false, true, DistCpConstants.SUCCESS);
|
||||
doTestPreserveRawXAttrs(rootedSrcName, rawDestName, "-px",
|
||||
false, true, DistCpConstants.INVALID_ARGUMENT);
|
||||
doTestPreserveRawXAttrs(rawSrcName, rootedDestName, "-px",
|
||||
false, true, DistCpConstants.INVALID_ARGUMENT);
|
||||
doTestPreserveRawXAttrs(rawSrcName, rawDestName, "-px",
|
||||
true, true, DistCpConstants.SUCCESS);
|
||||
final Path savedWd = fs.getWorkingDirectory();
|
||||
try {
|
||||
fs.setWorkingDirectory(new Path("/.reserved/raw"));
|
||||
doTestPreserveRawXAttrs("../.." + rawSrcName, "../.." + rawDestName,
|
||||
"-px", true, true, DistCpConstants.SUCCESS);
|
||||
} finally {
|
||||
fs.setWorkingDirectory(savedWd);
|
||||
}
|
||||
}
|
||||
|
||||
/* Test that XAttrs are not preserved and raw.* are when appropriate. */
|
||||
@Test
|
||||
public void testPreserveRawXAttrs2() throws Exception {
|
||||
doTestPreserveRawXAttrs(rootedSrcName, rootedDestName, "-p",
|
||||
false, false, DistCpConstants.SUCCESS);
|
||||
doTestPreserveRawXAttrs(rootedSrcName, rawDestName, "-p",
|
||||
false, false, DistCpConstants.INVALID_ARGUMENT);
|
||||
doTestPreserveRawXAttrs(rawSrcName, rootedDestName, "-p",
|
||||
false, false, DistCpConstants.INVALID_ARGUMENT);
|
||||
doTestPreserveRawXAttrs(rawSrcName, rawDestName, "-p",
|
||||
true, false, DistCpConstants.SUCCESS);
|
||||
}
|
||||
|
||||
/* Test that XAttrs are not preserved and raw.* are when appropriate. */
|
||||
@Test
|
||||
public void testPreserveRawXAttrs3() throws Exception {
|
||||
doTestPreserveRawXAttrs(rootedSrcName, rootedDestName, null,
|
||||
false, false, DistCpConstants.SUCCESS);
|
||||
doTestPreserveRawXAttrs(rootedSrcName, rawDestName, null,
|
||||
false, false, DistCpConstants.INVALID_ARGUMENT);
|
||||
doTestPreserveRawXAttrs(rawSrcName, rootedDestName, null,
|
||||
false, false, DistCpConstants.INVALID_ARGUMENT);
|
||||
doTestPreserveRawXAttrs(rawSrcName, rawDestName, null,
|
||||
true, false, DistCpConstants.SUCCESS);
|
||||
}
|
||||
|
||||
private static Path[] pathnames = { new Path("dir1"),
|
||||
new Path("dir1/subdir1"),
|
||||
new Path("file1") };
|
||||
|
||||
private static void makeFilesAndDirs(FileSystem fs) throws Exception {
|
||||
fs.delete(new Path("/src"), true);
|
||||
fs.delete(new Path("/dest"), true);
|
||||
fs.mkdirs(subDir1);
|
||||
fs.create(file1).close();
|
||||
}
|
||||
|
||||
private void initXAttrs() throws Exception {
|
||||
makeFilesAndDirs(fs);
|
||||
for (Path p : pathnames) {
|
||||
fs.setXAttr(new Path(rawRootName + "/src", p), rawName1, rawValue1);
|
||||
fs.setXAttr(new Path(rawRootName + "/src", p), userName1, userValue1);
|
||||
}
|
||||
}
|
||||
|
||||
private void doTestPreserveRawXAttrs(String src, String dest,
|
||||
String preserveOpts, boolean expectRaw, boolean expectUser,
|
||||
int expectedExitCode) throws Exception {
|
||||
initXAttrs();
|
||||
|
||||
DistCpTestUtils.assertRunDistCp(expectedExitCode, src, dest,
|
||||
preserveOpts, conf);
|
||||
|
||||
if (expectedExitCode == DistCpConstants.SUCCESS) {
|
||||
Map<String, byte[]> xAttrs = Maps.newHashMap();
|
||||
for (Path p : pathnames) {
|
||||
xAttrs.clear();
|
||||
if (expectRaw) {
|
||||
xAttrs.put(rawName1, rawValue1);
|
||||
}
|
||||
if (expectUser) {
|
||||
xAttrs.put(userName1, userValue1);
|
||||
}
|
||||
DistCpTestUtils.assertXAttrs(new Path(dest, p), fs, xAttrs);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,13 +18,9 @@
|
|||
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
|
@ -37,8 +33,8 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
|||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.tools.util.DistCpTestUtils;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -79,6 +75,7 @@ public class TestDistCpWithXAttrs {
|
|||
private static final Path dstFile2 = new Path(dstDir2, "file2");
|
||||
private static final Path dstFile3 = new Path(dstDir2, "file3");
|
||||
private static final Path dstFile4 = new Path(dstDir2, "file4");
|
||||
private static final String rootedSrcName = "/src";
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
|
@ -125,55 +122,56 @@ public class TestDistCpWithXAttrs {
|
|||
|
||||
@Test
|
||||
public void testPreserveXAttrs() throws Exception {
|
||||
assertRunDistCp(DistCpConstants.SUCCESS, "/dstPreserveXAttrs");
|
||||
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, rootedSrcName,
|
||||
"/dstPreserveXAttrs", "-px", conf);
|
||||
|
||||
// dstDir1
|
||||
Map<String, byte[]> xAttrs = Maps.newHashMap();
|
||||
xAttrs.put(name1, value1);
|
||||
xAttrs.put(name2, value2);
|
||||
assertXAttrs(dstDir1, xAttrs);
|
||||
DistCpTestUtils.assertXAttrs(dstDir1, fs, xAttrs);
|
||||
|
||||
// dstSubDir1
|
||||
xAttrs.clear();
|
||||
xAttrs.put(name1, value1);
|
||||
xAttrs.put(name3, new byte[0]);
|
||||
assertXAttrs(dstSubDir1, xAttrs);
|
||||
DistCpTestUtils.assertXAttrs(dstSubDir1, fs, xAttrs);
|
||||
|
||||
// dstFile1
|
||||
xAttrs.clear();
|
||||
xAttrs.put(name1, value1);
|
||||
xAttrs.put(name2, value2);
|
||||
xAttrs.put(name3, new byte[0]);
|
||||
assertXAttrs(dstFile1, xAttrs);
|
||||
DistCpTestUtils.assertXAttrs(dstFile1, fs, xAttrs);
|
||||
|
||||
// dstDir2
|
||||
xAttrs.clear();
|
||||
xAttrs.put(name2, value2);
|
||||
assertXAttrs(dstDir2, xAttrs);
|
||||
DistCpTestUtils.assertXAttrs(dstDir2, fs, xAttrs);
|
||||
|
||||
// dstFile2
|
||||
xAttrs.clear();
|
||||
xAttrs.put(name1, value1);
|
||||
xAttrs.put(name4, new byte[0]);
|
||||
assertXAttrs(dstFile2, xAttrs);
|
||||
DistCpTestUtils.assertXAttrs(dstFile2, fs, xAttrs);
|
||||
|
||||
// dstFile3
|
||||
xAttrs.clear();
|
||||
xAttrs.put(name3, new byte[0]);
|
||||
xAttrs.put(name4, new byte[0]);
|
||||
assertXAttrs(dstFile3, xAttrs);
|
||||
DistCpTestUtils.assertXAttrs(dstFile3, fs, xAttrs);
|
||||
|
||||
// dstFile4
|
||||
xAttrs.clear();
|
||||
assertXAttrs(dstFile4, xAttrs);
|
||||
DistCpTestUtils.assertXAttrs(dstFile4, fs, xAttrs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testXAttrsNotEnabled() throws Exception {
|
||||
try {
|
||||
restart(false);
|
||||
assertRunDistCp(DistCpConstants.XATTRS_NOT_SUPPORTED,
|
||||
"/dstXAttrsNotEnabled");
|
||||
DistCpTestUtils.assertRunDistCp(DistCpConstants.XATTRS_NOT_SUPPORTED,
|
||||
rootedSrcName, "/dstXAttrsNotEnabled", "-px", conf);
|
||||
} finally {
|
||||
restart(true);
|
||||
}
|
||||
|
@ -181,8 +179,8 @@ public class TestDistCpWithXAttrs {
|
|||
|
||||
@Test
|
||||
public void testXAttrsNotImplemented() throws Exception {
|
||||
assertRunDistCp(DistCpConstants.XATTRS_NOT_SUPPORTED,
|
||||
"stubfs://dstXAttrsNotImplemented");
|
||||
DistCpTestUtils.assertRunDistCp(DistCpConstants.XATTRS_NOT_SUPPORTED,
|
||||
rootedSrcName, "stubfs://dstXAttrsNotImplemented", "-px", conf);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -251,45 +249,6 @@ public class TestDistCpWithXAttrs {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts the XAttrs returned by getXAttrs for a specific path.
|
||||
*
|
||||
* @param path String path to check
|
||||
* @param xAttrs XAttr[] expected xAttrs
|
||||
* @throws Exception if there is any error
|
||||
*/
|
||||
private static void assertXAttrs(Path path, Map<String, byte[]> expectedXAttrs)
|
||||
throws Exception {
|
||||
Map<String, byte[]> xAttrs = fs.getXAttrs(path);
|
||||
assertEquals(expectedXAttrs.size(), xAttrs.size());
|
||||
Iterator<Entry<String, byte[]>> i = expectedXAttrs.entrySet().iterator();
|
||||
while (i.hasNext()) {
|
||||
Entry<String, byte[]> e = i.next();
|
||||
String name = e.getKey();
|
||||
byte[] value = e.getValue();
|
||||
if (value == null) {
|
||||
assertTrue(xAttrs.containsKey(name) && xAttrs.get(name) == null);
|
||||
} else {
|
||||
assertArrayEquals(value, xAttrs.get(name));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs distcp from /src to specified destination, preserving XAttrs. Asserts
|
||||
* expected exit code.
|
||||
*
|
||||
* @param int exitCode expected exit code
|
||||
* @param dst String distcp destination
|
||||
* @throws Exception if there is any error
|
||||
*/
|
||||
private static void assertRunDistCp(int exitCode, String dst)
|
||||
throws Exception {
|
||||
DistCp distCp = new DistCp(conf, null);
|
||||
assertEquals(exitCode,
|
||||
ToolRunner.run(conf, distCp, new String[] { "-px", "/src", dst }));
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the cluster, wait for it to become active, and get FileSystem.
|
||||
*
|
||||
|
|
|
@ -0,0 +1,89 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.tools.util;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import org.apache.hadoop.tools.DistCp;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
/**
|
||||
* Utility class for DistCpTests
|
||||
*/
|
||||
public class DistCpTestUtils {
|
||||
|
||||
/**
|
||||
* Asserts the XAttrs returned by getXAttrs for a specific path match an
|
||||
* expected set of XAttrs.
|
||||
*
|
||||
* @param path String path to check
|
||||
* @param fs FileSystem to use for the path
|
||||
* @param expectedXAttrs XAttr[] expected xAttrs
|
||||
* @throws Exception if there is any error
|
||||
*/
|
||||
public static void assertXAttrs(Path path, FileSystem fs,
|
||||
Map<String, byte[]> expectedXAttrs)
|
||||
throws Exception {
|
||||
Map<String, byte[]> xAttrs = fs.getXAttrs(path);
|
||||
assertEquals(path.toString(), expectedXAttrs.size(), xAttrs.size());
|
||||
Iterator<Entry<String, byte[]>> i = expectedXAttrs.entrySet().iterator();
|
||||
while (i.hasNext()) {
|
||||
Entry<String, byte[]> e = i.next();
|
||||
String name = e.getKey();
|
||||
byte[] value = e.getValue();
|
||||
if (value == null) {
|
||||
assertTrue(xAttrs.containsKey(name) && xAttrs.get(name) == null);
|
||||
} else {
|
||||
assertArrayEquals(value, xAttrs.get(name));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs distcp from src to dst, preserving XAttrs. Asserts the
|
||||
* expected exit code.
|
||||
*
|
||||
* @param exitCode expected exit code
|
||||
* @param src distcp src path
|
||||
* @param dst distcp destination
|
||||
* @param options distcp command line options
|
||||
* @param conf Configuration to use
|
||||
* @throws Exception if there is any error
|
||||
*/
|
||||
public static void assertRunDistCp(int exitCode, String src, String dst,
|
||||
String options, Configuration conf)
|
||||
throws Exception {
|
||||
DistCp distCp = new DistCp(conf, null);
|
||||
String[] optsArr = options == null ?
|
||||
new String[] { src, dst } :
|
||||
new String[] { options, src, dst };
|
||||
assertEquals(exitCode,
|
||||
ToolRunner.run(conf, distCp, optsArr));
|
||||
}
|
||||
}
|
|
@ -114,14 +114,14 @@ public class TestDistCpUtils {
|
|||
fs.setPermission(path, noPerm);
|
||||
fs.setOwner(path, "nobody", "nobody");
|
||||
|
||||
DistCpUtils.preserve(fs, path, srcStatus, attributes);
|
||||
DistCpUtils.preserve(fs, path, srcStatus, attributes, false);
|
||||
FileStatus target = fs.getFileStatus(path);
|
||||
Assert.assertEquals(target.getPermission(), noPerm);
|
||||
Assert.assertEquals(target.getOwner(), "nobody");
|
||||
Assert.assertEquals(target.getGroup(), "nobody");
|
||||
|
||||
attributes.add(FileAttribute.PERMISSION);
|
||||
DistCpUtils.preserve(fs, path, srcStatus, attributes);
|
||||
DistCpUtils.preserve(fs, path, srcStatus, attributes, false);
|
||||
target = fs.getFileStatus(path);
|
||||
Assert.assertEquals(target.getPermission(), srcStatus.getPermission());
|
||||
Assert.assertEquals(target.getOwner(), "nobody");
|
||||
|
@ -129,7 +129,7 @@ public class TestDistCpUtils {
|
|||
|
||||
attributes.add(FileAttribute.GROUP);
|
||||
attributes.add(FileAttribute.USER);
|
||||
DistCpUtils.preserve(fs, path, srcStatus, attributes);
|
||||
DistCpUtils.preserve(fs, path, srcStatus, attributes, false);
|
||||
target = fs.getFileStatus(path);
|
||||
Assert.assertEquals(target.getPermission(), srcStatus.getPermission());
|
||||
Assert.assertEquals(target.getOwner(), srcStatus.getOwner());
|
||||
|
|
Loading…
Reference in New Issue