From d8ea443af0b1c8289a1dd738945831ff8be0e9c1 Mon Sep 17 00:00:00 2001
From: Jing Zhao
Date: Sat, 16 May 2015 16:57:12 -0700
Subject: [PATCH] Merge HDFS-8394 from trunk: Move getAdditionalBlock() and
 related functionalities into a separate class.

---
 .../java/org/apache/hadoop/hdfs/DFSUtil.java  |   2 +-
 .../blockmanagement/BlockInfoContiguous.java  |   2 +-
 .../server/blockmanagement/BlockManager.java  |   8 +-
 .../server/namenode/FSDirWriteFileOp.java     | 111 ++++++++++--------
 .../hdfs/server/namenode/FSNamesystem.java    |   6 +-
 .../hdfs/server/namenode/INodeFile.java       |   8 --
 .../hadoop/hdfs/util/StripedBlockUtil.java    |   2 +-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java   |   2 +-
 8 files changed, 78 insertions(+), 63 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 2e2ecfd652a..c06a4355d0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -1514,7 +1514,7 @@ public class DFSUtil {
   public static int getSmallBufferSize(Configuration conf) {
     return Math.min(getIoFileBufferSize(conf) / 2, 512);
   }
-  
+
   /**
    * Probe for HDFS Encryption being enabled; this uses the value of
    * the option {@link DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI},
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index eeab076d576..416091fba5b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@ -42,7 +42,7 @@ public class BlockInfoContiguous extends BlockInfo {
    * @param from BlockReplicationInfo to copy from.
    */
   protected BlockInfoContiguous(BlockInfoContiguous from) {
-    this(from, from.getBlockCollection().getBlockReplication());
+    this(from, from.getBlockCollection().getPreferredBlockReplication());
     this.triplets = new Object[from.triplets.length];
     this.setBlockCollection(from.getBlockCollection());
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 5b876f95175..8b5144821ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3567,6 +3567,11 @@ public class BlockManager {
     return storages;
   }
 
+  /** @return an iterator of the datanodes. */
+  public Iterable<DatanodeStorageInfo> getStorages(final Block block) {
+    return blocksMap.getStorages(block);
+  }
+
   public int getTotalBlocks() {
     return blocksMap.size();
   }
@@ -3958,7 +3963,7 @@ public class BlockManager {
         null);
   }
 
-  public LocatedBlock newLocatedBlock(ExtendedBlock eb, BlockInfo info,
+  public static LocatedBlock newLocatedBlock(ExtendedBlock eb, BlockInfo info,
       DatanodeStorageInfo[] locs, long offset) throws IOException {
     final LocatedBlock lb;
     if (info.isStriped()) {
@@ -3968,7 +3973,6 @@ public class BlockManager {
     } else {
       lb = newLocatedBlock(eb, locs, offset, false);
     }
-    setBlockToken(lb, BlockTokenIdentifier.AccessMode.WRITE);
     return lb;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index fa17d9df9b0..81a2fa7355e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -45,10 +45,13 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.ChunkedArrayList;
@@ -74,7 +77,7 @@ class FSDirWriteFileOp {
       Block block) throws IOException {
     // modify file-> block and blocksMap
     // fileNode should be under construction
-    BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block);
+    BlockInfoUnderConstruction uc = fileNode.removeLastBlock(block);
     if (uc == null) {
       return false;
     }
@@ -88,7 +91,7 @@ class FSDirWriteFileOp {
 
     // update space consumed
     fsd.updateCount(iip, 0, -fileNode.getPreferredBlockSize(),
-        fileNode.getPreferredBlockReplication(), true);
+                    fileNode.getPreferredBlockReplication(), true);
     return true;
   }
 
@@ -168,7 +171,7 @@ class FSDirWriteFileOp {
       String src, long fileId, String clientName, ExtendedBlock previous,
       LocatedBlock[] onRetryBlock) throws IOException {
     final long blockSize;
-    final int replication;
+    final short numTargets;
     final byte storagePolicyID;
     String clientMachine;
 
@@ -196,18 +199,21 @@ class FSDirWriteFileOp {
     blockSize = pendingFile.getPreferredBlockSize();
     clientMachine = pendingFile.getFileUnderConstructionFeature()
         .getClientMachine();
-    replication = pendingFile.getFileReplication();
+    boolean isStriped = pendingFile.isStriped();
+    numTargets = isStriped ?
+        HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS :
+        pendingFile.getFileReplication();
     storagePolicyID = pendingFile.getStoragePolicyID();
-    return new ValidateAddBlockResult(blockSize, replication, storagePolicyID,
-        clientMachine);
+    return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID,
+                                      clientMachine);
   }
 
-  static LocatedBlock makeLocatedBlock(FSNamesystem fsn, Block blk,
+  static LocatedBlock makeLocatedBlock(FSNamesystem fsn, BlockInfo blk,
       DatanodeStorageInfo[] locs, long offset) throws IOException {
     LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk),
-        locs, offset, false);
+        blk, locs, offset);
     fsn.getBlockManager().setBlockToken(lBlk,
-        BlockTokenIdentifier.AccessMode.WRITE);
+                                        BlockTokenIdentifier.AccessMode.WRITE);
     return lBlk;
   }
 
@@ -236,9 +242,10 @@ class FSDirWriteFileOp {
       return onRetryBlock[0];
     } else {
       // add new chosen targets to already allocated block and return
-      BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
-      ((BlockInfoContiguousUnderConstruction) lastBlockInFile)
-          .setExpectedLocations(targets);
+      BlockInfo lastBlockInFile = pendingFile.getLastBlock();
+      final BlockInfoUnderConstruction uc
+          = (BlockInfoUnderConstruction)lastBlockInFile;
+      uc.setExpectedLocations(targets);
       offset = pendingFile.computeFileSize();
       return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
     }
@@ -249,15 +256,17 @@ class FSDirWriteFileOp {
         ExtendedBlock.getLocalBlock(previous));
 
     // allocate new block, record block locations in INode.
-    Block newBlock = fsn.createNewBlock();
+    final boolean isStriped = pendingFile.isStriped();
+    // allocate new block, record block locations in INode.
+    Block newBlock = fsn.createNewBlock(isStriped);
     INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
-    saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets);
+    saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets, isStriped);
 
     persistNewBlock(fsn, src, pendingFile);
     offset = pendingFile.computeFileSize();
 
     // Return located block
-    return makeLocatedBlock(fsn, newBlock, targets, offset);
+    return makeLocatedBlock(fsn, fsn.getStoredBlock(newBlock), targets, offset);
   }
 
   static DatanodeStorageInfo[] chooseTargetForNewBlock(
@@ -278,7 +287,7 @@ class FSDirWriteFileOp {
         : Arrays.asList(favoredNodes);
 
     // choose targets for the new block to be allocated.
-    return bm.chooseTarget4NewBlock(src, r.replication, clientNode,
+    return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
                                     excludedNodesSet, r.blockSize,
                                     favoredNodesList, r.storagePolicyID);
   }
@@ -504,25 +513,38 @@ class FSDirWriteFileOp {
   /**
    * Add a block to the file. Returns a reference to the added block.
    */
-  private static BlockInfoContiguous addBlock(
-      FSDirectory fsd, String path, INodesInPath inodesInPath, Block block,
-      DatanodeStorageInfo[] targets) throws IOException {
+  private static BlockInfo addBlock(FSDirectory fsd, String path,
+      INodesInPath inodesInPath, Block block, DatanodeStorageInfo[] targets,
+      boolean isStriped) throws IOException {
     fsd.writeLock();
     try {
      final INodeFile fileINode = inodesInPath.getLastINode().asFile();
      Preconditions.checkState(fileINode.isUnderConstruction());
 
-      // check quota limits and updated space consumed
-      fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
-          fileINode.getPreferredBlockReplication(), true);
-      // associate new last block for the file
-      BlockInfoContiguousUnderConstruction blockInfo =
-          new BlockInfoContiguousUnderConstruction(
-            block,
-            fileINode.getFileReplication(),
-            HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
+      final BlockInfo blockInfo;
+      if (isStriped) {
+        ECSchema ecSchema = fsd.getECSchema(inodesInPath);
+        short numDataUnits = (short) ecSchema.getNumDataUnits();
+        short numParityUnits = (short) ecSchema.getNumParityUnits();
+        short numLocations = (short) (numDataUnits + numParityUnits);
+
+        // check quota limits and updated space consumed
+        fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
+            numLocations, true);
+        blockInfo = new BlockInfoStripedUnderConstruction(block, numDataUnits,
+            numParityUnits, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
             targets);
+      } else {
+        // check quota limits and updated space consumed
+        fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
+            fileINode.getPreferredBlockReplication(), true);
+
+        short numLocations = fileINode.getFileReplication();
+        blockInfo = new BlockInfoContiguousUnderConstruction(block,
+            numLocations, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
+            targets);
+      }
       fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
       fileINode.addBlock(blockInfo);
 
@@ -576,7 +598,7 @@ class FSDirWriteFileOp {
   private static FileState analyzeFileState(
       FSNamesystem fsn, String src, long fileId, String clientName,
       ExtendedBlock previous, LocatedBlock[] onRetryBlock)
-      throws IOException {
+          throws IOException {
     assert fsn.hasReadLock();
 
     checkBlock(fsn, previous);
@@ -659,8 +681,8 @@ class FSDirWriteFileOp {
             "allocation of a new block in " + src + ". Returning previously" +
             " allocated block " + lastBlockInFile);
         long offset = file.computeFileSize();
-        BlockInfoContiguousUnderConstruction lastBlockUC =
-            (BlockInfoContiguousUnderConstruction) lastBlockInFile;
+        BlockInfoUnderConstruction lastBlockUC =
+            (BlockInfoUnderConstruction) lastBlockInFile;
         onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile,
             lastBlockUC.getExpectedStorageLocations(), offset);
         return new FileState(file, src, iip);
@@ -685,14 +707,8 @@ class FSDirWriteFileOp {
     checkBlock(fsn, last);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     src = fsn.dir.resolvePath(pc, src, pathComponents);
-    boolean success = completeFileInternal(fsn, src, holder,
-        ExtendedBlock.getLocalBlock(last),
-        fileId);
-    if (success) {
-      NameNode.stateChangeLog.info("DIR* completeFile: " + srcArg
-          + " is closed by " + holder);
-    }
-    return success;
+    return completeFileInternal(fsn, src, holder,
+        ExtendedBlock.getLocalBlock(last), fileId);
   }
 
   private static boolean completeFileInternal(
@@ -794,13 +810,12 @@ class FSDirWriteFileOp {
    * @param targets target datanodes where replicas of the new block is placed
    * @throws QuotaExceededException If addition of block exceeds space quota
    */
-  private static void saveAllocatedBlock(
-      FSNamesystem fsn, String src, INodesInPath inodesInPath, Block newBlock,
-      DatanodeStorageInfo[] targets)
-      throws IOException {
+  private static void saveAllocatedBlock(FSNamesystem fsn, String src,
+      INodesInPath inodesInPath, Block newBlock, DatanodeStorageInfo[] targets,
+      boolean isStriped) throws IOException {
     assert fsn.hasWriteLock();
-    BlockInfoContiguous b = addBlock(fsn.dir, src, inodesInPath, newBlock,
-        targets);
+    BlockInfo b = addBlock(fsn.dir, src, inodesInPath, newBlock, targets,
+        isStriped);
     NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src);
     DatanodeStorageInfo.incrementBlocksScheduled(targets);
   }
@@ -849,15 +864,15 @@ class FSDirWriteFileOp {
 
   static class ValidateAddBlockResult {
     final long blockSize;
-    final int replication;
+    final int numTargets;
     final byte storagePolicyID;
     final String clientMachine;
 
     ValidateAddBlockResult(
-        long blockSize, int replication, byte storagePolicyID,
+        long blockSize, int numTargets, byte storagePolicyID,
         String clientMachine) {
       this.blockSize = blockSize;
-      this.replication = replication;
+      this.numTargets = numTargets;
       this.storagePolicyID = storagePolicyID;
       this.clientMachine = clientMachine;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 3c2c9cfa744..71a3dcf29bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3073,6 +3073,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       writeUnlock();
     }
     getEditLog().logSync();
+    if (success) {
+      NameNode.stateChangeLog.info("DIR* completeFile: " + src
+          + " is closed by " + holder);
+    }
     return success;
   }
 
@@ -3080,7 +3084,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * Create new block with a unique block id and a new generation stamp.
    * @param isStriped is the file under striping or contiguous layout?
    */
-  Block createNewBlock() throws IOException {
+  Block createNewBlock(boolean isStriped) throws IOException {
     assert hasWriteLock();
     Block b = new Block(nextBlockId(isStriped), 0, 0);
     // Increment the generation stamp for every new block.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 154198c5206..41287e87cf7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -905,14 +905,6 @@ public class INodeFile extends INodeWithAdditionalFields
     return counts;
   }
 
-  public final short getReplication(int lastSnapshotId) {
-    if (lastSnapshotId != CURRENT_STATE_ID) {
-      return getFileReplication(lastSnapshotId);
-    } else {
-      return getBlockReplication();
-    }
-  }
-
   /**
    * Return the penultimate allocated block for this file.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index f7ae88a4737..c95f0b4e0a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -87,7 +87,7 @@ public class StripedBlockUtil {
         new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
         new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
         new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
-        bg.getStartOffset() + idxInBlockGroup, bg.isCorrupt(),
+        bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(),
         null);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 0165189bc9c..9f106cfcabe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1915,7 +1915,7 @@ public class DFSTestUtil {
         fileNode.getId(), null);
 
     final BlockInfo lastBlock = fileNode.getLastBlock();
-    final int groupSize = fileNode.getBlockReplication();
+    final int groupSize = fileNode.getPreferredBlockReplication();
     assert dataNodes.size() >= groupSize;
     // 1. RECEIVING_BLOCK IBR
     for (int i = 0; i < groupSize; i++) {
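
Reviewer note (illustrative, not part of the patch): the behavioral core of
this merge is that the number of datanode targets requested per addBlock()
call now depends on the file layout. A minimal standalone Java sketch of that
branching follows, assuming a 6+3 schema for HdfsConstants.NUM_DATA_BLOCKS /
NUM_PARITY_BLOCKS; the class name and constant values below are stand-ins,
not code from the patch:

  // NumTargetsSketch.java -- illustrative only; values are assumed.
  public class NumTargetsSketch {
    // Stand-ins for HdfsConstants.NUM_DATA_BLOCKS / NUM_PARITY_BLOCKS.
    static final short NUM_DATA_BLOCKS = 6;
    static final short NUM_PARITY_BLOCKS = 3;

    // A striped file needs one storage per data + parity block in the group;
    // a contiguous file needs one storage per replica. Because both constants
    // are compile-time constants, the conditional expression is short-typed
    // and needs no cast -- the same reason the patch can assign the ternary
    // into "final short numTargets" in validateAddBlock().
    static short numTargets(boolean isStriped, short fileReplication) {
      return isStriped
          ? NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS
          : fileReplication;
    }

    public static void main(String[] args) {
      System.out.println(numTargets(true, (short) 3));  // 9: whole block group
      System.out.println(numTargets(false, (short) 3)); // 3: replication factor
    }
  }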