Merge HDFS-8394 from trunk: Move getAdditionalBlock() and related functionalities into a separate class.

This commit is contained in:
Jing Zhao 2015-05-16 16:57:12 -07:00 committed by Zhe Zhang
parent c99c337928
commit d8ea443af0
8 changed files with 78 additions and 63 deletions

View File

@ -1514,7 +1514,7 @@ public class DFSUtil {
public static int getSmallBufferSize(Configuration conf) {
return Math.min(getIoFileBufferSize(conf) / 2, 512);
}
/**
* Probe for HDFS Encryption being enabled; this uses the value of
* the option {@link DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI},

View File

@ -42,7 +42,7 @@ public class BlockInfoContiguous extends BlockInfo {
* @param from BlockReplicationInfo to copy from.
*/
protected BlockInfoContiguous(BlockInfoContiguous from) {
this(from, from.getBlockCollection().getBlockReplication());
this(from, from.getBlockCollection().getPreferredBlockReplication());
this.triplets = new Object[from.triplets.length];
this.setBlockCollection(from.getBlockCollection());
}

View File

@ -3567,6 +3567,11 @@ public class BlockManager {
return storages;
}
/** @return an iterator of the datanodes. */
public Iterable<DatanodeStorageInfo> getStorages(final Block block) {
return blocksMap.getStorages(block);
}
public int getTotalBlocks() {
return blocksMap.size();
}
@ -3958,7 +3963,7 @@ public class BlockManager {
null);
}
public LocatedBlock newLocatedBlock(ExtendedBlock eb, BlockInfo info,
public static LocatedBlock newLocatedBlock(ExtendedBlock eb, BlockInfo info,
DatanodeStorageInfo[] locs, long offset) throws IOException {
final LocatedBlock lb;
if (info.isStriped()) {
@ -3968,7 +3973,6 @@ public class BlockManager {
} else {
lb = newLocatedBlock(eb, locs, offset, false);
}
setBlockToken(lb, BlockTokenIdentifier.AccessMode.WRITE);
return lb;
}

View File

@ -45,10 +45,13 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.ChunkedArrayList;
@ -74,7 +77,7 @@ class FSDirWriteFileOp {
Block block) throws IOException {
// modify file-> block and blocksMap
// fileNode should be under construction
BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block);
BlockInfoUnderConstruction uc = fileNode.removeLastBlock(block);
if (uc == null) {
return false;
}
@ -88,7 +91,7 @@ class FSDirWriteFileOp {
// update space consumed
fsd.updateCount(iip, 0, -fileNode.getPreferredBlockSize(),
fileNode.getPreferredBlockReplication(), true);
fileNode.getPreferredBlockReplication(), true);
return true;
}
@ -168,7 +171,7 @@ class FSDirWriteFileOp {
String src, long fileId, String clientName,
ExtendedBlock previous, LocatedBlock[] onRetryBlock) throws IOException {
final long blockSize;
final int replication;
final short numTargets;
final byte storagePolicyID;
String clientMachine;
@ -196,18 +199,21 @@ class FSDirWriteFileOp {
blockSize = pendingFile.getPreferredBlockSize();
clientMachine = pendingFile.getFileUnderConstructionFeature()
.getClientMachine();
replication = pendingFile.getFileReplication();
boolean isStriped = pendingFile.isStriped();
numTargets = isStriped ?
HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS :
pendingFile.getFileReplication();
storagePolicyID = pendingFile.getStoragePolicyID();
return new ValidateAddBlockResult(blockSize, replication, storagePolicyID,
clientMachine);
return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID,
clientMachine);
}
static LocatedBlock makeLocatedBlock(FSNamesystem fsn, Block blk,
static LocatedBlock makeLocatedBlock(FSNamesystem fsn, BlockInfo blk,
DatanodeStorageInfo[] locs, long offset) throws IOException {
LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk),
locs, offset, false);
blk, locs, offset);
fsn.getBlockManager().setBlockToken(lBlk,
BlockTokenIdentifier.AccessMode.WRITE);
BlockTokenIdentifier.AccessMode.WRITE);
return lBlk;
}
@ -236,9 +242,10 @@ class FSDirWriteFileOp {
return onRetryBlock[0];
} else {
// add new chosen targets to already allocated block and return
BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
((BlockInfoContiguousUnderConstruction) lastBlockInFile)
.setExpectedLocations(targets);
BlockInfo lastBlockInFile = pendingFile.getLastBlock();
final BlockInfoUnderConstruction uc
= (BlockInfoUnderConstruction)lastBlockInFile;
uc.setExpectedLocations(targets);
offset = pendingFile.computeFileSize();
return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
}
@ -249,15 +256,17 @@ class FSDirWriteFileOp {
ExtendedBlock.getLocalBlock(previous));
// allocate new block, record block locations in INode.
Block newBlock = fsn.createNewBlock();
final boolean isStriped = pendingFile.isStriped();
// allocate new block, record block locations in INode.
Block newBlock = fsn.createNewBlock(isStriped);
INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets);
saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets, isStriped);
persistNewBlock(fsn, src, pendingFile);
offset = pendingFile.computeFileSize();
// Return located block
return makeLocatedBlock(fsn, newBlock, targets, offset);
return makeLocatedBlock(fsn, fsn.getStoredBlock(newBlock), targets, offset);
}
static DatanodeStorageInfo[] chooseTargetForNewBlock(
@ -278,7 +287,7 @@ class FSDirWriteFileOp {
: Arrays.asList(favoredNodes);
// choose targets for the new block to be allocated.
return bm.chooseTarget4NewBlock(src, r.replication, clientNode,
return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
excludedNodesSet, r.blockSize,
favoredNodesList, r.storagePolicyID);
}
@ -504,25 +513,38 @@ class FSDirWriteFileOp {
/**
* Add a block to the file. Returns a reference to the added block.
*/
private static BlockInfoContiguous addBlock(
FSDirectory fsd, String path, INodesInPath inodesInPath, Block block,
DatanodeStorageInfo[] targets) throws IOException {
private static BlockInfo addBlock(FSDirectory fsd, String path,
INodesInPath inodesInPath, Block block, DatanodeStorageInfo[] targets,
boolean isStriped) throws IOException {
fsd.writeLock();
try {
final INodeFile fileINode = inodesInPath.getLastINode().asFile();
Preconditions.checkState(fileINode.isUnderConstruction());
// check quota limits and updated space consumed
fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
fileINode.getPreferredBlockReplication(), true);
// associate new last block for the file
BlockInfoContiguousUnderConstruction blockInfo =
new BlockInfoContiguousUnderConstruction(
block,
fileINode.getFileReplication(),
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
final BlockInfo blockInfo;
if (isStriped) {
ECSchema ecSchema = fsd.getECSchema(inodesInPath);
short numDataUnits = (short) ecSchema.getNumDataUnits();
short numParityUnits = (short) ecSchema.getNumParityUnits();
short numLocations = (short) (numDataUnits + numParityUnits);
// check quota limits and updated space consumed
fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
numLocations, true);
blockInfo = new BlockInfoStripedUnderConstruction(block, numDataUnits,
numParityUnits, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
targets);
} else {
// check quota limits and updated space consumed
fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
fileINode.getPreferredBlockReplication(), true);
short numLocations = fileINode.getFileReplication();
blockInfo = new BlockInfoContiguousUnderConstruction(block,
numLocations, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
targets);
}
fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
fileINode.addBlock(blockInfo);
@ -576,7 +598,7 @@ class FSDirWriteFileOp {
private static FileState analyzeFileState(
FSNamesystem fsn, String src, long fileId, String clientName,
ExtendedBlock previous, LocatedBlock[] onRetryBlock)
throws IOException {
throws IOException {
assert fsn.hasReadLock();
checkBlock(fsn, previous);
@ -659,8 +681,8 @@ class FSDirWriteFileOp {
"allocation of a new block in " + src + ". Returning previously" +
" allocated block " + lastBlockInFile);
long offset = file.computeFileSize();
BlockInfoContiguousUnderConstruction lastBlockUC =
(BlockInfoContiguousUnderConstruction) lastBlockInFile;
BlockInfoUnderConstruction lastBlockUC =
(BlockInfoUnderConstruction) lastBlockInFile;
onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile,
lastBlockUC.getExpectedStorageLocations(), offset);
return new FileState(file, src, iip);
@ -685,14 +707,8 @@ class FSDirWriteFileOp {
checkBlock(fsn, last);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
src = fsn.dir.resolvePath(pc, src, pathComponents);
boolean success = completeFileInternal(fsn, src, holder,
ExtendedBlock.getLocalBlock(last),
fileId);
if (success) {
NameNode.stateChangeLog.info("DIR* completeFile: " + srcArg
+ " is closed by " + holder);
}
return success;
return completeFileInternal(fsn, src, holder,
ExtendedBlock.getLocalBlock(last), fileId);
}
private static boolean completeFileInternal(
@ -794,13 +810,12 @@ class FSDirWriteFileOp {
* @param targets target datanodes where replicas of the new block is placed
* @throws QuotaExceededException If addition of block exceeds space quota
*/
private static void saveAllocatedBlock(
FSNamesystem fsn, String src, INodesInPath inodesInPath, Block newBlock,
DatanodeStorageInfo[] targets)
throws IOException {
private static void saveAllocatedBlock(FSNamesystem fsn, String src,
INodesInPath inodesInPath, Block newBlock, DatanodeStorageInfo[] targets,
boolean isStriped) throws IOException {
assert fsn.hasWriteLock();
BlockInfoContiguous b = addBlock(fsn.dir, src, inodesInPath, newBlock,
targets);
BlockInfo b = addBlock(fsn.dir, src, inodesInPath, newBlock, targets,
isStriped);
NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src);
DatanodeStorageInfo.incrementBlocksScheduled(targets);
}
@ -849,15 +864,15 @@ class FSDirWriteFileOp {
static class ValidateAddBlockResult {
final long blockSize;
final int replication;
final int numTargets;
final byte storagePolicyID;
final String clientMachine;
ValidateAddBlockResult(
long blockSize, int replication, byte storagePolicyID,
long blockSize, int numTargets, byte storagePolicyID,
String clientMachine) {
this.blockSize = blockSize;
this.replication = replication;
this.numTargets = numTargets;
this.storagePolicyID = storagePolicyID;
this.clientMachine = clientMachine;
}

View File

@ -3073,6 +3073,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
writeUnlock();
}
getEditLog().logSync();
if (success) {
NameNode.stateChangeLog.info("DIR* completeFile: " + src
+ " is closed by " + holder);
}
return success;
}
@ -3080,7 +3084,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* Create new block with a unique block id and a new generation stamp.
* @param isStriped is the file under striping or contiguous layout?
*/
Block createNewBlock() throws IOException {
Block createNewBlock(boolean isStriped) throws IOException {
assert hasWriteLock();
Block b = new Block(nextBlockId(isStriped), 0, 0);
// Increment the generation stamp for every new block.

View File

@ -905,14 +905,6 @@ public class INodeFile extends INodeWithAdditionalFields
return counts;
}
public final short getReplication(int lastSnapshotId) {
if (lastSnapshotId != CURRENT_STATE_ID) {
return getFileReplication(lastSnapshotId);
} else {
return getBlockReplication();
}
}
/**
* Return the penultimate allocated block for this file.
*/

View File

@ -87,7 +87,7 @@ public class StripedBlockUtil {
new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
bg.getStartOffset() + idxInBlockGroup, bg.isCorrupt(),
bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(),
null);
}

View File

@ -1915,7 +1915,7 @@ public class DFSTestUtil {
fileNode.getId(), null);
final BlockInfo lastBlock = fileNode.getLastBlock();
final int groupSize = fileNode.getBlockReplication();
final int groupSize = fileNode.getPreferredBlockReplication();
assert dataNodes.size() >= groupSize;
// 1. RECEIVING_BLOCK IBR
for (int i = 0; i < groupSize; i++) {