HDFS-4361. When listing snapshottable directories, only return those where the user has permission to take snapshots. Contributed by Jing Zhao

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1441202 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-01-31 21:35:19 +00:00
parent 2372e394dd
commit 2710e961e5
5 changed files with 108 additions and 22 deletions

View File

@ -140,3 +140,6 @@ Branch-2802 Snapshot (Unreleased)
HDFS-4189. Renames the getMutableXxx methods to getXxx4Write and fix a bug
that some getExistingPathINodes calls should be getINodesInPath4Write.
(szetszwo)
HDFS-4361. When listing snapshottable directories, only return those
where the user has permission to take snapshots. (Jing Zhao via szetszwo)

View File

@ -5792,26 +5792,30 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
/**
* Get the list of all the current snapshottable directories
* Get the list of snapshottable directories that are owned
* by the current user. Return all the snapshottable directories if the
* current user is a super user.
* @return The list of all the current snapshottable directories
* @throws IOException
*/
public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
throws IOException {
SnapshottableDirectoryStatus[] status = null;
readLock();
try {
checkOperation(OperationCategory.READ);
SnapshottableDirectoryStatus[] status = snapshotManager
.getSnapshottableDirListing();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(), getRemoteIp(),
"listSnapshottableDirectory", null, null, null);
}
return status;
FSPermissionChecker checker = new FSPermissionChecker(
fsOwner.getShortUserName(), supergroup);
status = snapshotManager
.getSnapshottableDirListing(checker.isSuper ? null : checker.user);
} finally {
readUnlock();
}
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(), getRemoteIp(),
"listSnapshottableDirectory", null, null, null);
}
return status;
}
/**

View File

@ -566,7 +566,7 @@ public abstract class INode implements Diff.Element<byte[]> {
return buf.toString();
}
private static final byte[] EMPTY_BYTES = {};
public static final byte[] EMPTY_BYTES = {};
@Override
public final int compareTo(byte[] bytes) {

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
@ -204,24 +205,33 @@ public class SnapshotManager implements SnapshotStats {
}
/**
* @return All the current snapshottable directories
* List all the snapshottable directories that are owned by the current user.
* @param userName Current user name.
* @return Snapshottable directories that are owned by the current user,
* represented as an array of {@link SnapshottableDirectoryStatus}. If
* {@code userName} is null, return all the snapshottable dirs.
*/
public SnapshottableDirectoryStatus[] getSnapshottableDirListing() {
public SnapshottableDirectoryStatus[] getSnapshottableDirListing(
String userName) {
if (snapshottables.isEmpty()) {
return null;
}
SnapshottableDirectoryStatus[] status =
new SnapshottableDirectoryStatus[snapshottables.size()];
for (int i = 0; i < snapshottables.size(); i++) {
INodeDirectorySnapshottable dir = snapshottables.get(i);
status[i] = new SnapshottableDirectoryStatus(dir.getModificationTime(),
dir.getAccessTime(), dir.getFsPermission(), dir.getUserName(),
dir.getGroupName(), dir.getLocalNameBytes(), dir.getNumSnapshots(),
dir.getSnapshotQuota(), dir.getParent() == null ? new byte[0]
: DFSUtil.string2Bytes(dir.getParent().getFullPathName()));
List<SnapshottableDirectoryStatus> statusList =
new ArrayList<SnapshottableDirectoryStatus>();
for (INodeDirectorySnapshottable dir : snapshottables) {
if (userName == null || userName.equals(dir.getUserName())) {
SnapshottableDirectoryStatus status = new SnapshottableDirectoryStatus(
dir.getModificationTime(), dir.getAccessTime(),
dir.getFsPermission(), dir.getUserName(), dir.getGroupName(),
dir.getLocalNameBytes(), dir.getNumSnapshots(),
dir.getSnapshotQuota(), dir.getParent() == null ? INode.EMPTY_BYTES
: DFSUtil.string2Bytes(dir.getParent().getFullPathName()));
statusList.add(status);
}
}
return status;
return statusList.toArray(new SnapshottableDirectoryStatus[statusList
.size()]);
}
/**

View File

@ -17,17 +17,21 @@
*/
package org.apache.hadoop.hdfs.server.namenode.snapshot;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -38,6 +42,7 @@ public class TestSnapshottableDirListing {
static final short REPLICATION = 3;
static final long BLOCKSIZE = 1024;
private final Path root = new Path("/");
private final Path dir1 = new Path("/TestSnapshot1");
private final Path dir2 = new Path("/TestSnapshot2");
@ -146,4 +151,68 @@ public class TestSnapshottableDirListing {
assertEquals(dir2.getName(), dirs[0].getDirStatus().getLocalName());
assertEquals(dir2, dirs[0].getFullPath());
}
/**
* Test the listing with different user names to make sure only directories
* that are owned by the user are listed.
*/
@Test
public void testListWithDifferentUser() throws Exception {
// first make dir1 and dir2 snapshottable
hdfs.allowSnapshot(dir1.toString());
hdfs.allowSnapshot(dir2.toString());
hdfs.setPermission(root, FsPermission.valueOf("-rwxrwxrwx"));
// create two dirs and make them snapshottable under the name of user1
UserGroupInformation ugi1 = UserGroupInformation.createUserForTesting(
"user1", new String[] { "group1" });
DistributedFileSystem fs1 = (DistributedFileSystem) DFSTestUtil
.getFileSystemAs(ugi1, conf);
Path dir1_user1 = new Path("/dir1_user1");
Path dir2_user1 = new Path("/dir2_user1");
fs1.mkdirs(dir1_user1);
fs1.mkdirs(dir2_user1);
fs1.allowSnapshot(dir1_user1.toString());
fs1.allowSnapshot(dir2_user1.toString());
// user2
UserGroupInformation ugi2 = UserGroupInformation.createUserForTesting(
"user2", new String[] { "group2" });
DistributedFileSystem fs2 = (DistributedFileSystem) DFSTestUtil
.getFileSystemAs(ugi2, conf);
Path dir_user2 = new Path("/dir_user2");
Path subdir_user2 = new Path(dir_user2, "subdir");
fs2.mkdirs(dir_user2);
fs2.mkdirs(subdir_user2);
fs2.allowSnapshot(dir_user2.toString());
fs2.allowSnapshot(subdir_user2.toString());
// super user
String supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
UserGroupInformation superUgi = UserGroupInformation.createUserForTesting(
"superuser", new String[] { supergroup });
DistributedFileSystem fs3 = (DistributedFileSystem) DFSTestUtil
.getFileSystemAs(superUgi, conf);
// list the snapshottable dirs for superuser
SnapshottableDirectoryStatus[] dirs = fs3.getSnapshottableDirListing();
// 6 snapshottable dirs: dir1, dir2, dir1_user1, dir2_user1, dir_user2, and
// subdir_user2
assertEquals(6, dirs.length);
// list the snapshottable dirs for user1
dirs = fs1.getSnapshottableDirListing();
// 2 dirs owned by user1: dir1_user1 and dir2_user1
assertEquals(2, dirs.length);
assertEquals(dir1_user1, dirs[0].getFullPath());
assertEquals(dir2_user1, dirs[1].getFullPath());
// list the snapshottable dirs for user2
dirs = fs2.getSnapshottableDirListing();
// 2 dirs owned by user2: dir_user2 and subdir_user2
assertEquals(2, dirs.length);
assertEquals(dir_user2, dirs[0].getFullPath());
assertEquals(subdir_user2, dirs[1].getFullPath());
}
}