diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionFaultInjector.java
index 27d8f501d1a..104d8c3dd37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionFaultInjector.java
@@ -34,6 +34,12 @@ public static EncryptionFaultInjector getInstance() {
return instance;
}
+ @VisibleForTesting
+ public void startFileNoKey() throws IOException {}
+
+ @VisibleForTesting
+ public void startFileBeforeGenerateKey() throws IOException {}
+
@VisibleForTesting
public void startFileAfterGenerateKey() throws IOException {}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
index ad5869b1526..0f6d4a6772f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
@@ -260,12 +260,14 @@ EncryptionZone getEZINodeForPath(INodesInPath iip) {
*
* @param srcIIP source IIP
* @param dstIIP destination IIP
- * @param src source path, used for debugging
* @throws IOException if the src cannot be renamed to the dst
*/
- void checkMoveValidity(INodesInPath srcIIP, INodesInPath dstIIP, String src)
+ void checkMoveValidity(INodesInPath srcIIP, INodesInPath dstIIP)
throws IOException {
assert dir.hasReadLock();
+ if (!hasCreatedEncryptionZone()) {
+ return;
+ }
final EncryptionZoneInt srcParentEZI =
getParentEncryptionZoneForPath(srcIIP);
final EncryptionZoneInt dstParentEZI =
@@ -274,17 +276,17 @@ void checkMoveValidity(INodesInPath srcIIP, INodesInPath dstIIP, String src)
final boolean dstInEZ = (dstParentEZI != null);
if (srcInEZ && !dstInEZ) {
throw new IOException(
- src + " can't be moved from an encryption zone.");
+ srcIIP.getPath() + " can't be moved from an encryption zone.");
} else if (dstInEZ && !srcInEZ) {
throw new IOException(
- src + " can't be moved into an encryption zone.");
+ srcIIP.getPath() + " can't be moved into an encryption zone.");
}
if (srcInEZ) {
if (srcParentEZI != dstParentEZI) {
final String srcEZPath = getFullPathName(srcParentEZI);
final String dstEZPath = getFullPathName(dstParentEZI);
- final StringBuilder sb = new StringBuilder(src);
+ final StringBuilder sb = new StringBuilder(srcIIP.getPath());
sb.append(" can't be moved from encryption zone ");
sb.append(srcEZPath);
sb.append(" to encryption zone ");
@@ -300,15 +302,14 @@ void checkMoveValidity(INodesInPath srcIIP, INodesInPath dstIIP, String src)
*
* Called while holding the FSDirectory lock.
*/
- XAttr createEncryptionZone(String src, CipherSuite suite,
+ XAttr createEncryptionZone(INodesInPath srcIIP, CipherSuite suite,
CryptoProtocolVersion version, String keyName)
throws IOException {
assert dir.hasWriteLock();
// Check if src is a valid path for new EZ creation
- final INodesInPath srcIIP = dir.getINodesInPath4Write(src, false);
- if (srcIIP == null || srcIIP.getLastINode() == null) {
- throw new FileNotFoundException("cannot find " + src);
+ if (srcIIP.getLastINode() == null) {
+ throw new FileNotFoundException("cannot find " + srcIIP.getPath());
}
if (dir.isNonEmptyDirectory(srcIIP)) {
throw new IOException(
@@ -322,8 +323,8 @@ XAttr createEncryptionZone(String src, CipherSuite suite,
if (hasCreatedEncryptionZone() && encryptionZones.
get(srcINode.getId()) != null) {
- throw new IOException("Directory " + src + " is already an encryption " +
- "zone.");
+ throw new IOException(
+ "Directory " + srcIIP.getPath() + " is already an encryption zone.");
}
final HdfsProtos.ZoneEncryptionInfoProto proto =
@@ -335,7 +336,7 @@ XAttr createEncryptionZone(String src, CipherSuite suite,
xattrs.add(ezXAttr);
// updating the xattr will call addEncryptionZone,
// done this way to handle edit log loading
- FSDirXAttrOp.unprotectedSetXAttrs(dir, src, xattrs,
+ FSDirXAttrOp.unprotectedSetXAttrs(dir, srcIIP, xattrs,
EnumSet.of(XAttrSetFlag.CREATE));
return ezXAttr;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
index 5457f0810db..d7a36112781 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
@@ -72,8 +72,11 @@ private FSDirEncryptionZoneOp() {}
* @return New EDEK, or null if ezKeyName is null
* @throws IOException
*/
- static EncryptedKeyVersion generateEncryptedDataEncryptionKey(
+ private static EncryptedKeyVersion generateEncryptedDataEncryptionKey(
final FSDirectory fsd, final String ezKeyName) throws IOException {
+ // must not be holding lock during this operation
+ assert !fsd.getFSNamesystem().hasReadLock();
+ assert !fsd.getFSNamesystem().hasWriteLock();
if (ezKeyName == null) {
return null;
}
@@ -147,23 +150,21 @@ static HdfsFileStatus createEncryptionZone(final FSDirectory fsd,
final String keyName, final boolean logRetryCache) throws IOException {
final CipherSuite suite = CipherSuite.convert(cipher);
List xAttrs = Lists.newArrayListWithCapacity(1);
- final String src;
// For now this is hard coded, as we only support one method.
final CryptoProtocolVersion version =
CryptoProtocolVersion.ENCRYPTION_ZONES;
+ final INodesInPath iip;
fsd.writeLock();
try {
- final INodesInPath iip = fsd.resolvePath(pc, srcArg);
- src = iip.getPath();
- final XAttr ezXAttr = fsd.ezManager.createEncryptionZone(src, suite,
+ iip = fsd.resolvePathForWrite(pc, srcArg);
+ final XAttr ezXAttr = fsd.ezManager.createEncryptionZone(iip, suite,
version, keyName);
xAttrs.add(ezXAttr);
} finally {
fsd.writeUnlock();
}
- fsd.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
- final INodesInPath iip = fsd.getINodesInPath4Write(src, false);
+ fsd.getEditLog().logSetXAttrs(iip.getPath(), xAttrs, logRetryCache);
return fsd.getAuditFileInfo(iip);
}
@@ -223,8 +224,9 @@ static BatchedListEntries listEncryptionZones(
* @param info file encryption information
* @throws IOException
*/
- static void setFileEncryptionInfo(final FSDirectory fsd, final String src,
- final FileEncryptionInfo info) throws IOException {
+ static void setFileEncryptionInfo(final FSDirectory fsd,
+ final INodesInPath iip, final FileEncryptionInfo info)
+ throws IOException {
// Make the PB for the xattr
final HdfsProtos.PerFileEncryptionInfoProto proto =
PBHelperClient.convertPerFileEncInfo(info);
@@ -235,7 +237,7 @@ static void setFileEncryptionInfo(final FSDirectory fsd, final String src,
xAttrs.add(fileEncryptionAttr);
fsd.writeLock();
try {
- FSDirXAttrOp.unprotectedSetXAttrs(fsd, src, xAttrs,
+ FSDirXAttrOp.unprotectedSetXAttrs(fsd, iip, xAttrs,
EnumSet.of(XAttrSetFlag.CREATE));
} finally {
fsd.writeUnlock();
@@ -246,21 +248,18 @@ static void setFileEncryptionInfo(final FSDirectory fsd, final String src,
* This function combines the per-file encryption info (obtained
* from the inode's XAttrs), and the encryption info from its zone, and
* returns a consolidated FileEncryptionInfo instance. Null is returned
- * for non-encrypted files.
+ * for non-encrypted or raw files.
*
* @param fsd fsdirectory
- * @param inode inode of the file
- * @param snapshotId ID of the snapshot that
- * we want to get encryption info from
* @param iip inodes in the path containing the file, passed in to
- * avoid obtaining the list of inodes again; if iip is
- * null then the list of inodes will be obtained again
+ * avoid obtaining the list of inodes again
* @return consolidated file encryption info; null for non-encrypted files
*/
static FileEncryptionInfo getFileEncryptionInfo(final FSDirectory fsd,
- final INode inode, final int snapshotId, final INodesInPath iip)
- throws IOException {
- if (!inode.isFile() || !fsd.ezManager.hasCreatedEncryptionZone()) {
+ final INodesInPath iip) throws IOException {
+ if (iip.isRaw() ||
+ !fsd.ezManager.hasCreatedEncryptionZone() ||
+ !iip.getLastINode().isFile()) {
return null;
}
fsd.readLock();
@@ -280,8 +279,8 @@ static FileEncryptionInfo getFileEncryptionInfo(final FSDirectory fsd,
final CryptoProtocolVersion version = encryptionZone.getVersion();
final CipherSuite suite = encryptionZone.getSuite();
final String keyName = encryptionZone.getKeyName();
- XAttr fileXAttr = FSDirXAttrOp.unprotectedGetXAttrByPrefixedName(inode,
- snapshotId, CRYPTO_XATTR_FILE_ENCRYPTION_INFO);
+ XAttr fileXAttr = FSDirXAttrOp.unprotectedGetXAttrByPrefixedName(
+ iip, CRYPTO_XATTR_FILE_ENCRYPTION_INFO);
if (fileXAttr == null) {
NameNode.LOG.warn("Could not find encryption XAttr for file " +
@@ -295,15 +294,53 @@ static FileEncryptionInfo getFileEncryptionInfo(final FSDirectory fsd,
return PBHelperClient.convert(fileProto, suite, version, keyName);
} catch (InvalidProtocolBufferException e) {
throw new IOException("Could not parse file encryption info for " +
- "inode " + inode, e);
+ "inode " + iip.getPath(), e);
}
} finally {
fsd.readUnlock();
}
}
+ /**
+ * If the file and encryption key are valid, return the encryption info,
+ * else throw a retry exception. The startFile method generates the EDEK
+ * outside of the lock so the zone must be reverified.
+ *
+ * @param dir fsdirectory
+ * @param iip inodes in the file path
+ * @param ezInfo the encryption key
+ * @return FileEncryptionInfo for the file
+ * @throws RetryStartFileException if key is inconsistent with current zone
+ */
+ static FileEncryptionInfo getFileEncryptionInfo(FSDirectory dir,
+ INodesInPath iip, EncryptionKeyInfo ezInfo)
+ throws RetryStartFileException {
+ FileEncryptionInfo feInfo = null;
+ final EncryptionZone zone = getEZForPath(dir, iip);
+ if (zone != null) {
+ // The path is now within an EZ, but we're missing encryption parameters
+ if (ezInfo == null) {
+ throw new RetryStartFileException();
+ }
+ // Path is within an EZ and we have provided encryption parameters.
+ // Make sure that the generated EDEK matches the settings of the EZ.
+ final String ezKeyName = zone.getKeyName();
+ if (!ezKeyName.equals(ezInfo.edek.getEncryptionKeyName())) {
+ throw new RetryStartFileException();
+ }
+ feInfo = new FileEncryptionInfo(ezInfo.suite, ezInfo.protocolVersion,
+ ezInfo.edek.getEncryptedKeyVersion().getMaterial(),
+ ezInfo.edek.getEncryptedKeyIv(),
+ ezKeyName, ezInfo.edek.getEncryptionKeyVersionName());
+ }
+ return feInfo;
+ }
+
static boolean isInAnEZ(final FSDirectory fsd, final INodesInPath iip)
throws UnresolvedLinkException, SnapshotAccessControlException {
+ if (!fsd.ezManager.hasCreatedEncryptionZone()) {
+ return false;
+ }
fsd.readLock();
try {
return fsd.ezManager.isInAnEZ(iip);
@@ -399,4 +436,67 @@ public void run() {
}
}
}
+
+ /**
+ * If the file is in an encryption zone, we optimistically create an
+ * EDEK for the file by calling out to the configured KeyProvider.
+ * Since this typically involves doing an RPC, the fsn lock is yielded.
+ *
+ * Since the path can flip-flop between being in an encryption zone and not
+ * in the meantime, the call MUST re-resolve the IIP and re-check
+ * preconditions if this method does not return null;
+ *
+ * @param fsn the namesystem.
+ * @param iip the inodes for the path
+ * @param supportedVersions client's supported versions
+ * @return EncryptionKeyInfo if the path is in an EZ, else null
+ */
+ static EncryptionKeyInfo getEncryptionKeyInfo(FSNamesystem fsn,
+ INodesInPath iip, CryptoProtocolVersion[] supportedVersions)
+ throws IOException {
+ FSDirectory fsd = fsn.getFSDirectory();
+ // Nothing to do if the path is not within an EZ
+ final EncryptionZone zone = getEZForPath(fsd, iip);
+ if (zone == null) {
+ EncryptionFaultInjector.getInstance().startFileNoKey();
+ return null;
+ }
+ CryptoProtocolVersion protocolVersion = fsn.chooseProtocolVersion(
+ zone, supportedVersions);
+ CipherSuite suite = zone.getSuite();
+ String ezKeyName = zone.getKeyName();
+
+ Preconditions.checkNotNull(protocolVersion);
+ Preconditions.checkNotNull(suite);
+ Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
+ "Chose an UNKNOWN CipherSuite!");
+ Preconditions.checkNotNull(ezKeyName);
+
+ // Generate EDEK while not holding the fsn lock.
+ fsn.writeUnlock();
+ try {
+ EncryptionFaultInjector.getInstance().startFileBeforeGenerateKey();
+ return new EncryptionKeyInfo(protocolVersion, suite, ezKeyName,
+ generateEncryptedDataEncryptionKey(fsd, ezKeyName));
+ } finally {
+ fsn.writeLock();
+ EncryptionFaultInjector.getInstance().startFileAfterGenerateKey();
+ }
+ }
+
+ static class EncryptionKeyInfo {
+ final CryptoProtocolVersion protocolVersion;
+ final CipherSuite suite;
+ final String ezKeyName;
+ final KeyProviderCryptoExtension.EncryptedKeyVersion edek;
+
+ EncryptionKeyInfo(
+ CryptoProtocolVersion protocolVersion, CipherSuite suite,
+ String ezKeyName, KeyProviderCryptoExtension.EncryptedKeyVersion edek) {
+ this.protocolVersion = protocolVersion;
+ this.suite = suite;
+ this.ezKeyName = ezKeyName;
+ this.edek = edek;
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
index 911b178ca5f..12d5cfead83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
@@ -190,7 +190,7 @@ static INodesInPath unprotectedRenameTo(FSDirectory fsd,
return null;
}
- fsd.ezManager.checkMoveValidity(srcIIP, dstIIP, src);
+ fsd.ezManager.checkMoveValidity(srcIIP, dstIIP);
// Ensure dst has quota to accommodate rename
verifyFsLimitsForRename(fsd, srcIIP, dstIIP);
verifyQuotaForRename(fsd, srcIIP, dstIIP);
@@ -382,7 +382,7 @@ static RenameResult unprotectedRenameTo(FSDirectory fsd,
}
BlockStoragePolicySuite bsps = fsd.getBlockStoragePolicySuite();
- fsd.ezManager.checkMoveValidity(srcIIP, dstIIP, src);
+ fsd.ezManager.checkMoveValidity(srcIIP, dstIIP);
final INode dstInode = dstIIP.getLastINode();
List snapshottableDirs = new ArrayList<>();
if (dstInode != null) { // Destination exists
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index 6c7a92fccc5..63301309f01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -161,7 +161,7 @@ static GetBlockLocationsResult getBlockLocations(
final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
if (fsd.isPermissionEnabled()) {
fsd.checkPathAccess(pc, iip, FsAction.READ);
- fsd.checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
+ fsd.checkUnreadableBySuperuser(pc, iip);
}
final long fileSize = iip.isSnapshot()
@@ -176,9 +176,8 @@ static GetBlockLocationsResult getBlockLocations(
isUc = false;
}
- final FileEncryptionInfo feInfo = iip.isRaw() ? null
- : FSDirEncryptionZoneOp.getFileEncryptionInfo(fsd, inode,
- iip.getPathSnapshotId(), iip);
+ final FileEncryptionInfo feInfo =
+ FSDirEncryptionZoneOp.getFileEncryptionInfo(fsd, iip);
final LocatedBlocks blocks = bm.createLocatedBlocks(
inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset,
@@ -411,22 +410,21 @@ private static HdfsFileStatus createFileStatus(
long size = 0; // length is zero for directories
short replication = 0;
long blocksize = 0;
- final boolean isEncrypted;
final INode node = iip.getLastINode();
final int snapshot = iip.getPathSnapshotId();
- final boolean isRawPath = iip.isRaw();
LocatedBlocks loc = null;
- final FileEncryptionInfo feInfo = isRawPath ? null : FSDirEncryptionZoneOp
- .getFileEncryptionInfo(fsd, node, snapshot, iip);
+ final boolean isEncrypted = FSDirEncryptionZoneOp.isInAnEZ(fsd, iip);
+ FileEncryptionInfo feInfo = null;
if (node.isFile()) {
final INodeFile fileNode = node.asFile();
size = fileNode.computeFileSize(snapshot);
replication = fileNode.getFileReplication(snapshot);
blocksize = fileNode.getPreferredBlockSize();
- isEncrypted = (feInfo != null)
- || (isRawPath && FSDirEncryptionZoneOp.isInAnEZ(fsd, iip));
+ if (isEncrypted) {
+ feInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo(fsd, iip);
+ }
if (needLocation) {
final boolean inSnapshot = snapshot != Snapshot.CURRENT_STATE_ID;
final boolean isUc = !inSnapshot && fileNode.isUnderConstruction();
@@ -439,8 +437,6 @@ private static HdfsFileStatus createFileStatus(
loc = new LocatedBlocks();
}
}
- } else {
- isEncrypted = FSDirEncryptionZoneOp.isInAnEZ(fsd, iip);
}
int childrenNum = node.isDirectory() ?
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 83ac64129b3..f69cfdb2064 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -19,14 +19,10 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.crypto.CipherSuite;
-import org.apache.hadoop.crypto.CryptoProtocolVersion;
-import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileEncryptionInfo;
-import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsAction;
@@ -37,7 +33,6 @@
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSLimitException;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -289,6 +284,37 @@ static Node getClientNode(BlockManager bm, String clientMachine) {
return clientNode;
}
+ static INodesInPath resolvePathForStartFile(FSDirectory dir,
+ FSPermissionChecker pc, String src, EnumSet flag,
+ boolean createParent) throws IOException {
+ INodesInPath iip = dir.resolvePathForWrite(pc, src);
+ if (dir.isPermissionEnabled()) {
+ dir.checkAncestorAccess(pc, iip, FsAction.WRITE);
+ }
+ INode inode = iip.getLastINode();
+ if (inode != null) {
+ // Verify that the destination does not exist as a directory already.
+ if (inode.isDirectory()) {
+ throw new FileAlreadyExistsException(iip.getPath() +
+ " already exists as a directory");
+ }
+ // Verifies it's indeed a file and perms allow overwrite
+ INodeFile.valueOf(inode, src);
+ if (dir.isPermissionEnabled() && flag.contains(CreateFlag.OVERWRITE)) {
+ dir.checkPathAccess(pc, iip, FsAction.WRITE);
+ }
+ } else {
+ if (!createParent) {
+ dir.verifyParentDir(iip, src);
+ }
+ if (!flag.contains(CreateFlag.CREATE)) {
+ throw new FileNotFoundException("Can't overwrite non-existent " + src);
+ }
+ }
+ return iip;
+ }
+
+
/**
* Create a new file or overwrite an existing file
*
@@ -299,88 +325,22 @@ static Node getClientNode(BlockManager bm, String clientMachine) {
* {@link ClientProtocol#create}
*/
static HdfsFileStatus startFile(
- FSNamesystem fsn, FSPermissionChecker pc, String src,
+ FSNamesystem fsn, INodesInPath iip,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet flag, boolean createParent,
short replication, long blockSize,
- EncryptionKeyInfo ezInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
+ FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
boolean logRetryEntry)
throws IOException {
assert fsn.hasWriteLock();
- boolean create = flag.contains(CreateFlag.CREATE);
boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
- CipherSuite suite = null;
- CryptoProtocolVersion version = null;
- KeyProviderCryptoExtension.EncryptedKeyVersion edek = null;
-
- if (ezInfo != null) {
- edek = ezInfo.edek;
- suite = ezInfo.suite;
- version = ezInfo.protocolVersion;
- }
-
+ final String src = iip.getPath();
FSDirectory fsd = fsn.getFSDirectory();
- INodesInPath iip = fsd.resolvePathForWrite(pc, src);
- src = iip.getPath();
- // Verify that the destination does not exist as a directory already.
- final INode inode = iip.getLastINode();
- if (inode != null && inode.isDirectory()) {
- throw new FileAlreadyExistsException(src +
- " already exists as a directory");
- }
-
- if (FSDirectory.isExactReservedName(src) || (FSDirectory.isReservedName(src)
- && !FSDirectory.isReservedRawName(src)
- && !FSDirectory.isReservedInodesName(src)) ) {
- throw new InvalidPathException(src);
- }
-
- final INodeFile myFile = INodeFile.valueOf(inode, src, true);
- if (fsd.isPermissionEnabled()) {
- if (overwrite && myFile != null) {
- fsd.checkPathAccess(pc, iip, FsAction.WRITE);
- }
- /*
- * To overwrite existing file, need to check 'w' permission
- * of parent (equals to ancestor in this case)
- */
- fsd.checkAncestorAccess(pc, iip, FsAction.WRITE);
- }
-
- if (!createParent) {
- fsd.verifyParentDir(iip, src);
- }
-
- if (myFile == null && !create) {
- throw new FileNotFoundException("Can't overwrite non-existent " +
- src + " for client " + clientMachine);
- }
-
- FileEncryptionInfo feInfo = null;
-
- final EncryptionZone zone = FSDirEncryptionZoneOp.getEZForPath(fsd, iip);
- if (zone != null) {
- // The path is now within an EZ, but we're missing encryption parameters
- if (suite == null || edek == null) {
- throw new RetryStartFileException();
- }
- // Path is within an EZ and we have provided encryption parameters.
- // Make sure that the generated EDEK matches the settings of the EZ.
- final String ezKeyName = zone.getKeyName();
- if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
- throw new RetryStartFileException();
- }
- feInfo = new FileEncryptionInfo(suite, version,
- edek.getEncryptedKeyVersion().getMaterial(),
- edek.getEncryptedKeyIv(),
- ezKeyName, edek.getEncryptionKeyVersionName());
- }
-
- if (myFile != null) {
+ if (iip.getLastINode() != null) {
if (overwrite) {
List toRemoveINodes = new ChunkedArrayList<>();
List toRemoveUCFiles = new ChunkedArrayList<>();
@@ -415,11 +375,9 @@ static HdfsFileStatus startFile(
newNode.getFileUnderConstructionFeature().getClientName(),
newNode.getId());
if (feInfo != null) {
- FSDirEncryptionZoneOp.setFileEncryptionInfo(fsd, src, feInfo);
- newNode = fsd.getInode(newNode.getId()).asFile();
+ FSDirEncryptionZoneOp.setFileEncryptionInfo(fsd, iip, feInfo);
}
- setNewINodeStoragePolicy(fsd.getBlockManager(), newNode, iip,
- isLazyPersist);
+ setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist);
fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
@@ -428,30 +386,6 @@ static HdfsFileStatus startFile(
return FSDirStatAndListingOp.getFileInfo(fsd, iip);
}
- static EncryptionKeyInfo getEncryptionKeyInfo(FSNamesystem fsn,
- FSPermissionChecker pc, String src,
- CryptoProtocolVersion[] supportedVersions)
- throws IOException {
- FSDirectory fsd = fsn.getFSDirectory();
- INodesInPath iip = fsd.resolvePathForWrite(pc, src);
- // Nothing to do if the path is not within an EZ
- final EncryptionZone zone = FSDirEncryptionZoneOp.getEZForPath(fsd, iip);
- if (zone == null) {
- return null;
- }
- CryptoProtocolVersion protocolVersion = fsn.chooseProtocolVersion(
- zone, supportedVersions);
- CipherSuite suite = zone.getSuite();
- String ezKeyName = zone.getKeyName();
-
- Preconditions.checkNotNull(protocolVersion);
- Preconditions.checkNotNull(suite);
- Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
- "Chose an UNKNOWN CipherSuite!");
- Preconditions.checkNotNull(ezKeyName);
- return new EncryptionKeyInfo(protocolVersion, suite, ezKeyName);
- }
-
static INodeFile addFileForEditLog(
FSDirectory fsd, long id, INodesInPath existing, byte[] localName,
PermissionStatus permissions, List aclEntries,
@@ -790,10 +724,9 @@ private static void logAllocatedBlock(String src, BlockInfo b) {
NameNode.stateChangeLog.info(sb.toString());
}
- private static void setNewINodeStoragePolicy(BlockManager bm, INodeFile
- inode, INodesInPath iip, boolean isLazyPersist)
- throws IOException {
-
+ private static void setNewINodeStoragePolicy(BlockManager bm,
+ INodesInPath iip, boolean isLazyPersist) throws IOException {
+ INodeFile inode = iip.getLastINode().asFile();
if (isLazyPersist) {
BlockStoragePolicy lpPolicy =
bm.getStoragePolicy("LAZY_PERSIST");
@@ -847,19 +780,4 @@ static class ValidateAddBlockResult {
this.clientMachine = clientMachine;
}
}
-
- static class EncryptionKeyInfo {
- final CryptoProtocolVersion protocolVersion;
- final CipherSuite suite;
- final String ezKeyName;
- KeyProviderCryptoExtension.EncryptedKeyVersion edek;
-
- EncryptionKeyInfo(
- CryptoProtocolVersion protocolVersion, CipherSuite suite,
- String ezKeyName) {
- this.protocolVersion = protocolVersion;
- this.suite = suite;
- this.ezKeyName = ezKeyName;
- }
- }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
index 08016c3bf47..6badf2472fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
@@ -75,7 +75,7 @@ static HdfsFileStatus setXAttr(
iip = fsd.resolvePathForWrite(pc, src);
src = iip.getPath();
checkXAttrChangeAccess(fsd, iip, xAttr, pc);
- unprotectedSetXAttrs(fsd, src, xAttrs, flag);
+ unprotectedSetXAttrs(fsd, iip, xAttrs, flag);
} finally {
fsd.writeUnlock();
}
@@ -253,14 +253,11 @@ static List filterINodeXAttrs(
}
static INode unprotectedSetXAttrs(
- FSDirectory fsd, final String src, final List xAttrs,
+ FSDirectory fsd, final INodesInPath iip, final List xAttrs,
final EnumSet flag)
throws IOException {
assert fsd.hasWriteLock();
- INodesInPath iip = fsd.getINodesInPath4Write(FSDirectory.normalizePath(src),
- true);
INode inode = FSDirectory.resolveLastINode(iip);
- int snapshotId = iip.getLatestSnapshotId();
List existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
List newXAttrs = setINodeXAttrs(fsd, existingXAttrs, xAttrs, flag);
final boolean isFile = inode.isFile();
@@ -287,7 +284,7 @@ static INode unprotectedSetXAttrs(
}
}
- XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId);
+ XAttrStorage.updateINodeXAttrs(inode, newXAttrs, iip.getLatestSnapshotId());
return inode;
}
@@ -361,22 +358,20 @@ static List setINodeXAttrs(
return xAttrs;
}
- static XAttr getXAttrByPrefixedName(FSDirectory fsd, INode inode,
- int snapshotId, String prefixedName) throws IOException {
+ static XAttr getXAttrByPrefixedName(FSDirectory fsd, INodesInPath iip,
+ String prefixedName) throws IOException {
fsd.readLock();
try {
- return XAttrStorage.readINodeXAttrByPrefixedName(inode, snapshotId,
- prefixedName);
+ return XAttrStorage.readINodeXAttrByPrefixedName(iip, prefixedName);
} finally {
fsd.readUnlock();
}
}
static XAttr unprotectedGetXAttrByPrefixedName(
- INode inode, int snapshotId, String prefixedName)
+ INodesInPath iip, String prefixedName)
throws IOException {
- return XAttrStorage.readINodeXAttrByPrefixedName(inode, snapshotId,
- prefixedName);
+ return XAttrStorage.readINodeXAttrByPrefixedName(iip, prefixedName);
}
private static void checkXAttrChangeAccess(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 12013bffebf..824da0a9e7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -1660,11 +1660,10 @@ void checkPermission(FSPermissionChecker pc, INodesInPath iip,
}
}
- void checkUnreadableBySuperuser(
- FSPermissionChecker pc, INode inode, int snapshotId)
+ void checkUnreadableBySuperuser(FSPermissionChecker pc, INodesInPath iip)
throws IOException {
if (pc.isSuperUser()) {
- if (FSDirXAttrOp.getXAttrByPrefixedName(this, inode, snapshotId,
+ if (FSDirXAttrOp.getXAttrByPrefixedName(this, iip,
SECURITY_XATTR_UNREADABLE_BY_SUPERUSER) != null) {
throw new AccessControlException(
"Access is denied for " + pc.getUser() + " since the superuser "
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 84c29bfbb39..21c94f8565f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -895,7 +895,8 @@ fsDir, renameReservedPathsOnUpgrade(deleteOp.path, logVersion),
}
case OP_SET_XATTR: {
SetXAttrOp setXAttrOp = (SetXAttrOp) op;
- FSDirXAttrOp.unprotectedSetXAttrs(fsDir, setXAttrOp.src,
+ INodesInPath iip = fsDir.getINodesInPath4Write(setXAttrOp.src);
+ FSDirXAttrOp.unprotectedSetXAttrs(fsDir, iip,
setXAttrOp.xAttrs,
EnumSet.of(XAttrSetFlag.CREATE,
XAttrSetFlag.REPLACE));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index a0340f4ff0c..4f2548a25bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -150,6 +150,7 @@
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
@@ -220,6 +221,7 @@
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.namenode.FSDirEncryptionZoneOp.EncryptionKeyInfo;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
@@ -2129,7 +2131,7 @@ HdfsFileStatus startFile(String src, PermissionStatus permissions,
return status;
}
- private HdfsFileStatus startFileInt(final String src,
+ private HdfsFileStatus startFileInt(String src,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet flag, boolean createParent, short replication,
long blockSize, CryptoProtocolVersion[] supportedVersions,
@@ -2148,7 +2150,11 @@ private HdfsFileStatus startFileInt(final String src,
.append(Arrays.toString(supportedVersions));
NameNode.stateChangeLog.debug(builder.toString());
}
- if (!DFSUtil.isValidName(src)) {
+ if (!DFSUtil.isValidName(src) ||
+ FSDirectory.isExactReservedName(src) ||
+ (FSDirectory.isReservedName(src)
+ && !FSDirectory.isReservedRawName(src)
+ && !FSDirectory.isReservedInodesName(src))) {
throw new InvalidPathException(src);
}
blockManager.verifyReplication(src, replication, clientMachine);
@@ -2159,69 +2165,60 @@ private HdfsFileStatus startFileInt(final String src,
}
FSPermissionChecker pc = getPermissionChecker();
-
- /**
- * If the file is in an encryption zone, we optimistically create an
- * EDEK for the file by calling out to the configured KeyProvider.
- * Since this typically involves doing an RPC, we take the readLock
- * initially, then drop it to do the RPC.
- *
- * Since the path can flip-flop between being in an encryption zone and not
- * in the meantime, we need to recheck the preconditions when we retake the
- * lock to do the create. If the preconditions are not met, we throw a
- * special RetryStartFileException to ask the DFSClient to try the create
- * again later.
- */
- FSDirWriteFileOp.EncryptionKeyInfo ezInfo = null;
-
- if (provider != null) {
- readLock();
- try {
- checkOperation(OperationCategory.READ);
- ezInfo = FSDirWriteFileOp
- .getEncryptionKeyInfo(this, pc, src, supportedVersions);
- } finally {
- readUnlock();
- }
-
- // Generate EDEK if necessary while not holding the lock
- if (ezInfo != null) {
- ezInfo.edek = FSDirEncryptionZoneOp
- .generateEncryptedDataEncryptionKey(dir, ezInfo.ezKeyName);
- }
- EncryptionFaultInjector.getInstance().startFileAfterGenerateKey();
- }
-
- boolean skipSync = false;
+ INodesInPath iip = null;
+ boolean skipSync = true; // until we do something that might create edits
HdfsFileStatus stat = null;
+ BlocksMapUpdateInfo toRemoveBlocks = null;
- // Proceed with the create, using the computed cipher suite and
- // generated EDEK
- BlocksMapUpdateInfo toRemoveBlocks = new BlocksMapUpdateInfo();
+ checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot create file" + src);
+
+ iip = FSDirWriteFileOp.resolvePathForStartFile(
+ dir, pc, src, flag, createParent);
+
+ FileEncryptionInfo feInfo = null;
+ if (provider != null) {
+ EncryptionKeyInfo ezInfo = FSDirEncryptionZoneOp.getEncryptionKeyInfo(
+ this, iip, supportedVersions);
+ // if the path has an encryption zone, the lock was released while
+ // generating the EDEK. re-resolve the path to ensure the namesystem
+ // and/or EZ has not mutated
+ if (ezInfo != null) {
+ checkOperation(OperationCategory.WRITE);
+ iip = FSDirWriteFileOp.resolvePathForStartFile(
+ dir, pc, iip.getPath(), flag, createParent);
+ feInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo(
+ dir, iip, ezInfo);
+ }
+ }
+
+ skipSync = false; // following might generate edits
+ toRemoveBlocks = new BlocksMapUpdateInfo();
dir.writeLock();
try {
- stat = FSDirWriteFileOp.startFile(this, pc, src, permissions, holder,
- clientMachine, flag, createParent,
- replication, blockSize, ezInfo,
- toRemoveBlocks, logRetryCache);
+ stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
+ clientMachine, flag, createParent,
+ replication, blockSize, feInfo,
+ toRemoveBlocks, logRetryCache);
+ } catch (IOException e) {
+ skipSync = e instanceof StandbyException;
+ throw e;
} finally {
dir.writeUnlock();
}
- } catch (IOException e) {
- skipSync = e instanceof StandbyException;
- throw e;
} finally {
writeUnlock();
// There might be transactions logged while trying to recover the lease.
// They need to be sync'ed even when an exception was thrown.
if (!skipSync) {
getEditLog().logSync();
- removeBlocks(toRemoveBlocks);
- toRemoveBlocks.clear();
+ if (toRemoveBlocks != null) {
+ removeBlocks(toRemoveBlocks);
+ toRemoveBlocks.clear();
+ }
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java
index 65a4ada3a7f..8a91e2a723b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java
@@ -51,9 +51,10 @@ public static String getName(int n) {
* @param prefixedName xAttr name with prefix
* @return the xAttr
*/
- public static XAttr readINodeXAttrByPrefixedName(INode inode,
- int snapshotId, String prefixedName) {
- XAttrFeature f = inode.getXAttrFeature(snapshotId);
+ public static XAttr readINodeXAttrByPrefixedName(INodesInPath iip,
+ String prefixedName) {
+ XAttrFeature f =
+ iip.getLastINode().getXAttrFeature(iip.getPathSnapshotId());
return f == null ? null : f.getXAttr(prefixedName);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java
index 30ccf5e1879..f4dc2fa8f16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java
@@ -19,7 +19,6 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.io.RandomAccessFile;
@@ -1048,7 +1047,7 @@ private void dTIEM(Path prefix) throws Exception {
}
private class MyInjector extends EncryptionFaultInjector {
- int generateCount;
+ volatile int generateCount;
CountDownLatch ready;
CountDownLatch wait;
@@ -1058,13 +1057,27 @@ public MyInjector() {
}
@Override
- public void startFileAfterGenerateKey() throws IOException {
+ public void startFileNoKey() throws IOException {
+ generateCount = -1;
+ syncWithLatches();
+ }
+
+ @Override
+ public void startFileBeforeGenerateKey() throws IOException {
+ syncWithLatches();
+ }
+
+ private void syncWithLatches() throws IOException {
ready.countDown();
try {
wait.await();
} catch (InterruptedException e) {
throw new IOException(e);
}
+ }
+
+ @Override
+ public void startFileAfterGenerateKey() throws IOException {
generateCount++;
}
}
@@ -1100,10 +1113,14 @@ public Void call() throws Exception {
Future future =
executor.submit(new CreateFileTask(fsWrapper, file));
injector.ready.await();
- // Do the fault
- doFault();
- // Allow create to proceed
- injector.wait.countDown();
+ try {
+ // Do the fault
+ doFault();
+ // Allow create to proceed
+ } finally {
+ // Always decrement latch to avoid hanging the tests on failure.
+ injector.wait.countDown();
+ }
future.get();
// Cleanup and postconditions
doCleanup();
@@ -1126,20 +1143,21 @@ public void testStartFileRetry() throws Exception {
fsWrapper.mkdir(zone1, FsPermission.getDirDefault(), true);
ExecutorService executor = Executors.newSingleThreadExecutor();
- // Test when the parent directory becomes an EZ
+ // Test when the parent directory becomes an EZ. With no initial EZ,
+ // the fsn lock must not be yielded.
executor.submit(new InjectFaultTask() {
- @Override
- public void doFault() throws Exception {
- dfsAdmin.createEncryptionZone(zone1, TEST_KEY, NO_TRASH);
- }
@Override
public void doCleanup() throws Exception {
- assertEquals("Expected a startFile retry", 2, injector.generateCount);
+ assertEquals("Expected no startFile key generation",
+ -1, injector.generateCount);
fsWrapper.delete(file, false);
}
}).get();
- // Test when the parent directory unbecomes an EZ
+ // Test when the parent directory unbecomes an EZ. The generation of
+ // the EDEK will yield the lock, then re-resolve the path and use the
+ // previous EDEK.
+ dfsAdmin.createEncryptionZone(zone1, TEST_KEY, NO_TRASH);
executor.submit(new InjectFaultTask() {
@Override
public void doFault() throws Exception {
@@ -1152,7 +1170,9 @@ public void doCleanup() throws Exception {
}
}).get();
- // Test when the parent directory becomes a different EZ
+ // Test when the parent directory becomes a different EZ. The generation
+ // of the EDEK will yield the lock, re-resolve will detect the EZ has
+ // changed, and client will be asked to retry a 2nd time
fsWrapper.mkdir(zone1, FsPermission.getDirDefault(), true);
final String otherKey = "other_key";
DFSTestUtil.createKey(otherKey, cluster, conf);