HDFS-7837. Erasure Coding: allocate and persist striped blocks in NameNode. Contributed by Jing Zhao.

This commit is contained in:
Jing Zhao 2015-03-02 13:44:33 -08:00 committed by Zhe Zhang
parent 9af1f4779b
commit 1e1e930407
18 changed files with 340 additions and 176 deletions

View File

@ -104,20 +104,37 @@ SequentialBlockIdGenerator getBlockIdGenerator() {
}
/**
* Sets the maximum allocated block ID for this filesystem. This is
* Sets the maximum allocated contiguous block ID for this filesystem. This is
* the basis for allocating new block IDs.
*/
public void setLastAllocatedBlockId(long blockId) {
public void setLastAllocatedContiguousBlockId(long blockId) {
blockIdGenerator.skipTo(blockId);
}
/**
* Gets the maximum sequentially allocated block ID for this filesystem
* Gets the maximum sequentially allocated contiguous block ID for this
* filesystem
*/
public long getLastAllocatedBlockId() {
public long getLastAllocatedContiguousBlockId() {
return blockIdGenerator.getCurrentValue();
}
/**
* Sets the maximum allocated striped block ID for this filesystem. This is
* the basis for allocating new block IDs.
*/
public void setLastAllocatedStripedBlockId(long blockId) {
blockGroupIdGenerator.skipTo(blockId);
}
/**
* Gets the maximum sequentially allocated striped block ID for this
* filesystem
*/
public long getLastAllocatedStripedBlockId() {
return blockGroupIdGenerator.getCurrentValue();
}
/**
* Sets the current generation stamp for legacy blocks
*/
@ -189,11 +206,11 @@ public boolean isLegacyBlock(Block block) {
/**
* Increments, logs and then returns the block ID
*/
public long nextBlockId() {
public long nextContiguousBlockId() {
return blockIdGenerator.nextValue();
}
public long nextBlockGroupId() {
public long nextStripedBlockId() {
return blockGroupIdGenerator.nextValue();
}
@ -217,7 +234,7 @@ public static boolean isStripedBlockID(long id) {
return id < 0;
}
public static long convertToGroupID(long id) {
public static long convertToStripedID(long id) {
return id & (~HdfsConstants.BLOCK_GROUP_INDEX_MASK);
}

View File

@ -169,6 +169,8 @@ public int getCapacity() {
*/
abstract void replaceBlock(BlockInfo newBlock);
public abstract boolean isStriped();
/**
* Find specified DatanodeDescriptor.
* @return index or -1 if not found.
@ -336,7 +338,7 @@ public void setNext(LightWeightGSet.LinkedElement next) {
}
static BlockInfo copyOf(BlockInfo b) {
if (b instanceof BlockInfoContiguous) {
if (!b.isStriped()) {
return new BlockInfoContiguous((BlockInfoContiguous) b);
} else {
return new BlockInfoStriped((BlockInfoStriped) b);

View File

@ -148,7 +148,7 @@ private int ensureCapacity(int num) {
* happen only when replication is manually increased by the user. */
Object[] old = triplets;
triplets = new Object[(last+num)*3];
System.arraycopy(old, 0, triplets, 0, last*3);
System.arraycopy(old, 0, triplets, 0, last * 3);
return last;
}
@ -232,4 +232,9 @@ public BlockInfoContiguousUnderConstruction convertToBlockUnderConstruction(
ucBlock.setBlockCollection(getBlockCollection());
return ucBlock;
}
@Override
public final boolean isStriped() {
return false;
}
}

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
/**
@ -57,7 +56,7 @@ public BlockInfoStriped(Block blk, short dataBlockNum, short parityBlockNum) {
this.setBlockCollection(b.getBlockCollection());
}
private short getTotalBlockNum() {
short getTotalBlockNum() {
return (short) (dataBlockNum + parityBlockNum);
}
@ -174,6 +173,11 @@ void replaceBlock(BlockInfo newBlock) {
}
}
@Override
public final boolean isStriped() {
return true;
}
@Override
public int numNodes() {
assert this.triplets != null : "BlockInfo is not initialized";

View File

@ -582,11 +582,22 @@ public int getMaxReplicationStreams() {
return maxReplicationStreams;
}
/**
* @return true if the block has minimum replicas
*/
public boolean checkMinReplication(Block block) {
return (countNodes(block).liveReplicas() >= minReplication);
public int getDefaultStorageNum(BlockInfo block) {
return block.isStriped() ?
((BlockInfoStriped) block).getTotalBlockNum() : defaultReplication;
}
public short getMinStorageNum(BlockInfo block) {
return block.isStriped() ?
((BlockInfoStriped) block).getDataBlockNum() : minReplication;
}
public boolean checkMinStorage(BlockInfo block) {
return countNodes(block).liveReplicas() >= getMinStorageNum(block);
}
public boolean checkMinStorage(BlockInfo block, int liveNum) {
return liveNum >= getMinStorageNum(block);
}
/**
@ -630,7 +641,7 @@ public boolean commitOrCompleteLastBlock(BlockCollection bc,
return false; // already completed (e.g. by syncBlock)
final boolean b = commitBlock(lastBlock, commitBlock);
if (countNodes(lastBlock).liveReplicas() >= minReplication) {
if (checkMinStorage(lastBlock)) {
completeBlock(bc, bc.numBlocks() - 1, false);
}
return b;
@ -654,7 +665,7 @@ private BlockInfo completeBlock(final BlockCollection bc,
}
int numNodes = curBlock.numNodes();
if (!force && numNodes < minReplication) {
if (!force && !checkMinStorage(curBlock, numNodes)) {
throw new IOException("Cannot complete block: " +
"block does not satisfy minimal replication requirement.");
}
@ -698,9 +709,8 @@ private BlockInfo completeBlock(final BlockCollection bc,
* when tailing edit logs as a Standby.
*/
public BlockInfo forceCompleteBlock(final BlockCollection bc,
final BlockInfoContiguousUnderConstruction block) throws IOException {
// TODO: support BlockInfoStripedUC for editlog
block.commitBlock(block);
final BlockInfo block) throws IOException {
BlockInfo.commitBlock(block, block);
return completeBlock(bc, block, true);
}
@ -751,7 +761,7 @@ public LocatedBlock convertLastBlockToUnderConstruction(
// count in safe-mode.
namesystem.adjustSafeModeBlockTotals(
// decrement safe if we had enough
targets.length >= minReplication ? -1 : 0,
checkMinStorage(oldBlock, targets.length) ? -1 : 0,
// always decrement total blocks
-1);
@ -1197,8 +1207,8 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
NumberReplicas numberOfReplicas = countNodes(b.stored);
boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
expectedReplicas;
boolean minReplicationSatisfied =
numberOfReplicas.liveReplicas() >= minReplication;
boolean minReplicationSatisfied = checkMinStorage(b.stored,
numberOfReplicas.liveReplicas());
boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
(numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
expectedReplicas;
@ -2526,7 +2536,7 @@ private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
&& numCurrentReplica >= minReplication) {
&& checkMinStorage(storedBlock, numCurrentReplica)) {
completeBlock(storedBlock.getBlockCollection(), storedBlock, false);
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
// check whether safe replication is reached for the block
@ -2601,7 +2611,7 @@ private Block addStoredBlock(final BlockInfo block,
+ pendingReplications.getNumReplicas(storedBlock);
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
numLiveReplicas >= minReplication) {
checkMinStorage(storedBlock, numLiveReplicas)) {
storedBlock = completeBlock(bc, storedBlock, false);
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
// check whether safe replication is reached for the block
@ -3283,6 +3293,8 @@ public void processIncrementalBlockReport(final DatanodeID nodeID,
/**
* Return the number of nodes hosting a given block, grouped
* by the state of those replicas.
* For a striped block, this includes nodes storing blocks belonging to the
* striped block group.
*/
public NumberReplicas countNodes(Block b) {
int decommissioned = 0;
@ -3438,7 +3450,7 @@ public BlockInfo getStoredBlock(Block block) {
BlockInfo info = null;
if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
info = blocksMap.getStoredBlock(
new Block(BlockIdManager.convertToGroupID(block.getBlockId())));
new Block(BlockIdManager.convertToStripedID(block.getBlockId())));
}
if (info == null) {
info = blocksMap.getStoredBlock(block);

View File

@ -43,8 +43,15 @@ private static class StorageIterator implements Iterator<DatanodeStorageInfo> {
@Override
public boolean hasNext() {
return blockInfo != null && nextIdx < blockInfo.getCapacity()
&& blockInfo.getDatanode(nextIdx) != null;
if (blockInfo == null) {
return false;
}
while (nextIdx < blockInfo.getCapacity() &&
blockInfo.getDatanode(nextIdx) == null) {
// note that for striped blocks there may be null in the triplets
nextIdx++;
}
return nextIdx < blockInfo.getCapacity();
}
@Override
@ -123,10 +130,13 @@ void removeBlock(Block block) {
return;
blockInfo.setBlockCollection(null);
// TODO: fix this logic for block group
for(int idx = blockInfo.numNodes()-1; idx >= 0; idx--) {
final int size = blockInfo instanceof BlockInfoContiguous ?
blockInfo.numNodes() : blockInfo.getCapacity();
for(int idx = size - 1; idx >= 0; idx--) {
DatanodeDescriptor dn = blockInfo.getDatanode(idx);
dn.removeBlock(blockInfo); // remove from the list and wipe the location
if (dn != null) {
dn.removeBlock(blockInfo); // remove from the list and wipe the location
}
}
}

View File

@ -544,7 +544,7 @@ private void processBlocksForDecomInternal(
int underReplicatedInOpenFiles = 0;
while (it.hasNext()) {
numBlocksChecked++;
final BlockInfoContiguous block = it.next();
final BlockInfo block = it.next();
// Remove the block from the list if it's no longer in the block map,
// e.g. the containing file has been deleted
if (blockManager.blocksMap.getStoredBlock(block) == null) {
@ -578,8 +578,9 @@ private void processBlocksForDecomInternal(
}
// Even if the block is under-replicated,
// it doesn't block decommission if it's sufficiently replicated
if (isSufficientlyReplicated(block, bc, num)) {
// it doesn't block decommission if it's sufficiently replicated
BlockInfoContiguous blk = (BlockInfoContiguous) block;
if (isSufficientlyReplicated(blk, bc, num)) {
if (pruneSufficientlyReplicated) {
it.remove();
}
@ -588,7 +589,7 @@ private void processBlocksForDecomInternal(
// We've found an insufficiently replicated block.
if (insufficientlyReplicated != null) {
insufficientlyReplicated.add(block);
insufficientlyReplicated.add(blk);
}
// Log if this is our first time through
if (firstReplicationLog) {

View File

@ -471,21 +471,20 @@ static INodeFile addFileForEditLog(
assert fsd.hasWriteLock();
if (underConstruction) {
newNode = newINodeFile(id, permissions, modificationTime,
modificationTime, replication,
preferredBlockSize,
storagePolicyId);
modificationTime, replication, preferredBlockSize, storagePolicyId);
newNode.toUnderConstruction(clientName, clientMachine);
} else {
newNode = newINodeFile(id, permissions, modificationTime,
atime, replication,
preferredBlockSize,
storagePolicyId);
newNode = newINodeFile(id, permissions, modificationTime, atime,
replication, preferredBlockSize, storagePolicyId);
}
newNode.setLocalName(localName);
try {
INodesInPath iip = fsd.addINode(existing, newNode);
if (iip != null) {
if (newNode.isStriped()) {
newNode.addStripedBlocksFeature();
}
if (aclEntries != null) {
AclStorage.updateINodeAcl(newNode, aclEntries, CURRENT_STATE_ID);
}
@ -553,7 +552,7 @@ private static INodesInPath addFile(
long modTime = now();
INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
modTime, modTime, replication, preferredBlockSize);
modTime, modTime, replication, preferredBlockSize);
newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
newNode.toUnderConstruction(clientName, clientMachine);
@ -561,12 +560,15 @@ private static INodesInPath addFile(
fsd.writeLock();
try {
newiip = fsd.addINode(existing, newNode);
if (newiip != null && newNode.isStriped()) {
newNode.addStripedBlocksFeature();
}
} finally {
fsd.writeUnlock();
}
if (newiip == null) {
NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
existing.getPath() + "/" + localName);
existing.getPath() + "/" + localName);
return null;
}

View File

@ -38,7 +38,9 @@
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -413,7 +415,8 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
// Update the salient file attributes.
newFile.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
newFile.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
updateBlocks(fsDir, addCloseOp, iip, newFile);
// TODO whether the file is striped should later be retrieved from iip
updateBlocks(fsDir, addCloseOp, iip, newFile, newFile.isStriped());
break;
}
case OP_CLOSE: {
@ -433,7 +436,8 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
// Update the salient file attributes.
file.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
file.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
updateBlocks(fsDir, addCloseOp, iip, file);
// TODO whether the file is striped should later be retrieved from iip
updateBlocks(fsDir, addCloseOp, iip, file, file.isStriped());
// Now close the file
if (!file.isUnderConstruction() &&
@ -491,7 +495,8 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
INodesInPath iip = fsDir.getINodesInPath(path, true);
INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
// Update in-memory data structures
updateBlocks(fsDir, updateOp, iip, oldFile);
// TODO whether the file is striped should later be retrieved from iip
updateBlocks(fsDir, updateOp, iip, oldFile, oldFile.isStriped());
if (toAddRetryCache) {
fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
@ -507,7 +512,8 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
}
INodeFile oldFile = INodeFile.valueOf(fsDir.getINode(path), path);
// add the new block to the INodeFile
addNewBlock(addBlockOp, oldFile);
// TODO whether the file is striped should later be retrieved from iip
addNewBlock(addBlockOp, oldFile, oldFile.isStriped());
break;
}
case OP_SET_REPLICATION: {
@ -787,8 +793,15 @@ fsDir, renameReservedPathsOnUpgrade(timesOp.path, logVersion),
}
case OP_ALLOCATE_BLOCK_ID: {
AllocateBlockIdOp allocateBlockIdOp = (AllocateBlockIdOp) op;
fsNamesys.getBlockIdManager().setLastAllocatedBlockId(
allocateBlockIdOp.blockId);
if (BlockIdManager.isStripedBlockID(allocateBlockIdOp.blockId)) {
// ALLOCATE_BLOCK_ID is added for sequential block id, thus if the id
// is negative, it must belong to striped blocks
fsNamesys.getBlockIdManager().setLastAllocatedStripedBlockId(
allocateBlockIdOp.blockId);
} else {
fsNamesys.getBlockIdManager().setLastAllocatedContiguousBlockId(
allocateBlockIdOp.blockId);
}
break;
}
case OP_ROLLING_UPGRADE_START: {
@ -940,9 +953,9 @@ private static String formatEditLogReplayError(EditLogInputStream in,
/**
* Add a new block into the given INodeFile
* TODO support adding striped block
*/
private void addNewBlock(AddBlockOp op, INodeFile file) throws IOException {
private void addNewBlock(AddBlockOp op, INodeFile file, boolean isStriped)
throws IOException {
BlockInfo[] oldBlocks = file.getBlocks();
Block pBlock = op.getPenultimateBlock();
Block newBlock= op.getLastBlock();
@ -950,7 +963,7 @@ private void addNewBlock(AddBlockOp op, INodeFile file) throws IOException {
if (pBlock != null) { // the penultimate block is not null
assert oldBlocks != null && oldBlocks.length > 0;
// compare pBlock with the last block of oldBlocks
Block oldLastBlock = oldBlocks[oldBlocks.length - 1];
BlockInfo oldLastBlock = oldBlocks[oldBlocks.length - 1];
if (oldLastBlock.getBlockId() != pBlock.getBlockId()
|| oldLastBlock.getGenerationStamp() != pBlock.getGenerationStamp()) {
throw new IOException(
@ -960,29 +973,33 @@ private void addNewBlock(AddBlockOp op, INodeFile file) throws IOException {
}
oldLastBlock.setNumBytes(pBlock.getNumBytes());
if (oldLastBlock instanceof BlockInfoContiguousUnderConstruction) {
fsNamesys.getBlockManager().forceCompleteBlock(file,
(BlockInfoContiguousUnderConstruction) oldLastBlock);
if (!oldLastBlock.isComplete()) {
fsNamesys.getBlockManager().forceCompleteBlock(file, oldLastBlock);
fsNamesys.getBlockManager().processQueuedMessagesForBlock(pBlock);
}
} else { // the penultimate block is null
Preconditions.checkState(oldBlocks == null || oldBlocks.length == 0);
}
// add the new block
BlockInfoContiguous newBI = new BlockInfoContiguousUnderConstruction(
newBlock, file.getPreferredBlockReplication());
fsNamesys.getBlockManager().addBlockCollection(newBI, file);
file.addBlock(newBI);
final BlockInfo newBlockInfo;
if (isStriped) {
newBlockInfo = new BlockInfoStripedUnderConstruction(newBlock,
HdfsConstants.NUM_DATA_BLOCKS, HdfsConstants.NUM_PARITY_BLOCKS);
} else {
newBlockInfo = new BlockInfoContiguousUnderConstruction(newBlock,
file.getPreferredBlockReplication());
}
fsNamesys.getBlockManager().addBlockCollection(newBlockInfo, file);
file.addBlock(newBlockInfo);
fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
}
/**
* Update in-memory data structures with new block information.
* TODO support adding striped block
* @throws IOException
*/
private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
INodesInPath iip, INodeFile file) throws IOException {
INodesInPath iip, INodeFile file, boolean isStriped) throws IOException {
// Update its block list
BlockInfo[] oldBlocks = file.getBlocks();
Block[] newBlocks = op.getBlocks();
@ -1011,11 +1028,10 @@ private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
oldBlock.getGenerationStamp() != newBlock.getGenerationStamp();
oldBlock.setGenerationStamp(newBlock.getGenerationStamp());
if (oldBlock instanceof BlockInfoContiguousUnderConstruction &&
if (!oldBlock.isComplete() &&
(!isLastBlock || op.shouldCompleteLastBlock())) {
changeMade = true;
fsNamesys.getBlockManager().forceCompleteBlock(file,
(BlockInfoContiguousUnderConstruction) oldBlock);
fsNamesys.getBlockManager().forceCompleteBlock(file, oldBlock);
}
if (changeMade) {
// The state or gen-stamp of the block has changed. So, we may be
@ -1045,13 +1061,18 @@ private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
// We're adding blocks
for (int i = oldBlocks.length; i < newBlocks.length; i++) {
Block newBlock = newBlocks[i];
BlockInfoContiguous newBI;
final BlockInfo newBI;
if (!op.shouldCompleteLastBlock()) {
// TODO: shouldn't this only be true for the last block?
// what about an old-version fsync() where fsync isn't called
// until several blocks in?
newBI = new BlockInfoContiguousUnderConstruction(
newBlock, file.getPreferredBlockReplication());
if (isStriped) {
newBI = new BlockInfoStripedUnderConstruction(newBlock,
HdfsConstants.NUM_DATA_BLOCKS, HdfsConstants.NUM_PARITY_BLOCKS);
} else {
newBI = new BlockInfoContiguousUnderConstruction(newBlock,
file.getPreferredBlockReplication());
}
} else {
// OP_CLOSE should add finalized blocks. This code path
// is only executed when loading edits written by prior

View File

@ -359,7 +359,14 @@ public void load(File curFile) throws IOException {
// read the max sequential block ID.
long maxSequentialBlockId = in.readLong();
namesystem.getBlockIdManager().setLastAllocatedBlockId(maxSequentialBlockId);
namesystem.getBlockIdManager().setLastAllocatedContiguousBlockId(
maxSequentialBlockId);
if (NameNodeLayoutVersion.supports(
NameNodeLayoutVersion.Feature.ERASURE_CODING, imgVersion)) {
final long maxStripedBlockId = in.readLong();
namesystem.getBlockIdManager().setLastAllocatedStripedBlockId(
maxStripedBlockId);
}
} else {
long startingGenStamp = namesystem.getBlockIdManager()
@ -1269,7 +1276,8 @@ void save(File newFile, FSImageCompression compression) throws IOException {
out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampV1());
out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampV2());
out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampAtblockIdSwitch());
out.writeLong(sourceNamesystem.getBlockIdManager().getLastAllocatedBlockId());
out.writeLong(sourceNamesystem.getBlockIdManager().getLastAllocatedContiguousBlockId());
out.writeLong(sourceNamesystem.getBlockIdManager().getLastAllocatedStripedBlockId());
out.writeLong(context.getTxId());
out.writeLong(sourceNamesystem.dir.getLastInodeId());

View File

@ -643,8 +643,9 @@ private void save(OutputStream out, INodeFile n) throws IOException {
INodeSection.INodeFile.Builder b = buildINodeFile(n,
parent.getSaverContext());
if (n.getBlocks() != null) {
for (Block block : n.getBlocks()) {
BlockInfoContiguous[] cBlks = n.getContiguousBlocks();
if (cBlks != null) {
for (Block block : cBlks) {
b.addBlocks(PBHelper.convert(block));
}
}

View File

@ -296,7 +296,11 @@ private void loadNameSystemSection(InputStream in) throws IOException {
blockIdManager.setGenerationStampV1(s.getGenstampV1());
blockIdManager.setGenerationStampV2(s.getGenstampV2());
blockIdManager.setGenerationStampV1Limit(s.getGenstampV1Limit());
blockIdManager.setLastAllocatedBlockId(s.getLastAllocatedBlockId());
blockIdManager.setLastAllocatedContiguousBlockId(s.getLastAllocatedBlockId());
if (s.hasLastAllocatedStripedBlockId()) {
blockIdManager.setLastAllocatedStripedBlockId(
s.getLastAllocatedStripedBlockId());
}
imgTxId = s.getTransactionId();
if (s.hasRollingUpgradeStartTime()
&& fsn.getFSImage().hasRollbackFSImage()) {
@ -536,7 +540,8 @@ private void saveNameSystemSection(FileSummary.Builder summary)
.setGenstampV1(blockIdManager.getGenerationStampV1())
.setGenstampV1Limit(blockIdManager.getGenerationStampV1Limit())
.setGenstampV2(blockIdManager.getGenerationStampV2())
.setLastAllocatedBlockId(blockIdManager.getLastAllocatedBlockId())
.setLastAllocatedBlockId(blockIdManager.getLastAllocatedContiguousBlockId())
.setLastAllocatedStripedBlockId(blockIdManager.getLastAllocatedStripedBlockId())
.setTransactionId(context.getTxId());
// We use the non-locked version of getNamespaceInfo here since

View File

@ -206,7 +206,6 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@ -2099,7 +2098,7 @@ Block prepareFileForTruncate(INodesInPath iip,
boolean shouldRecoverNow = (newBlock == null);
BlockInfo oldBlock = file.getLastBlock();
assert oldBlock instanceof BlockInfoContiguous;
assert !oldBlock.isStriped();
boolean shouldCopyOnTruncate = shouldCopyOnTruncate(file,
(BlockInfoContiguous) oldBlock);
@ -3266,7 +3265,7 @@ void removeBlocksAndUpdateSafemodeTotal(BlocksMapUpdateInfo blocks) {
BlockInfo bi = getStoredBlock(b);
if (bi.isComplete()) {
numRemovedComplete++;
if (bi.numNodes() >= blockManager.minReplication) {
if (blockManager.checkMinStorage(bi, bi.numNodes())) {
numRemovedSafe++;
}
}
@ -3495,7 +3494,7 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
curBlock = blocks[nrCompleteBlocks];
if(!curBlock.isComplete())
break;
assert blockManager.checkMinReplication(curBlock) :
assert blockManager.checkMinStorage(curBlock) :
"A COMPLETE block is not minimally replicated in " + src;
}
@ -3530,8 +3529,8 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
// If penultimate block doesn't exist then its minReplication is met
boolean penultimateBlockMinReplication = penultimateBlock == null ||
blockManager.checkMinReplication(penultimateBlock);
boolean penultimateBlockMinStorage = penultimateBlock == null ||
blockManager.checkMinStorage(penultimateBlock);
switch(lastBlockState) {
case COMPLETE:
@ -3539,8 +3538,8 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
break;
case COMMITTED:
// Close file if committed blocks are minimally replicated
if(penultimateBlockMinReplication &&
blockManager.checkMinReplication(lastBlock)) {
if(penultimateBlockMinStorage &&
blockManager.checkMinStorage(lastBlock)) {
finalizeINodeFileUnderConstruction(src, pendingFile,
iip.getLatestSnapshotId());
NameNode.stateChangeLog.warn("BLOCK*"
@ -3640,6 +3639,7 @@ void commitOrCompleteLastBlock(
}
// Adjust disk space consumption if required
// TODO: support EC files
final long diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();
if (diff > 0) {
try {
@ -5634,7 +5634,7 @@ private long nextBlockId(boolean isStriped) throws IOException {
assert hasWriteLock();
checkNameNodeSafeMode("Cannot get next block ID");
final long blockId = isStriped ?
blockIdManager.nextBlockGroupId() : blockIdManager.nextBlockId();
blockIdManager.nextStripedBlockId() : blockIdManager.nextContiguousBlockId();
getEditLog().logAllocateBlockId(blockId);
// NB: callers sync the log
return blockId;

View File

@ -86,7 +86,7 @@ public static INodeFile valueOf(INode inode, String path, boolean acceptNull)
*/
static enum HeaderFormat {
PREFERRED_BLOCK_SIZE(null, 48, 1),
REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 12, 1),
REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 12, 0),
STORAGE_POLICY_ID(REPLICATION.BITS, BlockStoragePolicySuite.ID_BIT_LENGTH,
0);
@ -261,10 +261,10 @@ private void assertAllBlocksComplete(BlockInfo[] blks) {
public void setBlock(int index, BlockInfo blk) {
FileWithStripedBlocksFeature sb = getStripedBlocksFeature();
if (sb == null) {
assert blk instanceof BlockInfoContiguous;
assert !blk.isStriped();
this.blocks[index] = (BlockInfoContiguous) blk;
} else {
assert blk instanceof BlockInfoStriped;
assert blk.isStriped();
assert hasNoContiguousBlock();
sb.setBlock(index, (BlockInfoStriped) blk);
}
@ -282,12 +282,12 @@ public void convertLastBlockToUC(BlockInfo lastBlock,
final BlockInfo ucBlock;
FileWithStripedBlocksFeature sb = getStripedBlocksFeature();
if (sb == null) {
assert lastBlock instanceof BlockInfoContiguous;
assert !lastBlock.isStriped();
ucBlock = ((BlockInfoContiguous) lastBlock)
.convertToBlockUnderConstruction(UNDER_CONSTRUCTION, locations);
} else {
assert hasNoContiguousBlock();
assert lastBlock instanceof BlockInfoStriped;
assert lastBlock.isStriped();
ucBlock = ((BlockInfoStriped) lastBlock)
.convertToBlockUnderConstruction(UNDER_CONSTRUCTION, locations);
}
@ -548,7 +548,7 @@ void concatBlocks(INodeFile[] inodes) {
/**
* add a contiguous block to the block list
*/
void addBlock(BlockInfoContiguous newblock) {
private void addContiguousBlock(BlockInfoContiguous newblock) {
if (this.blocks == null) {
this.setContiguousBlocks(new BlockInfoContiguous[]{newblock});
} else {
@ -560,6 +560,19 @@ void addBlock(BlockInfoContiguous newblock) {
}
}
/** add a striped or contiguous block */
void addBlock(BlockInfo newblock) {
FileWithStripedBlocksFeature sb = getStripedBlocksFeature();
if (sb == null) {
assert !newblock.isStriped();
addContiguousBlock((BlockInfoContiguous) newblock);
} else {
assert newblock.isStriped();
assert hasNoContiguousBlock();
sb.addBlock((BlockInfoStriped) newblock);
}
}
/** Set the blocks. */
public void setContiguousBlocks(BlockInfoContiguous[] blocks) {
this.blocks = blocks;

View File

@ -72,7 +72,8 @@ public static enum Feature implements LayoutFeature {
BLOCK_STORAGE_POLICY(-60, "Block Storage policy"),
TRUNCATE(-61, "Truncate"),
APPEND_NEW_BLOCK(-62, "Support appending to new block"),
QUOTA_BY_STORAGE_TYPE(-63, "Support quota for specific storage types");
QUOTA_BY_STORAGE_TYPE(-63, "Support quota for specific storage types"),
ERASURE_CODING(-64, "Support erasure coding");
private final FeatureInfo info;

View File

@ -73,6 +73,7 @@ message NameSystemSection {
optional uint64 lastAllocatedBlockId = 5;
optional uint64 transactionId = 6;
optional uint64 rollingUpgradeStartTime = 7;
optional uint64 lastAllocatedStripedBlockId = 8;
}
/**

View File

@ -1,85 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
public class TestAddBlockgroup {
public static final Log LOG = LogFactory.getLog(TestAddBlockgroup.class);
private final short GROUP_SIZE = HdfsConstants.NUM_DATA_BLOCKS +
HdfsConstants.NUM_PARITY_BLOCKS;
private final short NUM_DATANODES = GROUP_SIZE;
private static final int BLOCKSIZE = 1024;
private static final short REPLICATION = 3;
private MiniDFSCluster cluster;
private Configuration conf;
@Before
public void setup() throws IOException {
conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
.build();
cluster.waitActive();
cluster.getFileSystem().setStoragePolicy(new Path("/"),
HdfsConstants.EC_STORAGE_POLICY_NAME);
}
@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testAddBlockGroup() throws Exception {
DistributedFileSystem fs = cluster.getFileSystem();
FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
final Path file1 = new Path("/file1");
DFSTestUtil.createFile(fs, file1, BLOCKSIZE * 2, REPLICATION, 0L);
INodeFile file1Node = fsdir.getINode4Write(file1.toString()).asFile();
BlockInfo[] file1Blocks = file1Node.getBlocks();
assertEquals(2, file1Blocks.length);
assertEquals(GROUP_SIZE, file1Blocks[0].numNodes());
assertEquals(HdfsConstants.MAX_BLOCKS_IN_GROUP,
file1Blocks[1].getBlockId() - file1Blocks[0].getBlockId());
}
}

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
public class TestAddStripedBlocks {
private final short GROUP_SIZE = HdfsConstants.NUM_DATA_BLOCKS +
HdfsConstants.NUM_PARITY_BLOCKS;
private MiniDFSCluster cluster;
private DistributedFileSystem dfs;
@Before
public void setup() throws IOException {
cluster = new MiniDFSCluster.Builder(new HdfsConfiguration())
.numDataNodes(GROUP_SIZE).build();
cluster.waitActive();
dfs = cluster.getFileSystem();
dfs.setStoragePolicy(new Path("/"), HdfsConstants.EC_STORAGE_POLICY_NAME);
}
@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testAddStripedBlock() throws Exception {
final Path file = new Path("/file1");
// create an empty file
FSDataOutputStream out = null;
try {
out = dfs.create(file, (short) 1);
FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
LocatedBlock newBlock = cluster.getNamesystem().getAdditionalBlock(
file.toString(), fileNode.getId(), dfs.getClient().getClientName(),
null, null, null);
assertEquals(GROUP_SIZE, newBlock.getLocations().length);
assertEquals(GROUP_SIZE, newBlock.getStorageIDs().length);
BlockInfo[] blocks = fileNode.getBlocks();
assertEquals(1, blocks.length);
Assert.assertTrue(blocks[0].isStriped());
checkStripedBlockUC((BlockInfoStriped) fileNode.getLastBlock(), true);
} finally {
IOUtils.cleanup(null, out);
}
// restart NameNode to check editlog
cluster.restartNameNode(true);
FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
BlockInfo[] blocks = fileNode.getBlocks();
assertEquals(1, blocks.length);
Assert.assertTrue(blocks[0].isStriped());
checkStripedBlockUC((BlockInfoStriped) fileNode.getLastBlock(), false);
// save namespace, restart namenode, and check
dfs = cluster.getFileSystem();
dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
dfs.saveNamespace();
dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
cluster.restartNameNode(true);
fsdir = cluster.getNamesystem().getFSDirectory();
fileNode = fsdir.getINode4Write(file.toString()).asFile();
blocks = fileNode.getBlocks();
assertEquals(1, blocks.length);
Assert.assertTrue(blocks[0].isStriped());
checkStripedBlockUC((BlockInfoStriped) fileNode.getLastBlock(), false);
}
private void checkStripedBlockUC(BlockInfoStriped block,
boolean checkReplica) {
assertEquals(0, block.numNodes());
Assert.assertFalse(block.isComplete());
Assert.assertEquals(HdfsConstants.NUM_DATA_BLOCKS, block.getDataBlockNum());
Assert.assertEquals(HdfsConstants.NUM_PARITY_BLOCKS,
block.getParityBlockNum());
Assert.assertEquals(0,
block.getBlockId() & HdfsConstants.BLOCK_GROUP_INDEX_MASK);
final BlockInfoStripedUnderConstruction blockUC =
(BlockInfoStripedUnderConstruction) block;
Assert.assertEquals(HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
blockUC.getBlockUCState());
if (checkReplica) {
Assert.assertEquals(GROUP_SIZE, blockUC.getNumExpectedLocations());
DatanodeStorageInfo[] storages = blockUC.getExpectedStorageLocations();
for (DataNode dn : cluster.getDataNodes()) {
Assert.assertTrue(includeDataNode(dn.getDatanodeId(), storages));
}
}
}
private boolean includeDataNode(DatanodeID dn, DatanodeStorageInfo[] storages) {
for (DatanodeStorageInfo storage : storages) {
if (storage.getDatanodeDescriptor().equals(dn)) {
return true;
}
}
return false;
}
}