HDFS-8495. Consolidate append() related implementation into a single class. Contributed by Rakesh R.
This commit is contained in:
parent
393fe71771
commit
31f117138a
|
@ -737,6 +737,9 @@ Release 2.8.0 - UNRELEASED
|
|||
HDFS-8721. Add a metric for number of encryption zones.
|
||||
(Rakesh R via cnauroth)
|
||||
|
||||
HDFS-8495. Consolidate append() related implementation into a single class.
|
||||
(Rakesh R via wheat9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
|
|
@ -0,0 +1,261 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Helper class to perform append operation.
|
||||
*/
|
||||
final class FSDirAppendOp {
|
||||
|
||||
/**
|
||||
* Private constructor for preventing FSDirAppendOp object creation.
|
||||
* Static-only class.
|
||||
*/
|
||||
private FSDirAppendOp() {}
|
||||
|
||||
/**
|
||||
* Append to an existing file.
|
||||
* <p>
|
||||
*
|
||||
* The method returns the last block of the file if this is a partial block,
|
||||
* which can still be used for writing more data. The client uses the
|
||||
* returned block locations to form the data pipeline for this block.<br>
|
||||
* The {@link LocatedBlock} will be null if the last block is full.
|
||||
* The client then allocates a new block with the next call using
|
||||
* {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#addBlock}.
|
||||
* <p>
|
||||
*
|
||||
* For description of parameters and exceptions thrown see
|
||||
* {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#append}
|
||||
*
|
||||
* @param fsn namespace
|
||||
* @param srcArg path name
|
||||
* @param pc permission checker to check fs permission
|
||||
* @param holder client name
|
||||
* @param clientMachine client machine info
|
||||
* @param newBlock if the data is appended to a new block
|
||||
* @param logRetryCache whether to record RPC ids in editlog for retry cache
|
||||
* rebuilding
|
||||
* @return the last block with status
|
||||
*/
|
||||
static LastBlockWithStatus appendFile(final FSNamesystem fsn,
|
||||
final String srcArg, final FSPermissionChecker pc, final String holder,
|
||||
final String clientMachine, final boolean newBlock,
|
||||
final boolean logRetryCache) throws IOException {
|
||||
assert fsn.hasWriteLock();
|
||||
|
||||
final byte[][] pathComponents = FSDirectory
|
||||
.getPathComponentsForReservedPath(srcArg);
|
||||
final LocatedBlock lb;
|
||||
final FSDirectory fsd = fsn.getFSDirectory();
|
||||
final String src;
|
||||
fsd.writeLock();
|
||||
try {
|
||||
src = fsd.resolvePath(pc, srcArg, pathComponents);
|
||||
final INodesInPath iip = fsd.getINodesInPath4Write(src);
|
||||
// Verify that the destination does not exist as a directory already
|
||||
final INode inode = iip.getLastINode();
|
||||
final String path = iip.getPath();
|
||||
if (inode != null && inode.isDirectory()) {
|
||||
throw new FileAlreadyExistsException("Cannot append to directory "
|
||||
+ path + "; already exists as a directory.");
|
||||
}
|
||||
if (fsd.isPermissionEnabled()) {
|
||||
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
|
||||
}
|
||||
|
||||
if (inode == null) {
|
||||
throw new FileNotFoundException(
|
||||
"Failed to append to non-existent file " + path + " for client "
|
||||
+ clientMachine);
|
||||
}
|
||||
final INodeFile file = INodeFile.valueOf(inode, path, true);
|
||||
BlockManager blockManager = fsd.getBlockManager();
|
||||
final BlockStoragePolicy lpPolicy = blockManager
|
||||
.getStoragePolicy("LAZY_PERSIST");
|
||||
if (lpPolicy != null && lpPolicy.getId() == file.getStoragePolicyID()) {
|
||||
throw new UnsupportedOperationException(
|
||||
"Cannot append to lazy persist file " + path);
|
||||
}
|
||||
// Opening an existing file for append - may need to recover lease.
|
||||
fsn.recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE, iip, path, holder,
|
||||
clientMachine, false);
|
||||
|
||||
final BlockInfo lastBlock = file.getLastBlock();
|
||||
// Check that the block has at least minimum replication.
|
||||
if (lastBlock != null && lastBlock.isComplete()
|
||||
&& !blockManager.isSufficientlyReplicated(lastBlock)) {
|
||||
throw new IOException("append: lastBlock=" + lastBlock + " of src="
|
||||
+ path + " is not sufficiently replicated yet.");
|
||||
}
|
||||
lb = prepareFileForAppend(fsn, iip, holder, clientMachine, newBlock,
|
||||
true, logRetryCache);
|
||||
} catch (IOException ie) {
|
||||
NameNode.stateChangeLog
|
||||
.warn("DIR* NameSystem.append: " + ie.getMessage());
|
||||
throw ie;
|
||||
} finally {
|
||||
fsd.writeUnlock();
|
||||
}
|
||||
|
||||
HdfsFileStatus stat = FSDirStatAndListingOp.getFileInfo(fsd, src, false,
|
||||
FSDirectory.isReservedRawName(srcArg), true);
|
||||
if (lb != null) {
|
||||
NameNode.stateChangeLog.debug(
|
||||
"DIR* NameSystem.appendFile: file {} for {} at {} block {} block"
|
||||
+ " size {}", srcArg, holder, clientMachine, lb.getBlock(), lb
|
||||
.getBlock().getNumBytes());
|
||||
}
|
||||
return new LastBlockWithStatus(lb, stat);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert current node to under construction.
|
||||
* Recreate in-memory lease record.
|
||||
*
|
||||
* @param fsn namespace
|
||||
* @param iip inodes in the path containing the file
|
||||
* @param leaseHolder identifier of the lease holder on this file
|
||||
* @param clientMachine identifier of the client machine
|
||||
* @param newBlock if the data is appended to a new block
|
||||
* @param writeToEditLog whether to persist this change to the edit log
|
||||
* @param logRetryCache whether to record RPC ids in editlog for retry cache
|
||||
* rebuilding
|
||||
* @return the last block locations if the block is partial or null otherwise
|
||||
* @throws IOException
|
||||
*/
|
||||
static LocatedBlock prepareFileForAppend(final FSNamesystem fsn,
|
||||
final INodesInPath iip, final String leaseHolder,
|
||||
final String clientMachine, final boolean newBlock,
|
||||
final boolean writeToEditLog, final boolean logRetryCache)
|
||||
throws IOException {
|
||||
assert fsn.hasWriteLock();
|
||||
|
||||
final INodeFile file = iip.getLastINode().asFile();
|
||||
final QuotaCounts delta = verifyQuotaForUCBlock(fsn, file, iip);
|
||||
|
||||
file.recordModification(iip.getLatestSnapshotId());
|
||||
file.toUnderConstruction(leaseHolder, clientMachine);
|
||||
|
||||
fsn.getLeaseManager().addLease(
|
||||
file.getFileUnderConstructionFeature().getClientName(), file.getId());
|
||||
|
||||
LocatedBlock ret = null;
|
||||
if (!newBlock) {
|
||||
FSDirectory fsd = fsn.getFSDirectory();
|
||||
ret = fsd.getBlockManager().convertLastBlockToUnderConstruction(file, 0);
|
||||
if (ret != null && delta != null) {
|
||||
Preconditions.checkState(delta.getStorageSpace() >= 0, "appending to"
|
||||
+ " a block with size larger than the preferred block size");
|
||||
fsd.writeLock();
|
||||
try {
|
||||
fsd.updateCountNoQuotaCheck(iip, iip.length() - 1, delta);
|
||||
} finally {
|
||||
fsd.writeUnlock();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
BlockInfo lastBlock = file.getLastBlock();
|
||||
if (lastBlock != null) {
|
||||
ExtendedBlock blk = new ExtendedBlock(fsn.getBlockPoolId(), lastBlock);
|
||||
ret = new LocatedBlock(blk, new DatanodeInfo[0]);
|
||||
}
|
||||
}
|
||||
|
||||
if (writeToEditLog) {
|
||||
final String path = iip.getPath();
|
||||
if (NameNodeLayoutVersion.supports(Feature.APPEND_NEW_BLOCK,
|
||||
fsn.getEffectiveLayoutVersion())) {
|
||||
fsn.getEditLog().logAppendFile(path, file, newBlock, logRetryCache);
|
||||
} else {
|
||||
fsn.getEditLog().logOpenFile(path, file, false, logRetryCache);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify quota when using the preferred block size for UC block. This is
|
||||
* usually used by append and truncate.
|
||||
*
|
||||
* @throws QuotaExceededException when violating the storage quota
|
||||
* @return expected quota usage update. null means no change or no need to
|
||||
* update quota usage later
|
||||
*/
|
||||
private static QuotaCounts verifyQuotaForUCBlock(FSNamesystem fsn,
|
||||
INodeFile file, INodesInPath iip) throws QuotaExceededException {
|
||||
FSDirectory fsd = fsn.getFSDirectory();
|
||||
if (!fsn.isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
|
||||
// Do not check quota if editlog is still being processed
|
||||
return null;
|
||||
}
|
||||
if (file.getLastBlock() != null) {
|
||||
final QuotaCounts delta = computeQuotaDeltaForUCBlock(fsn, file);
|
||||
fsd.readLock();
|
||||
try {
|
||||
FSDirectory.verifyQuota(iip, iip.length() - 1, delta, null);
|
||||
return delta;
|
||||
} finally {
|
||||
fsd.readUnlock();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/** Compute quota change for converting a complete block to a UC block. */
|
||||
private static QuotaCounts computeQuotaDeltaForUCBlock(FSNamesystem fsn,
|
||||
INodeFile file) {
|
||||
final QuotaCounts delta = new QuotaCounts.Builder().build();
|
||||
final BlockInfo lastBlock = file.getLastBlock();
|
||||
if (lastBlock != null) {
|
||||
final long diff = file.getPreferredBlockSize() - lastBlock.getNumBytes();
|
||||
final short repl = file.getPreferredBlockReplication();
|
||||
delta.addStorageSpace(diff * repl);
|
||||
final BlockStoragePolicy policy = fsn.getFSDirectory()
|
||||
.getBlockStoragePolicySuite().getPolicy(file.getStoragePolicyID());
|
||||
List<StorageType> types = policy.chooseStorageTypes(repl);
|
||||
for (StorageType t : types) {
|
||||
if (t.supportTypeQuota()) {
|
||||
delta.addTypeSpace(t, diff);
|
||||
}
|
||||
}
|
||||
}
|
||||
return delta;
|
||||
}
|
||||
}
|
|
@ -508,7 +508,7 @@ class FSDirStatAndListingOp {
|
|||
final long fileSize = !inSnapshot && isUc ?
|
||||
fileNode.computeFileSizeNotIncludingLastUcBlock() : size;
|
||||
|
||||
loc = fsd.getFSNamesystem().getBlockManager().createLocatedBlocks(
|
||||
loc = fsd.getBlockManager().createLocatedBlocks(
|
||||
fileNode.getBlocks(snapshot), fileSize, isUc, 0L, size, false,
|
||||
inSnapshot, feInfo);
|
||||
if (loc == null) {
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
|
|||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionContiguous;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
||||
|
@ -79,11 +80,11 @@ final class FSDirTruncateOp {
|
|||
try {
|
||||
src = fsd.resolvePath(pc, srcArg, pathComponents);
|
||||
iip = fsd.getINodesInPath4Write(src, true);
|
||||
if (fsn.isPermissionEnabled()) {
|
||||
if (fsd.isPermissionEnabled()) {
|
||||
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
|
||||
}
|
||||
INodeFile file = INodeFile.valueOf(iip.getLastINode(), src);
|
||||
final BlockStoragePolicy lpPolicy = fsn.getBlockManager()
|
||||
final BlockStoragePolicy lpPolicy = fsd.getBlockManager()
|
||||
.getStoragePolicy("LAZY_PERSIST");
|
||||
|
||||
if (lpPolicy != null && lpPolicy.getId() == file.getStoragePolicyID()) {
|
||||
|
@ -178,7 +179,7 @@ final class FSDirTruncateOp {
|
|||
"Should be the same block.";
|
||||
if (oldBlock.getBlockId() != tBlk.getBlockId()
|
||||
&& !file.isBlockInLatestSnapshot(oldBlock)) {
|
||||
fsn.getBlockManager().removeBlockFromMap(oldBlock);
|
||||
fsd.getBlockManager().removeBlockFromMap(oldBlock);
|
||||
}
|
||||
}
|
||||
assert onBlockBoundary == (truncateBlock == null) :
|
||||
|
@ -223,6 +224,7 @@ final class FSDirTruncateOp {
|
|||
}
|
||||
|
||||
BlockInfoUnderConstruction truncatedBlockUC;
|
||||
BlockManager blockManager = fsn.getFSDirectory().getBlockManager();
|
||||
if (shouldCopyOnTruncate) {
|
||||
// Add new truncateBlock into blocksMap and
|
||||
// use oldBlock as a source for copy-on-truncate recovery
|
||||
|
@ -230,9 +232,8 @@ final class FSDirTruncateOp {
|
|||
file.getPreferredBlockReplication());
|
||||
truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
|
||||
truncatedBlockUC.setTruncateBlock(oldBlock);
|
||||
file.setLastBlock(truncatedBlockUC,
|
||||
fsn.getBlockManager().getStorages(oldBlock));
|
||||
fsn.getBlockManager().addBlockCollection(truncatedBlockUC, file);
|
||||
file.setLastBlock(truncatedBlockUC, blockManager.getStorages(oldBlock));
|
||||
blockManager.addBlockCollection(truncatedBlockUC, file);
|
||||
|
||||
NameNode.stateChangeLog.debug(
|
||||
"BLOCK* prepareFileForTruncate: Scheduling copy-on-truncate to new"
|
||||
|
@ -241,8 +242,7 @@ final class FSDirTruncateOp {
|
|||
truncatedBlockUC.getTruncateBlock());
|
||||
} else {
|
||||
// Use new generation stamp for in-place truncate recovery
|
||||
fsn.getBlockManager().convertLastBlockToUnderConstruction(file,
|
||||
lastBlockDelta);
|
||||
blockManager.convertLastBlockToUnderConstruction(file, lastBlockDelta);
|
||||
oldBlock = file.getLastBlock();
|
||||
assert !oldBlock.isComplete() : "oldBlock should be under construction";
|
||||
truncatedBlockUC = (BlockInfoUnderConstruction) oldBlock;
|
||||
|
|
|
@ -206,8 +206,8 @@ class FSDirWriteFileOp {
|
|||
DatanodeStorageInfo[] locs, long offset) throws IOException {
|
||||
LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk),
|
||||
locs, offset, false);
|
||||
fsn.getBlockManager().setBlockToken(lBlk,
|
||||
BlockTokenIdentifier.AccessMode.WRITE);
|
||||
fsn.getFSDirectory().getBlockManager()
|
||||
.setBlockToken(lBlk, BlockTokenIdentifier.AccessMode.WRITE);
|
||||
return lBlk;
|
||||
}
|
||||
|
||||
|
@ -426,7 +426,7 @@ class FSDirWriteFileOp {
|
|||
fsd.setFileEncryptionInfo(src, feInfo);
|
||||
newNode = fsd.getInode(newNode.getId()).asFile();
|
||||
}
|
||||
setNewINodeStoragePolicy(fsn.getBlockManager(), newNode, iip,
|
||||
setNewINodeStoragePolicy(fsd.getBlockManager(), newNode, iip,
|
||||
isLazyPersist);
|
||||
fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
|
||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
|
|
|
@ -392,7 +392,7 @@ public class FSEditLogLoader {
|
|||
FSNamesystem.LOG.debug("Reopening an already-closed file " +
|
||||
"for append");
|
||||
}
|
||||
LocatedBlock lb = fsNamesys.prepareFileForAppend(path, iip,
|
||||
LocatedBlock lb = FSDirAppendOp.prepareFileForAppend(fsNamesys, iip,
|
||||
addCloseOp.clientName, addCloseOp.clientMachine, false, false,
|
||||
false);
|
||||
// add the op into retry cache if necessary
|
||||
|
@ -466,7 +466,7 @@ public class FSEditLogLoader {
|
|||
INodesInPath iip = fsDir.getINodesInPath4Write(path);
|
||||
INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);
|
||||
if (!file.isUnderConstruction()) {
|
||||
LocatedBlock lb = fsNamesys.prepareFileForAppend(path, iip,
|
||||
LocatedBlock lb = FSDirAppendOp.prepareFileForAppend(fsNamesys, iip,
|
||||
appendOp.clientName, appendOp.clientMachine, appendOp.newBlock,
|
||||
false, false);
|
||||
// add the op into retry cache if necessary
|
||||
|
|
|
@ -142,7 +142,6 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
|
|||
import org.apache.hadoop.fs.CacheFlag;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
|
@ -185,7 +184,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
|||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
|
||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||
|
@ -250,7 +248,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
|
@ -2173,175 +2170,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
return stat;
|
||||
}
|
||||
|
||||
/**
|
||||
* Append to an existing file for append.
|
||||
* <p>
|
||||
*
|
||||
* The method returns the last block of the file if this is a partial block,
|
||||
* which can still be used for writing more data. The client uses the returned
|
||||
* block locations to form the data pipeline for this block.<br>
|
||||
* The method returns null if the last block is full. The client then
|
||||
* allocates a new block with the next call using
|
||||
* {@link ClientProtocol#addBlock}.
|
||||
* <p>
|
||||
*
|
||||
* For description of parameters and exceptions thrown see
|
||||
* {@link ClientProtocol#append(String, String, EnumSetWritable)}
|
||||
*
|
||||
* @return the last block locations if the block is partial or null otherwise
|
||||
*/
|
||||
private LocatedBlock appendFileInternal(FSPermissionChecker pc,
|
||||
INodesInPath iip, String holder, String clientMachine, boolean newBlock,
|
||||
boolean logRetryCache) throws IOException {
|
||||
assert hasWriteLock();
|
||||
// Verify that the destination does not exist as a directory already.
|
||||
final INode inode = iip.getLastINode();
|
||||
final String src = iip.getPath();
|
||||
if (inode != null && inode.isDirectory()) {
|
||||
throw new FileAlreadyExistsException("Cannot append to directory " + src
|
||||
+ "; already exists as a directory.");
|
||||
}
|
||||
if (isPermissionEnabled) {
|
||||
dir.checkPathAccess(pc, iip, FsAction.WRITE);
|
||||
}
|
||||
|
||||
try {
|
||||
if (inode == null) {
|
||||
throw new FileNotFoundException("failed to append to non-existent file "
|
||||
+ src + " for client " + clientMachine);
|
||||
}
|
||||
INodeFile myFile = INodeFile.valueOf(inode, src, true);
|
||||
final BlockStoragePolicy lpPolicy =
|
||||
blockManager.getStoragePolicy("LAZY_PERSIST");
|
||||
if (lpPolicy != null &&
|
||||
lpPolicy.getId() == myFile.getStoragePolicyID()) {
|
||||
throw new UnsupportedOperationException(
|
||||
"Cannot append to lazy persist file " + src);
|
||||
}
|
||||
// Opening an existing file for append - may need to recover lease.
|
||||
recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE, iip, src, holder,
|
||||
clientMachine, false);
|
||||
|
||||
final BlockInfo lastBlock = myFile.getLastBlock();
|
||||
// Check that the block has at least minimum replication.
|
||||
if(lastBlock != null && lastBlock.isComplete() &&
|
||||
!getBlockManager().isSufficientlyReplicated(lastBlock)) {
|
||||
throw new IOException("append: lastBlock=" + lastBlock +
|
||||
" of src=" + src + " is not sufficiently replicated yet.");
|
||||
}
|
||||
return prepareFileForAppend(src, iip, holder, clientMachine, newBlock,
|
||||
true, logRetryCache);
|
||||
} catch (IOException ie) {
|
||||
NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage());
|
||||
throw ie;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert current node to under construction.
|
||||
* Recreate in-memory lease record.
|
||||
*
|
||||
* @param src path to the file
|
||||
* @param leaseHolder identifier of the lease holder on this file
|
||||
* @param clientMachine identifier of the client machine
|
||||
* @param newBlock if the data is appended to a new block
|
||||
* @param writeToEditLog whether to persist this change to the edit log
|
||||
* @param logRetryCache whether to record RPC ids in editlog for retry cache
|
||||
* rebuilding
|
||||
* @return the last block locations if the block is partial or null otherwise
|
||||
* @throws UnresolvedLinkException
|
||||
* @throws IOException
|
||||
*/
|
||||
LocatedBlock prepareFileForAppend(String src, INodesInPath iip,
|
||||
String leaseHolder, String clientMachine, boolean newBlock,
|
||||
boolean writeToEditLog, boolean logRetryCache) throws IOException {
|
||||
final INodeFile file = iip.getLastINode().asFile();
|
||||
final QuotaCounts delta = verifyQuotaForUCBlock(file, iip);
|
||||
|
||||
file.recordModification(iip.getLatestSnapshotId());
|
||||
file.toUnderConstruction(leaseHolder, clientMachine);
|
||||
|
||||
leaseManager.addLease(
|
||||
file.getFileUnderConstructionFeature().getClientName(), file.getId());
|
||||
|
||||
LocatedBlock ret = null;
|
||||
if (!newBlock) {
|
||||
ret = blockManager.convertLastBlockToUnderConstruction(file, 0);
|
||||
if (ret != null && delta != null) {
|
||||
Preconditions.checkState(delta.getStorageSpace() >= 0,
|
||||
"appending to a block with size larger than the preferred block size");
|
||||
dir.writeLock();
|
||||
try {
|
||||
dir.updateCountNoQuotaCheck(iip, iip.length() - 1, delta);
|
||||
} finally {
|
||||
dir.writeUnlock();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
BlockInfo lastBlock = file.getLastBlock();
|
||||
if (lastBlock != null) {
|
||||
ExtendedBlock blk = new ExtendedBlock(this.getBlockPoolId(), lastBlock);
|
||||
ret = new LocatedBlock(blk, new DatanodeInfo[0]);
|
||||
}
|
||||
}
|
||||
|
||||
if (writeToEditLog) {
|
||||
if (NameNodeLayoutVersion.supports(Feature.APPEND_NEW_BLOCK,
|
||||
getEffectiveLayoutVersion())) {
|
||||
getEditLog().logAppendFile(src, file, newBlock, logRetryCache);
|
||||
} else {
|
||||
getEditLog().logOpenFile(src, file, false, logRetryCache);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify quota when using the preferred block size for UC block. This is
|
||||
* usually used by append and truncate
|
||||
* @throws QuotaExceededException when violating the storage quota
|
||||
* @return expected quota usage update. null means no change or no need to
|
||||
* update quota usage later
|
||||
*/
|
||||
private QuotaCounts verifyQuotaForUCBlock(INodeFile file, INodesInPath iip)
|
||||
throws QuotaExceededException {
|
||||
if (!isImageLoaded() || dir.shouldSkipQuotaChecks()) {
|
||||
// Do not check quota if editlog is still being processed
|
||||
return null;
|
||||
}
|
||||
if (file.getLastBlock() != null) {
|
||||
final QuotaCounts delta = computeQuotaDeltaForUCBlock(file);
|
||||
dir.readLock();
|
||||
try {
|
||||
FSDirectory.verifyQuota(iip, iip.length() - 1, delta, null);
|
||||
return delta;
|
||||
} finally {
|
||||
dir.readUnlock();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/** Compute quota change for converting a complete block to a UC block */
|
||||
private QuotaCounts computeQuotaDeltaForUCBlock(INodeFile file) {
|
||||
final QuotaCounts delta = new QuotaCounts.Builder().build();
|
||||
final BlockInfo lastBlock = file.getLastBlock();
|
||||
if (lastBlock != null) {
|
||||
final long diff = file.getPreferredBlockSize() - lastBlock.getNumBytes();
|
||||
final short repl = file.getPreferredBlockReplication();
|
||||
delta.addStorageSpace(diff * repl);
|
||||
final BlockStoragePolicy policy = dir.getBlockStoragePolicySuite()
|
||||
.getPolicy(file.getStoragePolicyID());
|
||||
List<StorageType> types = policy.chooseStorageTypes(repl);
|
||||
for (StorageType t : types) {
|
||||
if (t.supportTypeQuota()) {
|
||||
delta.addTypeSpace(t, diff);
|
||||
}
|
||||
}
|
||||
}
|
||||
return delta;
|
||||
}
|
||||
|
||||
/**
|
||||
* Recover lease;
|
||||
* Immediately revoke the lease of the current lease holder and start lease
|
||||
|
@ -2487,62 +2315,45 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
/**
|
||||
* Append to an existing file in the namespace.
|
||||
*/
|
||||
LastBlockWithStatus appendFile(String src, String holder,
|
||||
LastBlockWithStatus appendFile(String srcArg, String holder,
|
||||
String clientMachine, EnumSet<CreateFlag> flag, boolean logRetryCache)
|
||||
throws IOException {
|
||||
boolean newBlock = flag.contains(CreateFlag.NEW_BLOCK);
|
||||
if (newBlock) {
|
||||
requireEffectiveLayoutVersionForFeature(Feature.APPEND_NEW_BLOCK);
|
||||
}
|
||||
try {
|
||||
return appendFileInt(src, holder, clientMachine, newBlock, logRetryCache);
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "append", src);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private LastBlockWithStatus appendFileInt(final String srcArg, String holder,
|
||||
String clientMachine, boolean newBlock, boolean logRetryCache)
|
||||
throws IOException {
|
||||
String src = srcArg;
|
||||
NameNode.stateChangeLog.debug(
|
||||
"DIR* NameSystem.appendFile: src={}, holder={}, clientMachine={}",
|
||||
src, holder, clientMachine);
|
||||
boolean skipSync = false;
|
||||
LocatedBlock lb = null;
|
||||
HdfsFileStatus stat = null;
|
||||
FSPermissionChecker pc = getPermissionChecker();
|
||||
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
||||
writeLock();
|
||||
srcArg, holder, clientMachine);
|
||||
try {
|
||||
boolean skipSync = false;
|
||||
LastBlockWithStatus lbs = null;
|
||||
final FSPermissionChecker pc = getPermissionChecker();
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
checkNameNodeSafeMode("Cannot append to file" + src);
|
||||
src = dir.resolvePath(pc, src, pathComponents);
|
||||
final INodesInPath iip = dir.getINodesInPath4Write(src);
|
||||
lb = appendFileInternal(pc, iip, holder, clientMachine, newBlock,
|
||||
logRetryCache);
|
||||
stat = FSDirStatAndListingOp.getFileInfo(dir, src, false,
|
||||
FSDirectory.isReservedRawName(srcArg), true);
|
||||
} catch (StandbyException se) {
|
||||
skipSync = true;
|
||||
throw se;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
// There might be transactions logged while trying to recover the lease.
|
||||
// They need to be sync'ed even when an exception was thrown.
|
||||
if (!skipSync) {
|
||||
getEditLog().logSync();
|
||||
writeLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
checkNameNodeSafeMode("Cannot append to file" + srcArg);
|
||||
lbs = FSDirAppendOp.appendFile(this, srcArg, pc, holder, clientMachine,
|
||||
newBlock, logRetryCache);
|
||||
} catch (StandbyException se) {
|
||||
skipSync = true;
|
||||
throw se;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
// There might be transactions logged while trying to recover the lease
|
||||
// They need to be sync'ed even when an exception was thrown.
|
||||
if (!skipSync) {
|
||||
getEditLog().logSync();
|
||||
}
|
||||
}
|
||||
logAuditEvent(true, "append", srcArg);
|
||||
return lbs;
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "append", srcArg);
|
||||
throw e;
|
||||
}
|
||||
if (lb != null) {
|
||||
NameNode.stateChangeLog.debug(
|
||||
"DIR* NameSystem.appendFile: file {} for {} at {} block {} block" +
|
||||
" size {}", src, holder, clientMachine, lb.getBlock(),
|
||||
lb.getBlock().getNumBytes());
|
||||
}
|
||||
logAuditEvent(true, "append", srcArg);
|
||||
return new LastBlockWithStatus(lb, stat);
|
||||
}
|
||||
|
||||
ExtendedBlock getExtendedBlock(Block blk) {
|
||||
|
|
Loading…
Reference in New Issue