HDFS-8421. Move startFile() and related functions into FSDirWriteFileOp. Contributed by Haohui Mai.

Haohui Mai 2015-05-21 08:05:10 -07:00
parent 0305316d69
commit 2b6bcfdafa
5 changed files with 371 additions and 342 deletions
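
The pattern repeated throughout this diff: logic that previously lived as private instance methods of FSNamesystem and FSDirectory becomes package-private static methods on FSDirWriteFileOp, which take the namesystem or directory as an explicit argument while the caller keeps ownership of locking and logging. A hypothetical, much-simplified sketch of that shape (toy types, not the real HDFS classes):

// Toy illustration of the "static op class" refactoring; none of these
// types or methods are actual HDFS code.
final class WriteFileOp {                  // plays the role of FSDirWriteFileOp
  private WriteFileOp() {}                 // never instantiated: static helpers only

  static String startFile(Namesystem ns, String src) {
    assert ns.hasWriteLock();              // caller must already hold the lock
    // ... validation, permission checks, inode creation against ns ...
    return "created " + src;
  }
}

class Namesystem {                         // plays the role of FSNamesystem
  private final Object lock = new Object();

  boolean hasWriteLock() { return Thread.holdsLock(lock); }

  String create(String src) {
    synchronized (lock) {                  // locking stays in the namesystem
      return WriteFileOp.startFile(this, src);
    }
  }
}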

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -577,6 +577,9 @@ Release 2.8.0 - UNRELEASED
HDFS-4383. Document the lease limits. (Arshad Mohammad via aajisaka)
HDFS-8421. Move startFile() and related functions into FSDirWriteFileOp.
(wheat9)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java

@@ -18,11 +18,27 @@
package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.base.Preconditions;
import org.apache.commons.io.Charsets;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -34,15 +50,22 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.ChunkedArrayList;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
import static org.apache.hadoop.util.Time.now;
class FSDirWriteFileOp {
private FSDirWriteFileOp() {}
static boolean unprotectedRemoveBlock(
@@ -277,6 +300,210 @@ class FSDirWriteFileOp {
return clientNode;
}
/**
* Create a new file or overwrite an existing file<br>
*
* Once the file is created, the client then allocates a new block with the next
* call using {@link ClientProtocol#addBlock}.
* <p>
* For description of parameters and exceptions thrown see
* {@link ClientProtocol#create}
*/
static HdfsFileStatus startFile(
FSNamesystem fsn, FSPermissionChecker pc, String src,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize,
EncryptionKeyInfo ezInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
boolean logRetryEntry)
throws IOException {
assert fsn.hasWriteLock();
boolean create = flag.contains(CreateFlag.CREATE);
boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
CipherSuite suite = null;
CryptoProtocolVersion version = null;
KeyProviderCryptoExtension.EncryptedKeyVersion edek = null;
if (ezInfo != null) {
edek = ezInfo.edek;
suite = ezInfo.suite;
version = ezInfo.protocolVersion;
}
boolean isRawPath = FSDirectory.isReservedRawName(src);
FSDirectory fsd = fsn.getFSDirectory();
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
src = fsd.resolvePath(pc, src, pathComponents);
INodesInPath iip = fsd.getINodesInPath4Write(src);
// Verify that the destination does not exist as a directory already.
final INode inode = iip.getLastINode();
if (inode != null && inode.isDirectory()) {
throw new FileAlreadyExistsException(src +
" already exists as a directory");
}
final INodeFile myFile = INodeFile.valueOf(inode, src, true);
if (fsd.isPermissionEnabled()) {
if (overwrite && myFile != null) {
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
}
/*
* To overwrite existing file, need to check 'w' permission
* of parent (equals to ancestor in this case)
*/
fsd.checkAncestorAccess(pc, iip, FsAction.WRITE);
}
if (!createParent) {
fsd.verifyParentDir(iip, src);
}
if (myFile == null && !create) {
throw new FileNotFoundException("Can't overwrite non-existent " +
src + " for client " + clientMachine);
}
FileEncryptionInfo feInfo = null;
final EncryptionZone zone = fsd.getEZForPath(iip);
if (zone != null) {
// The path is now within an EZ, but we're missing encryption parameters
if (suite == null || edek == null) {
throw new RetryStartFileException();
}
// Path is within an EZ and we have provided encryption parameters.
// Make sure that the generated EDEK matches the settings of the EZ.
final String ezKeyName = zone.getKeyName();
if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
throw new RetryStartFileException();
}
feInfo = new FileEncryptionInfo(suite, version,
edek.getEncryptedKeyVersion().getMaterial(),
edek.getEncryptedKeyIv(),
ezKeyName, edek.getEncryptionKeyVersionName());
}
if (myFile != null) {
if (overwrite) {
List<INode> toRemoveINodes = new ChunkedArrayList<>();
List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
toRemoveINodes, toRemoveUCFiles, now());
if (ret >= 0) {
iip = INodesInPath.replace(iip, iip.length() - 1, null);
FSDirDeleteOp.incrDeletedFileCount(ret);
fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
}
} else {
// If lease soft limit time is expired, recover the lease
fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
src, holder, clientMachine, false);
throw new FileAlreadyExistsException(src + " for client " +
clientMachine + " already exists");
}
}
fsn.checkFsObjectLimit();
INodeFile newNode = null;
Map.Entry<INodesInPath, String> parent = FSDirMkdirOp
.createAncestorDirectories(fsd, iip, permissions);
if (parent != null) {
iip = addFile(fsd, parent.getKey(), parent.getValue(), permissions,
replication, blockSize, holder, clientMachine);
newNode = iip != null ? iip.getLastINode().asFile() : null;
}
if (newNode == null) {
throw new IOException("Unable to add " + src + " to namespace");
}
fsn.leaseManager.addLease(
newNode.getFileUnderConstructionFeature().getClientName(),
newNode.getId());
if (feInfo != null) {
fsd.setFileEncryptionInfo(src, feInfo);
newNode = fsd.getInode(newNode.getId()).asFile();
}
setNewINodeStoragePolicy(fsn.getBlockManager(), newNode, iip,
isLazyPersist);
fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
src + " inode " + newNode.getId() + " " + holder);
}
return FSDirStatAndListingOp.getFileInfo(fsd, src, false, isRawPath, true);
}
static EncryptionKeyInfo getEncryptionKeyInfo(FSNamesystem fsn,
FSPermissionChecker pc, String src,
CryptoProtocolVersion[] supportedVersions)
throws IOException {
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
FSDirectory fsd = fsn.getFSDirectory();
src = fsd.resolvePath(pc, src, pathComponents);
INodesInPath iip = fsd.getINodesInPath4Write(src);
// Nothing to do if the path is not within an EZ
final EncryptionZone zone = fsd.getEZForPath(iip);
if (zone == null) {
return null;
}
CryptoProtocolVersion protocolVersion = fsn.chooseProtocolVersion(
zone, supportedVersions);
CipherSuite suite = zone.getSuite();
String ezKeyName = zone.getKeyName();
Preconditions.checkNotNull(protocolVersion);
Preconditions.checkNotNull(suite);
Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
"Chose an UNKNOWN CipherSuite!");
Preconditions.checkNotNull(ezKeyName);
return new EncryptionKeyInfo(protocolVersion, suite, ezKeyName);
}
static INodeFile addFileForEditLog(
FSDirectory fsd, long id, INodesInPath existing, byte[] localName,
PermissionStatus permissions, List<AclEntry> aclEntries,
List<XAttr> xAttrs, short replication, long modificationTime, long atime,
long preferredBlockSize, boolean underConstruction, String clientName,
String clientMachine, byte storagePolicyId) {
final INodeFile newNode;
assert fsd.hasWriteLock();
if (underConstruction) {
newNode = newINodeFile(id, permissions, modificationTime,
modificationTime, replication,
preferredBlockSize,
storagePolicyId);
newNode.toUnderConstruction(clientName, clientMachine);
} else {
newNode = newINodeFile(id, permissions, modificationTime,
atime, replication,
preferredBlockSize,
storagePolicyId);
}
newNode.setLocalName(localName);
try {
INodesInPath iip = fsd.addINode(existing, newNode);
if (iip != null) {
if (aclEntries != null) {
AclStorage.updateINodeAcl(newNode, aclEntries, CURRENT_STATE_ID);
}
if (xAttrs != null) {
XAttrStorage.updateINodeXAttrs(newNode, xAttrs, CURRENT_STATE_ID);
}
return newNode;
}
} catch (IOException e) {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"DIR* FSDirectory.unprotectedAddFile: exception when add "
+ existing.getPath() + " to the file system", e);
}
}
return null;
}
/**
* Add a block to the file. Returns a reference to the added block.
*/
@@ -314,6 +541,41 @@
}
}
/**
* Add the given filename to the fs.
* @return the new INodesInPath instance that contains the new INode
*/
private static INodesInPath addFile(
FSDirectory fsd, INodesInPath existing, String localName,
PermissionStatus permissions, short replication, long preferredBlockSize,
String clientName, String clientMachine)
throws IOException {
long modTime = now();
INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
modTime, modTime, replication, preferredBlockSize);
newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
newNode.toUnderConstruction(clientName, clientMachine);
INodesInPath newiip;
fsd.writeLock();
try {
newiip = fsd.addINode(existing, newNode);
} finally {
fsd.writeUnlock();
}
if (newiip == null) {
NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
existing.getPath() + "/" + localName);
return null;
}
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* addFile: " + localName + " is added");
}
return newiip;
}
private static FileState analyzeFileState(
FSNamesystem fsn, String src, long fileId, String clientName,
ExtendedBlock previous, LocatedBlock[] onRetryBlock)
@@ -345,8 +607,7 @@
src = iip.getPath();
}
}
- final INodeFile file = fsn.checkLease(src, clientName,
- inode, fileId);
+ final INodeFile file = fsn.checkLease(src, clientName, inode, fileId);
BlockInfoContiguous lastBlockInFile = file.getLastBlock();
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
// The block that the client claims is the current last block
@@ -497,6 +758,20 @@
return true;
}
private static INodeFile newINodeFile(
long id, PermissionStatus permissions, long mtime, long atime,
short replication, long preferredBlockSize, byte storagePolicyId) {
return new INodeFile(id, null, permissions, mtime, atime,
BlockInfoContiguous.EMPTY_ARRAY, replication, preferredBlockSize,
storagePolicyId);
}
private static INodeFile newINodeFile(long id, PermissionStatus permissions,
long mtime, long atime, short replication, long preferredBlockSize) {
return newINodeFile(id, permissions, mtime, atime, replication,
preferredBlockSize, (byte)0);
}
/**
* Persist the new block (the last block of the given file).
*/
@@ -533,6 +808,36 @@
DatanodeStorageInfo.incrementBlocksScheduled(targets);
}
private static void setNewINodeStoragePolicy(BlockManager bm, INodeFile
inode, INodesInPath iip, boolean isLazyPersist)
throws IOException {
if (isLazyPersist) {
BlockStoragePolicy lpPolicy =
bm.getStoragePolicy("LAZY_PERSIST");
// Set LAZY_PERSIST storage policy if the flag was passed to
// CreateFile.
if (lpPolicy == null) {
throw new HadoopIllegalArgumentException(
"The LAZY_PERSIST storage policy has been disabled " +
"by the administrator.");
}
inode.setStoragePolicyID(lpPolicy.getId(),
iip.getLatestSnapshotId());
} else {
BlockStoragePolicy effectivePolicy =
bm.getStoragePolicy(inode.getStoragePolicyID());
if (effectivePolicy != null &&
effectivePolicy.isCopyOnCreateFile()) {
// Copy effective policy from ancestor directory to current file.
inode.setStoragePolicyID(effectivePolicy.getId(),
iip.getLatestSnapshotId());
}
}
}
private static class FileState {
final INodeFile inode;
final String path;
@@ -560,4 +865,19 @@
this.clientMachine = clientMachine;
}
}
static class EncryptionKeyInfo {
final CryptoProtocolVersion protocolVersion;
final CipherSuite suite;
final String ezKeyName;
KeyProviderCryptoExtension.EncryptedKeyVersion edek;
EncryptionKeyInfo(
CryptoProtocolVersion protocolVersion, CipherSuite suite,
String ezKeyName) {
this.protocolVersion = protocolVersion;
this.suite = suite;
this.ezKeyName = ezKeyName;
}
}
}
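
A point worth noting in getEncryptionKeyInfo and startFile above: generating an encrypted data encryption key (EDEK) can involve a remote KMS call, so the zone parameters are gathered under the read lock, the EDEK is generated with no lock held, and startFile re-validates the key name under the write lock, throwing RetryStartFileException when the zone changed in between. A hypothetical, self-contained sketch of that three-phase shape (toy types, not the HDFS API):

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Toy sketch: phase 1 read-locked lookup, phase 2 unlocked key generation,
// phase 3 write-locked re-validation. All names here are illustrative only.
class EdekCreateFlow {
  static final class KeyInfo {            // stands in for EncryptionKeyInfo
    final String ezKeyName;
    String edek;                          // filled in outside any lock
    KeyInfo(String ezKeyName) { this.ezKeyName = ezKeyName; }
  }

  private final ReadWriteLock lock = new ReentrantReadWriteLock();

  void create(String src) throws Exception {
    KeyInfo info;
    lock.readLock().lock();
    try {
      info = lookupZoneKey(src);          // phase 1: cheap metadata read
    } finally {
      lock.readLock().unlock();
    }
    if (info != null) {
      info.edek = generateEdek(info.ezKeyName); // phase 2: slow KMS-style call
    }
    lock.writeLock().lock();
    try {
      // phase 3: re-check the zone; if it changed, ask the client to retry
      if (info != null && !info.ezKeyName.equals(lookupZoneKey(src).ezKeyName)) {
        throw new Exception("retry: zone key changed"); // cf. RetryStartFileException
      }
      // ... proceed to create the file, recording info.edek ...
    } finally {
      lock.writeLock().unlock();
    }
  }

  private KeyInfo lookupZoneKey(String src) { return new KeyInfo("zoneKey"); }
  private String generateEdek(String name) { return "edek-for-" + name; }
}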

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -21,13 +21,11 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.io.Charsets;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
@@ -42,7 +40,6 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
@@ -86,7 +83,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
import static org.apache.hadoop.util.Time.now;
/**
* Both FSDirectory and FSNamesystem manage the state of the namespace.
@@ -388,93 +384,6 @@ public class FSDirectory implements Closeable {
skipQuotaCheck = true;
}
private static INodeFile newINodeFile(long id, PermissionStatus permissions,
long mtime, long atime, short replication, long preferredBlockSize) {
return newINodeFile(id, permissions, mtime, atime, replication,
preferredBlockSize, (byte)0);
}
private static INodeFile newINodeFile(long id, PermissionStatus permissions,
long mtime, long atime, short replication, long preferredBlockSize,
byte storagePolicyId) {
return new INodeFile(id, null, permissions, mtime, atime,
BlockInfoContiguous.EMPTY_ARRAY, replication, preferredBlockSize,
storagePolicyId);
}
/**
* Add the given filename to the fs.
* @return the new INodesInPath instance that contains the new INode
*/
INodesInPath addFile(INodesInPath existing, String localName, PermissionStatus
permissions, short replication, long preferredBlockSize,
String clientName, String clientMachine)
throws FileAlreadyExistsException, QuotaExceededException,
UnresolvedLinkException, SnapshotAccessControlException, AclException {
long modTime = now();
INodeFile newNode = newINodeFile(allocateNewInodeId(), permissions, modTime,
modTime, replication, preferredBlockSize);
newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
newNode.toUnderConstruction(clientName, clientMachine);
INodesInPath newiip;
writeLock();
try {
newiip = addINode(existing, newNode);
} finally {
writeUnlock();
}
if (newiip == null) {
NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
existing.getPath() + "/" + localName);
return null;
}
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* addFile: " + localName + " is added");
}
return newiip;
}
INodeFile addFileForEditLog(long id, INodesInPath existing, byte[] localName,
PermissionStatus permissions, List<AclEntry> aclEntries,
List<XAttr> xAttrs, short replication, long modificationTime, long atime,
long preferredBlockSize, boolean underConstruction, String clientName,
String clientMachine, byte storagePolicyId) {
final INodeFile newNode;
assert hasWriteLock();
if (underConstruction) {
newNode = newINodeFile(id, permissions, modificationTime,
modificationTime, replication, preferredBlockSize, storagePolicyId);
newNode.toUnderConstruction(clientName, clientMachine);
} else {
newNode = newINodeFile(id, permissions, modificationTime, atime,
replication, preferredBlockSize, storagePolicyId);
}
newNode.setLocalName(localName);
try {
INodesInPath iip = addINode(existing, newNode);
if (iip != null) {
if (aclEntries != null) {
AclStorage.updateINodeAcl(newNode, aclEntries, CURRENT_STATE_ID);
}
if (xAttrs != null) {
XAttrStorage.updateINodeXAttrs(newNode, xAttrs, CURRENT_STATE_ID);
}
return newNode;
}
} catch (IOException e) {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"DIR* FSDirectory.unprotectedAddFile: exception when add "
+ existing.getPath() + " to the file system", e);
}
}
return null;
}
/**
* This is a wrapper for resolvePath(). If the path passed
* is prefixed with /.reserved/raw, then it checks to ensure that the caller

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -364,15 +364,12 @@ public class FSEditLogLoader {
// add to the file tree
inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion, lastInodeId);
- newFile = fsDir.addFileForEditLog(inodeId, iip.getExistingINodes(),
- iip.getLastLocalName(),
- addCloseOp.permissions,
- addCloseOp.aclEntries,
- addCloseOp.xAttrs, replication,
- addCloseOp.mtime, addCloseOp.atime,
- addCloseOp.blockSize, true,
- addCloseOp.clientName,
- addCloseOp.clientMachine,
+ newFile = FSDirWriteFileOp.addFileForEditLog(fsDir, inodeId,
+ iip.getExistingINodes(), iip.getLastLocalName(),
+ addCloseOp.permissions, addCloseOp.aclEntries,
+ addCloseOp.xAttrs, replication, addCloseOp.mtime,
+ addCloseOp.atime, addCloseOp.blockSize, true,
+ addCloseOp.clientName, addCloseOp.clientMachine,
addCloseOp.storagePolicyId);
iip = INodesInPath.replace(iip, iip.length() - 1, newFile);
fsNamesys.leaseManager.addLease(addCloseOp.clientName, newFile.getId());

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -151,7 +151,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.XAttr;
@@ -275,7 +274,6 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.ChunkedArrayList;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.ReflectionUtils;
@@ -2279,8 +2277,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @return chosen protocol version
* @throws IOException
*/
- private CryptoProtocolVersion chooseProtocolVersion(EncryptionZone zone,
- CryptoProtocolVersion[] supportedVersions)
+ CryptoProtocolVersion chooseProtocolVersion(
+ EncryptionZone zone, CryptoProtocolVersion[] supportedVersions)
throws UnknownCryptoProtocolVersionException, UnresolvedLinkException,
SnapshotAccessControlException {
Preconditions.checkNotNull(zone);
@@ -2342,11 +2340,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
String holder, String clientMachine, EnumSet<CreateFlag> flag,
boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions, boolean logRetryCache)
- throws AccessControlException, SafeModeException,
- FileAlreadyExistsException, UnresolvedLinkException,
- FileNotFoundException, ParentNotDirectoryException, IOException {
+ throws IOException {
- HdfsFileStatus status = null;
+ HdfsFileStatus status;
try {
status = startFileInt(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize, supportedVersions,
@@ -2355,54 +2351,42 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
logAuditEvent(false, "create", src);
throw e;
}
+ logAuditEvent(true, "create", src, null, status);
return status;
}
- private HdfsFileStatus startFileInt(final String srcArg,
+ private HdfsFileStatus startFileInt(final String src,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, CryptoProtocolVersion[] supportedVersions,
boolean logRetryCache)
- throws AccessControlException, SafeModeException,
- FileAlreadyExistsException, UnresolvedLinkException,
- FileNotFoundException, ParentNotDirectoryException, IOException {
+ throws IOException {
- String src = srcArg;
if (NameNode.stateChangeLog.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
builder.append("DIR* NameSystem.startFile: src=" + src builder.append("DIR* NameSystem.startFile: src=").append(src)
+ ", holder=" + holder .append(", holder=").append(holder)
+ ", clientMachine=" + clientMachine .append(", clientMachine=").append(clientMachine)
+ ", createParent=" + createParent .append(", createParent=").append(createParent)
+ ", replication=" + replication .append(", replication=").append(replication)
+ ", createFlag=" + flag.toString() .append(", createFlag=").append(flag.toString())
+ ", blockSize=" + blockSize); .append(", blockSize=").append(blockSize)
builder.append(", supportedVersions="); .append(", supportedVersions=")
if (supportedVersions != null) { .append(supportedVersions == null ? null : Arrays.toString
builder.append(Arrays.toString(supportedVersions)); (supportedVersions));
} else {
builder.append("null");
}
NameNode.stateChangeLog.debug(builder.toString());
}
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException(src);
}
blockManager.verifyReplication(src, replication, clientMachine);
- boolean skipSync = false;
- HdfsFileStatus stat = null;
- FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
if (blockSize < minBlockSize) {
throw new IOException("Specified block size is less than configured" +
" minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
+ "): " + blockSize + " < " + minBlockSize);
}
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
- boolean create = flag.contains(CreateFlag.CREATE);
- boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
- boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
+ FSPermissionChecker pc = getPermissionChecker();
waitForLoadingFSImage();
/**
@@ -2417,245 +2401,61 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* special RetryStartFileException to ask the DFSClient to try the create
* again later.
*/
- CryptoProtocolVersion protocolVersion = null;
- CipherSuite suite = null;
- String ezKeyName = null;
- EncryptedKeyVersion edek = null;
+ FSDirWriteFileOp.EncryptionKeyInfo ezInfo = null;
if (provider != null) {
readLock();
try {
- src = dir.resolvePath(pc, src, pathComponents);
- INodesInPath iip = dir.getINodesInPath4Write(src);
- // Nothing to do if the path is not within an EZ
- final EncryptionZone zone = dir.getEZForPath(iip);
- if (zone != null) {
- protocolVersion = chooseProtocolVersion(zone, supportedVersions);
- suite = zone.getSuite();
- ezKeyName = zone.getKeyName();
- Preconditions.checkNotNull(protocolVersion);
- Preconditions.checkNotNull(suite);
- Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
- "Chose an UNKNOWN CipherSuite!");
- Preconditions.checkNotNull(ezKeyName);
- }
+ checkOperation(OperationCategory.READ);
+ ezInfo = FSDirWriteFileOp
+ .getEncryptionKeyInfo(this, pc, src, supportedVersions);
} finally {
readUnlock();
}
- Preconditions.checkState(
- (suite == null && ezKeyName == null) ||
- (suite != null && ezKeyName != null),
- "Both suite and ezKeyName should both be null or not null");
// Generate EDEK if necessary while not holding the lock
- edek = generateEncryptedDataEncryptionKey(ezKeyName);
+ if (ezInfo != null) {
+ ezInfo.edek = generateEncryptedDataEncryptionKey(ezInfo.ezKeyName);
+ }
EncryptionFaultInjector.getInstance().startFileAfterGenerateKey();
}
+ boolean skipSync = false;
+ HdfsFileStatus stat = null;
// Proceed with the create, using the computed cipher suite and
// generated EDEK
- BlocksMapUpdateInfo toRemoveBlocks = null;
+ BlocksMapUpdateInfo toRemoveBlocks = new BlocksMapUpdateInfo();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot create file" + src);
dir.writeLock();
try {
- src = dir.resolvePath(pc, src, pathComponents);
- final INodesInPath iip = dir.getINodesInPath4Write(src);
- toRemoveBlocks = startFileInternal(
- pc, iip, permissions, holder,
- clientMachine, create, overwrite,
- createParent, replication, blockSize,
- isLazyPersist, suite, protocolVersion, edek,
- logRetryCache);
- stat = FSDirStatAndListingOp.getFileInfo(
- dir, src, false, FSDirectory.isReservedRawName(srcArg), true);
+ stat = FSDirWriteFileOp.startFile(this, pc, src, permissions, holder,
+ clientMachine, flag, createParent,
+ replication, blockSize, ezInfo,
+ toRemoveBlocks, logRetryCache);
} finally {
dir.writeUnlock();
}
- } catch (StandbyException se) {
- skipSync = true;
- throw se;
+ } catch (IOException e) {
+ skipSync = e instanceof StandbyException;
+ throw e;
} finally {
writeUnlock();
// There might be transactions logged while trying to recover the lease.
// They need to be sync'ed even when an exception was thrown.
if (!skipSync) {
getEditLog().logSync();
- if (toRemoveBlocks != null) {
removeBlocks(toRemoveBlocks);
toRemoveBlocks.clear();
- }
}
}
- logAuditEvent(true, "create", srcArg, null, stat);
return stat;
}
/**
* Create a new file or overwrite an existing file<br>
*
* Once the file is created, the client then allocates a new block with the next
* call using {@link ClientProtocol#addBlock}.
* <p>
* For description of parameters and exceptions thrown see
* {@link ClientProtocol#create}
*/
private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc,
INodesInPath iip, PermissionStatus permissions, String holder,
String clientMachine, boolean create, boolean overwrite,
boolean createParent, short replication, long blockSize,
boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version,
EncryptedKeyVersion edek, boolean logRetryEntry)
throws IOException {
assert hasWriteLock();
// Verify that the destination does not exist as a directory already.
final INode inode = iip.getLastINode();
final String src = iip.getPath();
if (inode != null && inode.isDirectory()) {
throw new FileAlreadyExistsException(src +
" already exists as a directory");
}
final INodeFile myFile = INodeFile.valueOf(inode, src, true);
if (isPermissionEnabled) {
if (overwrite && myFile != null) {
dir.checkPathAccess(pc, iip, FsAction.WRITE);
}
/*
* To overwrite existing file, need to check 'w' permission
* of parent (equals to ancestor in this case)
*/
dir.checkAncestorAccess(pc, iip, FsAction.WRITE);
}
if (!createParent) {
dir.verifyParentDir(iip, src);
}
FileEncryptionInfo feInfo = null;
final EncryptionZone zone = dir.getEZForPath(iip);
if (zone != null) {
// The path is now within an EZ, but we're missing encryption parameters
if (suite == null || edek == null) {
throw new RetryStartFileException();
}
// Path is within an EZ and we have provided encryption parameters.
// Make sure that the generated EDEK matches the settings of the EZ.
final String ezKeyName = zone.getKeyName();
if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
throw new RetryStartFileException();
}
feInfo = new FileEncryptionInfo(suite, version,
edek.getEncryptedKeyVersion().getMaterial(),
edek.getEncryptedKeyIv(),
ezKeyName, edek.getEncryptionKeyVersionName());
}
try {
BlocksMapUpdateInfo toRemoveBlocks = null;
if (myFile == null) {
if (!create) {
throw new FileNotFoundException("Can't overwrite non-existent " +
src + " for client " + clientMachine);
}
} else {
if (overwrite) {
toRemoveBlocks = new BlocksMapUpdateInfo();
List<INode> toRemoveINodes = new ChunkedArrayList<>();
List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
long ret = FSDirDeleteOp.delete(
dir, iip, toRemoveBlocks, toRemoveINodes,
toRemoveUCFiles, now());
if (ret >= 0) {
iip = INodesInPath.replace(iip, iip.length() - 1, null);
FSDirDeleteOp.incrDeletedFileCount(ret);
removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
}
} else {
// If lease soft limit time is expired, recover the lease
recoverLeaseInternal(RecoverLeaseOp.CREATE_FILE,
iip, src, holder, clientMachine, false);
throw new FileAlreadyExistsException(src + " for client " +
clientMachine + " already exists");
}
}
checkFsObjectLimit();
INodeFile newNode = null;
// Always do an implicit mkdirs for parent directory tree.
Map.Entry<INodesInPath, String> parent = FSDirMkdirOp
.createAncestorDirectories(dir, iip, permissions);
if (parent != null) {
iip = dir.addFile(parent.getKey(), parent.getValue(), permissions,
replication, blockSize, holder, clientMachine);
newNode = iip != null ? iip.getLastINode().asFile() : null;
}
if (newNode == null) {
throw new IOException("Unable to add " + src + " to namespace");
}
leaseManager.addLease(newNode.getFileUnderConstructionFeature()
.getClientName(), newNode.getId());
// Set encryption attributes if necessary
if (feInfo != null) {
dir.setFileEncryptionInfo(src, feInfo);
newNode = dir.getInode(newNode.getId()).asFile();
}
setNewINodeStoragePolicy(newNode, iip, isLazyPersist);
// record file record in log, record new generation stamp
getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
src + " inode " + newNode.getId() + " " + holder);
}
return toRemoveBlocks;
} catch (IOException ie) {
NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " +
ie.getMessage());
throw ie;
}
}
private void setNewINodeStoragePolicy(INodeFile inode,
INodesInPath iip,
boolean isLazyPersist)
throws IOException {
if (isLazyPersist) {
BlockStoragePolicy lpPolicy =
blockManager.getStoragePolicy("LAZY_PERSIST");
// Set LAZY_PERSIST storage policy if the flag was passed to
// CreateFile.
if (lpPolicy == null) {
throw new HadoopIllegalArgumentException(
"The LAZY_PERSIST storage policy has been disabled " +
"by the administrator.");
}
inode.setStoragePolicyID(lpPolicy.getId(),
iip.getLatestSnapshotId());
} else {
BlockStoragePolicy effectivePolicy =
blockManager.getStoragePolicy(inode.getStoragePolicyID());
if (effectivePolicy != null &&
effectivePolicy.isCopyOnCreateFile()) {
// Copy effective policy from ancestor directory to current file.
inode.setStoragePolicyID(effectivePolicy.getId(),
iip.getLatestSnapshotId());
}
}
}
/**
* Append to an existing file for append.
* <p>
@@ -2871,7 +2671,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return false;
}
- private enum RecoverLeaseOp {
+ enum RecoverLeaseOp {
CREATE_FILE,
APPEND_FILE,
TRUNCATE_FILE,
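
One subtlety preserved by the rewrite of startFileInt above: lease recovery may log edit transactions before the create ultimately fails, so the edit log is synced in the finally block even on the error path, and only a StandbyException (now detected via instanceof on the common IOException catch) skips the sync. A hypothetical minimal sketch of that control flow (toy methods, not the HDFS API):

// Toy sketch of "sync edits even when the operation throws".
class CreateWithSync {
  static class StandbyException extends java.io.IOException {}

  void create(String src) throws java.io.IOException {
    boolean skipSync = false;
    try {
      doCreate(src);                 // may log transactions, then throw
    } catch (java.io.IOException e) {
      skipSync = e instanceof StandbyException; // a standby cannot sync edits
      throw e;
    } finally {
      if (!skipSync) {
        syncEditLog();               // flush whatever was logged before failing
      }
    }
  }

  private void doCreate(String src) throws java.io.IOException {}
  private void syncEditLog() {}
}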