MAPREDUCE-5809. Merging change r1595283 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1595290 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Chris Nauroth 2014-05-16 18:32:15 +00:00
parent 2125caaa2f
commit d20f565b4c
30 changed files with 939 additions and 273 deletions

View File

@ -99,6 +99,21 @@ public FileStatus(long length, boolean isdir,
assert (isdir && symlink == null) || !isdir;
}
/**
* Copy constructor.
*
* @param other FileStatus to copy
*/
public FileStatus(FileStatus other) throws IOException {
// It's important to call the getters here instead of directly accessing the
// members. Subclasses like ViewFsFileStatus can override the getters.
this(other.getLen(), other.isDirectory(), other.getReplication(),
other.getBlockSize(), other.getModificationTime(), other.getAccessTime(),
other.getPermission(), other.getOwner(), other.getGroup(),
(other.isSymlink() ? other.getSymlink() : null),
other.getPath());
}
/**
* Get the length of this file, in bytes.
* @return the length of this file, in bytes.

View File

@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.permission;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.collect.Lists;
/**
* AclUtil contains utility methods for manipulating ACLs.
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Unstable
public final class AclUtil {
/**
* Given permissions and extended ACL entries, returns the full logical ACL.
*
* @param perm FsPermission containing permissions
* @param entries List<AclEntry> containing extended ACL entries
* @return List<AclEntry> containing full logical ACL
*/
public static List<AclEntry> getAclFromPermAndEntries(FsPermission perm,
List<AclEntry> entries) {
List<AclEntry> acl = Lists.newArrayListWithCapacity(entries.size() + 3);
// Owner entry implied by owner permission bits.
acl.add(new AclEntry.Builder()
.setScope(AclEntryScope.ACCESS)
.setType(AclEntryType.USER)
.setPermission(perm.getUserAction())
.build());
// All extended access ACL entries.
boolean hasAccessAcl = false;
Iterator<AclEntry> entryIter = entries.iterator();
AclEntry curEntry = null;
while (entryIter.hasNext()) {
curEntry = entryIter.next();
if (curEntry.getScope() == AclEntryScope.DEFAULT) {
break;
}
hasAccessAcl = true;
acl.add(curEntry);
}
// Mask entry implied by group permission bits, or group entry if there is
// no access ACL (only default ACL).
acl.add(new AclEntry.Builder()
.setScope(AclEntryScope.ACCESS)
.setType(hasAccessAcl ? AclEntryType.MASK : AclEntryType.GROUP)
.setPermission(perm.getGroupAction())
.build());
// Other entry implied by other bits.
acl.add(new AclEntry.Builder()
.setScope(AclEntryScope.ACCESS)
.setType(AclEntryType.OTHER)
.setPermission(perm.getOtherAction())
.build());
// Default ACL entries.
if (curEntry != null && curEntry.getScope() == AclEntryScope.DEFAULT) {
acl.add(curEntry);
while (entryIter.hasNext()) {
acl.add(entryIter.next());
}
}
return acl;
}
/**
* Translates the given permission bits to the equivalent minimal ACL.
*
* @param perm FsPermission to translate
* @return List<AclEntry> containing exactly 3 entries representing the owner,
* group and other permissions
*/
public static List<AclEntry> getMinimalAcl(FsPermission perm) {
return Lists.newArrayList(
new AclEntry.Builder()
.setScope(AclEntryScope.ACCESS)
.setType(AclEntryType.USER)
.setPermission(perm.getUserAction())
.build(),
new AclEntry.Builder()
.setScope(AclEntryScope.ACCESS)
.setType(AclEntryType.GROUP)
.setPermission(perm.getGroupAction())
.build(),
new AclEntry.Builder()
.setScope(AclEntryScope.ACCESS)
.setType(AclEntryType.OTHER)
.setPermission(perm.getOtherAction())
.build());
}
/**
* Checks if the given entries represent a minimal ACL (contains exactly 3
* entries).
*
* @param entries List<AclEntry> entries to check
* @return boolean true if the entries represent a minimal ACL
*/
public static boolean isMinimalAcl(List<AclEntry> entries) {
return entries.size() == 3;
}
/**
* There is no reason to instantiate this class.
*/
private AclUtil() {
}
}

View File

@ -15,12 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
package org.apache.hadoop.fs.permission;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
@ -28,8 +29,9 @@
* Groups a list of ACL entries into separate lists for access entries vs.
* default entries.
*/
@InterfaceAudience.Private
final class ScopedAclEntries {
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Unstable
public final class ScopedAclEntries {
private static final int PIVOT_NOT_FOUND = -1;
private final List<AclEntry> accessEntries;

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.fs.shell;
import java.io.IOException;
import java.util.Iterator;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@ -31,8 +31,10 @@
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.AclUtil;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.ScopedAclEntries;
/**
* Acl related operations
@ -84,67 +86,34 @@ protected void processPath(PathData item) throws IOException {
(perm.getOtherAction().implies(FsAction.EXECUTE) ? "t" : "T"));
}
if (perm.getAclBit()) {
AclStatus aclStatus = item.fs.getAclStatus(item.path);
List<AclEntry> entries = aclStatus.getEntries();
printExtendedAcl(perm, entries);
} else {
printMinimalAcl(perm);
}
List<AclEntry> entries = perm.getAclBit() ?
item.fs.getAclStatus(item.path).getEntries() :
Collections.<AclEntry>emptyList();
ScopedAclEntries scopedEntries = new ScopedAclEntries(
AclUtil.getAclFromPermAndEntries(perm, entries));
printAclEntriesForSingleScope(scopedEntries.getAccessEntries());
printAclEntriesForSingleScope(scopedEntries.getDefaultEntries());
out.println();
}
/**
* Prints an extended ACL, including all extended ACL entries and also the
* base entries implied by the permission bits.
* Prints all the ACL entries in a single scope.
*
* @param perm FsPermission of file
* @param entries List<AclEntry> containing ACL entries of file
*/
private void printExtendedAcl(FsPermission perm, List<AclEntry> entries) {
// Print owner entry implied by owner permission bits.
out.println(new AclEntry.Builder()
.setScope(AclEntryScope.ACCESS)
.setType(AclEntryType.USER)
.setPermission(perm.getUserAction())
.build());
// Print all extended access ACL entries.
boolean hasAccessAcl = false;
Iterator<AclEntry> entryIter = entries.iterator();
AclEntry curEntry = null;
while (entryIter.hasNext()) {
curEntry = entryIter.next();
if (curEntry.getScope() == AclEntryScope.DEFAULT) {
break;
}
hasAccessAcl = true;
printExtendedAclEntry(curEntry, perm.getGroupAction());
private void printAclEntriesForSingleScope(List<AclEntry> entries) {
if (entries.isEmpty()) {
return;
}
// Print mask entry implied by group permission bits, or print group entry
// if there is no access ACL (only default ACL).
out.println(new AclEntry.Builder()
.setScope(AclEntryScope.ACCESS)
.setType(hasAccessAcl ? AclEntryType.MASK : AclEntryType.GROUP)
.setPermission(perm.getGroupAction())
.build());
// Print other entry implied by other bits.
out.println(new AclEntry.Builder()
.setScope(AclEntryScope.ACCESS)
.setType(AclEntryType.OTHER)
.setPermission(perm.getOtherAction())
.build());
// Print default ACL entries.
if (curEntry != null && curEntry.getScope() == AclEntryScope.DEFAULT) {
out.println(curEntry);
// ACL sort order guarantees default mask is the second-to-last entry.
if (AclUtil.isMinimalAcl(entries)) {
for (AclEntry entry: entries) {
out.println(entry);
}
} else {
// ACL sort order guarantees mask is the second-to-last entry.
FsAction maskPerm = entries.get(entries.size() - 2).getPermission();
while (entryIter.hasNext()) {
printExtendedAclEntry(entryIter.next(), maskPerm);
for (AclEntry entry: entries) {
printExtendedAclEntry(entry, maskPerm);
}
}
}
@ -172,30 +141,6 @@ private void printExtendedAclEntry(AclEntry entry, FsAction maskPerm) {
out.println(entry);
}
}
/**
* Prints a minimal ACL, consisting of exactly 3 ACL entries implied by the
* permission bits.
*
* @param perm FsPermission of file
*/
private void printMinimalAcl(FsPermission perm) {
out.println(new AclEntry.Builder()
.setScope(AclEntryScope.ACCESS)
.setType(AclEntryType.USER)
.setPermission(perm.getUserAction())
.build());
out.println(new AclEntry.Builder()
.setScope(AclEntryScope.ACCESS)
.setType(AclEntryType.GROUP)
.setPermission(perm.getGroupAction())
.build());
out.println(new AclEntry.Builder()
.setScope(AclEntryScope.ACCESS)
.setType(AclEntryType.OTHER)
.setPermission(perm.getOtherAction())
.build());
}
}
/**

View File

@ -27,8 +27,10 @@
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.AclUtil;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.ScopedAclEntries;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
@ -90,7 +92,7 @@ public static void copyINodeDefaultAcl(INode child) {
FsPermission childPerm = child.getFsPermission();
// Copy each default ACL entry from parent to new child's access ACL.
boolean parentDefaultIsMinimal = isMinimalAcl(parentDefaultEntries);
boolean parentDefaultIsMinimal = AclUtil.isMinimalAcl(parentDefaultEntries);
for (AclEntry entry: parentDefaultEntries) {
AclEntryType type = entry.getType();
String name = entry.getName();
@ -127,7 +129,7 @@ public static void copyINodeDefaultAcl(INode child) {
Collections.<AclEntry>emptyList();
final FsPermission newPerm;
if (!isMinimalAcl(accessEntries) || !defaultEntries.isEmpty()) {
if (!AclUtil.isMinimalAcl(accessEntries) || !defaultEntries.isEmpty()) {
// Save the new ACL to the child.
child.addAclFeature(createAclFeature(accessEntries, defaultEntries));
newPerm = createFsPermissionForExtendedAcl(accessEntries, childPerm);
@ -172,7 +174,7 @@ public static List<AclEntry> readINodeLogicalAcl(INode inode) {
FsPermission perm = inode.getFsPermission();
AclFeature f = inode.getAclFeature();
if (f == null) {
return getMinimalAcl(perm);
return AclUtil.getMinimalAcl(perm);
}
final List<AclEntry> existingAcl;
@ -208,7 +210,7 @@ public static List<AclEntry> readINodeLogicalAcl(INode inode) {
} else {
// It's possible that there is a default ACL but no access ACL. In this
// case, add the minimal access ACL implied by the permission bits.
existingAcl.addAll(getMinimalAcl(perm));
existingAcl.addAll(AclUtil.getMinimalAcl(perm));
}
// Add all default entries after the access entries.
@ -267,7 +269,7 @@ public static void updateINodeAcl(INode inode, List<AclEntry> newAcl,
assert newAcl.size() >= 3;
FsPermission perm = inode.getFsPermission();
final FsPermission newPerm;
if (!isMinimalAcl(newAcl)) {
if (!AclUtil.isMinimalAcl(newAcl)) {
// This is an extended ACL. Split entries into access vs. default.
ScopedAclEntries scoped = new ScopedAclEntries(newAcl);
List<AclEntry> accessEntries = scoped.getAccessEntries();
@ -321,7 +323,7 @@ private static AclFeature createAclFeature(List<AclEntry> accessEntries,
// For the access ACL, the feature only needs to hold the named user and
// group entries. For a correctly sorted ACL, these will be in a
// predictable range.
if (!isMinimalAcl(accessEntries)) {
if (!AclUtil.isMinimalAcl(accessEntries)) {
featureEntries.addAll(
accessEntries.subList(1, accessEntries.size() - 2));
}
@ -366,41 +368,4 @@ private static FsPermission createFsPermissionForMinimalAcl(
accessEntries.get(2).getPermission(),
existingPerm.getStickyBit());
}
/**
* Translates the given permission bits to the equivalent minimal ACL.
*
* @param perm FsPermission to translate
* @return List<AclEntry> containing exactly 3 entries representing the owner,
* group and other permissions
*/
private static List<AclEntry> getMinimalAcl(FsPermission perm) {
return Lists.newArrayList(
new AclEntry.Builder()
.setScope(AclEntryScope.ACCESS)
.setType(AclEntryType.USER)
.setPermission(perm.getUserAction())
.build(),
new AclEntry.Builder()
.setScope(AclEntryScope.ACCESS)
.setType(AclEntryType.GROUP)
.setPermission(perm.getGroupAction())
.build(),
new AclEntry.Builder()
.setScope(AclEntryScope.ACCESS)
.setType(AclEntryType.OTHER)
.setPermission(perm.getOtherAction())
.build());
}
/**
* Checks if the given entries represent a minimal ACL (contains exactly 3
* entries).
*
* @param entries List<AclEntry> entries to check
* @return boolean true if the entries represent a minimal ACL
*/
private static boolean isMinimalAcl(List<AclEntry> entries) {
return entries.size() == 3;
}
}

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.ScopedAclEntries;
import org.apache.hadoop.hdfs.protocol.AclException;
/**

View File

@ -58,6 +58,8 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5861. finishedSubMaps field in LocalContainerLauncher does not
need to be volatile. (Tsuyoshi OZAWA via junping_du)
MAPREDUCE-5809. Enhance distcp to support preserving HDFS ACLs. (cnauroth)
OPTIMIZATIONS
BUG FIXES

View File

@ -22,7 +22,6 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IOUtils;
@ -31,11 +30,15 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.URI;
import java.util.Set;
import com.google.common.collect.Sets;
/**
* The CopyListing abstraction is responsible for how the list of
* sources and targets is constructed, for DistCp's copy function.
* The copy-listing should be a SequenceFile<Text, FileStatus>,
* The copy-listing should be a SequenceFile<Text, CopyListingFileStatus>,
* located at the path specified to buildListing(),
* each entry being a pair of (Source relative path, source file status),
* all the paths being fully qualified.
@ -85,7 +88,7 @@ public final void buildListing(Path pathToListFile,
config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, getBytesToCopy());
config.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, getNumberOfPaths());
checkForDuplicates(pathToListFile);
validateFinalListing(pathToListFile, options);
}
/**
@ -124,13 +127,15 @@ protected abstract void doBuildListing(Path pathToListFile,
protected abstract long getNumberOfPaths();
/**
* Validate the final resulting path listing to see if there are any duplicate entries
* Validate the final resulting path listing. Checks if there are duplicate
* entries. If preserving ACLs, checks that file system can support ACLs.
*
* @param pathToListFile - path listing build by doBuildListing
* @param options - Input options to distcp
* @throws IOException - Any issues while checking for duplicates and throws
* @throws DuplicateFileException - if there are duplicates
*/
private void checkForDuplicates(Path pathToListFile)
private void validateFinalListing(Path pathToListFile, DistCpOptions options)
throws DuplicateFileException, IOException {
Configuration config = getConf();
@ -142,17 +147,26 @@ private void checkForDuplicates(Path pathToListFile)
config, SequenceFile.Reader.file(sortedList));
try {
Text lastKey = new Text("*"); //source relative path can never hold *
FileStatus lastFileStatus = new FileStatus();
CopyListingFileStatus lastFileStatus = new CopyListingFileStatus();
Text currentKey = new Text();
Set<URI> aclSupportCheckFsSet = Sets.newHashSet();
while (reader.next(currentKey)) {
if (currentKey.equals(lastKey)) {
FileStatus currentFileStatus = new FileStatus();
CopyListingFileStatus currentFileStatus = new CopyListingFileStatus();
reader.getCurrentValue(currentFileStatus);
throw new DuplicateFileException("File " + lastFileStatus.getPath() + " and " +
currentFileStatus.getPath() + " would cause duplicates. Aborting");
}
reader.getCurrentValue(lastFileStatus);
if (options.shouldPreserve(DistCpOptions.FileAttribute.ACL)) {
FileSystem lastFs = lastFileStatus.getPath().getFileSystem(config);
URI lastFsUri = lastFs.getUri();
if (!aclSupportCheckFsSet.contains(lastFsUri)) {
DistCpUtils.checkFileSystemAclSupport(lastFs);
aclSupportCheckFsSet.add(lastFsUri);
}
}
lastKey.set(currentKey);
}
} finally {
@ -236,4 +250,10 @@ public InvalidInputException(String message) {
super(message);
}
}
public static class AclsNotSupportedException extends RuntimeException {
public AclsNotSupportedException(String message) {
super(message);
}
}
}

View File

@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclUtil;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.io.WritableUtils;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
/**
* CopyListingFileStatus is a specialized subclass of {@link FileStatus} for
* attaching additional data members useful to distcp. This class does not
* override {@link FileStatus#compareTo}, because the additional data members
* are not relevant to sort order.
*/
@InterfaceAudience.Private
public final class CopyListingFileStatus extends FileStatus {
private static final byte NO_ACL_ENTRIES = -1;
// Retain static arrays of enum values to prevent repeated allocation of new
// arrays during deserialization.
private static final AclEntryType[] ACL_ENTRY_TYPES = AclEntryType.values();
private static final AclEntryScope[] ACL_ENTRY_SCOPES = AclEntryScope.values();
private static final FsAction[] FS_ACTIONS = FsAction.values();
private List<AclEntry> aclEntries;
/**
* Default constructor.
*/
public CopyListingFileStatus() {
}
/**
* Creates a new CopyListingFileStatus by copying the members of the given
* FileStatus.
*
* @param fileStatus FileStatus to copy
*/
public CopyListingFileStatus(FileStatus fileStatus) throws IOException {
super(fileStatus);
}
/**
* Returns the full logical ACL.
*
* @return List<AclEntry> containing full logical ACL
*/
public List<AclEntry> getAclEntries() {
return AclUtil.getAclFromPermAndEntries(getPermission(),
aclEntries != null ? aclEntries : Collections.<AclEntry>emptyList());
}
/**
* Sets optional ACL entries.
*
* @param aclEntries List<AclEntry> containing all ACL entries
*/
public void setAclEntries(List<AclEntry> aclEntries) {
this.aclEntries = aclEntries;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
if (aclEntries != null) {
// byte is sufficient, because 32 ACL entries is the max enforced by HDFS.
out.writeByte(aclEntries.size());
for (AclEntry entry: aclEntries) {
out.writeByte(entry.getScope().ordinal());
out.writeByte(entry.getType().ordinal());
WritableUtils.writeString(out, entry.getName());
out.writeByte(entry.getPermission().ordinal());
}
} else {
out.writeByte(NO_ACL_ENTRIES);
}
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
byte aclEntriesSize = in.readByte();
if (aclEntriesSize != NO_ACL_ENTRIES) {
aclEntries = Lists.newArrayListWithCapacity(aclEntriesSize);
for (int i = 0; i < aclEntriesSize; ++i) {
aclEntries.add(new AclEntry.Builder()
.setScope(ACL_ENTRY_SCOPES[in.readByte()])
.setType(ACL_ENTRY_TYPES[in.readByte()])
.setName(WritableUtils.readString(in))
.setPermission(FS_ACTIONS[in.readByte()])
.build());
}
} else {
aclEntries = null;
}
}
@Override
public boolean equals(Object o) {
if (!super.equals(o)) {
return false;
}
if (getClass() != o.getClass()) {
return false;
}
CopyListingFileStatus other = (CopyListingFileStatus)o;
return Objects.equal(aclEntries, other.aclEntries);
}
@Override
public int hashCode() {
return Objects.hashCode(super.hashCode(), aclEntries);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(super.toString());
sb.append('{');
sb.append("aclEntries = " + aclEntries);
sb.append('}');
return sb.toString();
}
}

View File

@ -125,6 +125,9 @@ public int run(String[] argv) {
} catch (DuplicateFileException e) {
LOG.error("Duplicate files in input path: ", e);
return DistCpConstants.DUPLICATE_INPUT;
} catch (AclsNotSupportedException e) {
LOG.error("ACLs not supported on at least one file system: ", e);
return DistCpConstants.ACLS_NOT_SUPPORTED;
} catch (Exception e) {
LOG.error("Exception encountered ", e);
return DistCpConstants.UNKNOWN_ERROR;
@ -298,7 +301,9 @@ private void configureOutputFormat(Job job) throws IOException {
FileSystem targetFS = targetPath.getFileSystem(configuration);
targetPath = targetPath.makeQualified(targetFS.getUri(),
targetFS.getWorkingDirectory());
if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.ACL)) {
DistCpUtils.checkFileSystemAclSupport(targetFS);
}
if (inputOptions.shouldAtomicCommit()) {
Path workDir = inputOptions.getAtomicWorkPath();
if (workDir == null) {

View File

@ -115,6 +115,7 @@ public class DistCpConstants {
public static final int SUCCESS = 0;
public static final int INVALID_ARGUMENT = -1;
public static final int DUPLICATE_INPUT = -2;
public static final int ACLS_NOT_SUPPORTED = -3;
public static final int UNKNOWN_ERROR = -999;
/**

View File

@ -45,8 +45,10 @@ public enum DistCpOptionSwitch {
*
*/
PRESERVE_STATUS(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
new Option("p", true, "preserve status (rbugpc)" +
"(replication, block-size, user, group, permission, checksum-type)")),
new Option("p", true, "preserve status (rbugpca)(replication, " +
"block-size, user, group, permission, checksum-type, ACL). If " +
"-p is specified with no <arg>, then preserves replication, block " +
"size, user, group, permission and checksum type.")),
/**
* Update target location by copying only files that are missing

View File

@ -65,7 +65,7 @@ public class DistCpOptions {
private boolean targetPathExists = true;
public static enum FileAttribute{
REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE;
REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL;
public static FileAttribute getAttribute(char symbol) {
for (FileAttribute attribute : values()) {

View File

@ -23,11 +23,12 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
@ -35,6 +36,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.io.*;
import java.util.List;
import java.util.Stack;
/**
@ -139,28 +141,34 @@ public void doBuildListing(SequenceFile.Writer fileListWriter,
FileStatus rootStatus = sourceFS.getFileStatus(path);
Path sourcePathRoot = computeSourceRootPath(rootStatus, options);
boolean localFile = (rootStatus.getClass() != FileStatus.class);
FileStatus[] sourceFiles = sourceFS.listStatus(path);
boolean explore = (sourceFiles != null && sourceFiles.length > 0);
if (!explore || rootStatus.isDirectory()) {
writeToFileListingRoot(fileListWriter, rootStatus, sourcePathRoot,
localFile, options);
CopyListingFileStatus rootCopyListingStatus =
DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus,
options.shouldPreserve(FileAttribute.ACL));
writeToFileListingRoot(fileListWriter, rootCopyListingStatus,
sourcePathRoot, options);
}
if (explore) {
for (FileStatus sourceStatus: sourceFiles) {
if (LOG.isDebugEnabled()) {
LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
}
writeToFileListing(fileListWriter, sourceStatus, sourcePathRoot,
localFile, options);
CopyListingFileStatus sourceCopyListingStatus =
DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus,
options.shouldPreserve(FileAttribute.ACL) &&
sourceStatus.isDirectory());
writeToFileListing(fileListWriter, sourceCopyListingStatus,
sourcePathRoot, options);
if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Traversing non-empty source dir: " + sourceStatus.getPath());
}
traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot,
localFile, options);
options);
}
}
}
@ -233,7 +241,7 @@ private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
return SequenceFile.createWriter(getConf(),
SequenceFile.Writer.file(pathToListFile),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(FileStatus.class),
SequenceFile.Writer.valueClass(CopyListingFileStatus.class),
SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
}
@ -250,7 +258,6 @@ private static FileStatus[] getChildren(FileSystem fileSystem,
private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
FileStatus sourceStatus,
Path sourcePathRoot,
boolean localFile,
DistCpOptions options)
throws IOException {
FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
@ -262,8 +269,11 @@ private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
if (LOG.isDebugEnabled())
LOG.debug("Recording source-path: "
+ sourceStatus.getPath() + " for copy.");
writeToFileListing(fileListWriter, child, sourcePathRoot,
localFile, options);
CopyListingFileStatus childCopyListingStatus =
DistCpUtils.toCopyListingFileStatus(sourceFS, child,
options.shouldPreserve(FileAttribute.ACL) && child.isDirectory());
writeToFileListing(fileListWriter, childCopyListingStatus,
sourcePathRoot, options);
if (isDirectoryAndNotEmpty(sourceFS, child)) {
if (LOG.isDebugEnabled())
LOG.debug("Traversing non-empty source dir: "
@ -275,8 +285,7 @@ private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
}
private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
FileStatus fileStatus, Path sourcePathRoot,
boolean localFile,
CopyListingFileStatus fileStatus, Path sourcePathRoot,
DistCpOptions options) throws IOException {
boolean syncOrOverwrite = options.shouldSyncFolder() ||
options.shouldOverwrite();
@ -288,14 +297,12 @@ private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
}
return;
}
writeToFileListing(fileListWriter, fileStatus, sourcePathRoot, localFile,
options);
writeToFileListing(fileListWriter, fileStatus, sourcePathRoot, options);
}
private void writeToFileListing(SequenceFile.Writer fileListWriter,
FileStatus fileStatus,
CopyListingFileStatus fileStatus,
Path sourcePathRoot,
boolean localFile,
DistCpOptions options) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot,
@ -303,9 +310,6 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
}
FileStatus status = fileStatus;
if (localFile) {
status = getFileStatus(fileStatus);
}
if (!shouldCopy(fileStatus.getPath(), options)) {
return;
@ -320,19 +324,4 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
}
totalPaths++;
}
private static final ByteArrayOutputStream buffer = new ByteArrayOutputStream(64);
private DataInputBuffer in = new DataInputBuffer();
private FileStatus getFileStatus(FileStatus fileStatus) throws IOException {
FileStatus status = new FileStatus();
buffer.reset();
DataOutputStream out = new DataOutputStream(buffer);
fileStatus.write(out);
in.reset(buffer.toByteArray(), 0, buffer.size());
status.readFields(in);
return status;
}
}

View File

@ -178,7 +178,7 @@ private void preserveFileAttributesForDirectories(Configuration conf) throws IOE
long preservedEntries = 0;
try {
FileStatus srcFileStatus = new FileStatus();
CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
Text srcRelPath = new Text();
// Iterate over every source path that was copied.
@ -246,9 +246,9 @@ private void deleteMissing(Configuration conf) throws IOException {
// Delete all from target that doesn't also exist on source.
long deletedEntries = 0;
try {
FileStatus srcFileStatus = new FileStatus();
CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
Text srcRelPath = new Text();
FileStatus trgtFileStatus = new FileStatus();
CopyListingFileStatus trgtFileStatus = new CopyListingFileStatus();
Text trgtRelPath = new Text();
FileSystem targetFS = targetFinalPath.getFileSystem(conf);

View File

@ -24,9 +24,11 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptionSwitch;
import org.apache.hadoop.tools.DistCpOptions;
@ -37,12 +39,13 @@
import java.io.*;
import java.util.EnumSet;
import java.util.Arrays;
import java.util.List;
/**
* Mapper class that executes the DistCp copy operation.
* Implements the o.a.h.mapreduce.Mapper<> interface.
*/
public class CopyMapper extends Mapper<Text, FileStatus, Text, Text> {
public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text> {
/**
* Hadoop counters for the DistCp CopyMapper.
@ -172,8 +175,8 @@ private Path findCacheFile(Path[] cacheFiles, String fileName) {
* @throws IOException
*/
@Override
public void map(Text relPath, FileStatus sourceFileStatus, Context context)
throws IOException, InterruptedException {
public void map(Text relPath, CopyListingFileStatus sourceFileStatus,
Context context) throws IOException, InterruptedException {
Path sourcePath = sourceFileStatus.getPath();
if (LOG.isDebugEnabled())
@ -191,11 +194,13 @@ public void map(Text relPath, FileStatus sourceFileStatus, Context context)
LOG.info(description);
try {
FileStatus sourceCurrStatus;
CopyListingFileStatus sourceCurrStatus;
FileSystem sourceFS;
try {
sourceFS = sourcePath.getFileSystem(conf);
sourceCurrStatus = sourceFS.getFileStatus(sourcePath);
sourceCurrStatus = DistCpUtils.toCopyListingFileStatus(sourceFS,
sourceFS.getFileStatus(sourcePath),
fileAttributes.contains(FileAttribute.ACL));
} catch (FileNotFoundException e) {
throw new IOException(new RetriableFileCopyCommand.CopyReadException(e));
}

View File

@ -23,11 +23,11 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@ -44,7 +44,8 @@
* that the total-number of bytes to be copied for each input split is
* uniform.
*/
public class UniformSizeInputFormat extends InputFormat<Text, FileStatus> {
public class UniformSizeInputFormat
extends InputFormat<Text, CopyListingFileStatus> {
private static final Log LOG
= LogFactory.getLog(UniformSizeInputFormat.class);
@ -76,7 +77,7 @@ private List<InputSplit> getSplits(Configuration configuration, int numSplits,
List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
long nBytesPerSplit = (long) Math.ceil(totalSizeBytes * 1.0 / numSplits);
FileStatus srcFileStatus = new FileStatus();
CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
Text srcRelPath = new Text();
long currentSplitSize = 0;
long lastSplitStart = 0;
@ -161,9 +162,9 @@ private SequenceFile.Reader getListingFileReader(Configuration configuration) {
* @throws InterruptedException
*/
@Override
public RecordReader<Text, FileStatus> createRecordReader(InputSplit split,
TaskAttemptContext context)
throws IOException, InterruptedException {
return new SequenceFileRecordReader<Text, FileStatus>();
public RecordReader<Text, CopyListingFileStatus> createRecordReader(
InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
return new SequenceFileRecordReader<Text, CopyListingFileStatus>();
}
}

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
@ -90,7 +91,7 @@ private DynamicInputChunk(String chunkId, Configuration configuration)
private void openForWrite() throws IOException {
writer = SequenceFile.createWriter(
chunkFilePath.getFileSystem(configuration), configuration,
chunkFilePath, Text.class, FileStatus.class,
chunkFilePath, Text.class, CopyListingFileStatus.class,
SequenceFile.CompressionType.NONE);
}
@ -117,7 +118,7 @@ public static DynamicInputChunk createChunkForWrite(String chunkId,
* @param value Corresponding value from the listing file.
* @throws IOException Exception onf failure to write to the file.
*/
public void write(Text key, FileStatus value) throws IOException {
public void write(Text key, CopyListingFileStatus value) throws IOException {
writer.append(key, value);
}

View File

@ -29,7 +29,7 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.tools.CopyListingFileStatus;
import java.util.List;
import java.util.ArrayList;
@ -133,7 +133,7 @@ private List<InputSplit> createSplits(JobContext jobContext,
List<DynamicInputChunk> chunksFinal = new ArrayList<DynamicInputChunk>();
FileStatus fileStatus = new FileStatus();
CopyListingFileStatus fileStatus = new CopyListingFileStatus();
Text relPath = new Text();
int recordCounter = 0;
int chunkCount = 0;

View File

@ -25,15 +25,21 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclUtil;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.mapred.UniformSizeInputFormat;
import org.apache.hadoop.tools.CopyListing.AclsNotSupportedException;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.mapreduce.InputFormat;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.text.DecimalFormat;
import java.net.URI;
@ -181,7 +187,7 @@ public static EnumSet<FileAttribute> unpackAttributes(String attributes) {
* change or any transient error)
*/
public static void preserve(FileSystem targetFS, Path path,
FileStatus srcFileStatus,
CopyListingFileStatus srcFileStatus,
EnumSet<FileAttribute> attributes) throws IOException {
FileStatus targetFileStatus = targetFS.getFileStatus(path);
@ -189,7 +195,18 @@ public static void preserve(FileSystem targetFS, Path path,
String user = targetFileStatus.getOwner();
boolean chown = false;
if (attributes.contains(FileAttribute.PERMISSION) &&
if (attributes.contains(FileAttribute.ACL)) {
List<AclEntry> srcAcl = srcFileStatus.getAclEntries();
List<AclEntry> targetAcl = getAcl(targetFS, targetFileStatus);
if (!srcAcl.equals(targetAcl)) {
targetFS.setAcl(path, srcAcl);
}
// setAcl can't preserve sticky bit, so also call setPermission if needed.
if (srcFileStatus.getPermission().getStickyBit() !=
targetFileStatus.getPermission().getStickyBit()) {
targetFS.setPermission(path, srcFileStatus.getPermission());
}
} else if (attributes.contains(FileAttribute.PERMISSION) &&
!srcFileStatus.getPermission().equals(targetFileStatus.getPermission())) {
targetFS.setPermission(path, srcFileStatus.getPermission());
}
@ -216,6 +233,46 @@ public static void preserve(FileSystem targetFS, Path path,
}
}
/**
* Returns a file's full logical ACL.
*
* @param fileSystem FileSystem containing the file
* @param fileStatus FileStatus of file
* @return List<AclEntry> containing full logical ACL
* @throws IOException if there is an I/O error
*/
public static List<AclEntry> getAcl(FileSystem fileSystem,
FileStatus fileStatus) throws IOException {
List<AclEntry> entries = fileSystem.getAclStatus(fileStatus.getPath())
.getEntries();
return AclUtil.getAclFromPermAndEntries(fileStatus.getPermission(), entries);
}
/**
* Converts a FileStatus to a CopyListingFileStatus. If preserving ACLs,
* populates the CopyListingFileStatus with the ACLs.
*
* @param fileSystem FileSystem containing the file
* @param fileStatus FileStatus of file
* @param preserveAcls boolean true if preserving ACLs
* @throws IOException if there is an I/O error
*/
public static CopyListingFileStatus toCopyListingFileStatus(
FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls)
throws IOException {
CopyListingFileStatus copyListingFileStatus =
new CopyListingFileStatus(fileStatus);
if (preserveAcls) {
FsPermission perm = fileStatus.getPermission();
if (perm.getAclBit()) {
List<AclEntry> aclEntries = fileSystem.getAclStatus(
fileStatus.getPath()).getEntries();
copyListingFileStatus.setAclEntries(aclEntries);
}
}
return copyListingFileStatus;
}
/**
* Sort sequence file containing FileStatus and Text as key and value respecitvely
*
@ -227,7 +284,8 @@ public static void preserve(FileSystem targetFS, Path path,
*/
public static Path sortListing(FileSystem fs, Configuration conf, Path sourceListing)
throws IOException {
SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class, FileStatus.class, conf);
SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class,
CopyListingFileStatus.class, conf);
Path output = new Path(sourceListing.toString() + "_sorted");
if (fs.exists(output)) {
@ -238,6 +296,25 @@ public static Path sortListing(FileSystem fs, Configuration conf, Path sourceLis
return output;
}
/**
* Determines if a file system supports ACLs by running a canary getAclStatus
* request on the file system root. This method is used before distcp job
* submission to fail fast if the user requested preserving ACLs, but the file
* system cannot support ACLs.
*
* @param fs FileSystem to check
* @throws AclsNotSupportedException if fs does not support ACLs
*/
public static void checkFileSystemAclSupport(FileSystem fs)
throws AclsNotSupportedException {
try {
fs.getAclStatus(new Path(Path.SEPARATOR));
} catch (Exception e) {
throw new AclsNotSupportedException("ACLs not supported for file system: "
+ fs.getUri());
}
}
/**
* String utility to convert a number-of-bytes to human readable format.
*/

View File

@ -23,7 +23,6 @@
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.conf.Configuration;
import java.util.List;
@ -33,18 +32,19 @@
public class StubContext {
private StubStatusReporter reporter = new StubStatusReporter();
private RecordReader<Text, FileStatus> reader;
private RecordReader<Text, CopyListingFileStatus> reader;
private StubInMemoryWriter writer = new StubInMemoryWriter();
private Mapper<Text, FileStatus, Text, Text>.Context mapperContext;
private Mapper<Text, CopyListingFileStatus, Text, Text>.Context mapperContext;
public StubContext(Configuration conf, RecordReader<Text, FileStatus> reader,
int taskId) throws IOException, InterruptedException {
public StubContext(Configuration conf,
RecordReader<Text, CopyListingFileStatus> reader, int taskId)
throws IOException, InterruptedException {
WrappedMapper<Text, FileStatus, Text, Text> wrappedMapper
= new WrappedMapper<Text, FileStatus, Text, Text>();
WrappedMapper<Text, CopyListingFileStatus, Text, Text> wrappedMapper
= new WrappedMapper<Text, CopyListingFileStatus, Text, Text>();
MapContextImpl<Text, FileStatus, Text, Text> contextImpl
= new MapContextImpl<Text, FileStatus, Text, Text>(conf,
MapContextImpl<Text, CopyListingFileStatus, Text, Text> contextImpl
= new MapContextImpl<Text, CopyListingFileStatus, Text, Text>(conf,
getTaskAttemptID(taskId), reader, writer,
null, reporter, null);
@ -52,7 +52,7 @@ public StubContext(Configuration conf, RecordReader<Text, FileStatus> reader,
this.mapperContext = wrappedMapper.getMapContext(contextImpl);
}
public Mapper<Text, FileStatus, Text, Text>.Context getContext() {
public Mapper<Text, CopyListingFileStatus, Text, Text>.Context getContext() {
return mapperContext;
}
@ -60,7 +60,7 @@ public StatusReporter getReporter() {
return reporter;
}
public RecordReader<Text, FileStatus> getReader() {
public RecordReader<Text, CopyListingFileStatus> getReader() {
return reader;
}

View File

@ -24,7 +24,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.tools.util.TestDistCpUtils;
@ -106,7 +105,7 @@ protected boolean shouldCopy(Path path, DistCpOptions options) {
Assert.assertEquals(listing.getNumberOfPaths(), 3);
SequenceFile.Reader reader = new SequenceFile.Reader(getConf(),
SequenceFile.Reader.file(listingFile));
FileStatus fileStatus = new FileStatus();
CopyListingFileStatus fileStatus = new CopyListingFileStatus();
Text relativePath = new Text();
Assert.assertTrue(reader.next(relativePath, fileStatus));
Assert.assertEquals(relativePath.toString(), "/1");
@ -274,7 +273,7 @@ public void testBuildListingForSingleFile() {
reader = new SequenceFile.Reader(getConf(), SequenceFile.Reader.file(listFile));
FileStatus fileStatus = new FileStatus();
CopyListingFileStatus fileStatus = new CopyListingFileStatus();
Text relativePath = new Text();
Assert.assertTrue(reader.next(relativePath, fileStatus));
Assert.assertTrue(relativePath.toString().equals(""));

View File

@ -0,0 +1,329 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import static org.apache.hadoop.fs.permission.AclEntryScope.*;
import static org.apache.hadoop.fs.permission.AclEntryType.*;
import static org.apache.hadoop.fs.permission.FsAction.*;
import static org.junit.Assert.*;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests distcp in combination with HDFS ACLs.
*/
public class TestDistCpWithAcls {
private static MiniDFSCluster cluster;
private static Configuration conf;
private static FileSystem fs;
@BeforeClass
public static void init() throws Exception {
initCluster(true, true);
// Create this directory structure:
// /src
// /dir1
// /subdir1
// /dir2
// /dir2/file2
// /dir2/file3
// /dir3sticky
// /file1
fs.mkdirs(new Path("/src/dir1/subdir1"));
fs.mkdirs(new Path("/src/dir2"));
fs.create(new Path("/src/dir2/file2")).close();
fs.create(new Path("/src/dir2/file3")).close();
fs.mkdirs(new Path("/src/dir3sticky"));
fs.create(new Path("/src/file1")).close();
// Set a mix of ACLs and plain permissions throughout the tree.
fs.modifyAclEntries(new Path("/src/dir1"), Arrays.asList(
aclEntry(DEFAULT, USER, "bruce", ALL)));
fs.modifyAclEntries(new Path("/src/dir2/file2"), Arrays.asList(
aclEntry(ACCESS, GROUP, "sales", NONE)));
fs.setPermission(new Path("/src/dir2/file3"),
new FsPermission((short)0660));
fs.modifyAclEntries(new Path("/src/file1"), Arrays.asList(
aclEntry(ACCESS, USER, "diana", READ)));
fs.setPermission(new Path("/src/dir3sticky"),
new FsPermission((short)01777));
}
@AfterClass
public static void shutdown() {
IOUtils.cleanup(null, fs);
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testPreserveAcls() throws Exception {
assertRunDistCp(DistCpConstants.SUCCESS, "/dstPreserveAcls");
assertAclEntries("/dstPreserveAcls/dir1", new AclEntry[] {
aclEntry(DEFAULT, USER, ALL),
aclEntry(DEFAULT, USER, "bruce", ALL),
aclEntry(DEFAULT, GROUP, READ_EXECUTE),
aclEntry(DEFAULT, MASK, ALL),
aclEntry(DEFAULT, OTHER, READ_EXECUTE) } );
assertPermission("/dstPreserveAcls/dir1", (short)0755);
assertAclEntries("/dstPreserveAcls/dir1/subdir1", new AclEntry[] { });
assertPermission("/dstPreserveAcls/dir1/subdir1", (short)0755);
assertAclEntries("/dstPreserveAcls/dir2", new AclEntry[] { });
assertPermission("/dstPreserveAcls/dir2", (short)0755);
assertAclEntries("/dstPreserveAcls/dir2/file2", new AclEntry[] {
aclEntry(ACCESS, GROUP, READ),
aclEntry(ACCESS, GROUP, "sales", NONE) } );
assertPermission("/dstPreserveAcls/dir2/file2", (short)0644);
assertAclEntries("/dstPreserveAcls/dir2/file3", new AclEntry[] { });
assertPermission("/dstPreserveAcls/dir2/file3", (short)0660);
assertAclEntries("/dstPreserveAcls/dir3sticky", new AclEntry[] { });
assertPermission("/dstPreserveAcls/dir3sticky", (short)01777);
assertAclEntries("/dstPreserveAcls/file1", new AclEntry[] {
aclEntry(ACCESS, USER, "diana", READ),
aclEntry(ACCESS, GROUP, READ) } );
assertPermission("/dstPreserveAcls/file1", (short)0644);
}
@Test
public void testAclsNotEnabled() throws Exception {
try {
restart(false);
assertRunDistCp(DistCpConstants.ACLS_NOT_SUPPORTED, "/dstAclsNotEnabled");
} finally {
restart(true);
}
}
@Test
public void testAclsNotImplemented() throws Exception {
assertRunDistCp(DistCpConstants.ACLS_NOT_SUPPORTED,
"stubfs://dstAclsNotImplemented");
}
/**
* Stub FileSystem implementation used for testing the case of attempting
* distcp with ACLs preserved on a file system that does not support ACLs.
* The base class implementation throws UnsupportedOperationException for the
* ACL methods, so we don't need to override them.
*/
public static class StubFileSystem extends FileSystem {
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
return null;
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
return null;
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
return false;
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
return null;
}
@Override
public URI getUri() {
return URI.create("stubfs:///");
}
@Override
public Path getWorkingDirectory() {
return new Path(Path.SEPARATOR);
}
@Override
public FileStatus[] listStatus(Path f) throws IOException {
return null;
}
@Override
public boolean mkdirs(Path f, FsPermission permission)
throws IOException {
return false;
}
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return null;
}
@Override
public boolean rename(Path src, Path dst) throws IOException {
return false;
}
@Override
public void setWorkingDirectory(Path dir) {
}
}
/**
* Create a new AclEntry with scope, type and permission (no name).
*
* @param scope AclEntryScope scope of the ACL entry
* @param type AclEntryType ACL entry type
* @param permission FsAction set of permissions in the ACL entry
* @return AclEntry new AclEntry
*/
private static AclEntry aclEntry(AclEntryScope scope, AclEntryType type,
FsAction permission) {
return new AclEntry.Builder()
.setScope(scope)
.setType(type)
.setPermission(permission)
.build();
}
/**
* Create a new AclEntry with scope, type, name and permission.
*
* @param scope AclEntryScope scope of the ACL entry
* @param type AclEntryType ACL entry type
* @param name String optional ACL entry name
* @param permission FsAction set of permissions in the ACL entry
* @return AclEntry new AclEntry
*/
private static AclEntry aclEntry(AclEntryScope scope, AclEntryType type,
String name, FsAction permission) {
return new AclEntry.Builder()
.setScope(scope)
.setType(type)
.setName(name)
.setPermission(permission)
.build();
}
/**
* Asserts the ACL entries returned by getAclStatus for a specific path.
*
* @param path String path to check
* @param entries AclEntry[] expected ACL entries
* @throws Exception if there is any error
*/
private static void assertAclEntries(String path, AclEntry[] entries)
throws Exception {
assertArrayEquals(entries, fs.getAclStatus(new Path(path)).getEntries()
.toArray(new AclEntry[0]));
}
/**
* Asserts the value of the FsPermission bits on the inode of a specific path.
*
* @param path String path to check
* @param perm short expected permission bits
* @throws Exception if there is any error
*/
private static void assertPermission(String path, short perm)
throws Exception {
assertEquals(perm,
fs.getFileStatus(new Path(path)).getPermission().toShort());
}
/**
* Runs distcp from /src to specified destination, preserving ACLs. Asserts
* expected exit code.
*
* @param int exitCode expected exit code
* @param dst String distcp destination
* @throws Exception if there is any error
*/
private static void assertRunDistCp(int exitCode, String dst)
throws Exception {
DistCp distCp = new DistCp(conf, null);
assertEquals(exitCode, ToolRunner.run(
conf, distCp, new String[] { "-pa", "/src", dst }));
}
/**
* Initialize the cluster, wait for it to become active, and get FileSystem.
*
* @param format if true, format the NameNode and DataNodes before starting up
* @param aclsEnabled if true, ACL support is enabled
* @throws Exception if any step fails
*/
private static void initCluster(boolean format, boolean aclsEnabled)
throws Exception {
conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, aclsEnabled);
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "stubfs:///");
conf.setClass("fs.stubfs.impl", StubFileSystem.class, FileSystem.class);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(format)
.build();
cluster.waitActive();
fs = cluster.getFileSystem();
}
/**
* Restarts the cluster with ACLs enabled or disabled.
*
* @param aclsEnabled if true, ACL support is enabled
* @throws Exception if any step fails
*/
private static void restart(boolean aclsEnabled) throws Exception {
shutdown();
initCluster(false, aclsEnabled);
}
}

View File

@ -23,7 +23,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
@ -531,7 +530,7 @@ private void checkResult(Path listFile, int count) throws IOException {
SequenceFile.Reader.file(listFile));
try {
Text relPath = new Text();
FileStatus fileStatus = new FileStatus();
CopyListingFileStatus fileStatus = new CopyListingFileStatus();
while (reader.next(relPath, fileStatus)) {
if (fileStatus.isDirectory() && relPath.toString().equals("")) {
// ignore root with empty relPath, which is an entry to be

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.tools;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -121,7 +120,7 @@ private void verifyContents(Path listingPath) throws Exception {
SequenceFile.Reader reader = new SequenceFile.Reader(cluster.getFileSystem(),
listingPath, new Configuration());
Text key = new Text();
FileStatus value = new FileStatus();
CopyListingFileStatus value = new CopyListingFileStatus();
Map<String, String> actualValues = new HashMap<String, String>();
while (reader.next(key, value)) {
if (value.isDirectory() && key.toString().equals("")) {

View File

@ -410,6 +410,7 @@ public void testPreserve() {
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
options = OptionsParser.parse(new String[] {
"-p",
@ -421,6 +422,7 @@ public void testPreserve() {
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
options = OptionsParser.parse(new String[] {
"-pbr",
@ -433,6 +435,7 @@ public void testPreserve() {
Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
options = OptionsParser.parse(new String[] {
"-pbrgup",
@ -445,9 +448,10 @@ public void testPreserve() {
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
options = OptionsParser.parse(new String[] {
"-pbrgupc",
"-pbrgupca",
"-f",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
@ -457,6 +461,7 @@ public void testPreserve() {
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
Assert.assertTrue(options.shouldPreserve(FileAttribute.ACL));
options = OptionsParser.parse(new String[] {
"-pc",
@ -469,6 +474,7 @@ public void testPreserve() {
Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
options = OptionsParser.parse(new String[] {
"-p",
@ -485,7 +491,7 @@ public void testPreserve() {
try {
OptionsParser.parse(new String[] {
"-pabc",
"-pabcd",
"-f",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target"});

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptionSwitch;
import org.apache.hadoop.tools.DistCpOptions;
@ -222,7 +223,7 @@ private void testCopy(boolean preserveChecksum) {
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
Mapper<Text, FileStatus, Text, Text>.Context context
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
Configuration configuration = context.getConfiguration();
@ -238,7 +239,7 @@ private void testCopy(boolean preserveChecksum) {
for (Path path: pathList) {
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
fs.getFileStatus(path), context);
new CopyListingFileStatus(fs.getFileStatus(path)), context);
}
// Check that the maps worked.
@ -283,12 +284,11 @@ private void testCopy(boolean preserveChecksum) {
}
private void testCopyingExistingFiles(FileSystem fs, CopyMapper copyMapper,
Mapper<Text, FileStatus, Text, Text>.Context context) {
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context) {
try {
for (Path path : pathList) {
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
fs.getFileStatus(path), context);
new CopyListingFileStatus(fs.getFileStatus(path)), context);
}
Assert.assertEquals(nFiles,
@ -309,7 +309,7 @@ public void testMakeDirFailure() {
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
Mapper<Text, FileStatus, Text, Text>.Context context
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
Configuration configuration = context.getConfiguration();
@ -320,7 +320,7 @@ public void testMakeDirFailure() {
copyMapper.setup(context);
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), pathList.get(0))),
fs.getFileStatus(pathList.get(0)), context);
new CopyListingFileStatus(fs.getFileStatus(pathList.get(0))), context);
Assert.assertTrue("There should have been an exception.", false);
}
@ -343,7 +343,7 @@ public void testDirToFile() {
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
Mapper<Text, FileStatus, Text, Text>.Context context
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
mkdirs(SOURCE_PATH + "/src/file");
@ -351,7 +351,8 @@ public void testDirToFile() {
try {
copyMapper.setup(context);
copyMapper.map(new Text("/src/file"),
fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
new CopyListingFileStatus(fs.getFileStatus(
new Path(SOURCE_PATH + "/src/file"))),
context);
} catch (IOException e) {
Assert.assertTrue(e.getMessage().startsWith("Can't replace"));
@ -372,22 +373,24 @@ public void testPreserve() {
final CopyMapper copyMapper = new CopyMapper();
final Mapper<Text, FileStatus, Text, Text>.Context context = tmpUser.
doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() {
@Override
public Mapper<Text, FileStatus, Text, Text>.Context run() {
try {
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
return stubContext.getContext();
} catch (Exception e) {
LOG.error("Exception encountered ", e);
throw new RuntimeException(e);
}
}
});
final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
tmpUser.doAs(
new PrivilegedAction<Mapper<Text, CopyListingFileStatus, Text, Text>.Context>() {
@Override
public Mapper<Text, CopyListingFileStatus, Text, Text>.Context run() {
try {
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
return stubContext.getContext();
} catch (Exception e) {
LOG.error("Exception encountered ", e);
throw new RuntimeException(e);
}
}
});
EnumSet<DistCpOptions.FileAttribute> preserveStatus =
EnumSet.allOf(DistCpOptions.FileAttribute.class);
preserveStatus.remove(DistCpOptions.FileAttribute.ACL);
context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
DistCpUtils.packAttributes(preserveStatus));
@ -415,7 +418,8 @@ public Integer run() {
try {
copyMapper.setup(context);
copyMapper.map(new Text("/src/file"),
tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
new CopyListingFileStatus(tmpFS.getFileStatus(
new Path(SOURCE_PATH + "/src/file"))),
context);
Assert.fail("Expected copy to fail");
} catch (AccessControlException e) {
@ -442,19 +446,20 @@ public void testCopyReadableFiles() {
final CopyMapper copyMapper = new CopyMapper();
final Mapper<Text, FileStatus, Text, Text>.Context context = tmpUser.
doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() {
@Override
public Mapper<Text, FileStatus, Text, Text>.Context run() {
try {
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
return stubContext.getContext();
} catch (Exception e) {
LOG.error("Exception encountered ", e);
throw new RuntimeException(e);
}
}
});
final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
tmpUser.doAs(
new PrivilegedAction<Mapper<Text, CopyListingFileStatus, Text, Text>.Context>() {
@Override
public Mapper<Text, CopyListingFileStatus, Text, Text>.Context run() {
try {
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
return stubContext.getContext();
} catch (Exception e) {
LOG.error("Exception encountered ", e);
throw new RuntimeException(e);
}
}
});
touchFile(SOURCE_PATH + "/src/file");
mkdirs(TARGET_PATH);
@ -481,7 +486,8 @@ public Integer run() {
try {
copyMapper.setup(context);
copyMapper.map(new Text("/src/file"),
tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
new CopyListingFileStatus(tmpFS.getFileStatus(
new Path(SOURCE_PATH + "/src/file"))),
context);
} catch (Exception e) {
throw new RuntimeException(e);
@ -518,9 +524,11 @@ public StubContext run() {
}
});
final Mapper<Text, FileStatus, Text, Text>.Context context = stubContext.getContext();
final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
stubContext.getContext();
EnumSet<DistCpOptions.FileAttribute> preserveStatus =
EnumSet.allOf(DistCpOptions.FileAttribute.class);
preserveStatus.remove(DistCpOptions.FileAttribute.ACL);
context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
DistCpUtils.packAttributes(preserveStatus));
@ -551,7 +559,8 @@ public Integer run() {
try {
copyMapper.setup(context);
copyMapper.map(new Text("/src/file"),
tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
new CopyListingFileStatus(tmpFS.getFileStatus(
new Path(SOURCE_PATH + "/src/file"))),
context);
Assert.assertEquals(stubContext.getWriter().values().size(), 1);
Assert.assertTrue(stubContext.getWriter().values().get(0).toString().startsWith("SKIP"));
@ -594,8 +603,9 @@ public StubContext run() {
EnumSet<DistCpOptions.FileAttribute> preserveStatus =
EnumSet.allOf(DistCpOptions.FileAttribute.class);
preserveStatus.remove(DistCpOptions.FileAttribute.ACL);
final Mapper<Text, FileStatus, Text, Text>.Context context
final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
@ -629,7 +639,8 @@ public Integer run() {
try {
copyMapper.setup(context);
copyMapper.map(new Text("/src/file"),
tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
new CopyListingFileStatus(tmpFS.getFileStatus(
new Path(SOURCE_PATH + "/src/file"))),
context);
Assert.fail("Didn't expect the file to be copied");
} catch (AccessControlException ignore) {
@ -661,7 +672,7 @@ public void testFileToDir() {
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
Mapper<Text, FileStatus, Text, Text>.Context context
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
touchFile(SOURCE_PATH + "/src/file");
@ -669,7 +680,8 @@ public void testFileToDir() {
try {
copyMapper.setup(context);
copyMapper.map(new Text("/src/file"),
fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
new CopyListingFileStatus(fs.getFileStatus(
new Path(SOURCE_PATH + "/src/file"))),
context);
} catch (IOException e) {
Assert.assertTrue(e.getMessage().startsWith("Can't replace"));
@ -688,7 +700,7 @@ private void doTestIgnoreFailures(boolean ignoreFailures) {
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
Mapper<Text, FileStatus, Text, Text>.Context context
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
Configuration configuration = context.getConfiguration();
@ -705,7 +717,7 @@ private void doTestIgnoreFailures(boolean ignoreFailures) {
if (!fileStatus.isDirectory()) {
fs.delete(path, true);
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
fileStatus, context);
new CopyListingFileStatus(fileStatus), context);
}
}
if (ignoreFailures) {
@ -745,7 +757,7 @@ public void testCopyFailOnBlockSizeDifference() {
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
Mapper<Text, FileStatus, Text, Text>.Context context
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
Configuration configuration = context.getConfiguration();
@ -759,7 +771,7 @@ public void testCopyFailOnBlockSizeDifference() {
for (Path path : pathList) {
final FileStatus fileStatus = fs.getFileStatus(path);
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
fileStatus, context);
new CopyListingFileStatus(fileStatus), context);
}
Assert.fail("Copy should have failed because of block-size difference.");
@ -780,7 +792,7 @@ private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
Mapper<Text, FileStatus, Text, Text>.Context context
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
Configuration configuration = context.getConfiguration();
@ -798,7 +810,7 @@ private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){
for (Path path : pathList) {
final FileStatus fileStatus = fs.getFileStatus(path);
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
fileStatus, context);
new CopyListingFileStatus(fileStatus), context);
}
// Check that the block-size/replication aren't preserved.
@ -855,7 +867,7 @@ public void testSingleFileCopy() {
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
Mapper<Text, FileStatus, Text, Text>.Context context
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
context.getConfiguration().set(
@ -863,7 +875,8 @@ public void testSingleFileCopy() {
targetFilePath.getParent().toString()); // Parent directory.
copyMapper.setup(context);
final FileStatus sourceFileStatus = fs.getFileStatus(sourceFilePath);
final CopyListingFileStatus sourceFileStatus = new CopyListingFileStatus(
fs.getFileStatus(sourceFilePath));
long before = fs.getFileStatus(targetFilePath).getModificationTime();
copyMapper.map(new Text(DistCpUtils.getRelativePath(
@ -907,7 +920,7 @@ private void testPreserveUserGroupImpl(boolean preserve){
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
Mapper<Text, FileStatus, Text, Text>.Context context
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
= stubContext.getContext();
Configuration configuration = context.getConfiguration();
@ -926,7 +939,7 @@ private void testPreserveUserGroupImpl(boolean preserve){
for (Path path : pathList) {
final FileStatus fileStatus = fs.getFileStatus(path);
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
fileStatus, context);
new CopyListingFileStatus(fileStatus), context);
}
// Check that the user/group attributes are preserved

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.tools.CopyListing;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.StubContext;
import org.apache.hadoop.security.Credentials;
@ -122,8 +123,8 @@ public void testGetSplits(int nMaps) throws Exception {
for (int i=0; i<splits.size(); ++i) {
InputSplit split = splits.get(i);
int currentSplitSize = 0;
RecordReader<Text, FileStatus> recordReader = uniformSizeInputFormat.createRecordReader(
split, null);
RecordReader<Text, CopyListingFileStatus> recordReader =
uniformSizeInputFormat.createRecordReader(split, null);
StubContext stubContext = new StubContext(jobContext.getConfiguration(),
recordReader, 0);
final TaskAttemptContext taskAttemptContext
@ -168,7 +169,7 @@ private void checkSplits(Path listFile, List<InputSplit> splits) throws IOExcept
try {
reader.seek(lastEnd);
FileStatus srcFileStatus = new FileStatus();
CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
Text srcRelPath = new Text();
Assert.assertFalse(reader.next(srcRelPath, srcFileStatus));
} finally {

View File

@ -25,13 +25,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.tools.CopyListing;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.StubContext;
import org.apache.hadoop.security.Credentials;
@ -118,15 +118,15 @@ public void testGetSplits() throws Exception {
+"/tmp/testDynInputFormat/fileList.seq"), options);
JobContext jobContext = new JobContextImpl(configuration, new JobID());
DynamicInputFormat<Text, FileStatus> inputFormat =
new DynamicInputFormat<Text, FileStatus>();
DynamicInputFormat<Text, CopyListingFileStatus> inputFormat =
new DynamicInputFormat<Text, CopyListingFileStatus>();
List<InputSplit> splits = inputFormat.getSplits(jobContext);
int nFiles = 0;
int taskId = 0;
for (InputSplit split : splits) {
RecordReader<Text, FileStatus> recordReader =
RecordReader<Text, CopyListingFileStatus> recordReader =
inputFormat.createRecordReader(split, null);
StubContext stubContext = new StubContext(jobContext.getConfiguration(),
recordReader, taskId);
@ -136,7 +136,7 @@ public void testGetSplits() throws Exception {
recordReader.initialize(splits.get(0), taskAttemptContext);
float previousProgressValue = 0f;
while (recordReader.nextKeyValue()) {
FileStatus fileStatus = recordReader.getCurrentValue();
CopyListingFileStatus fileStatus = recordReader.getCurrentValue();
String source = fileStatus.getPath().toString();
System.out.println(source);
Assert.assertTrue(expectedFilePaths.contains(source));

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Assert;
@ -106,7 +107,8 @@ public void testPreserve() {
Path src = new Path("/tmp/src");
fs.mkdirs(path);
fs.mkdirs(src);
FileStatus srcStatus = fs.getFileStatus(src);
CopyListingFileStatus srcStatus = new CopyListingFileStatus(
fs.getFileStatus(src));
FsPermission noPerm = new FsPermission((short) 0);
fs.setPermission(path, noPerm);