HDFS-8394. Move getAdditionalBlock() and related functionalities into a separate class. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2015-05-15 19:09:59 -07:00
parent 8f37873342
commit e5afac5896
8 changed files with 649 additions and 560 deletions

View File

@ -557,6 +557,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8397. Refactor the error handling code in DataStreamer. HDFS-8397. Refactor the error handling code in DataStreamer.
(Tsz Wo Nicholas Sze via jing9) (Tsz Wo Nicholas Sze via jing9)
HDFS-8394. Move getAdditionalBlock() and related functionalities into a
separate class. (wheat9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -0,0 +1,563 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
class FSDirWriteFileOp {
private FSDirWriteFileOp() {}
static boolean unprotectedRemoveBlock(
FSDirectory fsd, String path, INodesInPath iip, INodeFile fileNode,
Block block) throws IOException {
// modify file-> block and blocksMap
// fileNode should be under construction
BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block);
if (uc == null) {
return false;
}
fsd.getBlockManager().removeBlockFromMap(block);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
+path+" with "+block
+" block is removed from the file system");
}
// update space consumed
fsd.updateCount(iip, 0, -fileNode.getPreferredBlockSize(),
fileNode.getPreferredBlockReplication(), true);
return true;
}
/**
* Persist the block list for the inode.
*/
static void persistBlocks(
FSDirectory fsd, String path, INodeFile file, boolean logRetryCache) {
assert fsd.getFSNamesystem().hasWriteLock();
Preconditions.checkArgument(file.isUnderConstruction());
fsd.getEditLog().logUpdateBlocks(path, file, logRetryCache);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("persistBlocks: " + path
+ " with " + file.getBlocks().length + " blocks is persisted to" +
" the file system");
}
}
static void abandonBlock(
FSDirectory fsd, FSPermissionChecker pc, ExtendedBlock b, long fileId,
String src, String holder) throws IOException {
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
src = fsd.resolvePath(pc, src, pathComponents);
final INode inode;
final INodesInPath iip;
if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
// Older clients may not have given us an inode ID to work with.
// In this case, we have to try to resolve the path and hope it
// hasn't changed or been deleted since the file was opened for write.
iip = fsd.getINodesInPath(src, true);
inode = iip.getLastINode();
} else {
inode = fsd.getInode(fileId);
iip = INodesInPath.fromINode(inode);
if (inode != null) {
src = iip.getPath();
}
}
FSNamesystem fsn = fsd.getFSNamesystem();
final INodeFile file = fsn.checkLease(src, holder, inode, fileId);
Preconditions.checkState(file.isUnderConstruction());
Block localBlock = ExtendedBlock.getLocalBlock(b);
fsd.writeLock();
try {
// Remove the block from the pending creates list
if (!unprotectedRemoveBlock(fsd, src, iip, file, localBlock)) {
return;
}
} finally {
fsd.writeUnlock();
}
persistBlocks(fsd, src, file, false);
}
static void checkBlock(FSNamesystem fsn, ExtendedBlock block)
throws IOException {
String bpId = fsn.getBlockPoolId();
if (block != null && !bpId.equals(block.getBlockPoolId())) {
throw new IOException("Unexpected BlockPoolId " + block.getBlockPoolId()
+ " - expected " + bpId);
}
}
/**
* Part I of getAdditionalBlock().
* Analyze the state of the file under read lock to determine if the client
* can add a new block, detect potential retries, lease mismatches,
* and minimal replication of the penultimate block.
*
* Generate target DataNode locations for the new block,
* but do not create the new block yet.
*/
static ValidateAddBlockResult validateAddBlock(
FSNamesystem fsn, FSPermissionChecker pc,
String src, long fileId, String clientName,
ExtendedBlock previous, LocatedBlock[] onRetryBlock) throws IOException {
final long blockSize;
final int replication;
final byte storagePolicyID;
String clientMachine;
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
src = fsn.dir.resolvePath(pc, src, pathComponents);
FileState fileState = analyzeFileState(fsn, src, fileId, clientName,
previous, onRetryBlock);
final INodeFile pendingFile = fileState.inode;
// Check if the penultimate block is minimally replicated
if (!fsn.checkFileProgress(src, pendingFile, false)) {
throw new NotReplicatedYetException("Not replicated yet: " + src);
}
if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
// This is a retry. No need to generate new locations.
// Use the last block if it has locations.
return null;
}
if (pendingFile.getBlocks().length >= fsn.maxBlocksPerFile) {
throw new IOException("File has reached the limit on maximum number of"
+ " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
+ "): " + pendingFile.getBlocks().length + " >= "
+ fsn.maxBlocksPerFile);
}
blockSize = pendingFile.getPreferredBlockSize();
clientMachine = pendingFile.getFileUnderConstructionFeature()
.getClientMachine();
replication = pendingFile.getFileReplication();
storagePolicyID = pendingFile.getStoragePolicyID();
return new ValidateAddBlockResult(blockSize, replication, storagePolicyID,
clientMachine);
}
static LocatedBlock makeLocatedBlock(FSNamesystem fsn, Block blk,
DatanodeStorageInfo[] locs, long offset) throws IOException {
LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk),
locs, offset, false);
fsn.getBlockManager().setBlockToken(lBlk,
BlockTokenIdentifier.AccessMode.WRITE);
return lBlk;
}
/**
* Part II of getAdditionalBlock().
* Should repeat the same analysis of the file state as in Part 1,
* but under the write lock.
* If the conditions still hold, then allocate a new block with
* the new targets, add it to the INode and to the BlocksMap.
*/
static LocatedBlock storeAllocatedBlock(FSNamesystem fsn, String src,
long fileId, String clientName, ExtendedBlock previous,
DatanodeStorageInfo[] targets) throws IOException {
long offset;
// Run the full analysis again, since things could have changed
// while chooseTarget() was executing.
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
FileState fileState = analyzeFileState(fsn, src, fileId, clientName,
previous, onRetryBlock);
final INodeFile pendingFile = fileState.inode;
src = fileState.path;
if (onRetryBlock[0] != null) {
if (onRetryBlock[0].getLocations().length > 0) {
// This is a retry. Just return the last block if having locations.
return onRetryBlock[0];
} else {
// add new chosen targets to already allocated block and return
BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
((BlockInfoContiguousUnderConstruction) lastBlockInFile)
.setExpectedLocations(targets);
offset = pendingFile.computeFileSize();
return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
}
}
// commit the last block and complete it if it has minimum replicas
fsn.commitOrCompleteLastBlock(pendingFile, fileState.iip,
ExtendedBlock.getLocalBlock(previous));
// allocate new block, record block locations in INode.
Block newBlock = fsn.createNewBlock();
INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets);
persistNewBlock(fsn, src, pendingFile);
offset = pendingFile.computeFileSize();
// Return located block
return makeLocatedBlock(fsn, newBlock, targets, offset);
}
static DatanodeStorageInfo[] chooseTargetForNewBlock(
BlockManager bm, String src, DatanodeInfo[] excludedNodes, String[]
favoredNodes, ValidateAddBlockResult r) throws IOException {
Node clientNode = bm.getDatanodeManager()
.getDatanodeByHost(r.clientMachine);
if (clientNode == null) {
clientNode = getClientNode(bm, r.clientMachine);
}
Set<Node> excludedNodesSet = null;
if (excludedNodes != null) {
excludedNodesSet = new HashSet<>(excludedNodes.length);
Collections.addAll(excludedNodesSet, excludedNodes);
}
List<String> favoredNodesList = (favoredNodes == null) ? null
: Arrays.asList(favoredNodes);
// choose targets for the new block to be allocated.
return bm.chooseTarget4NewBlock(src, r.replication, clientNode,
excludedNodesSet, r.blockSize,
favoredNodesList, r.storagePolicyID);
}
/**
* Resolve clientmachine address to get a network location path
*/
static Node getClientNode(BlockManager bm, String clientMachine) {
List<String> hosts = new ArrayList<>(1);
hosts.add(clientMachine);
List<String> rName = bm.getDatanodeManager()
.resolveNetworkLocation(hosts);
Node clientNode = null;
if (rName != null) {
// Able to resolve clientMachine mapping.
// Create a temp node to findout the rack local nodes
clientNode = new NodeBase(rName.get(0) + NodeBase.PATH_SEPARATOR_STR
+ clientMachine);
}
return clientNode;
}
/**
* Add a block to the file. Returns a reference to the added block.
*/
private static BlockInfoContiguous addBlock(
FSDirectory fsd, String path, INodesInPath inodesInPath, Block block,
DatanodeStorageInfo[] targets) throws IOException {
fsd.writeLock();
try {
final INodeFile fileINode = inodesInPath.getLastINode().asFile();
Preconditions.checkState(fileINode.isUnderConstruction());
// check quota limits and updated space consumed
fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
fileINode.getPreferredBlockReplication(), true);
// associate new last block for the file
BlockInfoContiguousUnderConstruction blockInfo =
new BlockInfoContiguousUnderConstruction(
block,
fileINode.getFileReplication(),
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
targets);
fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
fileINode.addBlock(blockInfo);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.addBlock: "
+ path + " with " + block
+ " block is added to the in-memory "
+ "file system");
}
return blockInfo;
} finally {
fsd.writeUnlock();
}
}
private static FileState analyzeFileState(
FSNamesystem fsn, String src, long fileId, String clientName,
ExtendedBlock previous, LocatedBlock[] onRetryBlock)
throws IOException {
assert fsn.hasReadLock();
checkBlock(fsn, previous);
onRetryBlock[0] = null;
fsn.checkNameNodeSafeMode("Cannot add block to " + src);
// have we exceeded the configured limit of fs objects.
fsn.checkFsObjectLimit();
Block previousBlock = ExtendedBlock.getLocalBlock(previous);
final INode inode;
final INodesInPath iip;
if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
// Older clients may not have given us an inode ID to work with.
// In this case, we have to try to resolve the path and hope it
// hasn't changed or been deleted since the file was opened for write.
iip = fsn.dir.getINodesInPath4Write(src);
inode = iip.getLastINode();
} else {
// Newer clients pass the inode ID, so we can just get the inode
// directly.
inode = fsn.dir.getInode(fileId);
iip = INodesInPath.fromINode(inode);
if (inode != null) {
src = iip.getPath();
}
}
final INodeFile file = fsn.checkLease(src, clientName,
inode, fileId);
BlockInfoContiguous lastBlockInFile = file.getLastBlock();
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
// The block that the client claims is the current last block
// doesn't match up with what we think is the last block. There are
// four possibilities:
// 1) This is the first block allocation of an append() pipeline
// which started appending exactly at or exceeding the block boundary.
// In this case, the client isn't passed the previous block,
// so it makes the allocateBlock() call with previous=null.
// We can distinguish this since the last block of the file
// will be exactly a full block.
// 2) This is a retry from a client that missed the response of a
// prior getAdditionalBlock() call, perhaps because of a network
// timeout, or because of an HA failover. In that case, we know
// by the fact that the client is re-issuing the RPC that it
// never began to write to the old block. Hence it is safe to
// to return the existing block.
// 3) This is an entirely bogus request/bug -- we should error out
// rather than potentially appending a new block with an empty
// one in the middle, etc
// 4) This is a retry from a client that timed out while
// the prior getAdditionalBlock() is still being processed,
// currently working on chooseTarget().
// There are no means to distinguish between the first and
// the second attempts in Part I, because the first one hasn't
// changed the namesystem state yet.
// We run this analysis again in Part II where case 4 is impossible.
BlockInfoContiguous penultimateBlock = file.getPenultimateBlock();
if (previous == null &&
lastBlockInFile != null &&
lastBlockInFile.getNumBytes() >= file.getPreferredBlockSize() &&
lastBlockInFile.isComplete()) {
// Case 1
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.allocateBlock: handling block allocation" +
" writing to a file with a complete previous block: src=" +
src + " lastBlock=" + lastBlockInFile);
}
} else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
if (lastBlockInFile.getNumBytes() != 0) {
throw new IOException(
"Request looked like a retry to allocate block " +
lastBlockInFile + " but it already contains " +
lastBlockInFile.getNumBytes() + " bytes");
}
// Case 2
// Return the last block.
NameNode.stateChangeLog.info("BLOCK* allocateBlock: caught retry for " +
"allocation of a new block in " + src + ". Returning previously" +
" allocated block " + lastBlockInFile);
long offset = file.computeFileSize();
BlockInfoContiguousUnderConstruction lastBlockUC =
(BlockInfoContiguousUnderConstruction) lastBlockInFile;
onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile,
lastBlockUC.getExpectedStorageLocations(), offset);
return new FileState(file, src, iip);
} else {
// Case 3
throw new IOException("Cannot allocate block in " + src + ": " +
"passed 'previous' block " + previous + " does not match actual " +
"last block in file " + lastBlockInFile);
}
}
return new FileState(file, src, iip);
}
static boolean completeFile(FSNamesystem fsn, FSPermissionChecker pc,
final String srcArg, String holder, ExtendedBlock last, long fileId)
throws IOException {
String src = srcArg;
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
src + " for " + holder);
}
checkBlock(fsn, last);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
src = fsn.dir.resolvePath(pc, src, pathComponents);
boolean success = completeFileInternal(fsn, src, holder,
ExtendedBlock.getLocalBlock(last),
fileId);
if (success) {
NameNode.stateChangeLog.info("DIR* completeFile: " + srcArg
+ " is closed by " + holder);
}
return success;
}
private static boolean completeFileInternal(
FSNamesystem fsn, String src, String holder, Block last, long fileId)
throws IOException {
assert fsn.hasWriteLock();
final INodeFile pendingFile;
final INodesInPath iip;
INode inode = null;
try {
if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
// Older clients may not have given us an inode ID to work with.
// In this case, we have to try to resolve the path and hope it
// hasn't changed or been deleted since the file was opened for write.
iip = fsn.dir.getINodesInPath(src, true);
inode = iip.getLastINode();
} else {
inode = fsn.dir.getInode(fileId);
iip = INodesInPath.fromINode(inode);
if (inode != null) {
src = iip.getPath();
}
}
pendingFile = fsn.checkLease(src, holder, inode, fileId);
} catch (LeaseExpiredException lee) {
if (inode != null && inode.isFile() &&
!inode.asFile().isUnderConstruction()) {
// This could be a retry RPC - i.e the client tried to close
// the file, but missed the RPC response. Thus, it is trying
// again to close the file. If the file still exists and
// the client's view of the last block matches the actual
// last block, then we'll treat it as a successful close.
// See HDFS-3031.
final Block realLastBlock = inode.asFile().getLastBlock();
if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
NameNode.stateChangeLog.info("DIR* completeFile: " +
"request from " + holder + " to complete inode " + fileId +
"(" + src + ") which is already closed. But, it appears to be " +
"an RPC retry. Returning success");
return true;
}
}
throw lee;
}
// Check the state of the penultimate block. It should be completed
// before attempting to complete the last one.
if (!fsn.checkFileProgress(src, pendingFile, false)) {
return false;
}
// commit the last block and complete it if it has minimum replicas
fsn.commitOrCompleteLastBlock(pendingFile, iip, last);
if (!fsn.checkFileProgress(src, pendingFile, true)) {
return false;
}
fsn.finalizeINodeFileUnderConstruction(src, pendingFile,
Snapshot.CURRENT_STATE_ID);
return true;
}
/**
* Persist the new block (the last block of the given file).
*/
private static void persistNewBlock(
FSNamesystem fsn, String path, INodeFile file) {
Preconditions.checkArgument(file.isUnderConstruction());
fsn.getEditLog().logAddBlock(path, file);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("persistNewBlock: "
+ path + " with new block " + file.getLastBlock().toString()
+ ", current total block count is " + file.getBlocks().length);
}
}
/**
* Save allocated block at the given pending filename
*
* @param fsn FSNamesystem
* @param src path to the file
* @param inodesInPath representing each of the components of src.
* The last INode is the INode for {@code src} file.
* @param newBlock newly allocated block to be save
* @param targets target datanodes where replicas of the new block is placed
* @throws QuotaExceededException If addition of block exceeds space quota
*/
private static void saveAllocatedBlock(
FSNamesystem fsn, String src, INodesInPath inodesInPath, Block newBlock,
DatanodeStorageInfo[] targets)
throws IOException {
assert fsn.hasWriteLock();
BlockInfoContiguous b = addBlock(fsn.dir, src, inodesInPath, newBlock,
targets);
NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src);
DatanodeStorageInfo.incrementBlocksScheduled(targets);
}
private static class FileState {
final INodeFile inode;
final String path;
final INodesInPath iip;
FileState(INodeFile inode, String fullPath, INodesInPath iip) {
this.inode = inode;
this.path = fullPath;
this.iip = iip;
}
}
static class ValidateAddBlockResult {
final long blockSize;
final int replication;
final byte storagePolicyID;
final String clientMachine;
ValidateAddBlockResult(
long blockSize, int replication, byte storagePolicyID,
String clientMachine) {
this.blockSize = blockSize;
this.replication = replication;
this.storagePolicyID = storagePolicyID;
this.clientMachine = clientMachine;
}
}
}

View File

@ -55,12 +55,9 @@
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.util.ByteArray; import org.apache.hadoop.hdfs.util.ByteArray;
import org.apache.hadoop.hdfs.util.EnumCounters; import org.apache.hadoop.hdfs.util.EnumCounters;
@ -308,7 +305,7 @@ FSNamesystem getFSNamesystem() {
return namesystem; return namesystem;
} }
private BlockManager getBlockManager() { BlockManager getBlockManager() {
return getFSNamesystem().getBlockManager(); return getFSNamesystem().getBlockManager();
} }
@ -478,79 +475,6 @@ INodeFile addFileForEditLog(long id, INodesInPath existing, byte[] localName,
return null; return null;
} }
/**
* Add a block to the file. Returns a reference to the added block.
*/
BlockInfoContiguous addBlock(String path, INodesInPath inodesInPath,
Block block, DatanodeStorageInfo[] targets) throws IOException {
writeLock();
try {
final INodeFile fileINode = inodesInPath.getLastINode().asFile();
Preconditions.checkState(fileINode.isUnderConstruction());
// check quota limits and updated space consumed
updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
fileINode.getPreferredBlockReplication(), true);
// associate new last block for the file
BlockInfoContiguousUnderConstruction blockInfo =
new BlockInfoContiguousUnderConstruction(
block,
fileINode.getFileReplication(),
BlockUCState.UNDER_CONSTRUCTION,
targets);
getBlockManager().addBlockCollection(blockInfo, fileINode);
fileINode.addBlock(blockInfo);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.addBlock: "
+ path + " with " + block
+ " block is added to the in-memory "
+ "file system");
}
return blockInfo;
} finally {
writeUnlock();
}
}
/**
* Remove a block from the file.
* @return Whether the block exists in the corresponding file
*/
boolean removeBlock(String path, INodesInPath iip, INodeFile fileNode,
Block block) throws IOException {
Preconditions.checkArgument(fileNode.isUnderConstruction());
writeLock();
try {
return unprotectedRemoveBlock(path, iip, fileNode, block);
} finally {
writeUnlock();
}
}
boolean unprotectedRemoveBlock(String path, INodesInPath iip,
INodeFile fileNode, Block block) throws IOException {
// modify file-> block and blocksMap
// fileNode should be under construction
BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block);
if (uc == null) {
return false;
}
getBlockManager().removeBlockFromMap(block);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
+path+" with "+block
+" block is removed from the file system");
}
// update space consumed
updateCount(iip, 0, -fileNode.getPreferredBlockSize(),
fileNode.getPreferredBlockReplication(), true);
return true;
}
/** /**
* This is a wrapper for resolvePath(). If the path passed * This is a wrapper for resolvePath(). If the path passed
* is prefixed with /.reserved/raw, then it checks to ensure that the caller * is prefixed with /.reserved/raw, then it checks to ensure that the caller

View File

@ -1037,7 +1037,8 @@ private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
+ path); + path);
} }
Block oldBlock = oldBlocks[oldBlocks.length - 1]; Block oldBlock = oldBlocks[oldBlocks.length - 1];
boolean removed = fsDir.unprotectedRemoveBlock(path, iip, file, oldBlock); boolean removed = FSDirWriteFileOp.unprotectedRemoveBlock(
fsDir, path, iip, file, oldBlock);
if (!removed && !(op instanceof UpdateBlocksOp)) { if (!removed && !(op instanceof UpdateBlocksOp)) {
throw new IOException("Trying to delete non-existant block " + oldBlock); throw new IOException("Trying to delete non-existant block " + oldBlock);
} }

View File

@ -268,7 +268,6 @@
import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node; import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@ -484,7 +483,7 @@ private void logAuditEvent(boolean succeeded,
private final long maxFsObjects; // maximum number of fs objects private final long maxFsObjects; // maximum number of fs objects
private final long minBlockSize; // minimum block size private final long minBlockSize; // minimum block size
private final long maxBlocksPerFile; // maximum # of blocks per file final long maxBlocksPerFile; // maximum # of blocks per file
// precision of access times. // precision of access times.
private final long accessTimePrecision; private final long accessTimePrecision;
@ -614,7 +613,7 @@ LeaseManager getLeaseManager() {
boolean isHaEnabled() { boolean isHaEnabled() {
return haEnabled; return haEnabled;
} }
/** /**
* Check the supplied configuration for correctness. * Check the supplied configuration for correctness.
* @param conf Supplies the configuration to validate. * @param conf Supplies the configuration to validate.
@ -1863,8 +1862,8 @@ private GetBlockLocationsResult getBlockLocationsInt(
: dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip); : dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
final LocatedBlocks blocks = blockManager.createLocatedBlocks( final LocatedBlocks blocks = blockManager.createLocatedBlocks(
inode.getBlocks(iip.getPathSnapshotId()), fileSize, inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset,
isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo); length, needBlockToken, iip.isSnapshot(), feInfo);
// Set caching information for the located blocks. // Set caching information for the located blocks.
for (LocatedBlock lb : blocks.getLocatedBlocks()) { for (LocatedBlock lb : blocks.getLocatedBlocks()) {
@ -2232,8 +2231,8 @@ void setStoragePolicy(String src, String policyName) throws IOException {
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set storage policy for " + src); checkNameNodeSafeMode("Cannot set storage policy for " + src);
auditStat = FSDirAttrOp.setStoragePolicy( auditStat = FSDirAttrOp.setStoragePolicy(dir, blockManager, src,
dir, blockManager, src, policyName); policyName);
} catch (AccessControlException e) { } catch (AccessControlException e) {
logAuditEvent(false, "setStoragePolicy", src); logAuditEvent(false, "setStoragePolicy", src);
throw e; throw e;
@ -2621,7 +2620,7 @@ private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc,
return toRemoveBlocks; return toRemoveBlocks;
} catch (IOException ie) { } catch (IOException ie) {
NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " + NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " +
ie.getMessage()); ie.getMessage());
throw ie; throw ie;
} }
} }
@ -2703,8 +2702,8 @@ private LocatedBlock appendFileInternal(FSPermissionChecker pc,
"Cannot append to lazy persist file " + src); "Cannot append to lazy persist file " + src);
} }
// Opening an existing file for append - may need to recover lease. // Opening an existing file for append - may need to recover lease.
recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE, recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE, iip, src, holder,
iip, src, holder, clientMachine, false); clientMachine, false);
final BlockInfoContiguous lastBlock = myFile.getLastBlock(); final BlockInfoContiguous lastBlock = myFile.getLastBlock();
// Check that the block has at least minimum replication. // Check that the block has at least minimum replication.
@ -3042,290 +3041,49 @@ void setBlockPoolId(String bpid) {
* are replicated. Will return an empty 2-elt array if we want the * are replicated. Will return an empty 2-elt array if we want the
* client to "try again later". * client to "try again later".
*/ */
LocatedBlock getAdditionalBlock(String src, long fileId, String clientName, LocatedBlock getAdditionalBlock(
ExtendedBlock previous, Set<Node> excludedNodes, String src, long fileId, String clientName, ExtendedBlock previous,
List<String> favoredNodes) throws IOException { DatanodeInfo[] excludedNodes, String[] favoredNodes) throws IOException {
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
DatanodeStorageInfo targets[] = getNewBlockTargets(src, fileId,
clientName, previous, excludedNodes, favoredNodes, onRetryBlock);
if (targets == null) {
assert onRetryBlock[0] != null : "Retry block is null";
// This is a retry. Just return the last block.
return onRetryBlock[0];
}
LocatedBlock newBlock = storeAllocatedBlock(
src, fileId, clientName, previous, targets);
return newBlock;
}
/**
* Part I of getAdditionalBlock().
* Analyze the state of the file under read lock to determine if the client
* can add a new block, detect potential retries, lease mismatches,
* and minimal replication of the penultimate block.
*
* Generate target DataNode locations for the new block,
* but do not create the new block yet.
*/
DatanodeStorageInfo[] getNewBlockTargets(String src, long fileId,
String clientName, ExtendedBlock previous, Set<Node> excludedNodes,
List<String> favoredNodes, LocatedBlock[] onRetryBlock) throws IOException {
final long blockSize;
final int replication;
final byte storagePolicyID;
Node clientNode = null;
String clientMachine = null;
if(NameNode.stateChangeLog.isDebugEnabled()) { if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: " NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: "
+ src + " inodeId " + fileId + " for " + clientName); + src + " inodeId " + fileId + " for " + clientName);
} }
checkOperation(OperationCategory.READ); waitForLoadingFSImage();
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); LocatedBlock[] onRetryBlock = new LocatedBlock[1];
FSDirWriteFileOp.ValidateAddBlockResult r;
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.READ);
readLock(); readLock();
try { try {
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
src = dir.resolvePath(pc, src, pathComponents); r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName,
FileState fileState = analyzeFileState( previous, onRetryBlock);
src, fileId, clientName, previous, onRetryBlock);
final INodeFile pendingFile = fileState.inode;
// Check if the penultimate block is minimally replicated
if (!checkFileProgress(src, pendingFile, false)) {
throw new NotReplicatedYetException("Not replicated yet: " + src);
}
src = fileState.path;
if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
// This is a retry. No need to generate new locations.
// Use the last block if it has locations.
return null;
}
if (pendingFile.getBlocks().length >= maxBlocksPerFile) {
throw new IOException("File has reached the limit on maximum number of"
+ " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
+ "): " + pendingFile.getBlocks().length + " >= "
+ maxBlocksPerFile);
}
blockSize = pendingFile.getPreferredBlockSize();
clientMachine = pendingFile.getFileUnderConstructionFeature()
.getClientMachine();
clientNode = blockManager.getDatanodeManager().getDatanodeByHost(
clientMachine);
replication = pendingFile.getFileReplication();
storagePolicyID = pendingFile.getStoragePolicyID();
} finally { } finally {
readUnlock(); readUnlock();
} }
if (clientNode == null) { if (r == null) {
clientNode = getClientNode(clientMachine); assert onRetryBlock[0] != null : "Retry block is null";
// This is a retry. Just return the last block.
return onRetryBlock[0];
} }
// choose targets for the new block to be allocated. DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
return getBlockManager().chooseTarget4NewBlock( blockManager, src, excludedNodes, favoredNodes, r);
src, replication, clientNode, excludedNodes, blockSize, favoredNodes,
storagePolicyID);
}
/**
* Part II of getAdditionalBlock().
* Should repeat the same analysis of the file state as in Part 1,
* but under the write lock.
* If the conditions still hold, then allocate a new block with
* the new targets, add it to the INode and to the BlocksMap.
*/
LocatedBlock storeAllocatedBlock(String src, long fileId, String clientName,
ExtendedBlock previous, DatanodeStorageInfo[] targets) throws IOException {
Block newBlock = null;
long offset;
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
waitForLoadingFSImage();
writeLock(); writeLock();
LocatedBlock lb;
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
// Run the full analysis again, since things could have changed lb = FSDirWriteFileOp.storeAllocatedBlock(
// while chooseTarget() was executing. this, src, fileId, clientName, previous, targets);
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
FileState fileState =
analyzeFileState(src, fileId, clientName, previous, onRetryBlock);
final INodeFile pendingFile = fileState.inode;
src = fileState.path;
if (onRetryBlock[0] != null) {
if (onRetryBlock[0].getLocations().length > 0) {
// This is a retry. Just return the last block if having locations.
return onRetryBlock[0];
} else {
// add new chosen targets to already allocated block and return
BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
((BlockInfoContiguousUnderConstruction) lastBlockInFile)
.setExpectedLocations(targets);
offset = pendingFile.computeFileSize();
return makeLocatedBlock(lastBlockInFile, targets, offset);
}
}
// commit the last block and complete it if it has minimum replicas
commitOrCompleteLastBlock(pendingFile, fileState.iip,
ExtendedBlock.getLocalBlock(previous));
// allocate new block, record block locations in INode.
newBlock = createNewBlock();
INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
saveAllocatedBlock(src, inodesInPath, newBlock, targets);
persistNewBlock(src, pendingFile);
offset = pendingFile.computeFileSize();
} finally { } finally {
writeUnlock(); writeUnlock();
} }
getEditLog().logSync(); getEditLog().logSync();
return lb;
// Return located block
return makeLocatedBlock(newBlock, targets, offset);
}
/*
* Resolve clientmachine address to get a network location path
*/
private Node getClientNode(String clientMachine) {
List<String> hosts = new ArrayList<String>(1);
hosts.add(clientMachine);
List<String> rName = getBlockManager().getDatanodeManager()
.resolveNetworkLocation(hosts);
Node clientNode = null;
if (rName != null) {
// Able to resolve clientMachine mapping.
// Create a temp node to findout the rack local nodes
clientNode = new NodeBase(rName.get(0) + NodeBase.PATH_SEPARATOR_STR
+ clientMachine);
}
return clientNode;
}
static class FileState {
public final INodeFile inode;
public final String path;
public final INodesInPath iip;
public FileState(INodeFile inode, String fullPath, INodesInPath iip) {
this.inode = inode;
this.path = fullPath;
this.iip = iip;
}
}
FileState analyzeFileState(String src,
long fileId,
String clientName,
ExtendedBlock previous,
LocatedBlock[] onRetryBlock)
throws IOException {
assert hasReadLock();
checkBlock(previous);
onRetryBlock[0] = null;
checkNameNodeSafeMode("Cannot add block to " + src);
// have we exceeded the configured limit of fs objects.
checkFsObjectLimit();
Block previousBlock = ExtendedBlock.getLocalBlock(previous);
final INode inode;
final INodesInPath iip;
if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
// Older clients may not have given us an inode ID to work with.
// In this case, we have to try to resolve the path and hope it
// hasn't changed or been deleted since the file was opened for write.
iip = dir.getINodesInPath4Write(src);
inode = iip.getLastINode();
} else {
// Newer clients pass the inode ID, so we can just get the inode
// directly.
inode = dir.getInode(fileId);
iip = INodesInPath.fromINode(inode);
if (inode != null) {
src = iip.getPath();
}
}
final INodeFile pendingFile = checkLease(src, clientName, inode, fileId);
BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
// The block that the client claims is the current last block
// doesn't match up with what we think is the last block. There are
// four possibilities:
// 1) This is the first block allocation of an append() pipeline
// which started appending exactly at or exceeding the block boundary.
// In this case, the client isn't passed the previous block,
// so it makes the allocateBlock() call with previous=null.
// We can distinguish this since the last block of the file
// will be exactly a full block.
// 2) This is a retry from a client that missed the response of a
// prior getAdditionalBlock() call, perhaps because of a network
// timeout, or because of an HA failover. In that case, we know
// by the fact that the client is re-issuing the RPC that it
// never began to write to the old block. Hence it is safe to
// to return the existing block.
// 3) This is an entirely bogus request/bug -- we should error out
// rather than potentially appending a new block with an empty
// one in the middle, etc
// 4) This is a retry from a client that timed out while
// the prior getAdditionalBlock() is still being processed,
// currently working on chooseTarget().
// There are no means to distinguish between the first and
// the second attempts in Part I, because the first one hasn't
// changed the namesystem state yet.
// We run this analysis again in Part II where case 4 is impossible.
BlockInfoContiguous penultimateBlock = pendingFile.getPenultimateBlock();
if (previous == null &&
lastBlockInFile != null &&
lastBlockInFile.getNumBytes() >= pendingFile.getPreferredBlockSize() &&
lastBlockInFile.isComplete()) {
// Case 1
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.allocateBlock: handling block allocation" +
" writing to a file with a complete previous block: src=" +
src + " lastBlock=" + lastBlockInFile);
}
} else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
if (lastBlockInFile.getNumBytes() != 0) {
throw new IOException(
"Request looked like a retry to allocate block " +
lastBlockInFile + " but it already contains " +
lastBlockInFile.getNumBytes() + " bytes");
}
// Case 2
// Return the last block.
NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
"caught retry for allocation of a new block in " +
src + ". Returning previously allocated block " + lastBlockInFile);
long offset = pendingFile.computeFileSize();
onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
((BlockInfoContiguousUnderConstruction)lastBlockInFile).getExpectedStorageLocations(),
offset);
return new FileState(pendingFile, src, iip);
} else {
// Case 3
throw new IOException("Cannot allocate block in " + src + ": " +
"passed 'previous' block " + previous + " does not match actual " +
"last block in file " + lastBlockInFile);
}
}
return new FileState(pendingFile, src, iip);
}
LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
long offset) throws IOException {
LocatedBlock lBlk = BlockManager.newLocatedBlock(
getExtendedBlock(blk), locs, offset, false);
getBlockManager().setBlockToken(
lBlk, BlockTokenIdentifier.AccessMode.WRITE);
return lBlk;
} }
/** @see ClientProtocol#getAdditionalDatanode */ /** @see ClientProtocol#getAdditionalDatanode */
@ -3378,7 +3136,7 @@ LocatedBlock getAdditionalDatanode(String src, long fileId,
} }
if (clientnode == null) { if (clientnode == null) {
clientnode = getClientNode(clientMachine); clientnode = FSDirWriteFileOp.getClientNode(blockManager, clientMachine);
} }
// choose new datanodes. // choose new datanodes.
@ -3394,60 +3152,32 @@ LocatedBlock getAdditionalDatanode(String src, long fileId,
/** /**
* The client would like to let go of the given block * The client would like to let go of the given block
*/ */
boolean abandonBlock(ExtendedBlock b, long fileId, String src, String holder) void abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
throws IOException { throws IOException {
if(NameNode.stateChangeLog.isDebugEnabled()) { if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " + b NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " + b
+ "of file " + src); + "of file " + src);
} }
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
FSPermissionChecker pc = getPermissionChecker();
waitForLoadingFSImage(); waitForLoadingFSImage();
checkOperation(OperationCategory.WRITE);
FSPermissionChecker pc = getPermissionChecker();
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot abandon block " + b + " for file" + src); checkNameNodeSafeMode("Cannot abandon block " + b + " for file" + src);
src = dir.resolvePath(pc, src, pathComponents); FSDirWriteFileOp.abandonBlock(dir, pc, b, fileId, src, holder);
final INode inode;
final INodesInPath iip;
if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
// Older clients may not have given us an inode ID to work with.
// In this case, we have to try to resolve the path and hope it
// hasn't changed or been deleted since the file was opened for write.
iip = dir.getINodesInPath(src, true);
inode = iip.getLastINode();
} else {
inode = dir.getInode(fileId);
iip = INodesInPath.fromINode(inode);
if (inode != null) {
src = iip.getPath();
}
}
final INodeFile file = checkLease(src, holder, inode, fileId);
// Remove the block from the pending creates list
boolean removed = dir.removeBlock(src, iip, file,
ExtendedBlock.getLocalBlock(b));
if (!removed) {
return true;
}
if(NameNode.stateChangeLog.isDebugEnabled()) { if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+ b + " is removed from pendingCreates"); + b + " is removed from pendingCreates");
} }
persistBlocks(src, file, false);
} finally { } finally {
writeUnlock(); writeUnlock();
} }
getEditLog().logSync(); getEditLog().logSync();
return true;
} }
private INodeFile checkLease(String src, String holder, INode inode, INodeFile checkLease(
long fileId) throws LeaseExpiredException, FileNotFoundException { String src, String holder, INode inode, long fileId) throws LeaseExpiredException, FileNotFoundException {
assert hasReadLock(); assert hasReadLock();
final String ident = src + " (inode " + fileId + ")"; final String ident = src + " (inode " + fileId + ")";
if (inode == null) { if (inode == null) {
@ -3492,120 +3222,30 @@ private INodeFile checkLease(String src, String holder, INode inode,
* (e.g if not all blocks have reached minimum replication yet) * (e.g if not all blocks have reached minimum replication yet)
* @throws IOException on error (eg lease mismatch, file not open, file deleted) * @throws IOException on error (eg lease mismatch, file not open, file deleted)
*/ */
boolean completeFile(final String srcArg, String holder, boolean completeFile(final String src, String holder,
ExtendedBlock last, long fileId) ExtendedBlock last, long fileId)
throws SafeModeException, UnresolvedLinkException, IOException { throws IOException {
String src = srcArg;
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
src + " for " + holder);
}
checkBlock(last);
boolean success = false; boolean success = false;
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
FSPermissionChecker pc = getPermissionChecker();
waitForLoadingFSImage(); waitForLoadingFSImage();
FSPermissionChecker pc = getPermissionChecker();
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot complete file " + src); checkNameNodeSafeMode("Cannot complete file " + src);
src = dir.resolvePath(pc, src, pathComponents); success = FSDirWriteFileOp.completeFile(this, pc, src, holder, last,
success = completeFileInternal(src, holder, fileId);
ExtendedBlock.getLocalBlock(last), fileId);
} finally { } finally {
writeUnlock(); writeUnlock();
} }
getEditLog().logSync(); getEditLog().logSync();
if (success) {
NameNode.stateChangeLog.info("DIR* completeFile: " + srcArg
+ " is closed by " + holder);
}
return success; return success;
} }
private boolean completeFileInternal(String src, String holder, Block last,
long fileId) throws IOException {
assert hasWriteLock();
final INodeFile pendingFile;
final INodesInPath iip;
INode inode = null;
try {
if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
// Older clients may not have given us an inode ID to work with.
// In this case, we have to try to resolve the path and hope it
// hasn't changed or been deleted since the file was opened for write.
iip = dir.getINodesInPath(src, true);
inode = iip.getLastINode();
} else {
inode = dir.getInode(fileId);
iip = INodesInPath.fromINode(inode);
if (inode != null) {
src = iip.getPath();
}
}
pendingFile = checkLease(src, holder, inode, fileId);
} catch (LeaseExpiredException lee) {
if (inode != null && inode.isFile() &&
!inode.asFile().isUnderConstruction()) {
// This could be a retry RPC - i.e the client tried to close
// the file, but missed the RPC response. Thus, it is trying
// again to close the file. If the file still exists and
// the client's view of the last block matches the actual
// last block, then we'll treat it as a successful close.
// See HDFS-3031.
final Block realLastBlock = inode.asFile().getLastBlock();
if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
NameNode.stateChangeLog.info("DIR* completeFile: " +
"request from " + holder + " to complete inode " + fileId +
"(" + src + ") which is already closed. But, it appears to be " +
"an RPC retry. Returning success");
return true;
}
}
throw lee;
}
// Check the state of the penultimate block. It should be completed
// before attempting to complete the last one.
if (!checkFileProgress(src, pendingFile, false)) {
return false;
}
// commit the last block and complete it if it has minimum replicas
commitOrCompleteLastBlock(pendingFile, iip, last);
if (!checkFileProgress(src, pendingFile, true)) {
return false;
}
finalizeINodeFileUnderConstruction(src, pendingFile,
Snapshot.CURRENT_STATE_ID);
return true;
}
/**
* Save allocated block at the given pending filename
*
* @param src path to the file
* @param inodesInPath representing each of the components of src.
* The last INode is the INode for {@code src} file.
* @param newBlock newly allocated block to be save
* @param targets target datanodes where replicas of the new block is placed
* @throws QuotaExceededException If addition of block exceeds space quota
*/
private void saveAllocatedBlock(String src, INodesInPath inodesInPath,
Block newBlock, DatanodeStorageInfo[] targets)
throws IOException {
assert hasWriteLock();
BlockInfoContiguous b = dir.addBlock(src, inodesInPath, newBlock, targets);
NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src);
DatanodeStorageInfo.incrementBlocksScheduled(targets);
}
/** /**
* Create new block with a unique block id and a new generation stamp. * Create new block with a unique block id and a new generation stamp.
*/ */
private Block createNewBlock() throws IOException { Block createNewBlock() throws IOException {
assert hasWriteLock(); assert hasWriteLock();
Block b = new Block(nextBlockId(), 0, 0); Block b = new Block(nextBlockId(), 0, 0);
// Increment the generation stamp for every new block. // Increment the generation stamp for every new block.
@ -3997,7 +3637,7 @@ void fsync(String src, long fileId, String clientName, long lastBlockLength)
pendingFile.getFileUnderConstructionFeature().updateLengthOfLastBlock( pendingFile.getFileUnderConstructionFeature().updateLengthOfLastBlock(
pendingFile, lastBlockLength); pendingFile, lastBlockLength);
} }
persistBlocks(src, pendingFile, false); FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, false);
} finally { } finally {
writeUnlock(); writeUnlock();
} }
@ -4167,8 +3807,9 @@ Lease reassignLeaseInternal(Lease lease, String newHolder, INodeFile pendingFile
return leaseManager.reassignLease(lease, pendingFile, newHolder); return leaseManager.reassignLease(lease, pendingFile, newHolder);
} }
private void commitOrCompleteLastBlock(final INodeFile fileINode, void commitOrCompleteLastBlock(
final INodesInPath iip, final Block commitBlock) throws IOException { final INodeFile fileINode, final INodesInPath iip,
final Block commitBlock) throws IOException {
assert hasWriteLock(); assert hasWriteLock();
Preconditions.checkArgument(fileINode.isUnderConstruction()); Preconditions.checkArgument(fileINode.isUnderConstruction());
if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) { if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) {
@ -4186,14 +3827,14 @@ private void commitOrCompleteLastBlock(final INodeFile fileINode,
} }
} }
private void finalizeINodeFileUnderConstruction(String src, void finalizeINodeFileUnderConstruction(
INodeFile pendingFile, int latestSnapshot) throws IOException { String src, INodeFile pendingFile, int latestSnapshot) throws IOException {
assert hasWriteLock(); assert hasWriteLock();
FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature(); FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
Preconditions.checkArgument(uc != null); Preconditions.checkArgument(uc != null);
leaseManager.removeLease(uc.getClientName(), pendingFile); leaseManager.removeLease(uc.getClientName(), pendingFile);
pendingFile.recordModification(latestSnapshot); pendingFile.recordModification(latestSnapshot);
// The file is no longer pending. // The file is no longer pending.
@ -4405,7 +4046,7 @@ void commitBlockSynchronization(ExtendedBlock oldBlock,
} else { } else {
// If this commit does not want to close the file, persist blocks // If this commit does not want to close the file, persist blocks
src = iFile.getFullPathName(); src = iFile.getFullPathName();
persistBlocks(src, iFile, false); FSDirWriteFileOp.persistBlocks(dir, src, iFile, false);
} }
} finally { } finally {
writeUnlock(); writeUnlock();
@ -4595,24 +4236,6 @@ void checkAvailableResources() {
hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace(); hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
} }
/**
* Persist the block list for the inode.
* @param path
* @param file
* @param logRetryCache
*/
private void persistBlocks(String path, INodeFile file,
boolean logRetryCache) {
assert hasWriteLock();
Preconditions.checkArgument(file.isUnderConstruction());
getEditLog().logUpdateBlocks(path, file, logRetryCache);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("persistBlocks: " + path
+ " with " + file.getBlocks().length + " blocks is persisted to" +
" the file system");
}
}
/** /**
* Close file. * Close file.
* @param path * @param path
@ -4800,13 +4423,6 @@ public FSImage getFSImage() {
public FSEditLog getEditLog() { public FSEditLog getEditLog() {
return getFSImage().getEditLog(); return getFSImage().getEditLog();
}
private void checkBlock(ExtendedBlock block) throws IOException {
if (block != null && !this.blockPoolId.equals(block.getBlockPoolId())) {
throw new IOException("Unexpected BlockPoolId " + block.getBlockPoolId()
+ " - expected " + blockPoolId);
}
} }
@Metric({"MissingBlocks", "Number of missing blocks"}) @Metric({"MissingBlocks", "Number of missing blocks"})
@ -5079,21 +4695,6 @@ void setBalancerBandwidth(long bandwidth) throws IOException {
getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth); getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
} }
/**
* Persist the new block (the last block of the given file).
* @param path
* @param file
*/
private void persistNewBlock(String path, INodeFile file) {
Preconditions.checkArgument(file.isUnderConstruction());
getEditLog().logAddBlock(path, file);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("persistNewBlock: "
+ path + " with new block " + file.getLastBlock().toString()
+ ", current total block count is " + file.getBlocks().length);
}
}
/** /**
* SafeModeInfo contains information related to the safe mode. * SafeModeInfo contains information related to the safe mode.
* <p> * <p>
@ -6399,7 +6000,7 @@ private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
blockinfo.setExpectedLocations(storages); blockinfo.setExpectedLocations(storages);
String src = pendingFile.getFullPathName(); String src = pendingFile.getFullPathName();
persistBlocks(src, pendingFile, logRetryCache); FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, logRetryCache);
} }
/** /**

View File

@ -713,23 +713,11 @@ public LocatedBlock addBlock(String src, String clientName,
String[] favoredNodes) String[] favoredNodes)
throws IOException { throws IOException {
checkNNStartup(); checkNNStartup();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src
+ " fileId=" + fileId + " for " + clientName);
}
Set<Node> excludedNodesSet = null;
if (excludedNodes != null) {
excludedNodesSet = new HashSet<Node>(excludedNodes.length);
for (Node node : excludedNodes) {
excludedNodesSet.add(node);
}
}
List<String> favoredNodesList = (favoredNodes == null) ? null
: Arrays.asList(favoredNodes);
LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId, LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
clientName, previous, excludedNodesSet, favoredNodesList); clientName, previous, excludedNodes, favoredNodes);
if (locatedBlock != null) if (locatedBlock != null) {
metrics.incrAddBlockOps(); metrics.incrAddBlockOps();
}
return locatedBlock; return locatedBlock;
} }
@ -770,13 +758,7 @@ public LocatedBlock getAdditionalDatanode(final String src,
public void abandonBlock(ExtendedBlock b, long fileId, String src, public void abandonBlock(ExtendedBlock b, long fileId, String src,
String holder) throws IOException { String holder) throws IOException {
checkNNStartup(); checkNNStartup();
if(stateChangeLog.isDebugEnabled()) { namesystem.abandonBlock(b, fileId, src, holder);
stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
+b+" of file "+src);
}
if (!namesystem.abandonBlock(b, fileId, src, holder)) {
throw new IOException("Cannot abandon block during write to " + src);
}
} }
@Override // ClientProtocol @Override // ClientProtocol
@ -784,10 +766,6 @@ public boolean complete(String src, String clientName,
ExtendedBlock last, long fileId) ExtendedBlock last, long fileId)
throws IOException { throws IOException {
checkNNStartup(); checkNNStartup();
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.complete: "
+ src + " fileId=" + fileId +" for " + clientName);
}
return namesystem.completeFile(src, clientName, last, fileId); return namesystem.completeFile(src, clientName, last, fileId);
} }

View File

@ -39,6 +39,7 @@
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
/** /**
* Race between two threads simultaneously calling * Race between two threads simultaneously calling
@ -88,25 +89,40 @@ public void testRetryAddBlockWhileInChooseTarget() throws Exception {
// start first addBlock() // start first addBlock()
LOG.info("Starting first addBlock for " + src); LOG.info("Starting first addBlock for " + src);
LocatedBlock[] onRetryBlock = new LocatedBlock[1]; LocatedBlock[] onRetryBlock = new LocatedBlock[1];
DatanodeStorageInfo targets[] = ns.getNewBlockTargets( ns.readLock();
src, HdfsConstants.GRANDFATHER_INODE_ID, "clientName", FSDirWriteFileOp.ValidateAddBlockResult r;
null, null, null, onRetryBlock); FSPermissionChecker pc = Mockito.mock(FSPermissionChecker.class);
try {
r = FSDirWriteFileOp.validateAddBlock(ns, pc, src,
HdfsConstants.GRANDFATHER_INODE_ID,
"clientName", null, onRetryBlock);
} finally {
ns.readUnlock();;
}
DatanodeStorageInfo targets[] = FSDirWriteFileOp.chooseTargetForNewBlock(
ns.getBlockManager(), src, null, null, r);
assertNotNull("Targets must be generated", targets); assertNotNull("Targets must be generated", targets);
// run second addBlock() // run second addBlock()
LOG.info("Starting second addBlock for " + src); LOG.info("Starting second addBlock for " + src);
nn.addBlock(src, "clientName", null, null, nn.addBlock(src, "clientName", null, null,
HdfsConstants.GRANDFATHER_INODE_ID, null); HdfsConstants.GRANDFATHER_INODE_ID, null);
assertTrue("Penultimate block must be complete", assertTrue("Penultimate block must be complete",
checkFileProgress(src, false)); checkFileProgress(src, false));
LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE); LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size()); assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
LocatedBlock lb2 = lbs.get(0); LocatedBlock lb2 = lbs.get(0);
assertEquals("Wrong replication", REPLICATION, lb2.getLocations().length); assertEquals("Wrong replication", REPLICATION, lb2.getLocations().length);
// continue first addBlock() // continue first addBlock()
LocatedBlock newBlock = ns.storeAllocatedBlock( ns.writeLock();
src, HdfsConstants.GRANDFATHER_INODE_ID, "clientName", null, targets); LocatedBlock newBlock;
try {
newBlock = FSDirWriteFileOp.storeAllocatedBlock(ns, src,
HdfsConstants.GRANDFATHER_INODE_ID, "clientName", null, targets);
} finally {
ns.writeUnlock();
}
assertEquals("Blocks are not equal", lb2.getBlock(), newBlock.getBlock()); assertEquals("Blocks are not equal", lb2.getBlock(), newBlock.getBlock());
// check locations // check locations

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.junit.Test; import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
import java.io.IOException; import java.io.IOException;
@ -45,7 +46,9 @@ public class TestCommitBlockSynchronization {
private FSNamesystem makeNameSystemSpy(Block block, INodeFile file) private FSNamesystem makeNameSystemSpy(Block block, INodeFile file)
throws IOException { throws IOException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
FSEditLog editlog = mock(FSEditLog.class);
FSImage image = new FSImage(conf); FSImage image = new FSImage(conf);
Whitebox.setInternalState(image, "editLog", editlog);
final DatanodeStorageInfo[] targets = {}; final DatanodeStorageInfo[] targets = {};
FSNamesystem namesystem = new FSNamesystem(conf, image); FSNamesystem namesystem = new FSNamesystem(conf, image);