HDFS-13009. Allow creation of encryption zone if directory is not empty

This commit is contained in:
Zehao Chen 2020-09-22 10:14:34 -05:00
parent c8c1cc43d3
commit 38ee793319
2 changed files with 58 additions and 28 deletions

View File

@ -550,10 +550,6 @@ public class EncryptionZoneManager {
"Directory " + srcIIP.getPath() + " is already an encryption zone.");
}
if (dir.isNonEmptyDirectory(srcIIP)) {
throw new IOException(
"Attempt to create an encryption zone for a non-empty directory.");
}
final HdfsProtos.ZoneEncryptionInfoProto proto =
PBHelperClient.convert(suite, version, keyName);
final XAttr ezXAttr = XAttrHelper

View File

@ -462,34 +462,26 @@ public class TestEncryptionZones {
assertExceptionContains("is already an encryption zone", e);
}
/* create EZ on parent of an EZ should fail */
try {
dfsAdmin.createEncryptionZone(zoneParent, TEST_KEY, NO_TRASH);
fail("EZ over an EZ");
} catch (IOException e) {
assertExceptionContains("encryption zone for a non-empty directory", e);
}
/* create EZ on parent of an EZ should not fail */
dfsAdmin.createEncryptionZone(zoneParent, TEST_KEY, NO_TRASH);
assertNumZones(++numZones);
/* create EZ on a folder with a folder fails */
final Path notEmpty = new Path("/notEmpty");
final Path notEmptyChild = new Path(notEmpty, "child");
/* create EZ on a folder with a folder succeeds */
Path notEmpty = new Path("/notEmpty");
Path notEmptyChild = new Path(notEmpty, "child");
fsWrapper.mkdir(notEmptyChild, FsPermission.getDirDefault(), true);
try {
dfsAdmin.createEncryptionZone(notEmpty, TEST_KEY, NO_TRASH);
fail("Created EZ on an non-empty directory with folder");
} catch (IOException e) {
assertExceptionContains("create an encryption zone", e);
}
fsWrapper.delete(notEmptyChild, false);
dfsAdmin.createEncryptionZone(notEmpty, TEST_KEY, NO_TRASH);
fsWrapper.delete(notEmpty, true);
notEmpty = new Path("/notEmpty");
notEmptyChild = new Path(notEmpty, "child");
/* create EZ on a folder with a file fails */
fsWrapper.createFile(notEmptyChild);
try {
dfsAdmin.createEncryptionZone(notEmpty, TEST_KEY, NO_TRASH);
fail("Created EZ on an non-empty directory with file");
} catch (IOException e) {
assertExceptionContains("create an encryption zone", e);
}
dfsAdmin.createEncryptionZone(notEmpty, TEST_KEY, NO_TRASH);
fsWrapper.delete(notEmpty, true);
fsWrapper.createFile(notEmptyChild);
/* Test failure of create EZ on a file. */
try {
@ -524,7 +516,7 @@ public class TestEncryptionZones {
assertExceptionContains("Must specify a key name when creating", e);
}
assertNumZones(1);
assertNumZones(2);
/* Test success of creating an EZ when they key exists. */
DFSTestUtil.createKey(myKeyName, cluster, conf);
@ -2410,4 +2402,46 @@ public class TestEncryptionZones {
"not found.", ioe);
}
}
@Test
public void testCreationEZOnNonEmptyDirectory() throws Exception {
final Path zonePath = new Path("/zone");
fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false);
Path p1 = new Path(zonePath, "p1");
// Create a file in non-encrypted area.
String unEncryptedBytes = "hello world";
OutputStream unEncryptedOs = fs.create(p1, false);
unEncryptedOs.write(unEncryptedBytes.getBytes());
unEncryptedOs.close();
// Create an Encryption Zone.
dfsAdmin.createEncryptionZone(zonePath, TEST_KEY, NO_TRASH);
// Create a file in an encrpyted area.
Path p2 = new Path(zonePath, "p2");
OutputStream encryptedOs = fs.create(p2, false);
String encryptedBytes = "foo bar";
encryptedOs.write(encryptedBytes.getBytes());
encryptedOs.close();
InputStream unEncryptedStream = fs.open(p1);
verifyStreamsSame(unEncryptedBytes, unEncryptedStream);
// Raw bytes should be the same as original contents.
InputStream unEncryptedReservedStream = fs.open(
new Path("/.reserved/raw" + p1.toString()));
verifyStreamsSame(unEncryptedBytes, unEncryptedReservedStream);
InputStream encryptedStream = fs.open(p2);
verifyStreamsSame(encryptedBytes, encryptedStream);
InputStream encryptedReservedStream = fs.open(
new Path("/.reserved/raw" + p2.toString()));
InputStream cryptoStream =
fs.open(p2).getWrappedStream();
Assert.assertTrue("cryptoStream should be an instance of "
+ "CryptoInputStream", (cryptoStream instanceof CryptoInputStream));
InputStream encryptedCryptoStream =
((CryptoInputStream)cryptoStream).getWrappedStream();
// Raw bytes shouldn't be same as original data.
verifyRaw(encryptedBytes, encryptedCryptoStream, encryptedReservedStream);
}
}