HDFS-8394. Move getAdditionalBlock() and related functionalities into a separate class. Contributed by Haohui Mai.
This commit is contained in:
parent
d6aa65d037
commit
1796d94d62
|
@ -224,6 +224,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-8397. Refactor the error handling code in DataStreamer.
|
HDFS-8397. Refactor the error handling code in DataStreamer.
|
||||||
(Tsz Wo Nicholas Sze via jing9)
|
(Tsz Wo Nicholas Sze via jing9)
|
||||||
|
|
||||||
|
HDFS-8394. Move getAdditionalBlock() and related functionalities into a
|
||||||
|
separate class. (wheat9)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -0,0 +1,563 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
|
||||||
|
import org.apache.hadoop.net.Node;
|
||||||
|
import org.apache.hadoop.net.NodeBase;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
class FSDirWriteFileOp {
|
||||||
|
private FSDirWriteFileOp() {}
|
||||||
|
static boolean unprotectedRemoveBlock(
|
||||||
|
FSDirectory fsd, String path, INodesInPath iip, INodeFile fileNode,
|
||||||
|
Block block) throws IOException {
|
||||||
|
// modify file-> block and blocksMap
|
||||||
|
// fileNode should be under construction
|
||||||
|
BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block);
|
||||||
|
if (uc == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
fsd.getBlockManager().removeBlockFromMap(block);
|
||||||
|
|
||||||
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
|
NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
|
||||||
|
+path+" with "+block
|
||||||
|
+" block is removed from the file system");
|
||||||
|
}
|
||||||
|
|
||||||
|
// update space consumed
|
||||||
|
fsd.updateCount(iip, 0, -fileNode.getPreferredBlockSize(),
|
||||||
|
fileNode.getPreferredBlockReplication(), true);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Persist the block list for the inode.
|
||||||
|
*/
|
||||||
|
static void persistBlocks(
|
||||||
|
FSDirectory fsd, String path, INodeFile file, boolean logRetryCache) {
|
||||||
|
assert fsd.getFSNamesystem().hasWriteLock();
|
||||||
|
Preconditions.checkArgument(file.isUnderConstruction());
|
||||||
|
fsd.getEditLog().logUpdateBlocks(path, file, logRetryCache);
|
||||||
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
|
NameNode.stateChangeLog.debug("persistBlocks: " + path
|
||||||
|
+ " with " + file.getBlocks().length + " blocks is persisted to" +
|
||||||
|
" the file system");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void abandonBlock(
|
||||||
|
FSDirectory fsd, FSPermissionChecker pc, ExtendedBlock b, long fileId,
|
||||||
|
String src, String holder) throws IOException {
|
||||||
|
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
||||||
|
src = fsd.resolvePath(pc, src, pathComponents);
|
||||||
|
|
||||||
|
final INode inode;
|
||||||
|
final INodesInPath iip;
|
||||||
|
if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
|
||||||
|
// Older clients may not have given us an inode ID to work with.
|
||||||
|
// In this case, we have to try to resolve the path and hope it
|
||||||
|
// hasn't changed or been deleted since the file was opened for write.
|
||||||
|
iip = fsd.getINodesInPath(src, true);
|
||||||
|
inode = iip.getLastINode();
|
||||||
|
} else {
|
||||||
|
inode = fsd.getInode(fileId);
|
||||||
|
iip = INodesInPath.fromINode(inode);
|
||||||
|
if (inode != null) {
|
||||||
|
src = iip.getPath();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
FSNamesystem fsn = fsd.getFSNamesystem();
|
||||||
|
final INodeFile file = fsn.checkLease(src, holder, inode, fileId);
|
||||||
|
Preconditions.checkState(file.isUnderConstruction());
|
||||||
|
|
||||||
|
Block localBlock = ExtendedBlock.getLocalBlock(b);
|
||||||
|
fsd.writeLock();
|
||||||
|
try {
|
||||||
|
// Remove the block from the pending creates list
|
||||||
|
if (!unprotectedRemoveBlock(fsd, src, iip, file, localBlock)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
fsd.writeUnlock();
|
||||||
|
}
|
||||||
|
persistBlocks(fsd, src, file, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void checkBlock(FSNamesystem fsn, ExtendedBlock block)
|
||||||
|
throws IOException {
|
||||||
|
String bpId = fsn.getBlockPoolId();
|
||||||
|
if (block != null && !bpId.equals(block.getBlockPoolId())) {
|
||||||
|
throw new IOException("Unexpected BlockPoolId " + block.getBlockPoolId()
|
||||||
|
+ " - expected " + bpId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Part I of getAdditionalBlock().
|
||||||
|
* Analyze the state of the file under read lock to determine if the client
|
||||||
|
* can add a new block, detect potential retries, lease mismatches,
|
||||||
|
* and minimal replication of the penultimate block.
|
||||||
|
*
|
||||||
|
* Generate target DataNode locations for the new block,
|
||||||
|
* but do not create the new block yet.
|
||||||
|
*/
|
||||||
|
static ValidateAddBlockResult validateAddBlock(
|
||||||
|
FSNamesystem fsn, FSPermissionChecker pc,
|
||||||
|
String src, long fileId, String clientName,
|
||||||
|
ExtendedBlock previous, LocatedBlock[] onRetryBlock) throws IOException {
|
||||||
|
final long blockSize;
|
||||||
|
final int replication;
|
||||||
|
final byte storagePolicyID;
|
||||||
|
String clientMachine;
|
||||||
|
|
||||||
|
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
||||||
|
src = fsn.dir.resolvePath(pc, src, pathComponents);
|
||||||
|
FileState fileState = analyzeFileState(fsn, src, fileId, clientName,
|
||||||
|
previous, onRetryBlock);
|
||||||
|
final INodeFile pendingFile = fileState.inode;
|
||||||
|
// Check if the penultimate block is minimally replicated
|
||||||
|
if (!fsn.checkFileProgress(src, pendingFile, false)) {
|
||||||
|
throw new NotReplicatedYetException("Not replicated yet: " + src);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
|
||||||
|
// This is a retry. No need to generate new locations.
|
||||||
|
// Use the last block if it has locations.
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (pendingFile.getBlocks().length >= fsn.maxBlocksPerFile) {
|
||||||
|
throw new IOException("File has reached the limit on maximum number of"
|
||||||
|
+ " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
|
||||||
|
+ "): " + pendingFile.getBlocks().length + " >= "
|
||||||
|
+ fsn.maxBlocksPerFile);
|
||||||
|
}
|
||||||
|
blockSize = pendingFile.getPreferredBlockSize();
|
||||||
|
clientMachine = pendingFile.getFileUnderConstructionFeature()
|
||||||
|
.getClientMachine();
|
||||||
|
replication = pendingFile.getFileReplication();
|
||||||
|
storagePolicyID = pendingFile.getStoragePolicyID();
|
||||||
|
return new ValidateAddBlockResult(blockSize, replication, storagePolicyID,
|
||||||
|
clientMachine);
|
||||||
|
}
|
||||||
|
|
||||||
|
static LocatedBlock makeLocatedBlock(FSNamesystem fsn, Block blk,
|
||||||
|
DatanodeStorageInfo[] locs, long offset) throws IOException {
|
||||||
|
LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk),
|
||||||
|
locs, offset, false);
|
||||||
|
fsn.getBlockManager().setBlockToken(lBlk,
|
||||||
|
BlockTokenIdentifier.AccessMode.WRITE);
|
||||||
|
return lBlk;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Part II of getAdditionalBlock().
|
||||||
|
* Should repeat the same analysis of the file state as in Part 1,
|
||||||
|
* but under the write lock.
|
||||||
|
* If the conditions still hold, then allocate a new block with
|
||||||
|
* the new targets, add it to the INode and to the BlocksMap.
|
||||||
|
*/
|
||||||
|
static LocatedBlock storeAllocatedBlock(FSNamesystem fsn, String src,
|
||||||
|
long fileId, String clientName, ExtendedBlock previous,
|
||||||
|
DatanodeStorageInfo[] targets) throws IOException {
|
||||||
|
long offset;
|
||||||
|
// Run the full analysis again, since things could have changed
|
||||||
|
// while chooseTarget() was executing.
|
||||||
|
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
|
||||||
|
FileState fileState = analyzeFileState(fsn, src, fileId, clientName,
|
||||||
|
previous, onRetryBlock);
|
||||||
|
final INodeFile pendingFile = fileState.inode;
|
||||||
|
src = fileState.path;
|
||||||
|
|
||||||
|
if (onRetryBlock[0] != null) {
|
||||||
|
if (onRetryBlock[0].getLocations().length > 0) {
|
||||||
|
// This is a retry. Just return the last block if having locations.
|
||||||
|
return onRetryBlock[0];
|
||||||
|
} else {
|
||||||
|
// add new chosen targets to already allocated block and return
|
||||||
|
BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
|
||||||
|
((BlockInfoContiguousUnderConstruction) lastBlockInFile)
|
||||||
|
.setExpectedLocations(targets);
|
||||||
|
offset = pendingFile.computeFileSize();
|
||||||
|
return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// commit the last block and complete it if it has minimum replicas
|
||||||
|
fsn.commitOrCompleteLastBlock(pendingFile, fileState.iip,
|
||||||
|
ExtendedBlock.getLocalBlock(previous));
|
||||||
|
|
||||||
|
// allocate new block, record block locations in INode.
|
||||||
|
Block newBlock = fsn.createNewBlock();
|
||||||
|
INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
|
||||||
|
saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets);
|
||||||
|
|
||||||
|
persistNewBlock(fsn, src, pendingFile);
|
||||||
|
offset = pendingFile.computeFileSize();
|
||||||
|
|
||||||
|
// Return located block
|
||||||
|
return makeLocatedBlock(fsn, newBlock, targets, offset);
|
||||||
|
}
|
||||||
|
|
||||||
|
static DatanodeStorageInfo[] chooseTargetForNewBlock(
|
||||||
|
BlockManager bm, String src, DatanodeInfo[] excludedNodes, String[]
|
||||||
|
favoredNodes, ValidateAddBlockResult r) throws IOException {
|
||||||
|
Node clientNode = bm.getDatanodeManager()
|
||||||
|
.getDatanodeByHost(r.clientMachine);
|
||||||
|
if (clientNode == null) {
|
||||||
|
clientNode = getClientNode(bm, r.clientMachine);
|
||||||
|
}
|
||||||
|
|
||||||
|
Set<Node> excludedNodesSet = null;
|
||||||
|
if (excludedNodes != null) {
|
||||||
|
excludedNodesSet = new HashSet<>(excludedNodes.length);
|
||||||
|
Collections.addAll(excludedNodesSet, excludedNodes);
|
||||||
|
}
|
||||||
|
List<String> favoredNodesList = (favoredNodes == null) ? null
|
||||||
|
: Arrays.asList(favoredNodes);
|
||||||
|
|
||||||
|
// choose targets for the new block to be allocated.
|
||||||
|
return bm.chooseTarget4NewBlock(src, r.replication, clientNode,
|
||||||
|
excludedNodesSet, r.blockSize,
|
||||||
|
favoredNodesList, r.storagePolicyID);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resolve clientmachine address to get a network location path
|
||||||
|
*/
|
||||||
|
static Node getClientNode(BlockManager bm, String clientMachine) {
|
||||||
|
List<String> hosts = new ArrayList<>(1);
|
||||||
|
hosts.add(clientMachine);
|
||||||
|
List<String> rName = bm.getDatanodeManager()
|
||||||
|
.resolveNetworkLocation(hosts);
|
||||||
|
Node clientNode = null;
|
||||||
|
if (rName != null) {
|
||||||
|
// Able to resolve clientMachine mapping.
|
||||||
|
// Create a temp node to findout the rack local nodes
|
||||||
|
clientNode = new NodeBase(rName.get(0) + NodeBase.PATH_SEPARATOR_STR
|
||||||
|
+ clientMachine);
|
||||||
|
}
|
||||||
|
return clientNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a block to the file. Returns a reference to the added block.
|
||||||
|
*/
|
||||||
|
private static BlockInfoContiguous addBlock(
|
||||||
|
FSDirectory fsd, String path, INodesInPath inodesInPath, Block block,
|
||||||
|
DatanodeStorageInfo[] targets) throws IOException {
|
||||||
|
fsd.writeLock();
|
||||||
|
try {
|
||||||
|
final INodeFile fileINode = inodesInPath.getLastINode().asFile();
|
||||||
|
Preconditions.checkState(fileINode.isUnderConstruction());
|
||||||
|
|
||||||
|
// check quota limits and updated space consumed
|
||||||
|
fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
|
||||||
|
fileINode.getPreferredBlockReplication(), true);
|
||||||
|
|
||||||
|
// associate new last block for the file
|
||||||
|
BlockInfoContiguousUnderConstruction blockInfo =
|
||||||
|
new BlockInfoContiguousUnderConstruction(
|
||||||
|
block,
|
||||||
|
fileINode.getFileReplication(),
|
||||||
|
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
|
||||||
|
targets);
|
||||||
|
fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
|
||||||
|
fileINode.addBlock(blockInfo);
|
||||||
|
|
||||||
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
|
NameNode.stateChangeLog.debug("DIR* FSDirectory.addBlock: "
|
||||||
|
+ path + " with " + block
|
||||||
|
+ " block is added to the in-memory "
|
||||||
|
+ "file system");
|
||||||
|
}
|
||||||
|
return blockInfo;
|
||||||
|
} finally {
|
||||||
|
fsd.writeUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static FileState analyzeFileState(
|
||||||
|
FSNamesystem fsn, String src, long fileId, String clientName,
|
||||||
|
ExtendedBlock previous, LocatedBlock[] onRetryBlock)
|
||||||
|
throws IOException {
|
||||||
|
assert fsn.hasReadLock();
|
||||||
|
|
||||||
|
checkBlock(fsn, previous);
|
||||||
|
onRetryBlock[0] = null;
|
||||||
|
fsn.checkNameNodeSafeMode("Cannot add block to " + src);
|
||||||
|
|
||||||
|
// have we exceeded the configured limit of fs objects.
|
||||||
|
fsn.checkFsObjectLimit();
|
||||||
|
|
||||||
|
Block previousBlock = ExtendedBlock.getLocalBlock(previous);
|
||||||
|
final INode inode;
|
||||||
|
final INodesInPath iip;
|
||||||
|
if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
|
||||||
|
// Older clients may not have given us an inode ID to work with.
|
||||||
|
// In this case, we have to try to resolve the path and hope it
|
||||||
|
// hasn't changed or been deleted since the file was opened for write.
|
||||||
|
iip = fsn.dir.getINodesInPath4Write(src);
|
||||||
|
inode = iip.getLastINode();
|
||||||
|
} else {
|
||||||
|
// Newer clients pass the inode ID, so we can just get the inode
|
||||||
|
// directly.
|
||||||
|
inode = fsn.dir.getInode(fileId);
|
||||||
|
iip = INodesInPath.fromINode(inode);
|
||||||
|
if (inode != null) {
|
||||||
|
src = iip.getPath();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
final INodeFile file = fsn.checkLease(src, clientName,
|
||||||
|
inode, fileId);
|
||||||
|
BlockInfoContiguous lastBlockInFile = file.getLastBlock();
|
||||||
|
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
|
||||||
|
// The block that the client claims is the current last block
|
||||||
|
// doesn't match up with what we think is the last block. There are
|
||||||
|
// four possibilities:
|
||||||
|
// 1) This is the first block allocation of an append() pipeline
|
||||||
|
// which started appending exactly at or exceeding the block boundary.
|
||||||
|
// In this case, the client isn't passed the previous block,
|
||||||
|
// so it makes the allocateBlock() call with previous=null.
|
||||||
|
// We can distinguish this since the last block of the file
|
||||||
|
// will be exactly a full block.
|
||||||
|
// 2) This is a retry from a client that missed the response of a
|
||||||
|
// prior getAdditionalBlock() call, perhaps because of a network
|
||||||
|
// timeout, or because of an HA failover. In that case, we know
|
||||||
|
// by the fact that the client is re-issuing the RPC that it
|
||||||
|
// never began to write to the old block. Hence it is safe to
|
||||||
|
// to return the existing block.
|
||||||
|
// 3) This is an entirely bogus request/bug -- we should error out
|
||||||
|
// rather than potentially appending a new block with an empty
|
||||||
|
// one in the middle, etc
|
||||||
|
// 4) This is a retry from a client that timed out while
|
||||||
|
// the prior getAdditionalBlock() is still being processed,
|
||||||
|
// currently working on chooseTarget().
|
||||||
|
// There are no means to distinguish between the first and
|
||||||
|
// the second attempts in Part I, because the first one hasn't
|
||||||
|
// changed the namesystem state yet.
|
||||||
|
// We run this analysis again in Part II where case 4 is impossible.
|
||||||
|
|
||||||
|
BlockInfoContiguous penultimateBlock = file.getPenultimateBlock();
|
||||||
|
if (previous == null &&
|
||||||
|
lastBlockInFile != null &&
|
||||||
|
lastBlockInFile.getNumBytes() >= file.getPreferredBlockSize() &&
|
||||||
|
lastBlockInFile.isComplete()) {
|
||||||
|
// Case 1
|
||||||
|
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
|
NameNode.stateChangeLog.debug(
|
||||||
|
"BLOCK* NameSystem.allocateBlock: handling block allocation" +
|
||||||
|
" writing to a file with a complete previous block: src=" +
|
||||||
|
src + " lastBlock=" + lastBlockInFile);
|
||||||
|
}
|
||||||
|
} else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
|
||||||
|
if (lastBlockInFile.getNumBytes() != 0) {
|
||||||
|
throw new IOException(
|
||||||
|
"Request looked like a retry to allocate block " +
|
||||||
|
lastBlockInFile + " but it already contains " +
|
||||||
|
lastBlockInFile.getNumBytes() + " bytes");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Case 2
|
||||||
|
// Return the last block.
|
||||||
|
NameNode.stateChangeLog.info("BLOCK* allocateBlock: caught retry for " +
|
||||||
|
"allocation of a new block in " + src + ". Returning previously" +
|
||||||
|
" allocated block " + lastBlockInFile);
|
||||||
|
long offset = file.computeFileSize();
|
||||||
|
BlockInfoContiguousUnderConstruction lastBlockUC =
|
||||||
|
(BlockInfoContiguousUnderConstruction) lastBlockInFile;
|
||||||
|
onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile,
|
||||||
|
lastBlockUC.getExpectedStorageLocations(), offset);
|
||||||
|
return new FileState(file, src, iip);
|
||||||
|
} else {
|
||||||
|
// Case 3
|
||||||
|
throw new IOException("Cannot allocate block in " + src + ": " +
|
||||||
|
"passed 'previous' block " + previous + " does not match actual " +
|
||||||
|
"last block in file " + lastBlockInFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new FileState(file, src, iip);
|
||||||
|
}
|
||||||
|
|
||||||
|
static boolean completeFile(FSNamesystem fsn, FSPermissionChecker pc,
|
||||||
|
final String srcArg, String holder, ExtendedBlock last, long fileId)
|
||||||
|
throws IOException {
|
||||||
|
String src = srcArg;
|
||||||
|
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
|
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
|
||||||
|
src + " for " + holder);
|
||||||
|
}
|
||||||
|
checkBlock(fsn, last);
|
||||||
|
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
||||||
|
src = fsn.dir.resolvePath(pc, src, pathComponents);
|
||||||
|
boolean success = completeFileInternal(fsn, src, holder,
|
||||||
|
ExtendedBlock.getLocalBlock(last),
|
||||||
|
fileId);
|
||||||
|
if (success) {
|
||||||
|
NameNode.stateChangeLog.info("DIR* completeFile: " + srcArg
|
||||||
|
+ " is closed by " + holder);
|
||||||
|
}
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean completeFileInternal(
|
||||||
|
FSNamesystem fsn, String src, String holder, Block last, long fileId)
|
||||||
|
throws IOException {
|
||||||
|
assert fsn.hasWriteLock();
|
||||||
|
final INodeFile pendingFile;
|
||||||
|
final INodesInPath iip;
|
||||||
|
INode inode = null;
|
||||||
|
try {
|
||||||
|
if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
|
||||||
|
// Older clients may not have given us an inode ID to work with.
|
||||||
|
// In this case, we have to try to resolve the path and hope it
|
||||||
|
// hasn't changed or been deleted since the file was opened for write.
|
||||||
|
iip = fsn.dir.getINodesInPath(src, true);
|
||||||
|
inode = iip.getLastINode();
|
||||||
|
} else {
|
||||||
|
inode = fsn.dir.getInode(fileId);
|
||||||
|
iip = INodesInPath.fromINode(inode);
|
||||||
|
if (inode != null) {
|
||||||
|
src = iip.getPath();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pendingFile = fsn.checkLease(src, holder, inode, fileId);
|
||||||
|
} catch (LeaseExpiredException lee) {
|
||||||
|
if (inode != null && inode.isFile() &&
|
||||||
|
!inode.asFile().isUnderConstruction()) {
|
||||||
|
// This could be a retry RPC - i.e the client tried to close
|
||||||
|
// the file, but missed the RPC response. Thus, it is trying
|
||||||
|
// again to close the file. If the file still exists and
|
||||||
|
// the client's view of the last block matches the actual
|
||||||
|
// last block, then we'll treat it as a successful close.
|
||||||
|
// See HDFS-3031.
|
||||||
|
final Block realLastBlock = inode.asFile().getLastBlock();
|
||||||
|
if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
|
||||||
|
NameNode.stateChangeLog.info("DIR* completeFile: " +
|
||||||
|
"request from " + holder + " to complete inode " + fileId +
|
||||||
|
"(" + src + ") which is already closed. But, it appears to be " +
|
||||||
|
"an RPC retry. Returning success");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw lee;
|
||||||
|
}
|
||||||
|
// Check the state of the penultimate block. It should be completed
|
||||||
|
// before attempting to complete the last one.
|
||||||
|
if (!fsn.checkFileProgress(src, pendingFile, false)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// commit the last block and complete it if it has minimum replicas
|
||||||
|
fsn.commitOrCompleteLastBlock(pendingFile, iip, last);
|
||||||
|
|
||||||
|
if (!fsn.checkFileProgress(src, pendingFile, true)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
fsn.finalizeINodeFileUnderConstruction(src, pendingFile,
|
||||||
|
Snapshot.CURRENT_STATE_ID);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Persist the new block (the last block of the given file).
|
||||||
|
*/
|
||||||
|
private static void persistNewBlock(
|
||||||
|
FSNamesystem fsn, String path, INodeFile file) {
|
||||||
|
Preconditions.checkArgument(file.isUnderConstruction());
|
||||||
|
fsn.getEditLog().logAddBlock(path, file);
|
||||||
|
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
|
NameNode.stateChangeLog.debug("persistNewBlock: "
|
||||||
|
+ path + " with new block " + file.getLastBlock().toString()
|
||||||
|
+ ", current total block count is " + file.getBlocks().length);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Save allocated block at the given pending filename
|
||||||
|
*
|
||||||
|
* @param fsn FSNamesystem
|
||||||
|
* @param src path to the file
|
||||||
|
* @param inodesInPath representing each of the components of src.
|
||||||
|
* The last INode is the INode for {@code src} file.
|
||||||
|
* @param newBlock newly allocated block to be save
|
||||||
|
* @param targets target datanodes where replicas of the new block is placed
|
||||||
|
* @throws QuotaExceededException If addition of block exceeds space quota
|
||||||
|
*/
|
||||||
|
private static void saveAllocatedBlock(
|
||||||
|
FSNamesystem fsn, String src, INodesInPath inodesInPath, Block newBlock,
|
||||||
|
DatanodeStorageInfo[] targets)
|
||||||
|
throws IOException {
|
||||||
|
assert fsn.hasWriteLock();
|
||||||
|
BlockInfoContiguous b = addBlock(fsn.dir, src, inodesInPath, newBlock,
|
||||||
|
targets);
|
||||||
|
NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src);
|
||||||
|
DatanodeStorageInfo.incrementBlocksScheduled(targets);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class FileState {
|
||||||
|
final INodeFile inode;
|
||||||
|
final String path;
|
||||||
|
final INodesInPath iip;
|
||||||
|
|
||||||
|
FileState(INodeFile inode, String fullPath, INodesInPath iip) {
|
||||||
|
this.inode = inode;
|
||||||
|
this.path = fullPath;
|
||||||
|
this.iip = iip;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class ValidateAddBlockResult {
|
||||||
|
final long blockSize;
|
||||||
|
final int replication;
|
||||||
|
final byte storagePolicyID;
|
||||||
|
final String clientMachine;
|
||||||
|
|
||||||
|
ValidateAddBlockResult(
|
||||||
|
long blockSize, int replication, byte storagePolicyID,
|
||||||
|
String clientMachine) {
|
||||||
|
this.blockSize = blockSize;
|
||||||
|
this.replication = replication;
|
||||||
|
this.storagePolicyID = storagePolicyID;
|
||||||
|
this.clientMachine = clientMachine;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -55,12 +55,9 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
||||||
import org.apache.hadoop.hdfs.util.ByteArray;
|
import org.apache.hadoop.hdfs.util.ByteArray;
|
||||||
import org.apache.hadoop.hdfs.util.EnumCounters;
|
import org.apache.hadoop.hdfs.util.EnumCounters;
|
||||||
|
@ -308,7 +305,7 @@ public class FSDirectory implements Closeable {
|
||||||
return namesystem;
|
return namesystem;
|
||||||
}
|
}
|
||||||
|
|
||||||
private BlockManager getBlockManager() {
|
BlockManager getBlockManager() {
|
||||||
return getFSNamesystem().getBlockManager();
|
return getFSNamesystem().getBlockManager();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -478,79 +475,6 @@ public class FSDirectory implements Closeable {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Add a block to the file. Returns a reference to the added block.
|
|
||||||
*/
|
|
||||||
BlockInfoContiguous addBlock(String path, INodesInPath inodesInPath,
|
|
||||||
Block block, DatanodeStorageInfo[] targets) throws IOException {
|
|
||||||
writeLock();
|
|
||||||
try {
|
|
||||||
final INodeFile fileINode = inodesInPath.getLastINode().asFile();
|
|
||||||
Preconditions.checkState(fileINode.isUnderConstruction());
|
|
||||||
|
|
||||||
// check quota limits and updated space consumed
|
|
||||||
updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
|
|
||||||
fileINode.getPreferredBlockReplication(), true);
|
|
||||||
|
|
||||||
// associate new last block for the file
|
|
||||||
BlockInfoContiguousUnderConstruction blockInfo =
|
|
||||||
new BlockInfoContiguousUnderConstruction(
|
|
||||||
block,
|
|
||||||
fileINode.getFileReplication(),
|
|
||||||
BlockUCState.UNDER_CONSTRUCTION,
|
|
||||||
targets);
|
|
||||||
getBlockManager().addBlockCollection(blockInfo, fileINode);
|
|
||||||
fileINode.addBlock(blockInfo);
|
|
||||||
|
|
||||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
|
||||||
NameNode.stateChangeLog.debug("DIR* FSDirectory.addBlock: "
|
|
||||||
+ path + " with " + block
|
|
||||||
+ " block is added to the in-memory "
|
|
||||||
+ "file system");
|
|
||||||
}
|
|
||||||
return blockInfo;
|
|
||||||
} finally {
|
|
||||||
writeUnlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Remove a block from the file.
|
|
||||||
* @return Whether the block exists in the corresponding file
|
|
||||||
*/
|
|
||||||
boolean removeBlock(String path, INodesInPath iip, INodeFile fileNode,
|
|
||||||
Block block) throws IOException {
|
|
||||||
Preconditions.checkArgument(fileNode.isUnderConstruction());
|
|
||||||
writeLock();
|
|
||||||
try {
|
|
||||||
return unprotectedRemoveBlock(path, iip, fileNode, block);
|
|
||||||
} finally {
|
|
||||||
writeUnlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean unprotectedRemoveBlock(String path, INodesInPath iip,
|
|
||||||
INodeFile fileNode, Block block) throws IOException {
|
|
||||||
// modify file-> block and blocksMap
|
|
||||||
// fileNode should be under construction
|
|
||||||
BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block);
|
|
||||||
if (uc == null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
getBlockManager().removeBlockFromMap(block);
|
|
||||||
|
|
||||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
|
||||||
NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
|
|
||||||
+path+" with "+block
|
|
||||||
+" block is removed from the file system");
|
|
||||||
}
|
|
||||||
|
|
||||||
// update space consumed
|
|
||||||
updateCount(iip, 0, -fileNode.getPreferredBlockSize(),
|
|
||||||
fileNode.getPreferredBlockReplication(), true);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is a wrapper for resolvePath(). If the path passed
|
* This is a wrapper for resolvePath(). If the path passed
|
||||||
* is prefixed with /.reserved/raw, then it checks to ensure that the caller
|
* is prefixed with /.reserved/raw, then it checks to ensure that the caller
|
||||||
|
|
|
@ -1041,7 +1041,8 @@ public class FSEditLogLoader {
|
||||||
+ path);
|
+ path);
|
||||||
}
|
}
|
||||||
Block oldBlock = oldBlocks[oldBlocks.length - 1];
|
Block oldBlock = oldBlocks[oldBlocks.length - 1];
|
||||||
boolean removed = fsDir.unprotectedRemoveBlock(path, iip, file, oldBlock);
|
boolean removed = FSDirWriteFileOp.unprotectedRemoveBlock(
|
||||||
|
fsDir, path, iip, file, oldBlock);
|
||||||
if (!removed && !(op instanceof UpdateBlocksOp)) {
|
if (!removed && !(op instanceof UpdateBlocksOp)) {
|
||||||
throw new IOException("Trying to delete non-existant block " + oldBlock);
|
throw new IOException("Trying to delete non-existant block " + oldBlock);
|
||||||
}
|
}
|
||||||
|
|
|
@ -271,7 +271,6 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.util.MBeans;
|
import org.apache.hadoop.metrics2.util.MBeans;
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
import org.apache.hadoop.net.NodeBase;
|
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||||
|
@ -472,7 +471,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
private final long maxFsObjects; // maximum number of fs objects
|
private final long maxFsObjects; // maximum number of fs objects
|
||||||
|
|
||||||
private final long minBlockSize; // minimum block size
|
private final long minBlockSize; // minimum block size
|
||||||
private final long maxBlocksPerFile; // maximum # of blocks per file
|
final long maxBlocksPerFile; // maximum # of blocks per file
|
||||||
|
|
||||||
// precision of access times.
|
// precision of access times.
|
||||||
private final long accessTimePrecision;
|
private final long accessTimePrecision;
|
||||||
|
@ -602,7 +601,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
boolean isHaEnabled() {
|
boolean isHaEnabled() {
|
||||||
return haEnabled;
|
return haEnabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check the supplied configuration for correctness.
|
* Check the supplied configuration for correctness.
|
||||||
* @param conf Supplies the configuration to validate.
|
* @param conf Supplies the configuration to validate.
|
||||||
|
@ -1853,8 +1852,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
: dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
|
: dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
|
||||||
|
|
||||||
final LocatedBlocks blocks = blockManager.createLocatedBlocks(
|
final LocatedBlocks blocks = blockManager.createLocatedBlocks(
|
||||||
inode.getBlocks(iip.getPathSnapshotId()), fileSize,
|
inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset,
|
||||||
isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo);
|
length, needBlockToken, iip.isSnapshot(), feInfo);
|
||||||
|
|
||||||
// Set caching information for the located blocks.
|
// Set caching information for the located blocks.
|
||||||
for (LocatedBlock lb : blocks.getLocatedBlocks()) {
|
for (LocatedBlock lb : blocks.getLocatedBlocks()) {
|
||||||
|
@ -2222,8 +2221,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
try {
|
try {
|
||||||
checkOperation(OperationCategory.WRITE);
|
checkOperation(OperationCategory.WRITE);
|
||||||
checkNameNodeSafeMode("Cannot set storage policy for " + src);
|
checkNameNodeSafeMode("Cannot set storage policy for " + src);
|
||||||
auditStat = FSDirAttrOp.setStoragePolicy(
|
auditStat = FSDirAttrOp.setStoragePolicy(dir, blockManager, src,
|
||||||
dir, blockManager, src, policyName);
|
policyName);
|
||||||
} catch (AccessControlException e) {
|
} catch (AccessControlException e) {
|
||||||
logAuditEvent(false, "setStoragePolicy", src);
|
logAuditEvent(false, "setStoragePolicy", src);
|
||||||
throw e;
|
throw e;
|
||||||
|
@ -2611,7 +2610,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
return toRemoveBlocks;
|
return toRemoveBlocks;
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " +
|
NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " +
|
||||||
ie.getMessage());
|
ie.getMessage());
|
||||||
throw ie;
|
throw ie;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2693,8 +2692,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
"Cannot append to lazy persist file " + src);
|
"Cannot append to lazy persist file " + src);
|
||||||
}
|
}
|
||||||
// Opening an existing file for append - may need to recover lease.
|
// Opening an existing file for append - may need to recover lease.
|
||||||
recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE,
|
recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE, iip, src, holder,
|
||||||
iip, src, holder, clientMachine, false);
|
clientMachine, false);
|
||||||
|
|
||||||
final BlockInfoContiguous lastBlock = myFile.getLastBlock();
|
final BlockInfoContiguous lastBlock = myFile.getLastBlock();
|
||||||
// Check that the block has at least minimum replication.
|
// Check that the block has at least minimum replication.
|
||||||
|
@ -3038,290 +3037,49 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
* are replicated. Will return an empty 2-elt array if we want the
|
* are replicated. Will return an empty 2-elt array if we want the
|
||||||
* client to "try again later".
|
* client to "try again later".
|
||||||
*/
|
*/
|
||||||
LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
|
LocatedBlock getAdditionalBlock(
|
||||||
ExtendedBlock previous, Set<Node> excludedNodes,
|
String src, long fileId, String clientName, ExtendedBlock previous,
|
||||||
List<String> favoredNodes) throws IOException {
|
DatanodeInfo[] excludedNodes, String[] favoredNodes) throws IOException {
|
||||||
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
|
|
||||||
DatanodeStorageInfo targets[] = getNewBlockTargets(src, fileId,
|
|
||||||
clientName, previous, excludedNodes, favoredNodes, onRetryBlock);
|
|
||||||
if (targets == null) {
|
|
||||||
assert onRetryBlock[0] != null : "Retry block is null";
|
|
||||||
// This is a retry. Just return the last block.
|
|
||||||
return onRetryBlock[0];
|
|
||||||
}
|
|
||||||
LocatedBlock newBlock = storeAllocatedBlock(
|
|
||||||
src, fileId, clientName, previous, targets);
|
|
||||||
return newBlock;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Part I of getAdditionalBlock().
|
|
||||||
* Analyze the state of the file under read lock to determine if the client
|
|
||||||
* can add a new block, detect potential retries, lease mismatches,
|
|
||||||
* and minimal replication of the penultimate block.
|
|
||||||
*
|
|
||||||
* Generate target DataNode locations for the new block,
|
|
||||||
* but do not create the new block yet.
|
|
||||||
*/
|
|
||||||
DatanodeStorageInfo[] getNewBlockTargets(String src, long fileId,
|
|
||||||
String clientName, ExtendedBlock previous, Set<Node> excludedNodes,
|
|
||||||
List<String> favoredNodes, LocatedBlock[] onRetryBlock) throws IOException {
|
|
||||||
final long blockSize;
|
|
||||||
final int replication;
|
|
||||||
final byte storagePolicyID;
|
|
||||||
Node clientNode = null;
|
|
||||||
String clientMachine = null;
|
|
||||||
|
|
||||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: "
|
NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: "
|
||||||
+ src + " inodeId " + fileId + " for " + clientName);
|
+ src + " inodeId " + fileId + " for " + clientName);
|
||||||
}
|
}
|
||||||
|
|
||||||
checkOperation(OperationCategory.READ);
|
waitForLoadingFSImage();
|
||||||
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
|
||||||
|
FSDirWriteFileOp.ValidateAddBlockResult r;
|
||||||
FSPermissionChecker pc = getPermissionChecker();
|
FSPermissionChecker pc = getPermissionChecker();
|
||||||
|
checkOperation(OperationCategory.READ);
|
||||||
readLock();
|
readLock();
|
||||||
try {
|
try {
|
||||||
checkOperation(OperationCategory.READ);
|
checkOperation(OperationCategory.READ);
|
||||||
src = dir.resolvePath(pc, src, pathComponents);
|
r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName,
|
||||||
FileState fileState = analyzeFileState(
|
previous, onRetryBlock);
|
||||||
src, fileId, clientName, previous, onRetryBlock);
|
|
||||||
final INodeFile pendingFile = fileState.inode;
|
|
||||||
// Check if the penultimate block is minimally replicated
|
|
||||||
if (!checkFileProgress(src, pendingFile, false)) {
|
|
||||||
throw new NotReplicatedYetException("Not replicated yet: " + src);
|
|
||||||
}
|
|
||||||
src = fileState.path;
|
|
||||||
|
|
||||||
if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
|
|
||||||
// This is a retry. No need to generate new locations.
|
|
||||||
// Use the last block if it has locations.
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
if (pendingFile.getBlocks().length >= maxBlocksPerFile) {
|
|
||||||
throw new IOException("File has reached the limit on maximum number of"
|
|
||||||
+ " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
|
|
||||||
+ "): " + pendingFile.getBlocks().length + " >= "
|
|
||||||
+ maxBlocksPerFile);
|
|
||||||
}
|
|
||||||
blockSize = pendingFile.getPreferredBlockSize();
|
|
||||||
clientMachine = pendingFile.getFileUnderConstructionFeature()
|
|
||||||
.getClientMachine();
|
|
||||||
clientNode = blockManager.getDatanodeManager().getDatanodeByHost(
|
|
||||||
clientMachine);
|
|
||||||
replication = pendingFile.getFileReplication();
|
|
||||||
storagePolicyID = pendingFile.getStoragePolicyID();
|
|
||||||
} finally {
|
} finally {
|
||||||
readUnlock();
|
readUnlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (clientNode == null) {
|
if (r == null) {
|
||||||
clientNode = getClientNode(clientMachine);
|
assert onRetryBlock[0] != null : "Retry block is null";
|
||||||
|
// This is a retry. Just return the last block.
|
||||||
|
return onRetryBlock[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
// choose targets for the new block to be allocated.
|
DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
|
||||||
return getBlockManager().chooseTarget4NewBlock(
|
blockManager, src, excludedNodes, favoredNodes, r);
|
||||||
src, replication, clientNode, excludedNodes, blockSize, favoredNodes,
|
|
||||||
storagePolicyID);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Part II of getAdditionalBlock().
|
|
||||||
* Should repeat the same analysis of the file state as in Part 1,
|
|
||||||
* but under the write lock.
|
|
||||||
* If the conditions still hold, then allocate a new block with
|
|
||||||
* the new targets, add it to the INode and to the BlocksMap.
|
|
||||||
*/
|
|
||||||
LocatedBlock storeAllocatedBlock(String src, long fileId, String clientName,
|
|
||||||
ExtendedBlock previous, DatanodeStorageInfo[] targets) throws IOException {
|
|
||||||
Block newBlock = null;
|
|
||||||
long offset;
|
|
||||||
checkOperation(OperationCategory.WRITE);
|
checkOperation(OperationCategory.WRITE);
|
||||||
waitForLoadingFSImage();
|
|
||||||
writeLock();
|
writeLock();
|
||||||
|
LocatedBlock lb;
|
||||||
try {
|
try {
|
||||||
checkOperation(OperationCategory.WRITE);
|
checkOperation(OperationCategory.WRITE);
|
||||||
// Run the full analysis again, since things could have changed
|
lb = FSDirWriteFileOp.storeAllocatedBlock(
|
||||||
// while chooseTarget() was executing.
|
this, src, fileId, clientName, previous, targets);
|
||||||
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
|
|
||||||
FileState fileState =
|
|
||||||
analyzeFileState(src, fileId, clientName, previous, onRetryBlock);
|
|
||||||
final INodeFile pendingFile = fileState.inode;
|
|
||||||
src = fileState.path;
|
|
||||||
|
|
||||||
if (onRetryBlock[0] != null) {
|
|
||||||
if (onRetryBlock[0].getLocations().length > 0) {
|
|
||||||
// This is a retry. Just return the last block if having locations.
|
|
||||||
return onRetryBlock[0];
|
|
||||||
} else {
|
|
||||||
// add new chosen targets to already allocated block and return
|
|
||||||
BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
|
|
||||||
((BlockInfoContiguousUnderConstruction) lastBlockInFile)
|
|
||||||
.setExpectedLocations(targets);
|
|
||||||
offset = pendingFile.computeFileSize();
|
|
||||||
return makeLocatedBlock(lastBlockInFile, targets, offset);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// commit the last block and complete it if it has minimum replicas
|
|
||||||
commitOrCompleteLastBlock(pendingFile, fileState.iip,
|
|
||||||
ExtendedBlock.getLocalBlock(previous));
|
|
||||||
|
|
||||||
// allocate new block, record block locations in INode.
|
|
||||||
newBlock = createNewBlock();
|
|
||||||
INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
|
|
||||||
saveAllocatedBlock(src, inodesInPath, newBlock, targets);
|
|
||||||
|
|
||||||
persistNewBlock(src, pendingFile);
|
|
||||||
offset = pendingFile.computeFileSize();
|
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock();
|
writeUnlock();
|
||||||
}
|
}
|
||||||
getEditLog().logSync();
|
getEditLog().logSync();
|
||||||
|
return lb;
|
||||||
// Return located block
|
|
||||||
return makeLocatedBlock(newBlock, targets, offset);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Resolve clientmachine address to get a network location path
|
|
||||||
*/
|
|
||||||
private Node getClientNode(String clientMachine) {
|
|
||||||
List<String> hosts = new ArrayList<String>(1);
|
|
||||||
hosts.add(clientMachine);
|
|
||||||
List<String> rName = getBlockManager().getDatanodeManager()
|
|
||||||
.resolveNetworkLocation(hosts);
|
|
||||||
Node clientNode = null;
|
|
||||||
if (rName != null) {
|
|
||||||
// Able to resolve clientMachine mapping.
|
|
||||||
// Create a temp node to findout the rack local nodes
|
|
||||||
clientNode = new NodeBase(rName.get(0) + NodeBase.PATH_SEPARATOR_STR
|
|
||||||
+ clientMachine);
|
|
||||||
}
|
|
||||||
return clientNode;
|
|
||||||
}
|
|
||||||
|
|
||||||
static class FileState {
|
|
||||||
public final INodeFile inode;
|
|
||||||
public final String path;
|
|
||||||
public final INodesInPath iip;
|
|
||||||
|
|
||||||
public FileState(INodeFile inode, String fullPath, INodesInPath iip) {
|
|
||||||
this.inode = inode;
|
|
||||||
this.path = fullPath;
|
|
||||||
this.iip = iip;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
FileState analyzeFileState(String src,
|
|
||||||
long fileId,
|
|
||||||
String clientName,
|
|
||||||
ExtendedBlock previous,
|
|
||||||
LocatedBlock[] onRetryBlock)
|
|
||||||
throws IOException {
|
|
||||||
assert hasReadLock();
|
|
||||||
|
|
||||||
checkBlock(previous);
|
|
||||||
onRetryBlock[0] = null;
|
|
||||||
checkNameNodeSafeMode("Cannot add block to " + src);
|
|
||||||
|
|
||||||
// have we exceeded the configured limit of fs objects.
|
|
||||||
checkFsObjectLimit();
|
|
||||||
|
|
||||||
Block previousBlock = ExtendedBlock.getLocalBlock(previous);
|
|
||||||
final INode inode;
|
|
||||||
final INodesInPath iip;
|
|
||||||
if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
|
|
||||||
// Older clients may not have given us an inode ID to work with.
|
|
||||||
// In this case, we have to try to resolve the path and hope it
|
|
||||||
// hasn't changed or been deleted since the file was opened for write.
|
|
||||||
iip = dir.getINodesInPath4Write(src);
|
|
||||||
inode = iip.getLastINode();
|
|
||||||
} else {
|
|
||||||
// Newer clients pass the inode ID, so we can just get the inode
|
|
||||||
// directly.
|
|
||||||
inode = dir.getInode(fileId);
|
|
||||||
iip = INodesInPath.fromINode(inode);
|
|
||||||
if (inode != null) {
|
|
||||||
src = iip.getPath();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
final INodeFile pendingFile = checkLease(src, clientName, inode, fileId);
|
|
||||||
BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
|
|
||||||
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
|
|
||||||
// The block that the client claims is the current last block
|
|
||||||
// doesn't match up with what we think is the last block. There are
|
|
||||||
// four possibilities:
|
|
||||||
// 1) This is the first block allocation of an append() pipeline
|
|
||||||
// which started appending exactly at or exceeding the block boundary.
|
|
||||||
// In this case, the client isn't passed the previous block,
|
|
||||||
// so it makes the allocateBlock() call with previous=null.
|
|
||||||
// We can distinguish this since the last block of the file
|
|
||||||
// will be exactly a full block.
|
|
||||||
// 2) This is a retry from a client that missed the response of a
|
|
||||||
// prior getAdditionalBlock() call, perhaps because of a network
|
|
||||||
// timeout, or because of an HA failover. In that case, we know
|
|
||||||
// by the fact that the client is re-issuing the RPC that it
|
|
||||||
// never began to write to the old block. Hence it is safe to
|
|
||||||
// to return the existing block.
|
|
||||||
// 3) This is an entirely bogus request/bug -- we should error out
|
|
||||||
// rather than potentially appending a new block with an empty
|
|
||||||
// one in the middle, etc
|
|
||||||
// 4) This is a retry from a client that timed out while
|
|
||||||
// the prior getAdditionalBlock() is still being processed,
|
|
||||||
// currently working on chooseTarget().
|
|
||||||
// There are no means to distinguish between the first and
|
|
||||||
// the second attempts in Part I, because the first one hasn't
|
|
||||||
// changed the namesystem state yet.
|
|
||||||
// We run this analysis again in Part II where case 4 is impossible.
|
|
||||||
|
|
||||||
BlockInfoContiguous penultimateBlock = pendingFile.getPenultimateBlock();
|
|
||||||
if (previous == null &&
|
|
||||||
lastBlockInFile != null &&
|
|
||||||
lastBlockInFile.getNumBytes() >= pendingFile.getPreferredBlockSize() &&
|
|
||||||
lastBlockInFile.isComplete()) {
|
|
||||||
// Case 1
|
|
||||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
|
||||||
NameNode.stateChangeLog.debug(
|
|
||||||
"BLOCK* NameSystem.allocateBlock: handling block allocation" +
|
|
||||||
" writing to a file with a complete previous block: src=" +
|
|
||||||
src + " lastBlock=" + lastBlockInFile);
|
|
||||||
}
|
|
||||||
} else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
|
|
||||||
if (lastBlockInFile.getNumBytes() != 0) {
|
|
||||||
throw new IOException(
|
|
||||||
"Request looked like a retry to allocate block " +
|
|
||||||
lastBlockInFile + " but it already contains " +
|
|
||||||
lastBlockInFile.getNumBytes() + " bytes");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Case 2
|
|
||||||
// Return the last block.
|
|
||||||
NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
|
|
||||||
"caught retry for allocation of a new block in " +
|
|
||||||
src + ". Returning previously allocated block " + lastBlockInFile);
|
|
||||||
long offset = pendingFile.computeFileSize();
|
|
||||||
onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
|
|
||||||
((BlockInfoContiguousUnderConstruction)lastBlockInFile).getExpectedStorageLocations(),
|
|
||||||
offset);
|
|
||||||
return new FileState(pendingFile, src, iip);
|
|
||||||
} else {
|
|
||||||
// Case 3
|
|
||||||
throw new IOException("Cannot allocate block in " + src + ": " +
|
|
||||||
"passed 'previous' block " + previous + " does not match actual " +
|
|
||||||
"last block in file " + lastBlockInFile);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return new FileState(pendingFile, src, iip);
|
|
||||||
}
|
|
||||||
|
|
||||||
LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
|
|
||||||
long offset) throws IOException {
|
|
||||||
LocatedBlock lBlk = BlockManager.newLocatedBlock(
|
|
||||||
getExtendedBlock(blk), locs, offset, false);
|
|
||||||
getBlockManager().setBlockToken(
|
|
||||||
lBlk, BlockTokenIdentifier.AccessMode.WRITE);
|
|
||||||
return lBlk;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @see ClientProtocol#getAdditionalDatanode */
|
/** @see ClientProtocol#getAdditionalDatanode */
|
||||||
|
@ -3374,7 +3132,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (clientnode == null) {
|
if (clientnode == null) {
|
||||||
clientnode = getClientNode(clientMachine);
|
clientnode = FSDirWriteFileOp.getClientNode(blockManager, clientMachine);
|
||||||
}
|
}
|
||||||
|
|
||||||
// choose new datanodes.
|
// choose new datanodes.
|
||||||
|
@ -3390,60 +3148,32 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
/**
|
/**
|
||||||
* The client would like to let go of the given block
|
* The client would like to let go of the given block
|
||||||
*/
|
*/
|
||||||
boolean abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
|
void abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " + b
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " + b
|
||||||
+ "of file " + src);
|
+ "of file " + src);
|
||||||
}
|
}
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
|
||||||
FSPermissionChecker pc = getPermissionChecker();
|
|
||||||
waitForLoadingFSImage();
|
waitForLoadingFSImage();
|
||||||
|
checkOperation(OperationCategory.WRITE);
|
||||||
|
FSPermissionChecker pc = getPermissionChecker();
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
checkOperation(OperationCategory.WRITE);
|
checkOperation(OperationCategory.WRITE);
|
||||||
checkNameNodeSafeMode("Cannot abandon block " + b + " for file" + src);
|
checkNameNodeSafeMode("Cannot abandon block " + b + " for file" + src);
|
||||||
src = dir.resolvePath(pc, src, pathComponents);
|
FSDirWriteFileOp.abandonBlock(dir, pc, b, fileId, src, holder);
|
||||||
|
|
||||||
final INode inode;
|
|
||||||
final INodesInPath iip;
|
|
||||||
if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
|
|
||||||
// Older clients may not have given us an inode ID to work with.
|
|
||||||
// In this case, we have to try to resolve the path and hope it
|
|
||||||
// hasn't changed or been deleted since the file was opened for write.
|
|
||||||
iip = dir.getINodesInPath(src, true);
|
|
||||||
inode = iip.getLastINode();
|
|
||||||
} else {
|
|
||||||
inode = dir.getInode(fileId);
|
|
||||||
iip = INodesInPath.fromINode(inode);
|
|
||||||
if (inode != null) {
|
|
||||||
src = iip.getPath();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
final INodeFile file = checkLease(src, holder, inode, fileId);
|
|
||||||
|
|
||||||
// Remove the block from the pending creates list
|
|
||||||
boolean removed = dir.removeBlock(src, iip, file,
|
|
||||||
ExtendedBlock.getLocalBlock(b));
|
|
||||||
if (!removed) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
|
||||||
+ b + " is removed from pendingCreates");
|
+ b + " is removed from pendingCreates");
|
||||||
}
|
}
|
||||||
persistBlocks(src, file, false);
|
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock();
|
writeUnlock();
|
||||||
}
|
}
|
||||||
getEditLog().logSync();
|
getEditLog().logSync();
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private INodeFile checkLease(String src, String holder, INode inode,
|
INodeFile checkLease(
|
||||||
long fileId) throws LeaseExpiredException, FileNotFoundException {
|
String src, String holder, INode inode, long fileId) throws LeaseExpiredException, FileNotFoundException {
|
||||||
assert hasReadLock();
|
assert hasReadLock();
|
||||||
final String ident = src + " (inode " + fileId + ")";
|
final String ident = src + " (inode " + fileId + ")";
|
||||||
if (inode == null) {
|
if (inode == null) {
|
||||||
|
@ -3488,120 +3218,30 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
* (e.g if not all blocks have reached minimum replication yet)
|
* (e.g if not all blocks have reached minimum replication yet)
|
||||||
* @throws IOException on error (eg lease mismatch, file not open, file deleted)
|
* @throws IOException on error (eg lease mismatch, file not open, file deleted)
|
||||||
*/
|
*/
|
||||||
boolean completeFile(final String srcArg, String holder,
|
boolean completeFile(final String src, String holder,
|
||||||
ExtendedBlock last, long fileId)
|
ExtendedBlock last, long fileId)
|
||||||
throws SafeModeException, UnresolvedLinkException, IOException {
|
throws IOException {
|
||||||
String src = srcArg;
|
|
||||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
|
||||||
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
|
|
||||||
src + " for " + holder);
|
|
||||||
}
|
|
||||||
checkBlock(last);
|
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
checkOperation(OperationCategory.WRITE);
|
checkOperation(OperationCategory.WRITE);
|
||||||
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
|
||||||
FSPermissionChecker pc = getPermissionChecker();
|
|
||||||
waitForLoadingFSImage();
|
waitForLoadingFSImage();
|
||||||
|
FSPermissionChecker pc = getPermissionChecker();
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
checkOperation(OperationCategory.WRITE);
|
checkOperation(OperationCategory.WRITE);
|
||||||
checkNameNodeSafeMode("Cannot complete file " + src);
|
checkNameNodeSafeMode("Cannot complete file " + src);
|
||||||
src = dir.resolvePath(pc, src, pathComponents);
|
success = FSDirWriteFileOp.completeFile(this, pc, src, holder, last,
|
||||||
success = completeFileInternal(src, holder,
|
fileId);
|
||||||
ExtendedBlock.getLocalBlock(last), fileId);
|
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock();
|
writeUnlock();
|
||||||
}
|
}
|
||||||
getEditLog().logSync();
|
getEditLog().logSync();
|
||||||
if (success) {
|
|
||||||
NameNode.stateChangeLog.info("DIR* completeFile: " + srcArg
|
|
||||||
+ " is closed by " + holder);
|
|
||||||
}
|
|
||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean completeFileInternal(String src, String holder, Block last,
|
|
||||||
long fileId) throws IOException {
|
|
||||||
assert hasWriteLock();
|
|
||||||
final INodeFile pendingFile;
|
|
||||||
final INodesInPath iip;
|
|
||||||
INode inode = null;
|
|
||||||
try {
|
|
||||||
if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
|
|
||||||
// Older clients may not have given us an inode ID to work with.
|
|
||||||
// In this case, we have to try to resolve the path and hope it
|
|
||||||
// hasn't changed or been deleted since the file was opened for write.
|
|
||||||
iip = dir.getINodesInPath(src, true);
|
|
||||||
inode = iip.getLastINode();
|
|
||||||
} else {
|
|
||||||
inode = dir.getInode(fileId);
|
|
||||||
iip = INodesInPath.fromINode(inode);
|
|
||||||
if (inode != null) {
|
|
||||||
src = iip.getPath();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pendingFile = checkLease(src, holder, inode, fileId);
|
|
||||||
} catch (LeaseExpiredException lee) {
|
|
||||||
if (inode != null && inode.isFile() &&
|
|
||||||
!inode.asFile().isUnderConstruction()) {
|
|
||||||
// This could be a retry RPC - i.e the client tried to close
|
|
||||||
// the file, but missed the RPC response. Thus, it is trying
|
|
||||||
// again to close the file. If the file still exists and
|
|
||||||
// the client's view of the last block matches the actual
|
|
||||||
// last block, then we'll treat it as a successful close.
|
|
||||||
// See HDFS-3031.
|
|
||||||
final Block realLastBlock = inode.asFile().getLastBlock();
|
|
||||||
if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
|
|
||||||
NameNode.stateChangeLog.info("DIR* completeFile: " +
|
|
||||||
"request from " + holder + " to complete inode " + fileId +
|
|
||||||
"(" + src + ") which is already closed. But, it appears to be " +
|
|
||||||
"an RPC retry. Returning success");
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
throw lee;
|
|
||||||
}
|
|
||||||
// Check the state of the penultimate block. It should be completed
|
|
||||||
// before attempting to complete the last one.
|
|
||||||
if (!checkFileProgress(src, pendingFile, false)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// commit the last block and complete it if it has minimum replicas
|
|
||||||
commitOrCompleteLastBlock(pendingFile, iip, last);
|
|
||||||
|
|
||||||
if (!checkFileProgress(src, pendingFile, true)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
finalizeINodeFileUnderConstruction(src, pendingFile,
|
|
||||||
Snapshot.CURRENT_STATE_ID);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Save allocated block at the given pending filename
|
|
||||||
*
|
|
||||||
* @param src path to the file
|
|
||||||
* @param inodesInPath representing each of the components of src.
|
|
||||||
* The last INode is the INode for {@code src} file.
|
|
||||||
* @param newBlock newly allocated block to be save
|
|
||||||
* @param targets target datanodes where replicas of the new block is placed
|
|
||||||
* @throws QuotaExceededException If addition of block exceeds space quota
|
|
||||||
*/
|
|
||||||
private void saveAllocatedBlock(String src, INodesInPath inodesInPath,
|
|
||||||
Block newBlock, DatanodeStorageInfo[] targets)
|
|
||||||
throws IOException {
|
|
||||||
assert hasWriteLock();
|
|
||||||
BlockInfoContiguous b = dir.addBlock(src, inodesInPath, newBlock, targets);
|
|
||||||
NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src);
|
|
||||||
DatanodeStorageInfo.incrementBlocksScheduled(targets);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create new block with a unique block id and a new generation stamp.
|
* Create new block with a unique block id and a new generation stamp.
|
||||||
*/
|
*/
|
||||||
private Block createNewBlock() throws IOException {
|
Block createNewBlock() throws IOException {
|
||||||
assert hasWriteLock();
|
assert hasWriteLock();
|
||||||
Block b = new Block(nextBlockId(), 0, 0);
|
Block b = new Block(nextBlockId(), 0, 0);
|
||||||
// Increment the generation stamp for every new block.
|
// Increment the generation stamp for every new block.
|
||||||
|
@ -3993,7 +3633,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
pendingFile.getFileUnderConstructionFeature().updateLengthOfLastBlock(
|
pendingFile.getFileUnderConstructionFeature().updateLengthOfLastBlock(
|
||||||
pendingFile, lastBlockLength);
|
pendingFile, lastBlockLength);
|
||||||
}
|
}
|
||||||
persistBlocks(src, pendingFile, false);
|
FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, false);
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock();
|
writeUnlock();
|
||||||
}
|
}
|
||||||
|
@ -4163,8 +3803,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
return leaseManager.reassignLease(lease, pendingFile, newHolder);
|
return leaseManager.reassignLease(lease, pendingFile, newHolder);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void commitOrCompleteLastBlock(final INodeFile fileINode,
|
void commitOrCompleteLastBlock(
|
||||||
final INodesInPath iip, final Block commitBlock) throws IOException {
|
final INodeFile fileINode, final INodesInPath iip,
|
||||||
|
final Block commitBlock) throws IOException {
|
||||||
assert hasWriteLock();
|
assert hasWriteLock();
|
||||||
Preconditions.checkArgument(fileINode.isUnderConstruction());
|
Preconditions.checkArgument(fileINode.isUnderConstruction());
|
||||||
if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) {
|
if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) {
|
||||||
|
@ -4182,14 +3823,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void finalizeINodeFileUnderConstruction(String src,
|
void finalizeINodeFileUnderConstruction(
|
||||||
INodeFile pendingFile, int latestSnapshot) throws IOException {
|
String src, INodeFile pendingFile, int latestSnapshot) throws IOException {
|
||||||
assert hasWriteLock();
|
assert hasWriteLock();
|
||||||
|
|
||||||
FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
|
FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
|
||||||
Preconditions.checkArgument(uc != null);
|
Preconditions.checkArgument(uc != null);
|
||||||
leaseManager.removeLease(uc.getClientName(), pendingFile);
|
leaseManager.removeLease(uc.getClientName(), pendingFile);
|
||||||
|
|
||||||
pendingFile.recordModification(latestSnapshot);
|
pendingFile.recordModification(latestSnapshot);
|
||||||
|
|
||||||
// The file is no longer pending.
|
// The file is no longer pending.
|
||||||
|
@ -4401,7 +4042,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
} else {
|
} else {
|
||||||
// If this commit does not want to close the file, persist blocks
|
// If this commit does not want to close the file, persist blocks
|
||||||
src = iFile.getFullPathName();
|
src = iFile.getFullPathName();
|
||||||
persistBlocks(src, iFile, false);
|
FSDirWriteFileOp.persistBlocks(dir, src, iFile, false);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock();
|
writeUnlock();
|
||||||
|
@ -4591,24 +4232,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
|
hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Persist the block list for the inode.
|
|
||||||
* @param path
|
|
||||||
* @param file
|
|
||||||
* @param logRetryCache
|
|
||||||
*/
|
|
||||||
private void persistBlocks(String path, INodeFile file,
|
|
||||||
boolean logRetryCache) {
|
|
||||||
assert hasWriteLock();
|
|
||||||
Preconditions.checkArgument(file.isUnderConstruction());
|
|
||||||
getEditLog().logUpdateBlocks(path, file, logRetryCache);
|
|
||||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
|
||||||
NameNode.stateChangeLog.debug("persistBlocks: " + path
|
|
||||||
+ " with " + file.getBlocks().length + " blocks is persisted to" +
|
|
||||||
" the file system");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Close file.
|
* Close file.
|
||||||
* @param path
|
* @param path
|
||||||
|
@ -4796,13 +4419,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
|
|
||||||
public FSEditLog getEditLog() {
|
public FSEditLog getEditLog() {
|
||||||
return getFSImage().getEditLog();
|
return getFSImage().getEditLog();
|
||||||
}
|
|
||||||
|
|
||||||
private void checkBlock(ExtendedBlock block) throws IOException {
|
|
||||||
if (block != null && !this.blockPoolId.equals(block.getBlockPoolId())) {
|
|
||||||
throw new IOException("Unexpected BlockPoolId " + block.getBlockPoolId()
|
|
||||||
+ " - expected " + blockPoolId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Metric({"MissingBlocks", "Number of missing blocks"})
|
@Metric({"MissingBlocks", "Number of missing blocks"})
|
||||||
|
@ -5073,21 +4689,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
|
getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Persist the new block (the last block of the given file).
|
|
||||||
* @param path
|
|
||||||
* @param file
|
|
||||||
*/
|
|
||||||
private void persistNewBlock(String path, INodeFile file) {
|
|
||||||
Preconditions.checkArgument(file.isUnderConstruction());
|
|
||||||
getEditLog().logAddBlock(path, file);
|
|
||||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
|
||||||
NameNode.stateChangeLog.debug("persistNewBlock: "
|
|
||||||
+ path + " with new block " + file.getLastBlock().toString()
|
|
||||||
+ ", current total block count is " + file.getBlocks().length);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* SafeModeInfo contains information related to the safe mode.
|
* SafeModeInfo contains information related to the safe mode.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -6393,7 +5994,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
blockinfo.setExpectedLocations(storages);
|
blockinfo.setExpectedLocations(storages);
|
||||||
|
|
||||||
String src = pendingFile.getFullPathName();
|
String src = pendingFile.getFullPathName();
|
||||||
persistBlocks(src, pendingFile, logRetryCache);
|
FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, logRetryCache);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -708,23 +708,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
String[] favoredNodes)
|
String[] favoredNodes)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
if (stateChangeLog.isDebugEnabled()) {
|
|
||||||
stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src
|
|
||||||
+ " fileId=" + fileId + " for " + clientName);
|
|
||||||
}
|
|
||||||
Set<Node> excludedNodesSet = null;
|
|
||||||
if (excludedNodes != null) {
|
|
||||||
excludedNodesSet = new HashSet<Node>(excludedNodes.length);
|
|
||||||
for (Node node : excludedNodes) {
|
|
||||||
excludedNodesSet.add(node);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
List<String> favoredNodesList = (favoredNodes == null) ? null
|
|
||||||
: Arrays.asList(favoredNodes);
|
|
||||||
LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
|
LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
|
||||||
clientName, previous, excludedNodesSet, favoredNodesList);
|
clientName, previous, excludedNodes, favoredNodes);
|
||||||
if (locatedBlock != null)
|
if (locatedBlock != null) {
|
||||||
metrics.incrAddBlockOps();
|
metrics.incrAddBlockOps();
|
||||||
|
}
|
||||||
return locatedBlock;
|
return locatedBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -765,13 +753,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
public void abandonBlock(ExtendedBlock b, long fileId, String src,
|
public void abandonBlock(ExtendedBlock b, long fileId, String src,
|
||||||
String holder) throws IOException {
|
String holder) throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
if(stateChangeLog.isDebugEnabled()) {
|
namesystem.abandonBlock(b, fileId, src, holder);
|
||||||
stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
|
|
||||||
+b+" of file "+src);
|
|
||||||
}
|
|
||||||
if (!namesystem.abandonBlock(b, fileId, src, holder)) {
|
|
||||||
throw new IOException("Cannot abandon block during write to " + src);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
|
@ -779,10 +761,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
ExtendedBlock last, long fileId)
|
ExtendedBlock last, long fileId)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
if(stateChangeLog.isDebugEnabled()) {
|
|
||||||
stateChangeLog.debug("*DIR* NameNode.complete: "
|
|
||||||
+ src + " fileId=" + fileId +" for " + clientName);
|
|
||||||
}
|
|
||||||
return namesystem.completeFile(src, clientName, last, fileId);
|
return namesystem.completeFile(src, clientName, last, fileId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.io.EnumSetWritable;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Race between two threads simultaneously calling
|
* Race between two threads simultaneously calling
|
||||||
|
@ -88,25 +89,40 @@ public class TestAddBlockRetry {
|
||||||
// start first addBlock()
|
// start first addBlock()
|
||||||
LOG.info("Starting first addBlock for " + src);
|
LOG.info("Starting first addBlock for " + src);
|
||||||
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
|
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
|
||||||
DatanodeStorageInfo targets[] = ns.getNewBlockTargets(
|
ns.readLock();
|
||||||
src, HdfsConstants.GRANDFATHER_INODE_ID, "clientName",
|
FSDirWriteFileOp.ValidateAddBlockResult r;
|
||||||
null, null, null, onRetryBlock);
|
FSPermissionChecker pc = Mockito.mock(FSPermissionChecker.class);
|
||||||
|
try {
|
||||||
|
r = FSDirWriteFileOp.validateAddBlock(ns, pc, src,
|
||||||
|
HdfsConstants.GRANDFATHER_INODE_ID,
|
||||||
|
"clientName", null, onRetryBlock);
|
||||||
|
} finally {
|
||||||
|
ns.readUnlock();;
|
||||||
|
}
|
||||||
|
DatanodeStorageInfo targets[] = FSDirWriteFileOp.chooseTargetForNewBlock(
|
||||||
|
ns.getBlockManager(), src, null, null, r);
|
||||||
assertNotNull("Targets must be generated", targets);
|
assertNotNull("Targets must be generated", targets);
|
||||||
|
|
||||||
// run second addBlock()
|
// run second addBlock()
|
||||||
LOG.info("Starting second addBlock for " + src);
|
LOG.info("Starting second addBlock for " + src);
|
||||||
nn.addBlock(src, "clientName", null, null,
|
nn.addBlock(src, "clientName", null, null,
|
||||||
HdfsConstants.GRANDFATHER_INODE_ID, null);
|
HdfsConstants.GRANDFATHER_INODE_ID, null);
|
||||||
assertTrue("Penultimate block must be complete",
|
assertTrue("Penultimate block must be complete",
|
||||||
checkFileProgress(src, false));
|
checkFileProgress(src, false));
|
||||||
LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
|
LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
|
||||||
assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
|
assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
|
||||||
LocatedBlock lb2 = lbs.get(0);
|
LocatedBlock lb2 = lbs.get(0);
|
||||||
assertEquals("Wrong replication", REPLICATION, lb2.getLocations().length);
|
assertEquals("Wrong replication", REPLICATION, lb2.getLocations().length);
|
||||||
|
|
||||||
// continue first addBlock()
|
// continue first addBlock()
|
||||||
LocatedBlock newBlock = ns.storeAllocatedBlock(
|
ns.writeLock();
|
||||||
src, HdfsConstants.GRANDFATHER_INODE_ID, "clientName", null, targets);
|
LocatedBlock newBlock;
|
||||||
|
try {
|
||||||
|
newBlock = FSDirWriteFileOp.storeAllocatedBlock(ns, src,
|
||||||
|
HdfsConstants.GRANDFATHER_INODE_ID, "clientName", null, targets);
|
||||||
|
} finally {
|
||||||
|
ns.writeUnlock();
|
||||||
|
}
|
||||||
assertEquals("Blocks are not equal", lb2.getBlock(), newBlock.getBlock());
|
assertEquals("Blocks are not equal", lb2.getBlock(), newBlock.getBlock());
|
||||||
|
|
||||||
// check locations
|
// check locations
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderCon
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -45,7 +46,9 @@ public class TestCommitBlockSynchronization {
|
||||||
private FSNamesystem makeNameSystemSpy(Block block, INodeFile file)
|
private FSNamesystem makeNameSystemSpy(Block block, INodeFile file)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
FSEditLog editlog = mock(FSEditLog.class);
|
||||||
FSImage image = new FSImage(conf);
|
FSImage image = new FSImage(conf);
|
||||||
|
Whitebox.setInternalState(image, "editLog", editlog);
|
||||||
final DatanodeStorageInfo[] targets = {};
|
final DatanodeStorageInfo[] targets = {};
|
||||||
|
|
||||||
FSNamesystem namesystem = new FSNamesystem(conf, image);
|
FSNamesystem namesystem = new FSNamesystem(conf, image);
|
||||||
|
|
Loading…
Reference in New Issue