From 1796d94d624edd8ca78bf2eda1405c11d0f030f5 Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Fri, 15 May 2015 19:09:59 -0700 Subject: [PATCH] HDFS-8394. Move getAdditionalBlock() and related functionalities into a separate class. Contributed by Haohui Mai. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../server/namenode/FSDirWriteFileOp.java | 563 ++++++++++++++++++ .../hdfs/server/namenode/FSDirectory.java | 78 +-- .../hdfs/server/namenode/FSEditLogLoader.java | 3 +- .../hdfs/server/namenode/FSNamesystem.java | 499 ++-------------- .../server/namenode/NameNodeRpcServer.java | 30 +- .../server/namenode/TestAddBlockRetry.java | 30 +- .../TestCommitBlockSynchronization.java | 3 + 8 files changed, 649 insertions(+), 560 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index d44030da754..81dc05773c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -224,6 +224,9 @@ Release 2.8.0 - UNRELEASED HDFS-8397. Refactor the error handling code in DataStreamer. (Tsz Wo Nicholas Sze via jing9) + HDFS-8394. Move getAdditionalBlock() and related functionalities into a + separate class. (wheat9) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java new file mode 100644 index 00000000000..1ff0899e5eb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -0,0 +1,563 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.QuotaExceededException; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; +import org.apache.hadoop.net.Node; +import org.apache.hadoop.net.NodeBase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +class FSDirWriteFileOp { + private FSDirWriteFileOp() {} + static boolean unprotectedRemoveBlock( + FSDirectory fsd, String path, INodesInPath iip, INodeFile fileNode, + Block block) throws IOException { + // modify file-> block and blocksMap + // fileNode should be under construction + BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block); + if (uc == null) { + return false; + } + fsd.getBlockManager().removeBlockFromMap(block); + + if(NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: " + +path+" with "+block + +" block is removed from the file system"); + } + + // update space consumed + fsd.updateCount(iip, 0, -fileNode.getPreferredBlockSize(), + fileNode.getPreferredBlockReplication(), true); + return true; + } + + /** + * Persist the block list for the inode. + */ + static void persistBlocks( + FSDirectory fsd, String path, INodeFile file, boolean logRetryCache) { + assert fsd.getFSNamesystem().hasWriteLock(); + Preconditions.checkArgument(file.isUnderConstruction()); + fsd.getEditLog().logUpdateBlocks(path, file, logRetryCache); + if(NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug("persistBlocks: " + path + + " with " + file.getBlocks().length + " blocks is persisted to" + + " the file system"); + } + } + + static void abandonBlock( + FSDirectory fsd, FSPermissionChecker pc, ExtendedBlock b, long fileId, + String src, String holder) throws IOException { + byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + src = fsd.resolvePath(pc, src, pathComponents); + + final INode inode; + final INodesInPath iip; + if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) { + // Older clients may not have given us an inode ID to work with. + // In this case, we have to try to resolve the path and hope it + // hasn't changed or been deleted since the file was opened for write. 
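+ // For illustration: a legacy client that opened the file before
+ // inode IDs existed sends fileId == GRANDFATHER_INODE_ID and takes
+ // this path-based branch, while a newer client sends the file's real
+ // inode ID and src is recomputed from the inode in the else branch.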
+ iip = fsd.getINodesInPath(src, true); + inode = iip.getLastINode(); + } else { + inode = fsd.getInode(fileId); + iip = INodesInPath.fromINode(inode); + if (inode != null) { + src = iip.getPath(); + } + } + FSNamesystem fsn = fsd.getFSNamesystem(); + final INodeFile file = fsn.checkLease(src, holder, inode, fileId); + Preconditions.checkState(file.isUnderConstruction()); + + Block localBlock = ExtendedBlock.getLocalBlock(b); + fsd.writeLock(); + try { + // Remove the block from the pending creates list + if (!unprotectedRemoveBlock(fsd, src, iip, file, localBlock)) { + return; + } + } finally { + fsd.writeUnlock(); + } + persistBlocks(fsd, src, file, false); + } + + static void checkBlock(FSNamesystem fsn, ExtendedBlock block) + throws IOException { + String bpId = fsn.getBlockPoolId(); + if (block != null && !bpId.equals(block.getBlockPoolId())) { + throw new IOException("Unexpected BlockPoolId " + block.getBlockPoolId() + + " - expected " + bpId); + } + } + + /** + * Part I of getAdditionalBlock(). + * Analyze the state of the file under read lock to determine if the client + * can add a new block, detect potential retries, lease mismatches, + * and minimal replication of the penultimate block. + * + * Generate target DataNode locations for the new block, + * but do not create the new block yet. + */ + static ValidateAddBlockResult validateAddBlock( + FSNamesystem fsn, FSPermissionChecker pc, + String src, long fileId, String clientName, + ExtendedBlock previous, LocatedBlock[] onRetryBlock) throws IOException { + final long blockSize; + final int replication; + final byte storagePolicyID; + String clientMachine; + + byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + src = fsn.dir.resolvePath(pc, src, pathComponents); + FileState fileState = analyzeFileState(fsn, src, fileId, clientName, + previous, onRetryBlock); + final INodeFile pendingFile = fileState.inode; + // Check if the penultimate block is minimally replicated + if (!fsn.checkFileProgress(src, pendingFile, false)) { + throw new NotReplicatedYetException("Not replicated yet: " + src); + } + + if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) { + // This is a retry. No need to generate new locations. + // Use the last block if it has locations. + return null; + } + if (pendingFile.getBlocks().length >= fsn.maxBlocksPerFile) { + throw new IOException("File has reached the limit on maximum number of" + + " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY + + "): " + pendingFile.getBlocks().length + " >= " + + fsn.maxBlocksPerFile); + } + blockSize = pendingFile.getPreferredBlockSize(); + clientMachine = pendingFile.getFileUnderConstructionFeature() + .getClientMachine(); + replication = pendingFile.getFileReplication(); + storagePolicyID = pendingFile.getStoragePolicyID(); + return new ValidateAddBlockResult(blockSize, replication, storagePolicyID, + clientMachine); + } + + static LocatedBlock makeLocatedBlock(FSNamesystem fsn, Block blk, + DatanodeStorageInfo[] locs, long offset) throws IOException { + LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk), + locs, offset, false); + fsn.getBlockManager().setBlockToken(lBlk, + BlockTokenIdentifier.AccessMode.WRITE); + return lBlk; + } + + /** + * Part II of getAdditionalBlock(). + * Should repeat the same analysis of the file state as in Part 1, + * but under the write lock. 
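+ * As a rough sketch, the sequence driven by
+ * FSNamesystem#getAdditionalBlock() is (locking and retry handling
+ * elided; excluded/favored stand for the caller's exclusion and
+ * preference lists):
+ * <pre>
+ *   ValidateAddBlockResult r = validateAddBlock(fsn, pc, src, fileId,
+ *       clientName, previous, onRetryBlock);         // read lock held
+ *   DatanodeStorageInfo[] targets = chooseTargetForNewBlock(
+ *       fsn.getBlockManager(), src, excluded, favored, r);  // no lock
+ *   LocatedBlock lb = storeAllocatedBlock(fsn, src, fileId,
+ *       clientName, previous, targets);             // write lock held
+ * </pre>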
+ * If the conditions still hold, then allocate a new block with
+ * the new targets, add it to the INode and to the BlocksMap.
+ */
+ static LocatedBlock storeAllocatedBlock(FSNamesystem fsn, String src,
+ long fileId, String clientName, ExtendedBlock previous,
+ DatanodeStorageInfo[] targets) throws IOException {
+ long offset;
+ // Run the full analysis again, since things could have changed
+ // while chooseTarget() was executing.
+ LocatedBlock[] onRetryBlock = new LocatedBlock[1];
+ FileState fileState = analyzeFileState(fsn, src, fileId, clientName,
+ previous, onRetryBlock);
+ final INodeFile pendingFile = fileState.inode;
+ src = fileState.path;
+
+ if (onRetryBlock[0] != null) {
+ if (onRetryBlock[0].getLocations().length > 0) {
+ // This is a retry. Just return the last block if it has locations.
+ return onRetryBlock[0];
+ } else {
+ // add new chosen targets to the already allocated block and return
+ BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
+ ((BlockInfoContiguousUnderConstruction) lastBlockInFile)
+ .setExpectedLocations(targets);
+ offset = pendingFile.computeFileSize();
+ return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
+ }
+ }
+
+ // commit the last block and complete it if it has minimum replicas
+ fsn.commitOrCompleteLastBlock(pendingFile, fileState.iip,
+ ExtendedBlock.getLocalBlock(previous));
+
+ // allocate new block, record block locations in INode.
+ Block newBlock = fsn.createNewBlock();
+ INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
+ saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets);
+
+ persistNewBlock(fsn, src, pendingFile);
+ offset = pendingFile.computeFileSize();
+
+ // Return located block
+ return makeLocatedBlock(fsn, newBlock, targets, offset);
+ }
+
+ static DatanodeStorageInfo[] chooseTargetForNewBlock(
+ BlockManager bm, String src, DatanodeInfo[] excludedNodes, String[]
+ favoredNodes, ValidateAddBlockResult r) throws IOException {
+ Node clientNode = bm.getDatanodeManager()
+ .getDatanodeByHost(r.clientMachine);
+ if (clientNode == null) {
+ clientNode = getClientNode(bm, r.clientMachine);
+ }
+
+ Set<Node> excludedNodesSet = null;
+ if (excludedNodes != null) {
+ excludedNodesSet = new HashSet<>(excludedNodes.length);
+ Collections.addAll(excludedNodesSet, excludedNodes);
+ }
+ List<String> favoredNodesList = (favoredNodes == null) ? null
+ : Arrays.asList(favoredNodes);
+
+ // choose targets for the new block to be allocated.
+ return bm.chooseTarget4NewBlock(src, r.replication, clientNode,
+ excludedNodesSet, r.blockSize,
+ favoredNodesList, r.storagePolicyID);
+ }
+
+ /**
+ * Resolve the client machine address to get a network location path.
+ */
+ static Node getClientNode(BlockManager bm, String clientMachine) {
+ List<String> hosts = new ArrayList<>(1);
+ hosts.add(clientMachine);
+ List<String> rName = bm.getDatanodeManager()
+ .resolveNetworkLocation(hosts);
+ Node clientNode = null;
+ if (rName != null) {
+ // Able to resolve the clientMachine mapping.
+ // Create a temp node to find out the rack-local nodes.
+ clientNode = new NodeBase(rName.get(0) + NodeBase.PATH_SEPARATOR_STR
+ + clientMachine);
+ }
+ return clientNode;
+ }
+
+ /**
+ * Add a block to the file. Returns a reference to the added block.
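+ *
+ * As a rough worked example of the quota charge made here: with the
+ * default 128 MB preferred block size and replication 3, updateCount()
+ * reserves 3 * 128 MB = 384 MB of space quota before the block is
+ * associated with the file, and a QuotaExceededException aborts the
+ * allocation if that reservation would overdraw the quota.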
+ */ + private static BlockInfoContiguous addBlock( + FSDirectory fsd, String path, INodesInPath inodesInPath, Block block, + DatanodeStorageInfo[] targets) throws IOException { + fsd.writeLock(); + try { + final INodeFile fileINode = inodesInPath.getLastINode().asFile(); + Preconditions.checkState(fileINode.isUnderConstruction()); + + // check quota limits and updated space consumed + fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(), + fileINode.getPreferredBlockReplication(), true); + + // associate new last block for the file + BlockInfoContiguousUnderConstruction blockInfo = + new BlockInfoContiguousUnderConstruction( + block, + fileINode.getFileReplication(), + HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, + targets); + fsd.getBlockManager().addBlockCollection(blockInfo, fileINode); + fileINode.addBlock(blockInfo); + + if(NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug("DIR* FSDirectory.addBlock: " + + path + " with " + block + + " block is added to the in-memory " + + "file system"); + } + return blockInfo; + } finally { + fsd.writeUnlock(); + } + } + + private static FileState analyzeFileState( + FSNamesystem fsn, String src, long fileId, String clientName, + ExtendedBlock previous, LocatedBlock[] onRetryBlock) + throws IOException { + assert fsn.hasReadLock(); + + checkBlock(fsn, previous); + onRetryBlock[0] = null; + fsn.checkNameNodeSafeMode("Cannot add block to " + src); + + // have we exceeded the configured limit of fs objects. + fsn.checkFsObjectLimit(); + + Block previousBlock = ExtendedBlock.getLocalBlock(previous); + final INode inode; + final INodesInPath iip; + if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) { + // Older clients may not have given us an inode ID to work with. + // In this case, we have to try to resolve the path and hope it + // hasn't changed or been deleted since the file was opened for write. + iip = fsn.dir.getINodesInPath4Write(src); + inode = iip.getLastINode(); + } else { + // Newer clients pass the inode ID, so we can just get the inode + // directly. + inode = fsn.dir.getInode(fileId); + iip = INodesInPath.fromINode(inode); + if (inode != null) { + src = iip.getPath(); + } + } + final INodeFile file = fsn.checkLease(src, clientName, + inode, fileId); + BlockInfoContiguous lastBlockInFile = file.getLastBlock(); + if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) { + // The block that the client claims is the current last block + // doesn't match up with what we think is the last block. There are + // four possibilities: + // 1) This is the first block allocation of an append() pipeline + // which started appending exactly at or exceeding the block boundary. + // In this case, the client isn't passed the previous block, + // so it makes the allocateBlock() call with previous=null. + // We can distinguish this since the last block of the file + // will be exactly a full block. + // 2) This is a retry from a client that missed the response of a + // prior getAdditionalBlock() call, perhaps because of a network + // timeout, or because of an HA failover. In that case, we know + // by the fact that the client is re-issuing the RPC that it + // never began to write to the old block. Hence it is safe to + // to return the existing block. 
+ // 3) This is an entirely bogus request/bug -- we should error out + // rather than potentially appending a new block with an empty + // one in the middle, etc + // 4) This is a retry from a client that timed out while + // the prior getAdditionalBlock() is still being processed, + // currently working on chooseTarget(). + // There are no means to distinguish between the first and + // the second attempts in Part I, because the first one hasn't + // changed the namesystem state yet. + // We run this analysis again in Part II where case 4 is impossible. + + BlockInfoContiguous penultimateBlock = file.getPenultimateBlock(); + if (previous == null && + lastBlockInFile != null && + lastBlockInFile.getNumBytes() >= file.getPreferredBlockSize() && + lastBlockInFile.isComplete()) { + // Case 1 + if (NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug( + "BLOCK* NameSystem.allocateBlock: handling block allocation" + + " writing to a file with a complete previous block: src=" + + src + " lastBlock=" + lastBlockInFile); + } + } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) { + if (lastBlockInFile.getNumBytes() != 0) { + throw new IOException( + "Request looked like a retry to allocate block " + + lastBlockInFile + " but it already contains " + + lastBlockInFile.getNumBytes() + " bytes"); + } + + // Case 2 + // Return the last block. + NameNode.stateChangeLog.info("BLOCK* allocateBlock: caught retry for " + + "allocation of a new block in " + src + ". Returning previously" + + " allocated block " + lastBlockInFile); + long offset = file.computeFileSize(); + BlockInfoContiguousUnderConstruction lastBlockUC = + (BlockInfoContiguousUnderConstruction) lastBlockInFile; + onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile, + lastBlockUC.getExpectedStorageLocations(), offset); + return new FileState(file, src, iip); + } else { + // Case 3 + throw new IOException("Cannot allocate block in " + src + ": " + + "passed 'previous' block " + previous + " does not match actual " + + "last block in file " + lastBlockInFile); + } + } + return new FileState(file, src, iip); + } + + static boolean completeFile(FSNamesystem fsn, FSPermissionChecker pc, + final String srcArg, String holder, ExtendedBlock last, long fileId) + throws IOException { + String src = srcArg; + if (NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + + src + " for " + holder); + } + checkBlock(fsn, last); + byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + src = fsn.dir.resolvePath(pc, src, pathComponents); + boolean success = completeFileInternal(fsn, src, holder, + ExtendedBlock.getLocalBlock(last), + fileId); + if (success) { + NameNode.stateChangeLog.info("DIR* completeFile: " + srcArg + + " is closed by " + holder); + } + return success; + } + + private static boolean completeFileInternal( + FSNamesystem fsn, String src, String holder, Block last, long fileId) + throws IOException { + assert fsn.hasWriteLock(); + final INodeFile pendingFile; + final INodesInPath iip; + INode inode = null; + try { + if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) { + // Older clients may not have given us an inode ID to work with. + // In this case, we have to try to resolve the path and hope it + // hasn't changed or been deleted since the file was opened for write. 
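+ // If the file was deleted concurrently, inode resolves to null here
+ // and checkLease() below rejects the call (FileNotFoundException,
+ // per its signature) instead of completing a nonexistent file.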
+ iip = fsn.dir.getINodesInPath(src, true); + inode = iip.getLastINode(); + } else { + inode = fsn.dir.getInode(fileId); + iip = INodesInPath.fromINode(inode); + if (inode != null) { + src = iip.getPath(); + } + } + pendingFile = fsn.checkLease(src, holder, inode, fileId); + } catch (LeaseExpiredException lee) { + if (inode != null && inode.isFile() && + !inode.asFile().isUnderConstruction()) { + // This could be a retry RPC - i.e the client tried to close + // the file, but missed the RPC response. Thus, it is trying + // again to close the file. If the file still exists and + // the client's view of the last block matches the actual + // last block, then we'll treat it as a successful close. + // See HDFS-3031. + final Block realLastBlock = inode.asFile().getLastBlock(); + if (Block.matchingIdAndGenStamp(last, realLastBlock)) { + NameNode.stateChangeLog.info("DIR* completeFile: " + + "request from " + holder + " to complete inode " + fileId + + "(" + src + ") which is already closed. But, it appears to be " + + "an RPC retry. Returning success"); + return true; + } + } + throw lee; + } + // Check the state of the penultimate block. It should be completed + // before attempting to complete the last one. + if (!fsn.checkFileProgress(src, pendingFile, false)) { + return false; + } + + // commit the last block and complete it if it has minimum replicas + fsn.commitOrCompleteLastBlock(pendingFile, iip, last); + + if (!fsn.checkFileProgress(src, pendingFile, true)) { + return false; + } + + fsn.finalizeINodeFileUnderConstruction(src, pendingFile, + Snapshot.CURRENT_STATE_ID); + return true; + } + + /** + * Persist the new block (the last block of the given file). + */ + private static void persistNewBlock( + FSNamesystem fsn, String path, INodeFile file) { + Preconditions.checkArgument(file.isUnderConstruction()); + fsn.getEditLog().logAddBlock(path, file); + if (NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug("persistNewBlock: " + + path + " with new block " + file.getLastBlock().toString() + + ", current total block count is " + file.getBlocks().length); + } + } + + /** + * Save allocated block at the given pending filename + * + * @param fsn FSNamesystem + * @param src path to the file + * @param inodesInPath representing each of the components of src. + * The last INode is the INode for {@code src} file. 
+ * @param newBlock newly allocated block to be save + * @param targets target datanodes where replicas of the new block is placed + * @throws QuotaExceededException If addition of block exceeds space quota + */ + private static void saveAllocatedBlock( + FSNamesystem fsn, String src, INodesInPath inodesInPath, Block newBlock, + DatanodeStorageInfo[] targets) + throws IOException { + assert fsn.hasWriteLock(); + BlockInfoContiguous b = addBlock(fsn.dir, src, inodesInPath, newBlock, + targets); + NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src); + DatanodeStorageInfo.incrementBlocksScheduled(targets); + } + + private static class FileState { + final INodeFile inode; + final String path; + final INodesInPath iip; + + FileState(INodeFile inode, String fullPath, INodesInPath iip) { + this.inode = inode; + this.path = fullPath; + this.iip = iip; + } + } + + static class ValidateAddBlockResult { + final long blockSize; + final int replication; + final byte storagePolicyID; + final String clientMachine; + + ValidateAddBlockResult( + long blockSize, int replication, byte storagePolicyID, + String clientMachine) { + this.blockSize = blockSize; + this.replication = replication; + this.storagePolicyID = storagePolicyID; + this.clientMachine = clientMachine; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 1583815cdfa..c2ed95608c6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -55,12 +55,9 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; import org.apache.hadoop.hdfs.util.ByteArray; import org.apache.hadoop.hdfs.util.EnumCounters; @@ -308,7 +305,7 @@ public class FSDirectory implements Closeable { return namesystem; } - private BlockManager getBlockManager() { + BlockManager getBlockManager() { return getFSNamesystem().getBlockManager(); } @@ -478,79 +475,6 @@ public class FSDirectory implements Closeable { return null; } - /** - * Add a block to the file. Returns a reference to the added block. 
- */ - BlockInfoContiguous addBlock(String path, INodesInPath inodesInPath, - Block block, DatanodeStorageInfo[] targets) throws IOException { - writeLock(); - try { - final INodeFile fileINode = inodesInPath.getLastINode().asFile(); - Preconditions.checkState(fileINode.isUnderConstruction()); - - // check quota limits and updated space consumed - updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(), - fileINode.getPreferredBlockReplication(), true); - - // associate new last block for the file - BlockInfoContiguousUnderConstruction blockInfo = - new BlockInfoContiguousUnderConstruction( - block, - fileINode.getFileReplication(), - BlockUCState.UNDER_CONSTRUCTION, - targets); - getBlockManager().addBlockCollection(blockInfo, fileINode); - fileINode.addBlock(blockInfo); - - if(NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug("DIR* FSDirectory.addBlock: " - + path + " with " + block - + " block is added to the in-memory " - + "file system"); - } - return blockInfo; - } finally { - writeUnlock(); - } - } - - /** - * Remove a block from the file. - * @return Whether the block exists in the corresponding file - */ - boolean removeBlock(String path, INodesInPath iip, INodeFile fileNode, - Block block) throws IOException { - Preconditions.checkArgument(fileNode.isUnderConstruction()); - writeLock(); - try { - return unprotectedRemoveBlock(path, iip, fileNode, block); - } finally { - writeUnlock(); - } - } - - boolean unprotectedRemoveBlock(String path, INodesInPath iip, - INodeFile fileNode, Block block) throws IOException { - // modify file-> block and blocksMap - // fileNode should be under construction - BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block); - if (uc == null) { - return false; - } - getBlockManager().removeBlockFromMap(block); - - if(NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: " - +path+" with "+block - +" block is removed from the file system"); - } - - // update space consumed - updateCount(iip, 0, -fileNode.getPreferredBlockSize(), - fileNode.getPreferredBlockReplication(), true); - return true; - } - /** * This is a wrapper for resolvePath(). 
If the path passed * is prefixed with /.reserved/raw, then it checks to ensure that the caller diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index e961fc78756..532290c4d3a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -1041,7 +1041,8 @@ public class FSEditLogLoader { + path); } Block oldBlock = oldBlocks[oldBlocks.length - 1]; - boolean removed = fsDir.unprotectedRemoveBlock(path, iip, file, oldBlock); + boolean removed = FSDirWriteFileOp.unprotectedRemoveBlock( + fsDir, path, iip, file, oldBlock); if (!removed && !(op instanceof UpdateBlocksOp)) { throw new IOException("Trying to delete non-existant block " + oldBlock); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 8fe32faac52..13692a048ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -271,7 +271,6 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; -import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; @@ -472,7 +471,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, private final long maxFsObjects; // maximum number of fs objects private final long minBlockSize; // minimum block size - private final long maxBlocksPerFile; // maximum # of blocks per file + final long maxBlocksPerFile; // maximum # of blocks per file // precision of access times. private final long accessTimePrecision; @@ -602,7 +601,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, boolean isHaEnabled() { return haEnabled; } - + /** * Check the supplied configuration for correctness. * @param conf Supplies the configuration to validate. @@ -1853,8 +1852,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, : dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip); final LocatedBlocks blocks = blockManager.createLocatedBlocks( - inode.getBlocks(iip.getPathSnapshotId()), fileSize, - isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo); + inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset, + length, needBlockToken, iip.isSnapshot(), feInfo); // Set caching information for the located blocks. 
for (LocatedBlock lb : blocks.getLocatedBlocks()) { @@ -2222,8 +2221,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot set storage policy for " + src); - auditStat = FSDirAttrOp.setStoragePolicy( - dir, blockManager, src, policyName); + auditStat = FSDirAttrOp.setStoragePolicy(dir, blockManager, src, + policyName); } catch (AccessControlException e) { logAuditEvent(false, "setStoragePolicy", src); throw e; @@ -2611,7 +2610,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return toRemoveBlocks; } catch (IOException ie) { NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " + - ie.getMessage()); + ie.getMessage()); throw ie; } } @@ -2693,8 +2692,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, "Cannot append to lazy persist file " + src); } // Opening an existing file for append - may need to recover lease. - recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE, - iip, src, holder, clientMachine, false); + recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE, iip, src, holder, + clientMachine, false); final BlockInfoContiguous lastBlock = myFile.getLastBlock(); // Check that the block has at least minimum replication. @@ -3038,290 +3037,49 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, * are replicated. Will return an empty 2-elt array if we want the * client to "try again later". */ - LocatedBlock getAdditionalBlock(String src, long fileId, String clientName, - ExtendedBlock previous, Set excludedNodes, - List favoredNodes) throws IOException { - LocatedBlock[] onRetryBlock = new LocatedBlock[1]; - DatanodeStorageInfo targets[] = getNewBlockTargets(src, fileId, - clientName, previous, excludedNodes, favoredNodes, onRetryBlock); - if (targets == null) { - assert onRetryBlock[0] != null : "Retry block is null"; - // This is a retry. Just return the last block. - return onRetryBlock[0]; - } - LocatedBlock newBlock = storeAllocatedBlock( - src, fileId, clientName, previous, targets); - return newBlock; - } - - /** - * Part I of getAdditionalBlock(). - * Analyze the state of the file under read lock to determine if the client - * can add a new block, detect potential retries, lease mismatches, - * and minimal replication of the penultimate block. - * - * Generate target DataNode locations for the new block, - * but do not create the new block yet. 
- */ - DatanodeStorageInfo[] getNewBlockTargets(String src, long fileId, - String clientName, ExtendedBlock previous, Set excludedNodes, - List favoredNodes, LocatedBlock[] onRetryBlock) throws IOException { - final long blockSize; - final int replication; - final byte storagePolicyID; - Node clientNode = null; - String clientMachine = null; - + LocatedBlock getAdditionalBlock( + String src, long fileId, String clientName, ExtendedBlock previous, + DatanodeInfo[] excludedNodes, String[] favoredNodes) throws IOException { if(NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: " + src + " inodeId " + fileId + " for " + clientName); } - checkOperation(OperationCategory.READ); - byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + waitForLoadingFSImage(); + LocatedBlock[] onRetryBlock = new LocatedBlock[1]; + FSDirWriteFileOp.ValidateAddBlockResult r; FSPermissionChecker pc = getPermissionChecker(); + checkOperation(OperationCategory.READ); readLock(); try { checkOperation(OperationCategory.READ); - src = dir.resolvePath(pc, src, pathComponents); - FileState fileState = analyzeFileState( - src, fileId, clientName, previous, onRetryBlock); - final INodeFile pendingFile = fileState.inode; - // Check if the penultimate block is minimally replicated - if (!checkFileProgress(src, pendingFile, false)) { - throw new NotReplicatedYetException("Not replicated yet: " + src); - } - src = fileState.path; - - if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) { - // This is a retry. No need to generate new locations. - // Use the last block if it has locations. - return null; - } - if (pendingFile.getBlocks().length >= maxBlocksPerFile) { - throw new IOException("File has reached the limit on maximum number of" - + " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY - + "): " + pendingFile.getBlocks().length + " >= " - + maxBlocksPerFile); - } - blockSize = pendingFile.getPreferredBlockSize(); - clientMachine = pendingFile.getFileUnderConstructionFeature() - .getClientMachine(); - clientNode = blockManager.getDatanodeManager().getDatanodeByHost( - clientMachine); - replication = pendingFile.getFileReplication(); - storagePolicyID = pendingFile.getStoragePolicyID(); + r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName, + previous, onRetryBlock); } finally { readUnlock(); } - if (clientNode == null) { - clientNode = getClientNode(clientMachine); + if (r == null) { + assert onRetryBlock[0] != null : "Retry block is null"; + // This is a retry. Just return the last block. + return onRetryBlock[0]; } - // choose targets for the new block to be allocated. - return getBlockManager().chooseTarget4NewBlock( - src, replication, clientNode, excludedNodes, blockSize, favoredNodes, - storagePolicyID); - } + DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock( + blockManager, src, excludedNodes, favoredNodes, r); - /** - * Part II of getAdditionalBlock(). - * Should repeat the same analysis of the file state as in Part 1, - * but under the write lock. - * If the conditions still hold, then allocate a new block with - * the new targets, add it to the INode and to the BlocksMap. 
- */ - LocatedBlock storeAllocatedBlock(String src, long fileId, String clientName, - ExtendedBlock previous, DatanodeStorageInfo[] targets) throws IOException { - Block newBlock = null; - long offset; checkOperation(OperationCategory.WRITE); - waitForLoadingFSImage(); writeLock(); + LocatedBlock lb; try { checkOperation(OperationCategory.WRITE); - // Run the full analysis again, since things could have changed - // while chooseTarget() was executing. - LocatedBlock[] onRetryBlock = new LocatedBlock[1]; - FileState fileState = - analyzeFileState(src, fileId, clientName, previous, onRetryBlock); - final INodeFile pendingFile = fileState.inode; - src = fileState.path; - - if (onRetryBlock[0] != null) { - if (onRetryBlock[0].getLocations().length > 0) { - // This is a retry. Just return the last block if having locations. - return onRetryBlock[0]; - } else { - // add new chosen targets to already allocated block and return - BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock(); - ((BlockInfoContiguousUnderConstruction) lastBlockInFile) - .setExpectedLocations(targets); - offset = pendingFile.computeFileSize(); - return makeLocatedBlock(lastBlockInFile, targets, offset); - } - } - - // commit the last block and complete it if it has minimum replicas - commitOrCompleteLastBlock(pendingFile, fileState.iip, - ExtendedBlock.getLocalBlock(previous)); - - // allocate new block, record block locations in INode. - newBlock = createNewBlock(); - INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile); - saveAllocatedBlock(src, inodesInPath, newBlock, targets); - - persistNewBlock(src, pendingFile); - offset = pendingFile.computeFileSize(); + lb = FSDirWriteFileOp.storeAllocatedBlock( + this, src, fileId, clientName, previous, targets); } finally { writeUnlock(); } getEditLog().logSync(); - - // Return located block - return makeLocatedBlock(newBlock, targets, offset); - } - - /* - * Resolve clientmachine address to get a network location path - */ - private Node getClientNode(String clientMachine) { - List hosts = new ArrayList(1); - hosts.add(clientMachine); - List rName = getBlockManager().getDatanodeManager() - .resolveNetworkLocation(hosts); - Node clientNode = null; - if (rName != null) { - // Able to resolve clientMachine mapping. - // Create a temp node to findout the rack local nodes - clientNode = new NodeBase(rName.get(0) + NodeBase.PATH_SEPARATOR_STR - + clientMachine); - } - return clientNode; - } - - static class FileState { - public final INodeFile inode; - public final String path; - public final INodesInPath iip; - - public FileState(INodeFile inode, String fullPath, INodesInPath iip) { - this.inode = inode; - this.path = fullPath; - this.iip = iip; - } - } - - FileState analyzeFileState(String src, - long fileId, - String clientName, - ExtendedBlock previous, - LocatedBlock[] onRetryBlock) - throws IOException { - assert hasReadLock(); - - checkBlock(previous); - onRetryBlock[0] = null; - checkNameNodeSafeMode("Cannot add block to " + src); - - // have we exceeded the configured limit of fs objects. - checkFsObjectLimit(); - - Block previousBlock = ExtendedBlock.getLocalBlock(previous); - final INode inode; - final INodesInPath iip; - if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) { - // Older clients may not have given us an inode ID to work with. - // In this case, we have to try to resolve the path and hope it - // hasn't changed or been deleted since the file was opened for write. 
- iip = dir.getINodesInPath4Write(src); - inode = iip.getLastINode(); - } else { - // Newer clients pass the inode ID, so we can just get the inode - // directly. - inode = dir.getInode(fileId); - iip = INodesInPath.fromINode(inode); - if (inode != null) { - src = iip.getPath(); - } - } - final INodeFile pendingFile = checkLease(src, clientName, inode, fileId); - BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock(); - if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) { - // The block that the client claims is the current last block - // doesn't match up with what we think is the last block. There are - // four possibilities: - // 1) This is the first block allocation of an append() pipeline - // which started appending exactly at or exceeding the block boundary. - // In this case, the client isn't passed the previous block, - // so it makes the allocateBlock() call with previous=null. - // We can distinguish this since the last block of the file - // will be exactly a full block. - // 2) This is a retry from a client that missed the response of a - // prior getAdditionalBlock() call, perhaps because of a network - // timeout, or because of an HA failover. In that case, we know - // by the fact that the client is re-issuing the RPC that it - // never began to write to the old block. Hence it is safe to - // to return the existing block. - // 3) This is an entirely bogus request/bug -- we should error out - // rather than potentially appending a new block with an empty - // one in the middle, etc - // 4) This is a retry from a client that timed out while - // the prior getAdditionalBlock() is still being processed, - // currently working on chooseTarget(). - // There are no means to distinguish between the first and - // the second attempts in Part I, because the first one hasn't - // changed the namesystem state yet. - // We run this analysis again in Part II where case 4 is impossible. - - BlockInfoContiguous penultimateBlock = pendingFile.getPenultimateBlock(); - if (previous == null && - lastBlockInFile != null && - lastBlockInFile.getNumBytes() >= pendingFile.getPreferredBlockSize() && - lastBlockInFile.isComplete()) { - // Case 1 - if (NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug( - "BLOCK* NameSystem.allocateBlock: handling block allocation" + - " writing to a file with a complete previous block: src=" + - src + " lastBlock=" + lastBlockInFile); - } - } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) { - if (lastBlockInFile.getNumBytes() != 0) { - throw new IOException( - "Request looked like a retry to allocate block " + - lastBlockInFile + " but it already contains " + - lastBlockInFile.getNumBytes() + " bytes"); - } - - // Case 2 - // Return the last block. - NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + - "caught retry for allocation of a new block in " + - src + ". 
Returning previously allocated block " + lastBlockInFile); - long offset = pendingFile.computeFileSize(); - onRetryBlock[0] = makeLocatedBlock(lastBlockInFile, - ((BlockInfoContiguousUnderConstruction)lastBlockInFile).getExpectedStorageLocations(), - offset); - return new FileState(pendingFile, src, iip); - } else { - // Case 3 - throw new IOException("Cannot allocate block in " + src + ": " + - "passed 'previous' block " + previous + " does not match actual " + - "last block in file " + lastBlockInFile); - } - } - return new FileState(pendingFile, src, iip); - } - - LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs, - long offset) throws IOException { - LocatedBlock lBlk = BlockManager.newLocatedBlock( - getExtendedBlock(blk), locs, offset, false); - getBlockManager().setBlockToken( - lBlk, BlockTokenIdentifier.AccessMode.WRITE); - return lBlk; + return lb; } /** @see ClientProtocol#getAdditionalDatanode */ @@ -3374,7 +3132,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } if (clientnode == null) { - clientnode = getClientNode(clientMachine); + clientnode = FSDirWriteFileOp.getClientNode(blockManager, clientMachine); } // choose new datanodes. @@ -3390,60 +3148,32 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, /** * The client would like to let go of the given block */ - boolean abandonBlock(ExtendedBlock b, long fileId, String src, String holder) + void abandonBlock(ExtendedBlock b, long fileId, String src, String holder) throws IOException { if(NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " + b + "of file " + src); } - checkOperation(OperationCategory.WRITE); - byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); - FSPermissionChecker pc = getPermissionChecker(); waitForLoadingFSImage(); + checkOperation(OperationCategory.WRITE); + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot abandon block " + b + " for file" + src); - src = dir.resolvePath(pc, src, pathComponents); - - final INode inode; - final INodesInPath iip; - if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) { - // Older clients may not have given us an inode ID to work with. - // In this case, we have to try to resolve the path and hope it - // hasn't changed or been deleted since the file was opened for write. 
- iip = dir.getINodesInPath(src, true); - inode = iip.getLastINode(); - } else { - inode = dir.getInode(fileId); - iip = INodesInPath.fromINode(inode); - if (inode != null) { - src = iip.getPath(); - } - } - final INodeFile file = checkLease(src, holder, inode, fileId); - - // Remove the block from the pending creates list - boolean removed = dir.removeBlock(src, iip, file, - ExtendedBlock.getLocalBlock(b)); - if (!removed) { - return true; - } + FSDirWriteFileOp.abandonBlock(dir, pc, b, fileId, src, holder); if(NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " + b + " is removed from pendingCreates"); } - persistBlocks(src, file, false); } finally { writeUnlock(); } getEditLog().logSync(); - - return true; } - private INodeFile checkLease(String src, String holder, INode inode, - long fileId) throws LeaseExpiredException, FileNotFoundException { + INodeFile checkLease( + String src, String holder, INode inode, long fileId) throws LeaseExpiredException, FileNotFoundException { assert hasReadLock(); final String ident = src + " (inode " + fileId + ")"; if (inode == null) { @@ -3488,120 +3218,30 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, * (e.g if not all blocks have reached minimum replication yet) * @throws IOException on error (eg lease mismatch, file not open, file deleted) */ - boolean completeFile(final String srcArg, String holder, + boolean completeFile(final String src, String holder, ExtendedBlock last, long fileId) - throws SafeModeException, UnresolvedLinkException, IOException { - String src = srcArg; - if (NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + - src + " for " + holder); - } - checkBlock(last); + throws IOException { boolean success = false; checkOperation(OperationCategory.WRITE); - byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); - FSPermissionChecker pc = getPermissionChecker(); waitForLoadingFSImage(); + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot complete file " + src); - src = dir.resolvePath(pc, src, pathComponents); - success = completeFileInternal(src, holder, - ExtendedBlock.getLocalBlock(last), fileId); + success = FSDirWriteFileOp.completeFile(this, pc, src, holder, last, + fileId); } finally { writeUnlock(); } getEditLog().logSync(); - if (success) { - NameNode.stateChangeLog.info("DIR* completeFile: " + srcArg - + " is closed by " + holder); - } return success; } - private boolean completeFileInternal(String src, String holder, Block last, - long fileId) throws IOException { - assert hasWriteLock(); - final INodeFile pendingFile; - final INodesInPath iip; - INode inode = null; - try { - if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) { - // Older clients may not have given us an inode ID to work with. - // In this case, we have to try to resolve the path and hope it - // hasn't changed or been deleted since the file was opened for write. 
- iip = dir.getINodesInPath(src, true); - inode = iip.getLastINode(); - } else { - inode = dir.getInode(fileId); - iip = INodesInPath.fromINode(inode); - if (inode != null) { - src = iip.getPath(); - } - } - pendingFile = checkLease(src, holder, inode, fileId); - } catch (LeaseExpiredException lee) { - if (inode != null && inode.isFile() && - !inode.asFile().isUnderConstruction()) { - // This could be a retry RPC - i.e the client tried to close - // the file, but missed the RPC response. Thus, it is trying - // again to close the file. If the file still exists and - // the client's view of the last block matches the actual - // last block, then we'll treat it as a successful close. - // See HDFS-3031. - final Block realLastBlock = inode.asFile().getLastBlock(); - if (Block.matchingIdAndGenStamp(last, realLastBlock)) { - NameNode.stateChangeLog.info("DIR* completeFile: " + - "request from " + holder + " to complete inode " + fileId + - "(" + src + ") which is already closed. But, it appears to be " + - "an RPC retry. Returning success"); - return true; - } - } - throw lee; - } - // Check the state of the penultimate block. It should be completed - // before attempting to complete the last one. - if (!checkFileProgress(src, pendingFile, false)) { - return false; - } - - // commit the last block and complete it if it has minimum replicas - commitOrCompleteLastBlock(pendingFile, iip, last); - - if (!checkFileProgress(src, pendingFile, true)) { - return false; - } - - finalizeINodeFileUnderConstruction(src, pendingFile, - Snapshot.CURRENT_STATE_ID); - return true; - } - - /** - * Save allocated block at the given pending filename - * - * @param src path to the file - * @param inodesInPath representing each of the components of src. - * The last INode is the INode for {@code src} file. - * @param newBlock newly allocated block to be save - * @param targets target datanodes where replicas of the new block is placed - * @throws QuotaExceededException If addition of block exceeds space quota - */ - private void saveAllocatedBlock(String src, INodesInPath inodesInPath, - Block newBlock, DatanodeStorageInfo[] targets) - throws IOException { - assert hasWriteLock(); - BlockInfoContiguous b = dir.addBlock(src, inodesInPath, newBlock, targets); - NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src); - DatanodeStorageInfo.incrementBlocksScheduled(targets); - } - /** * Create new block with a unique block id and a new generation stamp. */ - private Block createNewBlock() throws IOException { + Block createNewBlock() throws IOException { assert hasWriteLock(); Block b = new Block(nextBlockId(), 0, 0); // Increment the generation stamp for every new block. 
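The visibility changes in this and the nearby hunks (createNewBlock(), checkLease(), commitOrCompleteLastBlock(), finalizeINodeFileUnderConstruction(), maxBlocksPerFile) all follow one pattern: a private member of FSNamesystem becomes package-private so that FSDirWriteFileOp, which lives in the same org.apache.hadoop.hdfs.server.namenode package, can call it while the locking discipline stays with FSNamesystem. A minimal sketch of the pattern, with illustrative stand-in classes rather than the real ones:

    package org.apache.hadoop.hdfs.server.namenode;

    // Stand-in for FSNamesystem: the allocator is package-private, so
    // only classes in this package can reach it.
    class SketchNamesystem {
      private long lastId = 0;
      long nextBlockIdSketch() {
        return ++lastId;
      }
    }

    // Stand-in for FSDirWriteFileOp: a stateless same-package helper
    // that borrows the namesystem's package-private members.
    final class SketchWriteOp {
      private SketchWriteOp() {}
      static long allocate(SketchNamesystem fsn) {
        return fsn.nextBlockIdSketch(); // legal: same package
      }
    }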
@@ -3993,7 +3633,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, pendingFile.getFileUnderConstructionFeature().updateLengthOfLastBlock( pendingFile, lastBlockLength); } - persistBlocks(src, pendingFile, false); + FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, false); } finally { writeUnlock(); } @@ -4163,8 +3803,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return leaseManager.reassignLease(lease, pendingFile, newHolder); } - private void commitOrCompleteLastBlock(final INodeFile fileINode, - final INodesInPath iip, final Block commitBlock) throws IOException { + void commitOrCompleteLastBlock( + final INodeFile fileINode, final INodesInPath iip, + final Block commitBlock) throws IOException { assert hasWriteLock(); Preconditions.checkArgument(fileINode.isUnderConstruction()); if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) { @@ -4182,14 +3823,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } } - private void finalizeINodeFileUnderConstruction(String src, - INodeFile pendingFile, int latestSnapshot) throws IOException { + void finalizeINodeFileUnderConstruction( + String src, INodeFile pendingFile, int latestSnapshot) throws IOException { assert hasWriteLock(); FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature(); Preconditions.checkArgument(uc != null); leaseManager.removeLease(uc.getClientName(), pendingFile); - + pendingFile.recordModification(latestSnapshot); // The file is no longer pending. @@ -4401,7 +4042,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } else { // If this commit does not want to close the file, persist blocks src = iFile.getFullPathName(); - persistBlocks(src, iFile, false); + FSDirWriteFileOp.persistBlocks(dir, src, iFile, false); } } finally { writeUnlock(); @@ -4591,24 +4232,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace(); } - /** - * Persist the block list for the inode. - * @param path - * @param file - * @param logRetryCache - */ - private void persistBlocks(String path, INodeFile file, - boolean logRetryCache) { - assert hasWriteLock(); - Preconditions.checkArgument(file.isUnderConstruction()); - getEditLog().logUpdateBlocks(path, file, logRetryCache); - if(NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug("persistBlocks: " + path - + " with " + file.getBlocks().length + " blocks is persisted to" + - " the file system"); - } - } - /** * Close file. * @param path @@ -4796,13 +4419,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, public FSEditLog getEditLog() { return getFSImage().getEditLog(); - } - - private void checkBlock(ExtendedBlock block) throws IOException { - if (block != null && !this.blockPoolId.equals(block.getBlockPoolId())) { - throw new IOException("Unexpected BlockPoolId " + block.getBlockPoolId() - + " - expected " + blockPoolId); - } } @Metric({"MissingBlocks", "Number of missing blocks"}) @@ -5073,21 +4689,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth); } - /** - * Persist the new block (the last block of the given file). 
- * @param path - * @param file - */ - private void persistNewBlock(String path, INodeFile file) { - Preconditions.checkArgument(file.isUnderConstruction()); - getEditLog().logAddBlock(path, file); - if (NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug("persistNewBlock: " - + path + " with new block " + file.getLastBlock().toString() - + ", current total block count is " + file.getBlocks().length); - } - } - /** * SafeModeInfo contains information related to the safe mode. *

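A recurring shape in the FSNamesystem hunks: helpers such as FSDirWriteFileOp.persistBlocks() append edit-log records while the namesystem write lock is held, and the caller runs getEditLog().logSync() only after writeUnlock(), keeping the durable sync off the locked critical section. A condensed sketch of that pattern (all classes here are stand-ins, not HDFS APIs):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrates the mutate-under-lock, sync-outside-lock pattern used
    // by getAdditionalBlock(), abandonBlock() and completeFile().
    class EditLogPatternSketch {
      private final ReentrantReadWriteLock lock =
          new ReentrantReadWriteLock();
      private final StringBuilder pendingEdits = new StringBuilder();

      void addBlockLikeOp(String path) {
        lock.writeLock().lock();
        try {
          // mutate namespace state and buffer the edit under the lock
          pendingEdits.append("OP_ADD_BLOCK ").append(path).append('\n');
        } finally {
          lock.writeLock().unlock();
        }
        logSync(); // durable sync happens outside the write lock
      }

      private void logSync() {
        // stand-in: the real code fsyncs buffered edits to the journal
      }
    }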
@@ -6393,7 +5994,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, blockinfo.setExpectedLocations(storages); String src = pendingFile.getFullPathName(); - persistBlocks(src, pendingFile, logRetryCache); + FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, logRetryCache); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index e2f44b52abf..b95194a28f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -708,23 +708,11 @@ class NameNodeRpcServer implements NamenodeProtocols { String[] favoredNodes) throws IOException { checkNNStartup(); - if (stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src - + " fileId=" + fileId + " for " + clientName); - } - Set excludedNodesSet = null; - if (excludedNodes != null) { - excludedNodesSet = new HashSet(excludedNodes.length); - for (Node node : excludedNodes) { - excludedNodesSet.add(node); - } - } - List favoredNodesList = (favoredNodes == null) ? null - : Arrays.asList(favoredNodes); LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId, - clientName, previous, excludedNodesSet, favoredNodesList); - if (locatedBlock != null) + clientName, previous, excludedNodes, favoredNodes); + if (locatedBlock != null) { metrics.incrAddBlockOps(); + } return locatedBlock; } @@ -765,13 +753,7 @@ class NameNodeRpcServer implements NamenodeProtocols { public void abandonBlock(ExtendedBlock b, long fileId, String src, String holder) throws IOException { checkNNStartup(); - if(stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: " - +b+" of file "+src); - } - if (!namesystem.abandonBlock(b, fileId, src, holder)) { - throw new IOException("Cannot abandon block during write to " + src); - } + namesystem.abandonBlock(b, fileId, src, holder); } @Override // ClientProtocol @@ -779,10 +761,6 @@ class NameNodeRpcServer implements NamenodeProtocols { ExtendedBlock last, long fileId) throws IOException { checkNNStartup(); - if(stateChangeLog.isDebugEnabled()) { - stateChangeLog.debug("*DIR* NameNode.complete: " - + src + " fileId=" + fileId +" for " + clientName); - } return namesystem.completeFile(src, clientName, last, fileId); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java index 5a4134c0adf..c92e79b76f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java @@ -39,6 +39,7 @@ import org.apache.hadoop.io.EnumSetWritable; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; /** * Race between two threads simultaneously calling @@ -88,25 +89,40 @@ public class TestAddBlockRetry { // start first addBlock() LOG.info("Starting first addBlock for " + src); LocatedBlock[] onRetryBlock = new LocatedBlock[1]; - DatanodeStorageInfo targets[] = ns.getNewBlockTargets( - src, 
HdfsConstants.GRANDFATHER_INODE_ID, "clientName",
- null, null, null, onRetryBlock);
+ ns.readLock();
+ FSDirWriteFileOp.ValidateAddBlockResult r;
+ FSPermissionChecker pc = Mockito.mock(FSPermissionChecker.class);
+ try {
+ r = FSDirWriteFileOp.validateAddBlock(ns, pc, src,
+ HdfsConstants.GRANDFATHER_INODE_ID,
+ "clientName", null, onRetryBlock);
+ } finally {
+ ns.readUnlock();
+ }
+ DatanodeStorageInfo targets[] = FSDirWriteFileOp.chooseTargetForNewBlock(
+ ns.getBlockManager(), src, null, null, r);
 assertNotNull("Targets must be generated", targets);
 // run second addBlock()
 LOG.info("Starting second addBlock for " + src);
 nn.addBlock(src, "clientName", null, null,
- HdfsConstants.GRANDFATHER_INODE_ID, null);
+ HdfsConstants.GRANDFATHER_INODE_ID, null);
 assertTrue("Penultimate block must be complete",
- checkFileProgress(src, false));
+ checkFileProgress(src, false));
 LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
 assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
 LocatedBlock lb2 = lbs.get(0);
 assertEquals("Wrong replication", REPLICATION, lb2.getLocations().length);
 // continue first addBlock()
- LocatedBlock newBlock = ns.storeAllocatedBlock(
- src, HdfsConstants.GRANDFATHER_INODE_ID, "clientName", null, targets);
+ ns.writeLock();
+ LocatedBlock newBlock;
+ try {
+ newBlock = FSDirWriteFileOp.storeAllocatedBlock(ns, src,
+ HdfsConstants.GRANDFATHER_INODE_ID, "clientName", null, targets);
+ } finally {
+ ns.writeUnlock();
+ }
 assertEquals("Blocks are not equal", lb2.getBlock(), newBlock.getBlock());
 // check locations
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
index 30496121819..ea560feeded 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderCon
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import java.io.IOException;
 
@@ -45,7 +46,9 @@ public class TestCommitBlockSynchronization {
 private FSNamesystem makeNameSystemSpy(Block block, INodeFile file)
 throws IOException {
 Configuration conf = new Configuration();
+ FSEditLog editlog = mock(FSEditLog.class);
 FSImage image = new FSImage(conf);
+ Whitebox.setInternalState(image, "editLog", editlog);
 final DatanodeStorageInfo[] targets = {};
 
 FSNamesystem namesystem = new FSNamesystem(conf, image);
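The TestCommitBlockSynchronization hunk injects a mocked FSEditLog into the private "editLog" field of FSImage, presumably because the refactored path now reaches the edit log during the test. A minimal sketch of that reflection-based injection, using the same Whitebox helper the hunk imports (the Image/Log classes below are hypothetical stand-ins, not the real HDFS types):

    import org.mockito.Mockito;
    import org.mockito.internal.util.reflection.Whitebox;

    class Image {
      private Log editLog;            // private, set via reflection
      Log getLog() { return editLog; }
    }

    class Log {
      void logSync() {}
    }

    class WhiteboxSketch {
      public static void main(String[] args) {
        Log mocked = Mockito.mock(Log.class);
        Image image = new Image();
        // bypasses the private modifier, as the test does for FSImage
        Whitebox.setInternalState(image, "editLog", mocked);
        image.getLog().logSync();     // hits the mock, not a real log
      }
    }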