HDFS-6843. Create FileStatus isEncrypted() method (clamb via cmccabe)

(cherry picked from commit e3803d002c)
This commit is contained in:
Colin Patrick Mccabe 2014-09-17 12:55:35 -07:00
parent 5e54aae62b
commit 6cb8ed0d22
11 changed files with 214 additions and 26 deletions

View File

@ -248,6 +248,8 @@ Release 2.6.0 - UNRELEASED
HADOOP-10833. Remove unused cache in UserProvider. (Benoy Antony)
HDFS-6843. Create FileStatus isEncrypted() method (clamb via cmccabe)
BUG FIXES
HADOOP-10781. Unportable getgrouplist() usage breaks FreeBSD (Dmitry

View File

@ -200,6 +200,15 @@ public class FileStatus implements Writable, Comparable {
public FsPermission getPermission() {
return permission;
}
/**
* Tell whether the underlying file or directory is encrypted or not.
*
* @return true if the underlying file is encrypted.
*/
public boolean isEncrypted() {
return permission.getEncryptedBit();
}
/**
* Get the owner of the file.

View File

@ -294,6 +294,13 @@ public class FsPermission implements Writable {
return false;
}
/**
* Returns true if the file is encrypted or directory is in an encryption zone
*/
public boolean getEncryptedBit() {
return false;
}
/** Set the user file creation mask (umask) */
public static void setUMask(Configuration conf, FsPermission umask) {
conf.set(UMASK_LABEL, String.format("%1$03o", umask.toShort()));

View File

@ -64,6 +64,33 @@ all operations on a valid FileSystem MUST result in a new FileSystem that is als
def isSymlink(FS, p) = p in symlinks(FS)
### 'boolean inEncryptionZone(Path p)'
Return True if the data for p is encrypted. The nature of the encryption and the
mechanism for creating an encryption zone are implementation details not covered
in this specification. No guarantees are made about the quality of the
encryption. The metadata is not encrypted.
#### Preconditions
if not exists(FS, p) : raise FileNotFoundException
#### Postconditions
#### Invariants
All files and directories under a directory in an encryption zone are also in an
encryption zone
forall d in directories(FS): inEncyptionZone(FS, d) implies
forall c in children(FS, d) where (isFile(FS, c) or isDir(FS, c)) :
inEncyptionZone(FS, c)
For all files in an encrypted zone, the data is encrypted, but the encryption
type and specification are not defined.
forall f in files(FS) where inEncyptionZone(FS, c):
isEncrypted(data(f))
### `FileStatus getFileStatus(Path p)`
@ -88,6 +115,10 @@ Get the status of a path
stat.length = 0
stat.isdir = False
stat.symlink = FS.Symlinks[p]
if inEncryptionZone(FS, p) :
stat.isEncrypted = True
else
stat.isEncrypted = False
### `Path getHomeDirectory()`

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.fs.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
@ -30,6 +31,7 @@ import java.io.IOException;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
/**
@ -65,6 +67,16 @@ public abstract class AbstractContractOpenTest extends AbstractFSContractTestBas
assertMinusOne("initial byte read", result);
}
@Test
public void testFsIsEncrypted() throws Exception {
describe("create an empty file and call FileStatus.isEncrypted()");
final Path path = path("file");
createFile(getFileSystem(), path, false, new byte[0]);
final FileStatus stat = getFileSystem().getFileStatus(path);
assertFalse("Expecting false for stat.isEncrypted()",
stat.isEncrypted());
}
@Test
public void testOpenReadDir() throws Throwable {
describe("create & read a directory");

View File

@ -21,39 +21,46 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.permission.FsPermission;
/**
* HDFS permission subclass used to indicate an ACL is present. The ACL bit is
* not visible directly to users of {@link FsPermission} serialization. This is
* HDFS permission subclass used to indicate an ACL is present and/or that the
* underlying file/dir is encrypted. The ACL/encrypted bits are not visible
* directly to users of {@link FsPermission} serialization. This is
* done for backwards compatibility in case any existing clients assume the
* value of FsPermission is in a particular range.
*/
@InterfaceAudience.Private
public class FsAclPermission extends FsPermission {
public class FsPermissionExtension extends FsPermission {
private final static short ACL_BIT = 1 << 12;
private final static short ENCRYPTED_BIT = 1 << 13;
private final boolean aclBit;
private final boolean encryptedBit;
/**
* Constructs a new FsAclPermission based on the given FsPermission.
* Constructs a new FsPermissionExtension based on the given FsPermission.
*
* @param perm FsPermission containing permission bits
*/
public FsAclPermission(FsPermission perm) {
public FsPermissionExtension(FsPermission perm, boolean hasAcl,
boolean isEncrypted) {
super(perm.toShort());
aclBit = true;
aclBit = hasAcl;
encryptedBit = isEncrypted;
}
/**
* Creates a new FsAclPermission by calling the base class constructor.
* Creates a new FsPermissionExtension by calling the base class constructor.
*
* @param perm short containing permission bits
*/
public FsAclPermission(short perm) {
public FsPermissionExtension(short perm) {
super(perm);
aclBit = (perm & ACL_BIT) != 0;
encryptedBit = (perm & ENCRYPTED_BIT) != 0;
}
@Override
public short toExtendedShort() {
return (short)(toShort() | (aclBit ? ACL_BIT : 0));
return (short)(toShort() |
(aclBit ? ACL_BIT : 0) | (encryptedBit ? ENCRYPTED_BIT : 0));
}
@Override
@ -61,6 +68,11 @@ public class FsAclPermission extends FsPermission {
return aclBit;
}
@Override
public boolean getEncryptedBit() {
return encryptedBit;
}
@Override
public boolean equals(Object o) {
// This intentionally delegates to the base class. This is only overridden

View File

@ -65,7 +65,7 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdfs.protocol.FsAclPermission;
import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
@ -1263,7 +1263,7 @@ public class PBHelper {
}
public static FsPermission convert(FsPermissionProto p) {
return new FsAclPermission((short)p.getPerm());
return new FsPermissionExtension((short)p.getPerm());
}

View File

@ -63,7 +63,7 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
import org.apache.hadoop.hdfs.protocol.FsAclPermission;
import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
@ -2314,18 +2314,25 @@ public class FSDirectory implements Closeable {
long size = 0; // length is zero for directories
short replication = 0;
long blocksize = 0;
final boolean isEncrypted;
final FileEncryptionInfo feInfo = isRawPath ? null :
getFileEncryptionInfo(node, snapshot);
if (node.isFile()) {
final INodeFile fileNode = node.asFile();
size = fileNode.computeFileSize(snapshot);
replication = fileNode.getFileReplication(snapshot);
blocksize = fileNode.getPreferredBlockSize();
isEncrypted = (feInfo != null) ||
(isRawPath && isInAnEZ(INodesInPath.fromINode(node)));
} else {
isEncrypted = isInAnEZ(INodesInPath.fromINode(node));
}
int childrenNum = node.isDirectory() ?
node.asDirectory().getChildrenNum(snapshot) : 0;
FileEncryptionInfo feInfo = isRawPath ? null :
getFileEncryptionInfo(node, snapshot);
return new HdfsFileStatus(
size,
node.isDirectory(),
@ -2333,7 +2340,7 @@ public class FSDirectory implements Closeable {
blocksize,
node.getModificationTime(snapshot),
node.getAccessTime(snapshot),
getPermissionForFileStatus(node, snapshot),
getPermissionForFileStatus(node, snapshot, isEncrypted),
node.getUserName(snapshot),
node.getGroupName(snapshot),
node.isSymlink() ? node.asSymlink().getSymlink() : null,
@ -2353,6 +2360,7 @@ public class FSDirectory implements Closeable {
short replication = 0;
long blocksize = 0;
LocatedBlocks loc = null;
final boolean isEncrypted;
final FileEncryptionInfo feInfo = isRawPath ? null :
getFileEncryptionInfo(node, snapshot);
if (node.isFile()) {
@ -2372,6 +2380,10 @@ public class FSDirectory implements Closeable {
if (loc == null) {
loc = new LocatedBlocks();
}
isEncrypted = (feInfo != null) ||
(isRawPath && isInAnEZ(INodesInPath.fromINode(node)));
} else {
isEncrypted = isInAnEZ(INodesInPath.fromINode(node));
}
int childrenNum = node.isDirectory() ?
node.asDirectory().getChildrenNum(snapshot) : 0;
@ -2380,7 +2392,7 @@ public class FSDirectory implements Closeable {
new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
blocksize, node.getModificationTime(snapshot),
node.getAccessTime(snapshot),
getPermissionForFileStatus(node, snapshot),
getPermissionForFileStatus(node, snapshot, isEncrypted),
node.getUserName(snapshot), node.getGroupName(snapshot),
node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
node.getId(), loc, childrenNum, feInfo);
@ -2396,17 +2408,21 @@ public class FSDirectory implements Closeable {
/**
* Returns an inode's FsPermission for use in an outbound FileStatus. If the
* inode has an ACL, then this method will convert to a FsAclPermission.
* inode has an ACL or is for an encrypted file/dir, then this method will
* return an FsPermissionExtension.
*
* @param node INode to check
* @param snapshot int snapshot ID
* @param isEncrypted boolean true if the file/dir is encrypted
* @return FsPermission from inode, with ACL bit on if the inode has an ACL
* and encrypted bit on if it represents an encrypted file/dir.
*/
private static FsPermission getPermissionForFileStatus(INode node,
int snapshot) {
int snapshot, boolean isEncrypted) {
FsPermission perm = node.getFsPermission(snapshot);
if (node.getAclFeature(snapshot) != null) {
perm = new FsAclPermission(perm);
boolean hasAcl = node.getAclFeature(snapshot) != null;
if (hasAcl || isEncrypted) {
perm = new FsPermissionExtension(perm, hasAcl, isEncrypted);
}
return perm;
}

View File

@ -180,9 +180,16 @@ public class JsonUtil {
}
/** Convert a string to a FsPermission object. */
private static FsPermission toFsPermission(final String s, Boolean aclBit) {
private static FsPermission toFsPermission(final String s, Boolean aclBit,
Boolean encBit) {
FsPermission perm = new FsPermission(Short.parseShort(s, 8));
return (aclBit != null && aclBit) ? new FsAclPermission(perm) : perm;
final boolean aBit = (aclBit != null) ? aclBit : false;
final boolean eBit = (encBit != null) ? encBit : false;
if (aBit || eBit) {
return new FsPermissionExtension(perm, aBit, eBit);
} else {
return perm;
}
}
static enum PathType {
@ -214,6 +221,9 @@ public class JsonUtil {
if (perm.getAclBit()) {
m.put("aclBit", true);
}
if (perm.getEncryptedBit()) {
m.put("encBit", true);
}
m.put("accessTime", status.getAccessTime());
m.put("modificationTime", status.getModificationTime());
m.put("blockSize", status.getBlockSize());
@ -240,7 +250,7 @@ public class JsonUtil {
final String owner = (String) m.get("owner");
final String group = (String) m.get("group");
final FsPermission permission = toFsPermission((String) m.get("permission"),
(Boolean)m.get("aclBit"));
(Boolean)m.get("aclBit"), (Boolean)m.get("encBit"));
final long aTime = (Long) m.get("accessTime");
final long mTime = (Long) m.get("modificationTime");
final long blockSize = (Long) m.get("blockSize");

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.fs.FSTestWrapper;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileContextTestWrapper;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FileSystemTestWrapper;
@ -712,6 +713,93 @@ public class TestEncryptionZones {
assertNumZones(0);
}
@Test(timeout = 120000)
public void testIsEncryptedMethod() throws Exception {
doTestIsEncryptedMethod(new Path("/"));
doTestIsEncryptedMethod(new Path("/.reserved/raw"));
}
private void doTestIsEncryptedMethod(Path prefix) throws Exception {
try {
dTIEM(prefix);
} finally {
for (FileStatus s : fsWrapper.listStatus(prefix)) {
fsWrapper.delete(s.getPath(), true);
}
}
}
private void dTIEM(Path prefix) throws Exception {
final HdfsAdmin dfsAdmin =
new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
// Create an unencrypted file to check isEncrypted returns false
final Path baseFile = new Path(prefix, "base");
fsWrapper.createFile(baseFile);
FileStatus stat = fsWrapper.getFileStatus(baseFile);
assertFalse("Expected isEncrypted to return false for " + baseFile,
stat.isEncrypted());
// Create an encrypted file to check isEncrypted returns true
final Path zone = new Path(prefix, "zone");
fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
dfsAdmin.createEncryptionZone(zone, TEST_KEY);
final Path encFile = new Path(zone, "encfile");
fsWrapper.createFile(encFile);
stat = fsWrapper.getFileStatus(encFile);
assertTrue("Expected isEncrypted to return true for enc file" + encFile,
stat.isEncrypted());
// check that it returns true for an ez root
stat = fsWrapper.getFileStatus(zone);
assertTrue("Expected isEncrypted to return true for ezroot",
stat.isEncrypted());
// check that it returns true for a dir in the ez
final Path zoneSubdir = new Path(zone, "subdir");
fsWrapper.mkdir(zoneSubdir, FsPermission.getDirDefault(), true);
stat = fsWrapper.getFileStatus(zoneSubdir);
assertTrue(
"Expected isEncrypted to return true for ez subdir " + zoneSubdir,
stat.isEncrypted());
// check that it returns false for a non ez dir
final Path nonEzDirPath = new Path(prefix, "nonzone");
fsWrapper.mkdir(nonEzDirPath, FsPermission.getDirDefault(), true);
stat = fsWrapper.getFileStatus(nonEzDirPath);
assertFalse(
"Expected isEncrypted to return false for directory " + nonEzDirPath,
stat.isEncrypted());
// check that it returns true for listings within an ez
FileStatus[] statuses = fsWrapper.listStatus(zone);
for (FileStatus s : statuses) {
assertTrue("Expected isEncrypted to return true for ez stat " + zone,
s.isEncrypted());
}
statuses = fsWrapper.listStatus(encFile);
for (FileStatus s : statuses) {
assertTrue(
"Expected isEncrypted to return true for ez file stat " + encFile,
s.isEncrypted());
}
// check that it returns false for listings outside an ez
statuses = fsWrapper.listStatus(nonEzDirPath);
for (FileStatus s : statuses) {
assertFalse(
"Expected isEncrypted to return false for nonez stat " + nonEzDirPath,
s.isEncrypted());
}
statuses = fsWrapper.listStatus(baseFile);
for (FileStatus s : statuses) {
assertFalse(
"Expected isEncrypted to return false for non ez stat " + baseFile,
s.isEncrypted());
}
}
private class MyInjector extends EncryptionFaultInjector {
int generateCount;
CountDownLatch ready;

View File

@ -39,7 +39,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.FsAclPermission;
import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
@ -822,7 +822,8 @@ public abstract class FSAclBaseTest {
fs.setPermission(path, FsPermission.createImmutable((short)0700));
assertPermission((short)0700);
fs.setPermission(path,
new FsAclPermission(FsPermission.createImmutable((short)0755)));
new FsPermissionExtension(FsPermission.
createImmutable((short)0755), true, true));
INode inode = cluster.getNamesystem().getFSDirectory().getNode(
path.toUri().getPath(), false);
assertNotNull(inode);