HBASE-21995 Add a coprocessor to set HDFS ACL for hbase granted user
This commit is contained in:
parent
23a2f8abab
commit
d0cb77ce8a
|
@ -748,6 +748,16 @@ public interface MasterObserver {
|
|||
final SnapshotDescription snapshot, final TableDescriptor tableDescriptor)
|
||||
throws IOException {}
|
||||
|
||||
/**
|
||||
* Called after the snapshot operation has been completed.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param snapshot the SnapshotDescriptor for the snapshot
|
||||
* @param tableDescriptor the TableDescriptor of the table to snapshot
|
||||
*/
|
||||
default void postCompletedSnapshotAction(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
SnapshotDescription snapshot, TableDescriptor tableDescriptor) throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before listSnapshots request has been processed.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
|
|
|
@ -26,9 +26,6 @@ import java.io.InputStream;
|
|||
import java.io.FileNotFoundException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.fs.CanSetDropBehind;
|
||||
import org.apache.hadoop.fs.CanSetReadahead;
|
||||
import org.apache.hadoop.fs.CanUnbuffer;
|
||||
|
@ -40,6 +37,10 @@ import org.apache.hadoop.fs.PositionedReadable;
|
|||
import org.apache.hadoop.fs.Seekable;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* The FileLink is a sort of hardlink, that allows access to a file given a set of locations.
|
||||
|
@ -297,6 +298,7 @@ public class FileLink {
|
|||
* @throws IOException on unexpected error, or file not found.
|
||||
*/
|
||||
private FSDataInputStream tryOpen() throws IOException {
|
||||
IOException exception = null;
|
||||
for (Path path: fileLink.getLocations()) {
|
||||
if (path.equals(currentPath)) continue;
|
||||
try {
|
||||
|
@ -312,14 +314,11 @@ public class FileLink {
|
|||
}
|
||||
currentPath = path;
|
||||
return(in);
|
||||
} catch (FileNotFoundException e) {
|
||||
// Try another file location
|
||||
} catch (RemoteException re) {
|
||||
IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
|
||||
if (!(ioe instanceof FileNotFoundException)) throw re;
|
||||
} catch (FileNotFoundException | AccessControlException | RemoteException e) {
|
||||
exception = FileLink.handleAccessLocationException(fileLink, e, exception);
|
||||
}
|
||||
}
|
||||
throw new FileNotFoundException("Unable to open link: " + fileLink);
|
||||
throw exception;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -405,14 +404,47 @@ public class FileLink {
|
|||
* @throws IOException on unexpected error.
|
||||
*/
|
||||
public FileStatus getFileStatus(FileSystem fs) throws IOException {
|
||||
IOException exception = null;
|
||||
for (int i = 0; i < locations.length; ++i) {
|
||||
try {
|
||||
return fs.getFileStatus(locations[i]);
|
||||
} catch (FileNotFoundException e) {
|
||||
// Try another file location
|
||||
} catch (FileNotFoundException | AccessControlException e) {
|
||||
exception = handleAccessLocationException(this, e, exception);
|
||||
}
|
||||
}
|
||||
throw new FileNotFoundException("Unable to open link: " + this);
|
||||
throw exception;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle exceptions which are threw when access locations of file link
|
||||
* @param fileLink the file link
|
||||
* @param newException the exception caught by access the current location
|
||||
* @param previousException the previous exception caught by access the other locations
|
||||
* @return return AccessControlException if access one of the locations caught, otherwise return
|
||||
* FileNotFoundException. The AccessControlException is threw if user scan snapshot
|
||||
* feature is enabled, see
|
||||
* {@link org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclController}.
|
||||
* @throws IOException if the exception is neither AccessControlException nor
|
||||
* FileNotFoundException
|
||||
*/
|
||||
private static IOException handleAccessLocationException(FileLink fileLink,
|
||||
IOException newException, IOException previousException) throws IOException {
|
||||
if (newException instanceof RemoteException) {
|
||||
newException = ((RemoteException) newException)
|
||||
.unwrapRemoteException(FileNotFoundException.class, AccessControlException.class);
|
||||
}
|
||||
if (newException instanceof FileNotFoundException) {
|
||||
// Try another file location
|
||||
if (previousException == null) {
|
||||
previousException = new FileNotFoundException("Unable to open link: " + fileLink);
|
||||
}
|
||||
} else if (newException instanceof AccessControlException) {
|
||||
// Try another file location
|
||||
previousException = newException;
|
||||
} else {
|
||||
throw newException;
|
||||
}
|
||||
return previousException;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1065,6 +1065,16 @@ public class MasterCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
public void postCompletedSnapshotAction(SnapshotDescription snapshot,
|
||||
TableDescriptor hTableDescriptor) throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.postCompletedSnapshotAction(this, snapshot, hTableDescriptor);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void preListSnapshot(final SnapshotDescription snapshot) throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
|
|
|
@ -59,7 +59,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
|
||||
|
||||
/**
|
||||
|
@ -228,6 +228,10 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
|
|||
status.markComplete(msg);
|
||||
LOG.info(msg);
|
||||
metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime());
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost()
|
||||
.postCompletedSnapshotAction(ProtobufUtil.createSnapshotDesc(snapshot), this.htd);
|
||||
}
|
||||
} catch (Exception e) { // FindBugs: REC_CATCH_EXCEPTION
|
||||
status.abort("Failed to complete snapshot " + snapshot.getName() + " on table " +
|
||||
snapshotTable + " because " + e.getMessage());
|
||||
|
|
|
@ -263,6 +263,7 @@ public final class PermissionStorage {
|
|||
static void removeTablePermissions(Configuration conf, TableName tableName, Table t)
|
||||
throws IOException{
|
||||
Delete d = new Delete(tableName.getName());
|
||||
d.addFamily(ACL_LIST_FAMILY);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Removing permissions of removed table "+ tableName);
|
||||
|
@ -280,7 +281,7 @@ public final class PermissionStorage {
|
|||
static void removeNamespacePermissions(Configuration conf, String namespace, Table t)
|
||||
throws IOException{
|
||||
Delete d = new Delete(Bytes.toBytes(toNamespaceEntry(namespace)));
|
||||
|
||||
d.addFamily(ACL_LIST_FAMILY);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Removing permissions of removed namespace "+ namespace);
|
||||
}
|
||||
|
@ -839,17 +840,21 @@ public final class PermissionStorage {
|
|||
}
|
||||
|
||||
public static boolean isGlobalEntry(byte[] entryName) {
|
||||
return entryName != null && TableName.valueOf(entryName).equals(ACL_TABLE_NAME);
|
||||
return Bytes.equals(entryName, ACL_GLOBAL_NAME);
|
||||
}
|
||||
|
||||
public static boolean isNamespaceEntry(String entryName) {
|
||||
return entryName != null && entryName.charAt(0) == NAMESPACE_PREFIX;
|
||||
return isNamespaceEntry(Bytes.toBytes(entryName));
|
||||
}
|
||||
|
||||
public static boolean isNamespaceEntry(byte[] entryName) {
|
||||
return entryName != null && entryName.length !=0 && entryName[0] == NAMESPACE_PREFIX;
|
||||
}
|
||||
|
||||
public static boolean isTableEntry(byte[] entryName) {
|
||||
return !isNamespaceEntry(entryName) && !isGlobalEntry(entryName) && entryName != null;
|
||||
}
|
||||
|
||||
public static String toNamespaceEntry(String namespace) {
|
||||
return NAMESPACE_PREFIX + namespace;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,673 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.security.access;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.HasMasterServices;
|
||||
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.security.access.Permission.Action;
|
||||
import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper.PathHelper;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Set HDFS ACLs to hFiles to make HBase granted users have permission to scan snapshot
|
||||
* <p>
|
||||
* To use this feature, please mask sure HDFS config:
|
||||
* <ul>
|
||||
* <li>dfs.permissions.enabled = true</li>
|
||||
* <li>fs.permissions.umask-mode = 027 (or smaller umask than 027)</li>
|
||||
* </ul>
|
||||
* </p>
|
||||
* <p>
|
||||
* The implementation of this feature is as followings:
|
||||
* <ul>
|
||||
* <li>For common directories such as 'data' and 'archive', set other permission to '--x' to make
|
||||
* everyone have the permission to access the directory.</li>
|
||||
* <li>For namespace or table directories such as 'data/ns/table', 'archive/ns/table' and
|
||||
* '.hbase-snapshot/snapshotName', set user 'r-x' access acl and 'r-x' default acl when following
|
||||
* operations happen:
|
||||
* <ul>
|
||||
* <li>grant user with global, namespace or table permission;</li>
|
||||
* <li>revoke user from global, namespace or table;</li>
|
||||
* <li>snapshot table;</li>
|
||||
* <li>truncate table;</li>
|
||||
* </ul>
|
||||
* </li>
|
||||
* <li>Note: Because snapshots are at table level, so this feature just considers users with global,
|
||||
* namespace or table permissions, ignores users with table CF or cell permissions.</li>
|
||||
* </ul>
|
||||
* </p>
|
||||
*/
|
||||
@CoreCoprocessor
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
||||
public class SnapshotScannerHDFSAclController implements MasterCoprocessor, MasterObserver {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SnapshotScannerHDFSAclController.class);
|
||||
|
||||
private SnapshotScannerHDFSAclHelper hdfsAclHelper = null;
|
||||
private PathHelper pathHelper = null;
|
||||
private FileSystem fs = null;
|
||||
private volatile boolean initialized = false;
|
||||
/** Provider for mapping principal names to Users */
|
||||
private UserProvider userProvider;
|
||||
|
||||
@Override
|
||||
public Optional<MasterObserver> getMasterObserver() {
|
||||
return Optional.of(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> c)
|
||||
throws IOException {
|
||||
if (c.getEnvironment().getConfiguration()
|
||||
.getBoolean(SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE, false)) {
|
||||
MasterCoprocessorEnvironment mEnv = c.getEnvironment();
|
||||
if (!(mEnv instanceof HasMasterServices)) {
|
||||
throw new IOException("Does not implement HMasterServices");
|
||||
}
|
||||
MasterServices masterServices = ((HasMasterServices) mEnv).getMasterServices();
|
||||
hdfsAclHelper = new SnapshotScannerHDFSAclHelper(masterServices.getConfiguration(),
|
||||
masterServices.getConnection());
|
||||
pathHelper = hdfsAclHelper.getPathHelper();
|
||||
fs = pathHelper.getFileSystem();
|
||||
hdfsAclHelper.setCommonDirectoryPermission();
|
||||
initialized = true;
|
||||
userProvider = UserProvider.instantiate(c.getEnvironment().getConfiguration());
|
||||
} else {
|
||||
LOG.warn("Try to initialize the coprocessor SnapshotScannerHDFSAclController but failure "
|
||||
+ "because the config " + SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE
|
||||
+ " is false.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> c) throws IOException {
|
||||
if (checkInitialized()) {
|
||||
try (Admin admin = c.getEnvironment().getConnection().getAdmin()) {
|
||||
if (admin.tableExists(PermissionStorage.ACL_TABLE_NAME)) {
|
||||
// Check if hbase acl table has 'm' CF, if not, add 'm' CF
|
||||
TableDescriptor tableDescriptor = admin.getDescriptor(PermissionStorage.ACL_TABLE_NAME);
|
||||
boolean containHdfsAclFamily =
|
||||
Arrays.stream(tableDescriptor.getColumnFamilies()).anyMatch(family -> Bytes
|
||||
.equals(family.getName(), SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY));
|
||||
if (!containHdfsAclFamily) {
|
||||
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableDescriptor)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder
|
||||
.newBuilder(SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY).build());
|
||||
admin.modifyTable(builder.build());
|
||||
}
|
||||
} else {
|
||||
throw new TableNotFoundException("Table " + PermissionStorage.ACL_TABLE_NAME
|
||||
+ " is not created yet. Please check if " + getClass().getName()
|
||||
+ " is configured after " + AccessController.class.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c) {
|
||||
if (checkInitialized()) {
|
||||
hdfsAclHelper.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postCompletedCreateTableAction(ObserverContext<MasterCoprocessorEnvironment> c,
|
||||
TableDescriptor desc, RegionInfo[] regions) throws IOException {
|
||||
if (!desc.getTableName().isSystemTable() && checkInitialized()) {
|
||||
TableName tableName = desc.getTableName();
|
||||
List<Path> paths = hdfsAclHelper.getTableRootPaths(tableName, false);
|
||||
for (Path path : paths) {
|
||||
if (!fs.exists(path)) {
|
||||
fs.mkdirs(path);
|
||||
}
|
||||
}
|
||||
// Add table owner HDFS acls
|
||||
String owner =
|
||||
desc.getOwnerString() == null ? getActiveUser(c).getShortName() : desc.getOwnerString();
|
||||
hdfsAclHelper.addTableAcl(tableName, owner);
|
||||
try (Table aclTable =
|
||||
c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
|
||||
SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(aclTable, owner, tableName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> c,
|
||||
NamespaceDescriptor ns) throws IOException {
|
||||
if (checkInitialized()) {
|
||||
List<Path> paths = hdfsAclHelper.getNamespaceRootPaths(ns.getName());
|
||||
for (Path path : paths) {
|
||||
if (!fs.exists(path)) {
|
||||
fs.mkdirs(path);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postCompletedSnapshotAction(ObserverContext<MasterCoprocessorEnvironment> c,
|
||||
SnapshotDescription snapshot, TableDescriptor tableDescriptor) throws IOException {
|
||||
if (!tableDescriptor.getTableName().isSystemTable() && checkInitialized()) {
|
||||
hdfsAclHelper.snapshotAcl(snapshot);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postCompletedTruncateTableAction(ObserverContext<MasterCoprocessorEnvironment> c,
|
||||
TableName tableName) throws IOException {
|
||||
if (!tableName.isSystemTable() && checkInitialized()) {
|
||||
hdfsAclHelper.resetTableAcl(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
TableName tableName) throws IOException {
|
||||
if (!tableName.isSystemTable() && checkInitialized()) {
|
||||
/*
|
||||
* remove table user access HDFS acl from namespace directory if the user has no permissions
|
||||
* of global, ns of the table or other tables of the ns, eg: Bob has 'ns1:t1' read permission,
|
||||
* when delete 'ns1:t1', if Bob has global read permission, '@ns1' read permission or
|
||||
* 'ns1:other_tables' read permission, then skip remove Bob access acl in ns1Dirs, otherwise,
|
||||
* remove Bob access acl.
|
||||
*/
|
||||
Set<String> removeUsers = new HashSet<>();
|
||||
try (Table aclTable =
|
||||
ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
|
||||
List<String> users = SnapshotScannerHDFSAclStorage.getTableUsers(aclTable, tableName);
|
||||
SnapshotScannerHDFSAclStorage.deleteTableHdfsAcl(aclTable, tableName);
|
||||
byte[] namespace = tableName.getNamespace();
|
||||
for (String user : users) {
|
||||
List<byte[]> userEntries = SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, user);
|
||||
boolean remove = true;
|
||||
for (byte[] entry : userEntries) {
|
||||
if (PermissionStorage.isGlobalEntry(entry)) {
|
||||
remove = false;
|
||||
break;
|
||||
} else if (PermissionStorage.isNamespaceEntry(entry)
|
||||
&& Bytes.equals(PermissionStorage.fromNamespaceEntry(entry), namespace)) {
|
||||
remove = false;
|
||||
break;
|
||||
} else if (Bytes.equals(TableName.valueOf(entry).getNamespace(), namespace)) {
|
||||
remove = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (remove) {
|
||||
removeUsers.add(user);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (removeUsers.size() > 0) {
|
||||
hdfsAclHelper.removeNamespaceAcl(tableName, removeUsers);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
String namespace) throws IOException {
|
||||
if (checkInitialized()) {
|
||||
try (Table aclTable =
|
||||
ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
|
||||
SnapshotScannerHDFSAclStorage.deleteNamespaceHdfsAcl(aclTable, namespace);
|
||||
}
|
||||
/**
|
||||
* Delete namespace tmp directory because it's created by this coprocessor when namespace is
|
||||
* created to make namespace default acl can be inherited by tables. The namespace data
|
||||
* directory is deleted by DeleteNamespaceProcedure, the namespace archive directory is
|
||||
* deleted by HFileCleaner.
|
||||
*/
|
||||
Path tmpNsDir = pathHelper.getTmpNsDir(namespace);
|
||||
if (fs.exists(tmpNsDir)) {
|
||||
if (fs.listStatus(tmpNsDir).length == 0) {
|
||||
fs.delete(tmpNsDir, false);
|
||||
} else {
|
||||
LOG.error("The tmp directory {} of namespace {} is not empty after delete namespace",
|
||||
tmpNsDir, namespace);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postGrant(ObserverContext<MasterCoprocessorEnvironment> c,
|
||||
UserPermission userPermission, boolean mergeExistingPermissions) throws IOException {
|
||||
if (!checkInitialized()) {
|
||||
return;
|
||||
}
|
||||
try (Table aclTable =
|
||||
c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
|
||||
Configuration conf = c.getEnvironment().getConfiguration();
|
||||
String userName = userPermission.getUser();
|
||||
switch (userPermission.getAccessScope()) {
|
||||
case GLOBAL:
|
||||
UserPermission perm = getUserGlobalPermission(conf, userName);
|
||||
if (perm != null && containReadPermission(perm)) {
|
||||
if (!isHdfsAclSet(aclTable, userName)) {
|
||||
Pair<Set<String>, Set<TableName>> namespaceAndTable =
|
||||
SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName);
|
||||
Set<String> skipNamespaces = namespaceAndTable.getFirst();
|
||||
Set<TableName> skipTables = namespaceAndTable.getSecond().stream()
|
||||
.filter(t -> !skipNamespaces.contains(t.getNamespaceAsString()))
|
||||
.collect(Collectors.toSet());
|
||||
hdfsAclHelper.grantAcl(userPermission, skipNamespaces, skipTables);
|
||||
SnapshotScannerHDFSAclStorage.addUserGlobalHdfsAcl(aclTable, userName);
|
||||
}
|
||||
} else {
|
||||
// The merged user permission doesn't contain READ, so remove user global HDFS acls if
|
||||
// it's set
|
||||
removeUserGlobalHdfsAcl(aclTable, userName, userPermission);
|
||||
}
|
||||
break;
|
||||
case NAMESPACE:
|
||||
String namespace = ((NamespacePermission) userPermission.getPermission()).getNamespace();
|
||||
UserPermission nsPerm = getUserNamespacePermission(conf, userName, namespace);
|
||||
if (nsPerm != null && containReadPermission(nsPerm)) {
|
||||
if (!isHdfsAclSet(aclTable, userName, namespace)) {
|
||||
Set<TableName> skipTables = SnapshotScannerHDFSAclStorage
|
||||
.getUserNamespaceAndTable(aclTable, userName).getSecond();
|
||||
hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), skipTables);
|
||||
}
|
||||
SnapshotScannerHDFSAclStorage.addUserNamespaceHdfsAcl(aclTable, userName, namespace);
|
||||
} else {
|
||||
// The merged user permission doesn't contain READ, so remove user namespace HDFS acls
|
||||
// if it's set
|
||||
removeUserNamespaceHdfsAcl(aclTable, userName, namespace, userPermission);
|
||||
}
|
||||
break;
|
||||
case TABLE:
|
||||
TableName tableName = ((TablePermission) userPermission.getPermission()).getTableName();
|
||||
UserPermission tPerm = getUserTablePermission(conf, userName, tableName);
|
||||
if (tPerm != null) {
|
||||
TablePermission tablePermission = (TablePermission) tPerm.getPermission();
|
||||
if (tablePermission.hasFamily() || tablePermission.hasQualifier()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (tPerm != null && containReadPermission(tPerm)) {
|
||||
if (!isHdfsAclSet(aclTable, userName, tableName)) {
|
||||
hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), new HashSet<>(0));
|
||||
}
|
||||
SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(aclTable, userName, tableName);
|
||||
} else {
|
||||
// The merged user permission doesn't contain READ, so remove user table HDFS acls if
|
||||
// it's set
|
||||
removeUserTableHdfsAcl(aclTable, userName, tableName, userPermission);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException(
|
||||
"Illegal user permission scope " + userPermission.getAccessScope());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postRevoke(ObserverContext<MasterCoprocessorEnvironment> c,
|
||||
UserPermission userPermission) throws IOException {
|
||||
if (checkInitialized()) {
|
||||
try (Table aclTable =
|
||||
c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
|
||||
String userName = userPermission.getUser();
|
||||
Configuration conf = c.getEnvironment().getConfiguration();
|
||||
switch (userPermission.getAccessScope()) {
|
||||
case GLOBAL:
|
||||
UserPermission userGlobalPerm = getUserGlobalPermission(conf, userName);
|
||||
if (userGlobalPerm == null || !containReadPermission(userGlobalPerm)) {
|
||||
removeUserGlobalHdfsAcl(aclTable, userName, userPermission);
|
||||
}
|
||||
break;
|
||||
case NAMESPACE:
|
||||
NamespacePermission nsPerm = (NamespacePermission) userPermission.getPermission();
|
||||
UserPermission userNsPerm =
|
||||
getUserNamespacePermission(conf, userName, nsPerm.getNamespace());
|
||||
if (userNsPerm == null || !containReadPermission(userNsPerm)) {
|
||||
removeUserNamespaceHdfsAcl(aclTable, userName, nsPerm.getNamespace(), userPermission);
|
||||
}
|
||||
break;
|
||||
case TABLE:
|
||||
TablePermission tPerm = (TablePermission) userPermission.getPermission();
|
||||
UserPermission userTablePerm =
|
||||
getUserTablePermission(conf, userName, tPerm.getTableName());
|
||||
if (userTablePerm == null || !containReadPermission(userTablePerm)) {
|
||||
removeUserTableHdfsAcl(aclTable, userName, tPerm.getTableName(), userPermission);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException(
|
||||
"Illegal user permission scope " + userPermission.getAccessScope());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void removeUserGlobalHdfsAcl(Table aclTable, String userName,
|
||||
UserPermission userPermission) throws IOException {
|
||||
if (SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) {
|
||||
// remove user global acls but reserve ns and table acls
|
||||
Pair<Set<String>, Set<TableName>> namespaceAndTable =
|
||||
SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName);
|
||||
Set<String> skipNamespaces = namespaceAndTable.getFirst();
|
||||
Set<TableName> skipTables = namespaceAndTable.getSecond().stream()
|
||||
.filter(t -> !skipNamespaces.contains(t.getNamespaceAsString()))
|
||||
.collect(Collectors.toSet());
|
||||
hdfsAclHelper.revokeAcl(userPermission, skipNamespaces, skipTables);
|
||||
SnapshotScannerHDFSAclStorage.deleteUserGlobalHdfsAcl(aclTable, userName);
|
||||
}
|
||||
}
|
||||
|
||||
private void removeUserNamespaceHdfsAcl(Table aclTable, String userName, String namespace,
|
||||
UserPermission userPermission) throws IOException {
|
||||
// remove user ns acls but reserve table acls
|
||||
if (SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName, namespace)) {
|
||||
if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) {
|
||||
Set<TableName> skipTables =
|
||||
SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName).getSecond();
|
||||
hdfsAclHelper.revokeAcl(userPermission, new HashSet<>(), skipTables);
|
||||
}
|
||||
SnapshotScannerHDFSAclStorage.deleteUserNamespaceHdfsAcl(aclTable, userName, namespace);
|
||||
}
|
||||
}
|
||||
|
||||
private void removeUserTableHdfsAcl(Table aclTable, String userName, TableName tableName,
|
||||
UserPermission userPermission) throws IOException {
|
||||
if (SnapshotScannerHDFSAclStorage.hasUserTableHdfsAcl(aclTable, userName, tableName)) {
|
||||
if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)
|
||||
&& !SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName,
|
||||
tableName.getNamespaceAsString())) {
|
||||
// remove table acls
|
||||
hdfsAclHelper.revokeAcl(userPermission, new HashSet<>(0), new HashSet<>(0));
|
||||
}
|
||||
SnapshotScannerHDFSAclStorage.deleteUserTableHdfsAcl(aclTable, userName, tableName);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean containReadPermission(UserPermission userPermission) {
|
||||
if (userPermission != null) {
|
||||
return Arrays.stream(userPermission.getPermission().getActions())
|
||||
.anyMatch(action -> action == Action.READ);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private UserPermission getUserGlobalPermission(Configuration conf, String userName)
|
||||
throws IOException {
|
||||
List<UserPermission> permissions = PermissionStorage.getUserPermissions(conf,
|
||||
PermissionStorage.ACL_GLOBAL_NAME, null, null, userName, true);
|
||||
if (permissions != null && permissions.size() > 0) {
|
||||
return permissions.get(0);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private UserPermission getUserNamespacePermission(Configuration conf, String userName,
|
||||
String namespace) throws IOException {
|
||||
List<UserPermission> permissions =
|
||||
PermissionStorage.getUserNamespacePermissions(conf, namespace, userName, true);
|
||||
if (permissions != null && permissions.size() > 0) {
|
||||
return permissions.get(0);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private UserPermission getUserTablePermission(Configuration conf, String userName,
|
||||
TableName tableName) throws IOException {
|
||||
List<UserPermission> permissions =
|
||||
PermissionStorage.getUserTablePermissions(conf, tableName, null, null, userName, true);
|
||||
if (permissions != null && permissions.size() > 0) {
|
||||
return permissions.get(0);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private boolean isHdfsAclSet(Table aclTable, String userName) throws IOException {
|
||||
return isHdfsAclSet(aclTable, userName, null, null);
|
||||
}
|
||||
|
||||
private boolean isHdfsAclSet(Table aclTable, String userName, String namespace)
|
||||
throws IOException {
|
||||
return isHdfsAclSet(aclTable, userName, namespace, null);
|
||||
}
|
||||
|
||||
private boolean isHdfsAclSet(Table aclTable, String userName, TableName tableName)
|
||||
throws IOException {
|
||||
return isHdfsAclSet(aclTable, userName, null, tableName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if user global/namespace/table HDFS acls is already set to hfile
|
||||
*/
|
||||
private boolean isHdfsAclSet(Table aclTable, String userName, String namespace,
|
||||
TableName tableName) throws IOException {
|
||||
boolean isSet = SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName);
|
||||
if (namespace != null) {
|
||||
isSet = isSet
|
||||
|| SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName, namespace);
|
||||
}
|
||||
if (tableName != null) {
|
||||
isSet = isSet
|
||||
|| SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName,
|
||||
tableName.getNamespaceAsString())
|
||||
|| SnapshotScannerHDFSAclStorage.hasUserTableHdfsAcl(aclTable, userName, tableName);
|
||||
}
|
||||
return isSet;
|
||||
}
|
||||
|
||||
private boolean checkInitialized() {
|
||||
if (initialized) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private User getActiveUser(ObserverContext<?> ctx) throws IOException {
|
||||
// for non-rpc handling, fallback to system user
|
||||
Optional<User> optionalUser = ctx.getCaller();
|
||||
if (optionalUser.isPresent()) {
|
||||
return optionalUser.get();
|
||||
}
|
||||
return userProvider.getCurrent();
|
||||
}
|
||||
|
||||
static final class SnapshotScannerHDFSAclStorage {
|
||||
/**
|
||||
* Add a new CF in HBase acl table to record if the HBase read permission is synchronized to
|
||||
* related hfiles. The record has two usages: 1. check if we need to remove HDFS acls for a
|
||||
* grant without READ permission(eg: grant user table read permission and then grant user table
|
||||
* write permission without merging the existing permissions, in this case, need to remove HDFS
|
||||
* acls); 2. skip some HDFS acl sync because it may be already set(eg: grant user table read
|
||||
* permission and then grant user ns read permission; grant user table read permission and then
|
||||
* grant user table write permission with merging the existing permissions).
|
||||
*/
|
||||
static final byte[] HDFS_ACL_FAMILY = Bytes.toBytes("m");
|
||||
// The value 'R' has no specific meaning, if cell value is not null, it means that the user HDFS
|
||||
// acls is set to hfiles.
|
||||
private static final byte[] HDFS_ACL_VALUE = Bytes.toBytes("R");
|
||||
|
||||
static void addUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
|
||||
addUserEntry(aclTable, user, PermissionStorage.ACL_GLOBAL_NAME);
|
||||
}
|
||||
|
||||
static void addUserNamespaceHdfsAcl(Table aclTable, String user, String namespace)
|
||||
throws IOException {
|
||||
addUserEntry(aclTable, user, Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
|
||||
}
|
||||
|
||||
static void addUserTableHdfsAcl(Table aclTable, String user, TableName tableName)
|
||||
throws IOException {
|
||||
addUserEntry(aclTable, user, tableName.getName());
|
||||
}
|
||||
|
||||
private static void addUserEntry(Table t, String user, byte[] entry) throws IOException {
|
||||
Put p = new Put(entry);
|
||||
p.addColumn(HDFS_ACL_FAMILY, Bytes.toBytes(user), HDFS_ACL_VALUE);
|
||||
t.put(p);
|
||||
}
|
||||
|
||||
static void deleteUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
|
||||
deleteUserEntry(aclTable, user, PermissionStorage.ACL_GLOBAL_NAME);
|
||||
}
|
||||
|
||||
static void deleteUserNamespaceHdfsAcl(Table aclTable, String user, String namespace)
|
||||
throws IOException {
|
||||
deleteUserEntry(aclTable, user, Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
|
||||
}
|
||||
|
||||
static void deleteUserTableHdfsAcl(Table aclTable, String user, TableName tableName)
|
||||
throws IOException {
|
||||
deleteUserEntry(aclTable, user, tableName.getName());
|
||||
}
|
||||
|
||||
private static void deleteUserEntry(Table aclTable, String user, byte[] entry)
|
||||
throws IOException {
|
||||
Delete delete = new Delete(entry);
|
||||
delete.addColumns(HDFS_ACL_FAMILY, Bytes.toBytes(user));
|
||||
aclTable.delete(delete);
|
||||
}
|
||||
|
||||
static void deleteNamespaceHdfsAcl(Table aclTable, String namespace) throws IOException {
|
||||
deleteEntry(aclTable, Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
|
||||
}
|
||||
|
||||
static void deleteTableHdfsAcl(Table aclTable, TableName tableName) throws IOException {
|
||||
deleteEntry(aclTable, tableName.getName());
|
||||
}
|
||||
|
||||
private static void deleteEntry(Table aclTable, byte[] entry) throws IOException {
|
||||
Delete delete = new Delete(entry);
|
||||
delete.addFamily(HDFS_ACL_FAMILY);
|
||||
aclTable.delete(delete);
|
||||
}
|
||||
|
||||
static List<String> getTableUsers(Table aclTable, TableName tableName) throws IOException {
|
||||
return getEntryUsers(aclTable, tableName.getName());
|
||||
}
|
||||
|
||||
private static List<String> getEntryUsers(Table aclTable, byte[] entry) throws IOException {
|
||||
List<String> users = new ArrayList<>();
|
||||
Get get = new Get(entry);
|
||||
get.addFamily(HDFS_ACL_FAMILY);
|
||||
Result result = aclTable.get(get);
|
||||
List<Cell> cells = result.listCells();
|
||||
if (cells != null) {
|
||||
for (Cell cell : cells) {
|
||||
if (cell != null) {
|
||||
users.add(Bytes.toString(CellUtil.cloneQualifier(cell)));
|
||||
}
|
||||
}
|
||||
}
|
||||
return users;
|
||||
}
|
||||
|
||||
static Pair<Set<String>, Set<TableName>> getUserNamespaceAndTable(Table aclTable,
|
||||
String userName) throws IOException {
|
||||
Set<String> namespaces = new HashSet<>();
|
||||
Set<TableName> tables = new HashSet<>();
|
||||
List<byte[]> userEntries = SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, userName);
|
||||
for (byte[] entry : userEntries) {
|
||||
if (PermissionStorage.isNamespaceEntry(entry)) {
|
||||
namespaces.add(Bytes.toString(PermissionStorage.fromNamespaceEntry(entry)));
|
||||
} else if (PermissionStorage.isTableEntry(entry)) {
|
||||
tables.add(TableName.valueOf(entry));
|
||||
}
|
||||
}
|
||||
return new Pair<>(namespaces, tables);
|
||||
}
|
||||
|
||||
static List<byte[]> getUserEntries(Table aclTable, String userName) throws IOException {
|
||||
Scan scan = new Scan();
|
||||
scan.addColumn(HDFS_ACL_FAMILY, Bytes.toBytes(userName));
|
||||
ResultScanner scanner = aclTable.getScanner(scan);
|
||||
List<byte[]> entry = new ArrayList<>();
|
||||
for (Result result : scanner) {
|
||||
if (result != null && result.getRow() != null) {
|
||||
entry.add(result.getRow());
|
||||
}
|
||||
}
|
||||
return entry;
|
||||
}
|
||||
|
||||
static boolean hasUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
|
||||
return hasUserEntry(aclTable, user, PermissionStorage.ACL_GLOBAL_NAME);
|
||||
}
|
||||
|
||||
static boolean hasUserNamespaceHdfsAcl(Table aclTable, String user, String namespace)
|
||||
throws IOException {
|
||||
return hasUserEntry(aclTable, user,
|
||||
Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
|
||||
}
|
||||
|
||||
static boolean hasUserTableHdfsAcl(Table aclTable, String user, TableName tableName)
|
||||
throws IOException {
|
||||
return hasUserEntry(aclTable, user, tableName.getName());
|
||||
}
|
||||
|
||||
private static boolean hasUserEntry(Table aclTable, String userName, byte[] entry)
|
||||
throws IOException {
|
||||
Get get = new Get(entry);
|
||||
get.addColumn(HDFS_ACL_FAMILY, Bytes.toBytes(userName));
|
||||
return aclTable.exists(get);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,737 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.security.access;
|
||||
|
||||
import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
|
||||
import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
|
||||
import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
|
||||
import static org.apache.hadoop.fs.permission.AclEntryType.USER;
|
||||
import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
|
||||
import static org.apache.hadoop.hbase.security.access.Permission.Action.READ;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclEntryScope;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hbase.AuthUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* A helper to modify or remove HBase granted user default and access HDFS ACLs over hFiles.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class SnapshotScannerHDFSAclHelper implements Closeable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SnapshotScannerHDFSAclHelper.class);
|
||||
|
||||
public static final String USER_SCAN_SNAPSHOT_ENABLE = "hbase.user.scan.snapshot.enable";
|
||||
public static final String USER_SCAN_SNAPSHOT_THREAD_NUMBER =
|
||||
"hbase.user.scan.snapshot.thread.number";
|
||||
// The tmp directory to restore snapshot, it can not be a sub directory of HBase root dir
|
||||
public static final String SNAPSHOT_RESTORE_TMP_DIR = "hbase.snapshot.restore.tmp.dir";
|
||||
public static final String SNAPSHOT_RESTORE_TMP_DIR_DEFAULT =
|
||||
"/hbase/.tmpdir-to-restore-snapshot";
|
||||
// The default permission of the common directories if the feature is enabled.
|
||||
public static final String COMMON_DIRECTORY_PERMISSION =
|
||||
"hbase.user.scan.snapshot.common.directory.permission";
|
||||
// The secure HBase permission is 700, 751 means all others have execute access and the mask is
|
||||
// set to read-execute to make the extended access ACL entries can work. Be cautious to set
|
||||
// this value.
|
||||
public static final String COMMON_DIRECTORY_PERMISSION_DEFAULT = "751";
|
||||
// The default permission of the snapshot restore directories if the feature is enabled.
|
||||
public static final String SNAPSHOT_RESTORE_DIRECTORY_PERMISSION =
|
||||
"hbase.user.scan.snapshot.restore.directory.permission";
|
||||
// 753 means all others have write-execute access.
|
||||
public static final String SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT = "753";
|
||||
|
||||
private Admin admin;
|
||||
private final Configuration conf;
|
||||
private FileSystem fs;
|
||||
private PathHelper pathHelper;
|
||||
private ExecutorService pool;
|
||||
|
||||
public SnapshotScannerHDFSAclHelper(Configuration configuration, Connection connection)
|
||||
throws IOException {
|
||||
this.conf = configuration;
|
||||
this.pathHelper = new PathHelper(conf);
|
||||
this.fs = pathHelper.getFileSystem();
|
||||
this.pool = Executors.newFixedThreadPool(conf.getInt(USER_SCAN_SNAPSHOT_THREAD_NUMBER, 10),
|
||||
new ThreadFactoryBuilder().setNameFormat("hdfs-acl-thread-%d").setDaemon(true).build());
|
||||
this.admin = connection.getAdmin();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (pool != null) {
|
||||
pool.shutdown();
|
||||
}
|
||||
try {
|
||||
admin.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Close admin error", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void setCommonDirectoryPermission() throws IOException {
|
||||
// Set public directory permission to 751 to make all users have access permission.
|
||||
// And we also need the access permission of the parent of HBase root directory, but
|
||||
// it's not set here, because the owner of HBase root directory may don't own permission
|
||||
// to change it's parent permission to 751.
|
||||
// The {root/.tmp} and {root/.tmp/data} directories are created to make global user HDFS
|
||||
// ACLs can be inherited.
|
||||
List<Path> paths = Lists.newArrayList(pathHelper.getRootDir(), pathHelper.getMobDir(),
|
||||
pathHelper.getTmpDir(), pathHelper.getArchiveDir());
|
||||
paths.addAll(getGlobalRootPaths());
|
||||
for (Path path : paths) {
|
||||
if (!fs.exists(path)) {
|
||||
fs.mkdirs(path);
|
||||
}
|
||||
fs.setPermission(path, new FsPermission(
|
||||
conf.get(COMMON_DIRECTORY_PERMISSION, COMMON_DIRECTORY_PERMISSION_DEFAULT)));
|
||||
}
|
||||
// create snapshot restore directory
|
||||
Path restoreDir =
|
||||
new Path(conf.get(SNAPSHOT_RESTORE_TMP_DIR, SNAPSHOT_RESTORE_TMP_DIR_DEFAULT));
|
||||
if (!fs.exists(restoreDir)) {
|
||||
fs.mkdirs(restoreDir);
|
||||
fs.setPermission(restoreDir, new FsPermission(conf.get(SNAPSHOT_RESTORE_DIRECTORY_PERMISSION,
|
||||
SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT)));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set acl when grant user permission
|
||||
* @param userPermission the user and permission
|
||||
* @param skipNamespaces the namespace set to skip set acl because already set
|
||||
* @param skipTables the table set to skip set acl because already set
|
||||
* @return false if an error occurred, otherwise true
|
||||
*/
|
||||
public boolean grantAcl(UserPermission userPermission, Set<String> skipNamespaces,
|
||||
Set<TableName> skipTables) {
|
||||
try {
|
||||
long start = System.currentTimeMillis();
|
||||
handleGrantOrRevokeAcl(userPermission, HDFSAclOperation.OperationType.MODIFY, skipNamespaces,
|
||||
skipTables);
|
||||
LOG.info("Set HDFS acl when grant {}, cost {} ms", userPermission,
|
||||
System.currentTimeMillis() - start);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Set HDFS acl error when grant: {}", userPermission, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove acl when grant or revoke user permission
|
||||
* @param userPermission the user and permission
|
||||
* @param skipNamespaces the namespace set to skip remove acl
|
||||
* @param skipTables the table set to skip remove acl
|
||||
* @return false if an error occurred, otherwise true
|
||||
*/
|
||||
public boolean revokeAcl(UserPermission userPermission, Set<String> skipNamespaces,
|
||||
Set<TableName> skipTables) {
|
||||
try {
|
||||
long start = System.currentTimeMillis();
|
||||
handleGrantOrRevokeAcl(userPermission, HDFSAclOperation.OperationType.REMOVE, skipNamespaces,
|
||||
skipTables);
|
||||
LOG.info("Set HDFS acl when revoke {}, cost {} ms", userPermission,
|
||||
System.currentTimeMillis() - start);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Set HDFS acl error when revoke: {}", userPermission, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set acl when take a snapshot
|
||||
* @param snapshot the snapshot desc
|
||||
* @return false if an error occurred, otherwise true
|
||||
*/
|
||||
public boolean snapshotAcl(SnapshotDescription snapshot) {
|
||||
try {
|
||||
long start = System.currentTimeMillis();
|
||||
TableName tableName = snapshot.getTableName();
|
||||
// global user permission can be inherited from default acl automatically
|
||||
Set<String> userSet = getUsersWithTableReadAction(tableName);
|
||||
userSet.addAll(getUsersWithNamespaceReadAction(tableName.getNamespaceAsString()));
|
||||
Path path = pathHelper.getSnapshotDir(snapshot.getName());
|
||||
handleHDFSAcl(new HDFSAclOperation(fs, path, userSet, HDFSAclOperation.OperationType.MODIFY,
|
||||
true, HDFSAclOperation.AclType.DEFAULT_ADN_ACCESS)).get();
|
||||
LOG.info("Set HDFS acl when snapshot {}, cost {} ms", snapshot.getName(),
|
||||
System.currentTimeMillis() - start);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Set HDFS acl error when snapshot {}", snapshot, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset acl when truncate table
|
||||
* @param tableName the specific table
|
||||
* @return false if an error occurred, otherwise true
|
||||
*/
|
||||
public boolean resetTableAcl(TableName tableName) {
|
||||
try {
|
||||
long start = System.currentTimeMillis();
|
||||
// global and namespace user permission can be inherited from default acl automatically
|
||||
setTableAcl(tableName, getUsersWithTableReadAction(tableName));
|
||||
LOG.info("Set HDFS acl when truncate {}, cost {} ms", tableName,
|
||||
System.currentTimeMillis() - start);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Set HDFS acl error when truncate {}", tableName, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove table access acl from namespace dir when delete table
|
||||
* @param tableName the table
|
||||
* @param removeUsers the users whose access acl will be removed
|
||||
* @return false if an error occurred, otherwise true
|
||||
*/
|
||||
public boolean removeNamespaceAcl(TableName tableName, Set<String> removeUsers) {
|
||||
try {
|
||||
long start = System.currentTimeMillis();
|
||||
List<AclEntry> aclEntries = removeUsers.stream()
|
||||
.map(removeUser -> aclEntry(ACCESS, removeUser)).collect(Collectors.toList());
|
||||
String namespace = tableName.getNamespaceAsString();
|
||||
List<Path> nsPaths = Lists.newArrayList(pathHelper.getTmpNsDir(namespace),
|
||||
pathHelper.getDataNsDir(namespace), pathHelper.getMobDataNsDir(namespace));
|
||||
// If table has no snapshots, then remove archive ns HDFS acl, otherwise reserve the archive
|
||||
// ns acl to make the snapshots can be scanned, in the second case, need to remove the archive
|
||||
// ns acl when all snapshots of the deleted table are deleted (will do it in later work).
|
||||
if (getTableSnapshotPaths(tableName).isEmpty()) {
|
||||
nsPaths.add(pathHelper.getArchiveNsDir(namespace));
|
||||
}
|
||||
for (Path nsPath : nsPaths) {
|
||||
fs.removeAclEntries(nsPath, aclEntries);
|
||||
}
|
||||
LOG.info("Remove HDFS acl when delete table {}, cost {} ms", tableName,
|
||||
System.currentTimeMillis() - start);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Set HDFS acl error when delete table {}", tableName, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set table owner acl when create table
|
||||
* @param tableName the table
|
||||
* @param user the table owner
|
||||
* @return false if an error occurred, otherwise true
|
||||
*/
|
||||
public boolean addTableAcl(TableName tableName, String user) {
|
||||
try {
|
||||
long start = System.currentTimeMillis();
|
||||
setTableAcl(tableName, Sets.newHashSet(user));
|
||||
LOG.info("Set HDFS acl when create table {}, cost {} ms", tableName,
|
||||
System.currentTimeMillis() - start);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Set HDFS acl error when create table {}", tableName, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private void handleGrantOrRevokeAcl(UserPermission userPermission,
|
||||
HDFSAclOperation.OperationType operationType, Set<String> skipNamespaces,
|
||||
Set<TableName> skipTables) throws ExecutionException, InterruptedException, IOException {
|
||||
Set<String> users = Sets.newHashSet(userPermission.getUser());
|
||||
switch (userPermission.getAccessScope()) {
|
||||
case GLOBAL:
|
||||
handleGlobalAcl(users, skipNamespaces, skipTables, operationType);
|
||||
break;
|
||||
case NAMESPACE:
|
||||
NamespacePermission namespacePermission =
|
||||
(NamespacePermission) userPermission.getPermission();
|
||||
handleNamespaceAcl(Sets.newHashSet(namespacePermission.getNamespace()), users,
|
||||
skipNamespaces, skipTables, operationType);
|
||||
break;
|
||||
case TABLE:
|
||||
TablePermission tablePermission = (TablePermission) userPermission.getPermission();
|
||||
handleNamespaceAccessAcl(tablePermission.getNamespace(), users, operationType);
|
||||
handleTableAcl(Sets.newHashSet(tablePermission.getTableName()), users, skipNamespaces,
|
||||
skipTables, operationType);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException(
|
||||
"Illegal user permission scope " + userPermission.getAccessScope());
|
||||
}
|
||||
}
|
||||
|
||||
private void handleGlobalAcl(Set<String> users, Set<String> skipNamespaces,
|
||||
Set<TableName> skipTables, HDFSAclOperation.OperationType operationType)
|
||||
throws ExecutionException, InterruptedException, IOException {
|
||||
// handle global root directories HDFS acls
|
||||
List<HDFSAclOperation> hdfsAclOperations = getGlobalRootPaths().stream()
|
||||
.map(path -> new HDFSAclOperation(fs, path, users, operationType, false,
|
||||
HDFSAclOperation.AclType.DEFAULT_ADN_ACCESS))
|
||||
.collect(Collectors.toList());
|
||||
handleHDFSAclParallel(hdfsAclOperations).get();
|
||||
// handle namespace HDFS acls
|
||||
handleNamespaceAcl(Sets.newHashSet(admin.listNamespaces()), users, skipNamespaces, skipTables,
|
||||
operationType);
|
||||
}
|
||||
|
||||
private void handleNamespaceAcl(Set<String> namespaces, Set<String> users,
|
||||
Set<String> skipNamespaces, Set<TableName> skipTables,
|
||||
HDFSAclOperation.OperationType operationType)
|
||||
throws ExecutionException, InterruptedException, IOException {
|
||||
namespaces.removeAll(skipNamespaces);
|
||||
// handle namespace root directories HDFS acls
|
||||
List<HDFSAclOperation> hdfsAclOperations = new ArrayList<>();
|
||||
Set<String> skipTableNamespaces =
|
||||
skipTables.stream().map(TableName::getNamespaceAsString).collect(Collectors.toSet());
|
||||
for (String ns : namespaces) {
|
||||
/**
|
||||
* When op is REMOVE, remove the DEFAULT namespace ACL while keep the ACCESS for skipTables,
|
||||
* otherwise remove both the DEFAULT + ACCESS ACLs. When op is MODIFY, just operate the
|
||||
* DEFAULT + ACCESS ACLs.
|
||||
*/
|
||||
HDFSAclOperation.OperationType op = operationType;
|
||||
HDFSAclOperation.AclType aclType = HDFSAclOperation.AclType.DEFAULT_ADN_ACCESS;
|
||||
if (operationType == HDFSAclOperation.OperationType.REMOVE
|
||||
&& skipTableNamespaces.contains(ns)) {
|
||||
// remove namespace directories default HDFS acls for skip tables
|
||||
op = HDFSAclOperation.OperationType.REMOVE;
|
||||
aclType = HDFSAclOperation.AclType.DEFAULT;
|
||||
}
|
||||
for (Path path : getNamespaceRootPaths(ns)) {
|
||||
hdfsAclOperations.add(new HDFSAclOperation(fs, path, users, op, false, aclType));
|
||||
}
|
||||
}
|
||||
handleHDFSAclParallel(hdfsAclOperations).get();
|
||||
// handle table directories HDFS acls
|
||||
Set<TableName> tables = new HashSet<>();
|
||||
for (String namespace : namespaces) {
|
||||
tables.addAll(admin.listTableDescriptorsByNamespace(Bytes.toBytes(namespace)).stream()
|
||||
.map(TableDescriptor::getTableName).collect(Collectors.toSet()));
|
||||
}
|
||||
handleTableAcl(tables, users, skipNamespaces, skipTables, operationType);
|
||||
}
|
||||
|
||||
private void handleTableAcl(Set<TableName> tableNames, Set<String> users,
|
||||
Set<String> skipNamespaces, Set<TableName> skipTables,
|
||||
HDFSAclOperation.OperationType operationType)
|
||||
throws ExecutionException, InterruptedException, IOException {
|
||||
Set<TableName> filterTableNames = new HashSet<>();
|
||||
for (TableName tableName : tableNames) {
|
||||
if (!skipTables.contains(tableName)
|
||||
&& !skipNamespaces.contains(tableName.getNamespaceAsString())) {
|
||||
filterTableNames.add(tableName);
|
||||
}
|
||||
}
|
||||
List<CompletableFuture<Void>> futures = new ArrayList<>();
|
||||
// handle table HDFS acls
|
||||
for (TableName tableName : filterTableNames) {
|
||||
List<HDFSAclOperation> hdfsAclOperations = getTableRootPaths(tableName, true).stream()
|
||||
.map(path -> new HDFSAclOperation(fs, path, users, operationType, true,
|
||||
HDFSAclOperation.AclType.DEFAULT_ADN_ACCESS))
|
||||
.collect(Collectors.toList());
|
||||
CompletableFuture<Void> future = handleHDFSAclSequential(hdfsAclOperations);
|
||||
futures.add(future);
|
||||
}
|
||||
CompletableFuture<Void> future =
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
|
||||
future.get();
|
||||
}
|
||||
|
||||
private void handleNamespaceAccessAcl(String namespace, Set<String> users,
|
||||
HDFSAclOperation.OperationType operationType)
|
||||
throws ExecutionException, InterruptedException {
|
||||
// handle namespace access HDFS acls
|
||||
List<HDFSAclOperation> hdfsAclOperations =
|
||||
getNamespaceRootPaths(namespace).stream().map(path -> new HDFSAclOperation(fs, path, users,
|
||||
operationType, false, HDFSAclOperation.AclType.ACCESS)).collect(Collectors.toList());
|
||||
CompletableFuture<Void> future = handleHDFSAclParallel(hdfsAclOperations);
|
||||
future.get();
|
||||
}
|
||||
|
||||
private void setTableAcl(TableName tableName, Set<String> users)
|
||||
throws ExecutionException, InterruptedException, IOException {
|
||||
HDFSAclOperation.OperationType operationType = HDFSAclOperation.OperationType.MODIFY;
|
||||
handleNamespaceAccessAcl(tableName.getNamespaceAsString(), users, operationType);
|
||||
handleTableAcl(Sets.newHashSet(tableName), users, new HashSet<>(0), new HashSet<>(0),
|
||||
operationType);
|
||||
}
|
||||
|
||||
/**
|
||||
* return paths that user will global permission will visit
|
||||
* @return the path list
|
||||
*/
|
||||
private List<Path> getGlobalRootPaths() {
|
||||
return Lists.newArrayList(pathHelper.getTmpDataDir(), pathHelper.getDataDir(),
|
||||
pathHelper.getMobDataDir(), pathHelper.getArchiveDataDir(), pathHelper.getSnapshotRootDir());
|
||||
}
|
||||
|
||||
/**
|
||||
* return paths that user will namespace permission will visit
|
||||
* @param namespace the namespace
|
||||
* @return the path list
|
||||
* @throws IOException if an error occurred
|
||||
*/
|
||||
List<Path> getNamespaceRootPaths(String namespace) {
|
||||
List<Path> paths =
|
||||
Lists.newArrayList(pathHelper.getTmpNsDir(namespace), pathHelper.getDataNsDir(namespace),
|
||||
pathHelper.getMobDataNsDir(namespace), pathHelper.getArchiveNsDir(namespace));
|
||||
return paths;
|
||||
}
|
||||
|
||||
/**
|
||||
* return paths that user will table permission will visit
|
||||
* @param tableName the table
|
||||
* @param includeSnapshotPath true if return table snapshots paths, otherwise false
|
||||
* @return the path list
|
||||
* @throws IOException if an error occurred
|
||||
*/
|
||||
List<Path> getTableRootPaths(TableName tableName, boolean includeSnapshotPath)
|
||||
throws IOException {
|
||||
List<Path> paths = Lists.newArrayList(pathHelper.getTmpTableDir(tableName),
|
||||
pathHelper.getDataTableDir(tableName), pathHelper.getMobTableDir(tableName),
|
||||
pathHelper.getArchiveTableDir(tableName));
|
||||
if (includeSnapshotPath) {
|
||||
paths.addAll(getTableSnapshotPaths(tableName));
|
||||
}
|
||||
return paths;
|
||||
}
|
||||
|
||||
private List<Path> getTableSnapshotPaths(TableName tableName) throws IOException {
|
||||
return admin.listSnapshots().stream()
|
||||
.filter(snapDesc -> snapDesc.getTableName().equals(tableName))
|
||||
.map(snapshotDescription -> pathHelper.getSnapshotDir(snapshotDescription.getName()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Return users with namespace read permission
|
||||
* @param namespace the namespace
|
||||
* @return users with namespace read permission
|
||||
* @throws IOException if an error occurred
|
||||
*/
|
||||
private Set<String> getUsersWithNamespaceReadAction(String namespace) throws IOException {
|
||||
return PermissionStorage.getNamespacePermissions(conf, namespace).entries().stream()
|
||||
.filter(entry -> entry.getValue().getPermission().implies(READ))
|
||||
.map(entry -> entry.getKey()).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
/**
|
||||
* Return users with table read permission
|
||||
* @param tableName the table
|
||||
* @return users with table read permission
|
||||
* @throws IOException if an error occurred
|
||||
*/
|
||||
private Set<String> getUsersWithTableReadAction(TableName tableName) throws IOException {
|
||||
return PermissionStorage.getTablePermissions(conf, tableName).entries().stream()
|
||||
.filter(entry -> entry.getValue().getPermission().implies(READ))
|
||||
.map(entry -> entry.getKey()).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
PathHelper getPathHelper() {
|
||||
return pathHelper;
|
||||
}
|
||||
|
||||
private CompletableFuture<Void> handleHDFSAcl(HDFSAclOperation acl) {
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
List<HDFSAclOperation> childAclOperations = new ArrayList<>();
|
||||
try {
|
||||
acl.handleAcl();
|
||||
childAclOperations = acl.getChildAclOperations();
|
||||
} catch (FileNotFoundException e) {
|
||||
// Skip handle acl if file not found
|
||||
} catch (IOException e) {
|
||||
LOG.error("Set HDFS acl error for path {}", acl.path, e);
|
||||
}
|
||||
return childAclOperations;
|
||||
}, pool).thenComposeAsync(this::handleHDFSAclParallel, pool);
|
||||
}
|
||||
|
||||
private CompletableFuture<Void> handleHDFSAclSequential(List<HDFSAclOperation> operations) {
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
for (HDFSAclOperation hdfsAclOperation : operations) {
|
||||
handleHDFSAcl(hdfsAclOperation).get();
|
||||
}
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
LOG.error("Set HDFS acl error", e);
|
||||
}
|
||||
return null;
|
||||
}, pool);
|
||||
}
|
||||
|
||||
private CompletableFuture<Void> handleHDFSAclParallel(List<HDFSAclOperation> operations) {
|
||||
List<CompletableFuture<Void>> futures =
|
||||
operations.stream().map(this::handleHDFSAcl).collect(Collectors.toList());
|
||||
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
|
||||
}
|
||||
|
||||
private static AclEntry aclEntry(AclEntryScope scope, String name) {
|
||||
return new AclEntry.Builder().setScope(scope)
|
||||
.setType(AuthUtil.isGroupPrincipal(name) ? GROUP : USER).setName(name)
|
||||
.setPermission(READ_EXECUTE).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Inner class used to describe modify or remove what type of acl entries(ACCESS, DEFAULT,
|
||||
* ACCESS_AND_DEFAULT) for files or directories(and child files).
|
||||
*/
|
||||
private static class HDFSAclOperation {
|
||||
enum OperationType {
|
||||
MODIFY, REMOVE
|
||||
}
|
||||
|
||||
enum AclType {
|
||||
ACCESS, DEFAULT, DEFAULT_ADN_ACCESS
|
||||
}
|
||||
|
||||
private interface Operation {
|
||||
void apply(FileSystem fs, Path path, List<AclEntry> aclList) throws IOException;
|
||||
}
|
||||
|
||||
private FileSystem fs;
|
||||
private Path path;
|
||||
private Operation operation;
|
||||
private boolean recursive;
|
||||
private AclType aclType;
|
||||
private List<AclEntry> defaultAndAccessAclEntries;
|
||||
private List<AclEntry> accessAclEntries;
|
||||
private List<AclEntry> defaultAclEntries;
|
||||
|
||||
HDFSAclOperation(FileSystem fs, Path path, Set<String> users, OperationType operationType,
|
||||
boolean recursive, AclType aclType) {
|
||||
this.fs = fs;
|
||||
this.path = path;
|
||||
this.defaultAndAccessAclEntries = getAclEntries(AclType.DEFAULT_ADN_ACCESS, users);
|
||||
this.accessAclEntries = getAclEntries(AclType.ACCESS, users);
|
||||
this.defaultAclEntries = getAclEntries(AclType.DEFAULT, users);
|
||||
if (operationType == OperationType.MODIFY) {
|
||||
operation = FileSystem::modifyAclEntries;
|
||||
} else if (operationType == OperationType.REMOVE) {
|
||||
operation = FileSystem::removeAclEntries;
|
||||
} else {
|
||||
throw new IllegalArgumentException("Illegal HDFS acl operation type: " + operationType);
|
||||
}
|
||||
this.recursive = recursive;
|
||||
this.aclType = aclType;
|
||||
}
|
||||
|
||||
HDFSAclOperation(Path path, HDFSAclOperation parent) {
|
||||
this.fs = parent.fs;
|
||||
this.path = path;
|
||||
this.defaultAndAccessAclEntries = parent.defaultAndAccessAclEntries;
|
||||
this.accessAclEntries = parent.accessAclEntries;
|
||||
this.defaultAclEntries = parent.defaultAclEntries;
|
||||
this.operation = parent.operation;
|
||||
this.recursive = parent.recursive;
|
||||
this.aclType = parent.aclType;
|
||||
}
|
||||
|
||||
List<HDFSAclOperation> getChildAclOperations() throws IOException {
|
||||
List<HDFSAclOperation> hdfsAclOperations = new ArrayList<>();
|
||||
if (recursive && fs.isDirectory(path)) {
|
||||
FileStatus[] fileStatuses = fs.listStatus(path);
|
||||
for (FileStatus fileStatus : fileStatuses) {
|
||||
hdfsAclOperations.add(new HDFSAclOperation(fileStatus.getPath(), this));
|
||||
}
|
||||
}
|
||||
return hdfsAclOperations;
|
||||
}
|
||||
|
||||
void handleAcl() throws IOException {
|
||||
if (fs.exists(path)) {
|
||||
if (fs.isDirectory(path)) {
|
||||
switch (aclType) {
|
||||
case ACCESS:
|
||||
operation.apply(fs, path, accessAclEntries);
|
||||
break;
|
||||
case DEFAULT:
|
||||
operation.apply(fs, path, defaultAclEntries);
|
||||
break;
|
||||
case DEFAULT_ADN_ACCESS:
|
||||
operation.apply(fs, path, defaultAndAccessAclEntries);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Illegal HDFS acl type: " + aclType);
|
||||
}
|
||||
} else {
|
||||
operation.apply(fs, path, accessAclEntries);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private List<AclEntry> getAclEntries(AclType aclType, Set<String> users) {
|
||||
List<AclEntry> aclEntries = new ArrayList<>();
|
||||
switch (aclType) {
|
||||
case ACCESS:
|
||||
for (String user : users) {
|
||||
aclEntries.add(aclEntry(ACCESS, user));
|
||||
}
|
||||
break;
|
||||
case DEFAULT:
|
||||
for (String user : users) {
|
||||
aclEntries.add(aclEntry(DEFAULT, user));
|
||||
}
|
||||
break;
|
||||
case DEFAULT_ADN_ACCESS:
|
||||
for (String user : users) {
|
||||
aclEntries.add(aclEntry(ACCESS, user));
|
||||
aclEntries.add(aclEntry(DEFAULT, user));
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Illegal HDFS acl type: " + aclType);
|
||||
}
|
||||
return aclEntries;
|
||||
}
|
||||
}
|
||||
|
||||
static final class PathHelper {
|
||||
Configuration conf;
|
||||
Path rootDir;
|
||||
Path tmpDataDir;
|
||||
Path dataDir;
|
||||
Path mobDataDir;
|
||||
Path archiveDataDir;
|
||||
Path snapshotDir;
|
||||
|
||||
PathHelper(Configuration conf) {
|
||||
this.conf = conf;
|
||||
rootDir = new Path(conf.get(HConstants.HBASE_DIR));
|
||||
tmpDataDir = new Path(new Path(rootDir, HConstants.HBASE_TEMP_DIRECTORY),
|
||||
HConstants.BASE_NAMESPACE_DIR);
|
||||
dataDir = new Path(rootDir, HConstants.BASE_NAMESPACE_DIR);
|
||||
mobDataDir = new Path(MobUtils.getMobHome(rootDir), HConstants.BASE_NAMESPACE_DIR);
|
||||
archiveDataDir = new Path(new Path(rootDir, HConstants.HFILE_ARCHIVE_DIRECTORY),
|
||||
HConstants.BASE_NAMESPACE_DIR);
|
||||
snapshotDir = new Path(rootDir, HConstants.SNAPSHOT_DIR_NAME);
|
||||
}
|
||||
|
||||
Path getRootDir() {
|
||||
return rootDir;
|
||||
}
|
||||
|
||||
Path getDataDir() {
|
||||
return dataDir;
|
||||
}
|
||||
|
||||
Path getMobDir() {
|
||||
return mobDataDir.getParent();
|
||||
}
|
||||
|
||||
Path getMobDataDir() {
|
||||
return mobDataDir;
|
||||
}
|
||||
|
||||
Path getTmpDir() {
|
||||
return new Path(rootDir, HConstants.HBASE_TEMP_DIRECTORY);
|
||||
}
|
||||
|
||||
Path getTmpDataDir() {
|
||||
return tmpDataDir;
|
||||
}
|
||||
|
||||
Path getArchiveDir() {
|
||||
return new Path(rootDir, HConstants.HFILE_ARCHIVE_DIRECTORY);
|
||||
}
|
||||
|
||||
Path getArchiveDataDir() {
|
||||
return archiveDataDir;
|
||||
}
|
||||
|
||||
Path getDataNsDir(String namespace) {
|
||||
return new Path(dataDir, namespace);
|
||||
}
|
||||
|
||||
Path getMobDataNsDir(String namespace) {
|
||||
return new Path(mobDataDir, namespace);
|
||||
}
|
||||
|
||||
Path getDataTableDir(TableName tableName) {
|
||||
return new Path(getDataNsDir(tableName.getNamespaceAsString()),
|
||||
tableName.getQualifierAsString());
|
||||
}
|
||||
|
||||
Path getMobTableDir(TableName tableName) {
|
||||
return new Path(getMobDataNsDir(tableName.getNamespaceAsString()),
|
||||
tableName.getQualifierAsString());
|
||||
}
|
||||
|
||||
Path getArchiveNsDir(String namespace) {
|
||||
return new Path(archiveDataDir, namespace);
|
||||
}
|
||||
|
||||
Path getArchiveTableDir(TableName tableName) {
|
||||
return new Path(getArchiveNsDir(tableName.getNamespaceAsString()),
|
||||
tableName.getQualifierAsString());
|
||||
}
|
||||
|
||||
Path getTmpNsDir(String namespace) {
|
||||
return new Path(tmpDataDir, namespace);
|
||||
}
|
||||
|
||||
Path getTmpTableDir(TableName tableName) {
|
||||
return new Path(getTmpNsDir(tableName.getNamespaceAsString()),
|
||||
tableName.getQualifierAsString());
|
||||
}
|
||||
|
||||
Path getSnapshotRootDir() {
|
||||
return snapshotDir;
|
||||
}
|
||||
|
||||
Path getSnapshotDir(String snapshot) {
|
||||
return new Path(snapshotDir, snapshot);
|
||||
}
|
||||
|
||||
FileSystem getFileSystem() throws IOException {
|
||||
return rootDir.getFileSystem(conf);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,688 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.security.access;
|
||||
|
||||
import static org.apache.hadoop.hbase.security.access.Permission.Action.READ;
|
||||
import static org.apache.hadoop.hbase.security.access.Permission.Action.WRITE;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SecurityTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category({ SecurityTests.class, LargeTests.class })
|
||||
public class TestSnapshotScannerHDFSAclController {
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestSnapshotScannerHDFSAclController.class);
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestSnapshotScannerHDFSAclController.class);
|
||||
|
||||
private static final String UN_GRANT_USER = "un_grant_user";
|
||||
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static Configuration conf = TEST_UTIL.getConfiguration();
|
||||
private static Admin admin = null;
|
||||
private static FileSystem fs = null;
|
||||
private static Path rootDir = null;
|
||||
private static User unGrantUser = null;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupBeforeClass() throws Exception {
|
||||
// enable hdfs acl and set umask to 027
|
||||
conf.setBoolean("dfs.namenode.acls.enabled", true);
|
||||
conf.set("fs.permissions.umask-mode", "027");
|
||||
// enable hbase hdfs acl feature
|
||||
conf.setBoolean(SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE, true);
|
||||
// enable secure
|
||||
conf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
|
||||
conf.set(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR,
|
||||
SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR_DEFAULT);
|
||||
SecureTestUtil.enableSecurity(conf);
|
||||
// add SnapshotScannerHDFSAclController coprocessor
|
||||
conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
|
||||
conf.get(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY) + ","
|
||||
+ SnapshotScannerHDFSAclController.class.getName());
|
||||
|
||||
TEST_UTIL.startMiniCluster();
|
||||
admin = TEST_UTIL.getAdmin();
|
||||
rootDir = TEST_UTIL.getDefaultRootDirPath();
|
||||
fs = rootDir.getFileSystem(conf);
|
||||
unGrantUser = User.createUserForTesting(conf, UN_GRANT_USER, new String[] {});
|
||||
|
||||
// set hbase directory permission
|
||||
FsPermission commonDirectoryPermission =
|
||||
new FsPermission(conf.get(SnapshotScannerHDFSAclHelper.COMMON_DIRECTORY_PERMISSION,
|
||||
SnapshotScannerHDFSAclHelper.COMMON_DIRECTORY_PERMISSION_DEFAULT));
|
||||
Path path = rootDir;
|
||||
while (path != null) {
|
||||
fs.setPermission(path, commonDirectoryPermission);
|
||||
path = path.getParent();
|
||||
}
|
||||
// set restore directory permission
|
||||
Path restoreDir = new Path(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR_DEFAULT);
|
||||
if (!fs.exists(restoreDir)) {
|
||||
fs.mkdirs(restoreDir);
|
||||
fs.setPermission(restoreDir,
|
||||
new FsPermission(
|
||||
conf.get(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_DIRECTORY_PERMISSION,
|
||||
SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT)));
|
||||
}
|
||||
path = restoreDir.getParent();
|
||||
while (path != null) {
|
||||
fs.setPermission(path, commonDirectoryPermission);
|
||||
path = path.getParent();
|
||||
}
|
||||
|
||||
TEST_UTIL.waitTableAvailable(PermissionStorage.ACL_TABLE_NAME);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGrantGlobal() throws Exception {
|
||||
final String grantUserName = name.getMethodName();
|
||||
User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
|
||||
|
||||
String namespace1 = name.getMethodName();
|
||||
String namespace2 = namespace1 + "2";
|
||||
String namespace3 = namespace1 + "3";
|
||||
TableName table1 = TableName.valueOf(namespace1, "t1");
|
||||
TableName table12 = TableName.valueOf(namespace1, "t2");
|
||||
TableName table21 = TableName.valueOf(namespace2, "t21");
|
||||
TableName table3 = TableName.valueOf(namespace3, "t3");
|
||||
TableName table31 = TableName.valueOf(namespace3, "t31");
|
||||
String snapshot1 = namespace1 + "t1";
|
||||
String snapshot12 = namespace1 + "t12";
|
||||
String snapshot2 = namespace1 + "t2";
|
||||
String snapshot21 = namespace2 + "t21";
|
||||
String snapshot3 = namespace1 + "t3";
|
||||
String snapshot31 = namespace1 + "t31";
|
||||
|
||||
TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table1);
|
||||
admin.snapshot(snapshot1, table1);
|
||||
|
||||
// case 1: grant G(R) -> grant G(W) -> grant G(R)
|
||||
SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
|
||||
admin.grant(
|
||||
new UserPermission(grantUserName, Permission.newBuilder().withActions(WRITE).build()), true);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
|
||||
SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, WRITE);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, -1);
|
||||
SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
|
||||
admin.snapshot(snapshot12, table1);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot12, 6);
|
||||
|
||||
// case 2: grant G(R),N(R) -> G(W)
|
||||
admin.grant(new UserPermission(grantUserName,
|
||||
Permission.newBuilder(namespace1).withActions(READ).build()),
|
||||
false);
|
||||
SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, WRITE);
|
||||
// table in ns1
|
||||
TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table12);
|
||||
admin.snapshot(snapshot2, table12);
|
||||
// table in ns2
|
||||
TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table21);
|
||||
admin.snapshot(snapshot21, table21);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 6);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot21, -1);
|
||||
|
||||
// case 3: grant G(R),T(R) -> G(W)
|
||||
SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
|
||||
TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table3);
|
||||
admin.snapshot(snapshot3, table3);
|
||||
TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table3, READ);
|
||||
SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, WRITE);
|
||||
TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table31);
|
||||
admin.snapshot(snapshot31, table31);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, 6);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot31, -1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGrantNamespace() throws Exception {
|
||||
final String grantUserName = name.getMethodName();
|
||||
User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
|
||||
|
||||
String namespace = name.getMethodName();
|
||||
TableName table = TableName.valueOf(namespace, "t1");
|
||||
TableName table2 = TableName.valueOf(namespace, "t2");
|
||||
TableName table3 = TableName.valueOf(namespace, "t3");
|
||||
String snapshot = namespace + "t1";
|
||||
String snapshot2 = namespace + "t2";
|
||||
String snapshot3 = namespace + "t3";
|
||||
|
||||
TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table);
|
||||
admin.snapshot(snapshot, table);
|
||||
|
||||
// case 1: grant N(R) -> grant N(W) -> grant N(R)
|
||||
SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
|
||||
TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table3);
|
||||
admin.snapshot(snapshot3, table3);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, 6);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, unGrantUser, snapshot, -1);
|
||||
SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, WRITE);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
|
||||
SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
|
||||
|
||||
// case 2: grant T(R) -> N(W)
|
||||
SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
|
||||
TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
|
||||
SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, WRITE);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
|
||||
TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table2);
|
||||
admin.snapshot(snapshot2, table2);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
|
||||
TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, WRITE);
|
||||
|
||||
// case 3: grant G(R) -> N(W)
|
||||
SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
|
||||
SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
|
||||
SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, WRITE);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 6);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, 6);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGrantTable() throws Exception {
|
||||
final String grantUserName = name.getMethodName();
|
||||
User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
|
||||
|
||||
String namespace = name.getMethodName();
|
||||
TableName table = TableName.valueOf(namespace, "t1");
|
||||
TableName table2 = TableName.valueOf(namespace, "t2");
|
||||
String snapshot = namespace + "t1";
|
||||
String snapshot2 = namespace + "t1-2";
|
||||
String snapshot3 = namespace + "t2";
|
||||
|
||||
try (Table t = TestHDFSAclHelper.createTable(TEST_UTIL, table)) {
|
||||
TestHDFSAclHelper.put(t);
|
||||
admin.snapshot(snapshot, table);
|
||||
// table owner can scan table snapshot
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL,
|
||||
User.createUserForTesting(conf, "owner", new String[] {}), snapshot, 6);
|
||||
// case 1: grant table family(R)
|
||||
SecureTestUtil.grantOnTable(TEST_UTIL, grantUserName, table, TestHDFSAclHelper.COLUMN1, null,
|
||||
READ);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
|
||||
// case 2: grant T(R)
|
||||
TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
|
||||
TestHDFSAclHelper.put2(t);
|
||||
admin.snapshot(snapshot2, table);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 10);
|
||||
}
|
||||
// create t2 and snapshot
|
||||
TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table2);
|
||||
admin.snapshot(snapshot3, table2);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, -1);
|
||||
|
||||
// case 3: grant T(R) -> grant T(W) with merging existing permissions
|
||||
TEST_UTIL.getAdmin().grant(
|
||||
new UserPermission(grantUserName, Permission.newBuilder(table).withActions(WRITE).build()),
|
||||
true);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
|
||||
|
||||
// case 4: grant T(R) -> grant T(W) without merging existing permissions
|
||||
TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, WRITE);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRevokeGlobal() throws Exception {
|
||||
final String grantUserName = name.getMethodName();
|
||||
User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
|
||||
|
||||
String namespace = name.getMethodName();
|
||||
TableName table1 = TableName.valueOf(namespace, "t1");
|
||||
TableName table2 = TableName.valueOf(namespace, "t2");
|
||||
TableName table3 = TableName.valueOf(namespace, "t3");
|
||||
String snapshot1 = namespace + "t1";
|
||||
String snapshot2 = namespace + "t2";
|
||||
String snapshot3 = namespace + "t3";
|
||||
|
||||
TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table1);
|
||||
admin.snapshot(snapshot1, table1);
|
||||
// case 1: grant G(R) -> revoke G(R)
|
||||
SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
|
||||
SecureTestUtil.revokeGlobal(TEST_UTIL, grantUserName, READ);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, -1);
|
||||
|
||||
// case 2: grant G(R), grant N(R), grant T(R) -> revoke G(R)
|
||||
SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
|
||||
SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
|
||||
TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table1, READ);
|
||||
SecureTestUtil.revokeGlobal(TEST_UTIL, grantUserName, READ);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
|
||||
TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table2);
|
||||
admin.snapshot(snapshot2, table2);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 6);
|
||||
SecureTestUtil.revokeFromNamespace(TEST_UTIL, grantUserName, namespace, READ);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
|
||||
|
||||
// case 3: grant G(R), grant T(R) -> revoke G(R)
|
||||
SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
|
||||
SecureTestUtil.revokeGlobal(TEST_UTIL, grantUserName, READ);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
|
||||
TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table3);
|
||||
admin.snapshot(snapshot3, table3);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, -1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRevokeNamespace() throws Exception {
|
||||
String grantUserName = name.getMethodName();
|
||||
User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
|
||||
|
||||
String namespace = name.getMethodName();
|
||||
TableName table1 = TableName.valueOf(namespace, "t1");
|
||||
TableName table2 = TableName.valueOf(namespace, "t2");
|
||||
TableName table3 = TableName.valueOf(namespace, "t3");
|
||||
TableName table4 = TableName.valueOf(namespace, "t4");
|
||||
String snapshot1 = namespace + "t1";
|
||||
String snapshot2 = namespace + "t2";
|
||||
String snapshot3 = namespace + "t3";
|
||||
String snapshot4 = namespace + "t4";
|
||||
|
||||
TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table1);
|
||||
admin.snapshot(snapshot1, table1);
|
||||
|
||||
// case 1: grant N(R) -> revoke N(R)
|
||||
SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
|
||||
admin.revoke(new UserPermission(grantUserName, Permission.newBuilder(namespace).build()));
|
||||
TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table3);
|
||||
admin.snapshot(snapshot3, table3);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, -1);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, -1);
|
||||
|
||||
// case 2: grant N(R), grant G(R) -> revoke N(R)
|
||||
SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
|
||||
SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
|
||||
admin.revoke(new UserPermission(grantUserName, Permission.newBuilder(namespace).build()));
|
||||
TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table4);
|
||||
admin.snapshot(snapshot4, table4);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot4, 6);
|
||||
SecureTestUtil.revokeGlobal(TEST_UTIL, grantUserName, READ);
|
||||
|
||||
// case 3: grant N(R), grant T(R) -> revoke N(R)
|
||||
SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
|
||||
TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table1, READ);
|
||||
SecureTestUtil.revokeFromNamespace(TEST_UTIL, grantUserName, namespace, READ);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot1, 6);
|
||||
TestHDFSAclHelper.createTable(TEST_UTIL, table2);
|
||||
admin.snapshot(snapshot2, table2);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRevokeTable() throws Exception {
|
||||
final String grantUserName = name.getMethodName();
|
||||
User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
|
||||
|
||||
String namespace = name.getMethodName();
|
||||
TableName table = TableName.valueOf(namespace, "t1");
|
||||
String snapshot = namespace + "t1";
|
||||
|
||||
TestHDFSAclHelper.createTableAndPut(TEST_UTIL, table);
|
||||
admin.snapshot(snapshot, table);
|
||||
|
||||
// case 1: grant T(R) -> revoke table family
|
||||
TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
|
||||
SecureTestUtil.revokeFromTable(TEST_UTIL, grantUserName, table, TestHDFSAclHelper.COLUMN1, null,
|
||||
READ);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
|
||||
|
||||
// case 2: grant T(R) -> revoke T(R)
|
||||
TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
|
||||
admin.revoke(new UserPermission(grantUserName, Permission.newBuilder(table).build()));
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
|
||||
|
||||
// case 3: grant T(R), grant N(R) -> revoke T(R)
|
||||
TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
|
||||
SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
|
||||
admin.revoke(new UserPermission(grantUserName, Permission.newBuilder(table).build()));
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
|
||||
SecureTestUtil.revokeFromNamespace(TEST_UTIL, grantUserName, namespace, READ);
|
||||
|
||||
// case 4: grant T(R), grant G(R) -> revoke T(R)
|
||||
TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
|
||||
SecureTestUtil.grantGlobal(TEST_UTIL, grantUserName, READ);
|
||||
admin.revoke(new UserPermission(grantUserName, Permission.newBuilder(table).build()));
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
|
||||
SecureTestUtil.revokeGlobal(TEST_UTIL, grantUserName, READ);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTruncateTable() throws Exception {
|
||||
String grantUserName = name.getMethodName();
|
||||
User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
|
||||
String grantUserName2 = grantUserName + "2";
|
||||
User grantUser2 = User.createUserForTesting(conf, grantUserName2, new String[] {});
|
||||
|
||||
String namespace = name.getMethodName();
|
||||
TableName tableName = TableName.valueOf(namespace, "t1");
|
||||
String snapshot = namespace + "t1";
|
||||
String snapshot2 = namespace + "t1-2";
|
||||
try (Table t = TestHDFSAclHelper.createTable(TEST_UTIL, tableName)) {
|
||||
TestHDFSAclHelper.put(t);
|
||||
// snapshot
|
||||
admin.snapshot(snapshot, tableName);
|
||||
// grant user2 namespace permission
|
||||
SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName2, namespace, READ);
|
||||
// grant user table permission
|
||||
TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, tableName, READ);
|
||||
// truncate table
|
||||
admin.disableTable(tableName);
|
||||
admin.truncateTable(tableName, true);
|
||||
TestHDFSAclHelper.put2(t);
|
||||
// snapshot
|
||||
admin.snapshot(snapshot2, tableName);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser2, snapshot, 6);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 9);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser2, snapshot2, 9);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRestoreSnapshot() throws Exception {
|
||||
final String grantUserName = name.getMethodName();
|
||||
User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
|
||||
String namespace = name.getMethodName();
|
||||
TableName table = TableName.valueOf(namespace, "t1");
|
||||
String snapshot = namespace + "t1";
|
||||
String snapshot2 = namespace + "t1-2";
|
||||
String snapshot3 = namespace + "t1-3";
|
||||
|
||||
try (Table t = TestHDFSAclHelper.createTable(TEST_UTIL, table)) {
|
||||
TestHDFSAclHelper.put(t);
|
||||
// grant t1, snapshot
|
||||
TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
|
||||
admin.snapshot(snapshot, table);
|
||||
// delete
|
||||
admin.disableTable(table);
|
||||
admin.deleteTable(table);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
|
||||
|
||||
// restore snapshot and restore acl
|
||||
admin.restoreSnapshot(snapshot, true, true);
|
||||
TestHDFSAclHelper.put2(t);
|
||||
// snapshot
|
||||
admin.snapshot(snapshot2, table);
|
||||
// delete
|
||||
admin.disableTable(table);
|
||||
admin.deleteTable(table);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 10);
|
||||
|
||||
// restore snapshot and skip restore acl
|
||||
admin.restoreSnapshot(snapshot);
|
||||
admin.snapshot(snapshot3, table);
|
||||
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 10);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, -1);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteTable() throws Exception {
|
||||
String namespace = name.getMethodName();
|
||||
String grantUserName1 = namespace + "1";
|
||||
String grantUserName2 = namespace + "2";
|
||||
String grantUserName3 = namespace + "3";
|
||||
User grantUser1 = User.createUserForTesting(conf, grantUserName1, new String[] {});
|
||||
User grantUser2 = User.createUserForTesting(conf, grantUserName2, new String[] {});
|
||||
User grantUser3 = User.createUserForTesting(conf, grantUserName3, new String[] {});
|
||||
|
||||
TableName tableName1 = TableName.valueOf(namespace, "t1");
|
||||
TableName tableName2 = TableName.valueOf(namespace, "t2");
|
||||
String snapshot1 = namespace + "t1";
|
||||
String snapshot2 = namespace + "t2";
|
||||
try (Table t = TestHDFSAclHelper.createTable(TEST_UTIL, tableName1);
|
||||
Table t2 = TestHDFSAclHelper.createTable(TEST_UTIL, tableName2)) {
|
||||
TestHDFSAclHelper.put(t);
|
||||
TestHDFSAclHelper.put(t2);
|
||||
// snapshot
|
||||
admin.snapshot(snapshot1, tableName1);
|
||||
admin.snapshot(snapshot2, tableName2);
|
||||
// grant user table permission
|
||||
TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName1, tableName1, READ);
|
||||
TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName2, tableName2, READ);
|
||||
SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName3, namespace, READ);
|
||||
// delete table
|
||||
admin.disableTable(tableName1);
|
||||
admin.deleteTable(tableName1);
|
||||
// grantUser2 and grantUser3 should have data/ns acl
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser1, snapshot1, 6);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser2, snapshot2, 6);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser3, snapshot2, 6);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteNamespace() throws Exception {
|
||||
String grantUserName = name.getMethodName();
|
||||
User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
|
||||
|
||||
String namespace = name.getMethodName();
|
||||
TableName tableName = TableName.valueOf(namespace, "t1");
|
||||
String snapshot = namespace + "t1";
|
||||
try (Table t = TestHDFSAclHelper.createTable(TEST_UTIL, tableName)) {
|
||||
TestHDFSAclHelper.put(t);
|
||||
// snapshot
|
||||
admin.snapshot(snapshot, tableName);
|
||||
// grant user2 namespace permission
|
||||
SecureTestUtil.grantOnNamespace(TEST_UTIL, grantUserName, namespace, READ);
|
||||
// truncate table
|
||||
admin.disableTable(tableName);
|
||||
admin.deleteTable(tableName);
|
||||
// snapshot
|
||||
admin.deleteNamespace(namespace);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGrantMobTable() throws Exception {
|
||||
final String grantUserName = name.getMethodName();
|
||||
User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
|
||||
|
||||
String namespace = name.getMethodName();
|
||||
TableName table = TableName.valueOf(namespace, "t1");
|
||||
String snapshot = namespace + "t1";
|
||||
|
||||
try (Table t = TestHDFSAclHelper.createMobTable(TEST_UTIL, table)) {
|
||||
TestHDFSAclHelper.put(t);
|
||||
admin.snapshot(snapshot, table);
|
||||
TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
|
||||
TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final class TestHDFSAclHelper {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestHDFSAclHelper.class);
|
||||
|
||||
private TestHDFSAclHelper() {
|
||||
}
|
||||
|
||||
static void grantOnTable(HBaseTestingUtility util, String user, TableName tableName,
|
||||
Permission.Action... actions) throws Exception {
|
||||
SecureTestUtil.grantOnTable(util, user, tableName, null, null, actions);
|
||||
}
|
||||
|
||||
private static void createNamespace(HBaseTestingUtility util, String namespace)
|
||||
throws IOException {
|
||||
if (!Arrays.stream(util.getAdmin().listNamespaceDescriptors())
|
||||
.anyMatch(ns -> ns.getName().equals(namespace))) {
|
||||
NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namespace).build();
|
||||
util.getAdmin().createNamespace(namespaceDescriptor);
|
||||
}
|
||||
}
|
||||
|
||||
static Table createTable(HBaseTestingUtility util, TableName tableName) throws IOException {
|
||||
createNamespace(util, tableName.getNamespaceAsString());
|
||||
TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(COLUMN1).build())
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(COLUMN2).build())
|
||||
.setOwner(User.createUserForTesting(util.getConfiguration(), "owner", new String[] {}))
|
||||
.build();
|
||||
byte[][] splits = new byte[][] { Bytes.toBytes("2"), Bytes.toBytes("4") };
|
||||
return util.createTable(td, splits);
|
||||
}
|
||||
|
||||
static Table createMobTable(HBaseTestingUtility util, TableName tableName) throws IOException {
|
||||
createNamespace(util, tableName.getNamespaceAsString());
|
||||
TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(COLUMN1).setMobEnabled(true)
|
||||
.setMobThreshold(0).build())
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(COLUMN2).setMobEnabled(true)
|
||||
.setMobThreshold(0).build())
|
||||
.setOwner(User.createUserForTesting(util.getConfiguration(), "owner", new String[] {}))
|
||||
.build();
|
||||
byte[][] splits = new byte[][] { Bytes.toBytes("2"), Bytes.toBytes("4") };
|
||||
return util.createTable(td, splits);
|
||||
}
|
||||
|
||||
static void createTableAndPut(HBaseTestingUtility util, TableName tableNam) throws IOException {
|
||||
try (Table t = createTable(util, tableNam)) {
|
||||
put(t);
|
||||
}
|
||||
}
|
||||
|
||||
static final byte[] COLUMN1 = Bytes.toBytes("A");
|
||||
static final byte[] COLUMN2 = Bytes.toBytes("B");
|
||||
|
||||
static void put(Table hTable) throws IOException {
|
||||
List<Put> puts = new ArrayList<>();
|
||||
for (int i = 0; i < 6; i++) {
|
||||
Put put = new Put(Bytes.toBytes(i));
|
||||
put.addColumn(COLUMN1, null, Bytes.toBytes(i));
|
||||
put.addColumn(COLUMN2, null, Bytes.toBytes(i + 1));
|
||||
puts.add(put);
|
||||
}
|
||||
hTable.put(puts);
|
||||
}
|
||||
|
||||
static void put2(Table hTable) throws IOException {
|
||||
List<Put> puts = new ArrayList<>();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
if (i == 5) {
|
||||
continue;
|
||||
}
|
||||
Put put = new Put(Bytes.toBytes(i));
|
||||
put.addColumn(COLUMN1, null, Bytes.toBytes(i + 2));
|
||||
put.addColumn(COLUMN2, null, Bytes.toBytes(i + 3));
|
||||
puts.add(put);
|
||||
}
|
||||
hTable.put(puts);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if user is able to read expected rows from the specific snapshot
|
||||
* @param user the specific user
|
||||
* @param snapshot the snapshot to be scanned
|
||||
* @param expectedRowCount expected row count read from snapshot, -1 if expects
|
||||
* AccessControlException
|
||||
* @throws IOException user scan snapshot error
|
||||
* @throws InterruptedException user scan snapshot error
|
||||
*/
|
||||
static void canUserScanSnapshot(HBaseTestingUtility util, User user, String snapshot,
|
||||
int expectedRowCount) throws IOException, InterruptedException {
|
||||
PrivilegedExceptionAction<Void> action =
|
||||
getScanSnapshotAction(util.getConfiguration(), snapshot, expectedRowCount);
|
||||
user.runAs(action);
|
||||
}
|
||||
|
||||
private static PrivilegedExceptionAction<Void> getScanSnapshotAction(Configuration conf,
|
||||
String snapshotName, long expectedRowCount) {
|
||||
PrivilegedExceptionAction<Void> action = () -> {
|
||||
try {
|
||||
Path restoreDir = new Path(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR_DEFAULT);
|
||||
Scan scan = new Scan();
|
||||
TableSnapshotScanner scanner =
|
||||
new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
|
||||
int rowCount = 0;
|
||||
while (true) {
|
||||
Result result = scanner.next();
|
||||
if (result == null) {
|
||||
break;
|
||||
}
|
||||
rowCount++;
|
||||
}
|
||||
scanner.close();
|
||||
assertEquals(expectedRowCount, rowCount);
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Scan snapshot error, snapshot {}", snapshotName, e);
|
||||
assertEquals(expectedRowCount, -1);
|
||||
}
|
||||
return null;
|
||||
};
|
||||
return action;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue