HADOOP-14820 Wasb mkdirs security checks inconsistent with HDFS.
Contributed by Sivaguru Sankaridurg
This commit is contained in:
parent
06578bc2ea
commit
a3e1a2dce2
|
@ -2425,13 +2425,13 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
|
||||
private Path getAncestor(Path f) throws IOException {
|
||||
|
||||
for (Path current = f.getParent(), parent = current.getParent();
|
||||
for (Path current = f, parent = current.getParent();
|
||||
parent != null; // Stop when you get to the root
|
||||
current = parent, parent = current.getParent()) {
|
||||
|
||||
String currentKey = pathToKey(current);
|
||||
FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
|
||||
if (currentMetadata != null) {
|
||||
if (currentMetadata != null && currentMetadata.isDir()) {
|
||||
Path ancestor = keyToPath(currentMetadata.getKey());
|
||||
LOG.debug("Found ancestor {}, for path: {}", ancestor.toString(), f.toString());
|
||||
return ancestor;
|
||||
|
@ -2448,7 +2448,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
|
||||
public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws IOException {
|
||||
|
||||
|
||||
LOG.debug("Creating directory: {}", f.toString());
|
||||
|
||||
if (containsColon(f)) {
|
||||
|
@ -2459,6 +2458,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
Path absolutePath = makeAbsolute(f);
|
||||
Path ancestor = getAncestor(absolutePath);
|
||||
|
||||
if (absolutePath.equals(ancestor)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
performAuthCheck(ancestor, WasbAuthorizationOperations.WRITE, "mkdirs", absolutePath);
|
||||
|
||||
PermissionStatus permissionStatus = null;
|
||||
|
|
|
@ -603,6 +603,70 @@ public class TestNativeAzureFileSystemAuthorization
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Positive test for mkdirs -p with existing hierarchy
|
||||
* @throws Throwable
|
||||
*/
|
||||
@Test
|
||||
public void testMkdirsWithExistingHierarchyCheckPositive1() throws Throwable {
|
||||
|
||||
Path testPath = new Path("/testMkdirsWithExistingHierarchyCheckPositive1");
|
||||
|
||||
authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
|
||||
fs.updateWasbAuthorizer(authorizer);
|
||||
|
||||
try {
|
||||
fs.mkdirs(testPath);
|
||||
ContractTestUtils.assertIsDirectory(fs, testPath);
|
||||
|
||||
/* Don't need permissions to create a directory that already exists */
|
||||
authorizer.deleteAllAuthRules();
|
||||
|
||||
fs.mkdirs(testPath);
|
||||
ContractTestUtils.assertIsDirectory(fs, testPath);
|
||||
}
|
||||
finally {
|
||||
allowRecursiveDelete(fs, testPath.toString());
|
||||
fs.delete(testPath, true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMkdirsWithExistingHierarchyCheckPositive2() throws Throwable {
|
||||
|
||||
Path testPath = new Path("/testMkdirsWithExistingHierarchyCheckPositive2");
|
||||
Path childPath1 = new Path(testPath, "1");
|
||||
Path childPath2 = new Path(childPath1, "2");
|
||||
Path childPath3 = new Path(childPath2, "3");
|
||||
|
||||
authorizer.addAuthRule("/",
|
||||
WasbAuthorizationOperations.WRITE.toString(), true);
|
||||
|
||||
authorizer.addAuthRule(childPath1.toString(),
|
||||
WasbAuthorizationOperations.WRITE.toString(), true);
|
||||
|
||||
fs.updateWasbAuthorizer(authorizer);
|
||||
|
||||
try {
|
||||
fs.mkdirs(childPath1);
|
||||
ContractTestUtils.assertIsDirectory(fs, childPath1);
|
||||
|
||||
// Path already exists => no-op.
|
||||
fs.mkdirs(testPath);
|
||||
ContractTestUtils.assertIsDirectory(fs, testPath);
|
||||
|
||||
// Path already exists => no-op.
|
||||
fs.mkdirs(childPath1);
|
||||
ContractTestUtils.assertIsDirectory(fs, childPath1);
|
||||
|
||||
// Check permissions against existing ancestor childPath1
|
||||
fs.mkdirs(childPath3);
|
||||
ContractTestUtils.assertIsDirectory(fs, childPath3);
|
||||
} finally {
|
||||
allowRecursiveDelete(fs, testPath.toString());
|
||||
fs.delete(testPath, true);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Negative test for mkdirs access check
|
||||
* @throws Throwable
|
||||
|
|
|
@ -108,17 +108,13 @@ public class TestAzureFileSystemInstrumentation {
|
|||
|
||||
// Create a directory
|
||||
assertTrue(fs.mkdirs(new Path("a")));
|
||||
// At the time of writing, it takes 1 request to create the actual directory,
|
||||
// plus 2 requests per level to check that there's no blob with that name and
|
||||
// 1 request per level above to create it if it doesn't exist.
|
||||
// So for the path above (/user/<name>/a), it takes 2 requests each to check
|
||||
// there's no blob called /user, no blob called /user/<name> and no blob
|
||||
// called /user/<name>/a, and then 3 request for the creation of the three
|
||||
// levels, and then 2 requests for checking/stamping the version of AS,
|
||||
// totaling 11.
|
||||
// Also, there's the initial 1 request for container check so total is 12.
|
||||
// The getAncestor call at the very beginning adds another 4 calls, totalling 16.
|
||||
base = assertWebResponsesInRange(base, 1, 16);
|
||||
// At the time of writing
|
||||
// getAncestor uses 2 calls for each folder level /user/<name>/a
|
||||
// plus 1 call made by checkContainer
|
||||
// mkdir checks the hierarchy with 2 calls per level
|
||||
// mkdirs calls storeEmptyDir to create the empty folder, which makes 5 calls
|
||||
// For a total of 7 + 6 + 5 = 18 web responses
|
||||
base = assertWebResponsesInRange(base, 1, 18);
|
||||
assertEquals(1,
|
||||
AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_DIRECTORIES_CREATED));
|
||||
|
||||
|
|
Loading…
Reference in New Issue