diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 414bba58720..9e5f51df5b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -240,6 +240,9 @@ Release 2.8.0 - UNRELEASED
HDFS-4383. Document the lease limits. (Arshad Mohammad via aajisaka)
+ HDFS-8421. Move startFile() and related functions into FSDirWriteFileOp.
+ (wheat9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 1ff0899e5eb..307bd594226 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -18,11 +18,27 @@
package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.base.Preconditions;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -34,15 +50,22 @@
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.util.ChunkedArrayList;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
+import static org.apache.hadoop.util.Time.now;
+
class FSDirWriteFileOp {
private FSDirWriteFileOp() {}
static boolean unprotectedRemoveBlock(
@@ -277,6 +300,210 @@ static Node getClientNode(BlockManager bm, String clientMachine) {
return clientNode;
}
+ /**
+ * Create a new file or overwrite an existing file
+ *
+ * Once the file is create the client then allocates a new block with the next
+ * call using {@link ClientProtocol#addBlock}.
+ *
+ * For description of parameters and exceptions thrown see
+ * {@link ClientProtocol#create}
+ */
+ static HdfsFileStatus startFile(
+ FSNamesystem fsn, FSPermissionChecker pc, String src,
+ PermissionStatus permissions, String holder, String clientMachine,
+ EnumSet flag, boolean createParent,
+ short replication, long blockSize,
+ EncryptionKeyInfo ezInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
+ boolean logRetryEntry)
+ throws IOException {
+ assert fsn.hasWriteLock();
+
+ boolean create = flag.contains(CreateFlag.CREATE);
+ boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+ boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
+
+ CipherSuite suite = null;
+ CryptoProtocolVersion version = null;
+ KeyProviderCryptoExtension.EncryptedKeyVersion edek = null;
+
+ if (ezInfo != null) {
+ edek = ezInfo.edek;
+ suite = ezInfo.suite;
+ version = ezInfo.protocolVersion;
+ }
+
+ boolean isRawPath = FSDirectory.isReservedRawName(src);
+ FSDirectory fsd = fsn.getFSDirectory();
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ src = fsd.resolvePath(pc, src, pathComponents);
+ INodesInPath iip = fsd.getINodesInPath4Write(src);
+
+ // Verify that the destination does not exist as a directory already.
+ final INode inode = iip.getLastINode();
+ if (inode != null && inode.isDirectory()) {
+ throw new FileAlreadyExistsException(src +
+ " already exists as a directory");
+ }
+
+ final INodeFile myFile = INodeFile.valueOf(inode, src, true);
+ if (fsd.isPermissionEnabled()) {
+ if (overwrite && myFile != null) {
+ fsd.checkPathAccess(pc, iip, FsAction.WRITE);
+ }
+ /*
+ * To overwrite existing file, need to check 'w' permission
+ * of parent (equals to ancestor in this case)
+ */
+ fsd.checkAncestorAccess(pc, iip, FsAction.WRITE);
+ }
+
+ if (!createParent) {
+ fsd.verifyParentDir(iip, src);
+ }
+
+ if (myFile == null && !create) {
+ throw new FileNotFoundException("Can't overwrite non-existent " +
+ src + " for client " + clientMachine);
+ }
+
+ FileEncryptionInfo feInfo = null;
+
+ final EncryptionZone zone = fsd.getEZForPath(iip);
+ if (zone != null) {
+ // The path is now within an EZ, but we're missing encryption parameters
+ if (suite == null || edek == null) {
+ throw new RetryStartFileException();
+ }
+ // Path is within an EZ and we have provided encryption parameters.
+ // Make sure that the generated EDEK matches the settings of the EZ.
+ final String ezKeyName = zone.getKeyName();
+ if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
+ throw new RetryStartFileException();
+ }
+ feInfo = new FileEncryptionInfo(suite, version,
+ edek.getEncryptedKeyVersion().getMaterial(),
+ edek.getEncryptedKeyIv(),
+ ezKeyName, edek.getEncryptionKeyVersionName());
+ }
+
+ if (myFile != null) {
+ if (overwrite) {
+ List toRemoveINodes = new ChunkedArrayList<>();
+ List toRemoveUCFiles = new ChunkedArrayList<>();
+ long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
+ toRemoveINodes, toRemoveUCFiles, now());
+ if (ret >= 0) {
+ iip = INodesInPath.replace(iip, iip.length() - 1, null);
+ FSDirDeleteOp.incrDeletedFileCount(ret);
+ fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
+ }
+ } else {
+ // If lease soft limit time is expired, recover the lease
+ fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
+ src, holder, clientMachine, false);
+ throw new FileAlreadyExistsException(src + " for client " +
+ clientMachine + " already exists");
+ }
+ }
+ fsn.checkFsObjectLimit();
+ INodeFile newNode = null;
+ Map.Entry parent = FSDirMkdirOp
+ .createAncestorDirectories(fsd, iip, permissions);
+ if (parent != null) {
+ iip = addFile(fsd, parent.getKey(), parent.getValue(), permissions,
+ replication, blockSize, holder, clientMachine);
+ newNode = iip != null ? iip.getLastINode().asFile() : null;
+ }
+ if (newNode == null) {
+ throw new IOException("Unable to add " + src + " to namespace");
+ }
+ fsn.leaseManager.addLease(
+ newNode.getFileUnderConstructionFeature().getClientName(),
+ newNode.getId());
+ if (feInfo != null) {
+ fsd.setFileEncryptionInfo(src, feInfo);
+ newNode = fsd.getInode(newNode.getId()).asFile();
+ }
+ setNewINodeStoragePolicy(fsn.getBlockManager(), newNode, iip,
+ isLazyPersist);
+ fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
+ src + " inode " + newNode.getId() + " " + holder);
+ }
+ return FSDirStatAndListingOp.getFileInfo(fsd, src, false, isRawPath, true);
+ }
+
+ static EncryptionKeyInfo getEncryptionKeyInfo(FSNamesystem fsn,
+ FSPermissionChecker pc, String src,
+ CryptoProtocolVersion[] supportedVersions)
+ throws IOException {
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ FSDirectory fsd = fsn.getFSDirectory();
+ src = fsd.resolvePath(pc, src, pathComponents);
+ INodesInPath iip = fsd.getINodesInPath4Write(src);
+ // Nothing to do if the path is not within an EZ
+ final EncryptionZone zone = fsd.getEZForPath(iip);
+ if (zone == null) {
+ return null;
+ }
+ CryptoProtocolVersion protocolVersion = fsn.chooseProtocolVersion(
+ zone, supportedVersions);
+ CipherSuite suite = zone.getSuite();
+ String ezKeyName = zone.getKeyName();
+
+ Preconditions.checkNotNull(protocolVersion);
+ Preconditions.checkNotNull(suite);
+ Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
+ "Chose an UNKNOWN CipherSuite!");
+ Preconditions.checkNotNull(ezKeyName);
+ return new EncryptionKeyInfo(protocolVersion, suite, ezKeyName);
+ }
+
+ static INodeFile addFileForEditLog(
+ FSDirectory fsd, long id, INodesInPath existing, byte[] localName,
+ PermissionStatus permissions, List aclEntries,
+ List xAttrs, short replication, long modificationTime, long atime,
+ long preferredBlockSize, boolean underConstruction, String clientName,
+ String clientMachine, byte storagePolicyId) {
+ final INodeFile newNode;
+ assert fsd.hasWriteLock();
+ if (underConstruction) {
+ newNode = newINodeFile(id, permissions, modificationTime,
+ modificationTime, replication,
+ preferredBlockSize,
+ storagePolicyId);
+ newNode.toUnderConstruction(clientName, clientMachine);
+ } else {
+ newNode = newINodeFile(id, permissions, modificationTime,
+ atime, replication,
+ preferredBlockSize,
+ storagePolicyId);
+ }
+
+ newNode.setLocalName(localName);
+ try {
+ INodesInPath iip = fsd.addINode(existing, newNode);
+ if (iip != null) {
+ if (aclEntries != null) {
+ AclStorage.updateINodeAcl(newNode, aclEntries, CURRENT_STATE_ID);
+ }
+ if (xAttrs != null) {
+ XAttrStorage.updateINodeXAttrs(newNode, xAttrs, CURRENT_STATE_ID);
+ }
+ return newNode;
+ }
+ } catch (IOException e) {
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug(
+ "DIR* FSDirectory.unprotectedAddFile: exception when add "
+ + existing.getPath() + " to the file system", e);
+ }
+ }
+ return null;
+ }
+
/**
* Add a block to the file. Returns a reference to the added block.
*/
@@ -314,6 +541,41 @@ private static BlockInfoContiguous addBlock(
}
}
+ /**
+ * Add the given filename to the fs.
+ * @return the new INodesInPath instance that contains the new INode
+ */
+ private static INodesInPath addFile(
+ FSDirectory fsd, INodesInPath existing, String localName,
+ PermissionStatus permissions, short replication, long preferredBlockSize,
+ String clientName, String clientMachine)
+ throws IOException {
+
+ long modTime = now();
+ INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
+ modTime, modTime, replication, preferredBlockSize);
+ newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
+ newNode.toUnderConstruction(clientName, clientMachine);
+
+ INodesInPath newiip;
+ fsd.writeLock();
+ try {
+ newiip = fsd.addINode(existing, newNode);
+ } finally {
+ fsd.writeUnlock();
+ }
+ if (newiip == null) {
+ NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
+ existing.getPath() + "/" + localName);
+ return null;
+ }
+
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* addFile: " + localName + " is added");
+ }
+ return newiip;
+ }
+
private static FileState analyzeFileState(
FSNamesystem fsn, String src, long fileId, String clientName,
ExtendedBlock previous, LocatedBlock[] onRetryBlock)
@@ -345,8 +607,7 @@ private static FileState analyzeFileState(
src = iip.getPath();
}
}
- final INodeFile file = fsn.checkLease(src, clientName,
- inode, fileId);
+ final INodeFile file = fsn.checkLease(src, clientName, inode, fileId);
BlockInfoContiguous lastBlockInFile = file.getLastBlock();
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
// The block that the client claims is the current last block
@@ -497,6 +758,20 @@ private static boolean completeFileInternal(
return true;
}
+ private static INodeFile newINodeFile(
+ long id, PermissionStatus permissions, long mtime, long atime,
+ short replication, long preferredBlockSize, byte storagePolicyId) {
+ return new INodeFile(id, null, permissions, mtime, atime,
+ BlockInfoContiguous.EMPTY_ARRAY, replication, preferredBlockSize,
+ storagePolicyId);
+ }
+
+ private static INodeFile newINodeFile(long id, PermissionStatus permissions,
+ long mtime, long atime, short replication, long preferredBlockSize) {
+ return newINodeFile(id, permissions, mtime, atime, replication,
+ preferredBlockSize, (byte)0);
+ }
+
/**
* Persist the new block (the last block of the given file).
*/
@@ -533,6 +808,36 @@ private static void saveAllocatedBlock(
DatanodeStorageInfo.incrementBlocksScheduled(targets);
}
+ private static void setNewINodeStoragePolicy(BlockManager bm, INodeFile
+ inode, INodesInPath iip, boolean isLazyPersist)
+ throws IOException {
+
+ if (isLazyPersist) {
+ BlockStoragePolicy lpPolicy =
+ bm.getStoragePolicy("LAZY_PERSIST");
+
+ // Set LAZY_PERSIST storage policy if the flag was passed to
+ // CreateFile.
+ if (lpPolicy == null) {
+ throw new HadoopIllegalArgumentException(
+ "The LAZY_PERSIST storage policy has been disabled " +
+ "by the administrator.");
+ }
+ inode.setStoragePolicyID(lpPolicy.getId(),
+ iip.getLatestSnapshotId());
+ } else {
+ BlockStoragePolicy effectivePolicy =
+ bm.getStoragePolicy(inode.getStoragePolicyID());
+
+ if (effectivePolicy != null &&
+ effectivePolicy.isCopyOnCreateFile()) {
+ // Copy effective policy from ancestor directory to current file.
+ inode.setStoragePolicyID(effectivePolicy.getId(),
+ iip.getLatestSnapshotId());
+ }
+ }
+ }
+
private static class FileState {
final INodeFile inode;
final String path;
@@ -560,4 +865,19 @@ static class ValidateAddBlockResult {
this.clientMachine = clientMachine;
}
}
+
+ static class EncryptionKeyInfo {
+ final CryptoProtocolVersion protocolVersion;
+ final CipherSuite suite;
+ final String ezKeyName;
+ KeyProviderCryptoExtension.EncryptedKeyVersion edek;
+
+ EncryptionKeyInfo(
+ CryptoProtocolVersion protocolVersion, CipherSuite suite,
+ String ezKeyName) {
+ this.protocolVersion = protocolVersion;
+ this.suite = suite;
+ this.ezKeyName = ezKeyName;
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index c2ed95608c6..8fdd2d7de55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -21,13 +21,11 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.commons.io.Charsets;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
@@ -42,7 +40,6 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.XAttrHelper;
-import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
@@ -86,7 +83,6 @@
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
-import static org.apache.hadoop.util.Time.now;
/**
* Both FSDirectory and FSNamesystem manage the state of the namespace.
@@ -388,93 +384,6 @@ void disableQuotaChecks() {
skipQuotaCheck = true;
}
- private static INodeFile newINodeFile(long id, PermissionStatus permissions,
- long mtime, long atime, short replication, long preferredBlockSize) {
- return newINodeFile(id, permissions, mtime, atime, replication,
- preferredBlockSize, (byte)0);
- }
-
- private static INodeFile newINodeFile(long id, PermissionStatus permissions,
- long mtime, long atime, short replication, long preferredBlockSize,
- byte storagePolicyId) {
- return new INodeFile(id, null, permissions, mtime, atime,
- BlockInfoContiguous.EMPTY_ARRAY, replication, preferredBlockSize,
- storagePolicyId);
- }
-
- /**
- * Add the given filename to the fs.
- * @return the new INodesInPath instance that contains the new INode
- */
- INodesInPath addFile(INodesInPath existing, String localName, PermissionStatus
- permissions, short replication, long preferredBlockSize,
- String clientName, String clientMachine)
- throws FileAlreadyExistsException, QuotaExceededException,
- UnresolvedLinkException, SnapshotAccessControlException, AclException {
-
- long modTime = now();
- INodeFile newNode = newINodeFile(allocateNewInodeId(), permissions, modTime,
- modTime, replication, preferredBlockSize);
- newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
- newNode.toUnderConstruction(clientName, clientMachine);
-
- INodesInPath newiip;
- writeLock();
- try {
- newiip = addINode(existing, newNode);
- } finally {
- writeUnlock();
- }
- if (newiip == null) {
- NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
- existing.getPath() + "/" + localName);
- return null;
- }
-
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* addFile: " + localName + " is added");
- }
- return newiip;
- }
-
- INodeFile addFileForEditLog(long id, INodesInPath existing, byte[] localName,
- PermissionStatus permissions, List aclEntries,
- List xAttrs, short replication, long modificationTime, long atime,
- long preferredBlockSize, boolean underConstruction, String clientName,
- String clientMachine, byte storagePolicyId) {
- final INodeFile newNode;
- assert hasWriteLock();
- if (underConstruction) {
- newNode = newINodeFile(id, permissions, modificationTime,
- modificationTime, replication, preferredBlockSize, storagePolicyId);
- newNode.toUnderConstruction(clientName, clientMachine);
- } else {
- newNode = newINodeFile(id, permissions, modificationTime, atime,
- replication, preferredBlockSize, storagePolicyId);
- }
-
- newNode.setLocalName(localName);
- try {
- INodesInPath iip = addINode(existing, newNode);
- if (iip != null) {
- if (aclEntries != null) {
- AclStorage.updateINodeAcl(newNode, aclEntries, CURRENT_STATE_ID);
- }
- if (xAttrs != null) {
- XAttrStorage.updateINodeXAttrs(newNode, xAttrs, CURRENT_STATE_ID);
- }
- return newNode;
- }
- } catch (IOException e) {
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug(
- "DIR* FSDirectory.unprotectedAddFile: exception when add "
- + existing.getPath() + " to the file system", e);
- }
- }
- return null;
- }
-
/**
* This is a wrapper for resolvePath(). If the path passed
* is prefixed with /.reserved/raw, then it checks to ensure that the caller
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 532290c4d3a..da530a74d10 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -365,15 +365,12 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
// add to the file tree
inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion, lastInodeId);
- newFile = fsDir.addFileForEditLog(inodeId, iip.getExistingINodes(),
- iip.getLastLocalName(),
- addCloseOp.permissions,
- addCloseOp.aclEntries,
- addCloseOp.xAttrs, replication,
- addCloseOp.mtime, addCloseOp.atime,
- addCloseOp.blockSize, true,
- addCloseOp.clientName,
- addCloseOp.clientMachine,
+ newFile = FSDirWriteFileOp.addFileForEditLog(fsDir, inodeId,
+ iip.getExistingINodes(), iip.getLastLocalName(),
+ addCloseOp.permissions, addCloseOp.aclEntries,
+ addCloseOp.xAttrs, replication, addCloseOp.mtime,
+ addCloseOp.atime, addCloseOp.blockSize, true,
+ addCloseOp.clientName, addCloseOp.clientMachine,
addCloseOp.storagePolicyId);
iip = INodesInPath.replace(iip, iip.length() - 1, newFile);
fsNamesys.leaseManager.addLease(addCloseOp.clientName, newFile.getId());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 4974b920d8f..fa151204af3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -154,7 +154,6 @@
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.XAttr;
@@ -278,7 +277,6 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.util.ChunkedArrayList;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.ReflectionUtils;
@@ -2269,8 +2267,8 @@ long getPreferredBlockSize(String src) throws IOException {
* @return chosen protocol version
* @throws IOException
*/
- private CryptoProtocolVersion chooseProtocolVersion(EncryptionZone zone,
- CryptoProtocolVersion[] supportedVersions)
+ CryptoProtocolVersion chooseProtocolVersion(
+ EncryptionZone zone, CryptoProtocolVersion[] supportedVersions)
throws UnknownCryptoProtocolVersionException, UnresolvedLinkException,
SnapshotAccessControlException {
Preconditions.checkNotNull(zone);
@@ -2332,11 +2330,9 @@ HdfsFileStatus startFile(String src, PermissionStatus permissions,
String holder, String clientMachine, EnumSet flag,
boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions, boolean logRetryCache)
- throws AccessControlException, SafeModeException,
- FileAlreadyExistsException, UnresolvedLinkException,
- FileNotFoundException, ParentNotDirectoryException, IOException {
+ throws IOException {
- HdfsFileStatus status = null;
+ HdfsFileStatus status;
try {
status = startFileInt(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize, supportedVersions,
@@ -2345,54 +2341,42 @@ HdfsFileStatus startFile(String src, PermissionStatus permissions,
logAuditEvent(false, "create", src);
throw e;
}
+ logAuditEvent(true, "create", src, null, status);
return status;
}
- private HdfsFileStatus startFileInt(final String srcArg,
+ private HdfsFileStatus startFileInt(final String src,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet flag, boolean createParent, short replication,
long blockSize, CryptoProtocolVersion[] supportedVersions,
boolean logRetryCache)
- throws AccessControlException, SafeModeException,
- FileAlreadyExistsException, UnresolvedLinkException,
- FileNotFoundException, ParentNotDirectoryException, IOException {
- String src = srcArg;
+ throws IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
- builder.append("DIR* NameSystem.startFile: src=" + src
- + ", holder=" + holder
- + ", clientMachine=" + clientMachine
- + ", createParent=" + createParent
- + ", replication=" + replication
- + ", createFlag=" + flag.toString()
- + ", blockSize=" + blockSize);
- builder.append(", supportedVersions=");
- if (supportedVersions != null) {
- builder.append(Arrays.toString(supportedVersions));
- } else {
- builder.append("null");
- }
+ builder.append("DIR* NameSystem.startFile: src=").append(src)
+ .append(", holder=").append(holder)
+ .append(", clientMachine=").append(clientMachine)
+ .append(", createParent=").append(createParent)
+ .append(", replication=").append(replication)
+ .append(", createFlag=").append(flag.toString())
+ .append(", blockSize=").append(blockSize)
+ .append(", supportedVersions=")
+ .append(supportedVersions == null ? null : Arrays.toString
+ (supportedVersions));
NameNode.stateChangeLog.debug(builder.toString());
}
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException(src);
}
blockManager.verifyReplication(src, replication, clientMachine);
-
- boolean skipSync = false;
- HdfsFileStatus stat = null;
- FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
if (blockSize < minBlockSize) {
throw new IOException("Specified block size is less than configured" +
" minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
+ "): " + blockSize + " < " + minBlockSize);
}
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
- boolean create = flag.contains(CreateFlag.CREATE);
- boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
- boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
+ FSPermissionChecker pc = getPermissionChecker();
waitForLoadingFSImage();
/**
@@ -2407,245 +2391,61 @@ private HdfsFileStatus startFileInt(final String srcArg,
* special RetryStartFileException to ask the DFSClient to try the create
* again later.
*/
- CryptoProtocolVersion protocolVersion = null;
- CipherSuite suite = null;
- String ezKeyName = null;
- EncryptedKeyVersion edek = null;
+ FSDirWriteFileOp.EncryptionKeyInfo ezInfo = null;
if (provider != null) {
readLock();
try {
- src = dir.resolvePath(pc, src, pathComponents);
- INodesInPath iip = dir.getINodesInPath4Write(src);
- // Nothing to do if the path is not within an EZ
- final EncryptionZone zone = dir.getEZForPath(iip);
- if (zone != null) {
- protocolVersion = chooseProtocolVersion(zone, supportedVersions);
- suite = zone.getSuite();
- ezKeyName = zone.getKeyName();
-
- Preconditions.checkNotNull(protocolVersion);
- Preconditions.checkNotNull(suite);
- Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
- "Chose an UNKNOWN CipherSuite!");
- Preconditions.checkNotNull(ezKeyName);
- }
+ checkOperation(OperationCategory.READ);
+ ezInfo = FSDirWriteFileOp
+ .getEncryptionKeyInfo(this, pc, src, supportedVersions);
} finally {
readUnlock();
}
- Preconditions.checkState(
- (suite == null && ezKeyName == null) ||
- (suite != null && ezKeyName != null),
- "Both suite and ezKeyName should both be null or not null");
-
// Generate EDEK if necessary while not holding the lock
- edek = generateEncryptedDataEncryptionKey(ezKeyName);
+ if (ezInfo != null) {
+ ezInfo.edek = generateEncryptedDataEncryptionKey(ezInfo.ezKeyName);
+ }
EncryptionFaultInjector.getInstance().startFileAfterGenerateKey();
}
- // Proceed with the create, using the computed cipher suite and
+ boolean skipSync = false;
+ HdfsFileStatus stat = null;
+
+ // Proceed with the create, using the computed cipher suite and
// generated EDEK
- BlocksMapUpdateInfo toRemoveBlocks = null;
+ BlocksMapUpdateInfo toRemoveBlocks = new BlocksMapUpdateInfo();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot create file" + src);
dir.writeLock();
try {
- src = dir.resolvePath(pc, src, pathComponents);
- final INodesInPath iip = dir.getINodesInPath4Write(src);
- toRemoveBlocks = startFileInternal(
- pc, iip, permissions, holder,
- clientMachine, create, overwrite,
- createParent, replication, blockSize,
- isLazyPersist, suite, protocolVersion, edek,
- logRetryCache);
- stat = FSDirStatAndListingOp.getFileInfo(
- dir, src, false, FSDirectory.isReservedRawName(srcArg), true);
+ stat = FSDirWriteFileOp.startFile(this, pc, src, permissions, holder,
+ clientMachine, flag, createParent,
+ replication, blockSize, ezInfo,
+ toRemoveBlocks, logRetryCache);
} finally {
dir.writeUnlock();
}
- } catch (StandbyException se) {
- skipSync = true;
- throw se;
+ } catch (IOException e) {
+ skipSync = e instanceof StandbyException;
+ throw e;
} finally {
writeUnlock();
// There might be transactions logged while trying to recover the lease.
// They need to be sync'ed even when an exception was thrown.
if (!skipSync) {
getEditLog().logSync();
- if (toRemoveBlocks != null) {
- removeBlocks(toRemoveBlocks);
- toRemoveBlocks.clear();
- }
+ removeBlocks(toRemoveBlocks);
+ toRemoveBlocks.clear();
}
}
- logAuditEvent(true, "create", srcArg, null, stat);
return stat;
}
- /**
- * Create a new file or overwrite an existing file
- *
- * Once the file is create the client then allocates a new block with the next
- * call using {@link ClientProtocol#addBlock}.
- *
- * For description of parameters and exceptions thrown see
- * {@link ClientProtocol#create}
- */
- private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc,
- INodesInPath iip, PermissionStatus permissions, String holder,
- String clientMachine, boolean create, boolean overwrite,
- boolean createParent, short replication, long blockSize,
- boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version,
- EncryptedKeyVersion edek, boolean logRetryEntry)
- throws IOException {
- assert hasWriteLock();
- // Verify that the destination does not exist as a directory already.
- final INode inode = iip.getLastINode();
- final String src = iip.getPath();
- if (inode != null && inode.isDirectory()) {
- throw new FileAlreadyExistsException(src +
- " already exists as a directory");
- }
-
- final INodeFile myFile = INodeFile.valueOf(inode, src, true);
- if (isPermissionEnabled) {
- if (overwrite && myFile != null) {
- dir.checkPathAccess(pc, iip, FsAction.WRITE);
- }
- /*
- * To overwrite existing file, need to check 'w' permission
- * of parent (equals to ancestor in this case)
- */
- dir.checkAncestorAccess(pc, iip, FsAction.WRITE);
- }
- if (!createParent) {
- dir.verifyParentDir(iip, src);
- }
-
- FileEncryptionInfo feInfo = null;
-
- final EncryptionZone zone = dir.getEZForPath(iip);
- if (zone != null) {
- // The path is now within an EZ, but we're missing encryption parameters
- if (suite == null || edek == null) {
- throw new RetryStartFileException();
- }
- // Path is within an EZ and we have provided encryption parameters.
- // Make sure that the generated EDEK matches the settings of the EZ.
- final String ezKeyName = zone.getKeyName();
- if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
- throw new RetryStartFileException();
- }
- feInfo = new FileEncryptionInfo(suite, version,
- edek.getEncryptedKeyVersion().getMaterial(),
- edek.getEncryptedKeyIv(),
- ezKeyName, edek.getEncryptionKeyVersionName());
- }
-
- try {
- BlocksMapUpdateInfo toRemoveBlocks = null;
- if (myFile == null) {
- if (!create) {
- throw new FileNotFoundException("Can't overwrite non-existent " +
- src + " for client " + clientMachine);
- }
- } else {
- if (overwrite) {
- toRemoveBlocks = new BlocksMapUpdateInfo();
- List toRemoveINodes = new ChunkedArrayList<>();
- List toRemoveUCFiles = new ChunkedArrayList<>();
- long ret = FSDirDeleteOp.delete(
- dir, iip, toRemoveBlocks, toRemoveINodes,
- toRemoveUCFiles, now());
- if (ret >= 0) {
- iip = INodesInPath.replace(iip, iip.length() - 1, null);
- FSDirDeleteOp.incrDeletedFileCount(ret);
- removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
- }
- } else {
- // If lease soft limit time is expired, recover the lease
- recoverLeaseInternal(RecoverLeaseOp.CREATE_FILE,
- iip, src, holder, clientMachine, false);
- throw new FileAlreadyExistsException(src + " for client " +
- clientMachine + " already exists");
- }
- }
-
- checkFsObjectLimit();
- INodeFile newNode = null;
-
- // Always do an implicit mkdirs for parent directory tree.
- Map.Entry parent = FSDirMkdirOp
- .createAncestorDirectories(dir, iip, permissions);
- if (parent != null) {
- iip = dir.addFile(parent.getKey(), parent.getValue(), permissions,
- replication, blockSize, holder, clientMachine);
- newNode = iip != null ? iip.getLastINode().asFile() : null;
- }
-
- if (newNode == null) {
- throw new IOException("Unable to add " + src + " to namespace");
- }
- leaseManager.addLease(newNode.getFileUnderConstructionFeature()
- .getClientName(), newNode.getId());
-
- // Set encryption attributes if necessary
- if (feInfo != null) {
- dir.setFileEncryptionInfo(src, feInfo);
- newNode = dir.getInode(newNode.getId()).asFile();
- }
-
- setNewINodeStoragePolicy(newNode, iip, isLazyPersist);
-
- // record file record in log, record new generation stamp
- getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
- src + " inode " + newNode.getId() + " " + holder);
- }
- return toRemoveBlocks;
- } catch (IOException ie) {
- NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " +
- ie.getMessage());
- throw ie;
- }
- }
-
- private void setNewINodeStoragePolicy(INodeFile inode,
- INodesInPath iip,
- boolean isLazyPersist)
- throws IOException {
-
- if (isLazyPersist) {
- BlockStoragePolicy lpPolicy =
- blockManager.getStoragePolicy("LAZY_PERSIST");
-
- // Set LAZY_PERSIST storage policy if the flag was passed to
- // CreateFile.
- if (lpPolicy == null) {
- throw new HadoopIllegalArgumentException(
- "The LAZY_PERSIST storage policy has been disabled " +
- "by the administrator.");
- }
- inode.setStoragePolicyID(lpPolicy.getId(),
- iip.getLatestSnapshotId());
- } else {
- BlockStoragePolicy effectivePolicy =
- blockManager.getStoragePolicy(inode.getStoragePolicyID());
-
- if (effectivePolicy != null &&
- effectivePolicy.isCopyOnCreateFile()) {
- // Copy effective policy from ancestor directory to current file.
- inode.setStoragePolicyID(effectivePolicy.getId(),
- iip.getLatestSnapshotId());
- }
- }
- }
-
/**
* Append to an existing file for append.
*
@@ -2861,7 +2661,7 @@ boolean recoverLease(String src, String holder, String clientMachine)
return false;
}
- private enum RecoverLeaseOp {
+ enum RecoverLeaseOp {
CREATE_FILE,
APPEND_FILE,
TRUNCATE_FILE,