diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index a0e176acd94..5e1d7176fa5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -514,6 +514,8 @@ Release 2.6.0 - UNRELEASED
HDFS-7156. Update fsck documentation. (Masahiro Yamaguchi via shv)
+ HDFS-7093. Add config key to restrict setStoragePolicy. (Arpit Agarwal)
+
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 3aa9acfe42b..b016750469e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -558,6 +558,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DOMAIN_SOCKET_PATH_KEY = "dfs.domain.socket.path";
public static final String DFS_DOMAIN_SOCKET_PATH_DEFAULT = "";
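+ // Whether setStoragePolicy calls are allowed on the NameNode (HDFS-7093).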
+ public static final String DFS_STORAGE_POLICY_ENABLED_KEY = "dfs.storage.policy.enabled";
+ public static final boolean DFS_STORAGE_POLICY_ENABLED_DEFAULT = true;
+
// HA related configuration
public static final String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
public static final String DFS_HA_NAMENODE_ID_KEY = "dfs.ha.namenode.id";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 4ae47b7d0b6..9b4380c9832 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -84,6 +84,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROU
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
import static org.apache.hadoop.util.Time.now;
@@ -420,6 +422,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private final CacheManager cacheManager;
private final DatanodeStatistics datanodeStatistics;
+ // whether setStoragePolicy is allowed.
+ private final boolean isStoragePolicyEnabled;
+
private String nameserviceId;
private RollingUpgradeInfo rollingUpgradeInfo = null;
@@ -781,6 +786,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
this.blockIdGenerator = new SequentialBlockIdGenerator(this.blockManager);
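+ // Read the setStoragePolicy toggle once at startup; the field is final
+ // and is not reconfigurable at runtime.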
+ this.isStoragePolicyEnabled =
+ conf.getBoolean(DFS_STORAGE_POLICY_ENABLED_KEY,
+ DFS_STORAGE_POLICY_ENABLED_DEFAULT);
+
this.fsOwner = UserGroupInformation.getCurrentUser();
this.fsOwnerShortUserName = fsOwner.getShortUserName();
this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
@@ -2273,8 +2282,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
private void setStoragePolicyInt(String src, final String policyName)
- throws IOException {
- checkSuperuserPrivilege();
+ throws IOException, UnresolvedLinkException, AccessControlException {
+
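+ // Fail fast if the storage policy feature is disabled cluster-wide.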
+ if (!isStoragePolicyEnabled) {
+ throw new IOException("Failed to set storage policy since "
+ + DFS_STORAGE_POLICY_ENABLED_KEY + " is set to false.");
+ }
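+ // With permissions enabled, resolve a permission checker for the caller.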
+ FSPermissionChecker pc = null;
+ if (isPermissionEnabled) {
+ pc = getPermissionChecker();
+ }
+
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
waitForLoadingFSImage();
@@ -2283,6 +2301,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set storage policy for " + src);
+
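+ // Require WRITE access on the target path instead of superuser privilege.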
+ if (pc != null) {
+ checkPermission(pc, src, false, null, null, FsAction.WRITE, null,
+ false, true);
+ }
+
src = FSDirectory.resolvePath(src, pathComponents, dir);
// get the corresponding policy and make sure the policy name is valid
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 2d4109a801e..23b25fa7480 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2132,4 +2132,12 @@
+<property>
+  <name>dfs.storage.policy.enabled</name>
+  <value>true</value>
+  <description>
+    Allow users to change the storage policy on files and directories.
+  </description>
+</property>
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
index 39d143946f0..771b7bd5b59 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySu
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.IOException;
import java.util.*;
import com.google.common.collect.Lists;
@@ -69,6 +70,40 @@ public class TestBlockStoragePolicy {
static final byte WARM = (byte) 8;
static final byte HOT = (byte) 12;
+
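+ /**
+ * Ensure that setStoragePolicy succeeds when
+ * dfs.storage.policy.enabled is set to true (the default).
+ * @throws IOException
+ */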
+ @Test (timeout=300000)
+ public void testConfigKeyEnabled() throws IOException {
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, true);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1).build();
+ try {
+ cluster.waitActive();
+ cluster.getFileSystem().setStoragePolicy(new Path("/"), "COLD");
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Ensure that setStoragePolicy throws IOException when
+ * dfs.storage.policy.enabled is set to false.
+ * @throws IOException
+ */
+ @Test (timeout=300000, expected=IOException.class)
+ public void testConfigKeyDisabled() throws IOException {
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, false);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1).build();
+ try {
+ cluster.waitActive();
+ cluster.getFileSystem().setStoragePolicy(new Path("/"), "COLD");
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
@Test
public void testDefaultPolicies() {
final Map<Byte, String> expectedPolicyStrings = new HashMap<Byte, String>();