HDFS-15243. Add an option to prevent sub-directories of protected directories from deletion. Contributed by liuyanyu.

This commit is contained in:
Ayush Saxena 2020-05-12 13:11:31 +05:30
parent bd342bef64
commit 0fe49036e5
5 changed files with 121 additions and 0 deletions

View File

@ -1460,6 +1460,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.state.context.enabled";
public static final boolean DFS_NAMENODE_STATE_CONTEXT_ENABLED_DEFAULT = false;
/**
* whether to protect the subdirectories of directories which
* set on fs.protected.directories.
*/
public static final String DFS_PROTECTED_SUBDIRECTORIES_ENABLE =
"dfs.protected.subdirectories.enable";
// Default value for DFS_PROTECTED_SUBDIRECTORIES_ENABLE.
public static final boolean DFS_PROTECTED_SUBDIRECTORIES_ENABLE_DEFAULT =
false;
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY

View File

@ -1787,6 +1787,18 @@ public class DFSUtil {
+ descendant);
}
}
if (fsd.isProtectedSubDirectoriesEnable()) {
while (!src.isEmpty()) {
int index = src.lastIndexOf(Path.SEPARATOR_CHAR);
src = src.substring(0, index);
if (protectedDirs.contains(src)) {
throw new AccessControlException(
"Cannot delete/rename subdirectory under protected subdirectory "
+ src);
}
}
}
}
/**

View File

@ -90,6 +90,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECI
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROTECTED_SUBDIRECTORIES_ENABLE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROTECTED_SUBDIRECTORIES_ENABLE_DEFAULT;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
@ -170,6 +172,7 @@ public class FSDirectory implements Closeable {
//
// Each entry in this set must be a normalized path.
private volatile SortedSet<String> protectedDirectories;
private final boolean isProtectedSubDirectoriesEnable;
private final boolean isPermissionEnabled;
private final boolean isPermissionContentSummarySubAccess;
@ -382,6 +385,9 @@ public class FSDirectory implements Closeable {
DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_DEFAULT);
this.protectedDirectories = parseProtectedDirectories(conf);
this.isProtectedSubDirectoriesEnable = conf.getBoolean(
DFS_PROTECTED_SUBDIRECTORIES_ENABLE,
DFS_PROTECTED_SUBDIRECTORIES_ENABLE_DEFAULT);
Preconditions.checkArgument(this.inodeXAttrsLimit >= 0,
"Cannot set a negative limit on the number of xattrs per inode (%s).",
@ -542,6 +548,10 @@ public class FSDirectory implements Closeable {
return protectedDirectories;
}
public boolean isProtectedSubDirectoriesEnable() {
return isProtectedSubDirectoriesEnable;
}
/**
* Set directories that cannot be removed unless empty, even by an
* administrator.

View File

@ -5870,4 +5870,12 @@
directories when permissions is enabled. Default value is false;
</description>
</property>
<property>
<name>dfs.protected.subdirectories.enable</name>
<value>false</value>
<description>whether to protect the subdirectories of directories which
set on fs.protected.directories.
</description>
</property>
</configuration>

View File

@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROTECTED_SUBDIRECTORIES_ENABLE;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -193,6 +194,25 @@ public class TestProtectedDirectories {
return matrix;
}
private Collection<TestMatrixEntry> createTestMatrixForProtectSubDirs() {
Collection<TestMatrixEntry> matrix = new ArrayList<TestMatrixEntry>();
// Nested unprotected dirs.
matrix.add(TestMatrixEntry.get()
.addUnprotectedDir("/1", true)
.addUnprotectedDir("/1/2", true)
.addUnprotectedDir("/1/2/3", true)
.addUnprotectedDir("/1/2/3/4", true));
// Non-empty protected dir.
matrix.add(TestMatrixEntry.get()
.addProtectedDir("/1", false)
.addUnprotectedDir("/1/2", false)
.addUnprotectedDir("/1/2/3", false)
.addUnprotectedDir("/1/2/3/4", true));
return matrix;
}
@Test
public void testReconfigureProtectedPaths() throws Throwable {
Configuration conf = new HdfsConfiguration();
@ -292,6 +312,67 @@ public class TestProtectedDirectories {
}
}
@Test
public void testRenameProtectSubDirs() throws Throwable {
for (TestMatrixEntry testMatrixEntry :
createTestMatrixForProtectSubDirs()) {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFS_PROTECTED_SUBDIRECTORIES_ENABLE, true);
MiniDFSCluster cluster = setupTestCase(
conf, testMatrixEntry.getProtectedPaths(),
testMatrixEntry.getUnprotectedPaths());
try {
LOG.info("Running {}", testMatrixEntry);
FileSystem fs = cluster.getFileSystem();
for (Path srcPath : testMatrixEntry.getAllPathsToBeDeleted()) {
assertThat(
testMatrixEntry + ": Testing whether "
+ srcPath + " can be renamed",
renamePath(fs, srcPath,
new Path(srcPath.toString() + "_renamed")),
is(testMatrixEntry.canPathBeRenamed(srcPath)));
}
} finally {
cluster.shutdown();
}
}
}
@Test
public void testDeleteProtectSubDirs() throws Throwable {
for (TestMatrixEntry testMatrixEntry :
createTestMatrixForProtectSubDirs()) {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFS_PROTECTED_SUBDIRECTORIES_ENABLE, true);
MiniDFSCluster cluster = setupTestCase(
conf, testMatrixEntry.getProtectedPaths(),
testMatrixEntry.getUnprotectedPaths());
try {
LOG.info("Running {}", testMatrixEntry);
FileSystem fs = cluster.getFileSystem();
for (Path path : testMatrixEntry.getAllPathsToBeDeleted()) {
final long countBefore = cluster.getNamesystem().getFilesTotal();
assertThat(
testMatrixEntry + ": Testing whether "
+ path + " can be deleted",
deletePath(fs, path),
is(testMatrixEntry.canPathBeDeleted(path)));
final long countAfter = cluster.getNamesystem().getFilesTotal();
if (!testMatrixEntry.canPathBeDeleted(path)) {
assertThat(
"Either all paths should be deleted or none",
countAfter, is(countBefore));
}
}
} finally {
cluster.shutdown();
}
}
}
/**
* Verify that configured paths are normalized by removing
* redundant separators.