From 0b909d028fd7279398808893c83ff6bad68f67b0 Mon Sep 17 00:00:00 2001
From: Haohui Mai
Date: Thu, 21 May 2015 08:05:10 -0700
Subject: [PATCH] HDFS-8421. Move startFile() and related functions into
 FSDirWriteFileOp. Contributed by Haohui Mai.

---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |   3 +
 .../server/namenode/FSDirWriteFileOp.java     | 324 +++++++++++++++++-
 .../hdfs/server/namenode/FSDirectory.java     |  91 -----
 .../hdfs/server/namenode/FSEditLogLoader.java |  15 +-
 .../hdfs/server/namenode/FSNamesystem.java    | 280 +++------
 5 files changed, 371 insertions(+), 342 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 414bba58720..9e5f51df5b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -240,6 +240,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-4383. Document the lease limits. (Arshad Mohammad via aajisaka)
 
+    HDFS-8421. Move startFile() and related functions into FSDirWriteFileOp.
+    (wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 1ff0899e5eb..307bd594226 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -18,11 +18,27 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -34,15 +50,22 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.util.ChunkedArrayList;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
+import static org.apache.hadoop.util.Time.now;
+
 class FSDirWriteFileOp {
   private FSDirWriteFileOp() {}
   static boolean unprotectedRemoveBlock(
@@ -277,6 +300,210 @@ class FSDirWriteFileOp {
     return clientNode;
   }
 
+  /**
+   * Create a new file or overwrite an existing file<br>
+   *
+   * Once the file is created, the client then allocates a new block with the
+   * next call using {@link ClientProtocol#addBlock}.
+   * <p>
+   * For description of parameters and exceptions thrown see
+   * {@link ClientProtocol#create}
+   */
+  static HdfsFileStatus startFile(
+      FSNamesystem fsn, FSPermissionChecker pc, String src,
+      PermissionStatus permissions, String holder, String clientMachine,
+      EnumSet<CreateFlag> flag, boolean createParent,
+      short replication, long blockSize,
+      EncryptionKeyInfo ezInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
+      boolean logRetryEntry)
+      throws IOException {
+    assert fsn.hasWriteLock();
+
+    boolean create = flag.contains(CreateFlag.CREATE);
+    boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+    boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
+
+    CipherSuite suite = null;
+    CryptoProtocolVersion version = null;
+    KeyProviderCryptoExtension.EncryptedKeyVersion edek = null;
+
+    if (ezInfo != null) {
+      edek = ezInfo.edek;
+      suite = ezInfo.suite;
+      version = ezInfo.protocolVersion;
+    }
+
+    boolean isRawPath = FSDirectory.isReservedRawName(src);
+    FSDirectory fsd = fsn.getFSDirectory();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    src = fsd.resolvePath(pc, src, pathComponents);
+    INodesInPath iip = fsd.getINodesInPath4Write(src);
+
+    // Verify that the destination does not exist as a directory already.
+    final INode inode = iip.getLastINode();
+    if (inode != null && inode.isDirectory()) {
+      throw new FileAlreadyExistsException(src +
+          " already exists as a directory");
+    }
+
+    final INodeFile myFile = INodeFile.valueOf(inode, src, true);
+    if (fsd.isPermissionEnabled()) {
+      if (overwrite && myFile != null) {
+        fsd.checkPathAccess(pc, iip, FsAction.WRITE);
+      }
+      /*
+       * To overwrite existing file, need to check 'w' permission
+       * of parent (equals to ancestor in this case)
+       */
+      fsd.checkAncestorAccess(pc, iip, FsAction.WRITE);
+    }
+
+    if (!createParent) {
+      fsd.verifyParentDir(iip, src);
+    }
+
+    if (myFile == null && !create) {
+      throw new FileNotFoundException("Can't overwrite non-existent " +
+          src + " for client " + clientMachine);
+    }
+
+    FileEncryptionInfo feInfo = null;
+
+    final EncryptionZone zone = fsd.getEZForPath(iip);
+    if (zone != null) {
+      // The path is now within an EZ, but we're missing encryption parameters
+      if (suite == null || edek == null) {
+        throw new RetryStartFileException();
+      }
+      // Path is within an EZ and we have provided encryption parameters.
+      // Make sure that the generated EDEK matches the settings of the EZ.
+      final String ezKeyName = zone.getKeyName();
+      if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
+        throw new RetryStartFileException();
+      }
+      feInfo = new FileEncryptionInfo(suite, version,
+          edek.getEncryptedKeyVersion().getMaterial(),
+          edek.getEncryptedKeyIv(),
+          ezKeyName, edek.getEncryptionKeyVersionName());
+    }
+
+    if (myFile != null) {
+      if (overwrite) {
+        List<INode> toRemoveINodes = new ChunkedArrayList<>();
+        List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
+        long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
+                                        toRemoveINodes, toRemoveUCFiles, now());
+        if (ret >= 0) {
+          iip = INodesInPath.replace(iip, iip.length() - 1, null);
+          FSDirDeleteOp.incrDeletedFileCount(ret);
+          fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
+        }
+      } else {
+        // If lease soft limit time is expired, recover the lease
+        fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
+                                 src, holder, clientMachine, false);
+        throw new FileAlreadyExistsException(src + " for client " +
+            clientMachine + " already exists");
+      }
+    }
+    fsn.checkFsObjectLimit();
+    INodeFile newNode = null;
+    Map.Entry<INodesInPath, String> parent = FSDirMkdirOp
+        .createAncestorDirectories(fsd, iip, permissions);
+    if (parent != null) {
+      iip = addFile(fsd, parent.getKey(), parent.getValue(), permissions,
+                    replication, blockSize, holder, clientMachine);
+      newNode = iip != null ? iip.getLastINode().asFile() : null;
+    }
+    if (newNode == null) {
+      throw new IOException("Unable to add " + src + " to namespace");
+    }
+    fsn.leaseManager.addLease(
+        newNode.getFileUnderConstructionFeature().getClientName(),
+        newNode.getId());
+    if (feInfo != null) {
+      fsd.setFileEncryptionInfo(src, feInfo);
+      newNode = fsd.getInode(newNode.getId()).asFile();
+    }
+    setNewINodeStoragePolicy(fsn.getBlockManager(), newNode, iip,
+                             isLazyPersist);
+    fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
+          src + " inode " + newNode.getId() + " " + holder);
+    }
+    return FSDirStatAndListingOp.getFileInfo(fsd, src, false, isRawPath, true);
+  }
+
+  static EncryptionKeyInfo getEncryptionKeyInfo(FSNamesystem fsn,
+      FSPermissionChecker pc, String src,
+      CryptoProtocolVersion[] supportedVersions)
+      throws IOException {
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    FSDirectory fsd = fsn.getFSDirectory();
+    src = fsd.resolvePath(pc, src, pathComponents);
+    INodesInPath iip = fsd.getINodesInPath4Write(src);
+    // Nothing to do if the path is not within an EZ
+    final EncryptionZone zone = fsd.getEZForPath(iip);
+    if (zone == null) {
+      return null;
+    }
+    CryptoProtocolVersion protocolVersion = fsn.chooseProtocolVersion(
+        zone, supportedVersions);
+    CipherSuite suite = zone.getSuite();
+    String ezKeyName = zone.getKeyName();
+
+    Preconditions.checkNotNull(protocolVersion);
+    Preconditions.checkNotNull(suite);
+    Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
+                                "Chose an UNKNOWN CipherSuite!");
+    Preconditions.checkNotNull(ezKeyName);
+    return new EncryptionKeyInfo(protocolVersion, suite, ezKeyName);
+  }
+
+  static INodeFile addFileForEditLog(
+      FSDirectory fsd, long id, INodesInPath existing, byte[] localName,
+      PermissionStatus permissions, List<AclEntry> aclEntries,
+      List<XAttr> xAttrs, short replication, long modificationTime, long atime,
+      long preferredBlockSize, boolean underConstruction, String clientName,
+      String clientMachine, byte storagePolicyId) {
+    final INodeFile newNode;
+    assert fsd.hasWriteLock();
+    if (underConstruction) {
+      newNode = newINodeFile(id, permissions, modificationTime,
+                             modificationTime, replication,
+                             preferredBlockSize,
+                             storagePolicyId);
+      newNode.toUnderConstruction(clientName, clientMachine);
+    } else {
+      newNode = newINodeFile(id, permissions, modificationTime,
+                             atime, replication,
+                             preferredBlockSize,
+                             storagePolicyId);
+    }
+
+    newNode.setLocalName(localName);
+    try {
+      INodesInPath iip = fsd.addINode(existing, newNode);
+      if (iip != null) {
+        if (aclEntries != null) {
+          AclStorage.updateINodeAcl(newNode, aclEntries, CURRENT_STATE_ID);
+        }
+        if (xAttrs != null) {
+          XAttrStorage.updateINodeXAttrs(newNode, xAttrs, CURRENT_STATE_ID);
+        }
+        return newNode;
+      }
+    } catch (IOException e) {
+      if(NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug(
+            "DIR* FSDirectory.unprotectedAddFile: exception when add " +
+                existing.getPath() + " to the file system", e);
+      }
+    }
+    return null;
+  }
+
   /**
    * Add a block to the file. Returns a reference to the added block.
    */
@@ -314,6 +541,41 @@ class FSDirWriteFileOp {
     }
   }
 
+  /**
+   * Add the given filename to the fs.
+   * @return the new INodesInPath instance that contains the new INode
+   */
+  private static INodesInPath addFile(
+      FSDirectory fsd, INodesInPath existing, String localName,
+      PermissionStatus permissions, short replication, long preferredBlockSize,
+      String clientName, String clientMachine)
+      throws IOException {
+
+    long modTime = now();
+    INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
+                                     modTime, modTime, replication,
+                                     preferredBlockSize);
+    newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
+    newNode.toUnderConstruction(clientName, clientMachine);
+
+    INodesInPath newiip;
+    fsd.writeLock();
+    try {
+      newiip = fsd.addINode(existing, newNode);
+    } finally {
+      fsd.writeUnlock();
+    }
+    if (newiip == null) {
+      NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
+          existing.getPath() + "/" + localName);
+      return null;
+    }
+
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* addFile: " + localName + " is added");
+    }
+    return newiip;
+  }
+
   private static FileState analyzeFileState(
       FSNamesystem fsn, String src, long fileId, String clientName,
       ExtendedBlock previous, LocatedBlock[] onRetryBlock)
@@ -345,8 +607,7 @@ class FSDirWriteFileOp {
         src = iip.getPath();
       }
     }
-    final INodeFile file = fsn.checkLease(src, clientName,
-        inode, fileId);
+    final INodeFile file = fsn.checkLease(src, clientName, inode, fileId);
     BlockInfoContiguous lastBlockInFile = file.getLastBlock();
     if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
       // The block that the client claims is the current last block
@@ -497,6 +758,20 @@ class FSDirWriteFileOp {
     return true;
   }
 
+  private static INodeFile newINodeFile(
+      long id, PermissionStatus permissions, long mtime, long atime,
+      short replication, long preferredBlockSize, byte storagePolicyId) {
+    return new INodeFile(id, null, permissions, mtime, atime,
+        BlockInfoContiguous.EMPTY_ARRAY, replication, preferredBlockSize,
+        storagePolicyId);
+  }
+
+  private static INodeFile newINodeFile(long id, PermissionStatus permissions,
+      long mtime, long atime, short replication, long preferredBlockSize) {
+    return newINodeFile(id, permissions, mtime, atime, replication,
+                        preferredBlockSize, (byte)0);
+  }
+
   /**
    * Persist the new block (the last block of the given file).
    */
@@ -533,6 +808,36 @@ class FSDirWriteFileOp {
     DatanodeStorageInfo.incrementBlocksScheduled(targets);
   }
 
+  private static void setNewINodeStoragePolicy(BlockManager bm, INodeFile
+      inode, INodesInPath iip, boolean isLazyPersist)
+      throws IOException {
+
+    if (isLazyPersist) {
+      BlockStoragePolicy lpPolicy =
+          bm.getStoragePolicy("LAZY_PERSIST");
+
+      // Set LAZY_PERSIST storage policy if the flag was passed to
+      // CreateFile.
+      if (lpPolicy == null) {
+        throw new HadoopIllegalArgumentException(
+            "The LAZY_PERSIST storage policy has been disabled " +
+            "by the administrator.");
+      }
+      inode.setStoragePolicyID(lpPolicy.getId(),
+                               iip.getLatestSnapshotId());
+    } else {
+      BlockStoragePolicy effectivePolicy =
+          bm.getStoragePolicy(inode.getStoragePolicyID());
+
+      if (effectivePolicy != null &&
+          effectivePolicy.isCopyOnCreateFile()) {
+        // Copy effective policy from ancestor directory to current file.
+        inode.setStoragePolicyID(effectivePolicy.getId(),
+                                 iip.getLatestSnapshotId());
+      }
+    }
+  }
+
   private static class FileState {
     final INodeFile inode;
     final String path;
@@ -560,4 +865,19 @@ class FSDirWriteFileOp {
       this.clientMachine = clientMachine;
     }
   }
+
+  static class EncryptionKeyInfo {
+    final CryptoProtocolVersion protocolVersion;
+    final CipherSuite suite;
+    final String ezKeyName;
+    KeyProviderCryptoExtension.EncryptedKeyVersion edek;
+
+    EncryptionKeyInfo(
+        CryptoProtocolVersion protocolVersion, CipherSuite suite,
+        String ezKeyName) {
+      this.protocolVersion = protocolVersion;
+      this.suite = suite;
+      this.ezKeyName = ezKeyName;
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index c2ed95608c6..8fdd2d7de55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -21,13 +21,11 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.commons.io.Charsets;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
@@ -42,7 +40,6 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.XAttrHelper;
-import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
@@ -86,7 +83,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KE
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
 import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
-import static org.apache.hadoop.util.Time.now;
 
 /**
  * Both FSDirectory and FSNamesystem manage the state of the namespace.
@@ -388,93 +384,6 @@ public class FSDirectory implements Closeable {
     skipQuotaCheck = true;
   }
 
-  private static INodeFile newINodeFile(long id, PermissionStatus permissions,
-      long mtime, long atime, short replication, long preferredBlockSize) {
-    return newINodeFile(id, permissions, mtime, atime, replication,
-        preferredBlockSize, (byte)0);
-  }
-
-  private static INodeFile newINodeFile(long id, PermissionStatus permissions,
-      long mtime, long atime, short replication, long preferredBlockSize,
-      byte storagePolicyId) {
-    return new INodeFile(id, null, permissions, mtime, atime,
-        BlockInfoContiguous.EMPTY_ARRAY, replication, preferredBlockSize,
-        storagePolicyId);
-  }
-
-  /**
-   * Add the given filename to the fs.
-   * @return the new INodesInPath instance that contains the new INode
-   */
-  INodesInPath addFile(INodesInPath existing, String localName, PermissionStatus
-      permissions, short replication, long preferredBlockSize,
-      String clientName, String clientMachine)
-      throws FileAlreadyExistsException, QuotaExceededException,
-      UnresolvedLinkException, SnapshotAccessControlException, AclException {
-
-    long modTime = now();
-    INodeFile newNode = newINodeFile(allocateNewInodeId(), permissions, modTime,
-        modTime, replication, preferredBlockSize);
-    newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
-    newNode.toUnderConstruction(clientName, clientMachine);
-
-    INodesInPath newiip;
-    writeLock();
-    try {
-      newiip = addINode(existing, newNode);
-    } finally {
-      writeUnlock();
-    }
-    if (newiip == null) {
-      NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
-          existing.getPath() + "/" + localName);
-      return null;
-    }
-
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* addFile: " + localName + " is added");
-    }
-    return newiip;
-  }
-
-  INodeFile addFileForEditLog(long id, INodesInPath existing, byte[] localName,
-      PermissionStatus permissions, List<AclEntry> aclEntries,
-      List<XAttr> xAttrs, short replication, long modificationTime, long atime,
-      long preferredBlockSize, boolean underConstruction, String clientName,
-      String clientMachine, byte storagePolicyId) {
-    final INodeFile newNode;
-    assert hasWriteLock();
-    if (underConstruction) {
-      newNode = newINodeFile(id, permissions, modificationTime,
-          modificationTime, replication, preferredBlockSize, storagePolicyId);
-      newNode.toUnderConstruction(clientName, clientMachine);
-    } else {
-      newNode = newINodeFile(id, permissions, modificationTime, atime,
-          replication, preferredBlockSize, storagePolicyId);
-    }
-
-    newNode.setLocalName(localName);
-    try {
-      INodesInPath iip = addINode(existing, newNode);
-      if (iip != null) {
-        if (aclEntries != null) {
-          AclStorage.updateINodeAcl(newNode, aclEntries, CURRENT_STATE_ID);
-        }
-        if (xAttrs != null) {
-          XAttrStorage.updateINodeXAttrs(newNode, xAttrs, CURRENT_STATE_ID);
-        }
-        return newNode;
-      }
-    } catch (IOException e) {
-      if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug(
-            "DIR* FSDirectory.unprotectedAddFile: exception when add "
-                + existing.getPath() + " to the file system", e);
-      }
-    }
-    return null;
-  }
-
   /**
    * This is a wrapper for resolvePath(). If the path passed
    * is prefixed with /.reserved/raw, then it checks to ensure that the caller
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 532290c4d3a..da530a74d10 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -365,15 +365,12 @@ public class FSEditLogLoader {
       // add to the file tree
       inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion,
           lastInodeId);
-      newFile = fsDir.addFileForEditLog(inodeId, iip.getExistingINodes(),
-          iip.getLastLocalName(),
-          addCloseOp.permissions,
-          addCloseOp.aclEntries,
-          addCloseOp.xAttrs, replication,
-          addCloseOp.mtime, addCloseOp.atime,
-          addCloseOp.blockSize, true,
-          addCloseOp.clientName,
-          addCloseOp.clientMachine,
+      newFile = FSDirWriteFileOp.addFileForEditLog(fsDir, inodeId,
+          iip.getExistingINodes(), iip.getLastLocalName(),
+          addCloseOp.permissions, addCloseOp.aclEntries,
+          addCloseOp.xAttrs, replication, addCloseOp.mtime,
+          addCloseOp.atime, addCloseOp.blockSize, true,
+          addCloseOp.clientName, addCloseOp.clientMachine,
           addCloseOp.storagePolicyId);
       iip = INodesInPath.replace(iip, iip.length() - 1, newFile);
       fsNamesys.leaseManager.addLease(addCloseOp.clientName, newFile.getId());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 4974b920d8f..fa151204af3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -154,7 +154,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.XAttr;
@@ -278,7 +277,6 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.util.ChunkedArrayList;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -2269,8 +2267,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @return chosen protocol version
    * @throws IOException
    */
-  private CryptoProtocolVersion chooseProtocolVersion(EncryptionZone zone,
-      CryptoProtocolVersion[] supportedVersions)
+  CryptoProtocolVersion chooseProtocolVersion(
+      EncryptionZone zone, CryptoProtocolVersion[] supportedVersions)
       throws UnknownCryptoProtocolVersionException, UnresolvedLinkException,
         SnapshotAccessControlException {
     Preconditions.checkNotNull(zone);
@@ -2332,11 +2330,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       String holder, String clientMachine, EnumSet<CreateFlag> flag,
       boolean createParent, short replication, long blockSize,
       CryptoProtocolVersion[] supportedVersions, boolean logRetryCache)
-      throws AccessControlException, SafeModeException,
-      FileAlreadyExistsException, UnresolvedLinkException,
-      FileNotFoundException, ParentNotDirectoryException, IOException {
+      throws IOException {
 
-    HdfsFileStatus status = null;
+    HdfsFileStatus status;
     try {
       status = startFileInt(src, permissions, holder, clientMachine, flag,
           createParent, replication, blockSize, supportedVersions,
@@ -2345,54 +2341,42 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       logAuditEvent(false, "create", src);
       throw e;
     }
+    logAuditEvent(true, "create", src, null, status);
     return status;
   }
 
-  private HdfsFileStatus startFileInt(final String srcArg,
+  private HdfsFileStatus startFileInt(final String src,
       PermissionStatus permissions, String holder, String clientMachine,
       EnumSet<CreateFlag> flag, boolean createParent, short replication,
       long blockSize, CryptoProtocolVersion[] supportedVersions,
       boolean logRetryCache)
-      throws AccessControlException, SafeModeException,
-      FileAlreadyExistsException, UnresolvedLinkException,
-      FileNotFoundException, ParentNotDirectoryException, IOException {
-    String src = srcArg;
+      throws IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       StringBuilder builder = new StringBuilder();
-      builder.append("DIR* NameSystem.startFile: src=" + src
-          + ", holder=" + holder
-          + ", clientMachine=" + clientMachine
-          + ", createParent=" + createParent
-          + ", replication=" + replication
-          + ", createFlag=" + flag.toString()
-          + ", blockSize=" + blockSize);
-      builder.append(", supportedVersions=");
-      if (supportedVersions != null) {
-        builder.append(Arrays.toString(supportedVersions));
-      } else {
-        builder.append("null");
-      }
+      builder.append("DIR* NameSystem.startFile: src=").append(src)
+          .append(", holder=").append(holder)
+          .append(", clientMachine=").append(clientMachine)
+          .append(", createParent=").append(createParent)
+          .append(", replication=").append(replication)
+          .append(", createFlag=").append(flag.toString())
+          .append(", blockSize=").append(blockSize)
+          .append(", supportedVersions=")
+          .append(supportedVersions == null ? null : Arrays.toString
+              (supportedVersions));
       NameNode.stateChangeLog.debug(builder.toString());
     }
     if (!DFSUtil.isValidName(src)) {
       throw new InvalidPathException(src);
     }
     blockManager.verifyReplication(src, replication, clientMachine);
-
-    boolean skipSync = false;
-    HdfsFileStatus stat = null;
-    FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
     if (blockSize < minBlockSize) {
       throw new IOException("Specified block size is less than configured" +
           " minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
           + "): " + blockSize + " < " + minBlockSize);
     }
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    boolean create = flag.contains(CreateFlag.CREATE);
-    boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
-    boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
+    FSPermissionChecker pc = getPermissionChecker();
 
     waitForLoadingFSImage();
 
     /**
@@ -2407,245 +2391,61 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
      * special RetryStartFileException to ask the DFSClient to try the create
      * again later.
      */
-    CryptoProtocolVersion protocolVersion = null;
-    CipherSuite suite = null;
-    String ezKeyName = null;
-    EncryptedKeyVersion edek = null;
+    FSDirWriteFileOp.EncryptionKeyInfo ezInfo = null;
 
     if (provider != null) {
       readLock();
       try {
-        src = dir.resolvePath(pc, src, pathComponents);
-        INodesInPath iip = dir.getINodesInPath4Write(src);
-        // Nothing to do if the path is not within an EZ
-        final EncryptionZone zone = dir.getEZForPath(iip);
-        if (zone != null) {
-          protocolVersion = chooseProtocolVersion(zone, supportedVersions);
-          suite = zone.getSuite();
-          ezKeyName = zone.getKeyName();
-
-          Preconditions.checkNotNull(protocolVersion);
-          Preconditions.checkNotNull(suite);
-          Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
-              "Chose an UNKNOWN CipherSuite!");
-          Preconditions.checkNotNull(ezKeyName);
-        }
+        checkOperation(OperationCategory.READ);
+        ezInfo = FSDirWriteFileOp
+            .getEncryptionKeyInfo(this, pc, src, supportedVersions);
       } finally {
         readUnlock();
       }
 
-    Preconditions.checkState(
-        (suite == null && ezKeyName == null) ||
-            (suite != null && ezKeyName != null),
-        "Both suite and ezKeyName should both be null or not null");
-
-    // Generate EDEK if necessary while not holding the lock
-    edek = generateEncryptedDataEncryptionKey(ezKeyName);
+      if (ezInfo != null) {
+        ezInfo.edek = generateEncryptedDataEncryptionKey(ezInfo.ezKeyName);
+      }
       EncryptionFaultInjector.getInstance().startFileAfterGenerateKey();
     }
 
-    // Proceed with the create, using the computed cipher suite and
+    boolean skipSync = false;
+    HdfsFileStatus stat = null;
+
+    // Proceed with the create, using the computed cipher suite and
     // generated EDEK
-    BlocksMapUpdateInfo toRemoveBlocks = null;
+    BlocksMapUpdateInfo toRemoveBlocks = new BlocksMapUpdateInfo();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot create file" + src);
       dir.writeLock();
       try {
-        src = dir.resolvePath(pc, src, pathComponents);
-        final INodesInPath iip = dir.getINodesInPath4Write(src);
-        toRemoveBlocks = startFileInternal(
-            pc, iip, permissions, holder,
-            clientMachine, create, overwrite,
-            createParent, replication, blockSize,
-            isLazyPersist, suite, protocolVersion, edek,
-            logRetryCache);
-        stat = FSDirStatAndListingOp.getFileInfo(
-            dir, src, false, FSDirectory.isReservedRawName(srcArg), true);
+        stat = FSDirWriteFileOp.startFile(this, pc, src, permissions, holder,
+                                          clientMachine, flag, createParent,
+                                          replication, blockSize, ezInfo,
+                                          toRemoveBlocks, logRetryCache);
       } finally {
         dir.writeUnlock();
       }
-    } catch (StandbyException se) {
-      skipSync = true;
-      throw se;
+    } catch (IOException e) {
+      skipSync = e instanceof StandbyException;
+      throw e;
     } finally {
       writeUnlock();
       // There might be transactions logged while trying to recover the lease.
       // They need to be sync'ed even when an exception was thrown.
       if (!skipSync) {
         getEditLog().logSync();
-        if (toRemoveBlocks != null) {
-          removeBlocks(toRemoveBlocks);
-          toRemoveBlocks.clear();
-        }
+        removeBlocks(toRemoveBlocks);
+        toRemoveBlocks.clear();
       }
     }
 
-    logAuditEvent(true, "create", srcArg, null, stat);
     return stat;
   }
 
-  /**
-   * Create a new file or overwrite an existing file<br>
-   *
-   * Once the file is create the client then allocates a new block with the next
-   * call using {@link ClientProtocol#addBlock}.
-   * <p>
-   * For description of parameters and exceptions thrown see
-   * {@link ClientProtocol#create}
-   */
-  private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc,
-      INodesInPath iip, PermissionStatus permissions, String holder,
-      String clientMachine, boolean create, boolean overwrite,
-      boolean createParent, short replication, long blockSize,
-      boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version,
-      EncryptedKeyVersion edek, boolean logRetryEntry)
-      throws IOException {
-    assert hasWriteLock();
-    // Verify that the destination does not exist as a directory already.
-    final INode inode = iip.getLastINode();
-    final String src = iip.getPath();
-    if (inode != null && inode.isDirectory()) {
-      throw new FileAlreadyExistsException(src +
-          " already exists as a directory");
-    }
-
-    final INodeFile myFile = INodeFile.valueOf(inode, src, true);
-    if (isPermissionEnabled) {
-      if (overwrite && myFile != null) {
-        dir.checkPathAccess(pc, iip, FsAction.WRITE);
-      }
-      /*
-       * To overwrite existing file, need to check 'w' permission
-       * of parent (equals to ancestor in this case)
-       */
-      dir.checkAncestorAccess(pc, iip, FsAction.WRITE);
-    }
-    if (!createParent) {
-      dir.verifyParentDir(iip, src);
-    }
-
-    FileEncryptionInfo feInfo = null;
-
-    final EncryptionZone zone = dir.getEZForPath(iip);
-    if (zone != null) {
-      // The path is now within an EZ, but we're missing encryption parameters
-      if (suite == null || edek == null) {
-        throw new RetryStartFileException();
-      }
-      // Path is within an EZ and we have provided encryption parameters.
-      // Make sure that the generated EDEK matches the settings of the EZ.
-      final String ezKeyName = zone.getKeyName();
-      if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
-        throw new RetryStartFileException();
-      }
-      feInfo = new FileEncryptionInfo(suite, version,
-          edek.getEncryptedKeyVersion().getMaterial(),
-          edek.getEncryptedKeyIv(),
-          ezKeyName, edek.getEncryptionKeyVersionName());
-    }
-
-    try {
-      BlocksMapUpdateInfo toRemoveBlocks = null;
-      if (myFile == null) {
-        if (!create) {
-          throw new FileNotFoundException("Can't overwrite non-existent " +
-              src + " for client " + clientMachine);
-        }
-      } else {
-        if (overwrite) {
-          toRemoveBlocks = new BlocksMapUpdateInfo();
-          List<INode> toRemoveINodes = new ChunkedArrayList<>();
-          List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
-          long ret = FSDirDeleteOp.delete(
-              dir, iip, toRemoveBlocks, toRemoveINodes,
-              toRemoveUCFiles, now());
-          if (ret >= 0) {
-            iip = INodesInPath.replace(iip, iip.length() - 1, null);
-            FSDirDeleteOp.incrDeletedFileCount(ret);
-            removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
-          }
-        } else {
-          // If lease soft limit time is expired, recover the lease
-          recoverLeaseInternal(RecoverLeaseOp.CREATE_FILE,
-              iip, src, holder, clientMachine, false);
-          throw new FileAlreadyExistsException(src + " for client " +
-              clientMachine + " already exists");
-        }
-      }
-
-      checkFsObjectLimit();
-      INodeFile newNode = null;
-
-      // Always do an implicit mkdirs for parent directory tree.
-      Map.Entry<INodesInPath, String> parent = FSDirMkdirOp
-          .createAncestorDirectories(dir, iip, permissions);
-      if (parent != null) {
-        iip = dir.addFile(parent.getKey(), parent.getValue(), permissions,
-            replication, blockSize, holder, clientMachine);
-        newNode = iip != null ? iip.getLastINode().asFile() : null;
-      }
-
-      if (newNode == null) {
-        throw new IOException("Unable to add " + src + " to namespace");
-      }
-      leaseManager.addLease(newNode.getFileUnderConstructionFeature()
-          .getClientName(), newNode.getId());
-
-      // Set encryption attributes if necessary
-      if (feInfo != null) {
-        dir.setFileEncryptionInfo(src, feInfo);
-        newNode = dir.getInode(newNode.getId()).asFile();
-      }
-
-      setNewINodeStoragePolicy(newNode, iip, isLazyPersist);
-
-      // record file record in log, record new generation stamp
-      getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
-      if (NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
-            src + " inode " + newNode.getId() + " " + holder);
-      }
-      return toRemoveBlocks;
-    } catch (IOException ie) {
-      NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " +
-          ie.getMessage());
-      throw ie;
-    }
-  }
-
-  private void setNewINodeStoragePolicy(INodeFile inode,
-                                        INodesInPath iip,
-                                        boolean isLazyPersist)
-      throws IOException {
-
-    if (isLazyPersist) {
-      BlockStoragePolicy lpPolicy =
-          blockManager.getStoragePolicy("LAZY_PERSIST");
-
-      // Set LAZY_PERSIST storage policy if the flag was passed to
-      // CreateFile.
-      if (lpPolicy == null) {
-        throw new HadoopIllegalArgumentException(
-            "The LAZY_PERSIST storage policy has been disabled " +
-            "by the administrator.");
-      }
-      inode.setStoragePolicyID(lpPolicy.getId(),
-                               iip.getLatestSnapshotId());
-    } else {
-      BlockStoragePolicy effectivePolicy =
-          blockManager.getStoragePolicy(inode.getStoragePolicyID());
-
-      if (effectivePolicy != null &&
-          effectivePolicy.isCopyOnCreateFile()) {
-        // Copy effective policy from ancestor directory to current file.
-        inode.setStoragePolicyID(effectivePolicy.getId(),
-                                 iip.getLatestSnapshotId());
-      }
-    }
-  }
-
   /**
    * Append to an existing file for append.
    * <p>
@@ -2861,7 +2661,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return false;
   }
 
-  private enum RecoverLeaseOp {
+  enum RecoverLeaseOp {
     CREATE_FILE,
     APPEND_FILE,
     TRUNCATE_FILE,
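
Reviewer note: the refactoring above should not change client-visible behavior. As a sanity check, here is a minimal, hypothetical client sketch of the path this patch moves: FileSystem#create issues ClientProtocol#create, which lands in FSNamesystem#startFile and now delegates to FSDirWriteFileOp#startFile under the namesystem write lock; subsequent writes allocate blocks via ClientProtocol#addBlock. The NameNode URI and file path below are placeholders, not part of this patch.

// StartFileClientSketch.java -- illustrative only; not part of this patch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StartFileClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder NameNode URI; use the cluster's actual fs.defaultFS.
    conf.set("fs.defaultFS", "hdfs://namenode.example.com:8020");
    try (FileSystem fs = FileSystem.get(conf);
         // create() -> ClientProtocol#create -> FSNamesystem#startFile
         //          -> FSDirWriteFileOp#startFile (this patch)
         FSDataOutputStream out = fs.create(new Path("/tmp/example.txt"))) {
      // Each new block for this stream is allocated via ClientProtocol#addBlock.
      out.writeBytes("hello, startFile\n");
    } // close() completes the file and releases the lease.
  }
}

The CreateFlag handling in FSDirWriteFileOp#startFile (CREATE, OVERWRITE, LAZY_PERSIST) is driven by the EnumSet<CreateFlag> the client passes; for example, a call such as fs.create(path, FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE, CreateFlag.LAZY_PERSIST), 4096, (short) 1, fs.getDefaultBlockSize(path), null) would exercise the LAZY_PERSIST branch of setNewINodeStoragePolicy().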