HDFS-8355. Erasure Coding: Refactor BlockInfo and BlockInfoUnderConstruction. Contributed by Tsz Wo Nicholas Sze.

This commit is contained in:
Jing Zhao 2015-05-08 13:56:56 -07:00 committed by Zhe Zhang
parent 9da927540f
commit 51ea117f88
7 changed files with 95 additions and 153 deletions

View File

@ -192,3 +192,6 @@
HDFS-8289. Erasure Coding: add ECSchema to HdfsFileStatus. (Yong Zhang via
jing9)
HDFS-8355. Erasure Coding: Refactor BlockInfo and BlockInfoUnderConstruction.
(Tsz Wo Nicholas Sze via jing9)

View File

@ -17,13 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.LinkedList;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.util.LightWeightGSet;
import java.io.IOException;
import java.util.LinkedList;
/**
* For a given block (or an erasure coding block group), BlockInfo class
* maintains 1) the {@link BlockCollection} it is part of, and 2) datanodes
@ -336,94 +335,4 @@ public abstract class BlockInfo extends Block
public void setNext(LightWeightGSet.LinkedElement next) {
this.nextLinkedElement = next;
}
/**
 * Create a copy of the given block via the copy constructor that matches
 * its layout: striped blocks are copied as {@link BlockInfoStriped}, all
 * others as {@link BlockInfoContiguous}.
 *
 * @param b the block to copy
 * @return a new block built from {@code b}
 */
static BlockInfo copyOf(BlockInfo b) {
  return b.isStriped()
      ? new BlockInfoStriped((BlockInfoStriped) b)
      : new BlockInfoContiguous((BlockInfoContiguous) b);
}
/**
 * Convert an under-construction block (contiguous or striped) into its
 * complete form by delegating to the block's own conversion method.
 * A block of any other type is returned unchanged.
 *
 * @param blk the block to convert
 * @return the converted block, or {@code blk} itself when it is not one of
 *         the two under-construction types
 * @throws IOException propagated from the delegated conversion
 */
static BlockInfo convertToCompleteBlock(BlockInfo blk) throws IOException {
  if (blk instanceof BlockInfoContiguousUnderConstruction) {
    final BlockInfoContiguousUnderConstruction contiguousUc =
        (BlockInfoContiguousUnderConstruction) blk;
    return contiguousUc.convertToCompleteBlock();
  }
  if (blk instanceof BlockInfoStripedUnderConstruction) {
    final BlockInfoStripedUnderConstruction stripedUc =
        (BlockInfoStripedUnderConstruction) blk;
    return stripedUc.convertToCompleteBlock();
  }
  // Not an under-construction block: nothing to convert.
  return blk;
}
/**
 * Commit the given block using the reported block, delegating to the
 * under-construction implementation that matches the block's type.
 * Blocks that are neither contiguous-UC nor striped-UC are left untouched.
 *
 * @param blockInfo the stored block to commit
 * @param reported the reported block passed to the delegated commit
 * @throws IOException propagated from the delegated commit
 */
static void commitBlock(BlockInfo blockInfo, Block reported)
    throws IOException {
  if (blockInfo instanceof BlockInfoContiguousUnderConstruction) {
    final BlockInfoContiguousUnderConstruction contiguousUc =
        (BlockInfoContiguousUnderConstruction) blockInfo;
    contiguousUc.commitBlock(reported);
    return;
  }
  if (blockInfo instanceof BlockInfoStripedUnderConstruction) {
    final BlockInfoStripedUnderConstruction stripedUc =
        (BlockInfoStripedUnderConstruction) blockInfo;
    stripedUc.commitBlock(reported);
  }
}
/**
 * Add the reported replica to an under-construction block, delegating to
 * {@code addReplicaIfNotPresent} of the matching under-construction type.
 *
 * @param ucBlock the block; asserted to be contiguous-UC or striped-UC
 * @param storageInfo the storage passed to the delegated call
 * @param reportedBlock the reported block passed to the delegated call
 * @param reportedState the reported replica state passed to the delegated call
 */
static void addReplica(BlockInfo ucBlock, DatanodeStorageInfo storageInfo,
    Block reportedBlock, HdfsServerConstants.ReplicaState reportedState) {
  final boolean contiguous =
      ucBlock instanceof BlockInfoContiguousUnderConstruction;
  assert contiguous || ucBlock instanceof BlockInfoStripedUnderConstruction;
  if (contiguous) {
    final BlockInfoContiguousUnderConstruction contiguousUc =
        (BlockInfoContiguousUnderConstruction) ucBlock;
    contiguousUc.addReplicaIfNotPresent(
        storageInfo, reportedBlock, reportedState);
  } else {
    // Anything that is not contiguous-UC is treated as striped-UC.
    final BlockInfoStripedUnderConstruction stripedUc =
        (BlockInfoStripedUnderConstruction) ucBlock;
    stripedUc.addReplicaIfNotPresent(
        storageInfo, reportedBlock, reportedState);
  }
}
/**
 * Return the number of expected locations of an under-construction block,
 * as reported by the matching under-construction type.
 *
 * @param ucBlock the block; asserted to be contiguous-UC or striped-UC
 * @return the value of the delegated {@code getNumExpectedLocations()}
 */
static int getNumExpectedLocations(BlockInfo ucBlock) {
  final boolean contiguous =
      ucBlock instanceof BlockInfoContiguousUnderConstruction;
  assert contiguous || ucBlock instanceof BlockInfoStripedUnderConstruction;
  if (contiguous) {
    final BlockInfoContiguousUnderConstruction contiguousUc =
        (BlockInfoContiguousUnderConstruction) ucBlock;
    return contiguousUc.getNumExpectedLocations();
  }
  // Anything that is not contiguous-UC is treated as striped-UC.
  final BlockInfoStripedUnderConstruction stripedUc =
      (BlockInfoStripedUnderConstruction) ucBlock;
  return stripedUc.getNumExpectedLocations();
}
/**
 * Return the expected storage locations of an under-construction block,
 * as reported by the matching under-construction type.
 *
 * @param ucBlock the block; asserted to be contiguous-UC or striped-UC
 * @return the value of the delegated {@code getExpectedStorageLocations()}
 */
public static DatanodeStorageInfo[] getExpectedStorageLocations(
    BlockInfo ucBlock) {
  final boolean contiguous =
      ucBlock instanceof BlockInfoContiguousUnderConstruction;
  assert contiguous || ucBlock instanceof BlockInfoStripedUnderConstruction;
  if (contiguous) {
    final BlockInfoContiguousUnderConstruction contiguousUc =
        (BlockInfoContiguousUnderConstruction) ucBlock;
    return contiguousUc.getExpectedStorageLocations();
  }
  // Anything that is not contiguous-UC is treated as striped-UC.
  final BlockInfoStripedUnderConstruction stripedUc =
      (BlockInfoStripedUnderConstruction) ucBlock;
  return stripedUc.getExpectedStorageLocations();
}
/**
 * Set the expected locations of an under-construction block by delegating
 * to {@code setExpectedLocations} of the matching under-construction type.
 *
 * @param ucBlock the block; asserted to be contiguous-UC or striped-UC
 * @param targets the storages passed to the delegated call
 */
public static void setExpectedLocations(BlockInfo ucBlock,
    DatanodeStorageInfo[] targets) {
  final boolean contiguous =
      ucBlock instanceof BlockInfoContiguousUnderConstruction;
  assert contiguous || ucBlock instanceof BlockInfoStripedUnderConstruction;
  if (contiguous) {
    final BlockInfoContiguousUnderConstruction contiguousUc =
        (BlockInfoContiguousUnderConstruction) ucBlock;
    contiguousUc.setExpectedLocations(targets);
  } else {
    // Anything that is not contiguous-UC is treated as striped-UC.
    final BlockInfoStripedUnderConstruction stripedUc =
        (BlockInfoStripedUnderConstruction) ucBlock;
    stripedUc.setExpectedLocations(targets);
  }
}
/**
 * Return the block recovery id of an under-construction block, as reported
 * by the matching under-construction type.
 *
 * @param ucBlock the block; asserted to be contiguous-UC or striped-UC
 * @return the value of the delegated {@code getBlockRecoveryId()}
 */
public static long getBlockRecoveryId(BlockInfo ucBlock) {
  final boolean contiguous =
      ucBlock instanceof BlockInfoContiguousUnderConstruction;
  assert contiguous || ucBlock instanceof BlockInfoStripedUnderConstruction;
  if (contiguous) {
    final BlockInfoContiguousUnderConstruction contiguousUc =
        (BlockInfoContiguousUnderConstruction) ucBlock;
    return contiguousUc.getBlockRecoveryId();
  }
  // Anything that is not contiguous-UC is treated as striped-UC.
  final BlockInfoStripedUnderConstruction stripedUc =
      (BlockInfoStripedUnderConstruction) ucBlock;
  return stripedUc.getBlockRecoveryId();
}
}

View File

@ -80,16 +80,8 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous
setExpectedLocations(targets);
}
/**
* Convert an under construction block to a complete block.
*
* @return BlockInfoContiguous - a complete block.
* @throws IOException if the state of the block
* (the generation stamp and the length) has not been committed by
* the client or it does not have at least a minimal number of replicas
* reported from data-nodes.
*/
BlockInfoContiguous convertToCompleteBlock() throws IOException {
@Override
public BlockInfoContiguous convertToCompleteBlock() throws IOException {
assert getBlockUCState() != BlockUCState.COMPLETE :
"Trying to convert a COMPLETE block";
return new BlockInfoContiguous(this);
@ -170,13 +162,8 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous
}
}
/**
* Commit block's length and generation stamp as reported by the client.
* Set block state to {@link BlockUCState#COMMITTED}.
* @param block - contains client reported block length and generation
* @throws IOException if block ids are inconsistent.
*/
void commitBlock(Block block) throws IOException {
@Override
public void commitBlock(Block block) throws IOException {
if(getBlockId() != block.getBlockId())
throw new IOException("Trying to commit inconsistent block: id = "
+ block.getBlockId() + ", expected id = " + getBlockId());
@ -235,9 +222,9 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous
}
}
void addReplicaIfNotPresent(DatanodeStorageInfo storage,
Block block,
ReplicaState rState) {
@Override
public void addReplicaIfNotPresent(DatanodeStorageInfo storage,
Block block, ReplicaState rState) {
Iterator<ReplicaUnderConstruction> it = replicas.iterator();
while (it.hasNext()) {
ReplicaUnderConstruction r = it.next();

View File

@ -73,16 +73,8 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped
setExpectedLocations(targets);
}
/**
* Convert an under construction striped block to a complete striped block.
*
* @return BlockInfoStriped - a complete block.
* @throws IOException if the state of the block
* (the generation stamp and the length) has not been committed by
* the client or it does not have at least a minimal number of replicas
* reported from data-nodes.
*/
BlockInfoStriped convertToCompleteBlock() throws IOException {
@Override
public BlockInfoStriped convertToCompleteBlock() throws IOException {
assert getBlockUCState() != COMPLETE :
"Trying to convert a COMPLETE block";
return new BlockInfoStriped(this);
@ -177,12 +169,8 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped
}
}
/**
* Commit block's length and generation stamp as reported by the client.
* Set block state to {@link BlockUCState#COMMITTED}.
* @param block - contains client reported block length and generation
*/
void commitBlock(Block block) throws IOException {
@Override
public void commitBlock(Block block) throws IOException {
if (getBlockId() != block.getBlockId()) {
throw new IOException("Trying to commit inconsistent block: id = "
+ block.getBlockId() + ", expected id = " + getBlockId());
@ -242,8 +230,9 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped
}
}
void addReplicaIfNotPresent(DatanodeStorageInfo storage, Block reportedBlock,
ReplicaState rState) {
@Override
public void addReplicaIfNotPresent(DatanodeStorageInfo storage,
Block reportedBlock, ReplicaState rState) {
if (replicas == null) {
replicas = new ReplicaUnderConstruction[1];
replicas[0] = new ReplicaUnderConstruction(reportedBlock, storage, rState);

View File

@ -17,7 +17,11 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
public interface BlockInfoUnderConstruction {
/**
@ -54,4 +58,27 @@ public interface BlockInfoUnderConstruction {
* make it primary.
*/
public void initializeBlockRecovery(long recoveryId);
/**
 * Add the reported replica if it is not already in the replica list.
 *
 * @param storage the datanode storage associated with the reported replica
 * @param reportedBlock the block as reported
 * @param rState the state of the reported replica
 */
public void addReplicaIfNotPresent(DatanodeStorageInfo storage,
Block reportedBlock, ReplicaState rState);
/**
* Commit block's length and generation stamp as reported by the client.
* Set block state to {@link BlockUCState#COMMITTED}.
* @param block - contains client reported block length and generation
* @throws IOException if block ids are inconsistent.
*/
public void commitBlock(Block block) throws IOException;
/**
* Convert an under construction block to a complete block.
*
* @return a complete block.
* @throws IOException
* if the state of the block (the generation stamp and the length)
* has not been committed by the client or it does not have at least
* a minimal number of replicas reported from data-nodes.
*/
public BlockInfo convertToCompleteBlock() throws IOException;
}

View File

@ -637,13 +637,19 @@ public class BlockManager {
*/
private static boolean commitBlock(final BlockInfo block,
final Block commitBlock) throws IOException {
if (block.getBlockUCState() == BlockUCState.COMMITTED)
return false;
assert block.getNumBytes() <= commitBlock.getNumBytes() :
"commitBlock length is less than the stored one "
+ commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
BlockInfo.commitBlock(block, commitBlock);
return true;
if (block instanceof BlockInfoUnderConstruction
&& block.getBlockUCState() != BlockUCState.COMMITTED) {
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)block;
assert block.getNumBytes() <= commitBlock.getNumBytes() :
"commitBlock length is less than the stored one "
+ commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
uc.commitBlock(commitBlock);
return true;
}
return false;
}
/**
@ -700,7 +706,10 @@ public class BlockManager {
"Cannot complete block: block has not been COMMITTED by the client");
}
final BlockInfo completeBlock = BlockInfo.convertToCompleteBlock(curBlock);
final BlockInfo completeBlock
= !(curBlock instanceof BlockInfoUnderConstruction)? curBlock
: ((BlockInfoUnderConstruction)curBlock).convertToCompleteBlock();
// replace penultimate block in file
bc.setBlock(blkIndex, completeBlock);
@ -738,7 +747,9 @@ public class BlockManager {
*/
public BlockInfo forceCompleteBlock(final BlockCollection bc,
final BlockInfo block) throws IOException {
BlockInfo.commitBlock(block, block);
if (block instanceof BlockInfoUnderConstruction) {
((BlockInfoUnderConstruction)block).commitBlock(block);
}
return completeBlock(bc, block, true);
}
@ -2250,12 +2261,13 @@ public class BlockManager {
// If block is under construction, add this replica to its list
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
BlockInfo.addReplica(storedBlock, storageInfo, iblk, reportedState);
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)storedBlock;
uc.addReplicaIfNotPresent(storageInfo, iblk, reportedState);
// OpenFileBlocks only inside snapshots also will be added to safemode
// threshold. So we need to update such blocks to safemode
// refer HDFS-5283
if (namesystem.isInSnapshot(storedBlock.getBlockCollection())) {
int numOfReplicas = BlockInfo.getNumExpectedLocations(storedBlock);
int numOfReplicas = uc.getNumExpectedLocations();
namesystem.incrementSafeBlockCount(numOfReplicas, storedBlock);
}
//and fall through to next clause
@ -2617,7 +2629,8 @@ public class BlockManager {
void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
DatanodeStorageInfo storageInfo) throws IOException {
BlockInfo block = ucBlock.storedBlock;
BlockInfo.addReplica(block, storageInfo, ucBlock.reportedBlock,
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)block;
uc.addReplicaIfNotPresent(storageInfo, ucBlock.reportedBlock,
ucBlock.reportedState);
if (ucBlock.reportedState == ReplicaState.FINALIZED &&
@ -3945,6 +3958,20 @@ public class BlockManager {
null);
}
/**
 * Build a {@link LocatedBlock} for the given block and set a WRITE-mode
 * block token on it. A striped block yields a located striped block that
 * carries the block indices of the under-construction striped block; any
 * other block yields a plain located block.
 *
 * NOTE(review): a striped {@code info} is cast to
 * {@link BlockInfoStripedUnderConstruction} unconditionally, so callers
 * are presumably expected to pass only under-construction blocks here --
 * confirm against the call sites.
 *
 * @param eb the extended block to wrap
 * @param info the stored block that selects striped vs. contiguous handling
 * @param locs the storage locations for the block
 * @param offset the offset passed through to the located block
 * @return the located block with its WRITE token set
 * @throws IOException declared by this method; presumably raised while
 *         setting the block token -- confirm
 */
public LocatedBlock newLocatedBlock(ExtendedBlock eb, BlockInfo info,
    DatanodeStorageInfo[] locs, long offset) throws IOException {
  final LocatedBlock located = info.isStriped()
      ? newLocatedStripedBlock(eb, locs,
          ((BlockInfoStripedUnderConstruction) info).getBlockIndices(),
          offset, false)
      : newLocatedBlock(eb, locs, offset, false);
  setBlockToken(located, BlockTokenIdentifier.AccessMode.WRITE);
  return located;
}
/**
* This class is used internally by {@link BlockManager#computeRecoveryWorkForBlocks}
* to represent a task to recover a block through replication or erasure

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
@ -60,9 +59,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROL
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
@ -88,8 +87,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROU
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
import static org.apache.hadoop.util.Time.now;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
@ -140,6 +139,7 @@ import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
@ -152,6 +152,7 @@ import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
@ -160,10 +161,8 @@ import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
@ -172,6 +171,7 @@ import org.apache.hadoop.hdfs.UnknownCryptoProtocolVersionException;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
@ -180,15 +180,15 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingZoneInfo;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@ -209,7 +209,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@ -217,7 +217,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
@ -3799,7 +3798,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
truncatedBlock = iFile.getLastBlock();
long recoveryId = BlockInfo.getBlockRecoveryId(truncatedBlock);
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)truncatedBlock;
final long recoveryId = uc.getBlockRecoveryId();
copyTruncate = truncatedBlock.getBlockId() != storedBlock.getBlockId();
if(recoveryId != newgenerationstamp) {
throw new IOException("The recovery id " + newgenerationstamp