HDFS-7093. Add config key to restrict setStoragePolicy. (Arpit Agarwal)

This commit is contained in:
arp 2014-09-28 19:28:51 -07:00
parent 400e1bb4ef
commit b38e52b5e8
5 changed files with 74 additions and 2 deletions

View File

@ -514,6 +514,8 @@ Release 2.6.0 - UNRELEASED
HDFS-7156. Update fsck documentation. (Masahiro Yamaguchi via shv) HDFS-7156. Update fsck documentation. (Masahiro Yamaguchi via shv)
HDFS-7093. Add config key to restrict setStoragePolicy. (Arpit Agarwal)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang) HDFS-6690. Deduplicate xattr names in memory. (wang)

View File

@ -558,6 +558,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DOMAIN_SOCKET_PATH_KEY = "dfs.domain.socket.path"; public static final String DFS_DOMAIN_SOCKET_PATH_KEY = "dfs.domain.socket.path";
public static final String DFS_DOMAIN_SOCKET_PATH_DEFAULT = ""; public static final String DFS_DOMAIN_SOCKET_PATH_DEFAULT = "";
public static final String DFS_STORAGE_POLICY_ENABLED_KEY = "dfs.storage.policy.enabled";
public static final boolean DFS_STORAGE_POLICY_ENABLED_DEFAULT = true;
// HA related configuration // HA related configuration
public static final String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes"; public static final String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
public static final String DFS_HA_NAMENODE_ID_KEY = "dfs.ha.namenode.id"; public static final String DFS_HA_NAMENODE_ID_KEY = "dfs.ha.namenode.id";

View File

@ -84,6 +84,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
import static org.apache.hadoop.util.Time.now; import static org.apache.hadoop.util.Time.now;
@ -420,6 +422,9 @@ private void logAuditEvent(boolean succeeded,
private final CacheManager cacheManager; private final CacheManager cacheManager;
private final DatanodeStatistics datanodeStatistics; private final DatanodeStatistics datanodeStatistics;
// whether setStoragePolicy is allowed.
private final boolean isStoragePolicyEnabled;
private String nameserviceId; private String nameserviceId;
private RollingUpgradeInfo rollingUpgradeInfo = null; private RollingUpgradeInfo rollingUpgradeInfo = null;
@ -781,6 +786,10 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics(); this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
this.blockIdGenerator = new SequentialBlockIdGenerator(this.blockManager); this.blockIdGenerator = new SequentialBlockIdGenerator(this.blockManager);
this.isStoragePolicyEnabled =
conf.getBoolean(DFS_STORAGE_POLICY_ENABLED_KEY,
DFS_STORAGE_POLICY_ENABLED_DEFAULT);
this.fsOwner = UserGroupInformation.getCurrentUser(); this.fsOwner = UserGroupInformation.getCurrentUser();
this.fsOwnerShortUserName = fsOwner.getShortUserName(); this.fsOwnerShortUserName = fsOwner.getShortUserName();
this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY, this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
@ -2273,8 +2282,17 @@ void setStoragePolicy(String src, final String policyName)
} }
private void setStoragePolicyInt(String src, final String policyName) private void setStoragePolicyInt(String src, final String policyName)
throws IOException { throws IOException, UnresolvedLinkException, AccessControlException {
checkSuperuserPrivilege();
if (!isStoragePolicyEnabled) {
throw new IOException("Failed to set storage policy since "
+ DFS_STORAGE_POLICY_ENABLED_KEY + " is set to false.");
}
FSPermissionChecker pc = null;
if (isPermissionEnabled) {
pc = getPermissionChecker();
}
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
waitForLoadingFSImage(); waitForLoadingFSImage();
@ -2283,6 +2301,12 @@ private void setStoragePolicyInt(String src, final String policyName)
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set storage policy for " + src); checkNameNodeSafeMode("Cannot set storage policy for " + src);
if (pc != null) {
checkPermission(pc, src, false, null, null, FsAction.WRITE, null,
false, true);
}
src = FSDirectory.resolvePath(src, pathComponents, dir); src = FSDirectory.resolvePath(src, pathComponents, dir);
// get the corresponding policy and make sure the policy name is valid // get the corresponding policy and make sure the policy name is valid

View File

@ -2132,4 +2132,12 @@
</description> </description>
</property> </property>
<property>
<name>dfs.storage.policy.enabled</name>
<value>true</value>
<description>
Allow users to change the storage policy on files and directories. When set to false, attempts to set a storage policy are rejected with an IOException.
</description>
</property>
</configuration> </configuration>

View File

@ -21,6 +21,7 @@
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.*; import java.util.*;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -69,6 +70,40 @@ public class TestBlockStoragePolicy {
static final byte WARM = (byte) 8; static final byte WARM = (byte) 8;
static final byte HOT = (byte) 12; static final byte HOT = (byte) 12;
@Test (timeout=300000)
public void testConfigKeyEnabled() throws IOException {
  // With dfs.storage.policy.enabled explicitly true, setStoragePolicy
  // on the root directory must succeed without throwing.
  final Configuration conf = new HdfsConfiguration();
  conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, true);
  final MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  try {
    cluster.waitActive();
    final Path root = new Path("/");
    cluster.getFileSystem().setStoragePolicy(root, "COLD");
  } finally {
    cluster.shutdown();
  }
}
/**
 * Verify that setStoragePolicy is rejected with an IOException when
 * dfs.storage.policy.enabled is set to false.
 * @throws IOException expected from setStoragePolicy
 */
@Test (timeout=300000, expected=IOException.class)
public void testConfigKeyDisabled() throws IOException {
  final Configuration conf = new HdfsConfiguration();
  conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, false);
  final MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  try {
    cluster.waitActive();
    final Path root = new Path("/");
    // Should throw IOException since the feature is disabled.
    cluster.getFileSystem().setStoragePolicy(root, "COLD");
  } finally {
    cluster.shutdown();
  }
}
@Test @Test
public void testDefaultPolicies() { public void testDefaultPolicies() {
final Map<Byte, String> expectedPolicyStrings = new HashMap<Byte, String>(); final Map<Byte, String> expectedPolicyStrings = new HashMap<Byte, String>();