HBASE-5714 Add write permissions check before any hbck run that modifies hdfs (Liang Xie)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1375227 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Hsieh 2012-08-20 21:30:38 +00:00
parent aa0bab5a06
commit 634cd51bae
2 changed files with 86 additions and 0 deletions

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.DeserializationException;
@ -58,6 +59,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@ -1156,8 +1159,45 @@ public abstract class FSUtils {
logFSTree(LOG, fs, root, "|-");
}
/**
* Throw an exception if an action is not permitted by a user on a file.
*
* @param ugi
* the user
* @param file
* the file
* @param action
* the action
*/
public static void checkAccess(UserGroupInformation ugi, FileStatus file,
FsAction action) throws AccessControlException {
if (ugi.getUserName().equals(file.getOwner())) {
if (file.getPermission().getUserAction().implies(action)) {
return;
}
} else if (contains(ugi.getGroupNames(), file.getGroup())) {
if (file.getPermission().getGroupAction().implies(action)) {
return;
}
} else if (file.getPermission().getOtherAction().implies(action)) {
return;
}
throw new AccessControlException("Permission denied:" + " action=" + action
+ " path=" + file.getPath() + " user=" + ugi.getUserName());
}
private static boolean contains(String[] groups, String user) {
for (String group : groups) {
if (group.equals(user)) {
return true;
}
}
return false;
}
/**
* Recursive helper to log the state of the FS
*
* @see #logFileSystemState(FileSystem, Path, Log)
*/
private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseConfiguration;
@ -80,12 +81,15 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zookeeper.KeeperException;
import com.google.common.base.Joiner;
@ -187,6 +191,7 @@ public class HBaseFsck {
private boolean rerun = false; // if we tried to fix something, rerun hbck
private static boolean summary = false; // if we want to print less output
private boolean checkMetaOnly = false;
private boolean ignorePreCheckPermission = false; // if pre-check permission
/*********
* State
@ -1193,6 +1198,27 @@ public class HBaseFsck {
}
}
private void preCheckPermission() throws IOException, AccessControlException {
if (shouldIgnorePreCheckPermission()) {
return;
}
Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
FileSystem fs = hbaseDir.getFileSystem(conf);
UserGroupInformation ugi = User.getCurrent().getUGI();
FileStatus[] files = fs.listStatus(hbaseDir);
for (FileStatus file : files) {
try {
FSUtils.checkAccess(ugi, file, FsAction.WRITE);
} catch (AccessControlException ace) {
LOG.warn("Got AccessControlException when preCheckPermission ", ace);
System.err.println("Current user " + ugi.getUserName() + " does not have write perms to " + file.getPath()
+ ". Please rerun hbck as hdfs user " + file.getOwner());
throw new AccessControlException(ace);
}
}
}
/**
* Deletes region from meta table
*/
@ -3019,6 +3045,14 @@ public class HBaseFsck {
return fixSplitParents;
}
public boolean shouldIgnorePreCheckPermission() {
return ignorePreCheckPermission;
}
public void setIgnorePreCheckPermission(boolean ignorePreCheckPermission) {
this.ignorePreCheckPermission = ignorePreCheckPermission;
}
/**
* @param mm maximum number of regions to merge into a single region.
*/
@ -3093,6 +3127,7 @@ public class HBaseFsck {
System.err.println(" -sidelineBigOverlaps When fixing region overlaps, allow to sideline big overlaps");
System.err.println(" -maxOverlapsToSideline <n> When fixing region overlaps, allow at most <n> regions to sideline per group. (n=" + DEFAULT_OVERLAPS_TO_SIDELINE +" by default)");
System.err.println(" -fixSplitParents Try to force offline split parents to be online.");
System.err.println(" -ignorePreCheckPermission ignore filesystem permission pre-check");
System.err.println("");
System.err.println(" -repair Shortcut for -fixAssignments -fixMeta -fixHdfsHoles " +
"-fixHdfsOrphans -fixHdfsOverlaps -fixVersionFile -sidelineBigOverlaps");
@ -3176,6 +3211,8 @@ public class HBaseFsck {
fsck.setSidelineBigOverlaps(true);
} else if (cmd.equals("-fixSplitParents")) {
fsck.setFixSplitParents(true);
} else if (cmd.equals("-ignorePreCheckPermission")) {
fsck.setIgnorePreCheckPermission(true);
} else if (cmd.equals("-repair")) {
// this attempts to merge overlapping hdfs regions, needs testing
// under load
@ -3234,6 +3271,15 @@ public class HBaseFsck {
System.out.println("Allow checking/fixes for table: " + cmd);
}
}
// pre-check current user has FS write permission or not
try {
fsck.preCheckPermission();
} catch (AccessControlException ace) {
Runtime.getRuntime().exit(-1);
} catch (IOException ioe) {
Runtime.getRuntime().exit(-1);
}
// do the real work of fsck
fsck.connect();
int code = fsck.onlineHbck();