HDFS-8421. Move startFile() and related functions into FSDirWriteFileOp. Contributed by Haohui Mai.
This commit is contained in:
parent
157ecb2241
commit
0b909d028f
|
@ -240,6 +240,9 @@ Release 2.8.0 - UNRELEASED
|
|||
|
||||
HDFS-4383. Document the lease limits. (Arshad Mohammad via aajisaka)
|
||||
|
||||
HDFS-8421. Move startFile() and related functions into FSDirWriteFileOp.
|
||||
(wheat9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
|
|
@ -18,11 +18,27 @@
|
|||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.io.Charsets;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FileEncryptionInfo;
|
||||
import org.apache.hadoop.fs.XAttr;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
|
@ -34,15 +50,22 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|||
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
|
||||
import org.apache.hadoop.net.Node;
|
||||
import org.apache.hadoop.net.NodeBase;
|
||||
import org.apache.hadoop.util.ChunkedArrayList;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
|
||||
import static org.apache.hadoop.util.Time.now;
|
||||
|
||||
class FSDirWriteFileOp {
|
||||
private FSDirWriteFileOp() {}
|
||||
static boolean unprotectedRemoveBlock(
|
||||
|
@ -277,6 +300,210 @@ class FSDirWriteFileOp {
|
|||
return clientNode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new file or overwrite an existing file<br>
|
||||
*
|
||||
* Once the file is create the client then allocates a new block with the next
|
||||
* call using {@link ClientProtocol#addBlock}.
|
||||
* <p>
|
||||
* For description of parameters and exceptions thrown see
|
||||
* {@link ClientProtocol#create}
|
||||
*/
|
||||
static HdfsFileStatus startFile(
|
||||
FSNamesystem fsn, FSPermissionChecker pc, String src,
|
||||
PermissionStatus permissions, String holder, String clientMachine,
|
||||
EnumSet<CreateFlag> flag, boolean createParent,
|
||||
short replication, long blockSize,
|
||||
EncryptionKeyInfo ezInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
|
||||
boolean logRetryEntry)
|
||||
throws IOException {
|
||||
assert fsn.hasWriteLock();
|
||||
|
||||
boolean create = flag.contains(CreateFlag.CREATE);
|
||||
boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
|
||||
boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
|
||||
|
||||
CipherSuite suite = null;
|
||||
CryptoProtocolVersion version = null;
|
||||
KeyProviderCryptoExtension.EncryptedKeyVersion edek = null;
|
||||
|
||||
if (ezInfo != null) {
|
||||
edek = ezInfo.edek;
|
||||
suite = ezInfo.suite;
|
||||
version = ezInfo.protocolVersion;
|
||||
}
|
||||
|
||||
boolean isRawPath = FSDirectory.isReservedRawName(src);
|
||||
FSDirectory fsd = fsn.getFSDirectory();
|
||||
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
||||
src = fsd.resolvePath(pc, src, pathComponents);
|
||||
INodesInPath iip = fsd.getINodesInPath4Write(src);
|
||||
|
||||
// Verify that the destination does not exist as a directory already.
|
||||
final INode inode = iip.getLastINode();
|
||||
if (inode != null && inode.isDirectory()) {
|
||||
throw new FileAlreadyExistsException(src +
|
||||
" already exists as a directory");
|
||||
}
|
||||
|
||||
final INodeFile myFile = INodeFile.valueOf(inode, src, true);
|
||||
if (fsd.isPermissionEnabled()) {
|
||||
if (overwrite && myFile != null) {
|
||||
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
|
||||
}
|
||||
/*
|
||||
* To overwrite existing file, need to check 'w' permission
|
||||
* of parent (equals to ancestor in this case)
|
||||
*/
|
||||
fsd.checkAncestorAccess(pc, iip, FsAction.WRITE);
|
||||
}
|
||||
|
||||
if (!createParent) {
|
||||
fsd.verifyParentDir(iip, src);
|
||||
}
|
||||
|
||||
if (myFile == null && !create) {
|
||||
throw new FileNotFoundException("Can't overwrite non-existent " +
|
||||
src + " for client " + clientMachine);
|
||||
}
|
||||
|
||||
FileEncryptionInfo feInfo = null;
|
||||
|
||||
final EncryptionZone zone = fsd.getEZForPath(iip);
|
||||
if (zone != null) {
|
||||
// The path is now within an EZ, but we're missing encryption parameters
|
||||
if (suite == null || edek == null) {
|
||||
throw new RetryStartFileException();
|
||||
}
|
||||
// Path is within an EZ and we have provided encryption parameters.
|
||||
// Make sure that the generated EDEK matches the settings of the EZ.
|
||||
final String ezKeyName = zone.getKeyName();
|
||||
if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
|
||||
throw new RetryStartFileException();
|
||||
}
|
||||
feInfo = new FileEncryptionInfo(suite, version,
|
||||
edek.getEncryptedKeyVersion().getMaterial(),
|
||||
edek.getEncryptedKeyIv(),
|
||||
ezKeyName, edek.getEncryptionKeyVersionName());
|
||||
}
|
||||
|
||||
if (myFile != null) {
|
||||
if (overwrite) {
|
||||
List<INode> toRemoveINodes = new ChunkedArrayList<>();
|
||||
List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
|
||||
long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
|
||||
toRemoveINodes, toRemoveUCFiles, now());
|
||||
if (ret >= 0) {
|
||||
iip = INodesInPath.replace(iip, iip.length() - 1, null);
|
||||
FSDirDeleteOp.incrDeletedFileCount(ret);
|
||||
fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
|
||||
}
|
||||
} else {
|
||||
// If lease soft limit time is expired, recover the lease
|
||||
fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
|
||||
src, holder, clientMachine, false);
|
||||
throw new FileAlreadyExistsException(src + " for client " +
|
||||
clientMachine + " already exists");
|
||||
}
|
||||
}
|
||||
fsn.checkFsObjectLimit();
|
||||
INodeFile newNode = null;
|
||||
Map.Entry<INodesInPath, String> parent = FSDirMkdirOp
|
||||
.createAncestorDirectories(fsd, iip, permissions);
|
||||
if (parent != null) {
|
||||
iip = addFile(fsd, parent.getKey(), parent.getValue(), permissions,
|
||||
replication, blockSize, holder, clientMachine);
|
||||
newNode = iip != null ? iip.getLastINode().asFile() : null;
|
||||
}
|
||||
if (newNode == null) {
|
||||
throw new IOException("Unable to add " + src + " to namespace");
|
||||
}
|
||||
fsn.leaseManager.addLease(
|
||||
newNode.getFileUnderConstructionFeature().getClientName(),
|
||||
newNode.getId());
|
||||
if (feInfo != null) {
|
||||
fsd.setFileEncryptionInfo(src, feInfo);
|
||||
newNode = fsd.getInode(newNode.getId()).asFile();
|
||||
}
|
||||
setNewINodeStoragePolicy(fsn.getBlockManager(), newNode, iip,
|
||||
isLazyPersist);
|
||||
fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
|
||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
|
||||
src + " inode " + newNode.getId() + " " + holder);
|
||||
}
|
||||
return FSDirStatAndListingOp.getFileInfo(fsd, src, false, isRawPath, true);
|
||||
}
|
||||
|
||||
static EncryptionKeyInfo getEncryptionKeyInfo(FSNamesystem fsn,
|
||||
FSPermissionChecker pc, String src,
|
||||
CryptoProtocolVersion[] supportedVersions)
|
||||
throws IOException {
|
||||
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
||||
FSDirectory fsd = fsn.getFSDirectory();
|
||||
src = fsd.resolvePath(pc, src, pathComponents);
|
||||
INodesInPath iip = fsd.getINodesInPath4Write(src);
|
||||
// Nothing to do if the path is not within an EZ
|
||||
final EncryptionZone zone = fsd.getEZForPath(iip);
|
||||
if (zone == null) {
|
||||
return null;
|
||||
}
|
||||
CryptoProtocolVersion protocolVersion = fsn.chooseProtocolVersion(
|
||||
zone, supportedVersions);
|
||||
CipherSuite suite = zone.getSuite();
|
||||
String ezKeyName = zone.getKeyName();
|
||||
|
||||
Preconditions.checkNotNull(protocolVersion);
|
||||
Preconditions.checkNotNull(suite);
|
||||
Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
|
||||
"Chose an UNKNOWN CipherSuite!");
|
||||
Preconditions.checkNotNull(ezKeyName);
|
||||
return new EncryptionKeyInfo(protocolVersion, suite, ezKeyName);
|
||||
}
|
||||
|
||||
static INodeFile addFileForEditLog(
|
||||
FSDirectory fsd, long id, INodesInPath existing, byte[] localName,
|
||||
PermissionStatus permissions, List<AclEntry> aclEntries,
|
||||
List<XAttr> xAttrs, short replication, long modificationTime, long atime,
|
||||
long preferredBlockSize, boolean underConstruction, String clientName,
|
||||
String clientMachine, byte storagePolicyId) {
|
||||
final INodeFile newNode;
|
||||
assert fsd.hasWriteLock();
|
||||
if (underConstruction) {
|
||||
newNode = newINodeFile(id, permissions, modificationTime,
|
||||
modificationTime, replication,
|
||||
preferredBlockSize,
|
||||
storagePolicyId);
|
||||
newNode.toUnderConstruction(clientName, clientMachine);
|
||||
} else {
|
||||
newNode = newINodeFile(id, permissions, modificationTime,
|
||||
atime, replication,
|
||||
preferredBlockSize,
|
||||
storagePolicyId);
|
||||
}
|
||||
|
||||
newNode.setLocalName(localName);
|
||||
try {
|
||||
INodesInPath iip = fsd.addINode(existing, newNode);
|
||||
if (iip != null) {
|
||||
if (aclEntries != null) {
|
||||
AclStorage.updateINodeAcl(newNode, aclEntries, CURRENT_STATE_ID);
|
||||
}
|
||||
if (xAttrs != null) {
|
||||
XAttrStorage.updateINodeXAttrs(newNode, xAttrs, CURRENT_STATE_ID);
|
||||
}
|
||||
return newNode;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
NameNode.stateChangeLog.debug(
|
||||
"DIR* FSDirectory.unprotectedAddFile: exception when add "
|
||||
+ existing.getPath() + " to the file system", e);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a block to the file. Returns a reference to the added block.
|
||||
*/
|
||||
|
@ -314,6 +541,41 @@ class FSDirWriteFileOp {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the given filename to the fs.
|
||||
* @return the new INodesInPath instance that contains the new INode
|
||||
*/
|
||||
private static INodesInPath addFile(
|
||||
FSDirectory fsd, INodesInPath existing, String localName,
|
||||
PermissionStatus permissions, short replication, long preferredBlockSize,
|
||||
String clientName, String clientMachine)
|
||||
throws IOException {
|
||||
|
||||
long modTime = now();
|
||||
INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
|
||||
modTime, modTime, replication, preferredBlockSize);
|
||||
newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
|
||||
newNode.toUnderConstruction(clientName, clientMachine);
|
||||
|
||||
INodesInPath newiip;
|
||||
fsd.writeLock();
|
||||
try {
|
||||
newiip = fsd.addINode(existing, newNode);
|
||||
} finally {
|
||||
fsd.writeUnlock();
|
||||
}
|
||||
if (newiip == null) {
|
||||
NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
|
||||
existing.getPath() + "/" + localName);
|
||||
return null;
|
||||
}
|
||||
|
||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
NameNode.stateChangeLog.debug("DIR* addFile: " + localName + " is added");
|
||||
}
|
||||
return newiip;
|
||||
}
|
||||
|
||||
private static FileState analyzeFileState(
|
||||
FSNamesystem fsn, String src, long fileId, String clientName,
|
||||
ExtendedBlock previous, LocatedBlock[] onRetryBlock)
|
||||
|
@ -345,8 +607,7 @@ class FSDirWriteFileOp {
|
|||
src = iip.getPath();
|
||||
}
|
||||
}
|
||||
final INodeFile file = fsn.checkLease(src, clientName,
|
||||
inode, fileId);
|
||||
final INodeFile file = fsn.checkLease(src, clientName, inode, fileId);
|
||||
BlockInfoContiguous lastBlockInFile = file.getLastBlock();
|
||||
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
|
||||
// The block that the client claims is the current last block
|
||||
|
@ -497,6 +758,20 @@ class FSDirWriteFileOp {
|
|||
return true;
|
||||
}
|
||||
|
||||
private static INodeFile newINodeFile(
|
||||
long id, PermissionStatus permissions, long mtime, long atime,
|
||||
short replication, long preferredBlockSize, byte storagePolicyId) {
|
||||
return new INodeFile(id, null, permissions, mtime, atime,
|
||||
BlockInfoContiguous.EMPTY_ARRAY, replication, preferredBlockSize,
|
||||
storagePolicyId);
|
||||
}
|
||||
|
||||
private static INodeFile newINodeFile(long id, PermissionStatus permissions,
|
||||
long mtime, long atime, short replication, long preferredBlockSize) {
|
||||
return newINodeFile(id, permissions, mtime, atime, replication,
|
||||
preferredBlockSize, (byte)0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Persist the new block (the last block of the given file).
|
||||
*/
|
||||
|
@ -533,6 +808,36 @@ class FSDirWriteFileOp {
|
|||
DatanodeStorageInfo.incrementBlocksScheduled(targets);
|
||||
}
|
||||
|
||||
private static void setNewINodeStoragePolicy(BlockManager bm, INodeFile
|
||||
inode, INodesInPath iip, boolean isLazyPersist)
|
||||
throws IOException {
|
||||
|
||||
if (isLazyPersist) {
|
||||
BlockStoragePolicy lpPolicy =
|
||||
bm.getStoragePolicy("LAZY_PERSIST");
|
||||
|
||||
// Set LAZY_PERSIST storage policy if the flag was passed to
|
||||
// CreateFile.
|
||||
if (lpPolicy == null) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"The LAZY_PERSIST storage policy has been disabled " +
|
||||
"by the administrator.");
|
||||
}
|
||||
inode.setStoragePolicyID(lpPolicy.getId(),
|
||||
iip.getLatestSnapshotId());
|
||||
} else {
|
||||
BlockStoragePolicy effectivePolicy =
|
||||
bm.getStoragePolicy(inode.getStoragePolicyID());
|
||||
|
||||
if (effectivePolicy != null &&
|
||||
effectivePolicy.isCopyOnCreateFile()) {
|
||||
// Copy effective policy from ancestor directory to current file.
|
||||
inode.setStoragePolicyID(effectivePolicy.getId(),
|
||||
iip.getLatestSnapshotId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class FileState {
|
||||
final INodeFile inode;
|
||||
final String path;
|
||||
|
@ -560,4 +865,19 @@ class FSDirWriteFileOp {
|
|||
this.clientMachine = clientMachine;
|
||||
}
|
||||
}
|
||||
|
||||
static class EncryptionKeyInfo {
|
||||
final CryptoProtocolVersion protocolVersion;
|
||||
final CipherSuite suite;
|
||||
final String ezKeyName;
|
||||
KeyProviderCryptoExtension.EncryptedKeyVersion edek;
|
||||
|
||||
EncryptionKeyInfo(
|
||||
CryptoProtocolVersion protocolVersion, CipherSuite suite,
|
||||
String ezKeyName) {
|
||||
this.protocolVersion = protocolVersion;
|
||||
this.suite = suite;
|
||||
this.ezKeyName = ezKeyName;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,13 +21,11 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import org.apache.commons.io.Charsets;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FileEncryptionInfo;
|
||||
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -42,7 +40,6 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
|
|||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.XAttrHelper;
|
||||
import org.apache.hadoop.hdfs.protocol.AclException;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
|
||||
|
@ -86,7 +83,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KE
|
|||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
|
||||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
|
||||
import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
|
||||
import static org.apache.hadoop.util.Time.now;
|
||||
|
||||
/**
|
||||
* Both FSDirectory and FSNamesystem manage the state of the namespace.
|
||||
|
@ -388,93 +384,6 @@ public class FSDirectory implements Closeable {
|
|||
skipQuotaCheck = true;
|
||||
}
|
||||
|
||||
private static INodeFile newINodeFile(long id, PermissionStatus permissions,
|
||||
long mtime, long atime, short replication, long preferredBlockSize) {
|
||||
return newINodeFile(id, permissions, mtime, atime, replication,
|
||||
preferredBlockSize, (byte)0);
|
||||
}
|
||||
|
||||
private static INodeFile newINodeFile(long id, PermissionStatus permissions,
|
||||
long mtime, long atime, short replication, long preferredBlockSize,
|
||||
byte storagePolicyId) {
|
||||
return new INodeFile(id, null, permissions, mtime, atime,
|
||||
BlockInfoContiguous.EMPTY_ARRAY, replication, preferredBlockSize,
|
||||
storagePolicyId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the given filename to the fs.
|
||||
* @return the new INodesInPath instance that contains the new INode
|
||||
*/
|
||||
INodesInPath addFile(INodesInPath existing, String localName, PermissionStatus
|
||||
permissions, short replication, long preferredBlockSize,
|
||||
String clientName, String clientMachine)
|
||||
throws FileAlreadyExistsException, QuotaExceededException,
|
||||
UnresolvedLinkException, SnapshotAccessControlException, AclException {
|
||||
|
||||
long modTime = now();
|
||||
INodeFile newNode = newINodeFile(allocateNewInodeId(), permissions, modTime,
|
||||
modTime, replication, preferredBlockSize);
|
||||
newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
|
||||
newNode.toUnderConstruction(clientName, clientMachine);
|
||||
|
||||
INodesInPath newiip;
|
||||
writeLock();
|
||||
try {
|
||||
newiip = addINode(existing, newNode);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
if (newiip == null) {
|
||||
NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
|
||||
existing.getPath() + "/" + localName);
|
||||
return null;
|
||||
}
|
||||
|
||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
NameNode.stateChangeLog.debug("DIR* addFile: " + localName + " is added");
|
||||
}
|
||||
return newiip;
|
||||
}
|
||||
|
||||
INodeFile addFileForEditLog(long id, INodesInPath existing, byte[] localName,
|
||||
PermissionStatus permissions, List<AclEntry> aclEntries,
|
||||
List<XAttr> xAttrs, short replication, long modificationTime, long atime,
|
||||
long preferredBlockSize, boolean underConstruction, String clientName,
|
||||
String clientMachine, byte storagePolicyId) {
|
||||
final INodeFile newNode;
|
||||
assert hasWriteLock();
|
||||
if (underConstruction) {
|
||||
newNode = newINodeFile(id, permissions, modificationTime,
|
||||
modificationTime, replication, preferredBlockSize, storagePolicyId);
|
||||
newNode.toUnderConstruction(clientName, clientMachine);
|
||||
} else {
|
||||
newNode = newINodeFile(id, permissions, modificationTime, atime,
|
||||
replication, preferredBlockSize, storagePolicyId);
|
||||
}
|
||||
|
||||
newNode.setLocalName(localName);
|
||||
try {
|
||||
INodesInPath iip = addINode(existing, newNode);
|
||||
if (iip != null) {
|
||||
if (aclEntries != null) {
|
||||
AclStorage.updateINodeAcl(newNode, aclEntries, CURRENT_STATE_ID);
|
||||
}
|
||||
if (xAttrs != null) {
|
||||
XAttrStorage.updateINodeXAttrs(newNode, xAttrs, CURRENT_STATE_ID);
|
||||
}
|
||||
return newNode;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
NameNode.stateChangeLog.debug(
|
||||
"DIR* FSDirectory.unprotectedAddFile: exception when add "
|
||||
+ existing.getPath() + " to the file system", e);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* This is a wrapper for resolvePath(). If the path passed
|
||||
* is prefixed with /.reserved/raw, then it checks to ensure that the caller
|
||||
|
|
|
@ -365,15 +365,12 @@ public class FSEditLogLoader {
|
|||
|
||||
// add to the file tree
|
||||
inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion, lastInodeId);
|
||||
newFile = fsDir.addFileForEditLog(inodeId, iip.getExistingINodes(),
|
||||
iip.getLastLocalName(),
|
||||
addCloseOp.permissions,
|
||||
addCloseOp.aclEntries,
|
||||
addCloseOp.xAttrs, replication,
|
||||
addCloseOp.mtime, addCloseOp.atime,
|
||||
addCloseOp.blockSize, true,
|
||||
addCloseOp.clientName,
|
||||
addCloseOp.clientMachine,
|
||||
newFile = FSDirWriteFileOp.addFileForEditLog(fsDir, inodeId,
|
||||
iip.getExistingINodes(), iip.getLastLocalName(),
|
||||
addCloseOp.permissions, addCloseOp.aclEntries,
|
||||
addCloseOp.xAttrs, replication, addCloseOp.mtime,
|
||||
addCloseOp.atime, addCloseOp.blockSize, true,
|
||||
addCloseOp.clientName, addCloseOp.clientMachine,
|
||||
addCloseOp.storagePolicyId);
|
||||
iip = INodesInPath.replace(iip, iip.length() - 1, newFile);
|
||||
fsNamesys.leaseManager.addLease(addCloseOp.clientName, newFile.getId());
|
||||
|
|
|
@ -154,7 +154,6 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
import org.apache.hadoop.fs.InvalidPathException;
|
||||
import org.apache.hadoop.fs.Options;
|
||||
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||
import org.apache.hadoop.fs.XAttr;
|
||||
|
@ -278,7 +277,6 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||
import org.apache.hadoop.util.ChunkedArrayList;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
@ -2269,8 +2267,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
* @return chosen protocol version
|
||||
* @throws IOException
|
||||
*/
|
||||
private CryptoProtocolVersion chooseProtocolVersion(EncryptionZone zone,
|
||||
CryptoProtocolVersion[] supportedVersions)
|
||||
CryptoProtocolVersion chooseProtocolVersion(
|
||||
EncryptionZone zone, CryptoProtocolVersion[] supportedVersions)
|
||||
throws UnknownCryptoProtocolVersionException, UnresolvedLinkException,
|
||||
SnapshotAccessControlException {
|
||||
Preconditions.checkNotNull(zone);
|
||||
|
@ -2332,11 +2330,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
String holder, String clientMachine, EnumSet<CreateFlag> flag,
|
||||
boolean createParent, short replication, long blockSize,
|
||||
CryptoProtocolVersion[] supportedVersions, boolean logRetryCache)
|
||||
throws AccessControlException, SafeModeException,
|
||||
FileAlreadyExistsException, UnresolvedLinkException,
|
||||
FileNotFoundException, ParentNotDirectoryException, IOException {
|
||||
throws IOException {
|
||||
|
||||
HdfsFileStatus status = null;
|
||||
HdfsFileStatus status;
|
||||
try {
|
||||
status = startFileInt(src, permissions, holder, clientMachine, flag,
|
||||
createParent, replication, blockSize, supportedVersions,
|
||||
|
@ -2345,54 +2341,42 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
logAuditEvent(false, "create", src);
|
||||
throw e;
|
||||
}
|
||||
logAuditEvent(true, "create", src, null, status);
|
||||
return status;
|
||||
}
|
||||
|
||||
private HdfsFileStatus startFileInt(final String srcArg,
|
||||
private HdfsFileStatus startFileInt(final String src,
|
||||
PermissionStatus permissions, String holder, String clientMachine,
|
||||
EnumSet<CreateFlag> flag, boolean createParent, short replication,
|
||||
long blockSize, CryptoProtocolVersion[] supportedVersions,
|
||||
boolean logRetryCache)
|
||||
throws AccessControlException, SafeModeException,
|
||||
FileAlreadyExistsException, UnresolvedLinkException,
|
||||
FileNotFoundException, ParentNotDirectoryException, IOException {
|
||||
String src = srcArg;
|
||||
throws IOException {
|
||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append("DIR* NameSystem.startFile: src=" + src
|
||||
+ ", holder=" + holder
|
||||
+ ", clientMachine=" + clientMachine
|
||||
+ ", createParent=" + createParent
|
||||
+ ", replication=" + replication
|
||||
+ ", createFlag=" + flag.toString()
|
||||
+ ", blockSize=" + blockSize);
|
||||
builder.append(", supportedVersions=");
|
||||
if (supportedVersions != null) {
|
||||
builder.append(Arrays.toString(supportedVersions));
|
||||
} else {
|
||||
builder.append("null");
|
||||
}
|
||||
builder.append("DIR* NameSystem.startFile: src=").append(src)
|
||||
.append(", holder=").append(holder)
|
||||
.append(", clientMachine=").append(clientMachine)
|
||||
.append(", createParent=").append(createParent)
|
||||
.append(", replication=").append(replication)
|
||||
.append(", createFlag=").append(flag.toString())
|
||||
.append(", blockSize=").append(blockSize)
|
||||
.append(", supportedVersions=")
|
||||
.append(supportedVersions == null ? null : Arrays.toString
|
||||
(supportedVersions));
|
||||
NameNode.stateChangeLog.debug(builder.toString());
|
||||
}
|
||||
if (!DFSUtil.isValidName(src)) {
|
||||
throw new InvalidPathException(src);
|
||||
}
|
||||
blockManager.verifyReplication(src, replication, clientMachine);
|
||||
|
||||
boolean skipSync = false;
|
||||
HdfsFileStatus stat = null;
|
||||
FSPermissionChecker pc = getPermissionChecker();
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
if (blockSize < minBlockSize) {
|
||||
throw new IOException("Specified block size is less than configured" +
|
||||
" minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
|
||||
+ "): " + blockSize + " < " + minBlockSize);
|
||||
}
|
||||
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
||||
boolean create = flag.contains(CreateFlag.CREATE);
|
||||
boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
|
||||
boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
|
||||
|
||||
FSPermissionChecker pc = getPermissionChecker();
|
||||
waitForLoadingFSImage();
|
||||
|
||||
/**
|
||||
|
@ -2407,245 +2391,61 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
* special RetryStartFileException to ask the DFSClient to try the create
|
||||
* again later.
|
||||
*/
|
||||
CryptoProtocolVersion protocolVersion = null;
|
||||
CipherSuite suite = null;
|
||||
String ezKeyName = null;
|
||||
EncryptedKeyVersion edek = null;
|
||||
FSDirWriteFileOp.EncryptionKeyInfo ezInfo = null;
|
||||
|
||||
if (provider != null) {
|
||||
readLock();
|
||||
try {
|
||||
src = dir.resolvePath(pc, src, pathComponents);
|
||||
INodesInPath iip = dir.getINodesInPath4Write(src);
|
||||
// Nothing to do if the path is not within an EZ
|
||||
final EncryptionZone zone = dir.getEZForPath(iip);
|
||||
if (zone != null) {
|
||||
protocolVersion = chooseProtocolVersion(zone, supportedVersions);
|
||||
suite = zone.getSuite();
|
||||
ezKeyName = zone.getKeyName();
|
||||
|
||||
Preconditions.checkNotNull(protocolVersion);
|
||||
Preconditions.checkNotNull(suite);
|
||||
Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
|
||||
"Chose an UNKNOWN CipherSuite!");
|
||||
Preconditions.checkNotNull(ezKeyName);
|
||||
}
|
||||
checkOperation(OperationCategory.READ);
|
||||
ezInfo = FSDirWriteFileOp
|
||||
.getEncryptionKeyInfo(this, pc, src, supportedVersions);
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
|
||||
Preconditions.checkState(
|
||||
(suite == null && ezKeyName == null) ||
|
||||
(suite != null && ezKeyName != null),
|
||||
"Both suite and ezKeyName should both be null or not null");
|
||||
|
||||
// Generate EDEK if necessary while not holding the lock
|
||||
edek = generateEncryptedDataEncryptionKey(ezKeyName);
|
||||
if (ezInfo != null) {
|
||||
ezInfo.edek = generateEncryptedDataEncryptionKey(ezInfo.ezKeyName);
|
||||
}
|
||||
EncryptionFaultInjector.getInstance().startFileAfterGenerateKey();
|
||||
}
|
||||
|
||||
// Proceed with the create, using the computed cipher suite and
|
||||
boolean skipSync = false;
|
||||
HdfsFileStatus stat = null;
|
||||
|
||||
// Proceed with the create, using the computed cipher suite and
|
||||
// generated EDEK
|
||||
BlocksMapUpdateInfo toRemoveBlocks = null;
|
||||
BlocksMapUpdateInfo toRemoveBlocks = new BlocksMapUpdateInfo();
|
||||
writeLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
checkNameNodeSafeMode("Cannot create file" + src);
|
||||
dir.writeLock();
|
||||
try {
|
||||
src = dir.resolvePath(pc, src, pathComponents);
|
||||
final INodesInPath iip = dir.getINodesInPath4Write(src);
|
||||
toRemoveBlocks = startFileInternal(
|
||||
pc, iip, permissions, holder,
|
||||
clientMachine, create, overwrite,
|
||||
createParent, replication, blockSize,
|
||||
isLazyPersist, suite, protocolVersion, edek,
|
||||
logRetryCache);
|
||||
stat = FSDirStatAndListingOp.getFileInfo(
|
||||
dir, src, false, FSDirectory.isReservedRawName(srcArg), true);
|
||||
stat = FSDirWriteFileOp.startFile(this, pc, src, permissions, holder,
|
||||
clientMachine, flag, createParent,
|
||||
replication, blockSize, ezInfo,
|
||||
toRemoveBlocks, logRetryCache);
|
||||
} finally {
|
||||
dir.writeUnlock();
|
||||
}
|
||||
} catch (StandbyException se) {
|
||||
skipSync = true;
|
||||
throw se;
|
||||
} catch (IOException e) {
|
||||
skipSync = e instanceof StandbyException;
|
||||
throw e;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
// There might be transactions logged while trying to recover the lease.
|
||||
// They need to be sync'ed even when an exception was thrown.
|
||||
if (!skipSync) {
|
||||
getEditLog().logSync();
|
||||
if (toRemoveBlocks != null) {
|
||||
removeBlocks(toRemoveBlocks);
|
||||
toRemoveBlocks.clear();
|
||||
}
|
||||
removeBlocks(toRemoveBlocks);
|
||||
toRemoveBlocks.clear();
|
||||
}
|
||||
}
|
||||
|
||||
logAuditEvent(true, "create", srcArg, null, stat);
|
||||
return stat;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new file or overwrite an existing file<br>
|
||||
*
|
||||
* Once the file is create the client then allocates a new block with the next
|
||||
* call using {@link ClientProtocol#addBlock}.
|
||||
* <p>
|
||||
* For description of parameters and exceptions thrown see
|
||||
* {@link ClientProtocol#create}
|
||||
*/
|
||||
private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc,
|
||||
INodesInPath iip, PermissionStatus permissions, String holder,
|
||||
String clientMachine, boolean create, boolean overwrite,
|
||||
boolean createParent, short replication, long blockSize,
|
||||
boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version,
|
||||
EncryptedKeyVersion edek, boolean logRetryEntry)
|
||||
throws IOException {
|
||||
assert hasWriteLock();
|
||||
// Verify that the destination does not exist as a directory already.
|
||||
final INode inode = iip.getLastINode();
|
||||
final String src = iip.getPath();
|
||||
if (inode != null && inode.isDirectory()) {
|
||||
throw new FileAlreadyExistsException(src +
|
||||
" already exists as a directory");
|
||||
}
|
||||
|
||||
final INodeFile myFile = INodeFile.valueOf(inode, src, true);
|
||||
if (isPermissionEnabled) {
|
||||
if (overwrite && myFile != null) {
|
||||
dir.checkPathAccess(pc, iip, FsAction.WRITE);
|
||||
}
|
||||
/*
|
||||
* To overwrite existing file, need to check 'w' permission
|
||||
* of parent (equals to ancestor in this case)
|
||||
*/
|
||||
dir.checkAncestorAccess(pc, iip, FsAction.WRITE);
|
||||
}
|
||||
if (!createParent) {
|
||||
dir.verifyParentDir(iip, src);
|
||||
}
|
||||
|
||||
FileEncryptionInfo feInfo = null;
|
||||
|
||||
final EncryptionZone zone = dir.getEZForPath(iip);
|
||||
if (zone != null) {
|
||||
// The path is now within an EZ, but we're missing encryption parameters
|
||||
if (suite == null || edek == null) {
|
||||
throw new RetryStartFileException();
|
||||
}
|
||||
// Path is within an EZ and we have provided encryption parameters.
|
||||
// Make sure that the generated EDEK matches the settings of the EZ.
|
||||
final String ezKeyName = zone.getKeyName();
|
||||
if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
|
||||
throw new RetryStartFileException();
|
||||
}
|
||||
feInfo = new FileEncryptionInfo(suite, version,
|
||||
edek.getEncryptedKeyVersion().getMaterial(),
|
||||
edek.getEncryptedKeyIv(),
|
||||
ezKeyName, edek.getEncryptionKeyVersionName());
|
||||
}
|
||||
|
||||
try {
|
||||
BlocksMapUpdateInfo toRemoveBlocks = null;
|
||||
if (myFile == null) {
|
||||
if (!create) {
|
||||
throw new FileNotFoundException("Can't overwrite non-existent " +
|
||||
src + " for client " + clientMachine);
|
||||
}
|
||||
} else {
|
||||
if (overwrite) {
|
||||
toRemoveBlocks = new BlocksMapUpdateInfo();
|
||||
List<INode> toRemoveINodes = new ChunkedArrayList<>();
|
||||
List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
|
||||
long ret = FSDirDeleteOp.delete(
|
||||
dir, iip, toRemoveBlocks, toRemoveINodes,
|
||||
toRemoveUCFiles, now());
|
||||
if (ret >= 0) {
|
||||
iip = INodesInPath.replace(iip, iip.length() - 1, null);
|
||||
FSDirDeleteOp.incrDeletedFileCount(ret);
|
||||
removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
|
||||
}
|
||||
} else {
|
||||
// If lease soft limit time is expired, recover the lease
|
||||
recoverLeaseInternal(RecoverLeaseOp.CREATE_FILE,
|
||||
iip, src, holder, clientMachine, false);
|
||||
throw new FileAlreadyExistsException(src + " for client " +
|
||||
clientMachine + " already exists");
|
||||
}
|
||||
}
|
||||
|
||||
checkFsObjectLimit();
|
||||
INodeFile newNode = null;
|
||||
|
||||
// Always do an implicit mkdirs for parent directory tree.
|
||||
Map.Entry<INodesInPath, String> parent = FSDirMkdirOp
|
||||
.createAncestorDirectories(dir, iip, permissions);
|
||||
if (parent != null) {
|
||||
iip = dir.addFile(parent.getKey(), parent.getValue(), permissions,
|
||||
replication, blockSize, holder, clientMachine);
|
||||
newNode = iip != null ? iip.getLastINode().asFile() : null;
|
||||
}
|
||||
|
||||
if (newNode == null) {
|
||||
throw new IOException("Unable to add " + src + " to namespace");
|
||||
}
|
||||
leaseManager.addLease(newNode.getFileUnderConstructionFeature()
|
||||
.getClientName(), newNode.getId());
|
||||
|
||||
// Set encryption attributes if necessary
|
||||
if (feInfo != null) {
|
||||
dir.setFileEncryptionInfo(src, feInfo);
|
||||
newNode = dir.getInode(newNode.getId()).asFile();
|
||||
}
|
||||
|
||||
setNewINodeStoragePolicy(newNode, iip, isLazyPersist);
|
||||
|
||||
// record file record in log, record new generation stamp
|
||||
getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
|
||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
|
||||
src + " inode " + newNode.getId() + " " + holder);
|
||||
}
|
||||
return toRemoveBlocks;
|
||||
} catch (IOException ie) {
|
||||
NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " +
|
||||
ie.getMessage());
|
||||
throw ie;
|
||||
}
|
||||
}
|
||||
|
||||
private void setNewINodeStoragePolicy(INodeFile inode,
|
||||
INodesInPath iip,
|
||||
boolean isLazyPersist)
|
||||
throws IOException {
|
||||
|
||||
if (isLazyPersist) {
|
||||
BlockStoragePolicy lpPolicy =
|
||||
blockManager.getStoragePolicy("LAZY_PERSIST");
|
||||
|
||||
// Set LAZY_PERSIST storage policy if the flag was passed to
|
||||
// CreateFile.
|
||||
if (lpPolicy == null) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"The LAZY_PERSIST storage policy has been disabled " +
|
||||
"by the administrator.");
|
||||
}
|
||||
inode.setStoragePolicyID(lpPolicy.getId(),
|
||||
iip.getLatestSnapshotId());
|
||||
} else {
|
||||
BlockStoragePolicy effectivePolicy =
|
||||
blockManager.getStoragePolicy(inode.getStoragePolicyID());
|
||||
|
||||
if (effectivePolicy != null &&
|
||||
effectivePolicy.isCopyOnCreateFile()) {
|
||||
// Copy effective policy from ancestor directory to current file.
|
||||
inode.setStoragePolicyID(effectivePolicy.getId(),
|
||||
iip.getLatestSnapshotId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Append to an existing file for append.
|
||||
* <p>
|
||||
|
@ -2861,7 +2661,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
return false;
|
||||
}
|
||||
|
||||
private enum RecoverLeaseOp {
|
||||
enum RecoverLeaseOp {
|
||||
CREATE_FILE,
|
||||
APPEND_FILE,
|
||||
TRUNCATE_FILE,
|
||||
|
|
Loading…
Reference in New Issue