From 1e1e93040748231dc913190aec1e031c379d8271 Mon Sep 17 00:00:00 2001
From: Jing Zhao
Date: Mon, 2 Mar 2015 13:44:33 -0800
Subject: [PATCH] HDFS-7837. Erasure Coding: allocate and persist striped
 blocks in NameNode. Contributed by Jing Zhao.

---
 .../blockmanagement/BlockIdManager.java            |  31 +++-
 .../server/blockmanagement/BlockInfo.java          |   4 +-
 .../blockmanagement/BlockInfoContiguous.java       |   7 +-
 .../blockmanagement/BlockInfoStriped.java          |   8 +-
 .../server/blockmanagement/BlockManager.java       |  44 ++++--
 .../server/blockmanagement/BlocksMap.java          |  20 ++-
 .../blockmanagement/DecommissionManager.java       |   9 +-
 .../server/namenode/FSDirWriteFileOp.java          |  20 +--
 .../hdfs/server/namenode/FSEditLogLoader.java      |  69 ++++++---
 .../hdfs/server/namenode/FSImageFormat.java        |  12 +-
 .../server/namenode/FSImageFormatPBINode.java      |   5 +-
 .../namenode/FSImageFormatProtobuf.java            |   9 +-
 .../hdfs/server/namenode/FSNamesystem.java         |  18 +--
 .../hadoop/hdfs/server/namenode/INodeFile.java     |  25 ++-
 .../namenode/NameNodeLayoutVersion.java            |   3 +-
 .../hadoop-hdfs/src/main/proto/fsimage.proto       |   1 +
 .../server/namenode/TestAddBlockgroup.java         |  85 ----------
 .../server/namenode/TestAddStripedBlocks.java      | 146 ++++++++++++++++++
 18 files changed, 340 insertions(+), 176 deletions(-)
 delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockgroup.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
index fa800c572f2..8a71f18715a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
@@ -104,20 +104,37 @@ public class BlockIdManager {
   }
 
   /**
-   * Sets the maximum allocated block ID for this filesystem. This is
+   * Sets the maximum allocated contiguous block ID for this filesystem. This is
    * the basis for allocating new block IDs.
    */
-  public void setLastAllocatedBlockId(long blockId) {
+  public void setLastAllocatedContiguousBlockId(long blockId) {
     blockIdGenerator.skipTo(blockId);
   }
 
   /**
-   * Gets the maximum sequentially allocated block ID for this filesystem
+   * Gets the maximum sequentially allocated contiguous block ID for this
+   * filesystem
    */
-  public long getLastAllocatedBlockId() {
+  public long getLastAllocatedContiguousBlockId() {
     return blockIdGenerator.getCurrentValue();
   }
 
+  /**
+   * Sets the maximum allocated striped block ID for this filesystem. This is
+   * the basis for allocating new block IDs.
+   */
+  public void setLastAllocatedStripedBlockId(long blockId) {
+    blockGroupIdGenerator.skipTo(blockId);
+  }
+
+  /**
+   * Gets the maximum sequentially allocated striped block ID for this
+   * filesystem
+   */
+  public long getLastAllocatedStripedBlockId() {
+    return blockGroupIdGenerator.getCurrentValue();
+  }
+
   /**
    * Sets the current generation stamp for legacy blocks
    */
@@ -189,11 +206,11 @@ public class BlockIdManager {
   /**
    * Increments, logs and then returns the block ID
    */
-  public long nextBlockId() {
+  public long nextContiguousBlockId() {
     return blockIdGenerator.nextValue();
   }
 
-  public long nextBlockGroupId() {
+  public long nextStripedBlockId() {
     return blockGroupIdGenerator.nextValue();
   }
 
@@ -217,7 +234,7 @@ public class BlockIdManager {
     return id < 0;
   }
 
-  public static long convertToGroupID(long id) {
+  public static long convertToStripedID(long id) {
     return id & (~HdfsConstants.BLOCK_GROUP_INDEX_MASK);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index bf8e64f4ed7..8b7192521aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -169,6 +169,8 @@ public abstract class BlockInfo extends Block
    */
   abstract void replaceBlock(BlockInfo newBlock);
 
+  public abstract boolean isStriped();
+
   /**
    * Find specified DatanodeDescriptor.
    * @return index or -1 if not found.
@@ -336,7 +338,7 @@ public abstract class BlockInfo extends Block
   }
 
   static BlockInfo copyOf(BlockInfo b) {
-    if (b instanceof BlockInfoContiguous) {
+    if (!b.isStriped()) {
       return new BlockInfoContiguous((BlockInfoContiguous) b);
     } else {
       return new BlockInfoStriped((BlockInfoStriped) b);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index e30e022eaa5..d3051a300ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@ -148,7 +148,7 @@ public class BlockInfoContiguous extends BlockInfo {
    * happen only when replication is manually increased by the user.
    */
   Object[] old = triplets;
   triplets = new Object[(last+num)*3];
-  System.arraycopy(old, 0, triplets, 0, last*3);
+  System.arraycopy(old, 0, triplets, 0, last * 3);
   return last;
 }
@@ -232,4 +232,9 @@ public class BlockInfoContiguous extends BlockInfo {
     ucBlock.setBlockCollection(getBlockCollection());
     return ucBlock;
   }
+
+  @Override
+  public final boolean isStriped() {
+    return false;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 57de772f11f..8b458df82dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 
 /**
@@ -57,7 +56,7 @@ public class BlockInfoStriped extends BlockInfo {
     this.setBlockCollection(b.getBlockCollection());
   }
 
-  private short getTotalBlockNum() {
+  short getTotalBlockNum() {
     return (short) (dataBlockNum + parityBlockNum);
   }
 
@@ -174,6 +173,11 @@ public class BlockInfoStriped extends BlockInfo {
     }
   }
 
+  @Override
+  public final boolean isStriped() {
+    return true;
+  }
+
   @Override
   public int numNodes() {
     assert this.triplets != null : "BlockInfo is not initialized";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 80394aaf1cf..70e697e7e45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -582,11 +582,22 @@ public class BlockManager {
     return maxReplicationStreams;
   }
 
-  /**
-   * @return true if the block has minimum replicas
-   */
-  public boolean checkMinReplication(Block block) {
-    return (countNodes(block).liveReplicas() >= minReplication);
+  public int getDefaultStorageNum(BlockInfo block) {
+    return block.isStriped() ?
+        ((BlockInfoStriped) block).getTotalBlockNum() : defaultReplication;
+  }
+
+  public short getMinStorageNum(BlockInfo block) {
+    return block.isStriped() ?
+        ((BlockInfoStriped) block).getDataBlockNum() : minReplication;
+  }
+
+  public boolean checkMinStorage(BlockInfo block) {
+    return countNodes(block).liveReplicas() >= getMinStorageNum(block);
+  }
+
+  public boolean checkMinStorage(BlockInfo block, int liveNum) {
+    return liveNum >= getMinStorageNum(block);
   }
 
   /**
@@ -630,7 +641,7 @@ public class BlockManager {
       return false; // already completed (e.g. by syncBlock)
 
     final boolean b = commitBlock(lastBlock, commitBlock);
-    if (countNodes(lastBlock).liveReplicas() >= minReplication) {
+    if (checkMinStorage(lastBlock)) {
       completeBlock(bc, bc.numBlocks() - 1, false);
     }
     return b;
   }
@@ -654,7 +665,7 @@ public class BlockManager {
     }
 
     int numNodes = curBlock.numNodes();
-    if (!force && numNodes < minReplication) {
+    if (!force && !checkMinStorage(curBlock, numNodes)) {
       throw new IOException("Cannot complete block: "
           + "block does not satisfy minimal replication requirement.");
     }
@@ -698,9 +709,8 @@ public class BlockManager {
    * when tailing edit logs as a Standby.
    */
   public BlockInfo forceCompleteBlock(final BlockCollection bc,
-      final BlockInfoContiguousUnderConstruction block) throws IOException {
-    // TODO: support BlockInfoStripedUC for editlog
-    block.commitBlock(block);
+      final BlockInfo block) throws IOException {
+    BlockInfo.commitBlock(block, block);
     return completeBlock(bc, block, true);
   }
 
@@ -751,7 +761,7 @@ public class BlockManager {
     // count in safe-mode.
     namesystem.adjustSafeModeBlockTotals(
         // decrement safe if we had enough
-        targets.length >= minReplication ? -1 : 0,
+        checkMinStorage(oldBlock, targets.length) ? -1 : 0,
         // always decrement total blocks
         -1);
 
@@ -1197,8 +1207,8 @@ public class BlockManager {
     NumberReplicas numberOfReplicas = countNodes(b.stored);
     boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
         expectedReplicas;
-    boolean minReplicationSatisfied =
-        numberOfReplicas.liveReplicas() >= minReplication;
+    boolean minReplicationSatisfied = checkMinStorage(b.stored,
+        numberOfReplicas.liveReplicas());
     boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
         (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
         expectedReplicas;
@@ -2526,7 +2536,7 @@ public class BlockManager {
     // Now check for completion of blocks and safe block count
     int numCurrentReplica = countLiveNodes(storedBlock);
     if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
-        && numCurrentReplica >= minReplication) {
+        && checkMinStorage(storedBlock, numCurrentReplica)) {
       completeBlock(storedBlock.getBlockCollection(), storedBlock, false);
     } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
       // check whether safe replication is reached for the block
@@ -2601,7 +2611,7 @@ public class BlockManager {
         + pendingReplications.getNumReplicas(storedBlock);
 
     if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
-        numLiveReplicas >= minReplication) {
+        checkMinStorage(storedBlock, numLiveReplicas)) {
       storedBlock = completeBlock(bc, storedBlock, false);
     } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
       // check whether safe replication is reached for the block
@@ -3283,6 +3293,8 @@ public class BlockManager {
   /**
    * Return the number of nodes hosting a given block, grouped
    * by the state of those replicas.
+   * For a striped block, this includes nodes storing blocks belonging to the
+   * striped block group.
    */
   public NumberReplicas countNodes(Block b) {
     int decommissioned = 0;
@@ -3438,7 +3450,7 @@ public class BlockManager {
     BlockInfo info = null;
     if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
       info = blocksMap.getStoredBlock(
-          new Block(BlockIdManager.convertToGroupID(block.getBlockId())));
+          new Block(BlockIdManager.convertToStripedID(block.getBlockId())));
     }
     if (info == null) {
       info = blocksMap.getStoredBlock(block);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index 59ff030b89d..b093e5b143d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -43,8 +43,15 @@ class BlocksMap {
 
     @Override
     public boolean hasNext() {
-      return blockInfo != null && nextIdx < blockInfo.getCapacity()
-          && blockInfo.getDatanode(nextIdx) != null;
+      if (blockInfo == null) {
+        return false;
+      }
+      while (nextIdx < blockInfo.getCapacity() &&
+          blockInfo.getDatanode(nextIdx) == null) {
+        // note that for striped blocks there may be null in the triplets
+        nextIdx++;
+      }
+      return nextIdx < blockInfo.getCapacity();
     }
 
     @Override
@@ -123,10 +130,13 @@ class BlocksMap {
       return;
 
     blockInfo.setBlockCollection(null);
-    // TODO: fix this logic for block group
-    for(int idx = blockInfo.numNodes()-1; idx >= 0; idx--) {
+    final int size = blockInfo instanceof BlockInfoContiguous ?
+        blockInfo.numNodes() : blockInfo.getCapacity();
+    for(int idx = size - 1; idx >= 0; idx--) {
       DatanodeDescriptor dn = blockInfo.getDatanode(idx);
-      dn.removeBlock(blockInfo); // remove from the list and wipe the location
+      if (dn != null) {
+        dn.removeBlock(blockInfo); // remove from the list and wipe the location
+      }
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index 5f7366e5a04..8a3657611c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -544,7 +544,7 @@ public class DecommissionManager {
       int underReplicatedInOpenFiles = 0;
       while (it.hasNext()) {
         numBlocksChecked++;
-        final BlockInfoContiguous block = it.next();
+        final BlockInfo block = it.next();
         // Remove the block from the list if it's no longer in the block map,
         // e.g. the containing file has been deleted
         if (blockManager.blocksMap.getStoredBlock(block) == null) {
@@ -578,8 +578,9 @@ public class DecommissionManager {
         }
 
         // Even if the block is under-replicated,
-        // it doesn't block decommission if it's sufficiently replicated
-        if (isSufficientlyReplicated(block, bc, num)) {
+        // it doesn't block decommission if it's sufficiently replicated
+        BlockInfoContiguous blk = (BlockInfoContiguous) block;
+        if (isSufficientlyReplicated(blk, bc, num)) {
           if (pruneSufficientlyReplicated) {
             it.remove();
           }
@@ -588,7 +589,7 @@ public class DecommissionManager {
         }
 
         // We've found an insufficiently replicated block.
         if (insufficientlyReplicated != null) {
-          insufficientlyReplicated.add(block);
+          insufficientlyReplicated.add(blk);
         }
         // Log if this is our first time through
         if (firstReplicationLog) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 307bd594226..325d9d34e81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -471,21 +471,20 @@ class FSDirWriteFileOp {
     assert fsd.hasWriteLock();
     if (underConstruction) {
       newNode = newINodeFile(id, permissions, modificationTime,
-          modificationTime, replication,
-          preferredBlockSize,
-          storagePolicyId);
+          modificationTime, replication, preferredBlockSize, storagePolicyId);
       newNode.toUnderConstruction(clientName, clientMachine);
     } else {
-      newNode = newINodeFile(id, permissions, modificationTime,
-          atime, replication,
-          preferredBlockSize,
-          storagePolicyId);
+      newNode = newINodeFile(id, permissions, modificationTime, atime,
+          replication, preferredBlockSize, storagePolicyId);
     }
 
     newNode.setLocalName(localName);
     try {
       INodesInPath iip = fsd.addINode(existing, newNode);
       if (iip != null) {
+        if (newNode.isStriped()) {
+          newNode.addStripedBlocksFeature();
+        }
         if (aclEntries != null) {
           AclStorage.updateINodeAcl(newNode, aclEntries, CURRENT_STATE_ID);
         }
@@ -553,7 +552,7 @@ class FSDirWriteFileOp {
     long modTime = now();
     INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
-        modTime, modTime, replication, preferredBlockSize);
+        modTime, modTime, replication, preferredBlockSize);
     newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
     newNode.toUnderConstruction(clientName, clientMachine);
 
     INodesInPath newiip;
     fsd.writeLock();
     try {
       newiip = fsd.addINode(existing, newNode);
+      if (newiip != null && newNode.isStriped()) {
+        newNode.addStripedBlocksFeature();
+      }
     } finally {
       fsd.writeUnlock();
     }
     if (newiip == null) {
       NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
-          existing.getPath() + "/" + localName);
+          existing.getPath() + "/" + localName);
       return null;
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index ce93ad7036b..c3ab2d5843b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -38,7 +38,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -413,7 +415,8 @@ public class FSEditLogLoader {
       // Update the salient file attributes.
       newFile.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
       newFile.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
-      updateBlocks(fsDir, addCloseOp, iip, newFile);
+      // TODO whether the file is striped should later be retrieved from iip
+      updateBlocks(fsDir, addCloseOp, iip, newFile, newFile.isStriped());
       break;
     }
     case OP_CLOSE: {
@@ -433,7 +436,8 @@ public class FSEditLogLoader {
       // Update the salient file attributes.
       file.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
       file.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
-      updateBlocks(fsDir, addCloseOp, iip, file);
+      // TODO whether the file is striped should later be retrieved from iip
+      updateBlocks(fsDir, addCloseOp, iip, file, file.isStriped());
 
       // Now close the file
       if (!file.isUnderConstruction() &&
@@ -491,7 +495,8 @@ public class FSEditLogLoader {
       INodesInPath iip = fsDir.getINodesInPath(path, true);
       INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
       // Update in-memory data structures
-      updateBlocks(fsDir, updateOp, iip, oldFile);
+      // TODO whether the file is striped should later be retrieved from iip
+      updateBlocks(fsDir, updateOp, iip, oldFile, oldFile.isStriped());
 
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
@@ -507,7 +512,8 @@ public class FSEditLogLoader {
       }
       INodeFile oldFile = INodeFile.valueOf(fsDir.getINode(path), path);
       // add the new block to the INodeFile
-      addNewBlock(addBlockOp, oldFile);
+      // TODO whether the file is striped should later be retrieved from iip
+      addNewBlock(addBlockOp, oldFile, oldFile.isStriped());
       break;
     }
     case OP_SET_REPLICATION: {
@@ -787,8 +793,15 @@ public class FSEditLogLoader {
     }
     case OP_ALLOCATE_BLOCK_ID: {
       AllocateBlockIdOp allocateBlockIdOp = (AllocateBlockIdOp) op;
-      fsNamesys.getBlockIdManager().setLastAllocatedBlockId(
-          allocateBlockIdOp.blockId);
+      if (BlockIdManager.isStripedBlockID(allocateBlockIdOp.blockId)) {
+        // ALLOCATE_BLOCK_ID is added for sequential block id, thus if the id
+        // is negative, it must belong to striped blocks
+        fsNamesys.getBlockIdManager().setLastAllocatedStripedBlockId(
+            allocateBlockIdOp.blockId);
+      } else {
+        fsNamesys.getBlockIdManager().setLastAllocatedContiguousBlockId(
+            allocateBlockIdOp.blockId);
+      }
       break;
     }
     case OP_ROLLING_UPGRADE_START: {
@@ -940,9 +953,9 @@ public class FSEditLogLoader {
 
   /**
    * Add a new block into the given INodeFile
-   * TODO support adding striped block
    */
-  private void addNewBlock(AddBlockOp op, INodeFile file) throws IOException {
+  private void addNewBlock(AddBlockOp op, INodeFile file, boolean isStriped)
+      throws IOException {
     BlockInfo[] oldBlocks = file.getBlocks();
     Block pBlock = op.getPenultimateBlock();
     Block newBlock= op.getLastBlock();
 
     if (pBlock != null) { // the penultimate block is not null
       assert oldBlocks != null && oldBlocks.length > 0;
       // compare pBlock with the last block of oldBlocks
-      Block oldLastBlock = oldBlocks[oldBlocks.length - 1];
+      BlockInfo oldLastBlock = oldBlocks[oldBlocks.length - 1];
       if (oldLastBlock.getBlockId() != pBlock.getBlockId()
           || oldLastBlock.getGenerationStamp() != pBlock.getGenerationStamp()) {
         throw new IOException(
       }
       oldLastBlock.setNumBytes(pBlock.getNumBytes());
-      if (oldLastBlock instanceof BlockInfoContiguousUnderConstruction) {
-        fsNamesys.getBlockManager().forceCompleteBlock(file,
-            (BlockInfoContiguousUnderConstruction) oldLastBlock);
+      if (!oldLastBlock.isComplete()) {
+        fsNamesys.getBlockManager().forceCompleteBlock(file, oldLastBlock);
         fsNamesys.getBlockManager().processQueuedMessagesForBlock(pBlock);
       }
     } else { // the penultimate block is null
       Preconditions.checkState(oldBlocks == null || oldBlocks.length == 0);
     }
     // add the new block
-    BlockInfoContiguous newBI = new BlockInfoContiguousUnderConstruction(
-          newBlock, file.getPreferredBlockReplication());
-    fsNamesys.getBlockManager().addBlockCollection(newBI, file);
-    file.addBlock(newBI);
+    final BlockInfo newBlockInfo;
+    if (isStriped) {
+      newBlockInfo = new BlockInfoStripedUnderConstruction(newBlock,
+          HdfsConstants.NUM_DATA_BLOCKS, HdfsConstants.NUM_PARITY_BLOCKS);
+    } else {
+      newBlockInfo = new BlockInfoContiguousUnderConstruction(newBlock,
+          file.getPreferredBlockReplication());
+    }
+    fsNamesys.getBlockManager().addBlockCollection(newBlockInfo, file);
+    file.addBlock(newBlockInfo);
     fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
   }
 
   /**
    * Update in-memory data structures with new block information.
-   * TODO support adding striped block
    * @throws IOException
    */
   private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
-      INodesInPath iip, INodeFile file) throws IOException {
+      INodesInPath iip, INodeFile file, boolean isStriped) throws IOException {
     // Update its block list
     BlockInfo[] oldBlocks = file.getBlocks();
     Block[] newBlocks = op.getBlocks();
@@ -1011,11 +1028,10 @@ public class FSEditLogLoader {
           oldBlock.getGenerationStamp() != newBlock.getGenerationStamp();
       oldBlock.setGenerationStamp(newBlock.getGenerationStamp());
 
-      if (oldBlock instanceof BlockInfoContiguousUnderConstruction &&
+      if (!oldBlock.isComplete() &&
           (!isLastBlock || op.shouldCompleteLastBlock())) {
         changeMade = true;
-        fsNamesys.getBlockManager().forceCompleteBlock(file,
-            (BlockInfoContiguousUnderConstruction) oldBlock);
+        fsNamesys.getBlockManager().forceCompleteBlock(file, oldBlock);
       }
       if (changeMade) {
         // The state or gen-stamp of the block has changed. So, we may be
@@ -1045,13 +1061,18 @@ public class FSEditLogLoader {
       // We're adding blocks
       for (int i = oldBlocks.length; i < newBlocks.length; i++) {
         Block newBlock = newBlocks[i];
-        BlockInfoContiguous newBI;
+        final BlockInfo newBI;
         if (!op.shouldCompleteLastBlock()) {
           // TODO: shouldn't this only be true for the last block?
           // what about an old-version fsync() where fsync isn't called
           // until several blocks in?
-          newBI = new BlockInfoContiguousUnderConstruction(
-              newBlock, file.getPreferredBlockReplication());
+          if (isStriped) {
+            newBI = new BlockInfoStripedUnderConstruction(newBlock,
+                HdfsConstants.NUM_DATA_BLOCKS, HdfsConstants.NUM_PARITY_BLOCKS);
+          } else {
+            newBI = new BlockInfoContiguousUnderConstruction(newBlock,
+                file.getPreferredBlockReplication());
+          }
         } else {
           // OP_CLOSE should add finalized blocks. This code path
           // is only executed when loading edits written by prior
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index ff9baf4f42f..aef0b2811b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -359,7 +359,14 @@ public class FSImageFormat {
         // read the max sequential block ID.
         long maxSequentialBlockId = in.readLong();
-        namesystem.getBlockIdManager().setLastAllocatedBlockId(maxSequentialBlockId);
+        namesystem.getBlockIdManager().setLastAllocatedContiguousBlockId(
+            maxSequentialBlockId);
+        if (NameNodeLayoutVersion.supports(
+            NameNodeLayoutVersion.Feature.ERASURE_CODING, imgVersion)) {
+          final long maxStripedBlockId = in.readLong();
+          namesystem.getBlockIdManager().setLastAllocatedStripedBlockId(
+              maxStripedBlockId);
+        }
       } else {
 
         long startingGenStamp = namesystem.getBlockIdManager()
@@ -1269,7 +1276,8 @@ public class FSImageFormat {
       out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampV1());
       out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampV2());
       out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampAtblockIdSwitch());
-      out.writeLong(sourceNamesystem.getBlockIdManager().getLastAllocatedBlockId());
+      out.writeLong(sourceNamesystem.getBlockIdManager().getLastAllocatedContiguousBlockId());
+      out.writeLong(sourceNamesystem.getBlockIdManager().getLastAllocatedStripedBlockId());
       out.writeLong(context.getTxId());
       out.writeLong(sourceNamesystem.dir.getLastInodeId());
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index b8dfa518b3e..a58e37ef6bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -643,8 +643,9 @@ public final class FSImageFormatPBINode {
       INodeSection.INodeFile.Builder b = buildINodeFile(n,
           parent.getSaverContext());
 
-      if (n.getBlocks() != null) {
-        for (Block block : n.getBlocks()) {
+      BlockInfoContiguous[] cBlks = n.getContiguousBlocks();
+      if (cBlks != null) {
+        for (Block block : cBlks) {
           b.addBlocks(PBHelper.convert(block));
         }
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
index 24afcae4e07..4b75c2c9b18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
@@ -296,7 +296,11 @@ public final class FSImageFormatProtobuf {
     blockIdManager.setGenerationStampV1(s.getGenstampV1());
     blockIdManager.setGenerationStampV2(s.getGenstampV2());
     blockIdManager.setGenerationStampV1Limit(s.getGenstampV1Limit());
-    blockIdManager.setLastAllocatedBlockId(s.getLastAllocatedBlockId());
+    blockIdManager.setLastAllocatedContiguousBlockId(s.getLastAllocatedBlockId());
+    if (s.hasLastAllocatedStripedBlockId()) {
+      blockIdManager.setLastAllocatedStripedBlockId(
+          s.getLastAllocatedStripedBlockId());
+    }
     imgTxId = s.getTransactionId();
     if (s.hasRollingUpgradeStartTime()
         && fsn.getFSImage().hasRollbackFSImage()) {
@@ -536,7 +540,8 @@ public final class FSImageFormatProtobuf {
         .setGenstampV1(blockIdManager.getGenerationStampV1())
         .setGenstampV1Limit(blockIdManager.getGenerationStampV1Limit())
         .setGenstampV2(blockIdManager.getGenerationStampV2())
-        .setLastAllocatedBlockId(blockIdManager.getLastAllocatedBlockId())
+        .setLastAllocatedBlockId(blockIdManager.getLastAllocatedContiguousBlockId())
+        .setLastAllocatedStripedBlockId(blockIdManager.getLastAllocatedStripedBlockId())
         .setTransactionId(context.getTxId());
 
     // We use the non-locked version of getNamespaceInfo here since
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 6097c71dac0..fca848e27a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -206,7 +206,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@ -2099,7 +2098,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     boolean shouldRecoverNow = (newBlock == null);
 
     BlockInfo oldBlock = file.getLastBlock();
-    assert oldBlock instanceof BlockInfoContiguous;
+    assert !oldBlock.isStriped();
 
     boolean shouldCopyOnTruncate = shouldCopyOnTruncate(file,
         (BlockInfoContiguous) oldBlock);
@@ -3266,7 +3265,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         BlockInfo bi = getStoredBlock(b);
         if (bi.isComplete()) {
           numRemovedComplete++;
-          if (bi.numNodes() >= blockManager.minReplication) {
+          if (blockManager.checkMinStorage(bi, bi.numNodes())) {
             numRemovedSafe++;
           }
         }
@@ -3495,7 +3494,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       curBlock = blocks[nrCompleteBlocks];
       if(!curBlock.isComplete())
         break;
-      assert blockManager.checkMinReplication(curBlock) :
+      assert blockManager.checkMinStorage(curBlock) :
           "A COMPLETE block is not minimally replicated in " + src;
     }
 
@@ -3530,8 +3529,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
 
     // If penultimate block doesn't exist then its minReplication is met
-    boolean penultimateBlockMinReplication = penultimateBlock == null ||
-        blockManager.checkMinReplication(penultimateBlock);
+    boolean penultimateBlockMinStorage = penultimateBlock == null ||
+        blockManager.checkMinStorage(penultimateBlock);
 
     switch(lastBlockState) {
     case COMPLETE:
@@ -3539,8 +3538,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       break;
     case COMMITTED:
       // Close file if committed blocks are minimally replicated
-      if(penultimateBlockMinReplication &&
-          blockManager.checkMinReplication(lastBlock)) {
+      if(penultimateBlockMinStorage &&
+          blockManager.checkMinStorage(lastBlock)) {
         finalizeINodeFileUnderConstruction(src, pendingFile,
             iip.getLatestSnapshotId());
         NameNode.stateChangeLog.warn("BLOCK*"
@@ -3640,6 +3639,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
 
     // Adjust disk space consumption if required
+    // TODO: support EC files
     final long diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();
     if (diff > 0) {
       try {
@@ -5634,7 +5634,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     assert hasWriteLock();
     checkNameNodeSafeMode("Cannot get next block ID");
     final long blockId = isStriped ?
-        blockIdManager.nextBlockGroupId() : blockIdManager.nextBlockId();
+        blockIdManager.nextStripedBlockId() : blockIdManager.nextContiguousBlockId();
     getEditLog().logAllocateBlockId(blockId);
     // NB: callers sync the log
     return blockId;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index ac58812b7cc..bc25f254fdc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -86,7 +86,7 @@ public class INodeFile extends INodeWithAdditionalFields
    */
   static enum HeaderFormat {
     PREFERRED_BLOCK_SIZE(null, 48, 1),
-    REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 12, 1),
+    REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 12, 0),
     STORAGE_POLICY_ID(REPLICATION.BITS, BlockStoragePolicySuite.ID_BIT_LENGTH,
         0);
 
@@ -261,10 +261,10 @@ public class INodeFile extends INodeWithAdditionalFields
   public void setBlock(int index, BlockInfo blk) {
     FileWithStripedBlocksFeature sb = getStripedBlocksFeature();
     if (sb == null) {
-      assert blk instanceof BlockInfoContiguous;
+      assert !blk.isStriped();
       this.blocks[index] = (BlockInfoContiguous) blk;
     } else {
-      assert blk instanceof BlockInfoStriped;
+      assert blk.isStriped();
       assert hasNoContiguousBlock();
       sb.setBlock(index, (BlockInfoStriped) blk);
     }
@@ -282,12 +282,12 @@ public class INodeFile extends INodeWithAdditionalFields
     final BlockInfo ucBlock;
     FileWithStripedBlocksFeature sb = getStripedBlocksFeature();
     if (sb == null) {
-      assert lastBlock instanceof BlockInfoContiguous;
+      assert !lastBlock.isStriped();
       ucBlock = ((BlockInfoContiguous) lastBlock)
           .convertToBlockUnderConstruction(UNDER_CONSTRUCTION, locations);
     } else {
       assert hasNoContiguousBlock();
-      assert lastBlock instanceof BlockInfoStriped;
+      assert lastBlock.isStriped();
       ucBlock = ((BlockInfoStriped) lastBlock)
           .convertToBlockUnderConstruction(UNDER_CONSTRUCTION, locations);
     }
@@ -548,7 +548,7 @@ public class INodeFile extends INodeWithAdditionalFields
   /**
    * add a contiguous block to the block list
    */
-  void addBlock(BlockInfoContiguous newblock) {
+  private void addContiguousBlock(BlockInfoContiguous newblock) {
     if (this.blocks == null) {
       this.setContiguousBlocks(new BlockInfoContiguous[]{newblock});
     } else {
@@ -560,6 +560,19 @@ public class INodeFile extends INodeWithAdditionalFields
     }
   }
 
+  /** add a striped or contiguous block */
+  void addBlock(BlockInfo newblock) {
+    FileWithStripedBlocksFeature sb = getStripedBlocksFeature();
+    if (sb == null) {
+      assert !newblock.isStriped();
+      addContiguousBlock((BlockInfoContiguous) newblock);
+    } else {
+      assert newblock.isStriped();
+      assert hasNoContiguousBlock();
+      sb.addBlock((BlockInfoStriped) newblock);
+    }
+  }
+
   /** Set the blocks. */
   public void setContiguousBlocks(BlockInfoContiguous[] blocks) {
     this.blocks = blocks;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
index d235e2b4ccb..f93218fccc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
@@ -72,7 +72,8 @@ public class NameNodeLayoutVersion {
     BLOCK_STORAGE_POLICY(-60, "Block Storage policy"),
     TRUNCATE(-61, "Truncate"),
     APPEND_NEW_BLOCK(-62, "Support appending to new block"),
-    QUOTA_BY_STORAGE_TYPE(-63, "Support quota for specific storage types");
+    QUOTA_BY_STORAGE_TYPE(-63, "Support quota for specific storage types"),
+    ERASURE_CODING(-64, "Support erasure coding");
 
     private final FeatureInfo info;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
index b6fd03396fa..3f3a71ec64f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
@@ -73,6 +73,7 @@ message NameSystemSection {
   optional uint64 lastAllocatedBlockId = 5;
   optional uint64 transactionId = 6;
   optional uint64 rollingUpgradeStartTime = 7;
+  optional uint64 lastAllocatedStripedBlockId = 8;
 }
 
 /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockgroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockgroup.java
deleted file mode 100644
index a2ef7b2d18f..00000000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockgroup.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestAddBlockgroup {
-
-  public static final Log LOG = LogFactory.getLog(TestAddBlockgroup.class);
-
-  private final short GROUP_SIZE = HdfsConstants.NUM_DATA_BLOCKS +
-      HdfsConstants.NUM_PARITY_BLOCKS;
-  private final short NUM_DATANODES = GROUP_SIZE;
-
-  private static final int BLOCKSIZE = 1024;
-  private static final short REPLICATION = 3;
-
-  private MiniDFSCluster cluster;
-  private Configuration conf;
-
-  @Before
-  public void setup() throws IOException {
-    conf = new Configuration();
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
-        .build();
-    cluster.waitActive();
-    cluster.getFileSystem().setStoragePolicy(new Path("/"),
-        HdfsConstants.EC_STORAGE_POLICY_NAME);
-  }
-
-  @After
-  public void tearDown() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  @Test
-  public void testAddBlockGroup() throws Exception {
-    DistributedFileSystem fs = cluster.getFileSystem();
-    FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
-
-    final Path file1 = new Path("/file1");
-    DFSTestUtil.createFile(fs, file1, BLOCKSIZE * 2, REPLICATION, 0L);
-    INodeFile file1Node = fsdir.getINode4Write(file1.toString()).asFile();
-    BlockInfo[] file1Blocks = file1Node.getBlocks();
-    assertEquals(2, file1Blocks.length);
-    assertEquals(GROUP_SIZE, file1Blocks[0].numNodes());
-    assertEquals(HdfsConstants.MAX_BLOCKS_IN_GROUP,
-        file1Blocks[1].getBlockId() - file1Blocks[0].getBlockId());
-  }
-}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
new file mode 100644
index 00000000000..7226f519bbd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestAddStripedBlocks {
+  private final short GROUP_SIZE = HdfsConstants.NUM_DATA_BLOCKS +
+      HdfsConstants.NUM_PARITY_BLOCKS;
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem dfs;
+
+  @Before
+  public void setup() throws IOException {
+    cluster = new MiniDFSCluster.Builder(new HdfsConfiguration())
+        .numDataNodes(GROUP_SIZE).build();
+    cluster.waitActive();
+    dfs = cluster.getFileSystem();
+    dfs.setStoragePolicy(new Path("/"), HdfsConstants.EC_STORAGE_POLICY_NAME);
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testAddStripedBlock() throws Exception {
+    final Path file = new Path("/file1");
+    // create an empty file
+    FSDataOutputStream out = null;
+    try {
+      out = dfs.create(file, (short) 1);
+
+      FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
+      INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
+      LocatedBlock newBlock = cluster.getNamesystem().getAdditionalBlock(
+          file.toString(), fileNode.getId(), dfs.getClient().getClientName(),
+          null, null, null);
+      assertEquals(GROUP_SIZE, newBlock.getLocations().length);
+      assertEquals(GROUP_SIZE, newBlock.getStorageIDs().length);
+
+      BlockInfo[] blocks = fileNode.getBlocks();
+      assertEquals(1, blocks.length);
+      Assert.assertTrue(blocks[0].isStriped());
+
+      checkStripedBlockUC((BlockInfoStriped) fileNode.getLastBlock(), true);
+    } finally {
+      IOUtils.cleanup(null, out);
+    }
+
+    // restart NameNode to check editlog
+    cluster.restartNameNode(true);
+    FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
+    INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
+    BlockInfo[] blocks = fileNode.getBlocks();
+    assertEquals(1, blocks.length);
+    Assert.assertTrue(blocks[0].isStriped());
+    checkStripedBlockUC((BlockInfoStriped) fileNode.getLastBlock(), false);
+
+    // save namespace, restart namenode, and check
+    dfs = cluster.getFileSystem();
+    dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
+    dfs.saveNamespace();
+    dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+    cluster.restartNameNode(true);
+    fsdir = cluster.getNamesystem().getFSDirectory();
+    fileNode = fsdir.getINode4Write(file.toString()).asFile();
+    blocks = fileNode.getBlocks();
+    assertEquals(1, blocks.length);
+    Assert.assertTrue(blocks[0].isStriped());
+    checkStripedBlockUC((BlockInfoStriped) fileNode.getLastBlock(), false);
+  }
+
+  private void checkStripedBlockUC(BlockInfoStriped block,
+      boolean checkReplica) {
+    assertEquals(0, block.numNodes());
+    Assert.assertFalse(block.isComplete());
+    Assert.assertEquals(HdfsConstants.NUM_DATA_BLOCKS, block.getDataBlockNum());
+    Assert.assertEquals(HdfsConstants.NUM_PARITY_BLOCKS,
+        block.getParityBlockNum());
+    Assert.assertEquals(0,
+        block.getBlockId() & HdfsConstants.BLOCK_GROUP_INDEX_MASK);
+
+    final BlockInfoStripedUnderConstruction blockUC =
+        (BlockInfoStripedUnderConstruction) block;
+    Assert.assertEquals(HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
+        blockUC.getBlockUCState());
+    if (checkReplica) {
+      Assert.assertEquals(GROUP_SIZE, blockUC.getNumExpectedLocations());
+      DatanodeStorageInfo[] storages = blockUC.getExpectedStorageLocations();
+      for (DataNode dn : cluster.getDataNodes()) {
+        Assert.assertTrue(includeDataNode(dn.getDatanodeId(), storages));
+      }
+    }
+  }
+
+  private boolean includeDataNode(DatanodeID dn, DatanodeStorageInfo[] storages) {
+    for (DatanodeStorageInfo storage : storages) {
+      if (storage.getDatanodeDescriptor().equals(dn)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}